Exemples de Multithreading Web Go
Exemples de multithreading Web Go incluant goroutines, synchronisation avec mutex et worker pools
Key Facts
- Category
- Go
- Items
- 3
- Format Families
- sample
Sample Overview
Exemples de multithreading Web Go incluant goroutines, synchronisation avec mutex et worker pools. This sample set belongs to Go and can be used to test related workflows inside Elysia Tools.
💻 Créer Goroutines go
🟢 simple
⭐⭐
Créer et démarrer des goroutines (threads légers) pour l'exécution concurrente
⏱️ 20 min
🏷️ go, web, multithreading
Prerequisites:
Basic Go, sync package, channels
// Web Go Goroutines Examples
// Creating and managing goroutines for concurrent execution
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// 1. Basic Goroutine
// simpleGoroutine prints a single line identifying the goroutine by id.
func simpleGoroutine(id int) {
	const format = "Goroutine %d is running\n"
	fmt.Printf(format, id)
}
// BasicGoroutineExample starts three goroutines running simpleGoroutine
// (ids 1 through 3), pauses for one second, and then prints a completion line.
func BasicGoroutineExample() {
	fmt.Println("--- Basic Goroutine ---")

	// One goroutine per id.
	const last = 3
	for id := 1; id <= last; id++ {
		go simpleGoroutine(id)
	}

	// A fixed pause stands in for synchronization here: it gives the
	// goroutines time to run but does not observe their completion.
	pause := time.Second
	time.Sleep(pause)
	fmt.Println("Main function completed")
}
// 2. Goroutine with WaitGroup
// workerWithWaitGroup logs that worker id started, sleeps 500ms to simulate
// work, logs completion, and calls wg.Done when it returns.
func workerWithWaitGroup(id int, wg *sync.WaitGroup) {
	defer wg.Done()

	const workTime = 500 * time.Millisecond
	fmt.Printf("Worker %d starting\n", id)
	time.Sleep(workTime) // stand-in for real work
	fmt.Printf("Worker %d completed\n", id)
}
// WaitGroupExample starts three workerWithWaitGroup goroutines (ids 1 through
// 3) and blocks on a sync.WaitGroup until all of them have finished.
func WaitGroupExample() {
	fmt.Println("\n--- Goroutine with WaitGroup ---")

	const workers = 3
	group := new(sync.WaitGroup)

	// Register every worker before any of them is started.
	group.Add(workers)
	for id := 1; id <= workers; id++ {
		go workerWithWaitGroup(id, group)
	}

	group.Wait()
	fmt.Println("All workers completed")
}
// 3. Goroutine with Parameters
// processTask announces the task, sleeps for duration, and then announces
// completion together with the duration it slept.
func processTask(taskID int, taskName string, duration time.Duration) {
	fmt.Printf("Task %d (%s) started\n", taskID, taskName)
	// Deferred so the completion line is printed once the sleep has returned.
	defer fmt.Printf("Task %d (%s) completed in %v\n", taskID, taskName, duration)
	time.Sleep(duration)
}
// GoroutineWithParametersExample starts three processTask goroutines, each
// with its own id, name and duration, and then sleeps for one second.
func GoroutineWithParametersExample() {
	fmt.Println("\n--- Goroutine with Parameters ---")

	specs := []struct {
		id   int
		name string
		cost time.Duration
	}{
		{1, "Download", 300 * time.Millisecond},
		{2, "Process", 500 * time.Millisecond},
		{3, "Upload", 200 * time.Millisecond},
	}

	// Arguments are evaluated at the go statement, so each goroutine gets
	// its own copy of the values.
	for _, s := range specs {
		go processTask(s.id, s.name, s.cost)
	}

	// Fixed pause; longer than the slowest task above.
	time.Sleep(time.Second)
}
// 4. Anonymous Goroutine
// AnonymousGoroutineExample runs a function literal in three goroutines,
// passing the loop counter as an argument, and then sleeps for 500ms.
func AnonymousGoroutineExample() {
	fmt.Println("\n--- Anonymous Goroutine ---")

	announce := func(id int) {
		fmt.Printf("Anonymous goroutine %d\n", id)
	}
	for n := 1; n <= 3; n++ {
		go announce(n)
	}

	time.Sleep(500 * time.Millisecond)
}
// 5. Goroutine with Return Values (using channels)
// computeResult sends the sum of a and b on resultChan.
func computeResult(a, b int, resultChan chan<- int) {
	resultChan <- a + b
}
// GoroutineWithReturnValueExample runs computeResult in a goroutine and prints
// the value it sends back over an unbuffered channel.
func GoroutineWithReturnValueExample() {
	fmt.Println("\n--- Goroutine with Return Values ---")

	sums := make(chan int)
	go computeResult(10, 20, sums)

	// The receive blocks until the goroutine has sent its result.
	fmt.Printf("Result from goroutine: %d\n", <-sums)
}
// 6. Multiple Goroutines with Multiple Return Values
// divideAndRemainder sends {a / b, a % b} on resultChan as a two-element
// array. Integer division by zero panics in Go, so b must be non-zero.
func divideAndRemainder(a, b int, resultChan chan<- [2]int) {
	var pair [2]int
	pair[0], pair[1] = a/b, a%b
	resultChan <- pair
}
// MultipleReturnValuesExample runs divideAndRemainder(17, 5) in a goroutine
// and prints the quotient and remainder it sends back.
func MultipleReturnValuesExample() {
	fmt.Println("\n--- Multiple Return Values ---")

	pairs := make(chan [2]int)
	go divideAndRemainder(17, 5, pairs)

	pair := <-pairs
	quotient, remainder := pair[0], pair[1]
	fmt.Printf("Quotient: %d, Remainder: %d\n", quotient, remainder)
}
// 7. Controlling Number of Goroutines
// LimitedGoroutinesExample runs five named tasks while allowing at most two
// of them to execute at once, using a buffered channel as a semaphore.
func LimitedGoroutinesExample() {
	fmt.Println("\n--- Limited Concurrent Goroutines ---")

	const maxConcurrent = 2
	slots := make(chan struct{}, maxConcurrent)
	names := []string{"Task1", "Task2", "Task3", "Task4", "Task5"}

	var pending sync.WaitGroup
	pending.Add(len(names))

	run := func(taskName string) {
		defer pending.Done()
		defer func() { <-slots }() // free the slot before signalling Done

		fmt.Printf("%s started\n", taskName)
		time.Sleep(300 * time.Millisecond)
		fmt.Printf("%s completed\n", taskName)
	}

	for i := range names {
		// Blocks while maxConcurrent tasks are already holding a slot.
		slots <- struct{}{}
		go run(names[i])
	}

	pending.Wait()
	fmt.Println("All tasks completed")
}
// 8. Goroutine Lifecycle Management
// ManagedGoroutine prints a "working" line every 100ms until stopChan yields
// a value or is closed, then prints a "stopping" line and returns.
func ManagedGoroutine(id int, stopChan <-chan struct{}) {
	fmt.Printf("Goroutine %d started\n", id)

	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()

	for running := true; running; {
		select {
		case <-stopChan:
			fmt.Printf("Goroutine %d stopping\n", id)
			running = false
		case <-tick.C:
			fmt.Printf("Goroutine %d working\n", id)
		}
	}
}
// GoroutineLifecycleExample starts ManagedGoroutine, lets it run for 500ms,
// closes its stop channel, and waits another 100ms before reporting.
func GoroutineLifecycleExample() {
	fmt.Println("\n--- Goroutine Lifecycle Management ---")

	const (
		runFor = 500 * time.Millisecond
		grace  = 100 * time.Millisecond
	)

	stop := make(chan struct{})
	go ManagedGoroutine(1, stop)

	time.Sleep(runFor)
	close(stop) // closing makes the stop case ready in the goroutine's select

	// Fixed pause to give the goroutine time to print its stopping line.
	time.Sleep(grace)
	fmt.Println("Goroutine stopped")
}
// 9. Goroutine Panic Recovery
// safeGoroutine panics on purpose when id is 2 and recovers from that panic
// in a deferred function; for any other id it prints a success line. wg.Done
// is deferred first, so it runs on both paths.
func safeGoroutine(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		fmt.Printf("Goroutine %d recovered from panic: %v\n", id, r)
	}()

	switch id {
	case 2:
		panic("Intentional panic in goroutine 2")
	default:
		fmt.Printf("Goroutine %d completed successfully\n", id)
	}
}
// PanicRecoveryExample runs safeGoroutine for ids 1 through 3 and waits for
// all of them; id 2 panics and recovers inside safeGoroutine.
func PanicRecoveryExample() {
	fmt.Println("\n--- Panic Recovery in Goroutines ---")

	var wg sync.WaitGroup
	for _, id := range []int{1, 2, 3} {
		wg.Add(1)
		go safeGoroutine(id, &wg)
	}
	wg.Wait()

	fmt.Println("All goroutines completed (with recovery)")
}
// 10. Goroutine Statistics
// GoroutineStatsExample prints runtime.NumGoroutine before starting five
// short-lived goroutines, right after starting them, and after waiting for
// them on a WaitGroup.
func GoroutineStatsExample() {
	fmt.Println("\n--- Goroutine Statistics ---")

	fmt.Printf("Initial goroutine count: %d\n", runtime.NumGoroutine())

	const sleepers = 5
	var wg sync.WaitGroup
	wg.Add(sleepers)

	nap := func() {
		defer wg.Done()
		time.Sleep(100 * time.Millisecond)
	}
	for i := 0; i < sleepers; i++ {
		go nap()
	}

	fmt.Printf("Running goroutine count: %d\n", runtime.NumGoroutine())

	wg.Wait()

	// NOTE(review): a goroutine that has called Done may not have fully
	// exited yet, so this count presumably can still exceed the initial
	// one - confirm.
	fmt.Printf("Final goroutine count: %d\n", runtime.NumGoroutine())
}
// 11. Goroutine with Timeout
// workerWithTimeout waits on two timers, one firing after timeout and one
// after 2*timeout, and sends true on done if the first fires or false if the
// second does.
//
// NOTE(review): both timers start together and the second is always the
// later one, so the "timed out" branch is not expected to win in practice.
func workerWithTimeout(id int, timeout time.Duration, done chan<- bool) {
	finish := time.NewTimer(timeout)
	deadline := time.NewTimer(2 * timeout)
	defer finish.Stop()
	defer deadline.Stop()

	var ok bool
	select {
	case <-finish.C:
		ok = true
		fmt.Printf("Worker %d completed within timeout\n", id)
	case <-deadline.C:
		fmt.Printf("Worker %d timed out\n", id)
	}
	done <- ok
}
// TimeoutExample runs workerWithTimeout with a 300ms timeout and prints
// whether it reported completion (true) or a timeout (false).
func TimeoutExample() {
	fmt.Println("\n--- Goroutine with Timeout ---")

	outcome := make(chan bool)
	go workerWithTimeout(1, 300*time.Millisecond, outcome)

	if ok := <-outcome; !ok {
		fmt.Println("Worker timed out")
		return
	}
	fmt.Println("Worker completed successfully")
}
// 12. Fan-out Pattern
// processWithFanOut starts one goroutine per task. Each goroutine sleeps
// 100ms and then sends "Processed: <task>" on results. The function returns
// immediately; the caller is responsible for receiving len(tasks) values.
func processWithFanOut(tasks []string, results chan<- string) {
	handle := func(t string) {
		time.Sleep(100 * time.Millisecond)
		results <- fmt.Sprintf("Processed: %s", t)
	}
	for i := range tasks {
		go handle(tasks[i])
	}
}
// FanOutExample hands five tasks to processWithFanOut and prints one result
// per task as they arrive on a channel buffered to hold all of them.
func FanOutExample() {
	fmt.Println("\n--- Fan-out Pattern ---")

	tasks := []string{"Task1", "Task2", "Task3", "Task4", "Task5"}
	results := make(chan string, len(tasks))
	processWithFanOut(tasks, results)

	// Exactly one result is expected per task.
	for remaining := len(tasks); remaining > 0; remaining-- {
		fmt.Println(<-results)
	}

	fmt.Println("All tasks processed")
}
// main runs every goroutine example in order, framed by a banner line before
// and after.
func main() {
	// Println already appends a newline; the blank line is printed separately
	// because go vet rejects a Println argument that ends in "\n". The output
	// is byte-identical.
	fmt.Println("=== Web Go Goroutines Examples ===")
	fmt.Println()

	BasicGoroutineExample()
	WaitGroupExample()
	GoroutineWithParametersExample()
	AnonymousGoroutineExample()
	GoroutineWithReturnValueExample()
	MultipleReturnValuesExample()
	LimitedGoroutinesExample()
	GoroutineLifecycleExample()
	PanicRecoveryExample()
	GoroutineStatsExample()
	TimeoutExample()
	FanOutExample()

	fmt.Println("\n=== All Goroutine Examples Completed ===")
}
💻 Synchronisation de Threads go
🟡 intermediate
⭐⭐⭐
Utiliser des mutex, des groupes d'attente et des canaux pour synchroniser les goroutines et protéger les ressources partagées
⏱️ 25 min
🏷️ go, web, multithreading
Prerequisites:
Intermediate Go, sync package, channels
// Web Go Thread Synchronization Examples
// Synchronizing goroutines using mutexes, channels, and other sync primitives
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 1. Mutex (Mutual Exclusion)
// SafeCounter is an integer counter whose value is guarded by a mutex.
type SafeCounter struct {
	mu    sync.Mutex
	value int
}

// Increment adds one to the counter while holding the mutex.
func (c *SafeCounter) Increment() {
	c.mu.Lock()
	c.value++
	c.mu.Unlock()
}

// Value returns the current count, read while holding the mutex.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	current := c.value
	c.mu.Unlock()
	return current
}
// MutexExample increments one SafeCounter from 100 goroutines and prints the
// final value once all of them have finished.
func MutexExample() {
	fmt.Println("--- Mutex Example ---")

	const goroutines = 100
	var (
		counter SafeCounter
		wg      sync.WaitGroup
	)

	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func() {
			counter.Increment()
			wg.Done()
		}()
	}
	wg.Wait()

	fmt.Printf("Final counter value: %d\n", counter.Value())
}
// 2. RWMutex (Read-Write Mutex)
// ThreadSafeMap is a string-to-int map guarded by a read-write mutex.
type ThreadSafeMap struct {
	mu   sync.RWMutex
	data map[string]int
}

// NewThreadSafeMap returns an empty, ready-to-use ThreadSafeMap.
func NewThreadSafeMap() *ThreadSafeMap {
	m := new(ThreadSafeMap)
	m.data = map[string]int{}
	return m
}

// Get looks key up under the read lock and reports whether it was present.
func (m *ThreadSafeMap) Get(key string) (int, bool) {
	m.mu.RLock()
	value, found := m.data[key]
	m.mu.RUnlock()
	return value, found
}

// Set stores value under key while holding the write lock.
func (m *ThreadSafeMap) Set(key string, value int) {
	m.mu.Lock()
	m.data[key] = value
	m.mu.Unlock()
}
// RWMutexExample starts five writer goroutines and ten reader goroutines
// against one ThreadSafeMap and waits for all of them. Readers and writers
// are not ordered with respect to each other, so a reader prints nothing when
// its key has not been written yet.
func RWMutexExample() {
	fmt.Println("\n--- RWMutex Example ---")

	store := NewThreadSafeMap()
	var wg sync.WaitGroup

	write := func(id int) {
		defer wg.Done()
		key := fmt.Sprintf("key%d", id)
		store.Set(key, id*10)
		fmt.Printf("Writer: Set %s = %d\n", key, id*10)
	}
	read := func(id int) {
		defer wg.Done()
		key := fmt.Sprintf("key%d", id%5) // readers cycle over the five writer keys
		val, found := store.Get(key)
		if !found {
			return
		}
		fmt.Printf("Reader: Got %s = %d\n", key, val)
	}

	for w := 0; w < 5; w++ {
		wg.Add(1)
		go write(w)
	}
	for r := 0; r < 10; r++ {
		wg.Add(1)
		go read(r)
	}

	wg.Wait()
	fmt.Println("All readers and writers completed")
}
// 3. Once (Single Execution)
// Singleton is the type of the single shared instance handed out by
// GetInstance.
type Singleton struct {
	data string
}

var (
	instance *Singleton // set exactly once, inside once.Do
	once     sync.Once
)

// GetInstance returns the shared Singleton, creating it (and printing a
// message) on the first call only.
func GetInstance() *Singleton {
	once.Do(func() {
		created := new(Singleton)
		created.data = "Singleton initialized"
		instance = created
		fmt.Println("Singleton created")
	})
	return instance
}
// OnceExample calls GetInstance from five goroutines and prints the pointer
// each one received.
func OnceExample() {
	fmt.Println("\n--- Once Example ---")

	var wg sync.WaitGroup
	fetch := func(id int) {
		defer wg.Done()
		inst := GetInstance()
		fmt.Printf("Goroutine %d got instance: %p\n", id, inst)
	}

	for id := 0; id < 5; id++ {
		wg.Add(1)
		go fetch(id)
	}
	wg.Wait()
}
// 4. WaitGroup
// WaitGroupExample runs four named tasks in their own goroutines, each
// sleeping 200ms, and blocks until every one has finished.
func WaitGroupExample() {
	fmt.Println("\n--- WaitGroup Example ---")

	names := [...]string{"Download", "Process", "Upload", "Verify"}

	var wg sync.WaitGroup
	wg.Add(len(names))
	for i := range names {
		go func(taskName string) {
			defer wg.Done()
			fmt.Printf("%s started\n", taskName)
			time.Sleep(200 * time.Millisecond)
			fmt.Printf("%s completed\n", taskName)
		}(names[i])
	}
	wg.Wait()

	fmt.Println("All tasks completed")
}
// 5. Channels for Synchronization
// ChannelSyncExample starts a worker goroutine that sleeps 500ms and then
// sends on an unbuffered channel; the caller blocks on the receive until then.
func ChannelSyncExample() {
	fmt.Println("\n--- Channel Synchronization Example ---")

	finished := make(chan bool)
	work := func(signal chan<- bool) {
		fmt.Println("Worker started")
		time.Sleep(500 * time.Millisecond)
		fmt.Println("Worker completed")
		signal <- true
	}
	go work(finished)

	fmt.Println("Main waiting for worker...")
	<-finished
	fmt.Println("Main received completion signal")
}
// 6. Buffered Channels
// BufferedChannelExample has a producer send five messages through a channel
// with capacity three and then close it, while the consumer starts 100ms
// later and reads one message every 100ms until the channel is closed.
func BufferedChannelExample() {
	fmt.Println("\n--- Buffered Channel Example ---")

	const (
		capacity = 3
		total    = 5
	)
	messages := make(chan string, capacity)

	// Producer: blocks on send whenever the buffer is full.
	go func() {
		defer close(messages)
		for n := 1; n <= total; n++ {
			text := fmt.Sprintf("Message %d", n)
			messages <- text
			fmt.Printf("Sent: %s\n", text)
		}
	}()

	// Give the producer a head start so the buffer has filled up.
	time.Sleep(100 * time.Millisecond)

	// Consumer: stops once the channel is closed and drained.
	for {
		text, open := <-messages
		if !open {
			break
		}
		fmt.Printf("Received: %s\n", text)
		time.Sleep(100 * time.Millisecond)
	}
}
// 7. Select Statement
// SelectExample starts two senders with different delays (100ms and 200ms)
// and uses select to receive both messages in whichever order they arrive.
func SelectExample() {
	fmt.Println("\n--- Select Statement Example ---")

	ch1 := make(chan string)
	ch2 := make(chan string)

	sendAfter := func(out chan<- string, delay time.Duration, text string) {
		time.Sleep(delay)
		out <- text
	}
	go sendAfter(ch1, 100*time.Millisecond, "Channel 1 message")
	go sendAfter(ch2, 200*time.Millisecond, "Channel 2 message")

	// One message is expected from each channel, so receive twice.
	for received := 0; received < 2; received++ {
		select {
		case msg := <-ch1:
			fmt.Printf("Received from ch1: %s\n", msg)
		case msg := <-ch2:
			fmt.Printf("Received from ch2: %s\n", msg)
		}
	}
}
// 8. Select with Timeout
// SelectTimeoutExample waits up to 200ms for a message that only arrives
// after 500ms, so the timeout branch is the one that runs.
func SelectTimeoutExample() {
	fmt.Println("\n--- Select with Timeout Example ---")

	// A one-slot buffer lets the sender finish even though the receiver has
	// already given up. With an unbuffered channel the send would block
	// forever and the goroutine would leak.
	ch := make(chan string, 1)
	go func() {
		time.Sleep(500 * time.Millisecond)
		ch <- "Delayed message"
	}()

	select {
	case msg := <-ch:
		fmt.Printf("Received: %s\n", msg)
	case <-time.After(200 * time.Millisecond):
		fmt.Println("Timeout occurred")
	}
}
// 9. Atomic Operations
// AtomicExample increments a shared int64 from 1000 goroutines using
// atomic.AddInt64 and prints the total after all of them have finished.
func AtomicExample() {
	fmt.Println("\n--- Atomic Operations Example ---")

	const increments = 1000
	var (
		total int64
		wg    sync.WaitGroup
	)

	wg.Add(increments)
	for n := 0; n < increments; n++ {
		go func() {
			atomic.AddInt64(&total, 1)
			wg.Done()
		}()
	}
	wg.Wait()

	fmt.Printf("Final counter value (atomic): %d\n", atomic.LoadInt64(&total))
}
// 10. Condition Variables
// CondExample has a worker goroutine wait on a condition variable until the
// caller sets a flag under the mutex and signals it.
func CondExample() {
	fmt.Println("\n--- Condition Variable Example ---")

	var (
		mu    sync.Mutex
		ready bool
	)
	c := sync.NewCond(&mu)

	// Worker: Wait releases mu while blocked and re-acquires it on wake-up,
	// so the flag is always checked with the lock held.
	go func() {
		mu.Lock()
		defer mu.Unlock()
		for !ready {
			c.Wait()
		}
		fmt.Println("Worker: Condition met, proceeding...")
	}()

	// Fixed pause before signalling.
	time.Sleep(100 * time.Millisecond)

	mu.Lock()
	ready = true
	fmt.Println("Main: Signaling condition")
	c.Signal()
	mu.Unlock()

	// Fixed pause to give the worker time to print before returning.
	time.Sleep(100 * time.Millisecond)
}

// cond is a package-level condition variable; nothing in the visible code
// reads or assigns it (CondExample uses its own local one).
var cond *sync.Cond
// 11. ErrGroup (Error Group)
// ErrGroupExample runs three goroutines that may fail, waits for all of them,
// and prints the first error that was reported.
//
// The standard library has no sync.ErrGroup (errgroup lives in the separate
// golang.org/x/sync module), so the same "wait for all, keep the first error"
// behavior is built here from sync.WaitGroup and sync.Once.
func ErrGroupExample() {
	fmt.Println("\n--- ErrGroup Example ---")

	var (
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error
	)

	// run starts task in its own goroutine and records the first non-nil
	// error returned by any task.
	run := func(task func() error) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := task(); err != nil {
				errOnce.Do(func() { firstErr = err })
			}
		}()
	}

	// Launch multiple goroutines; the second one fails.
	for i := 1; i <= 3; i++ {
		id := i // per-iteration copy for the closure
		run(func() error {
			time.Sleep(time.Duration(id) * 100 * time.Millisecond)
			if id == 2 {
				return fmt.Errorf("error in goroutine %d", id)
			}
			fmt.Printf("Goroutine %d completed\n", id)
			return nil
		})
	}

	// firstErr is only read after Wait, i.e. after every writer has finished.
	wg.Wait()
	if firstErr != nil {
		fmt.Printf("ErrGroup finished with error: %v\n", firstErr)
	}
}
// 12. Worker Pool Pattern
// WorkerPoolExample starts three worker goroutines, sends them five jobs over
// a buffered channel, closes the channel, and prints the five results.
func WorkerPoolExample() {
	fmt.Println("\n--- Worker Pool Example ---")

	const (
		workers   = 3
		jobCount  = 5
		queueSize = 100
	)
	jobs := make(chan int, queueSize)
	results := make(chan int, queueSize)

	for id := 1; id <= workers; id++ {
		go worker(id, jobs, results)
	}

	for j := 1; j <= jobCount; j++ {
		jobs <- j
		fmt.Printf("Sent job %d\n", j)
	}
	close(jobs) // lets each worker's receive loop end once the queue is empty

	// One result is expected per job.
	for received := 0; received < jobCount; received++ {
		fmt.Printf("Received result %d\n", <-results)
	}
}
// worker receives jobs until the jobs channel is closed and drained. For each
// job it sleeps 100ms and sends twice the job value on results.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(100 * time.Millisecond)
		results <- job * 2
	}
}
// main runs every synchronization example in order, framed by a banner line
// before and after.
func main() {
	// Println already appends a newline; the blank line is printed separately
	// because go vet rejects a Println argument that ends in "\n". The output
	// is byte-identical.
	fmt.Println("=== Web Go Thread Synchronization Examples ===")
	fmt.Println()

	MutexExample()
	RWMutexExample()
	OnceExample()
	WaitGroupExample()
	ChannelSyncExample()
	BufferedChannelExample()
	SelectExample()
	SelectTimeoutExample()
	AtomicExample()
	CondExample()
	ErrGroupExample()
	WorkerPoolExample()

	fmt.Println("\n=== All Thread Synchronization Examples Completed ===")
}
💻 Worker Pools go
🔴 complex
⭐⭐⭐⭐
Implémenter des worker pools pour le traitement concurrent efficace des tâches
⏱️ 30 min
🏷️ go, web, multithreading
Prerequisites:
Advanced Go, channels, sync package
// Web Go Worker Pool Examples
// Implementing worker pools for efficient concurrent task processing
package main
import (
"fmt"
"sync"
"time"
)
// 1. Basic Worker Pool
type (
	// Job is one unit of work handed to a pool: an identifier plus a payload
	// string.
	Job struct {
		ID   int
		Data string
	}

	// Result is what a worker reports for a job: the job's ID, an output
	// string, and an error field.
	Result struct {
		JobID  int
		Output string
		Error  error
	}
)
// WorkerPool is a fixed-size set of goroutines that consume Jobs from one
// channel and publish Results on another.
type WorkerPool struct {
	workerCount int
	jobs        chan Job
	results     chan Result
	wg          sync.WaitGroup
}

// NewWorkerPool builds a pool configured for workerCount workers. Both the
// job queue and the result queue are buffered to 100 entries. No goroutines
// run until Start is called.
func NewWorkerPool(workerCount int) *WorkerPool {
	const queueSize = 100

	wp := new(WorkerPool)
	wp.workerCount = workerCount
	wp.jobs = make(chan Job, queueSize)
	wp.results = make(chan Result, queueSize)
	return wp
}

// Start launches the configured number of worker goroutines.
func (wp *WorkerPool) Start() {
	for id := 0; id < wp.workerCount; id++ {
		wp.wg.Add(1)
		go wp.worker(id)
	}
}

// worker handles jobs until the job channel is closed and drained. Each job
// takes a simulated 100ms and yields one Result.
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()

	for {
		job, open := <-wp.jobs
		if !open {
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job.ID)

		time.Sleep(100 * time.Millisecond) // simulated work

		output := fmt.Sprintf("Job %d processed by worker %d", job.ID, id)
		wp.results <- Result{JobID: job.ID, Output: output}
	}
}

// Submit queues job for processing; it blocks while the job queue is full.
func (wp *WorkerPool) Submit(job Job) {
	wp.jobs <- job
}

// Stop closes the job queue, waits for every worker to finish the jobs that
// are still queued, and then closes the results channel.
func (wp *WorkerPool) Stop() {
	close(wp.jobs)
	wp.wg.Wait()
	close(wp.results)
}

// Results exposes the results channel for receiving only.
func (wp *WorkerPool) Results() <-chan Result {
	return wp.results
}
// BasicWorkerPoolExample submits five jobs to a three-worker pool and drains
// the results until the pool has shut down.
func BasicWorkerPoolExample() {
	fmt.Println("--- Basic Worker Pool Example ---")

	pool := NewWorkerPool(3)
	pool.Start()

	// Submit jobs
	for i := 1; i <= 5; i++ {
		pool.Submit(Job{
			ID:   i,
			Data: fmt.Sprintf("Task %d", i),
		})
	}

	// Stop closes the job queue, blocks until the workers have processed
	// everything still queued, and then closes the results channel. No
	// timing-based delay is needed before calling it. It runs in its own
	// goroutine so results can be consumed here while workers finish.
	go pool.Stop()

	// Collect results; the loop ends when Stop closes the channel.
	for range pool.Results() {
		// Results consumed
	}

	fmt.Println("All jobs completed")
}
// 2. Worker Pool with Error Handling
// JobWithErr is a job that can be flagged to fail when it is processed.
type JobWithErr struct {
	ID         int
	ShouldFail bool
}

// WorkerPoolWithError is a fixed-size worker pool that reports successes on a
// results channel and failures on a separate errors channel.
type WorkerPoolWithError struct {
	workerCount int
	jobs        chan JobWithErr
	results     chan Result
	errors      chan error
	wg          sync.WaitGroup
}

// NewWorkerPoolWithError builds a pool configured for workerCount workers;
// the job, result and error queues are each buffered to 100 entries.
func NewWorkerPoolWithError(workerCount int) *WorkerPoolWithError {
	const queueSize = 100

	wp := new(WorkerPoolWithError)
	wp.workerCount = workerCount
	wp.jobs = make(chan JobWithErr, queueSize)
	wp.results = make(chan Result, queueSize)
	wp.errors = make(chan error, queueSize)
	return wp
}

// Start launches the configured number of worker goroutines.
func (wp *WorkerPoolWithError) Start() {
	for id := 0; id < wp.workerCount; id++ {
		wp.wg.Add(1)
		go wp.worker(id)
	}
}

// worker handles jobs until the job channel is closed and drained. After a
// simulated 100ms of work, a job flagged ShouldFail produces an error; any
// other job produces a Result.
func (wp *WorkerPoolWithError) worker(id int) {
	defer wp.wg.Done()

	for {
		job, open := <-wp.jobs
		if !open {
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job.ID)
		time.Sleep(100 * time.Millisecond)

		if !job.ShouldFail {
			wp.results <- Result{
				JobID:  job.ID,
				Output: fmt.Sprintf("Job %d processed successfully", job.ID),
			}
			continue
		}
		wp.errors <- fmt.Errorf("job %d failed", job.ID)
	}
}

// Submit queues job for processing; it blocks while the job queue is full.
func (wp *WorkerPoolWithError) Submit(job JobWithErr) {
	wp.jobs <- job
}

// Stop closes the job queue, waits for every worker to finish, and then
// closes the results channel followed by the errors channel.
func (wp *WorkerPoolWithError) Stop() {
	close(wp.jobs)
	wp.wg.Wait()
	close(wp.results)
	close(wp.errors)
}
// WorkerPoolWithErrorExample submits five jobs (every third one is flagged to
// fail) to a two-worker pool and prints each success and each error until
// both output channels have been closed.
func WorkerPoolWithErrorExample() {
	fmt.Println("\n--- Worker Pool with Error Handling ---")

	pool := NewWorkerPoolWithError(2)
	pool.Start()

	// Submit jobs
	for i := 1; i <= 5; i++ {
		pool.Submit(JobWithErr{
			ID:         i,
			ShouldFail: i%3 == 0, // Every 3rd job fails
		})
	}

	// Work on local copies of the channels so the loop below can disable a
	// closed channel without overwriting the pool's own fields.
	results, errs := pool.results, pool.errors

	// Stop blocks until the workers have drained the job queue and only then
	// closes both output channels, so no timing-based delay is needed.
	go pool.Stop()

	// A nil channel is never ready in a select, so setting a local to nil
	// after its channel closes removes that case; the loop ends when both
	// are gone.
	for results != nil || errs != nil {
		select {
		case result, ok := <-results:
			if !ok {
				results = nil
				continue
			}
			fmt.Printf("Success: %s\n", result.Output)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Printf("Error: %v\n", err)
		}
	}

	fmt.Println("Worker pool stopped")
}
// 3. Dynamic Worker Pool
// DynamicWorkerPool is a worker pool whose number of workers can be raised
// and lowered at run time between minWorkers and maxWorkers.
type DynamicWorkerPool struct {
	minWorkers  int
	maxWorkers  int
	jobs        chan Job
	results     chan Result
	workerCount int // guarded by mu
	mu          sync.Mutex
	wg          sync.WaitGroup
	quit        chan struct{} // one value per worker that should retire
}

// NewDynamicWorkerPool creates a pool and immediately starts minWorkers
// workers. The job and result queues are each buffered to 100 entries.
func NewDynamicWorkerPool(minWorkers, maxWorkers int) *DynamicWorkerPool {
	pool := &DynamicWorkerPool{
		minWorkers: minWorkers,
		maxWorkers: maxWorkers,
		jobs:       make(chan Job, 100),
		results:    make(chan Result, 100),
		quit:       make(chan struct{}),
	}

	// addWorker requires the lock, so hold it while starting the minimum set.
	pool.mu.Lock()
	defer pool.mu.Unlock()
	for i := 0; i < minWorkers; i++ {
		pool.addWorker()
	}
	return pool
}

// addWorker starts one worker goroutine and counts it. The caller must hold
// dp.mu.
//
// A worker leaves in one of two ways, and workerCount is adjusted exactly
// once for each:
//   - the job queue is closed: the worker decrements workerCount itself,
//     before wg.Done runs, so the count is final once Stop returns;
//   - it receives from quit: ScaleDown has already decremented workerCount
//     on its behalf, so the worker just returns.
func (dp *DynamicWorkerPool) addWorker() {
	dp.wg.Add(1)
	dp.workerCount++

	go func() {
		defer dp.wg.Done()
		for {
			select {
			case job, ok := <-dp.jobs:
				if !ok {
					dp.mu.Lock()
					dp.workerCount--
					dp.mu.Unlock()
					return
				}
				fmt.Printf("Worker processing job %d\n", job.ID)
				time.Sleep(100 * time.Millisecond)
				dp.results <- Result{
					JobID:  job.ID,
					Output: fmt.Sprintf("Job %d processed", job.ID),
				}
			case <-dp.quit:
				return
			}
		}
	}()
}

// ScaleUp starts up to count additional workers, never exceeding maxWorkers.
func (dp *DynamicWorkerPool) ScaleUp(count int) {
	dp.mu.Lock()
	defer dp.mu.Unlock()

	currentWorkers := dp.workerCount
	available := dp.maxWorkers - currentWorkers
	if available <= 0 {
		fmt.Println("Already at max workers")
		return
	}

	toAdd := count
	if toAdd > available {
		toAdd = available
	}
	for i := 0; i < toAdd; i++ {
		dp.addWorker()
	}

	fmt.Printf("Scaled up from %d to %d workers\n", currentWorkers, dp.workerCount)
}

// ScaleDown retires up to count workers, never going below minWorkers. Each
// retirement is handed to a worker over the quit channel, so the call blocks
// until that many workers have returned to their select loop. workerCount is
// updated before ScaleDown returns, so GetWorkerCount is accurate right away.
//
// ScaleDown must not be called concurrently with or after Stop: once the
// workers are gone nothing receives from quit.
func (dp *DynamicWorkerPool) ScaleDown(count int) {
	dp.mu.Lock()
	defer dp.mu.Unlock()

	currentWorkers := dp.workerCount
	available := currentWorkers - dp.minWorkers
	if available <= 0 {
		fmt.Println("Already at min workers")
		return
	}

	toRemove := count
	if toRemove > available {
		toRemove = available
	}
	if toRemove < 0 {
		// A negative request removes nothing; without this clamp the summary
		// line below would report an increased worker count.
		toRemove = 0
	}

	// Signal workers to quit, accounting for each one as soon as it has
	// accepted the signal.
	for i := 0; i < toRemove; i++ {
		dp.quit <- struct{}{}
		dp.workerCount--
	}

	fmt.Printf("Scaled down from %d to %d workers\n", currentWorkers, dp.workerCount)
}

// Submit queues job for processing; it blocks while the job queue is full.
func (dp *DynamicWorkerPool) Submit(job Job) {
	dp.jobs <- job
}

// Stop closes the job queue, waits for all workers to finish the remaining
// jobs and exit, and then closes the results channel.
func (dp *DynamicWorkerPool) Stop() {
	close(dp.jobs)
	dp.wg.Wait()
	close(dp.results)
}

// GetWorkerCount returns the current number of workers.
func (dp *DynamicWorkerPool) GetWorkerCount() int {
	dp.mu.Lock()
	defer dp.mu.Unlock()
	return dp.workerCount
}
// DynamicWorkerPoolExample creates a pool with 2 to 5 workers, scales it up
// by two, submits ten jobs, collects five results, scales down by two, and
// then shuts the pool down while draining the remaining results.
func DynamicWorkerPoolExample() {
	fmt.Println("\n--- Dynamic Worker Pool Example ---")

	pool := NewDynamicWorkerPool(2, 5)
	fmt.Printf("Initial workers: %d\n", pool.GetWorkerCount())

	// Scale up
	pool.ScaleUp(2)
	fmt.Printf("After scale up: %d\n", pool.GetWorkerCount())

	// Submit jobs
	for i := 1; i <= 10; i++ {
		pool.Submit(Job{ID: i, Data: fmt.Sprintf("Task %d", i)})
	}

	// Collect some results
	for i := 0; i < 5; i++ {
		<-pool.results
	}

	// Scale down
	pool.ScaleDown(2)
	fmt.Printf("After scale down: %d\n", pool.GetWorkerCount())

	// Stop closes the job queue, waits until the workers have processed
	// whatever is still queued, and then closes the results channel, so no
	// timing-based delay is needed. It runs in its own goroutine so the
	// remaining results can be drained here in the meantime.
	go pool.Stop()

	for range pool.results {
		// Drain remaining results
	}

	fmt.Println("Dynamic worker pool stopped")
}
// 4. Worker Pool with Context
// ContextWorkerPool is a fixed-size worker pool whose workers can be told to
// stop early by closing a cancel channel.
type ContextWorkerPool struct {
	workerCount int
	jobs        chan Job
	results     chan Result
	cancel      chan struct{}
	cancelOnce  sync.Once // makes Cancel safe to call more than once
	wg          sync.WaitGroup
}

// NewContextWorkerPool builds a pool configured for workerCount workers; the
// job and result queues are each buffered to 100 entries. No goroutines run
// until Start is called.
func NewContextWorkerPool(workerCount int) *ContextWorkerPool {
	return &ContextWorkerPool{
		workerCount: workerCount,
		jobs:        make(chan Job, 100),
		results:     make(chan Result, 100),
		cancel:      make(chan struct{}),
	}
}

// Start launches the configured number of worker goroutines.
func (cp *ContextWorkerPool) Start() {
	for i := 0; i < cp.workerCount; i++ {
		cp.wg.Add(1)
		go cp.worker(i)
	}
}

// worker processes jobs until the job queue is closed or the pool is
// cancelled. A job that was already received when cancellation is noticed is
// dropped without producing a Result.
func (cp *ContextWorkerPool) worker(id int) {
	defer cp.wg.Done()

	for {
		select {
		case job, ok := <-cp.jobs:
			if !ok {
				return
			}
			fmt.Printf("Worker %d processing job %d\n", id, job.ID)

			// select picks at random when both a job and the cancel signal
			// are ready, so re-check for cancellation before doing the work.
			select {
			case <-cp.cancel:
				fmt.Printf("Worker %d: job %d cancelled\n", id, job.ID)
				return
			default:
			}

			time.Sleep(100 * time.Millisecond)
			cp.results <- Result{
				JobID:  job.ID,
				Output: fmt.Sprintf("Job %d processed", job.ID),
			}
		case <-cp.cancel:
			fmt.Printf("Worker %d: received cancel signal\n", id)
			return
		}
	}
}

// Submit queues job for processing; it blocks while the job queue is full.
func (cp *ContextWorkerPool) Submit(job Job) {
	cp.jobs <- job
}

// Cancel tells all workers to stop. It is safe to call more than once: only
// the first call closes the cancel channel (closing a closed channel would
// panic).
func (cp *ContextWorkerPool) Cancel() {
	cp.cancelOnce.Do(func() { close(cp.cancel) })
}

// Stop closes the job queue, waits for every worker to exit, and then closes
// the results channel.
func (cp *ContextWorkerPool) Stop() {
	close(cp.jobs)
	cp.wg.Wait()
	close(cp.results)
}
// ContextWorkerPoolExample submits five jobs to a three-worker pool, lets it
// run for 200ms, cancels it, waits another 100ms, and stops the pool.
func ContextWorkerPoolExample() {
	fmt.Println("\n--- Context Worker Pool Example ---")

	const (
		jobCount = 5
		runFor   = 200 * time.Millisecond
		grace    = 100 * time.Millisecond
	)

	pool := NewContextWorkerPool(3)
	pool.Start()

	for id := 1; id <= jobCount; id++ {
		job := Job{ID: id, Data: fmt.Sprintf("Task %d", id)}
		pool.Submit(job)
	}

	// Give the workers time to finish part of the queue before cancelling.
	time.Sleep(runFor)

	fmt.Println("Cancelling worker pool...")
	pool.Cancel()

	time.Sleep(grace)
	pool.Stop()

	fmt.Println("Context worker pool cancelled and stopped")
}
// main runs every worker pool example in order, framed by a banner line
// before and after.
func main() {
	// Println already appends a newline; the blank line is printed separately
	// because go vet rejects a Println argument that ends in "\n". The output
	// is byte-identical.
	fmt.Println("=== Web Go Worker Pool Examples ===")
	fmt.Println()

	BasicWorkerPoolExample()
	WorkerPoolWithErrorExample()
	DynamicWorkerPoolExample()
	ContextWorkerPoolExample()

	fmt.Println("\n=== All Worker Pool Examples Completed ===")
}