Exemples de Multithreading Web Rust

Exemples de multithreading Web Rust incluant la création de threads, la synchronisation et les pools de threads

💻 Création de Threads rust

🟢 simple ⭐⭐⭐

Créer et gérer des threads en utilisant std::thread pour l'exécution concurrente

⏱️ 20 min 🏷️ rust, web, multithreading
Prerequisites: Basic Rust, std::thread
// Web Rust Thread Creation Examples
// Using std::thread for concurrent execution

use std::thread;
use std::time::{Duration, Instant};

// 1. Basic Thread Creation

/// Spawn a basic thread
fn spawn_basic_thread() {
    thread::spawn(|| {
        for i in 1..=5 {
            println!("Thread: {}", i);
            thread::sleep(Duration::from_millis(100));
        }
    });

    // Main thread work
    for i in 1..=5 {
        println!("Main: {}", i);
        thread::sleep(Duration::from_millis(100));
    }

    // Note: If main thread exits, spawned threads will be terminated
    thread::sleep(Duration::from_millis(600));
}

/// Spawn a thread with move closure
fn spawn_thread_with_move() {
    let data = vec![1, 2, 3, 4, 5];

    thread::spawn(move || {
        println!("Thread received data: {:?}", data);
    });

    thread::sleep(Duration::from_millis(100));
}

// 2. Thread with Return Values

/// Spawn thread that returns a value
fn spawn_thread_with_return() -> thread::JoinHandle<i32> {
    thread::spawn(|| {
        let mut sum = 0;
        for i in 1..=100 {
            sum += i;
        }
        sum
    })
}

/// Wait for thread completion and get result
fn wait_for_thread_result() {
    let handle = spawn_thread_with_return();
    match handle.join() {
        Ok(result) => println!("Thread returned: {}", result),
        Err(e) => println!("Thread panicked: {:?}", e),
    }
}

// 3. Multiple Threads

/// Spawn multiple threads
fn spawn_multiple_threads() {
    let mut handles = vec![];

    for i in 0..5 {
        let handle = thread::spawn(move || {
            println!("Thread {} starting", i);
            thread::sleep(Duration::from_millis(100 * (i + 1) as u64));
            println!("Thread {} finishing", i);
            i * 2
        });
        handles.push(handle);
    }

    // Wait for all threads
    for handle in handles {
        let _ = handle.join();
    }
}

/// Collect results from multiple threads
fn collect_thread_results() -> Vec<i32> {
    let mut handles = vec![];

    for i in 0..5 {
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));
            i * i
        });
        handles.push(handle);
    }

    handles.into_iter().filter_map(|h| h.join().ok()).collect()
}

// 4. Thread Builder

/// Use thread::Builder for more control
fn spawn_named_thread() -> thread::JoinHandle<()> {
    thread::Builder::new()
        .name("my-worker-thread".to_string())
        .stack_size(1024 * 1024) // 1MB stack
        .spawn(|| {
            println!("Named thread executing");
            thread::sleep(Duration::from_millis(100));
        })
        .expect("Failed to spawn thread")
}

// 5. Thread with Error Handling

/// Thread that can return Result
fn spawn_thread_with_result() -> thread::JoinHandle<Result<i32, String>> {
    thread::spawn(|| {
        let mut sum = 0;
        for i in 1..=100 {
            sum += i;
        }
        Ok(sum)
    })
}

/// Handle thread errors
fn handle_thread_errors() {
    let handle = spawn_thread_with_result();
    match handle.join() {
        Ok(Ok(result)) => println!("Success: {}", result),
        Ok(Err(e)) => println!("Thread error: {}", e),
        Err(e) => println!("Thread panicked: {:?}", e),
    }
}

// 6. Parallel Processing

/// Process data in parallel
fn parallel_processing(data: Vec<i32>) -> Vec<i32> {
    let chunk_size = (data.len() + 3) / 4; // Split into 4 chunks
    let mut handles = vec![];

    for chunk in data.chunks(chunk_size) {
        let chunk = chunk.to_vec();
        let handle = thread::spawn(move || {
            chunk.into_iter().map(|x| x * x).collect::<Vec<_>>()
        });
        handles.push(handle);
    }

    handles
        .into_iter()
        .filter_map(|h| h.join().ok())
        .flatten()
        .collect()
}

// 7. Thread Sleep and Yield

/// Demonstrate thread sleep
fn thread_sleep_example() {
    println!("Starting sleep");
    thread::sleep(Duration::from_secs(1));
    println!("Finished sleep");
}

/// Demonstrate thread yield
fn thread_yield_example() {
    let handle = thread::spawn(|| {
        for i in 0..5 {
            println!("Working... {}", i);
            thread::yield_now(); // Suggest to scheduler to yield
        }
    });

    let _ = handle.join();
}

// 8. Thread Local Storage

use std::cell::RefCell;

thread_local! {
    static THREAD_LOCAL_DATA: RefCell<Vec<i32>> = RefCell::new(Vec::new());
}

/// Use thread local storage
fn thread_local_storage_example() {
    let mut handles = vec![];

    for i in 0..3 {
        let handle = thread::spawn(move || {
            THREAD_LOCAL_DATA.with_borrow_mut(|data| {
                data.push(i);
                data.push(i * 2);
                println!("Thread {} local data: {:?}", i, data);
            });
        });
        handles.push(handle);
    }

    for handle in handles {
        let _ = handle.join();
    }
}

// 9. Thread Panics

/// Thread that panics
fn spawn_panicking_thread() -> thread::JoinHandle<()> {
    thread::spawn(|| {
        panic!("Something went wrong!");
    })
}

/// Catch thread panic
fn catch_thread_panic() {
    let handle = spawn_panicking_thread();
    match handle.join() {
        Ok(_) => println!("Thread completed"),
        Err(e) => println!("Thread panicked with: {:?}", e),
    }
}

// 10. Scoped Threads (using crossbeam)

/// Process with scoped threads
fn scoped_threads_example() {
    let data = vec![1, 2, 3, 4, 5];
    let mut results = vec![0; data.len()];

    // Using crossbeam::scope for scoped threads
    // Note: Requires crossbeam crate
    // crossbeam::scope(|s| {
    //     for (i, &value) in data.iter().enumerate() {
    //         s.spawn(|_| {
    //             results[i] = value * value;
    //         });
    //     }
    // }).unwrap();

    println!("Results: {:?}", results);
}

// Usage Examples
fn main() {
    println!("=== Web Rust Thread Creation Examples ===\n");

    // 1. Basic thread
    println!("--- 1. Basic Thread ---");
    spawn_basic_thread();

    // 2. Thread with return value
    println!("\n--- 2. Thread with Return Value ---");
    wait_for_thread_result();

    // 3. Multiple threads
    println!("\n--- 3. Multiple Threads ---");
    spawn_multiple_threads();

    // 4. Collect results
    println!("\n--- 4. Collect Thread Results ---");
    let results = collect_thread_results();
    println!("Results: {:?}", results);

    // 5. Named thread
    println!("\n--- 5. Named Thread ---");
    let handle = spawn_named_thread();
    let _ = handle.join();

    // 6. Parallel processing
    println!("\n--- 6. Parallel Processing ---");
    let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let start = Instant::now();
    let results = parallel_processing(data);
    println!("Results: {:?}", results);
    println!("Time: {:?}", start.elapsed());

    // 7. Thread local storage
    println!("\n--- 7. Thread Local Storage ---");
    thread_local_storage_example();

    // 8. Error handling
    println!("\n--- 8. Thread Error Handling ---");
    handle_thread_errors();

    println!("\n=== All Thread Creation Examples Completed ===");
}

💻 Synchronisation de Threads rust

🟡 intermediate ⭐⭐⭐⭐

Utiliser des mutex, des canaux et d'autres primitives de synchronisation pour un accès concurrent sûr

⏱️ 30 min 🏷️ rust, web, multithreading, synchronization
Prerequisites: Intermediate Rust, std::sync, std::sync::mpsc
// Web Rust Thread Synchronization Examples
// Using mutexes, channels, and atomic operations

use std::sync::{Arc, Mutex, Condvar, RwLock, Barrier, Semaphore};
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

// 1. Mutex (Mutual Exclusion)

/// Basic mutex usage
fn basic_mutex_example() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Counter: {}", *counter.lock().unwrap());
}

/// Mutex with multiple operations
fn mutex_operations_example() {
    let data = Arc::new(Mutex::new(vec![1, 2, 3]));

    let data1 = Arc::clone(&data);
    let handle1 = thread::spawn(move || {
        let mut data = data1.lock().unwrap();
        data.push(4);
        println!("Thread 1: {:?}", data);
    });

    let data2 = Arc::clone(&data);
    let handle2 = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        let mut data = data2.lock().unwrap();
        data.push(5);
        println!("Thread 2: {:?}", data);
    });

    handle1.join().unwrap();
    handle2.join().unwrap();

    println!("Final data: {:?}", *data.lock().unwrap());
}

// 2. RwLock (Read-Write Lock)

/// Multiple readers, single writer
fn rwlock_example() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));
    let mut handles = vec![];

    // Reader threads
    for i in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let r = data.read().unwrap();
            println!("Reader {}: {:?}", i, *r);
            thread::sleep(Duration::from_millis(100));
        }));
    }

    // Writer thread
    let data = Arc::clone(&data);
    handles.push(thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let mut w = data.write().unwrap();
        w.push(4);
        println!("Writer: Modified data");
    }));

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final data: {:?}", *data.read().unwrap());
}

// 3. Channels (Message Passing)

use std::sync::mpsc;

/// Basic channel usage
fn basic_channel_example() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Received: {}", received);
}

/// Multiple producers
fn multiple_producers_example() {
    let (tx, rx) = mpsc::channel();
    let mut handles = vec![];

    for i in 0..5 {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(i).unwrap();
        }));
    }

    drop(tx); // Drop original sender

    // Receive all messages
    let mut results = vec![];
    for received in rx {
        results.push(received);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Received: {:?}", results);
}

/// Sync channel (rendezvous)
fn sync_channel_example() {
    let (tx, rx) = mpsc::sync_channel(1); // Buffer size 1

    let handle = thread::spawn(move || {
        println!("Sending 1");
        tx.send(1).unwrap();
        println!("Sending 2");
        tx.send(2).unwrap();
    });

    thread::sleep(Duration::from_millis(100));
    println!("Receiving...");
    println!("Received: {}", rx.recv().unwrap());
    println!("Received: {}", rx.recv().unwrap());

    handle.join().unwrap();
}

// 4. Atomic Operations

/// Atomic counter
fn atomic_counter_example() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Counter: {}", counter.load(Ordering::SeqCst));
}

/// Compare and swap
fn compare_and_swap_example() {
    let value = Arc::new(AtomicI32::new(5));

    let value1 = Arc::clone(&value);
    let handle1 = thread::spawn(move || {
        let mut old = value1.load(Ordering::SeqCst);
        loop {
            match value1.compare_exchange_weak(old, old * 2, Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => break,
                Err(actual) => old = actual,
            }
        }
        println!("Thread 1: Changed value to {}", value1.load(Ordering::SeqCst));
    });

    let value2 = Arc::clone(&value);
    let handle2 = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        let mut old = value2.load(Ordering::SeqCst);
        loop {
            match value2.compare_exchange_weak(old, old + 10, Ordering::SeqCst, Ordering::SeqCst) {
                Ok(_) => break,
                Err(actual) => old = actual,
            }
        }
        println!("Thread 2: Changed value to {}", value2.load(Ordering::SeqCst));
    });

    handle1.join().unwrap();
    handle2.join().unwrap();

    println!("Final value: {}", value.load(Ordering::SeqCst));
}

// 5. Barrier (Synchronization Point)

/// Barrier example
fn barrier_example() {
    let mut handles = vec![];
    let barrier = Arc::new(Barrier::new(3));
    let mut data = vec![0; 3];

    for i in 0..3 {
        let barrier = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            println!("Thread {} started", i);
            thread::sleep(Duration::from_millis(100 * i as u64));
            println!("Thread {} reached barrier", i);
            barrier.wait();
            println!("Thread {} passed barrier", i);
            i * 2
        }));
    }

    for (i, handle) in handles.into_iter().enumerate() {
        data[i] = handle.join().unwrap();
    }

    println!("Results: {:?}", data);
}

// 6. Condition Variable

/// Condition variable example
fn condition_variable_example() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        cvar.notify_one();
    });

    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }
    println!("Condition met!");
}

/// Producer-consumer with condition variable
fn producer_consumer_example() {
    let mutex = Arc::new(Mutex::new(vec![]));
    let condvar = Arc::new(Condvar::new());

    // Producer
    let mutex1 = Arc::clone(&mutex);
    let condvar1 = Arc::clone(&condvar);
    let producer = thread::spawn(move || {
        let mut data = mutex1.lock().unwrap();
        for i in 0..5 {
            data.push(i);
            println!("Produced: {}", i);
            condvar1.notify_one();
            thread::sleep(Duration::from_millis(100));
        }
    });

    // Consumer
    let mutex2 = Arc::clone(&mutex);
    let condvar2 = Arc::clone(&condvar);
    let consumer = thread::spawn(move || {
        let mut data = mutex2.lock().unwrap();
        for _ in 0..5 {
            while data.is_empty() {
                data = condvar2.wait(data).unwrap();
            }
            if let Some(value) = data.pop() {
                println!("Consumed: {}", value);
            }
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

// 7. Once (Single Execution)

use std::sync::Once;

static INIT: Once = Once::new();
static mut GLOBAL_VALUE: i32 = 0;

fn initialize_global() {
    unsafe {
        GLOBAL_VALUE = 42;
    }
}

fn once_example() {
    for _ in 0..5 {
        thread::spawn(|| {
            INIT.call_once(|| {
                initialize_global();
                println!("Initialized!");
            });
            unsafe {
                println!("Global value: {}", GLOBAL_VALUE);
            }
        });
    }
    thread::sleep(Duration::from_millis(100));
}

// 8. Semaphore Pattern

/// Simple semaphore implementation
struct Semaphore {
    permits: Arc<(Mutex<usize>, Condvar)>,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore {
            permits: Arc::new((Mutex::new(permits), Condvar::new())),
        }
    }

    fn acquire(&self) {
        let (lock, cvar) = &*self.permits;
        let mut permits = lock.lock().unwrap();
        while *permits == 0 {
            permits = cvar.wait(permits).unwrap();
        }
        *permits -= 1;
    }

    fn release(&self) {
        let (lock, cvar) = &*self.permits;
        let mut permits = lock.lock().unwrap();
        *permits += 1;
        cvar.notify_one();
    }
}

/// Semaphore usage
fn semaphore_example() {
    let semaphore = Semaphore::new(2); // 2 permits
    let mut handles = vec![];

    for i in 0..5 {
        let semaphore = semaphore.clone();
        handles.push(thread::spawn(move || {
            semaphore.acquire();
            println!("Thread {} acquired permit", i);
            thread::sleep(Duration::from_millis(100));
            println!("Thread {} releasing permit", i);
            semaphore.release();
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}

// Clone for semaphore
impl Clone for Semaphore {
    fn clone(&self) -> Self {
        Semaphore {
            permits: Arc::clone(&self.permits),
        }
    }
}

// Usage Examples
fn main() {
    println!("=== Web Rust Thread Synchronization Examples ===\n");

    // 1. Mutex
    println!("--- 1. Mutex Example ---");
    basic_mutex_example();

    // 2. RwLock
    println!("\n--- 2. RwLock Example ---");
    rwlock_example();

    // 3. Channels
    println!("\n--- 3. Channel Example ---");
    basic_channel_example();

    // 4. Atomic operations
    println!("\n--- 4. Atomic Counter ---");
    atomic_counter_example();

    // 5. Barrier
    println!("\n--- 5. Barrier Example ---");
    barrier_example();

    // 6. Condition variable
    println!("\n--- 6. Condition Variable ---");
    condition_variable_example();

    // 7. Once
    println!("\n--- 7. Once Example ---");
    once_example();

    // 8. Semaphore
    println!("\n--- 8. Semaphore Example ---");
    semaphore_example();

    println!("\n=== All Thread Synchronization Examples Completed ===");
}

💻 Pool de Threads rust

🔴 complex ⭐⭐⭐⭐

Implémenter et utiliser des pools de threads pour une exécution concurrente efficace des tâches

⏱️ 35 min 🏷️ rust, web, multithreading, thread pool
Prerequisites: Advanced Rust, std::thread, std::sync
// Web Rust Thread Pool Examples
// Implement and use thread pools for efficient concurrent task execution

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// 1. Simple Thread Pool

/// Simple thread pool implementation
struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<std::sync::mpsc::Sender<Message>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

enum Message {
    NewJob(Job),
    Terminate,
}

impl ThreadPool {
    /// Create a new thread pool
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = std::sync::mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    /// Execute a job in the thread pool
    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(Message::NewJob(job)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        println!("Shutting down thread pool");

        // Send terminate message to all workers
        for _ in &self.workers {
            self.sender.as_ref().unwrap().send(Message::Terminate).unwrap();
        }

        // Drop sender to close channel
        drop(self.sender.take());

        // Wait for workers to finish
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Message>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv().unwrap();

            match message {
                Message::NewJob(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Message::Terminate => {
                    println!("Worker {} was told to terminate.", id);
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

/// Use simple thread pool
fn simple_thread_pool_example() {
    let pool = ThreadPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Task {} executing", i);
            thread::sleep(Duration::from_millis(100));
            println!("Task {} completed", i);
        });
    }

    thread::sleep(Duration::from_secs(1));
}

// 2. Thread Pool with Return Values

use std::sync::mpsc;

/// Thread pool that can return results
struct ResultThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    sender: mpsc::Sender<JobWithResult>,
    receiver: mpsc::Receiver<usize>,
}

type JobWithResult = Box<dyn FnOnce(mpsc::Sender<usize>) + Send + 'static>;

impl ResultThreadPool {
    fn new(size: usize) -> ResultThreadPool {
        let (job_sender, job_receiver) = mpsc::channel();
        let (result_sender, result_receiver) = mpsc::channel();
        let job_receiver = Arc::new(Mutex::new(job_receiver));

        let mut workers = Vec::with_capacity(size);

        for _ in 0..size {
            let job_receiver = Arc::clone(&job_receiver);
            let result_sender = result_sender.clone();

            let handle = thread::spawn(move || {
                loop {
                    let job = job_receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => {
                            job(result_sender.clone());
                        }
                        Err(_) => break,
                    }
                }
            });

            workers.push(handle);
        }

        ResultThreadPool {
            workers,
            sender: job_sender,
            receiver: result_receiver,
        }
    }

    fn execute<F>(&self, f: F) -> usize
    where
        F: FnOnce() -> usize + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        self.sender.send(Box::new(move |result_tx| {
            let result = f();
            result_tx.send(result).unwrap();
            tx.send(()).unwrap();
        })).unwrap();

        rx.recv().unwrap()
    }
}

/// Thread pool with results example
fn thread_pool_with_results_example() {
    let pool = ResultThreadPool::new(4);
    let mut results = vec![];

    for i in 0..8 {
        let result = pool.execute(move || {
            println!("Task {} executing", i);
            thread::sleep(Duration::from_millis(100));
            println!("Task {} completed", i);
            i * 2
        });
        results.push(result);
    }

    println!("Results: {:?}", results);
}

// 3. Work Stealing Thread Pool

/// Simple work-stealing queue
struct WorkStealingQueue {
    local: Arc<Mutex<Vec<Job>>>,
    stolen: Arc<Mutex<Vec<Job>>>,
}

impl WorkStealingQueue {
    fn new() -> Self {
        WorkStealingQueue {
            local: Arc::new(Mutex::new(Vec::new())),
            stolen: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn push(&self, job: Job) {
        self.local.lock().unwrap().push(job);
    }

    fn pop(&self) -> Option<Job> {
        self.local.lock().unwrap().pop()
    }

    fn steal(&self) -> Option<Job> {
        let mut stolen = self.stolen.lock().unwrap();
        if stolen.is_empty() {
            // Swap local with stolen
            let mut local = self.local.lock().unwrap();
            std::mem::swap(&mut *local, &mut *stolen);
            stolen.pop()
        } else {
            stolen.pop()
        }
    }
}

// 4. Async/Await Thread Pool

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

/// Simple task executor
struct TaskExecutor {
    tasks: Arc<Mutex<Vec<Box<dyn Future<Output = ()> + Unpin + Send>>>>,
}

impl TaskExecutor {
    fn new() -> Self {
        TaskExecutor {
            tasks: Arc::new(Mutex::new(Vec::new())),
        }
    }

    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Unpin + Send + 'static,
    {
        self.tasks.lock().unwrap().push(Box::new(future));
    }

    fn run(&self) {
        let mut tasks = self.tasks.lock().unwrap();
        let mut i = 0;
        while i < tasks.len() {
            let mut future = tasks.remove(i);
            let waker = unsafe { Waker::from_raw(std::ptr::null()) };
            let mut cx = Context::from_waker(&waker);

            match Pin::new(&mut future).poll(&mut cx) {
                Poll::Ready(_) => {}
                Poll::Pending => {
                    tasks.insert(i, future);
                    i += 1;
                }
            }
        }
    }
}

// 5. Parallel For Loop

/// Execute function in parallel over range
fn parallel_for<F>(start: usize, end: usize, f: F)
where
    F: Fn(usize) + Send + Sync + 'static,
{
    let pool = ThreadPool::new(4);
    let f = Arc::new(f);

    for i in start..end {
        let f = Arc::clone(&f);
        pool.execute(move || {
            f(i);
        });
    }

    thread::sleep(Duration::from_millis(100));
}

/// Parallel for example
fn parallel_for_example() {
    println!("Parallel for example:");
    parallel_for(0, 10, |i| {
        println!("Processing {}", i);
        thread::sleep(Duration::from_millis(50));
    });
}

// 6. Batch Processing with Thread Pool

/// Process items in batches
fn batch_processing<T, F, R>(items: Vec<T>, batch_size: usize, f: F) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(Vec<T>) -> Vec<R> + Send + Sync + 'static,
{
    let pool = ThreadPool::new(4);
    let (sender, receiver) = mpsc::channel();
    let f = Arc::new(f);

    for chunk in items.chunks(batch_size) {
        let chunk = chunk.to_vec();
        let sender = sender.clone();
        let f = Arc::clone(&f);

        pool.execute(move || {
            let results = f(chunk);
            sender.send(results).unwrap();
        });
    }

    drop(sender);

    receiver.iter().flatten().collect()
}

/// Batch processing example
fn batch_processing_example() {
    let data: Vec<i32> = (0..20).collect();
    let results = batch_processing(data, 5, |batch| {
        println!("Processing batch: {:?}", batch);
        batch.into_iter().map(|x| x * 2).collect()
    });
    println!("Results: {:?}", results);
}

// 7. Priority Thread Pool

use std::collections::BinaryHeap;
use std::cmp::Ordering;

/// Priority job
struct PriorityJob {
    priority: i32,
    job: Job,
}

impl PartialEq for PriorityJob {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl Eq for PriorityJob {}

impl PartialOrd for PriorityJob {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for PriorityJob {
    fn cmp(&self, other: &Self) -> Ordering {
        other.priority.cmp(&self.priority) // Reverse for max-heap
    }
}

/// Priority thread pool
struct PriorityThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<PriorityJob>>,
}

impl PriorityThreadPool {
    fn new(size: usize) -> Self {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        PriorityThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    fn execute_with_priority<F>(&self, priority: i32, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        let priority_job = PriorityJob { priority, job };
        self.sender.as_ref().unwrap().send(priority_job).unwrap();
    }
}

// 8. Thread Pool with Timeout

/// Execute job with timeout
fn execute_with_timeout<F, R>(pool: &ThreadPool, f: F, timeout: Duration) -> Option<R>
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let (sender, receiver) = mpsc::channel();

    pool.execute(move || {
        let result = f();
        sender.send(Some(result)).unwrap();
    });

    // Wait with timeout
    let start = std::time::Instant::now();
    while start.elapsed() < timeout {
        if let Ok(result) = receiver.recv_timeout(Duration::from_millis(100)) {
            return result;
        }
    }

    None
}

/// Timeout example
fn timeout_example() {
    let pool = ThreadPool::new(4);

    // Quick task
    let result = execute_with_timeout(&pool, || {
        thread::sleep(Duration::from_millis(100));
        "Quick result"
    }, Duration::from_secs(1));
    println!("Quick task result: {:?}", result);

    // Slow task
    let result = execute_with_timeout(&pool, || {
        thread::sleep(Duration::from_secs(2));
        "Slow result"
    }, Duration::from_millis(500));
    println!("Slow task result: {:?}", result);
}

// Usage Examples
fn main() {
    println!("=== Web Rust Thread Pool Examples ===\n");

    // 1. Simple thread pool
    println!("--- 1. Simple Thread Pool ---");
    simple_thread_pool_example();

    thread::sleep(Duration::from_secs(2));

    // 2. Thread pool with results
    println!("\n--- 2. Thread Pool with Results ---");
    thread_pool_with_results_example();

    // 3. Parallel for
    println!("\n--- 3. Parallel For ---");
    parallel_for_example();

    // 4. Batch processing
    println!("\n--- 4. Batch Processing ---");
    batch_processing_example();

    // 5. Timeout
    println!("\n--- 5. Timeout Example ---");
    timeout_example();

    println!("\n=== All Thread Pool Examples Completed ===");
}