🎯 Recommended Samples
Balanced sample collections from various categories for you to explore
Web Multithreading Rust Samples
Web Rust multithreading examples including thread creation, synchronization, and thread pools
💻 Thread Creation rust
🟢 simple
⭐⭐⭐
Create and manage threads using std::thread for concurrent execution
⏱️ 20 min
🏷️ rust, web, multithreading
Prerequisites:
Basic Rust, std::thread
// Web Rust Thread Creation Examples
// Using std::thread for concurrent execution
use std::thread;
use std::time::{Duration, Instant};
// 1. Basic Thread Creation
/// Spawn a basic thread
fn spawn_basic_thread() {
thread::spawn(|| {
for i in 1..=5 {
println!("Thread: {}", i);
thread::sleep(Duration::from_millis(100));
}
});
// Main thread work
for i in 1..=5 {
println!("Main: {}", i);
thread::sleep(Duration::from_millis(100));
}
// Note: If main thread exits, spawned threads will be terminated
thread::sleep(Duration::from_millis(600));
}
/// Spawn a thread with move closure
fn spawn_thread_with_move() {
let data = vec![1, 2, 3, 4, 5];
thread::spawn(move || {
println!("Thread received data: {:?}", data);
});
thread::sleep(Duration::from_millis(100));
}
// 2. Thread with Return Values
/// Spawn thread that returns a value
fn spawn_thread_with_return() -> thread::JoinHandle<i32> {
thread::spawn(|| {
let mut sum = 0;
for i in 1..=100 {
sum += i;
}
sum
})
}
/// Wait for thread completion and get result
fn wait_for_thread_result() {
let handle = spawn_thread_with_return();
match handle.join() {
Ok(result) => println!("Thread returned: {}", result),
Err(e) => println!("Thread panicked: {:?}", e),
}
}
// 3. Multiple Threads
/// Spawn multiple threads
fn spawn_multiple_threads() {
let mut handles = vec![];
for i in 0..5 {
let handle = thread::spawn(move || {
println!("Thread {} starting", i);
thread::sleep(Duration::from_millis(100 * (i + 1) as u64));
println!("Thread {} finishing", i);
i * 2
});
handles.push(handle);
}
// Wait for all threads
for handle in handles {
let _ = handle.join();
}
}
/// Collect results from multiple threads
fn collect_thread_results() -> Vec<i32> {
let mut handles = vec![];
for i in 0..5 {
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
i * i
});
handles.push(handle);
}
handles.into_iter().filter_map(|h| h.join().ok()).collect()
}
// 4. Thread Builder
/// Use thread::Builder for more control
fn spawn_named_thread() -> thread::JoinHandle<()> {
thread::Builder::new()
.name("my-worker-thread".to_string())
.stack_size(1024 * 1024) // 1MB stack
.spawn(|| {
println!("Named thread executing");
thread::sleep(Duration::from_millis(100));
})
.expect("Failed to spawn thread")
}
// 5. Thread with Error Handling
/// Thread that can return Result
fn spawn_thread_with_result() -> thread::JoinHandle<Result<i32, String>> {
thread::spawn(|| {
let mut sum = 0;
for i in 1..=100 {
sum += i;
}
Ok(sum)
})
}
/// Handle thread errors
fn handle_thread_errors() {
let handle = spawn_thread_with_result();
match handle.join() {
Ok(Ok(result)) => println!("Success: {}", result),
Ok(Err(e)) => println!("Thread error: {}", e),
Err(e) => println!("Thread panicked: {:?}", e),
}
}
// 6. Parallel Processing
/// Process data in parallel
fn parallel_processing(data: Vec<i32>) -> Vec<i32> {
let chunk_size = (data.len() + 3) / 4; // Split into 4 chunks
let mut handles = vec![];
for chunk in data.chunks(chunk_size) {
let chunk = chunk.to_vec();
let handle = thread::spawn(move || {
chunk.into_iter().map(|x| x * x).collect::<Vec<_>>()
});
handles.push(handle);
}
handles
.into_iter()
.filter_map(|h| h.join().ok())
.flatten()
.collect()
}
// 7. Thread Sleep and Yield
/// Demonstrate thread sleep
fn thread_sleep_example() {
println!("Starting sleep");
thread::sleep(Duration::from_secs(1));
println!("Finished sleep");
}
/// Demonstrate thread yield
fn thread_yield_example() {
let handle = thread::spawn(|| {
for i in 0..5 {
println!("Working... {}", i);
thread::yield_now(); // Suggest to scheduler to yield
}
});
let _ = handle.join();
}
// 8. Thread Local Storage
use std::cell::RefCell;
thread_local! {
static THREAD_LOCAL_DATA: RefCell<Vec<i32>> = RefCell::new(Vec::new());
}
/// Use thread local storage
fn thread_local_storage_example() {
let mut handles = vec![];
for i in 0..3 {
let handle = thread::spawn(move || {
THREAD_LOCAL_DATA.with_borrow_mut(|data| {
data.push(i);
data.push(i * 2);
println!("Thread {} local data: {:?}", i, data);
});
});
handles.push(handle);
}
for handle in handles {
let _ = handle.join();
}
}
// 9. Thread Panics
/// Thread that panics
fn spawn_panicking_thread() -> thread::JoinHandle<()> {
thread::spawn(|| {
panic!("Something went wrong!");
})
}
/// Catch thread panic
fn catch_thread_panic() {
let handle = spawn_panicking_thread();
match handle.join() {
Ok(_) => println!("Thread completed"),
Err(e) => println!("Thread panicked with: {:?}", e),
}
}
// 10. Scoped Threads (using crossbeam)
/// Process with scoped threads
fn scoped_threads_example() {
let data = vec![1, 2, 3, 4, 5];
let mut results = vec![0; data.len()];
// Using crossbeam::scope for scoped threads
// Note: Requires crossbeam crate
// crossbeam::scope(|s| {
// for (i, &value) in data.iter().enumerate() {
// s.spawn(|_| {
// results[i] = value * value;
// });
// }
// }).unwrap();
println!("Results: {:?}", results);
}
// Usage Examples
fn main() {
println!("=== Web Rust Thread Creation Examples ===\n");
// 1. Basic thread
println!("--- 1. Basic Thread ---");
spawn_basic_thread();
// 2. Thread with return value
println!("\n--- 2. Thread with Return Value ---");
wait_for_thread_result();
// 3. Multiple threads
println!("\n--- 3. Multiple Threads ---");
spawn_multiple_threads();
// 4. Collect results
println!("\n--- 4. Collect Thread Results ---");
let results = collect_thread_results();
println!("Results: {:?}", results);
// 5. Named thread
println!("\n--- 5. Named Thread ---");
let handle = spawn_named_thread();
let _ = handle.join();
// 6. Parallel processing
println!("\n--- 6. Parallel Processing ---");
let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
let start = Instant::now();
let results = parallel_processing(data);
println!("Results: {:?}", results);
println!("Time: {:?}", start.elapsed());
// 7. Thread local storage
println!("\n--- 7. Thread Local Storage ---");
thread_local_storage_example();
// 8. Error handling
println!("\n--- 8. Thread Error Handling ---");
handle_thread_errors();
println!("\n=== All Thread Creation Examples Completed ===");
}
💻 Thread Synchronization rust
🟡 intermediate
⭐⭐⭐⭐
Use mutexes, channels, and other synchronization primitives for safe concurrent access
⏱️ 30 min
🏷️ rust, web, multithreading, synchronization
Prerequisites:
Intermediate Rust, std::sync, std::sync::mpsc
// Web Rust Thread Synchronization Examples
// Using mutexes, channels, and atomic operations
use std::sync::{Arc, Mutex, Condvar, RwLock, Barrier, Semaphore};
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
// 1. Mutex (Mutual Exclusion)
/// Basic mutex usage
fn basic_mutex_example() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Counter: {}", *counter.lock().unwrap());
}
/// Mutex with multiple operations
fn mutex_operations_example() {
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
let data1 = Arc::clone(&data);
let handle1 = thread::spawn(move || {
let mut data = data1.lock().unwrap();
data.push(4);
println!("Thread 1: {:?}", data);
});
let data2 = Arc::clone(&data);
let handle2 = thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
let mut data = data2.lock().unwrap();
data.push(5);
println!("Thread 2: {:?}", data);
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("Final data: {:?}", *data.lock().unwrap());
}
// 2. RwLock (Read-Write Lock)
/// Multiple readers, single writer
fn rwlock_example() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// Reader threads
for i in 0..3 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let r = data.read().unwrap();
println!("Reader {}: {:?}", i, *r);
thread::sleep(Duration::from_millis(100));
}));
}
// Writer thread
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
let mut w = data.write().unwrap();
w.push(4);
println!("Writer: Modified data");
}));
for handle in handles {
handle.join().unwrap();
}
println!("Final data: {:?}", *data.read().unwrap());
}
// 3. Channels (Message Passing)
use std::sync::mpsc;
/// Basic channel usage
fn basic_channel_example() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Received: {}", received);
}
/// Multiple producers
fn multiple_producers_example() {
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
for i in 0..5 {
let tx = tx.clone();
handles.push(thread::spawn(move || {
tx.send(i).unwrap();
}));
}
drop(tx); // Drop original sender
// Receive all messages
let mut results = vec![];
for received in rx {
results.push(received);
}
for handle in handles {
handle.join().unwrap();
}
println!("Received: {:?}", results);
}
/// Sync channel (rendezvous)
fn sync_channel_example() {
let (tx, rx) = mpsc::sync_channel(1); // Buffer size 1
let handle = thread::spawn(move || {
println!("Sending 1");
tx.send(1).unwrap();
println!("Sending 2");
tx.send(2).unwrap();
});
thread::sleep(Duration::from_millis(100));
println!("Receiving...");
println!("Received: {}", rx.recv().unwrap());
println!("Received: {}", rx.recv().unwrap());
handle.join().unwrap();
}
// 4. Atomic Operations
/// Atomic counter
fn atomic_counter_example() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
counter.fetch_add(1, Ordering::SeqCst);
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("Counter: {}", counter.load(Ordering::SeqCst));
}
/// Compare and swap
fn compare_and_swap_example() {
let value = Arc::new(AtomicI32::new(5));
let value1 = Arc::clone(&value);
let handle1 = thread::spawn(move || {
let mut old = value1.load(Ordering::SeqCst);
loop {
match value1.compare_exchange_weak(old, old * 2, Ordering::SeqCst, Ordering::SeqCst) {
Ok(_) => break,
Err(actual) => old = actual,
}
}
println!("Thread 1: Changed value to {}", value1.load(Ordering::SeqCst));
});
let value2 = Arc::clone(&value);
let handle2 = thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
let mut old = value2.load(Ordering::SeqCst);
loop {
match value2.compare_exchange_weak(old, old + 10, Ordering::SeqCst, Ordering::SeqCst) {
Ok(_) => break,
Err(actual) => old = actual,
}
}
println!("Thread 2: Changed value to {}", value2.load(Ordering::SeqCst));
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("Final value: {}", value.load(Ordering::SeqCst));
}
// 5. Barrier (Synchronization Point)
/// Barrier example
fn barrier_example() {
let mut handles = vec![];
let barrier = Arc::new(Barrier::new(3));
let mut data = vec![0; 3];
for i in 0..3 {
let barrier = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
println!("Thread {} started", i);
thread::sleep(Duration::from_millis(100 * i as u64));
println!("Thread {} reached barrier", i);
barrier.wait();
println!("Thread {} passed barrier", i);
i * 2
}));
}
for (i, handle) in handles.into_iter().enumerate() {
data[i] = handle.join().unwrap();
}
println!("Results: {:?}", data);
}
// 6. Condition Variable
/// Condition variable example
fn condition_variable_example() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
});
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
println!("Condition met!");
}
/// Producer-consumer with condition variable
fn producer_consumer_example() {
let mutex = Arc::new(Mutex::new(vec![]));
let condvar = Arc::new(Condvar::new());
// Producer
let mutex1 = Arc::clone(&mutex);
let condvar1 = Arc::clone(&condvar);
let producer = thread::spawn(move || {
let mut data = mutex1.lock().unwrap();
for i in 0..5 {
data.push(i);
println!("Produced: {}", i);
condvar1.notify_one();
thread::sleep(Duration::from_millis(100));
}
});
// Consumer
let mutex2 = Arc::clone(&mutex);
let condvar2 = Arc::clone(&condvar);
let consumer = thread::spawn(move || {
let mut data = mutex2.lock().unwrap();
for _ in 0..5 {
while data.is_empty() {
data = condvar2.wait(data).unwrap();
}
if let Some(value) = data.pop() {
println!("Consumed: {}", value);
}
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
// 7. Once (Single Execution)
use std::sync::Once;
static INIT: Once = Once::new();
static mut GLOBAL_VALUE: i32 = 0;
fn initialize_global() {
unsafe {
GLOBAL_VALUE = 42;
}
}
fn once_example() {
for _ in 0..5 {
thread::spawn(|| {
INIT.call_once(|| {
initialize_global();
println!("Initialized!");
});
unsafe {
println!("Global value: {}", GLOBAL_VALUE);
}
});
}
thread::sleep(Duration::from_millis(100));
}
// 8. Semaphore Pattern
/// Simple semaphore implementation
struct Semaphore {
permits: Arc<(Mutex<usize>, Condvar)>,
}
impl Semaphore {
fn new(permits: usize) -> Self {
Semaphore {
permits: Arc::new((Mutex::new(permits), Condvar::new())),
}
}
fn acquire(&self) {
let (lock, cvar) = &*self.permits;
let mut permits = lock.lock().unwrap();
while *permits == 0 {
permits = cvar.wait(permits).unwrap();
}
*permits -= 1;
}
fn release(&self) {
let (lock, cvar) = &*self.permits;
let mut permits = lock.lock().unwrap();
*permits += 1;
cvar.notify_one();
}
}
/// Semaphore usage
fn semaphore_example() {
let semaphore = Semaphore::new(2); // 2 permits
let mut handles = vec![];
for i in 0..5 {
let semaphore = semaphore.clone();
handles.push(thread::spawn(move || {
semaphore.acquire();
println!("Thread {} acquired permit", i);
thread::sleep(Duration::from_millis(100));
println!("Thread {} releasing permit", i);
semaphore.release();
}));
}
for handle in handles {
handle.join().unwrap();
}
}
// Clone for semaphore
impl Clone for Semaphore {
fn clone(&self) -> Self {
Semaphore {
permits: Arc::clone(&self.permits),
}
}
}
// Usage Examples
fn main() {
println!("=== Web Rust Thread Synchronization Examples ===\n");
// 1. Mutex
println!("--- 1. Mutex Example ---");
basic_mutex_example();
// 2. RwLock
println!("\n--- 2. RwLock Example ---");
rwlock_example();
// 3. Channels
println!("\n--- 3. Channel Example ---");
basic_channel_example();
// 4. Atomic operations
println!("\n--- 4. Atomic Counter ---");
atomic_counter_example();
// 5. Barrier
println!("\n--- 5. Barrier Example ---");
barrier_example();
// 6. Condition variable
println!("\n--- 6. Condition Variable ---");
condition_variable_example();
// 7. Once
println!("\n--- 7. Once Example ---");
once_example();
// 8. Semaphore
println!("\n--- 8. Semaphore Example ---");
semaphore_example();
println!("\n=== All Thread Synchronization Examples Completed ===");
}
💻 Thread Pool rust
🔴 complex
⭐⭐⭐⭐
Implement and use thread pools for efficient concurrent task execution
⏱️ 35 min
🏷️ rust, web, multithreading, thread pool
Prerequisites:
Advanced Rust, std::thread, std::sync
// Web Rust Thread Pool Examples
// Implement and use thread pools for efficient concurrent task execution
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
// 1. Simple Thread Pool
/// Simple thread pool implementation
struct ThreadPool {
workers: Vec<Worker>,
sender: Option<std::sync::mpsc::Sender<Message>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
impl ThreadPool {
/// Create a new thread pool
fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = std::sync::mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
/// Execute a job in the thread pool
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Shutting down thread pool");
// Send terminate message to all workers
for _ in &self.workers {
self.sender.as_ref().unwrap().send(Message::Terminate).unwrap();
}
// Drop sender to close channel
drop(self.sender.take());
// Wait for workers to finish
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
/// Use simple thread pool
fn simple_thread_pool_example() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Task {} executing", i);
thread::sleep(Duration::from_millis(100));
println!("Task {} completed", i);
});
}
thread::sleep(Duration::from_secs(1));
}
// 2. Thread Pool with Return Values
use std::sync::mpsc;
/// Thread pool that can return results
struct ResultThreadPool {
workers: Vec<thread::JoinHandle<()>>,
sender: mpsc::Sender<JobWithResult>,
receiver: mpsc::Receiver<usize>,
}
type JobWithResult = Box<dyn FnOnce(mpsc::Sender<usize>) + Send + 'static>;
impl ResultThreadPool {
fn new(size: usize) -> ResultThreadPool {
let (job_sender, job_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let job_receiver = Arc::new(Mutex::new(job_receiver));
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let job_receiver = Arc::clone(&job_receiver);
let result_sender = result_sender.clone();
let handle = thread::spawn(move || {
loop {
let job = job_receiver.lock().unwrap().recv();
match job {
Ok(job) => {
job(result_sender.clone());
}
Err(_) => break,
}
}
});
workers.push(handle);
}
ResultThreadPool {
workers,
sender: job_sender,
receiver: result_receiver,
}
}
fn execute<F>(&self, f: F) -> usize
where
F: FnOnce() -> usize + Send + 'static,
{
let (tx, rx) = mpsc::channel();
self.sender.send(Box::new(move |result_tx| {
let result = f();
result_tx.send(result).unwrap();
tx.send(()).unwrap();
})).unwrap();
rx.recv().unwrap()
}
}
/// Thread pool with results example
fn thread_pool_with_results_example() {
let pool = ResultThreadPool::new(4);
let mut results = vec![];
for i in 0..8 {
let result = pool.execute(move || {
println!("Task {} executing", i);
thread::sleep(Duration::from_millis(100));
println!("Task {} completed", i);
i * 2
});
results.push(result);
}
println!("Results: {:?}", results);
}
// 3. Work Stealing Thread Pool
/// Simple work-stealing queue
struct WorkStealingQueue {
local: Arc<Mutex<Vec<Job>>>,
stolen: Arc<Mutex<Vec<Job>>>,
}
impl WorkStealingQueue {
fn new() -> Self {
WorkStealingQueue {
local: Arc::new(Mutex::new(Vec::new())),
stolen: Arc::new(Mutex::new(Vec::new())),
}
}
fn push(&self, job: Job) {
self.local.lock().unwrap().push(job);
}
fn pop(&self) -> Option<Job> {
self.local.lock().unwrap().pop()
}
fn steal(&self) -> Option<Job> {
let mut stolen = self.stolen.lock().unwrap();
if stolen.is_empty() {
// Swap local with stolen
let mut local = self.local.lock().unwrap();
std::mem::swap(&mut *local, &mut *stolen);
stolen.pop()
} else {
stolen.pop()
}
}
}
// 4. Async/Await Thread Pool
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
/// Simple task executor
struct TaskExecutor {
tasks: Arc<Mutex<Vec<Box<dyn Future<Output = ()> + Unpin + Send>>>>,
}
impl TaskExecutor {
fn new() -> Self {
TaskExecutor {
tasks: Arc::new(Mutex::new(Vec::new())),
}
}
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Unpin + Send + 'static,
{
self.tasks.lock().unwrap().push(Box::new(future));
}
fn run(&self) {
let mut tasks = self.tasks.lock().unwrap();
let mut i = 0;
while i < tasks.len() {
let mut future = tasks.remove(i);
let waker = unsafe { Waker::from_raw(std::ptr::null()) };
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(_) => {}
Poll::Pending => {
tasks.insert(i, future);
i += 1;
}
}
}
}
}
// 5. Parallel For Loop
/// Execute function in parallel over range
fn parallel_for<F>(start: usize, end: usize, f: F)
where
F: Fn(usize) + Send + Sync + 'static,
{
let pool = ThreadPool::new(4);
let f = Arc::new(f);
for i in start..end {
let f = Arc::clone(&f);
pool.execute(move || {
f(i);
});
}
thread::sleep(Duration::from_millis(100));
}
/// Parallel for example
fn parallel_for_example() {
println!("Parallel for example:");
parallel_for(0, 10, |i| {
println!("Processing {}", i);
thread::sleep(Duration::from_millis(50));
});
}
// 6. Batch Processing with Thread Pool
/// Process items in batches
fn batch_processing<T, F, R>(items: Vec<T>, batch_size: usize, f: F) -> Vec<R>
where
T: Send + 'static,
R: Send + 'static,
F: Fn(Vec<T>) -> Vec<R> + Send + Sync + 'static,
{
let pool = ThreadPool::new(4);
let (sender, receiver) = mpsc::channel();
let f = Arc::new(f);
for chunk in items.chunks(batch_size) {
let chunk = chunk.to_vec();
let sender = sender.clone();
let f = Arc::clone(&f);
pool.execute(move || {
let results = f(chunk);
sender.send(results).unwrap();
});
}
drop(sender);
receiver.iter().flatten().collect()
}
/// Batch processing example
fn batch_processing_example() {
let data: Vec<i32> = (0..20).collect();
let results = batch_processing(data, 5, |batch| {
println!("Processing batch: {:?}", batch);
batch.into_iter().map(|x| x * 2).collect()
});
println!("Results: {:?}", results);
}
// 7. Priority Thread Pool
use std::collections::BinaryHeap;
use std::cmp::Ordering;
/// Priority job
struct PriorityJob {
priority: i32,
job: Job,
}
impl PartialEq for PriorityJob {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}
impl Eq for PriorityJob {}
impl PartialOrd for PriorityJob {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PriorityJob {
fn cmp(&self, other: &Self) -> Ordering {
other.priority.cmp(&self.priority) // Reverse for max-heap
}
}
/// Priority thread pool
struct PriorityThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<PriorityJob>>,
}
impl PriorityThreadPool {
fn new(size: usize) -> Self {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
PriorityThreadPool {
workers,
sender: Some(sender),
}
}
fn execute_with_priority<F>(&self, priority: i32, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
let priority_job = PriorityJob { priority, job };
self.sender.as_ref().unwrap().send(priority_job).unwrap();
}
}
// 8. Thread Pool with Timeout
/// Execute job with timeout
fn execute_with_timeout<F, R>(pool: &ThreadPool, f: F, timeout: Duration) -> Option<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = mpsc::channel();
pool.execute(move || {
let result = f();
sender.send(Some(result)).unwrap();
});
// Wait with timeout
let start = std::time::Instant::now();
while start.elapsed() < timeout {
if let Ok(result) = receiver.recv_timeout(Duration::from_millis(100)) {
return result;
}
}
None
}
/// Timeout example
fn timeout_example() {
let pool = ThreadPool::new(4);
// Quick task
let result = execute_with_timeout(&pool, || {
thread::sleep(Duration::from_millis(100));
"Quick result"
}, Duration::from_secs(1));
println!("Quick task result: {:?}", result);
// Slow task
let result = execute_with_timeout(&pool, || {
thread::sleep(Duration::from_secs(2));
"Slow result"
}, Duration::from_millis(500));
println!("Slow task result: {:?}", result);
}
// Usage Examples
fn main() {
println!("=== Web Rust Thread Pool Examples ===\n");
// 1. Simple thread pool
println!("--- 1. Simple Thread Pool ---");
simple_thread_pool_example();
thread::sleep(Duration::from_secs(2));
// 2. Thread pool with results
println!("\n--- 2. Thread Pool with Results ---");
thread_pool_with_results_example();
// 3. Parallel for
println!("\n--- 3. Parallel For ---");
parallel_for_example();
// 4. Batch processing
println!("\n--- 4. Batch Processing ---");
batch_processing_example();
// 5. Timeout
println!("\n--- 5. Timeout Example ---");
timeout_example();
println!("\n=== All Thread Pool Examples Completed ===");
}