Rust Web Multithreading Examples
Rust web multithreading examples covering thread creation, synchronization, and thread pools
💻 Thread Creation rust
🟢 simple
⭐⭐⭐
Create and manage threads using std::thread for concurrent execution
⏱️ 20 min
🏷️ rust, web, multithreading
Prerequisites:
Basic Rust, std::thread
// Web Rust Thread Creation Examples
// Using std::thread for concurrent execution
use std::thread;
use std::time::{Duration, Instant};
// 1. Basic Thread Creation
/// Spawn a basic thread
fn spawn_basic_thread() {
thread::spawn(|| {
for i in 1..=5 {
println!("Thread: {}", i);
thread::sleep(Duration::from_millis(100));
}
});
// Main thread work
for i in 1..=5 {
println!("Main: {}", i);
thread::sleep(Duration::from_millis(100));
}
    // Note: if the main thread returns, the process exits and spawned threads are killed,
    // so give the spawned thread time to finish
    thread::sleep(Duration::from_millis(600));
}
/// Spawn a thread with move closure
fn spawn_thread_with_move() {
let data = vec![1, 2, 3, 4, 5];
thread::spawn(move || {
println!("Thread received data: {:?}", data);
});
thread::sleep(Duration::from_millis(100));
}
// 2. Thread with Return Values
/// Spawn thread that returns a value
fn spawn_thread_with_return() -> thread::JoinHandle<i32> {
thread::spawn(|| {
let mut sum = 0;
for i in 1..=100 {
sum += i;
}
sum
})
}
/// Wait for thread completion and get result
fn wait_for_thread_result() {
let handle = spawn_thread_with_return();
match handle.join() {
Ok(result) => println!("Thread returned: {}", result),
Err(e) => println!("Thread panicked: {:?}", e),
}
}
// 3. Multiple Threads
/// Spawn multiple threads
fn spawn_multiple_threads() {
let mut handles = vec![];
for i in 0..5 {
let handle = thread::spawn(move || {
println!("Thread {} starting", i);
thread::sleep(Duration::from_millis(100 * (i + 1) as u64));
println!("Thread {} finishing", i);
i * 2
});
handles.push(handle);
}
// Wait for all threads
for handle in handles {
let _ = handle.join();
}
}
/// Collect results from multiple threads
fn collect_thread_results() -> Vec<i32> {
let mut handles = vec![];
for i in 0..5 {
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
i * i
});
handles.push(handle);
}
handles.into_iter().filter_map(|h| h.join().ok()).collect()
}
// 4. Thread Builder
/// Use thread::Builder for more control
fn spawn_named_thread() -> thread::JoinHandle<()> {
thread::Builder::new()
.name("my-worker-thread".to_string())
.stack_size(1024 * 1024) // 1MB stack
.spawn(|| {
println!("Named thread executing");
thread::sleep(Duration::from_millis(100));
})
.expect("Failed to spawn thread")
}
// 5. Thread with Error Handling
/// Thread that can return Result
fn spawn_thread_with_result() -> thread::JoinHandle<Result<i32, String>> {
thread::spawn(|| {
let mut sum = 0;
for i in 1..=100 {
sum += i;
}
Ok(sum)
})
}
/// Handle thread errors
fn handle_thread_errors() {
let handle = spawn_thread_with_result();
match handle.join() {
Ok(Ok(result)) => println!("Success: {}", result),
Ok(Err(e)) => println!("Thread error: {}", e),
Err(e) => println!("Thread panicked: {:?}", e),
}
}
// 6. Parallel Processing
/// Process data in parallel
fn parallel_processing(data: Vec<i32>) -> Vec<i32> {
let chunk_size = (data.len() + 3) / 4; // Split into 4 chunks
let mut handles = vec![];
for chunk in data.chunks(chunk_size) {
let chunk = chunk.to_vec();
let handle = thread::spawn(move || {
chunk.into_iter().map(|x| x * x).collect::<Vec<_>>()
});
handles.push(handle);
}
handles
.into_iter()
.filter_map(|h| h.join().ok())
.flatten()
.collect()
}
// 7. Thread Sleep and Yield
/// Demonstrate thread sleep
fn thread_sleep_example() {
println!("Starting sleep");
thread::sleep(Duration::from_secs(1));
println!("Finished sleep");
}
/// Demonstrate thread yield
fn thread_yield_example() {
let handle = thread::spawn(|| {
for i in 0..5 {
println!("Working... {}", i);
            thread::yield_now(); // Hint to the scheduler that it may switch to another thread
}
});
let _ = handle.join();
}
// 8. Thread Local Storage
use std::cell::RefCell;
thread_local! {
static THREAD_LOCAL_DATA: RefCell<Vec<i32>> = RefCell::new(Vec::new());
}
/// Use thread local storage
fn thread_local_storage_example() {
let mut handles = vec![];
for i in 0..3 {
let handle = thread::spawn(move || {
THREAD_LOCAL_DATA.with_borrow_mut(|data| {
data.push(i);
data.push(i * 2);
println!("Thread {} local data: {:?}", i, data);
});
});
handles.push(handle);
}
for handle in handles {
let _ = handle.join();
}
}
// 9. Thread Panics
/// Thread that panics
fn spawn_panicking_thread() -> thread::JoinHandle<()> {
thread::spawn(|| {
panic!("Something went wrong!");
})
}
/// Catch thread panic
fn catch_thread_panic() {
let handle = spawn_panicking_thread();
match handle.join() {
Ok(_) => println!("Thread completed"),
Err(e) => println!("Thread panicked with: {:?}", e),
}
}
// 10. Scoped Threads
/// Process data with scoped threads that borrow local variables
fn scoped_threads_example() {
    let data = vec![1, 2, 3, 4, 5];
    let mut results = vec![0; data.len()];
    // std::thread::scope (Rust 1.63+) guarantees every spawned thread joins before
    // the scope returns, so the threads may borrow `data` and `results` directly.
    // (The crossbeam crate's crossbeam::scope predates it and works similarly; a
    // commented-out crossbeam version follows after this function.)
    thread::scope(|s| {
        for (slot, &value) in results.iter_mut().zip(data.iter()) {
            // Each thread receives its own &mut slot, so the writes never alias
            s.spawn(move || {
                *slot = value * value;
            });
        }
    });
    println!("Results: {:?}", results);
}
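// Illustrative addition (not part of the original example set): the same function
// written against the crossbeam crate that the original comment referenced.
// Assumes a `crossbeam = "0.8"` dependency, so it is left commented out like the original.
//
// fn scoped_threads_crossbeam() {
//     let data = vec![1, 2, 3, 4, 5];
//     let mut results = vec![0; data.len()];
//     crossbeam::scope(|s| {
//         for (slot, &value) in results.iter_mut().zip(data.iter()) {
//             // crossbeam's spawn closure also receives a scope handle (ignored here)
//             s.spawn(move |_| {
//                 *slot = value * value;
//             });
//         }
//     })
//     .unwrap();
//     println!("Results: {:?}", results);
// }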
// Usage Examples
fn main() {
println!("=== Web Rust Thread Creation Examples ===\n");
// 1. Basic thread
println!("--- 1. Basic Thread ---");
spawn_basic_thread();
// 2. Thread with return value
println!("\n--- 2. Thread with Return Value ---");
wait_for_thread_result();
// 3. Multiple threads
println!("\n--- 3. Multiple Threads ---");
spawn_multiple_threads();
// 4. Collect results
println!("\n--- 4. Collect Thread Results ---");
let results = collect_thread_results();
println!("Results: {:?}", results);
// 5. Named thread
println!("\n--- 5. Named Thread ---");
let handle = spawn_named_thread();
let _ = handle.join();
// 6. Parallel processing
println!("\n--- 6. Parallel Processing ---");
let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
let start = Instant::now();
let results = parallel_processing(data);
println!("Results: {:?}", results);
println!("Time: {:?}", start.elapsed());
// 7. Thread local storage
println!("\n--- 7. Thread Local Storage ---");
thread_local_storage_example();
// 8. Error handling
println!("\n--- 8. Thread Error Handling ---");
handle_thread_errors();
println!("\n=== All Thread Creation Examples Completed ===");
}
💻 Thread Synchronization rust
🟡 intermediate
⭐⭐⭐⭐
Use mutexes, channels, and other synchronization primitives for safe concurrent access
⏱️ 30 min
🏷️ rust, web, multithreading, synchronization
Prerequisites:
Intermediate Rust, std::sync, std::sync::mpsc
// Web Rust Thread Synchronization Examples
// Using mutexes, channels, and atomic operations
use std::sync::{Arc, Mutex, Condvar, RwLock, Barrier};
// (std::sync provides no Semaphore; a simple one is built in section 8 below)
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
// 1. Mutex (Mutual Exclusion)
/// Basic mutex usage
fn basic_mutex_example() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Counter: {}", *counter.lock().unwrap());
}
/// Mutex with multiple operations
fn mutex_operations_example() {
let data = Arc::new(Mutex::new(vec![1, 2, 3]));
let data1 = Arc::clone(&data);
let handle1 = thread::spawn(move || {
let mut data = data1.lock().unwrap();
data.push(4);
println!("Thread 1: {:?}", data);
});
let data2 = Arc::clone(&data);
let handle2 = thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
let mut data = data2.lock().unwrap();
data.push(5);
println!("Thread 2: {:?}", data);
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("Final data: {:?}", *data.lock().unwrap());
}
// 2. RwLock (Read-Write Lock)
/// Multiple readers, single writer
fn rwlock_example() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
let mut handles = vec![];
// Reader threads
for i in 0..3 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let r = data.read().unwrap();
println!("Reader {}: {:?}", i, *r);
thread::sleep(Duration::from_millis(100));
}));
}
// Writer thread
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
let mut w = data.write().unwrap();
w.push(4);
println!("Writer: Modified data");
}));
for handle in handles {
handle.join().unwrap();
}
println!("Final data: {:?}", *data.read().unwrap());
}
// 3. Channels (Message Passing)
use std::sync::mpsc;
/// Basic channel usage
fn basic_channel_example() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Received: {}", received);
}
/// Multiple producers
fn multiple_producers_example() {
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
for i in 0..5 {
let tx = tx.clone();
handles.push(thread::spawn(move || {
tx.send(i).unwrap();
}));
}
drop(tx); // Drop original sender
// Receive all messages
let mut results = vec![];
for received in rx {
results.push(received);
}
for handle in handles {
handle.join().unwrap();
}
println!("Received: {:?}", results);
}
/// Sync channel (bounded; send blocks when the buffer is full)
fn sync_channel_example() {
let (tx, rx) = mpsc::sync_channel(1); // Buffer size 1
let handle = thread::spawn(move || {
println!("Sending 1");
tx.send(1).unwrap();
println!("Sending 2");
tx.send(2).unwrap();
});
thread::sleep(Duration::from_millis(100));
println!("Receiving...");
println!("Received: {}", rx.recv().unwrap());
println!("Received: {}", rx.recv().unwrap());
handle.join().unwrap();
}
// 4. Atomic Operations
/// Atomic counter
fn atomic_counter_example() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
counter.fetch_add(1, Ordering::SeqCst);
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("Counter: {}", counter.load(Ordering::SeqCst));
}
/// Compare and swap
fn compare_and_swap_example() {
let value = Arc::new(AtomicI32::new(5));
let value1 = Arc::clone(&value);
let handle1 = thread::spawn(move || {
let mut old = value1.load(Ordering::SeqCst);
loop {
match value1.compare_exchange_weak(old, old * 2, Ordering::SeqCst, Ordering::SeqCst) {
Ok(_) => break,
Err(actual) => old = actual,
}
}
println!("Thread 1: Changed value to {}", value1.load(Ordering::SeqCst));
});
let value2 = Arc::clone(&value);
let handle2 = thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
let mut old = value2.load(Ordering::SeqCst);
loop {
match value2.compare_exchange_weak(old, old + 10, Ordering::SeqCst, Ordering::SeqCst) {
Ok(_) => break,
Err(actual) => old = actual,
}
}
println!("Thread 2: Changed value to {}", value2.load(Ordering::SeqCst));
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("Final value: {}", value.load(Ordering::SeqCst));
}
// 5. Barrier (Synchronization Point)
/// Barrier example
fn barrier_example() {
let mut handles = vec![];
let barrier = Arc::new(Barrier::new(3));
let mut data = vec![0; 3];
for i in 0..3 {
let barrier = Arc::clone(&barrier);
handles.push(thread::spawn(move || {
println!("Thread {} started", i);
thread::sleep(Duration::from_millis(100 * i as u64));
println!("Thread {} reached barrier", i);
barrier.wait();
println!("Thread {} passed barrier", i);
i * 2
}));
}
for (i, handle) in handles.into_iter().enumerate() {
data[i] = handle.join().unwrap();
}
println!("Results: {:?}", data);
}
// 6. Condition Variable
/// Condition variable example
fn condition_variable_example() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
});
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
println!("Condition met!");
}
/// Producer-consumer with condition variable
fn producer_consumer_example() {
let mutex = Arc::new(Mutex::new(vec![]));
let condvar = Arc::new(Condvar::new());
// Producer
let mutex1 = Arc::clone(&mutex);
let condvar1 = Arc::clone(&condvar);
    let producer = thread::spawn(move || {
        for i in 0..5 {
            // Hold the lock only while pushing so the consumer can run between items
            {
                let mut data = mutex1.lock().unwrap();
                data.push(i);
                println!("Produced: {}", i);
            }
            condvar1.notify_one();
            thread::sleep(Duration::from_millis(100));
        }
    });
// Consumer
let mutex2 = Arc::clone(&mutex);
let condvar2 = Arc::clone(&condvar);
let consumer = thread::spawn(move || {
let mut data = mutex2.lock().unwrap();
for _ in 0..5 {
while data.is_empty() {
data = condvar2.wait(data).unwrap();
}
if let Some(value) = data.pop() {
println!("Consumed: {}", value);
}
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
// 7. Once (Single Execution)
use std::sync::Once;
static INIT: Once = Once::new();
static mut GLOBAL_VALUE: i32 = 0;
fn initialize_global() {
unsafe {
GLOBAL_VALUE = 42;
}
}
fn once_example() {
for _ in 0..5 {
thread::spawn(|| {
INIT.call_once(|| {
initialize_global();
println!("Initialized!");
});
            // Copy the value out; taking a reference to a `static mut` is discouraged
            let value = unsafe { GLOBAL_VALUE };
            println!("Global value: {}", value);
});
}
thread::sleep(Duration::from_millis(100));
}
// 8. Semaphore Pattern
/// Simple semaphore implementation
struct Semaphore {
permits: Arc<(Mutex<usize>, Condvar)>,
}
impl Semaphore {
fn new(permits: usize) -> Self {
Semaphore {
permits: Arc::new((Mutex::new(permits), Condvar::new())),
}
}
fn acquire(&self) {
let (lock, cvar) = &*self.permits;
let mut permits = lock.lock().unwrap();
while *permits == 0 {
permits = cvar.wait(permits).unwrap();
}
*permits -= 1;
}
fn release(&self) {
let (lock, cvar) = &*self.permits;
let mut permits = lock.lock().unwrap();
*permits += 1;
cvar.notify_one();
}
}
/// Semaphore usage
fn semaphore_example() {
let semaphore = Semaphore::new(2); // 2 permits
let mut handles = vec![];
for i in 0..5 {
let semaphore = semaphore.clone();
handles.push(thread::spawn(move || {
semaphore.acquire();
println!("Thread {} acquired permit", i);
thread::sleep(Duration::from_millis(100));
println!("Thread {} releasing permit", i);
semaphore.release();
}));
}
for handle in handles {
handle.join().unwrap();
}
}
// Clone for semaphore
impl Clone for Semaphore {
fn clone(&self) -> Self {
Semaphore {
permits: Arc::clone(&self.permits),
}
}
}
// Usage Examples
fn main() {
println!("=== Web Rust Thread Synchronization Examples ===\n");
// 1. Mutex
println!("--- 1. Mutex Example ---");
basic_mutex_example();
// 2. RwLock
println!("\n--- 2. RwLock Example ---");
rwlock_example();
// 3. Channels
println!("\n--- 3. Channel Example ---");
basic_channel_example();
// 4. Atomic operations
println!("\n--- 4. Atomic Counter ---");
atomic_counter_example();
// 5. Barrier
println!("\n--- 5. Barrier Example ---");
barrier_example();
// 6. Condition variable
println!("\n--- 6. Condition Variable ---");
condition_variable_example();
// 7. Once
println!("\n--- 7. Once Example ---");
once_example();
// 8. Semaphore
println!("\n--- 8. Semaphore Example ---");
semaphore_example();
println!("\n=== All Thread Synchronization Examples Completed ===");
}
💻 Thread Pool rust
🔴 complex
⭐⭐⭐⭐
Implement and use thread pools for efficient concurrent task execution
⏱️ 35 min
🏷️ rust, web, multithreading, thread pool
Prerequisites:
Advanced Rust, std::thread, std::sync
// Web Rust Thread Pool Examples
// Implement and use thread pools for efficient concurrent task execution
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
// 1. Simple Thread Pool
/// Simple thread pool implementation
struct ThreadPool {
workers: Vec<Worker>,
sender: Option<std::sync::mpsc::Sender<Message>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}
impl ThreadPool {
/// Create a new thread pool
fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = std::sync::mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
/// Execute a job in the thread pool
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
println!("Shutting down thread pool");
// Send terminate message to all workers
for _ in &self.workers {
self.sender.as_ref().unwrap().send(Message::Terminate).unwrap();
}
// Drop sender to close channel
drop(self.sender.take());
// Wait for workers to finish
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
/// Use simple thread pool
fn simple_thread_pool_example() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Task {} executing", i);
thread::sleep(Duration::from_millis(100));
println!("Task {} completed", i);
});
}
thread::sleep(Duration::from_secs(1));
}
// 2. Thread Pool with Return Values
use std::sync::mpsc;
/// Thread pool that can return results
struct ResultThreadPool {
workers: Vec<thread::JoinHandle<()>>,
sender: mpsc::Sender<JobWithResult>,
receiver: mpsc::Receiver<usize>,
}
type JobWithResult = Box<dyn FnOnce(mpsc::Sender<usize>) + Send + 'static>;
impl ResultThreadPool {
fn new(size: usize) -> ResultThreadPool {
let (job_sender, job_receiver) = mpsc::channel();
let (result_sender, result_receiver) = mpsc::channel();
let job_receiver = Arc::new(Mutex::new(job_receiver));
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let job_receiver = Arc::clone(&job_receiver);
let result_sender = result_sender.clone();
let handle = thread::spawn(move || {
loop {
let job = job_receiver.lock().unwrap().recv();
match job {
Ok(job) => {
job(result_sender.clone());
}
Err(_) => break,
}
}
});
workers.push(handle);
}
ResultThreadPool {
workers,
sender: job_sender,
receiver: result_receiver,
}
}
fn execute<F>(&self, f: F) -> usize
where
F: FnOnce() -> usize + Send + 'static,
{
let (tx, rx) = mpsc::channel();
        self.sender
            .send(Box::new(move |result_tx: mpsc::Sender<usize>| {
                let result = f();
                // Forward a copy to the pool-wide result channel (kept but unused here)
                result_tx.send(result).ok();
                // Send the result back on this call's private channel
                tx.send(result).unwrap();
            }))
            .unwrap();
        // Blocking here keeps results in submission order, at the cost of parallelism
        rx.recv().unwrap()
}
}
/// Thread pool with results example
fn thread_pool_with_results_example() {
let pool = ResultThreadPool::new(4);
let mut results = vec![];
for i in 0..8 {
let result = pool.execute(move || {
println!("Task {} executing", i);
thread::sleep(Duration::from_millis(100));
println!("Task {} completed", i);
i * 2
});
results.push(result);
}
println!("Results: {:?}", results);
}
// 3. Work-Stealing Queue (building block for a work-stealing pool)
/// Simple work-stealing queue
struct WorkStealingQueue {
local: Arc<Mutex<Vec<Job>>>,
stolen: Arc<Mutex<Vec<Job>>>,
}
impl WorkStealingQueue {
fn new() -> Self {
WorkStealingQueue {
local: Arc::new(Mutex::new(Vec::new())),
stolen: Arc::new(Mutex::new(Vec::new())),
}
}
fn push(&self, job: Job) {
self.local.lock().unwrap().push(job);
}
fn pop(&self) -> Option<Job> {
self.local.lock().unwrap().pop()
}
fn steal(&self) -> Option<Job> {
let mut stolen = self.stolen.lock().unwrap();
if stolen.is_empty() {
// Swap local with stolen
let mut local = self.local.lock().unwrap();
std::mem::swap(&mut *local, &mut *stolen);
stolen.pop()
} else {
stolen.pop()
}
}
}
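/// Hypothetical usage sketch (an illustrative addition; not called from main):
/// the owning thread pushes and pops its own jobs while a second thread steals them.
fn work_stealing_example() {
    let queue = Arc::new(WorkStealingQueue::new());
    for i in 0..4 {
        queue.push(Box::new(move || println!("Job {} executed", i)));
    }
    let thief = Arc::clone(&queue);
    let stealer = thread::spawn(move || {
        // Steal and run jobs until nothing is left
        while let Some(job) = thief.steal() {
            job();
        }
    });
    // Meanwhile the owner drains its local end
    while let Some(job) = queue.pop() {
        job();
    }
    stealer.join().unwrap();
}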
// 4. Minimal Async Task Executor (polls futures on the calling thread)
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
/// Build a no-op Waker; this toy executor only polls, so wake-ups are ignored
fn noop_waker() -> Waker {
    fn noop(_: *const ()) {}
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable entry is a no-op, so the null data pointer is never dereferenced
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
/// Simple task executor
struct TaskExecutor {
tasks: Arc<Mutex<Vec<Box<dyn Future<Output = ()> + Unpin + Send>>>>,
}
impl TaskExecutor {
fn new() -> Self {
TaskExecutor {
tasks: Arc::new(Mutex::new(Vec::new())),
}
}
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Unpin + Send + 'static,
{
self.tasks.lock().unwrap().push(Box::new(future));
}
    /// Poll every queued task once; tasks that are still Pending stay queued
    fn run(&self) {
let mut tasks = self.tasks.lock().unwrap();
let mut i = 0;
while i < tasks.len() {
let mut future = tasks.remove(i);
            let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut future).poll(&mut cx) {
Poll::Ready(_) => {}
Poll::Pending => {
tasks.insert(i, future);
i += 1;
}
}
}
}
}
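/// Hypothetical usage sketch (an illustrative addition; not called from main).
/// Only futures that are already Unpin satisfy spawn's bound, e.g. std::future::ready
/// or an async block wrapped in Box::pin.
fn task_executor_example() {
    let executor = TaskExecutor::new();
    executor.spawn(std::future::ready(())); // completes on the first poll
    executor.spawn(Box::pin(async {
        println!("async task ran");
    }));
    // One polling pass: Ready tasks are dropped, Pending ones stay queued
    executor.run();
}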
// 5. Parallel For Loop
/// Execute function in parallel over range
fn parallel_for<F>(start: usize, end: usize, f: F)
where
F: Fn(usize) + Send + Sync + 'static,
{
let pool = ThreadPool::new(4);
let f = Arc::new(f);
for i in start..end {
let f = Arc::clone(&f);
pool.execute(move || {
f(i);
});
}
thread::sleep(Duration::from_millis(100));
}
/// Parallel for example
fn parallel_for_example() {
println!("Parallel for example:");
parallel_for(0, 10, |i| {
println!("Processing {}", i);
thread::sleep(Duration::from_millis(50));
});
}
// 6. Batch Processing with Thread Pool
/// Process items in batches
fn batch_processing<T, F, R>(items: Vec<T>, batch_size: usize, f: F) -> Vec<R>
where
    T: Clone + Send + 'static, // Clone is needed because each chunk is copied with to_vec()
R: Send + 'static,
F: Fn(Vec<T>) -> Vec<R> + Send + Sync + 'static,
{
let pool = ThreadPool::new(4);
let (sender, receiver) = mpsc::channel();
let f = Arc::new(f);
for chunk in items.chunks(batch_size) {
let chunk = chunk.to_vec();
let sender = sender.clone();
let f = Arc::clone(&f);
pool.execute(move || {
let results = f(chunk);
sender.send(results).unwrap();
});
}
drop(sender);
    // Batches may complete in any order, so results are not ordered by input position
    receiver.iter().flatten().collect()
}
/// Batch processing example
fn batch_processing_example() {
let data: Vec<i32> = (0..20).collect();
let results = batch_processing(data, 5, |batch| {
println!("Processing batch: {:?}", batch);
batch.into_iter().map(|x| x * 2).collect()
});
println!("Results: {:?}", results);
}
// 7. Priority Thread Pool
use std::collections::BinaryHeap;
use std::cmp::Ordering;
use std::sync::Condvar;
/// Priority job
struct PriorityJob {
priority: i32,
job: Job,
}
impl PartialEq for PriorityJob {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}
impl Eq for PriorityJob {}
impl PartialOrd for PriorityJob {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PriorityJob {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap, so higher priority values pop first
        self.priority.cmp(&other.priority)
    }
}
/// Priority thread pool: workers always run the highest-priority queued job next.
/// An mpsc channel is strictly FIFO, so jobs are kept in a BinaryHeap guarded by a
/// Mutex + Condvar instead. (No shutdown logic; the workers run until process exit.)
struct PriorityThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    queue: Arc<(Mutex<BinaryHeap<PriorityJob>>, Condvar)>,
}
impl PriorityThreadPool {
    fn new(size: usize) -> Self {
        let queue = Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new()));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            let queue = Arc::clone(&queue);
            workers.push(thread::spawn(move || loop {
                let (lock, cvar) = &*queue;
                let mut heap = lock.lock().unwrap();
                // Sleep until a job is queued
                while heap.is_empty() {
                    heap = cvar.wait(heap).unwrap();
                }
                let priority_job = heap.pop().unwrap();
                drop(heap); // Release the lock before running the job
                println!("Worker {} running job with priority {}", id, priority_job.priority);
                (priority_job.job)();
            }));
        }
        PriorityThreadPool { workers, queue }
    }
    fn execute_with_priority<F>(&self, priority: i32, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let (lock, cvar) = &*self.queue;
        lock.lock().unwrap().push(PriorityJob {
            priority,
            job: Box::new(f),
        });
        cvar.notify_one();
    }
}
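/// Hypothetical usage sketch (an illustrative addition; not called from main):
/// submit a few jobs and give the detached workers time to drain the queue.
fn priority_thread_pool_example() {
    let pool = PriorityThreadPool::new(2);
    for (priority, name) in [(1, "low"), (10, "high"), (5, "medium")] {
        pool.execute_with_priority(priority, move || {
            println!("Running {} priority task ({})", name, priority);
            thread::sleep(Duration::from_millis(50));
        });
    }
    // The workers have no shutdown signal, so simply wait for them to finish the jobs
    thread::sleep(Duration::from_millis(500));
}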
// 8. Thread Pool with Timeout
/// Execute job with timeout
fn execute_with_timeout<F, R>(pool: &ThreadPool, f: F, timeout: Duration) -> Option<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (sender, receiver) = mpsc::channel();
pool.execute(move || {
let result = f();
        // If the caller already timed out, the receiver is gone; don't panic in the worker
        let _ = sender.send(Some(result));
});
    // Wait up to `timeout` for the worker to reply; otherwise report None
    receiver.recv_timeout(timeout).unwrap_or(None)
}
/// Timeout example
fn timeout_example() {
let pool = ThreadPool::new(4);
// Quick task
let result = execute_with_timeout(&pool, || {
thread::sleep(Duration::from_millis(100));
"Quick result"
}, Duration::from_secs(1));
println!("Quick task result: {:?}", result);
// Slow task
let result = execute_with_timeout(&pool, || {
thread::sleep(Duration::from_secs(2));
"Slow result"
}, Duration::from_millis(500));
println!("Slow task result: {:?}", result);
}
// Usage Examples
fn main() {
println!("=== Web Rust Thread Pool Examples ===\n");
// 1. Simple thread pool
println!("--- 1. Simple Thread Pool ---");
simple_thread_pool_example();
thread::sleep(Duration::from_secs(2));
// 2. Thread pool with results
println!("\n--- 2. Thread Pool with Results ---");
thread_pool_with_results_example();
// 3. Parallel for
println!("\n--- 3. Parallel For ---");
parallel_for_example();
// 4. Batch processing
println!("\n--- 4. Batch Processing ---");
batch_processing_example();
// 5. Timeout
println!("\n--- 5. Timeout Example ---");
timeout_example();
println!("\n=== All Thread Pool Examples Completed ===");
}