Windows 多线程 - C# 示例
Windows平台C#多线程示例,包括线程创建、同步、线程池和并发编程模式
💻 线程创建和管理 csharp
🟢 simple
⭐⭐
创建、启动和管理C#线程进行并行执行
⏱️ 20 min
🏷️ csharp, threading, concurrency, parallel, windows
Prerequisites:
Basic C# syntax, Understanding of concurrent programming
using System;
using System.Threading;
using System.Threading.Tasks;
class ThreadCreation
{
// 1. Simple thread creation
public static void CreateSimpleThread()
{
Console.WriteLine("=== Simple Thread Creation ===");
Thread workerThread = new Thread(new ThreadStart(SimpleWorkerMethod));
Console.WriteLine("Starting worker thread...");
workerThread.Start();
// Main thread continues working
for (int i = 1; i <= 3; i++)
{
Console.WriteLine($"Main thread working: {i}");
Thread.Sleep(500);
}
// Wait for worker thread to complete
workerThread.Join();
Console.WriteLine("Worker thread completed. Main thread finished.");
}
// Worker method for simple thread
static void SimpleWorkerMethod()
{
for (int i = 1; i <= 5; i++)
{
Console.WriteLine($"Worker thread working: {i}");
Thread.Sleep(300);
}
}
// 2. Thread with parameters
public static void CreateThreadWithParameters()
{
Console.WriteLine("\n=== Thread with Parameters ===");
string message = "Hello from parameterized thread!";
int iterations = 3;
// Create thread with parameterized method
Thread paramThread = new Thread(new ParameterizedThreadStart(ParameterizedWorkerMethod));
Console.WriteLine("Starting parameterized thread...");
paramThread.Start(new { Message = message, Iterations = iterations });
// Wait for completion
paramThread.Join();
Console.WriteLine("Parameterized thread completed.");
}
// Worker method that accepts parameters
static void ParameterizedWorkerMethod(object data)
{
dynamic parameters = data;
string message = parameters.Message;
int iterations = parameters.Iterations;
for (int i = 1; i <= iterations; i++)
{
Console.WriteLine($"{message} (Iteration {i})");
Thread.Sleep(600);
}
}
// 3. Multiple threads with different priorities
public static void ThreadsWithPriorities()
{
Console.WriteLine("\n=== Threads with Different Priorities ===");
Thread highPriorityThread = new Thread(() => WorkerByPriority("High", 3));
Thread normalPriorityThread = new Thread(() => WorkerByPriority("Normal", 3));
Thread lowPriorityThread = new Thread(() => WorkerByPriority("Low", 3));
// Set thread priorities
highPriorityThread.Priority = ThreadPriority.Highest;
normalPriorityThread.Priority = ThreadPriority.Normal;
lowPriorityThread.Priority = ThreadPriority.Lowest;
Console.WriteLine("Starting threads with different priorities...");
highPriorityThread.Start();
normalPriorityThread.Start();
lowPriorityThread.Start();
// Wait for all threads
highPriorityThread.Join();
normalPriorityThread.Join();
lowPriorityThread.Join();
Console.WriteLine("All priority threads completed.");
}
// Worker method that shows thread priority
static void WorkerByPriority(string priorityName, int iterations)
{
Thread.CurrentThread.Name = $"{priorityName} Priority Thread";
Console.WriteLine($"{Thread.CurrentThread.Name} started with priority: {Thread.CurrentThread.Priority}");
for (int i = 1; i <= iterations; i++)
{
Console.WriteLine($"{priorityName} priority - Work {i}");
Thread.Sleep(200);
}
Console.WriteLine($"{priorityName} priority thread finished.");
}
// 4. Thread with return value (using Task)
public static void ThreadWithReturnValue()
{
Console.WriteLine("\n=== Thread with Return Value (Task) ===");
// Create and start a task that returns a value
Task<int> calculationTask = Task.Run(() => CalculateSomething(100));
Console.WriteLine("Main thread continues working while task runs...");
// Do some other work
for (int i = 1; i <= 3; i++)
{
Console.WriteLine($"Main thread work {i}");
Thread.Sleep(300);
}
// Get the result (this will wait if task isn't completed)
int result = calculationTask.Result;
Console.WriteLine($"Calculation result: {result}");
}
// Method that performs calculation and returns result
static int CalculateSomething(int n)
{
Console.WriteLine("Calculation task started...");
int sum = 0;
for (int i = 1; i <= n; i++)
{
sum += i;
if (i % 10 == 0)
{
Console.WriteLine($"Calculation progress: {i}/{n}");
Thread.Sleep(50);
}
}
Console.WriteLine("Calculation task completed.");
return sum;
}
// 5. Thread background vs foreground
public static void BackgroundVsForegroundThreads()
{
Console.WriteLine("\n=== Background vs Foreground Threads ===");
// Create a background thread
Thread backgroundThread = new Thread(() =>
{
Thread.CurrentThread.Name = "Background Thread";
Console.WriteLine($"{Thread.CurrentThread.Name} started");
for (int i = 1; i <= 10; i++)
{
Console.WriteLine($"{Thread.CurrentThread.Name} working: {i}");
Thread.Sleep(1000);
}
Console.WriteLine($"{Thread.CurrentThread.Name} completed");
});
backgroundThread.IsBackground = true;
Console.WriteLine("Starting background thread...");
backgroundThread.Start();
// Create a foreground thread
Thread foregroundThread = new Thread(() =>
{
Thread.CurrentThread.Name = "Foreground Thread";
Console.WriteLine($"{Thread.CurrentThread.Name} started");
for (int i = 1; i <= 3; i++)
{
Console.WriteLine($"{Thread.CurrentThread.Name} working: {i}");
Thread.Sleep(1500);
}
Console.WriteLine($"{Thread.CurrentThread.Name} completed");
});
Console.WriteLine("Starting foreground thread...");
foregroundThread.Start();
// Wait for foreground thread (background thread will be terminated when app exits)
foregroundThread.Join();
Console.WriteLine("Foreground thread completed. Application will exit now.");
Console.WriteLine("(Background thread would be terminated if this were a real application)");
}
// 6. Thread abort and cancellation
public static void ThreadCancellation()
{
Console.WriteLine("\n=== Thread Cancellation ===");
// Create a cancellation token source
CancellationTokenSource cts = new CancellationTokenSource();
// Create a long-running task that can be cancelled
Task<longRunningTask> runningTask = Task.Run(() =>
LongRunningMethod(cts.Token), cts.Token);
// Let the task run for a bit
Thread.Sleep(3000);
Console.WriteLine("Requesting cancellation...");
cts.Cancel();
try
{
// Wait for task to complete or cancel
runningTask.Wait();
Console.WriteLine($"Task completed with result: {runningTask.Result}");
}
catch (AggregateException ex)
{
if (ex.InnerExceptions[0] is OperationCanceledException)
{
Console.WriteLine("Task was successfully cancelled.");
}
else
{
Console.WriteLine($"Task failed: {ex.InnerExceptions[0].Message}");
}
}
}
// Long-running method that supports cancellation
static int LongRunningMethod(CancellationToken cancellationToken)
{
Console.WriteLine("Long-running task started...");
for (int i = 1; i <= 100; i++)
{
// Check for cancellation
if (cancellationToken.IsCancellationRequested)
{
Console.WriteLine($"Cancellation requested at iteration {i}");
cancellationToken.ThrowIfCancellationRequested();
}
Console.WriteLine($"Working... {i}%");
Thread.Sleep(100);
}
Console.WriteLine("Long-running task completed successfully.");
return 42; // Return some result
}
// 7. Thread safe data sharing
public static void ThreadSafeDataSharing()
{
Console.WriteLine("\n=== Thread-Safe Data Sharing ===");
// Shared counter with lock
object lockObject = new object();
int sharedCounter = 0;
// Create multiple threads that modify shared data
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread((threadId) =>
{
int threadNum = (int)threadId;
Thread.CurrentThread.Name = $"Worker {threadNum}";
for (int j = 1; j <= 1000; j++)
{
lock (lockObject)
{
int temp = sharedCounter;
// Simulate some processing
Thread.Sleep(1);
sharedCounter = temp + 1;
if (j % 200 == 0)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Shared counter = {sharedCounter}");
}
}
}
Console.WriteLine($"{Thread.CurrentThread.Name} completed.");
});
}
// Start all threads
for (int i = 0; i < threads.Length; i++)
{
threads[i].Start(i);
}
// Wait for all threads to complete
foreach (Thread thread in threads)
{
thread.Join();
}
Console.WriteLine($"Final shared counter value: {sharedCounter}");
}
// 8. Thread with exception handling
public static void ThreadExceptionHandling()
{
Console.WriteLine("\n=== Thread Exception Handling ===");
Thread threadWithException = new Thread(() =>
{
try
{
Console.WriteLine("Thread starting work...");
// Simulate work
for (int i = 1; i <= 3; i++)
{
Console.WriteLine($"Thread working: {i}");
Thread.Sleep(500);
}
// Throw an exception
throw new InvalidOperationException("Something went wrong in the thread!");
}
catch (Exception ex)
{
Console.WriteLine($"Thread caught exception: {ex.Message}");
// Handle the exception
Thread.CurrentThread.Abort();
}
});
threadWithException.Start();
threadWithException.Join();
Console.WriteLine("Thread execution completed with exception handling.");
}
// 9. Thread pool usage
public static void ThreadPoolUsage()
{
Console.WriteLine("\n=== Thread Pool Usage ===");
Console.WriteLine($"Available worker threads: {ThreadPool.ThreadCount}");
Console.WriteLine($"Available completion port threads: {ThreadPool.CompletedWorkItemCount}");
// Queue work items to thread pool
for (int i = 1; i <= 5; i++)
{
int workItem = i;
ThreadPool.QueueUserWorkItem(state =>
{
Thread.CurrentThread.Name = $"ThreadPool Worker {workItem}";
Console.WriteLine($"{Thread.CurrentThread.Name} processing item {workItem}");
// Simulate work
Thread.Sleep(1000);
Console.WriteLine($"{Thread.CurrentThread.Name} completed item {workItem}");
});
}
// Wait for thread pool work to complete
Thread.Sleep(6000);
Console.WriteLine("All thread pool work items completed.");
}
// 10. Thread naming and identification
public static void ThreadIdentification()
{
Console.WriteLine("\n=== Thread Identification ===");
Console.WriteLine($"Main thread ID: {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"Main thread name: {Thread.CurrentThread.Name}");
Console.WriteLine($"Main thread is background: {Thread.CurrentThread.IsBackground}");
Console.WriteLine($"Main thread is alive: {Thread.CurrentThread.IsAlive}");
Console.WriteLine($"Main thread state: {Thread.CurrentThread.ThreadState}");
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.Length; i++)
{
int threadIndex = i;
threads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Named Thread {threadIndex + 1}";
Console.WriteLine($"Thread {Thread.CurrentThread.Name} - ID: {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"Thread {Thread.CurrentThread.Name} - Is Background: {Thread.CurrentThread.IsBackground}");
Console.WriteLine($"Thread {Thread.CurrentThread.Name} - State: {Thread.CurrentThread.ThreadState}");
// Simulate work
Thread.Sleep(1000);
Console.WriteLine($"Thread {Thread.CurrentThread.Name} - Finished");
});
}
// Start all threads
foreach (Thread thread in threads)
{
thread.Start();
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
}
static void Main(string[] args)
{
Console.WriteLine("=== C# Thread Creation Demo ===\n");
try
{
CreateSimpleThread();
CreateThreadWithParameters();
ThreadsWithPriorities();
ThreadWithReturnValue();
BackgroundVsForegroundThreads();
ThreadCancellation();
ThreadSafeDataSharing();
ThreadExceptionHandling();
ThreadPoolUsage();
ThreadIdentification();
Console.WriteLine("\nAll thread creation examples completed successfully!");
}
catch (Exception ex)
{
Console.WriteLine($"Error in thread demo: {ex.Message}");
}
}
}
💻 线程同步和锁 csharp
🟡 intermediate
⭐⭐⭐
使用锁、互斥体、信号量和其他同步原语进行线程同步技术
⏱️ 30 min
🏷️ csharp, synchronization, threading, concurrency, windows
Prerequisites:
C# threading basics, Understanding of race conditions
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
class ThreadSynchronization
{
// 1. Simple lock usage
private static readonly object simpleLock = new object();
private static int sharedCounter = 0;
public static void SimpleLockDemo()
{
Console.WriteLine("=== Simple Lock Demo ===");
sharedCounter = 0;
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(() =>
{
for (int j = 0; j < 1000; j++)
{
lock (simpleLock)
{
int temp = sharedCounter;
Thread.Sleep(1); // Simulate work
sharedCounter = temp + 1;
}
}
});
}
// Start all threads
foreach (Thread thread in threads)
{
thread.Start();
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
Console.WriteLine($"Final counter value: {sharedCounter} (Expected: 10000)");
Console.WriteLine($"Lock was {(sharedCounter == 10000 ? "successful" : "unsuccessful")} in preventing race conditions");
}
// 2. Monitor.Enter/Exit with try/finally
private static readonly object monitorLock = new object();
private static bool criticalResourceAvailable = true;
public static void MonitorDemo()
{
Console.WriteLine("\n=== Monitor.Enter/Exit Demo ===");
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.Length; i++)
{
int threadId = i;
threads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Monitor Worker {threadId}";
UseCriticalResource();
});
}
// Start threads with small delay
foreach (Thread thread in threads)
{
thread.Start();
Thread.Sleep(100);
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
Console.WriteLine("All Monitor operations completed.");
}
static void UseCriticalResource()
{
bool lockTaken = false;
try
{
Monitor.Enter(monitorLock, ref lockTaken);
if (!criticalResourceAvailable)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Resource not available, waiting...");
Monitor.Wait(monitorLock); // Release lock and wait
}
Console.WriteLine($"{Thread.CurrentThread.Name}: Acquired critical resource");
// Simulate critical section work
Thread.Sleep(2000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Releasing critical resource");
}
finally
{
if (lockTaken)
{
Monitor.Exit(monitorLock);
}
}
}
// 3. Mutex for inter-process synchronization
private static Mutex mutex = null;
public static void MutexDemo()
{
Console.WriteLine("\n=== Mutex Demo ===");
// Create or open a named mutex
bool isNewMutex;
mutex = new Mutex(false, "MyApplicationMutex", out isNewMutex);
Console.WriteLine($"Mutex created: {isNewMutex}");
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.Length; i++)
{
int threadId = i;
threads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Mutex Thread {threadId}";
UseMutexResource();
});
}
// Start threads
foreach (Thread thread in threads)
{
thread.Start();
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
mutex.ReleaseMutex();
mutex.Dispose();
Console.WriteLine("Mutex demo completed.");
}
static void UseMutexResource()
{
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Waiting for mutex...");
// Wait for mutex with timeout
if (mutex.WaitOne(5000))
{
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Acquired mutex, using shared resource");
// Simulate work with shared resource
Thread.Sleep(3000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed work with shared resource");
}
finally
{
mutex.ReleaseMutex();
Console.WriteLine($"{Thread.CurrentThread.Name}: Released mutex");
}
}
else
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Timeout waiting for mutex");
}
}
catch (AbandonedMutexException)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Acquired abandoned mutex");
mutex.ReleaseMutex();
}
}
// 4. Semaphore for limiting concurrent access
private static SemaphoreSlim semaphore = new SemaphoreSlim(2, 2); // Allow 2 concurrent threads
public static void SemaphoreDemo()
{
Console.WriteLine("\n=== Semaphore Demo ===");
Console.WriteLine("Semaphore allows maximum 2 concurrent threads");
Thread[] threads = new Thread[6];
for (int i = 0; i < threads.Length; i++)
{
int threadId = i;
threads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Semaphore Thread {threadId}";
AccessLimitedResource();
});
}
// Start all threads
foreach (Thread thread in threads)
{
thread.Start();
Thread.Sleep(200); // Stagger starts
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
semaphore.Dispose();
Console.WriteLine("Semaphore demo completed.");
}
static void AccessLimitedResource()
{
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Waiting to access limited resource...");
// Wait for semaphore
semaphore.Wait();
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Acquired semaphore, accessing resource");
Console.WriteLine($"Current semaphore count: {semaphore.CurrentCount}");
// Simulate resource usage
Thread.Sleep(2000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Finished using resource");
}
finally
{
semaphore.Release();
Console.WriteLine($"{Thread.CurrentThread.Name}: Released semaphore");
}
}
catch (Exception ex)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Error - {ex.Message}");
}
}
// 5. ReaderWriterLockSlim for read-heavy scenarios
private static ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
private static string sharedData = "Initial Data";
public static void ReaderWriterLockDemo()
{
Console.WriteLine("\n=== ReaderWriterLock Demo ===");
Thread[] threads = new Thread[8];
// Create 6 reader threads and 2 writer threads
for (int i = 0; i < threads.Length; i++)
{
int threadId = i;
bool isReader = i < 6;
threads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = isReader ? $"Reader {threadId}" : $"Writer {threadId}";
if (isReader)
{
ReadData();
}
else
{
WriteData($"Updated Data at {DateTime.Now:HH:mm:ss}");
}
});
}
// Start threads
foreach (Thread thread in threads)
{
thread.Start();
Thread.Sleep(100);
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
rwLock.Dispose();
Console.WriteLine("ReaderWriterLock demo completed.");
}
static void ReadData()
{
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Waiting to read...");
rwLock.EnterReadLock();
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Reading data: '{sharedData}'");
Thread.Sleep(1000); // Simulate read operation
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed reading");
}
finally
{
rwLock.ExitReadLock();
}
}
catch (Exception ex)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Read error - {ex.Message}");
}
}
static void WriteData(string newData)
{
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Waiting to write...");
rwLock.EnterWriteLock();
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Writing data...");
sharedData = newData;
Thread.Sleep(2000); // Simulate write operation
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed writing");
}
finally
{
rwLock.ExitWriteLock();
}
}
catch (Exception ex)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Write error - {ex.Message}");
}
}
// 6. Barrier for thread synchronization points
public static void BarrierDemo()
{
Console.WriteLine("\n=== Barrier Demo ===");
// Barrier that requires 4 threads to continue
Barrier barrier = new Barrier(4, b =>
{
Console.WriteLine($"Phase {b.CurrentPhaseNumber} completed. All threads have reached the barrier.");
});
Thread[] threads = new Thread[4];
for (int i = 0; i < threads.Length; i++)
{
int threadId = i;
threads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Barrier Thread {threadId}";
WorkWithBarrier(barrier);
});
}
// Start threads
foreach (Thread thread in threads)
{
thread.Start();
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
barrier.Dispose();
Console.WriteLine("Barrier demo completed.");
}
static void WorkWithBarrier(Barrier barrier)
{
try
{
// Phase 1: Initial work
Console.WriteLine($"{Thread.CurrentThread.Name}: Starting phase 1 work");
Thread.Sleep(new Random().Next(1000, 3000));
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed phase 1, waiting at barrier");
barrier.SignalAndWait();
// Phase 2: Second phase work
Console.WriteLine($"{Thread.CurrentThread.Name}: Starting phase 2 work");
Thread.Sleep(new Random().Next(1000, 3000));
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed phase 2, waiting at barrier");
barrier.SignalAndWait();
// Final phase
Console.WriteLine($"{Thread.CurrentThread.Name}: Starting final phase work");
Thread.Sleep(new Random().Next(500, 1500));
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed all work");
}
catch (Exception ex)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Error - {ex.Message}");
}
}
// 7. CountdownEvent for coordinating multiple threads
public static void CountdownEventDemo()
{
Console.WriteLine("\n=== CountdownEvent Demo ===");
// CountdownEvent that waits for 5 signals
CountdownEvent countdown = new CountdownEvent(5);
// Thread that waits for countdown
Thread coordinatorThread = new Thread(() =>
{
Thread.CurrentThread.Name = "Coordinator";
Console.WriteLine($"{Thread.CurrentThread.Name}: Waiting for all worker threads to signal completion...");
countdown.Wait();
Console.WriteLine($"{Thread.CurrentThread.Name}: All workers completed! Proceeding with coordination task.");
});
coordinatorThread.Start();
// Create worker threads
Thread[] workerThreads = new Thread[5];
for (int i = 0; i < workerThreads.Length; i++)
{
int threadId = i;
workerThreads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Worker {threadId}";
// Simulate work
Console.WriteLine($"{Thread.CurrentThread.Name}: Starting work...");
Thread.Sleep(new Random().Next(1000, 4000));
Console.WriteLine($"{Thread.CurrentThread.Name}: Work completed, signaling coordinator");
countdown.Signal();
});
}
// Start all workers
foreach (Thread thread in workerThreads)
{
thread.Start();
}
// Wait for completion
coordinatorThread.Join();
foreach (Thread thread in workerThreads)
{
thread.Join();
}
countdown.Dispose();
Console.WriteLine("CountdownEvent demo completed.");
}
// 8. AutoResetEvent and ManualResetEvent
private static AutoResetEvent autoEvent = new AutoResetEvent(false);
private static ManualResetEvent manualEvent = new ManualResetEvent(false);
public static void ResetEventDemo()
{
Console.WriteLine("\n=== Reset Events Demo ===");
// AutoResetEvent demo
Thread autoProducer = new Thread(() =>
{
Thread.CurrentThread.Name = "Auto Producer";
for (int i = 1; i <= 5; i++)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Produced item {i}");
Thread.Sleep(1000);
autoEvent.Set(); // Signal one waiting thread
Console.WriteLine($"{Thread.CurrentThread.Name}: Signaled consumer");
}
});
Thread autoConsumer = new Thread(() =>
{
Thread.CurrentThread.Name = "Auto Consumer";
for (int i = 1; i <= 5; i++)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Waiting for item...");
autoEvent.WaitOne();
Console.WriteLine($"{Thread.CurrentThread.Name}: Consumed item {i}");
}
});
autoProducer.Start();
autoConsumer.Start();
// Wait for auto event demo
autoProducer.Join();
autoConsumer.Join();
Console.WriteLine("\n--- ManualResetEvent Demo ---");
// ManualResetEvent demo
Thread manualSignaler = new Thread(() =>
{
Thread.CurrentThread.Name = "Manual Signaler";
Thread.Sleep(2000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Setting manual event (allowing all waiting threads)");
manualEvent.Set();
Thread.Sleep(3000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Resetting manual event (blocking new threads)");
manualEvent.Reset();
Thread.Sleep(2000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Setting manual event again");
manualEvent.Set();
});
Thread[] manualWaiters = new Thread[4];
for (int i = 0; i < manualWaiters.Length; i++)
{
int threadId = i;
manualWaiters[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Manual Waiter {threadId}";
Thread.Sleep(threadId * 1000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Waiting for manual event");
manualEvent.WaitOne();
Console.WriteLine($"{Thread.CurrentThread.Name}: Manual event received, proceeding");
});
}
manualSignaler.Start();
foreach (Thread thread in manualWaiters)
{
thread.Start();
}
// Wait for manual event demo
manualSignaler.Join();
foreach (Thread thread in manualWaiters)
{
thread.Join();
}
// Cleanup
autoEvent.Dispose();
manualEvent.Dispose();
Console.WriteLine("Reset events demo completed.");
}
// 9. Interlocked operations for atomic operations
private static long atomicCounter = 0;
private static double atomicBalance = 1000.0;
public static void InterlockedDemo()
{
Console.WriteLine("\n=== Interlocked Operations Demo ===");
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Interlocked Worker";
// Atomic increment
long newValue = Interlocked.Increment(ref atomicCounter);
Console.WriteLine($"{Thread.CurrentThread.Name}: Incremented counter to {newValue}");
// Atomic compare and exchange
double currentBalance = atomicBalance;
double withdrawal = 50.0;
double newBalance = currentBalance - withdrawal;
double exchangedBalance = Interlocked.CompareExchange(ref atomicBalance, newBalance, currentBalance);
if (exchangedBalance == currentBalance)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Successfully withdrew {withdrawal:C}, new balance: {newBalance:C}");
}
else
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Withdrawal failed due to concurrent modification");
}
Thread.Sleep(100);
});
}
// Start all threads
foreach (Thread thread in threads)
{
thread.Start();
}
// Wait for completion
foreach (Thread thread in threads)
{
thread.Join();
}
Console.WriteLine($"Final atomic counter: {atomicCounter}");
Console.WriteLine($"Final atomic balance: {atomicBalance:C}");
Console.WriteLine("Interlocked operations demo completed.");
}
// 10. Concurrent collections for thread-safe collections
private static ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>();
private static ConcurrentDictionary<string, int> concurrentDict = new ConcurrentDictionary<string, int>();
private static BlockingCollection<string> blockingCollection = new BlockingCollection<string>();
public static void ConcurrentCollectionsDemo()
{
Console.WriteLine("\n=== Concurrent Collections Demo ===");
// Producer-Consumer with BlockingCollection
Thread producer = new Thread(() =>
{
Thread.CurrentThread.Name = "Producer";
for (int i = 1; i <= 10; i++)
{
string item = $"Item {i}";
blockingCollection.Add(item);
Console.WriteLine($"{Thread.CurrentThread.Name}: Produced {item}");
Thread.Sleep(200);
}
blockingCollection.CompleteAdding();
Console.WriteLine($"{Thread.CurrentThread.Name}: Production completed");
});
Thread consumer = new Thread(() =>
{
Thread.CurrentThread.Name = "Consumer";
foreach (string item in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Consumed {item}");
Thread.Sleep(500);
}
Console.WriteLine($"{Thread.CurrentThread.Name}: Consumption completed");
});
// ConcurrentQueue demo
Thread[] queueWorkers = new Thread[3];
for (int i = 0; i < queueWorkers.Length; i++)
{
int threadId = i;
queueWorkers[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Queue Worker {threadId}";
for (int j = 1; j <= 5; j++)
{
string item = $"Thread {threadId} Item {j}";
concurrentQueue.Enqueue(item);
Console.WriteLine($"{Thread.CurrentThread.Name}: Enqueued {item}");
Thread.Sleep(100);
}
});
}
// ConcurrentDictionary demo
Thread[] dictWorkers = new Thread[3];
for (int i = 0; i < dictWorkers.Length; i++)
{
int threadId = i;
dictWorkers[i] = new Thread(() =>
{
Thread.CurrentThread.Name = $"Dict Worker {threadId}";
for (int j = 1; j <= 5; j++)
{
string key = $"Key_{threadId}_{j}";
concurrentDict.AddOrUpdate(key, 1, (k, v) => v + 1);
Console.WriteLine($"{Thread.CurrentThread.Name}: Updated {key}");
Thread.Sleep(150);
}
});
}
// Start all threads
producer.Start();
consumer.Start();
foreach (Thread thread in queueWorkers)
{
thread.Start();
}
foreach (Thread thread in dictWorkers)
{
thread.Start();
}
// Wait for completion
producer.Join();
consumer.Join();
foreach (Thread thread in queueWorkers)
{
thread.Join();
}
foreach (Thread thread in dictWorkers)
{
thread.Join();
}
// Display results
Console.WriteLine("\n--- Queue Contents ---");
while (concurrentQueue.TryDequeue(out string queueItem))
{
Console.WriteLine($"Dequeued: {queueItem}");
}
Console.WriteLine("\n--- Dictionary Contents ---");
foreach (var kvp in concurrentDict)
{
Console.WriteLine($"{kvp.Key}: {kvp.Value}");
}
blockingCollection.Dispose();
Console.WriteLine("Concurrent collections demo completed.");
}
static void Main(string[] args)
{
Console.WriteLine("=== C# Thread Synchronization Demo ===\n");
try
{
SimpleLockDemo();
MonitorDemo();
MutexDemo();
SemaphoreDemo();
ReaderWriterLockDemo();
BarrierDemo();
CountdownEventDemo();
ResetEventDemo();
InterlockedDemo();
ConcurrentCollectionsDemo();
Console.WriteLine("\nAll synchronization examples completed successfully!");
}
catch (Exception ex)
{
Console.WriteLine($"Error in synchronization demo: {ex.Message}");
}
}
}
💻 线程池和任务并行库 csharp
🟡 intermediate
⭐⭐⭐⭐
高级线程池使用、任务并行库(TPL)和并行编程模式
⏱️ 35 min
🏷️ csharp, threadpool, parallel, async, tpl, windows
Prerequisites:
C# async/await, Task Parallel Library basics, LINQ
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
using System.Linq;
class ThreadPoolTPL
{
// 1. Basic ThreadPool usage
public static void BasicThreadPoolDemo()
{
Console.WriteLine("=== Basic ThreadPool Demo ===");
Console.WriteLine($"Min threads: {ThreadPool.GetMinThreads(out int minWorker, out int minIO)} Worker: {minWorker}, IO: {minIO}");
Console.WriteLine($"Max threads: {ThreadPool.GetMaxThreads(out int maxWorker, out int maxIO)} Worker: {maxWorker}, IO: {maxIO}");
// Queue multiple work items
for (int i = 1; i <= 10; i++)
{
int workItem = i;
ThreadPool.QueueUserWorkItem(state =>
{
Thread.CurrentThread.Name = $"ThreadPool Worker {workItem}";
Console.WriteLine($"{Thread.CurrentThread.Name}: Processing item {workItem}");
// Simulate work
Thread.Sleep(1000 + workItem * 100);
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed item {workItem}");
});
}
// Wait for work to complete
Thread.Sleep(15000);
Console.WriteLine("ThreadPool demo completed.");
}
// 2. Task.Run and basic TPL
public static async Task BasicTaskDemo()
{
Console.WriteLine("\n=== Basic Task Demo ===");
// Create and run tasks
Task<int> calculationTask = Task.Run(() =>
{
Console.WriteLine("Calculation task started on thread: " + Thread.CurrentThread.ManagedThreadId);
int result = 0;
for (int i = 1; i <= 100; i++)
{
result += i;
if (i % 25 == 0)
{
Console.WriteLine($"Calculation progress: {i}/100");
}
}
Console.WriteLine("Calculation task completed");
return result;
});
Task<string> stringTask = Task.Run(() =>
{
Console.WriteLine("String task started on thread: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
return "Hello from Task!";
});
// Main thread continues work
Console.WriteLine("Main thread continues working while tasks run...");
for (int i = 1; i <= 3; i++)
{
Console.WriteLine($"Main thread work {i}");
await Task.Delay(500);
}
// Wait for results
int calculationResult = await calculationTask;
string stringResult = await stringTask;
Console.WriteLine($"Calculation result: {calculationResult}");
Console.WriteLine($"String result: {stringResult}");
}
// 3. Task.ContinueWith
public static void TaskContinuationDemo()
{
Console.WriteLine("\n=== Task Continuation Demo ===");
Task<int> initialTask = Task.Run(() =>
{
Console.WriteLine("Initial task starting...");
Thread.Sleep(2000);
Console.WriteLine("Initial task completed");
return 42;
});
// Add continuations
Task continuation1 = initialTask.ContinueWith(antecedent =>
{
Console.WriteLine($"Continuation 1: Received result {antecedent.Result}");
return antecedent.Result * 2;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
Task continuation2 = continuation1.ContinueWith(antecedent =>
{
Console.WriteLine($"Continuation 2: Final result {antecedent.Result}");
}, TaskContinuationOptions.OnlyOnRanToCompletion);
// Error handling continuation
Task errorContinuation = initialTask.ContinueWith(antecedent =>
{
Console.WriteLine($"Error handling: Task failed with {antecedent.Exception?.InnerException?.Message}");
}, TaskContinuationOptions.OnlyOnFaulted);
// Wait for all continuations
Task.WaitAll(continuation2, errorContinuation);
Console.WriteLine("All continuations completed.");
}
// 4. Task.WhenAll and WhenAny
public static async Task TaskWhenDemo()
{
Console.WriteLine("\n=== Task.WhenAll and WhenAny Demo ===");
// Create multiple tasks
Task[] tasks = new Task[5];
for (int i = 0; i < tasks.Length; i++)
{
int taskIndex = i;
tasks[i] = Task.Run(async () =>
{
Console.WriteLine($"Task {taskIndex} starting...");
await Task.Delay(new Random().Next(1000, 5000));
Console.WriteLine($"Task {taskIndex} completed");
return taskIndex * 10;
});
}
// WhenAny - wait for first task to complete
Task firstCompleted = await Task.WhenAny(tasks);
Console.WriteLine($"First task completed: {firstCompleted.Id}");
// WhenAll - wait for all tasks to complete
Console.WriteLine("Waiting for all tasks to complete...");
await Task.WhenAll(tasks);
Console.WriteLine("All tasks completed!");
// Get results from all tasks
Task<int>[] intTasks = tasks.Cast<Task<int>>().ToArray();
int[] results = await Task.WhenAll(intTasks);
Console.WriteLine($"All results: [{string.Join(", ", results)}]");
}
// 5. Parallel.For and Parallel.ForEach
public static void ParallelLoopsDemo()
{
Console.WriteLine("\n=== Parallel Loops Demo ===");
// Parallel.For
Console.WriteLine("--- Parallel.For Demo ---");
object syncObject = new object();
int totalSum = 0;
Parallel.For(1, 101, (i, state) =>
{
if (i > 50) // Stop condition
{
Console.WriteLine($"Stopping parallel loop at {i}");
state.Stop();
return;
}
int localSum = i * i;
// Thread-safe accumulation
lock (syncObject)
{
totalSum += localSum;
Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId}: {i}² = {localSum}");
}
});
Console.WriteLine($"Total sum of squares: {totalSum}");
// Parallel.ForEach
Console.WriteLine("\n--- Parallel.ForEach Demo ---");
var products = new[] { "Laptop", "Mouse", "Keyboard", "Monitor", "Webcam" };
Parallel.ForEach(products, new ParallelOptions { MaxDegreeOfParallelism = 2 }, product =>
{
Console.WriteLine($"Processing {product} on thread {Thread.CurrentThread.ManagedThreadId}");
// Simulate processing
Thread.Sleep(1000 + new Random().Next(500));
Console.WriteLine($"Completed processing {product}");
});
Console.WriteLine("Parallel loops demo completed.");
}
// 6. Parallel LINQ (PLINQ)
public static void PLINQDemo()
{
Console.WriteLine("\n=== PLINQ Demo ===");
// Create a large dataset
int[] numbers = Enumerable.Range(1, 1000000).ToArray();
Console.WriteLine($"Processing {numbers.Length:N0} numbers");
// Sequential LINQ
var sequentialStart = DateTime.Now;
var sequentialResult = numbers
.Where(n => n % 2 == 0)
.Select(n => n * n)
.Take(1000)
.ToList();
var sequentialTime = DateTime.Now - sequentialStart;
Console.WriteLine($"Sequential processing: {sequentialTime.TotalMilliseconds:F2}ms, Found {sequentialResult.Count} results");
// Parallel LINQ
var parallelStart = DateTime.Now;
var parallelResult = numbers
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.Where(n => n % 2 == 0)
.Select(n => n * n)
.Take(1000)
.ToList();
var parallelTime = DateTime.Now - parallelStart;
Console.WriteLine($"Parallel processing: {parallelTime.TotalMilliseconds:F2}ms, Found {parallelResult.Count} results");
Console.WriteLine($"Speedup: {(sequentialTime.TotalMilliseconds / parallelTime.TotalMilliseconds):F2}x");
// AsParallel() with ordering
Console.WriteLine("\n--- PLINQ with Ordering ---");
var orderedResult = numbers
.AsParallel()
.AsOrdered()
.Where(n => n % 1000 == 0)
.Take(10)
.ToList();
Console.WriteLine($"First 10 multiples of 1000: [{string.Join(", ", orderedResult)}]");
}
// 7. Producer-Consumer pattern with BlockingCollection
public static void ProducerConsumerDemo()
{
Console.WriteLine("\n=== Producer-Consumer Demo ===");
var queue = new BlockingCollection<string>(5); // Bounded capacity
var producerTasks = new Task[2];
var consumerTasks = new Task[3];
// Producers
for (int i = 0; i < producerTasks.Length; i++)
{
int producerId = i;
producerTasks[i] = Task.Run(async () =>
{
Thread.CurrentThread.Name = $"Producer {producerId}";
for (int j = 1; j <= 5; j++)
{
string item = $"Producer{producerId}-Item{j}";
Console.WriteLine($"{Thread.CurrentThread.Name}: Producing {item}");
queue.Add(item);
Console.WriteLine($"{Thread.CurrentThread.Name}: Added {item} to queue (Count: {queue.Count})");
await Task.Delay(1000); // Production time
}
Console.WriteLine($"{Thread.CurrentThread.Name}: Finished production");
});
}
// Consumers
for (int i = 0; i < consumerTasks.Length; i++)
{
int consumerId = i;
consumerTasks[i] = Task.Run(async () =>
{
Thread.CurrentThread.Name = $"Consumer {consumerId}";
while (!queue.IsCompleted)
{
try
{
string item = queue.Take(); // Blocks if empty
Console.WriteLine($"{Thread.CurrentThread.Name}: Consumed {item} (Count: {queue.Count})");
await Task.Delay(1500); // Processing time
}
catch (InvalidOperationException)
{
// Queue is completed and empty
break;
}
}
Console.WriteLine($"{Thread.CurrentThread.Name}: Finished consumption");
});
}
// Wait for all producers to complete
Task.WaitAll(producerTasks);
queue.CompleteAdding(); // Signal no more items will be added
// Wait for all consumers to finish
Task.WaitAll(consumerTasks);
Console.WriteLine("Producer-Consumer demo completed.");
}
// 8. TaskFactory and custom task creation
public static void TaskFactoryDemo()
{
Console.WriteLine("\n=== TaskFactory Demo ===");
// Create custom TaskFactory
var customFactory = new TaskFactory(
CancellationToken.None,
TaskCreationOptions.LongRunning,
TaskContinuationOptions.None,
TaskScheduler.Default);
// Create tasks with custom factory
Task[] customTasks = new Task[3];
for (int i = 0; i < customTasks.Length; i++)
{
int taskId = i;
customTasks[i] = customFactory.StartNew(() =>
{
Thread.CurrentThread.Name = $"Custom Task {taskId}";
Console.WriteLine($"{Thread.CurrentThread.Name}: Starting (Long-running task)");
// Simulate long-running work
Thread.Sleep(3000);
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed");
});
}
// TaskFactory with parent-child relationships
Task parentTask = Task.Factory.StartNew(() =>
{
Console.WriteLine("Parent task started");
Task child1 = Task.Factory.StartNew(() =>
{
Console.WriteLine("Child task 1 started");
Thread.Sleep(2000);
Console.WriteLine("Child task 1 completed");
}, TaskCreationOptions.AttachedToParent);
Task child2 = Task.Factory.StartNew(() =>
{
Console.WriteLine("Child task 2 started");
Thread.Sleep(1500);
Console.WriteLine("Child task 2 completed");
}, TaskCreationOptions.AttachedToParent);
Console.WriteLine("Parent task waiting for children");
// Parent automatically waits for attached children
});
// Wait for all tasks
Task.WaitAll(customTasks);
parentTask.Wait();
Console.WriteLine("TaskFactory demo completed.");
}
// 9. TaskScheduler and custom scheduling
public static void TaskSchedulerDemo()
{
Console.WriteLine("\n=== TaskScheduler Demo ===");
// Get current scheduler
TaskScheduler currentScheduler = TaskScheduler.Current;
TaskScheduler defaultScheduler = TaskScheduler.Default;
Console.WriteLine($"Current scheduler: {currentScheduler.GetType().Name}");
Console.WriteLine($"Default scheduler: {defaultScheduler.GetType().Name}");
// Task with specific scheduler
Task defaultSchedulerTask = Task.Factory.StartNew(() =>
{
Console.WriteLine($"Running on {TaskScheduler.Current.GetType().Name}");
Console.WriteLine($"Thread ID: {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"Is ThreadPool: {Thread.CurrentThread.IsThreadPoolThread}");
});
// UI context simulation (would use SynchronizationContext in real UI app)
TaskScheduler uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
// Continue on UI thread (simulation)
Task uiTask = Task.Run(() =>
{
Console.WriteLine("Background work completed");
return "Result from background";
}).ContinueWith(antecedent =>
{
Console.WriteLine("This would run on UI thread in real application");
Console.WriteLine($"Background result: {antecedent.Result}");
}, uiScheduler);
// Wait for tasks
Task.WaitAll(defaultSchedulerTask, uiTask);
Console.WriteLine("TaskScheduler demo completed.");
}
// 10. Cancellation with TPL
public static async Task CancellationDemo()
{
Console.WriteLine("\n=== Task Cancellation Demo ===");
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
// Create long-running tasks that support cancellation
Task[] tasks = new Task[5];
for (int i = 0; i < tasks.Length; i++)
{
int taskId = i;
tasks[i] = Task.Run(async () =>
{
Thread.CurrentThread.Name = $"Cancellable Task {taskId}";
try
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Started");
for (int j = 1; j <= 10; j++)
{
token.ThrowIfCancellationRequested();
Console.WriteLine($"{Thread.CurrentThread.Name}: Working... {j * 10}%");
await Task.Delay(1000, token);
}
Console.WriteLine($"{Thread.CurrentThread.Name}: Completed successfully");
}
catch (OperationCanceledException)
{
Console.WriteLine($"{Thread.CurrentThread.Name}: Was cancelled");
}
}, token);
}
// Cancel after 5 seconds
Task.Delay(5000).ContinueWith(_ =>
{
Console.WriteLine("Initiating cancellation...");
cts.Cancel();
});
try
{
// Wait for all tasks (will handle cancellations gracefully)
await Task.WhenAll(tasks);
}
catch (AggregateException ex)
{
Console.WriteLine($"Caught {ex.InnerExceptions.Count(e => e is OperationCanceledException)} cancellation exceptions");
}
Console.WriteLine("Cancellation demo completed.");
}
// 11. Dataflow pattern with TPL Dataflow
public static void DataflowDemo()
{
Console.WriteLine("\n=== Dataflow Pattern Demo ===");
// Create buffer blocks
var producerBlock = new BufferBlock<int>();
var processorBlock = new TransformBlock<int, string>(x =>
{
Console.WriteLine($"Processing {x} on thread {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(500); // Simulate processing
return $"Processed_{x}";
});
var consumerBlock = new ActionBlock<string>(result =>
{
Console.WriteLine($"Consuming {result} on thread {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(300);
});
// Link blocks together
producerBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
processorBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });
// Start producer
Task.Run(async () =>
{
for (int i = 1; i <= 10; i++)
{
Console.WriteLine($"Producing {i}");
await producerBlock.SendAsync(i);
await Task.Delay(200);
}
producerBlock.Complete();
});
// Wait for completion
consumerBlock.Completion.Wait();
Console.WriteLine("Dataflow demo completed.");
}
static async Task Main(string[] args)
{
Console.WriteLine("=== C# ThreadPool and TPL Demo ===\n");
try
{
BasicThreadPoolDemo();
await BasicTaskDemo();
TaskContinuationDemo();
await TaskWhenDemo();
ParallelLoopsDemo();
PLINQDemo();
ProducerConsumerDemo();
TaskFactoryDemo();
TaskSchedulerDemo();
await CancellationDemo();
DataflowDemo();
Console.WriteLine("\nAll ThreadPool and TPL examples completed successfully!");
}
catch (Exception ex)
{
Console.WriteLine($"Error in TPL demo: {ex.Message}");
}
}
}