Windows Multithreaded Programming - C++ Examples

Complete examples of multithreaded programming in C++ on the Windows platform, covering thread creation, synchronization, and thread pool implementations

💻 Thread Creation and Management cpp

🟢 simple ⭐⭐

Basic thread creation, joining, and management using std::thread and the Windows API

⏱️ 25 min 🏷️ cpp, threading, multithreading, windows
Prerequisites: Basic C++, C++11/14 features, Windows API basics
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
#include <string>
#include <atomic>
#include <future>
#include <random>
#include <memory>   // std::unique_ptr / std::make_unique used in MoveSemanticThreadExample
#include <windows.h>

// Thread worker function
void SimpleWorker(int threadId, int iterations)
{
    std::cout << "Thread " << threadId << " started (ID: "
              << std::this_thread::get_id() << ")" << std::endl;

    for (int i = 0; i < iterations; i++)
    {
        std::cout << "Thread " << threadId << " - Iteration "
                  << i + 1 << "/" << iterations << std::endl;

        // Simulate work
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    std::cout << "Thread " << threadId << " completed" << std::endl;
}

// 1. Basic thread creation
void BasicThreadCreation()
{
    std::cout << "=== Basic Thread Creation ===" << std::endl;

    const int numThreads = 4;
    const int iterationsPerThread = 3;

    std::vector<std::thread> threads;

    // Create multiple threads
    for (int i = 0; i < numThreads; i++)
    {
        // Create thread that runs SimpleWorker function
        threads.emplace_back(SimpleWorker, i + 1, iterationsPerThread);
        std::cout << "Created thread " << (i + 1) << std::endl;
    }

    // Wait for all threads to complete
    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "All threads have completed" << std::endl;
}
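
// A hedged C++20 aside: std::jthread joins automatically in its destructor, so the explicit
// join loop above becomes unnecessary. Illustrative helper (BasicJThreadCreation) that is not
// part of the original example set and is not called from main(); compiled only when the
// standard library reports jthread support.
void BasicJThreadCreation()
{
#if defined(__cpp_lib_jthread)
    std::vector<std::jthread> threads;

    for (int i = 0; i < 4; i++)
    {
        // Reuses the SimpleWorker function defined above
        threads.emplace_back(SimpleWorker, i + 1, 3);
    }

    // No join loop: each jthread joins itself when `threads` goes out of scope
#endif
}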

// 2. Thread with lambda function
void LambdaThreadExample()
{
    std::cout << "\n=== Thread with Lambda Function ===" << std::endl;

    int sharedValue = 42;
    std::string message = "Hello from lambda thread!";

    // Create thread with lambda
    std::thread t([&sharedValue, message](int threadId)
    {
        std::cout << "Lambda thread " << threadId << " started" << std::endl;
        std::cout << "Message: " << message << std::endl;
        std::cout << "Shared value: " << sharedValue << std::endl;

        // Modify shared value
        sharedValue = 100;

        std::this_thread::sleep_for(std::chrono::milliseconds(500));

        std::cout << "Lambda thread " << threadId << " completed" << std::endl;
    }, 1);

    t.join();

    std::cout << "Shared value after thread completion: " << sharedValue << std::endl;
}

// 3. Thread with function object
class ThreadWorker
{
private:
    std::string name;
    int id;

public:
    ThreadWorker(const std::string& workerName, int workerId)
        : name(workerName), id(workerId) {}

    void operator()()
    {
        std::cout << "Worker thread '" << name << "' (ID: " << id
                  << ") started (Thread ID: " << std::this_thread::get_id() << ")" << std::endl;

        // Simulate work with different durations
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> dis(200, 800);

        int workTime = dis(gen);
        std::cout << "Worker '" << name << "' will work for " << workTime << "ms" << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(workTime));

        std::cout << "Worker thread '" << name << "' completed" << std::endl;
    }

    void DoWork(int iterations)
    {
        for (int i = 0; i < iterations; i++)
        {
            std::cout << "Worker '" << name << "' - Work iteration "
                      << i + 1 << "/" << iterations << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(150));
        }
    }
};

void FunctionObjectThreadExample()
{
    std::cout << "\n=== Thread with Function Object ===" << std::endl;

    ThreadWorker worker1("Worker-Alice", 101);
    ThreadWorker worker2("Worker-Bob", 102);

    // Create threads with function objects
    std::thread t1(std::ref(worker1)); // Use std::ref to avoid copying
    std::thread t2(std::ref(worker2));

    t1.join();
    t2.join();

    // Thread with member function
    ThreadWorker worker3("Worker-Charlie", 103);
    std::thread t3(&ThreadWorker::DoWork, &worker3, 3);

    t3.join();
}

// 4. Thread with move semantics
void MoveSemanticThreadExample()
{
    std::cout << "\n=== Thread with Move Semantics ===" << std::endl;

    // Create a unique pointer that can be moved
    auto data = std::make_unique<std::vector<int>>(1000);
    for (int i = 0; i < 1000; i++)
    {
        (*data)[i] = i * i;
    }

    std::cout << "Created vector with " << data->size() << " elements" << std::endl;

    // Move the unique pointer into the thread
    std::thread t([](std::unique_ptr<std::vector<int>> vec)
    {
        std::cout << "Thread received vector with " << vec->size() << " elements" << std::endl;
        std::cout << "First element: " << (*vec)[0] << std::endl;
        std::cout << "Last element: " << (*vec)[vec->size() - 1] << std::endl;

        // Process the data
        long long sum = 0;
        for (int value : *vec)
        {
            sum += value;
        }

        std::cout << "Sum of all elements: " << sum << std::endl;
        std::cout << "Thread completed processing" << std::endl;

        // vector is automatically destroyed when thread exits
    }, std::move(data));

    t.join();

    std::cout << "Main thread: vector has been moved and is now null" << std::endl;
}

// 5. Thread with return value (using std::future)
void FutureThreadExample()
{
    std::cout << "\n=== Thread with Return Value (std::future) ===" << std::endl;

    // Function that will run in a separate thread
    auto calculateSum = [](int start, int end) -> long long
    {
        std::cout << "Sum thread calculating sum from " << start << " to " << end << std::endl;

        long long sum = 0;
        for (int i = start; i <= end; i++)
        {
            sum += i;
            std::this_thread::sleep_for(std::chrono::milliseconds(1)); // Simulate work
        }

        std::cout << "Sum thread completed calculation: " << sum << std::endl;
        return sum;
    };

    // Launch asynchronous task
    std::future<long long> futureSum = std::async(std::launch::async, calculateSum, 1, 1000);

    // Main thread can do other work while calculation is in progress
    std::cout << "Main thread: Doing other work while sum is being calculated..." << std::endl;
    for (int i = 0; i < 5; i++)
    {
        std::cout << "Main thread work iteration " << i + 1 << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }

    // Get the result (this will block if the calculation is not complete)
    std::cout << "Main thread: Waiting for calculation result..." << std::endl;
    long long result = futureSum.get(); // Get the result

    std::cout << "Final result: " << result << std::endl;
    std::cout << "Expected result: " << (1000 * 1001) / 2 << std::endl;
}
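
// A hedged sketch of the lower-level alternative to std::async: std::promise lets a plain
// std::thread hand a single value (or an exception) back to the waiting side through a
// std::future. Illustrative helper (PromiseThreadSketch), not called from main().
void PromiseThreadSketch()
{
    std::promise<int> resultPromise;
    std::future<int> resultFuture = resultPromise.get_future();

    std::thread producer([&resultPromise]()
    {
        // Compute something and publish it through the promise
        resultPromise.set_value(6 * 7);
    });

    // get() blocks until set_value() has been called in the producer thread
    std::cout << "Promise result: " << resultFuture.get() << std::endl;

    producer.join();
}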

// 6. Thread management with RAII
class ManagedThread
{
private:
    std::thread thread;
    std::atomic<bool> running;   // atomic: written by the worker thread, read by the owner
    std::atomic<bool> stopFlag;

public:
    ManagedThread() : running(false), stopFlag(false) {}

    ~ManagedThread()
    {
        // Always signal and join: destroying a joinable std::thread calls std::terminate
        Stop();
    }

    template<typename Function, typename... Args>
    void Start(Function&& func, Args&&... args)
    {
        if (running)
        {
            throw std::runtime_error("Thread is already running");
        }

        stopFlag = false;

        if (thread.joinable())
        {
            thread.join(); // clean up a previously finished worker before reuse
        }

        running = true; // set before launch so IsRunning() and Stop() see a consistent state
        thread = std::thread([this, func = std::forward<Function>(func), args...]()
        {
            try
            {
                func(args..., stopFlag);
            }
            catch (const std::exception& e)
            {
                std::cerr << "Thread exception: " << e.what() << std::endl;
            }
            running = false;
        });
    }

    void Stop()
    {
        stopFlag = true;

        if (thread.joinable())
        {
            thread.join();
        }

        running = false;
    }

    bool IsRunning() const
    {
        return running;
    }

    void Join()
    {
        if (thread.joinable())
        {
            thread.join();
        }
    }
};

void ManagedThreadExample()
{
    std::cout << "\n=== Managed Thread with RAII ===" << std::endl;

    ManagedThread managedThread;

    // Worker function that respects stop flag
    auto worker = [](const std::string& name, std::atomic<bool>& stopFlag)
    {
        int counter = 0;
        while (!stopFlag)
        {
            std::cout << name << " working... iteration " << ++counter << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
        std::cout << name << " received stop signal, exiting..." << std::endl;
    };

    // Start the managed thread
    managedThread.Start(worker, "Worker-Thread");

    // Let it run for a few seconds
    std::this_thread::sleep_for(std::chrono::seconds(3));

    // Stop the thread
    std::cout << "Main thread: Stopping worker thread..." << std::endl;
    managedThread.Stop();

    std::cout << "Worker thread stopped" << std::endl;
}

// 7. Thread affinity and scheduling (Windows specific)
void WindowsThreadAffinityExample()
{
    std::cout << "\n=== Windows Thread Affinity Example ===" << std::endl;

    // Get system information
    SYSTEM_INFO sysInfo;
    GetSystemInfo(&sysInfo);
    int numProcessors = sysInfo.dwNumberOfProcessors;

    std::cout << "Number of processors: " << numProcessors << std::endl;

    // Create thread with specific affinity
    std::thread affinityThread([numProcessors]()
    {
        DWORD_PTR mask = 1; // Use first CPU core

        // Set thread affinity
        HANDLE hThread = GetCurrentThread();
        DWORD_PTR result = SetThreadAffinityMask(hThread, mask);

        if (result == 0)
        {
            std::cerr << "Failed to set thread affinity" << std::endl;
        }
        else
        {
            std::cout << "Thread affinity set to CPU core 0" << std::endl;
        }

        // Do some work
        for (int i = 0; i < 5; i++)
        {
            std::cout << "Affinity thread working on CPU " << GetCurrentProcessorNumber() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    });

    affinityThread.join();

    // Thread with different priority
    std::thread priorityThread([]()
    {
        HANDLE hThread = GetCurrentThread();

        // Set thread priority to above normal
        if (SetThreadPriority(hThread, THREAD_PRIORITY_ABOVE_NORMAL))
        {
            std::cout << "Thread priority set to ABOVE_NORMAL" << std::endl;
        }
        else
        {
            std::cerr << "Failed to set thread priority" << std::endl;
        }

        // Get current thread priority
        int priority = GetThreadPriority(hThread);
        std::cout << "Current thread priority: " << priority << std::endl;

        for (int i = 0; i < 3; i++)
        {
            std::cout << "High priority thread working..." << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(300));
        }
    });

    priorityThread.join();
}

// 8. Thread naming helper (Windows specific)
void SetThreadName(const std::string& name)
{
    const DWORD MS_VC_EXCEPTION = 0x406D1388;

#pragma pack(push, 8)
    typedef struct tagTHREADNAME_INFO
    {
        DWORD dwType;        // Must be 0x1000
        LPCSTR szName;       // Pointer to name (in user addr space)
        DWORD dwThreadID;    // Thread ID (-1 = caller thread)
        DWORD dwFlags;       // Reserved for future use, must be zero
    } THREADNAME_INFO;
#pragma pack(pop)

    THREADNAME_INFO info;
    info.dwType = 0x1000;
    info.szName = name.c_str();
    info.dwThreadID = GetCurrentThreadId();
    info.dwFlags = 0;

    __try
    {
        RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
    }
    __except (EXCEPTION_EXECUTE_HANDLER)
    {
        // Exception is expected and ignored
    }
}
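
// On Windows 10 version 1607 and later, the documented SetThreadDescription API is a simpler
// alternative to the exception-based trick above. Hedged sketch: assumes a Windows SDK recent
// enough to declare SetThreadDescription (via <windows.h>); illustrative helper, not called
// from main().
void SetThreadNameModern(const wchar_t* name)
{
    // The description is stored by the OS and shown by debuggers and ETW-based tools
    HRESULT hr = SetThreadDescription(GetCurrentThread(), name);
    if (FAILED(hr))
    {
        std::cerr << "SetThreadDescription failed" << std::endl;
    }
}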

void NamedThreadExample()
{
    std::cout << "\n=== Named Thread Example ===" << std::endl;

    // Thread with custom name for debugging
    std::thread namedThread([]()
    {
        SetThreadName("WorkerThread-01");

        std::cout << "Named thread started (check debugger for thread name)" << std::endl;

        for (int i = 0; i < 3; i++)
        {
            std::cout << "Named thread iteration " << i + 1 << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(400));
        }

        std::cout << "Named thread completed" << std::endl;
    });

    namedThread.join();
}

// 9. Thread with timeout using std::future
void TimeoutThreadExample()
{
    std::cout << "\n=== Thread with Timeout Example ===" << std::endl;

    auto slowTask = []() -> std::string
    {
        std::cout << "Slow task started..." << std::endl;

        // Simulate a task that takes 5 seconds
        std::this_thread::sleep_for(std::chrono::seconds(5));

        std::cout << "Slow task completed!" << std::endl;
        return "Task result";
    };

    // Start the task with timeout
    auto future = std::async(std::launch::async, slowTask);

    std::cout << "Main thread: Waiting for task completion with timeout..." << std::endl;

    // Wait for 3 seconds
    auto status = future.wait_for(std::chrono::seconds(3));

    if (status == std::future_status::ready)
    {
        std::cout << "Task completed within timeout: " << future.get() << std::endl;
    }
    else if (status == std::future_status::timeout)
    {
        std::cout << "Task timed out!" << std::endl;
        // Note: The task is still running in the background
        // In a real application, you might want to implement cancellation
    }
    else
    {
        std::cout << "Task deferred" << std::endl;
    }
}
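
// std::future offers no built-in way to cancel the still-running task above. A hedged sketch
// of cooperative cancellation: the task polls an atomic stop flag supplied by the caller.
// Illustrative helper (CancellableTaskSketch), not called from main().
void CancellableTaskSketch()
{
    std::atomic<bool> cancelRequested(false);

    auto cancellableTask = [&cancelRequested]() -> std::string
    {
        for (int i = 0; i < 50; i++)
        {
            if (cancelRequested)
            {
                return "Task cancelled";
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        return "Task finished";
    };

    auto future = std::async(std::launch::async, cancellableTask);

    // Give up after one second and request cancellation
    if (future.wait_for(std::chrono::seconds(1)) == std::future_status::timeout)
    {
        cancelRequested = true;
    }

    // Returns promptly once the task observes the flag
    std::cout << future.get() << std::endl;
}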

// 10. Thread local storage example
thread_local int threadLocalCounter = 0;

void ThreadLocalStorageExample()
{
    std::cout << "\n=== Thread Local Storage Example ===" << std::endl;

    auto worker = [](int threadId)
    {
        threadLocalCounter = threadId * 100;

        for (int i = 0; i < 3; i++)
        {
            threadLocalCounter++;
            std::cout << "Thread " << threadId << " - local counter: "
                      << threadLocalCounter << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    };

    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    std::thread t3(worker, 3);

    t1.join();
    t2.join();
    t3.join();

    std::cout << "Main thread local counter: " << threadLocalCounter << std::endl;
}

int main()
{
    std::cout << "=== C++ Windows Multithreading Examples ===" << std::endl;
    std::cout << "Demonstrating various thread creation and management techniques\n" << std::endl;

    try
    {
        // Basic thread operations
        BasicThreadCreation();

        LambdaThreadExample();

        FunctionObjectThreadExample();

        MoveSemanticThreadExample();

        FutureThreadExample();

        ManagedThreadExample();

        // Windows-specific features
        WindowsThreadAffinityExample();

        NamedThreadExample();

        TimeoutThreadExample();

        ThreadLocalStorageExample();

        std::cout << "\nAll multithreading examples completed successfully!" << std::endl;
    }
    catch (const std::exception& e)
    {
        std::cerr << "Unexpected error: " << e.what() << std::endl;
        return 1;
    }

    return 0;
}

💻 Thread Pool Implementation cpp

🔴 advanced ⭐⭐⭐⭐

A custom thread pool implementation with a task queue, scheduling, and result handling

⏱️ 40 min 🏷️ cpp, threading, thread pool, parallel, windows
Prerequisites: Advanced C++ concurrency, Design patterns, Windows API, Performance optimization
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <atomic>
#include <memory>
#include <chrono>
#include <random>
#include <windows.h>

// Thread pool implementation
class ThreadPool
{
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    mutable std::mutex queueMutex; // mutable so const accessors such as getQueueSize() can lock it
    std::condition_variable condition;
    std::atomic<bool> stop;
    std::atomic<int> activeThreads;
    std::condition_variable finished;

public:
    ThreadPool(size_t threads) : stop(false), activeThreads(0)
    {
        std::cout << "Creating thread pool with " << threads << " threads" << std::endl;

        for (size_t i = 0; i < threads; ++i)
        {
            workers.emplace_back([this, i]
            {
                std::cout << "Worker thread " << i << " started" << std::endl;

                while (true)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);

                        // Wait for a task or stop signal
                        this->condition.wait(lock, [this]
                        {
                            return this->stop || !this->tasks.empty();
                        });

                        if (this->stop && this->tasks.empty())
                        {
                            break;
                        }

                        task = std::move(this->tasks.front());
                        this->tasks.pop();

                        // Count the task as active while still holding the lock, so that
                        // waitForAll() never observes "queue empty, nothing active" for a
                        // task that is between pop and execution
                        activeThreads++;
                    }

                    task();

                    {
                        std::lock_guard<std::mutex> lock(this->queueMutex);
                        activeThreads--;
                    }

                    finished.notify_one();
                }

                std::cout << "Worker thread " << i << " stopped" << std::endl;
            });
        }
    }

    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
    {
        using return_type = typename std::result_of<F(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> result = task->get_future();

        {
            std::unique_lock<std::mutex> lock(queueMutex);

            if (stop)
            {
                throw std::runtime_error("enqueue on stopped ThreadPool");
            }

            tasks.emplace([task](){ (*task)(); });
        }

        condition.notify_one();
        return result;
    }

    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }

        condition.notify_all();

        for (std::thread& worker : workers)
        {
            worker.join();
        }
    }

    void waitForAll()
    {
        std::unique_lock<std::mutex> lock(queueMutex);
        finished.wait(lock, [this] {
            return tasks.empty() && activeThreads == 0;
        });
    }

    size_t getQueueSize() const
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        return tasks.size();
    }

    size_t getActiveThreadCount() const
    {
        return activeThreads.load();
    }
};

// 1. Basic thread pool usage
void BasicThreadPoolExample()
{
    std::cout << "=== Basic Thread Pool Example ===" << std::endl;

    ThreadPool pool(4);
    std::vector<std::future<int>> results;

    // Enqueue tasks
    for (int i = 0; i < 8; i++)
    {
        results.emplace_back(
            pool.enqueue([i]
            {
                std::cout << "Task " << i << " started (Thread ID: "
                         << std::this_thread::get_id() << ")" << std::endl;

                // Simulate work
                std::this_thread::sleep_for(std::chrono::milliseconds(200 + i * 50));

                std::cout << "Task " << i << " completed" << std::endl;
                return i * i;
            })
        );
    }

    // Get results
    std::cout << "\nGetting results:" << std::endl;
    for (size_t i = 0; i < results.size(); i++)
    {
        std::cout << "Result " << i << ": " << results[i].get() << std::endl;
    }
}
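
// A hedged usage sketch for the pool's waitForAll(): enqueue fire-and-forget tasks and block
// until the queue drains and no worker is busy, instead of collecting futures as above.
// Illustrative helper (WaitForAllExample), not called from main().
void WaitForAllExample()
{
    ThreadPool pool(2);

    for (int i = 0; i < 6; i++)
    {
        // The returned future is intentionally discarded (fire-and-forget)
        pool.enqueue([i]
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::cout << "Fire-and-forget task " << i << " done" << std::endl;
        });
    }

    pool.waitForAll();
    std::cout << "All fire-and-forget tasks drained" << std::endl;
}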

// 2. Producer-consumer with thread pool
void ProducerConsumerExample()
{
    std::cout << "\n=== Producer-Consumer with Thread Pool ===" << std::endl;

    ThreadPool pool(3);
    std::queue<int> dataQueue;
    std::mutex queueMutex;
    std::condition_variable queueCV;
    bool producerFinished = false;

    // Producer task
    auto producerTask = [&]()
    {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> dis(1, 100);

        for (int i = 1; i <= 10; i++)
        {
            int value = dis(gen);

            {
                std::lock_guard<std::mutex> lock(queueMutex);
                dataQueue.push(value);
                std::cout << "Producer: produced " << value << std::endl;
            }

            queueCV.notify_one();
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        {
            std::lock_guard<std::mutex> lock(queueMutex);
            producerFinished = true;
        }
        queueCV.notify_all();

        std::cout << "Producer finished" << std::endl;
    };

    // Consumer task
    auto consumerTask = [&](int consumerId)
    {
        while (true)
        {
            int value;

            {
                std::unique_lock<std::mutex> lock(queueMutex);
                queueCV.wait(lock, [&] { return !dataQueue.empty() || producerFinished; });

                if (dataQueue.empty() && producerFinished)
                {
                    break;
                }

                value = dataQueue.front();
                dataQueue.pop();
            }

            std::cout << "Consumer " << consumerId << ": consumed " << value << std::endl;

            // Simulate processing
            std::this_thread::sleep_for(std::chrono::milliseconds(150));
        }

        std::cout << "Consumer " << consumerId << " finished" << std::endl;
    };

    // Submit tasks to thread pool
    auto producerFuture = pool.enqueue(producerTask);
    std::vector<std::future<void>> consumerFutures;

    for (int i = 1; i <= 2; i++)
    {
        consumerFutures.emplace_back(pool.enqueue(consumerTask, i));
    }

    // Wait for all tasks
    producerFuture.get();
    for (auto& future : consumerFutures)
    {
        future.get();
    }
}

// 3. Parallel processing with thread pool
void ParallelProcessingExample()
{
    std::cout << "\n=== Parallel Processing Example ===" << std::endl;

    ThreadPool pool(4);

    // Create a large array
    std::vector<int> data(1000000);
    for (size_t i = 0; i < data.size(); i++)
    {
        data[i] = i;
    }

    auto processChunk = [&](int start, int end, int chunkId) -> long long
    {
        std::cout << "Processing chunk " << chunkId << " (elements " << start
                  << " to " << end << ")" << std::endl;

        long long sum = 0;
        for (int i = start; i < end; i++)
        {
            sum += data[i] * data[i]; // Square and sum
        }

        std::cout << "Chunk " << chunkId << " completed" << std::endl;
        return sum;
    };

    const int numChunks = 8;
    const int chunkSize = data.size() / numChunks;
    std::vector<std::future<long long>> chunkFutures;

    auto startTime = std::chrono::high_resolution_clock::now();

    // Submit chunks to thread pool
    for (int i = 0; i < numChunks; i++)
    {
        int start = i * chunkSize;
        int end = (i == numChunks - 1) ? data.size() : start + chunkSize;

        chunkFutures.emplace_back(
            pool.enqueue(processChunk, start, end, i + 1)
        );
    }

    // Collect results
    long long totalSum = 0;
    for (auto& future : chunkFutures)
    {
        totalSum += future.get();
    }

    auto endTime = std::chrono::high_resolution_clock::now();
    auto parallelDuration = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime);

    std::cout << "\nParallel processing results:" << std::endl;
    std::cout << "Total sum of squares: " << totalSum << std::endl;
    std::cout << "Processing time: " << parallelDuration.count() << " ms" << std::endl;

    // Sequential comparison
    startTime = std::chrono::high_resolution_clock::now();
    long long sequentialSum = 0;
    for (int value : data)
    {
        sequentialSum += value * value;
    }
    endTime = std::chrono::high_resolution_clock::now();
    auto sequentialDuration = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime);

    std::cout << "\nSequential processing:" << std::endl;
    std::cout << "Total sum of squares: " << sequentialSum << std::endl;
    std::cout << "Processing time: " << sequentialDuration.count() << " ms" << std::endl;

    if (parallelDuration.count() > 0)
    {
        std::cout << "Speedup: " << (double)sequentialDuration.count() / (double)parallelDuration.count()
                  << "x" << std::endl;
    }
}

// 4. Thread pool with priorities
class PriorityThreadPool
{
private:
    struct Task
    {
        int priority;
        std::function<void()> function;
        bool operator<(const Task& other) const
        {
            return priority < other.priority; // Higher priority first
        }
    };

    std::vector<std::thread> workers;
    std::priority_queue<Task> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;
    std::atomic<bool> stop;

public:
    PriorityThreadPool(size_t threads) : stop(false)
    {
        for (size_t i = 0; i < threads; ++i)
        {
            workers.emplace_back([this]
            {
                while (true)
                {
                    Task task;

                    {
                        std::unique_lock<std::mutex> lock(this->queueMutex);

                        this->condition.wait(lock, [this]
                        {
                            return this->stop || !this->tasks.empty();
                        });

                        if (this->stop && this->tasks.empty())
                        {
                            break;
                        }

                        // priority_queue::top() returns a const reference; cast away const to
                        // move the task out, which is safe because it is popped immediately after
                        task = std::move(const_cast<Task&>(this->tasks.top()));
                        this->tasks.pop();
                    }

                    task.function();
                }
            });
        }
    }

    template <class F, class... Args>
    void enqueue(int priority, F&& f, Args&&... args)
    {
        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

        {
            std::unique_lock<std::mutex> lock(queueMutex);

            if (stop)
            {
                throw std::runtime_error("enqueue on stopped PriorityThreadPool");
            }

            tasks.push({priority, task});
        }

        condition.notify_one();
    }

    ~PriorityThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }

        condition.notify_all();

        for (std::thread& worker : workers)
        {
            worker.join();
        }
    }
};

void PriorityThreadPoolExample()
{
    std::cout << "\n=== Priority Thread Pool Example ===" << std::endl;

    PriorityThreadPool pool(2);

    // Submit tasks with different priorities
    for (int i = 1; i <= 10; i++)
    {
        int priority = i % 3 + 1; // Priorities 1, 2, 3

        pool.enqueue(priority, [i, priority]()
        {
            std::cout << "Task " << i << " (priority " << priority << ") executing" << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            std::cout << "Task " << i << " (priority " << priority << ") completed" << std::endl;
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Priority thread pool demonstration completed" << std::endl;
}

// 5. Thread pool with work stealing
class WorkStealingThreadPool
{
private:
    struct Worker
    {
        std::thread thread;
        std::queue<std::function<void()>> localQueue;
        std::mutex localMutex;
        std::atomic<bool> running;
    };

    std::vector<std::unique_ptr<Worker>> workers;
    std::queue<std::function<void()>> globalQueue;
    std::mutex globalMutex;
    std::atomic<int> nextWorker;
    std::atomic<bool> stop;

    int getNextWorker()
    {
        return nextWorker.fetch_add(1) % workers.size();
    }

    bool tryStealWork(int fromWorker, std::function<void()>& task)
    {
        std::lock_guard<std::mutex> lock(workers[fromWorker]->localMutex);
        if (!workers[fromWorker]->localQueue.empty())
        {
            task = std::move(workers[fromWorker]->localQueue.front());
            workers[fromWorker]->localQueue.pop();
            return true;
        }
        return false;
    }

public:
    WorkStealingThreadPool(size_t threads) : nextWorker(0), stop(false)
    {
        // Create every worker slot before starting any thread: the thread bodies index
        // `workers` (their own slot and other workers' queues when stealing), so the
        // vector must be fully populated first
        for (size_t i = 0; i < threads; ++i)
        {
            auto worker = std::make_unique<Worker>();
            worker->running = true;
            workers.push_back(std::move(worker));
        }

        // Now launch the worker threads
        for (size_t i = 0; i < threads; ++i)
        {
            workers[i]->thread = std::thread([this, i]()
            {
                while (true)
                {
                    std::function<void()> task;

                    // Try to get work from local queue
                    {
                        std::lock_guard<std::mutex> lock(workers[i]->localMutex);
                        if (!workers[i]->localQueue.empty())
                        {
                            task = std::move(workers[i]->localQueue.front());
                            workers[i]->localQueue.pop();
                        }
                    }

                    // If no local work, try global queue
                    if (!task)
                    {
                        {
                            std::lock_guard<std::mutex> lock(globalMutex);
                            if (!globalQueue.empty())
                            {
                                task = std::move(globalQueue.front());
                                globalQueue.pop();
                            }
                        }
                    }

                    // If still no work, try stealing from other workers
                    if (!task)
                    {
                        for (size_t j = 1; j < workers.size(); j++)
                        {
                            int targetWorker = (i + j) % workers.size();
                            if (tryStealWork(targetWorker, task))
                            {
                                std::cout << "Worker " << i << " stole work from worker " << targetWorker << std::endl;
                                break;
                            }
                        }
                    }

                    // Execute task if found, otherwise check if we should stop
                    if (task)
                    {
                        task();
                    }
                    else if (stop)
                    {
                        break;
                    }
                    else
                    {
                        // No work available, short sleep
                        std::this_thread::sleep_for(std::chrono::milliseconds(1));
                    }
                }

                workers[i]->running = false;
            });
        }
    }

    template <class F, class... Args>
    void enqueue(F&& f, Args&&... args)
    {
        if (stop)
        {
            throw std::runtime_error("enqueue on stopped WorkStealingThreadPool");
        }

        auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

        // Add to random worker's local queue
        int workerIndex = getNextWorker();
        {
            std::lock_guard<std::mutex> lock(workers[workerIndex]->localMutex);
            workers[workerIndex]->localQueue.push(task);
        }
    }

    ~WorkStealingThreadPool()
    {
        stop = true;

        for (auto& worker : workers)
        {
            if (worker->thread.joinable())
            {
                worker->thread.join();
            }
        }
    }
};

void WorkStealingExample()
{
    std::cout << "\n=== Work Stealing Thread Pool Example ===" << std::endl;

    WorkStealingThreadPool pool(4);

    // Submit many small tasks to demonstrate work stealing
    for (int i = 0; i < 20; i++)
    {
        pool.enqueue([i]()
        {
            std::cout << "Task " << i << " started" << std::endl;

            // Variable work time to create opportunity for work stealing
            std::this_thread::sleep_for(std::chrono::milliseconds(50 + (i % 3) * 50));

            std::cout << "Task " << i << " completed" << std::endl;
        });
    }

    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Work stealing demonstration completed" << std::endl;
}

// 6. Windows-specific thread pool using Windows Thread Pool API
void WindowsThreadPoolAPIExample()
{
    std::cout << "\n=== Windows Thread Pool API Example ===" << std::endl;

    // Simple callback function
    auto callback = [](PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work)
    {
        int taskId = *static_cast<int*>(context);
        std::cout << "Windows thread pool task " << taskId << " executing" << std::endl;

        // Simulate work
        std::this_thread::sleep_for(std::chrono::milliseconds(200));

        std::cout << "Windows thread pool task " << taskId << " completed" << std::endl;
        delete static_cast<int*>(context);
    };

    // Create thread pool environment
    PTP_POOL pool = CreateThreadpool(nullptr);
    if (pool == nullptr)
    {
        std::cerr << "Failed to create thread pool" << std::endl;
        return;
    }

    // Set thread pool parameters
    SetThreadpoolThreadMaximum(pool, 4);
    SetThreadpoolThreadMinimum(pool, 1);

    // Create cleanup group
    PTP_CLEANUP_GROUP cleanupGroup = CreateThreadpoolCleanupGroup();
    if (cleanupGroup == nullptr)
    {
        std::cerr << "Failed to create cleanup group" << std::endl;
        CloseThreadpool(pool);
        return;
    }

    // Create thread pool environment
    TP_CALLBACK_ENVIRON callbackEnviron;
    InitializeThreadpoolEnvironment(&callbackEnviron);
    SetThreadpoolCallbackPool(&callbackEnviron, pool);
    SetThreadpoolCallbackCleanupGroup(&callbackEnviron, cleanupGroup, nullptr);

    // Submit work items
    std::vector<PTP_WORK> works;
    for (int i = 1; i <= 6; i++)
    {
        int* taskId = new int(i);
        PTP_WORK work = CreateThreadpoolWork(callback, taskId, &callbackEnviron);
        if (work != nullptr)
        {
            works.push_back(work);
            SubmitThreadpoolWork(work);
        }
        else
        {
            delete taskId;
        }
    }

    std::cout << "Submitted " << works.size() << " tasks to Windows thread pool" << std::endl;

    // Wait for every work item to complete (each PTP_WORK must be waited on individually)
    for (auto work : works)
    {
        WaitForThreadpoolWorkCallbacks(work, FALSE);
    }

    // Cleanup: the cleanup group owns the work objects created with this environment, so
    // CloseThreadpoolCleanupGroupMembers releases them - do not also close each one with
    // CloseThreadpoolWork
    CloseThreadpoolCleanupGroupMembers(cleanupGroup, FALSE, nullptr);
    CloseThreadpoolCleanupGroup(cleanupGroup);
    DestroyThreadpoolEnvironment(&callbackEnviron);
    CloseThreadpool(pool);

    std::cout << "Windows thread pool API example completed" << std::endl;
}

// 7. Performance comparison
void PerformanceComparison()
{
    std::cout << "\n=== Thread Pool Performance Comparison ===" << std::endl;

    const int numTasks = 100;
    const int workTimeMs = 10; // Each task takes 10ms

    // Test 1: Individual threads
    {
        std::cout << "\nTest 1: Individual threads" << std::endl;
        auto start = std::chrono::high_resolution_clock::now();

        std::vector<std::thread> threads;
        for (int i = 0; i < numTasks; i++)
        {
            threads.emplace_back([i, workTimeMs]()
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(workTimeMs));
            });
        }

        for (auto& thread : threads)
        {
            thread.join();
        }

        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        std::cout << "Time: " << duration.count() << " ms" << std::endl;
    }

    // Test 2: Thread pool
    {
        std::cout << "\nTest 2: Thread pool (4 threads)" << std::endl;
        ThreadPool pool(4);
        auto start = std::chrono::high_resolution_clock::now();

        std::vector<std::future<void>> futures;
        for (int i = 0; i < numTasks; i++)
        {
            futures.emplace_back(
                pool.enqueue([i, workTimeMs]()
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(workTimeMs));
                })
            );
        }

        for (auto& future : futures)
        {
            future.get();
        }

        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        std::cout << "Time: " << duration.count() << " ms" << std::endl;
    }

    // Test 3: Thread pool (optimal threads)
    {
        std::cout << "\nTest 3: Thread pool (optimal threads)" << std::endl;
        unsigned int optimalThreads = std::thread::hardware_concurrency();
        if (optimalThreads == 0) optimalThreads = 4;

        std::cout << "Using " << optimalThreads << " threads (hardware concurrency)" << std::endl;

        ThreadPool pool(optimalThreads);
        auto start = std::chrono::high_resolution_clock::now();

        std::vector<std::future<void>> futures;
        for (int i = 0; i < numTasks; i++)
        {
            futures.emplace_back(
                pool.enqueue([i, workTimeMs]()
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(workTimeMs));
                })
            );
        }

        for (auto& future : futures)
        {
            future.get();
        }

        auto end = std::chrono::high_resolution_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
        std::cout << "Time: " << duration.count() << " ms" << std::endl;
    }
}

int main()
{
    std::cout << "=== C++ Thread Pool Implementation Examples ===" << std::endl;
    std::cout << "Demonstrating various thread pool patterns and implementations\n" << std::endl;

    try
    {
        // Basic thread pool
        BasicThreadPoolExample();

        // Producer-consumer pattern
        ProducerConsumerExample();

        // Parallel processing
        ParallelProcessingExample();

        // Advanced patterns
        PriorityThreadPoolExample();
        WorkStealingExample();

        // Platform-specific
        WindowsThreadPoolAPIExample();

        // Performance comparison
        PerformanceComparison();

        std::cout << "\nAll thread pool examples completed successfully!" << std::endl;
    }
    catch (const std::exception& e)
    {
        std::cerr << "Unexpected error: " << e.what() << std::endl;
        return 1;
    }

    return 0;
}

💻 Thread Synchronization Mechanisms cpp

🟡 intermediate ⭐⭐⭐

Thread synchronization using mutexes, condition variables, atomic operations, and Windows synchronization primitives

⏱️ 35 min 🏷️ cpp, threading, synchronization, windows
Prerequisites: C++11/14 concurrency, Windows API, Synchronization concepts
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <vector>
#include <queue>
#include <random>
#include <semaphore>
#include <windows.h>
#include <shared_mutex>

// 1. Basic mutex example
void BasicMutexExample()
{
    std::cout << "=== Basic Mutex Example ===" << std::endl;

    std::mutex mtx;
    int sharedCounter = 0;
    const int numIterations = 1000;

    auto incrementCounter = [&mtx, &sharedCounter, numIterations](int threadId)
    {
        for (int i = 0; i < numIterations; i++)
        {
            // Lock the mutex before accessing shared resource
            std::lock_guard<std::mutex> lock(mtx);

            sharedCounter++;

            // Critical section - shared resource is protected
            if (i % 100 == 0)
            {
                std::cout << "Thread " << threadId << ": counter = " << sharedCounter << std::endl;
            }
        }
        std::cout << "Thread " << threadId << " completed" << std::endl;
    };

    // Create multiple threads
    const int numThreads = 5;
    std::vector<std::thread> threads;

    for (int i = 0; i < numThreads; i++)
    {
        threads.emplace_back(incrementCounter, i + 1);
    }

    // Wait for all threads to complete
    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Final counter value: " << sharedCounter << std::endl;
    std::cout << "Expected value: " << numThreads * numIterations << std::endl;
}

// 2. Deadlock demonstration and prevention
void DeadlockExample()
{
    std::cout << "\n=== Deadlock Prevention Example ===" << std::endl;

    std::mutex mtx1, mtx2;
    int shared1 = 0, shared2 = 0;

    // Function that may cause deadlock if not careful
    auto worker1 = [&mtx1, &mtx2, &shared1, &shared2]()
    {
        for (int i = 0; i < 5; i++)
        {
            // BAD: Different lock order can cause deadlock
            /*
            std::lock_guard<std::mutex> lock1(mtx1);
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::lock_guard<std::mutex> lock2(mtx2);
            */

            // GOOD: Use std::lock to avoid deadlock
            std::lock(mtx1, mtx2);
            std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
            std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);

            shared1++;
            shared2++;
            std::cout << "Worker 1: shared1=" << shared1 << ", shared2=" << shared2 << std::endl;

            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    };

    auto worker2 = [&mtx1, &mtx2, &shared1, &shared2]()
    {
        for (int i = 0; i < 5; i++)
        {
            // BAD: Different lock order can cause deadlock
            /*
            std::lock_guard<std::mutex> lock2(mtx2);
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::lock_guard<std::mutex> lock1(mtx1);
            */

            // GOOD: Use std::lock to avoid deadlock
            std::lock(mtx1, mtx2);
            std::lock_guard<std::mutex> lock1(mtx1, std::adopt_lock);
            std::lock_guard<std::mutex> lock2(mtx2, std::adopt_lock);

            shared1 += 2;
            shared2 += 2;
            std::cout << "Worker 2: shared1=" << shared1 << ", shared2=" << shared2 << std::endl;

            std::this_thread::sleep_for(std::chrono::milliseconds(50));
        }
    };

    std::thread t1(worker1);
    std::thread t2(worker2);

    t1.join();
    t2.join();

    std::cout << "Final values: shared1=" << shared1 << ", shared2=" << shared2 << std::endl;
    std::cout << "No deadlock occurred!" << std::endl;
}
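
// Since C++17, std::scoped_lock performs the same deadlock-free multi-mutex locking as the
// std::lock / std::adopt_lock pair above in a single RAII statement. Hedged sketch with an
// illustrative helper (ScopedLockSketch), not called from main().
void ScopedLockSketch()
{
    static std::mutex mtxA, mtxB;
    static int valueA = 0, valueB = 0;

    std::scoped_lock lock(mtxA, mtxB); // locks both mutexes atomically, unlocks in destructor

    valueA++;
    valueB++;
    std::cout << "ScopedLockSketch: valueA=" << valueA << ", valueB=" << valueB << std::endl;
}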

// 3. Condition variable example
void ConditionVariableExample()
{
    std::cout << "\n=== Condition Variable Example ===" << std::endl;

    std::mutex mtx;
    std::condition_variable cv;
    std::queue<int> dataQueue;
    bool finished = false;

    // Producer thread
    auto producer = [&mtx, &cv, &dataQueue, &finished]()
    {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> dis(1, 1000);

        for (int i = 1; i <= 10; i++)
        {
            // Produce data
            int value = dis(gen);

            {
                std::lock_guard<std::mutex> lock(mtx);
                dataQueue.push(value);
                std::cout << "Producer: produced " << value << " (queue size: " << dataQueue.size() << ")" << std::endl;
            }

            // Notify consumers
            cv.notify_one();

            // Simulate production time
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }

        // Signal completion
        {
            std::lock_guard<std::mutex> lock(mtx);
            finished = true;
        }
        cv.notify_all();

        std::cout << "Producer: finished production" << std::endl;
    };

    // Consumer thread
    auto consumer = [&mtx, &cv, &dataQueue, &finished](int consumerId)
    {
        while (true)
        {
            std::unique_lock<std::mutex> lock(mtx);

            // Wait for data or completion signal
            cv.wait(lock, [&]() { return !dataQueue.empty() || finished; });

            if (dataQueue.empty() && finished)
            {
                break;
            }

            if (!dataQueue.empty())
            {
                int value = dataQueue.front();
                dataQueue.pop();

                std::cout << "Consumer " << consumerId << ": consumed "
                         << value << " (queue size: " << dataQueue.size() << ")" << std::endl;

                // Unlock before processing
                lock.unlock();

                // Simulate processing time
                std::this_thread::sleep_for(std::chrono::milliseconds(150));
            }
        }

        std::cout << "Consumer " << consumerId << ": finished" << std::endl;
    };

    // Start producer and consumers
    std::thread producerThread(producer);
    std::thread consumer1Thread(consumer, 1);
    std::thread consumer2Thread(consumer, 2);

    producerThread.join();
    consumer1Thread.join();
    consumer2Thread.join();

    std::cout << "Producer-Consumer pattern completed successfully" << std::endl;
}

// 4. Read-write lock (shared_mutex) example
void ReadWriteLockExample()
{
    std::cout << "\n=== Read-Write Lock Example ===" << std::endl;

    std::shared_mutex rwMutex;
    int sharedData = 0;
    std::atomic<int> readOperations(0);
    std::atomic<int> writeOperations(0);

    auto reader = [&rwMutex, &sharedData, &readOperations](int readerId)
    {
        for (int i = 0; i < 5; i++)
        {
            // Acquire shared lock for reading
            std::shared_lock<std::shared_mutex> lock(rwMutex);

            readOperations++;
            std::cout << "Reader " << readerId << " reading value: " << sharedData
                     << " (read #" << readOperations << ")" << std::endl;

            // Simulate reading time
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }

        std::cout << "Reader " << readerId << " completed" << std::endl;
    };

    auto writer = [&rwMutex, &sharedData, &writeOperations](int writerId)
    {
        for (int i = 0; i < 3; i++)
        {
            // Acquire exclusive lock for writing
            std::unique_lock<std::shared_mutex> lock(rwMutex);

            writeOperations++;
            sharedData += 10;

            std::cout << "Writer " << writerId << " wrote value: " << sharedData
                     << " (write #" << writeOperations << ")" << std::endl;

            // Simulate writing time
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }

        std::cout << "Writer " << writerId << " completed" << std::endl;
    };

    // Start multiple readers and writers
    std::vector<std::thread> threads;

    // Create readers
    for (int i = 1; i <= 4; i++)
    {
        threads.emplace_back(reader, i);
    }

    // Create writers
    for (int i = 1; i <= 2; i++)
    {
        threads.emplace_back(writer, i);
    }

    // Wait for all threads
    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Final shared data value: " << sharedData << std::endl;
    std::cout << "Total read operations: " << readOperations << std::endl;
    std::cout << "Total write operations: " << writeOperations << std::endl;
}

// 5. Atomic operations example
void AtomicOperationsExample()
{
    std::cout << "\n=== Atomic Operations Example ===" << std::endl;

    std::atomic<int> atomicCounter(0);
    std::atomic<bool> flag(false);
    std::atomic<int*> atomicPtr(nullptr);

    auto worker = [&atomicCounter, &flag](int threadId)
    {
        for (int i = 0; i < 10000; i++)
        {
            // Atomic increment (thread-safe)
            atomicCounter.fetch_add(1, std::memory_order_relaxed);

            // Atomic compare and swap
            bool expected = false;
            if (flag.compare_exchange_weak(expected, true, std::memory_order_acq_rel))
            {
                // Successfully acquired flag
                std::cout << "Thread " << threadId << " acquired flag" << std::endl;
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
                flag.store(false, std::memory_order_release);
                std::cout << "Thread " << threadId << " released flag" << std::endl;
            }
        }

        std::cout << "Thread " << threadId << " completed" << std::endl;
    };

    const int numThreads = 4;
    std::vector<std::thread> threads;

    for (int i = 0; i < numThreads; i++)
    {
        threads.emplace_back(worker, i + 1);
    }

    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Final atomic counter: " << atomicCounter.load() << std::endl;
    std::cout << "Expected: " << numThreads * 10000 << std::endl;

    // Test atomic pointer
    int* testPtr = new int(42);
    atomicPtr.store(testPtr);

    std::cout << "Atomic pointer value: " << *atomicPtr.load() << std::endl;
    delete testPtr;
}
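
// The compare-and-swap loop above is essentially a hand-rolled lock. A hedged sketch of the
// classic minimal spinlock built on std::atomic_flag; illustrative helper, not used by the
// examples in this file.
class SpinLock
{
private:
    std::atomic_flag flag = ATOMIC_FLAG_INIT;

public:
    void lock()
    {
        // Spin until test_and_set() reports that the flag was previously clear,
        // i.e. this thread took ownership
        while (flag.test_and_set(std::memory_order_acquire))
        {
            std::this_thread::yield(); // yield while spinning to reduce contention
        }
    }

    void unlock()
    {
        flag.clear(std::memory_order_release);
    }
};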

// 6. Semaphore example (C++20)
void SemaphoreExample()
{
    std::cout << "\n=== Semaphore Example ===" << std::endl;

    // Create a semaphore that allows 3 concurrent accesses
    std::counting_semaphore<> semaphore(3);

    auto worker = [&semaphore](int workerId)
    {
        std::cout << "Worker " << workerId << " waiting for semaphore..." << std::endl;

        // Acquire semaphore
        semaphore.acquire();

        std::cout << "Worker " << workerId << " acquired semaphore, working..." << std::endl;

        // Simulate work
        std::this_thread::sleep_for(std::chrono::seconds(2));

        std::cout << "Worker " << workerId << " finished, releasing semaphore" << std::endl;

        // Release semaphore
        semaphore.release();
    };

    const int numWorkers = 8;
    std::vector<std::thread> threads;

    // Start workers
    for (int i = 1; i <= numWorkers; i++)
    {
        threads.emplace_back(worker, i);
    }

    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Semaphore example completed" << std::endl;
}
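
// std::counting_semaphore requires C++20. A hedged sketch of the usual pre-C++20 substitute
// built from a mutex and a condition variable; illustrative helper, not used by the example
// above.
class CountingSemaphore
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;

public:
    explicit CountingSemaphore(int initial) : count(initial) {}

    void acquire()
    {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [this]() { return count > 0; });
        count--;
    }

    void release()
    {
        {
            std::lock_guard<std::mutex> lock(mtx);
            count++;
        }
        cv.notify_one();
    }
};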

// 7. Windows-specific synchronization (CRITICAL_SECTION)
void WindowsCriticalSectionExample()
{
    std::cout << "\n=== Windows Critical Section Example ===" << std::endl;

    CRITICAL_SECTION cs;
    InitializeCriticalSection(&cs);

    int sharedResource = 0;
    const int numThreads = 4;
    const int iterationsPerThread = 1000;

    auto worker = [&cs, &sharedResource, iterationsPerThread](int threadId)
    {
        for (int i = 0; i < iterationsPerThread; i++)
        {
            // Enter critical section
            EnterCriticalSection(&cs);

            // Critical section - protected access
            int current = sharedResource;
            current++;
            sharedResource = current;

            if (i % 200 == 0)
            {
                std::cout << "Thread " << threadId << ": sharedResource = " << sharedResource << std::endl;
            }

            // Leave critical section
            LeaveCriticalSection(&cs);
        }

        std::cout << "Thread " << threadId << " completed" << std::endl;
    };

    std::vector<std::thread> threads;

    for (int i = 0; i < numThreads; i++)
    {
        threads.emplace_back(worker, i + 1);
    }

    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Final shared resource value: " << sharedResource << std::endl;
    std::cout << "Expected: " << numThreads * iterationsPerThread << std::endl;

    DeleteCriticalSection(&cs);
}
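
// A hedged RAII sketch around CRITICAL_SECTION so that EnterCriticalSection and
// LeaveCriticalSection cannot be forgotten on early returns or exceptions; illustrative
// helper, not used by the example above.
class CriticalSectionGuard
{
private:
    CRITICAL_SECTION& cs;

public:
    explicit CriticalSectionGuard(CRITICAL_SECTION& section) : cs(section)
    {
        EnterCriticalSection(&cs);
    }

    ~CriticalSectionGuard()
    {
        LeaveCriticalSection(&cs);
    }

    // Non-copyable, like std::lock_guard
    CriticalSectionGuard(const CriticalSectionGuard&) = delete;
    CriticalSectionGuard& operator=(const CriticalSectionGuard&) = delete;
};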

// 8. Windows Event object example
void WindowsEventExample()
{
    std::cout << "\n=== Windows Event Object Example ===" << std::endl;

    // Create manual-reset event
    HANDLE hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr);
    if (hEvent == nullptr)
    {
        std::cerr << "Failed to create event" << std::endl;
        return;
    }

    std::atomic<int> completedWorkers(0);

    auto worker = [hEvent, &completedWorkers](int workerId)
    {
        std::cout << "Worker " << workerId << " started" << std::endl;

        // Simulate work
        std::this_thread::sleep_for(std::chrono::milliseconds(1000 + workerId * 500));

        std::cout << "Worker " << workerId << " completed" << std::endl;

        // Increment counter
        int completed = completedWorkers.fetch_add(1) + 1;

        // Signal event if all workers are done
        if (completed == 3)
        {
            SetEvent(hEvent);
            std::cout << "Event signaled by worker " << workerId << std::endl;
        }
    };

    // Start workers
    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    std::thread t3(worker, 3);

    // Wait for event
    std::cout << "Main thread waiting for all workers to complete..." << std::endl;
    DWORD waitResult = WaitForSingleObject(hEvent, INFINITE);

    if (waitResult == WAIT_OBJECT_0)
    {
        std::cout << "Event signaled - all workers completed!" << std::endl;
    }
    else
    {
        std::cerr << "Wait failed with error: " << GetLastError() << std::endl;
    }

    t1.join();
    t2.join();
    t3.join();

    CloseHandle(hEvent);
}

// 9. Barrier synchronization example
void BarrierExample()
{
    std::cout << "\n=== Barrier Synchronization Example ===" << std::endl;

    class SimpleBarrier
    {
    private:
        std::mutex mtx;
        std::condition_variable cv;
        int count;
        int waiting;
        int phase;

    public:
        SimpleBarrier(int threadCount) : count(threadCount), waiting(0), phase(0) {}

        void wait()
        {
            std::unique_lock<std::mutex> lock(mtx);
            int currentPhase = phase;

            waiting++;

            if (waiting == count)
            {
                // Last thread to arrive
                phase++;
                waiting = 0;
                cv.notify_all();
            }
            else
            {
                // Wait for other threads
                cv.wait(lock, [this, currentPhase]() { return phase != currentPhase; });
            }
        }
    };

    const int numThreads = 4;
    SimpleBarrier barrier(numThreads);

    auto worker = [&barrier](int threadId)
    {
        for (int phase = 1; phase <= 3; phase++)
        {
            std::cout << "Thread " << threadId << " - Phase " << phase << " - Working" << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200 + threadId * 100));
            std::cout << "Thread " << threadId << " - Phase " << phase << " - At barrier" << std::endl;

            barrier.wait();

            std::cout << "Thread " << threadId << " - Phase " << phase << " - Passed barrier" << std::endl;
        }
    };

    std::vector<std::thread> threads;

    for (int i = 1; i <= numThreads; i++)
    {
        threads.emplace_back(worker, i);
    }

    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Barrier synchronization completed" << std::endl;
}

// 10. Latch example (single-use barrier)
void LatchExample()
{
    std::cout << "\n=== Latch Example ===" << std::endl;

    class Latch
    {
    private:
        std::mutex mtx;
        std::condition_variable cv;
        int count;

    public:
        Latch(int initialCount) : count(initialCount) {}

        void countDown()
        {
            std::lock_guard<std::mutex> lock(mtx);
            count--;
            if (count == 0)
            {
                cv.notify_all();
            }
        }

        void wait()
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [this]() { return count == 0; });
        }
    };

    Latch latch(3);
    std::atomic<int> readyCount(0);

    auto worker = [&latch, &readyCount](int threadId)
    {
        // Simulate initialization work
        std::this_thread::sleep_for(std::chrono::milliseconds(500 + threadId * 200));

        std::cout << "Worker " << threadId << " ready" << std::endl;
        readyCount++;

        latch.countDown();

        // Wait for all workers to be ready
        latch.wait();

        std::cout << "Worker " << threadId << " starting main work" << std::endl;

        // Main work
        for (int i = 0; i < 3; i++)
        {
            std::cout << "Worker " << threadId << " working..." << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(300));
        }

        std::cout << "Worker " << threadId << " completed" << std::endl;
    };

    std::vector<std::thread> threads;

    for (int i = 1; i <= 3; i++)
    {
        threads.emplace_back(worker, i);
    }

    for (auto& thread : threads)
    {
        thread.join();
    }

    std::cout << "Latch example completed" << std::endl;
}
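
// C++20 also ships these primitives directly as std::latch (<latch>) and std::barrier
// (<barrier>). Hedged sketch of the latch example above rewritten with std::latch; the
// program already requires C++20 for std::counting_semaphore. Illustrative helper
// (StdLatchSketch), not called from main().
#include <latch>

void StdLatchSketch()
{
    std::latch ready(3);

    auto worker = [&ready](int threadId)
    {
        std::cout << "Worker " << threadId << " ready (std::latch)" << std::endl;
        ready.count_down(); // signal arrival
        ready.wait();       // block until all three workers have arrived
        std::cout << "Worker " << threadId << " starting main work" << std::endl;
    };

    std::thread t1(worker, 1);
    std::thread t2(worker, 2);
    std::thread t3(worker, 3);

    t1.join();
    t2.join();
    t3.join();
}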

int main()
{
    std::cout << "=== C++ Windows Thread Synchronization Examples ===" << std::endl;
    std::cout << "Demonstrating various synchronization mechanisms\n" << std::endl;

    try
    {
        // Standard C++ synchronization
        BasicMutexExample();
        DeadlockExample();
        ConditionVariableExample();
        ReadWriteLockExample();
        AtomicOperationsExample();
        SemaphoreExample();
        BarrierExample();
        LatchExample();

        // Windows-specific synchronization
        WindowsCriticalSectionExample();
        WindowsEventExample();

        std::cout << "\nAll synchronization examples completed successfully!" << std::endl;
    }
    catch (const std::exception& e)
    {
        std::cerr << "Unexpected error: " << e.what() << std::endl;
        return 1;
    }

    return 0;
}