🎯 Ejemplos recomendados
Colecciones de ejemplos equilibradas de varias categorías para que explores
Ejemplos de Multithreading Web Go
Ejemplos de multithreading Web Go incluyendo goroutines, sincronización con mutexes y worker pools
💻 Crear Goroutines go
🟢 simple
⭐⭐
Crear e iniciar goroutines (hilos ligeros) para ejecución concurrente
⏱️ 20 min
🏷️ go, web, multithreading
Prerequisites:
Basic Go, sync package, channels
// Web Go Goroutines Examples
// Creating and managing goroutines for concurrent execution
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// 1. Basic Goroutine
// simpleGoroutine prints a single line announcing that the goroutine
// identified by id is running.
func simpleGoroutine(id int) {
	const announcement = "Goroutine %d is running\n"
	fmt.Printf(announcement, id)
}
// BasicGoroutineExample starts three goroutines running simpleGoroutine and
// then sleeps for one second so they get a chance to print before it returns.
// Nothing actually synchronizes with the goroutines; the sleep is the only
// "wait".
func BasicGoroutineExample() {
	fmt.Println("--- Basic Goroutine ---")

	// One goroutine per id, numbered from 1.
	const goroutines = 3
	for n := 0; n < goroutines; n++ {
		go simpleGoroutine(n + 1)
	}

	// Give the goroutines time to run before reporting completion.
	time.Sleep(1 * time.Second)
	fmt.Println("Main function completed")
}
// 2. Goroutine with WaitGroup
// workerWithWaitGroup simulates half a second of work for worker id, printing
// a line before and after, and calls wg.Done when it returns.
func workerWithWaitGroup(id int, wg *sync.WaitGroup) {
	defer wg.Done()

	const workTime = 500 * time.Millisecond
	announce := func(state string) {
		fmt.Printf("Worker %d %s\n", id, state)
	}

	announce("starting")
	time.Sleep(workTime) // stand-in for real work
	announce("completed")
}
// WaitGroupExample launches three workers and blocks on a sync.WaitGroup
// until every one of them has called Done.
func WaitGroupExample() {
	fmt.Println("\n--- Goroutine with WaitGroup ---")

	const workers = 3
	wg := new(sync.WaitGroup)

	// Register all workers up front so Wait cannot return early.
	wg.Add(workers)
	for id := 1; id <= workers; id++ {
		go workerWithWaitGroup(id, wg)
	}

	wg.Wait()
	fmt.Println("All workers completed")
}
// 3. Goroutine with Parameters
// processTask reports the start of task taskID, sleeps for duration to
// simulate the work, and then reports completion together with the duration.
func processTask(taskID int, taskName string, duration time.Duration) {
	label := fmt.Sprintf("Task %d (%s)", taskID, taskName)

	fmt.Printf("%s started\n", label)
	time.Sleep(duration)
	fmt.Printf("%s completed in %v\n", label, duration)
}
// GoroutineWithParametersExample starts three processTask goroutines, each
// with its own id, name and duration, and then sleeps one second so they can
// finish.
func GoroutineWithParametersExample() {
	fmt.Println("\n--- Goroutine with Parameters ---")

	specs := []struct {
		name string
		took time.Duration
	}{
		{"Download", 300 * time.Millisecond},
		{"Process", 500 * time.Millisecond},
		{"Upload", 200 * time.Millisecond},
	}

	// Arguments of a go statement are evaluated immediately, so each
	// goroutine receives its own copy of the values.
	for idx, spec := range specs {
		go processTask(idx+1, spec.name, spec.took)
	}

	time.Sleep(time.Second)
}
// 4. Anonymous Goroutine
// AnonymousGoroutineExample runs three function literals as goroutines, each
// printing its own id, and then sleeps 500ms so they can finish.
func AnonymousGoroutineExample() {
	fmt.Println("\n--- Anonymous Goroutine ---")

	for n := 1; n < 4; n++ {
		id := n // per-iteration copy captured by the closure below
		go func() {
			fmt.Printf("Anonymous goroutine %d\n", id)
		}()
	}

	time.Sleep(time.Second / 2)
}
// 5. Goroutine with Return Values (using channels)
// computeResult adds a and b and sends the sum on resultChan. The send blocks
// until a receiver (or buffer slot) is available.
func computeResult(a, b int, resultChan chan<- int) {
	resultChan <- a + b
}
// GoroutineWithReturnValueExample runs computeResult in a goroutine and reads
// its answer back over an unbuffered channel.
func GoroutineWithReturnValueExample() {
	fmt.Println("\n--- Goroutine with Return Values ---")

	sums := make(chan int)
	go computeResult(10, 20, sums)

	// The receive blocks until the goroutine has sent its sum.
	fmt.Printf("Result from goroutine: %d\n", <-sums)
}
// 6. Multiple Goroutines with Multiple Return Values
// divideAndRemainder sends [2]int{a / b, a % b} on resultChan. Integer
// division by zero panics, so b must be non-zero.
func divideAndRemainder(a, b int, resultChan chan<- [2]int) {
	var pair [2]int
	pair[0], pair[1] = a/b, a%b
	resultChan <- pair
}
// MultipleReturnValuesExample shows how a goroutine can hand back two values
// at once by sending a fixed-size array over a channel.
func MultipleReturnValuesExample() {
	fmt.Println("\n--- Multiple Return Values ---")

	pairs := make(chan [2]int)
	go divideAndRemainder(17, 5, pairs)

	pair := <-pairs
	quotient, remainder := pair[0], pair[1]
	fmt.Printf("Quotient: %d, Remainder: %d\n", quotient, remainder)
}
// 7. Controlling Number of Goroutines
// LimitedGoroutinesExample runs five tasks while allowing at most two of them
// to execute at the same time. A buffered channel acts as the semaphore: a
// slot is taken before each goroutine starts and given back when it finishes.
func LimitedGoroutinesExample() {
	fmt.Println("\n--- Limited Concurrent Goroutines ---")

	const limit = 2
	var (
		slots   = make(chan struct{}, limit)
		pending sync.WaitGroup
	)

	run := func(taskName string) {
		defer pending.Done()
		defer func() { <-slots }() // runs first: frees the slot before Done

		fmt.Printf("%s started\n", taskName)
		time.Sleep(300 * time.Millisecond)
		fmt.Printf("%s completed\n", taskName)
	}

	for _, name := range []string{"Task1", "Task2", "Task3", "Task4", "Task5"} {
		pending.Add(1)
		slots <- struct{}{} // blocks while both slots are in use
		go run(name)
	}

	pending.Wait()
	fmt.Println("All tasks completed")
}
// 8. Goroutine Lifecycle Management
// ManagedGoroutine prints a "working" line every 100ms until stopChan is
// closed or receives a value, then prints a "stopping" line and returns.
// The ticker is stopped on the way out.
func ManagedGoroutine(id int, stopChan <-chan struct{}) {
	fmt.Printf("Goroutine %d started\n", id)

	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()

loop:
	for {
		select {
		case <-stopChan:
			break loop
		case <-tick.C:
			fmt.Printf("Goroutine %d working\n", id)
		}
	}

	fmt.Printf("Goroutine %d stopping\n", id)
}
// GoroutineLifecycleExample starts one ManagedGoroutine, lets it run for
// 500ms, stops it by closing its stop channel, and waits another 100ms so
// the goroutine can print its final message.
func GoroutineLifecycleExample() {
	fmt.Println("\n--- Goroutine Lifecycle Management ---")

	const (
		runWindow   = 500 * time.Millisecond
		gracePeriod = 100 * time.Millisecond
	)

	stop := make(chan struct{})
	go ManagedGoroutine(1, stop)

	time.Sleep(runWindow)
	close(stop) // closing wakes the goroutine's select
	time.Sleep(gracePeriod)

	fmt.Println("Goroutine stopped")
}
// 9. Goroutine Panic Recovery
// safeGoroutine panics on purpose when id is 2 and otherwise prints a success
// line. A deferred recover turns the panic into a log line, and wg.Done is
// always called on the way out.
func safeGoroutine(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		fmt.Printf("Goroutine %d recovered from panic: %v\n", id, r)
	}()

	switch id {
	case 2:
		panic("Intentional panic in goroutine 2")
	default:
		fmt.Printf("Goroutine %d completed successfully\n", id)
	}
}
// PanicRecoveryExample runs three safeGoroutine workers, one of which panics,
// and waits for all of them to finish.
func PanicRecoveryExample() {
	fmt.Println("\n--- Panic Recovery in Goroutines ---")

	wg := &sync.WaitGroup{}
	for _, id := range []int{1, 2, 3} {
		wg.Add(1)
		go safeGoroutine(id, wg)
	}
	wg.Wait()

	fmt.Println("All goroutines completed (with recovery)")
}
// 10. Goroutine Statistics
// GoroutineStatsExample prints runtime.NumGoroutine three times: before
// starting five short-lived goroutines, right after starting them, and after
// waiting for them to finish.
func GoroutineStatsExample() {
	fmt.Println("\n--- Goroutine Statistics ---")

	report := func(label string) {
		fmt.Printf("%s goroutine count: %d\n", label, runtime.NumGoroutine())
	}

	report("Initial")

	const sleepers = 5
	var wg sync.WaitGroup
	wg.Add(sleepers)
	for n := 0; n < sleepers; n++ {
		go func() {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond)
		}()
	}

	report("Running")
	wg.Wait()
	report("Final")
}
// 11. Goroutine with Timeout
// workerWithTimeout runs a simulated unit of work for worker id and reports
// on done whether it finished before timeout elapsed: true if the work
// completed in time, false if the deadline fired first.
//
// The simulated work takes timeout/2, so under normal scheduling the worker
// reports success. The previous version raced time.After(timeout) against
// time.After(2*timeout); the shorter timer always won, so nothing was being
// timed and the "timed out" branch could not be reached.
//
// The send on done blocks until a receiver or buffer slot is available.
func workerWithTimeout(id int, timeout time.Duration, done chan<- bool) {
	// Buffered so the work goroutine can always finish and exit, even when
	// the deadline wins and nobody reads from finished.
	finished := make(chan struct{}, 1)
	go func() {
		time.Sleep(timeout / 2) // stand-in for real work
		finished <- struct{}{}
	}()

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	select {
	case <-finished:
		fmt.Printf("Worker %d completed within timeout\n", id)
		done <- true
	case <-deadline.C:
		fmt.Printf("Worker %d timed out\n", id)
		done <- false
	}
}
// TimeoutExample runs workerWithTimeout in a goroutine with a 300ms timeout
// and prints whether the worker reported success or a timeout.
func TimeoutExample() {
	fmt.Println("\n--- Goroutine with Timeout ---")

	doneChan := make(chan bool)
	go workerWithTimeout(1, 300*time.Millisecond, doneChan)

	// Blocks until the worker reports its outcome.
	outcome := "Worker timed out"
	if <-doneChan {
		outcome = "Worker completed successfully"
	}
	fmt.Println(outcome)
}
// 12. Fan-out Pattern
// processWithFanOut starts one goroutine per task. Each goroutine sleeps
// 100ms and then sends "Processed: <task>" on results. The function returns
// immediately; the caller must receive len(tasks) values (or give results
// enough buffer) for the goroutines to finish.
func processWithFanOut(tasks []string, results chan<- string) {
	handle := func(name string) {
		time.Sleep(100 * time.Millisecond)
		results <- fmt.Sprintf("Processed: %s", name)
	}

	for i := range tasks {
		go handle(tasks[i])
	}
}
// FanOutExample fans five tasks out to their own goroutines and prints each
// result as it arrives. The results channel is buffered to the number of
// tasks, so no sender ever blocks.
func FanOutExample() {
	fmt.Println("\n--- Fan-out Pattern ---")

	tasks := []string{"Task1", "Task2", "Task3", "Task4", "Task5"}
	results := make(chan string, len(tasks))
	processWithFanOut(tasks, results)

	// Exactly one result is produced per task.
	for range tasks {
		fmt.Println(<-results)
	}

	fmt.Println("All tasks processed")
}
// main runs every goroutine example in order, framed by a banner line at the
// start and at the end.
func main() {
	// The banner and the blank line are printed separately: passing a string
	// that ends in "\n" to Println is flagged by go vet ("redundant newline"),
	// which go test runs by default. The output is unchanged.
	fmt.Println("=== Web Go Goroutines Examples ===")
	fmt.Println()

	BasicGoroutineExample()
	WaitGroupExample()
	GoroutineWithParametersExample()
	AnonymousGoroutineExample()
	GoroutineWithReturnValueExample()
	MultipleReturnValuesExample()
	LimitedGoroutinesExample()
	GoroutineLifecycleExample()
	PanicRecoveryExample()
	GoroutineStatsExample()
	TimeoutExample()
	FanOutExample()

	fmt.Println("\n=== All Goroutine Examples Completed ===")
}
💻 Sincronización de Hilos go
🟡 intermediate
⭐⭐⭐
Usar mutexes, grupos de espera y canales para sincronizar goroutines y proteger recursos compartidos
⏱️ 25 min
🏷️ go, web, multithreading
Prerequisites:
Intermediate Go, sync package, channels
// Web Go Thread Synchronization Examples
// Synchronizing goroutines using mutexes, channels, and other sync primitives
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 1. Mutex (Mutual Exclusion)
// SafeCounter is an integer counter whose value is only read or written
// while mu is held. Use it through a pointer; it must not be copied.
type SafeCounter struct {
	mu    sync.Mutex
	value int
}

// Increment adds one to the counter while holding the mutex.
func (c *SafeCounter) Increment() {
	c.mu.Lock()
	c.value++
	c.mu.Unlock()
}

// Value returns the counter's current value, read while holding the mutex.
func (c *SafeCounter) Value() int {
	c.mu.Lock()
	current := c.value
	c.mu.Unlock()
	return current
}
// MutexExample increments a shared SafeCounter from 100 goroutines and prints
// the final value once all of them are done.
func MutexExample() {
	fmt.Println("--- Mutex Example ---")

	const increments = 100
	var (
		counter SafeCounter
		wg      sync.WaitGroup
	)

	wg.Add(increments)
	for n := 0; n < increments; n++ {
		go func() {
			defer wg.Done()
			counter.Increment()
		}()
	}
	wg.Wait()

	fmt.Printf("Final counter value: %d\n", counter.Value())
}
// 2. RWMutex (Read-Write Mutex)
// ThreadSafeMap is a string-to-int map guarded by a sync.RWMutex: Get takes
// the read lock, Set takes the write lock.
type ThreadSafeMap struct {
	mu   sync.RWMutex
	data map[string]int
}

// NewThreadSafeMap returns an empty ThreadSafeMap that is ready for use.
func NewThreadSafeMap() *ThreadSafeMap {
	m := new(ThreadSafeMap)
	m.data = map[string]int{}
	return m
}

// Get returns the value stored under key and whether the key was present.
// It holds only the read lock, so several Get calls may run concurrently.
func (m *ThreadSafeMap) Get(key string) (int, bool) {
	m.mu.RLock()
	val, ok := m.data[key]
	m.mu.RUnlock()
	return val, ok
}

// Set stores value under key while holding the exclusive write lock.
func (m *ThreadSafeMap) Set(key string, value int) {
	m.mu.Lock()
	m.data[key] = value
	m.mu.Unlock()
}
// RWMutexExample starts five writers and ten readers against one
// ThreadSafeMap and waits for all of them. Readers and writers are not
// ordered relative to each other, so a reader prints only if its key has
// already been written.
func RWMutexExample() {
	fmt.Println("\n--- RWMutex Example ---")

	tsMap := NewThreadSafeMap()
	var wg sync.WaitGroup

	write := func(id int) {
		defer wg.Done()
		key, val := fmt.Sprintf("key%d", id), id*10
		tsMap.Set(key, val)
		fmt.Printf("Writer: Set %s = %d\n", key, val)
	}
	read := func(id int) {
		defer wg.Done()
		key := fmt.Sprintf("key%d", id%5)
		val, found := tsMap.Get(key)
		if !found {
			return
		}
		fmt.Printf("Reader: Got %s = %d\n", key, val)
	}

	for w := 0; w < 5; w++ {
		wg.Add(1)
		go write(w)
	}
	for r := 0; r < 10; r++ {
		wg.Add(1)
		go read(r)
	}

	wg.Wait()
	fmt.Println("All readers and writers completed")
}
// 3. Once (Single Execution)
// Singleton is the type of the process-wide instance returned by GetInstance.
type Singleton struct {
	data string
}

// instance holds the lazily created Singleton; once ensures it is built
// exactly one time.
var (
	instance *Singleton
	once     sync.Once
)

// GetInstance returns the shared Singleton, creating it on the first call.
// sync.Once guarantees the initialization runs once even when several
// goroutines call GetInstance at the same time.
func GetInstance() *Singleton {
	create := func() {
		s := new(Singleton)
		s.data = "Singleton initialized"
		instance = s
		fmt.Println("Singleton created")
	}
	once.Do(create)
	return instance
}
// OnceExample calls GetInstance from five goroutines and prints the pointer
// each one received, showing that they all share the same instance.
func OnceExample() {
	fmt.Println("\n--- Once Example ---")

	var wg sync.WaitGroup
	for n := 0; n < 5; n++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Printf("Goroutine %d got instance: %p\n", id, GetInstance())
		}(n)
	}
	wg.Wait()
}
// 4. WaitGroup
// WaitGroupExample runs four named tasks concurrently, each taking 200ms,
// and uses a sync.WaitGroup to block until all of them have finished.
func WaitGroupExample() {
	fmt.Println("\n--- WaitGroup Example ---")

	var pending sync.WaitGroup
	runTask := func(taskName string) {
		defer pending.Done()
		fmt.Printf("%s started\n", taskName)
		time.Sleep(200 * time.Millisecond)
		fmt.Printf("%s completed\n", taskName)
	}

	for _, name := range []string{"Download", "Process", "Upload", "Verify"} {
		pending.Add(1)
		go runTask(name)
	}

	pending.Wait()
	fmt.Println("All tasks completed")
}
// 5. Channels for Synchronization
// ChannelSyncExample uses an unbuffered channel as a completion signal: the
// main goroutine blocks on the receive until the worker goroutine sends.
func ChannelSyncExample() {
	fmt.Println("\n--- Channel Synchronization Example ---")

	finished := make(chan bool)
	work := func() {
		fmt.Println("Worker started")
		time.Sleep(500 * time.Millisecond)
		fmt.Println("Worker completed")
		finished <- true
	}
	go work()

	fmt.Println("Main waiting for worker...")
	<-finished
	fmt.Println("Main received completion signal")
}
// 6. Buffered Channels
// BufferedChannelExample sends five messages through a channel with room for
// three. The producer can run ahead of the consumer until the buffer is full,
// then blocks; it closes the channel when done so the consumer loop ends.
func BufferedChannelExample() {
	fmt.Println("\n--- Buffered Channel Example ---")

	const (
		capacity = 3
		total    = 5
	)
	ch := make(chan string, capacity)

	// Producer.
	go func() {
		defer close(ch)
		for n := 1; n <= total; n++ {
			msg := fmt.Sprintf("Message %d", n)
			ch <- msg
			fmt.Printf("Sent: %s\n", msg)
		}
	}()

	// Give the producer a head start so it fills the buffer first.
	time.Sleep(100 * time.Millisecond)

	// Consumer: read until the channel is closed and drained.
	for {
		msg, open := <-ch
		if !open {
			break
		}
		fmt.Printf("Received: %s\n", msg)
		time.Sleep(100 * time.Millisecond)
	}
}
// 7. Select Statement
// SelectExample waits on two channels at once. Each channel receives one
// message after a different delay, and select handles whichever is ready.
func SelectExample() {
	fmt.Println("\n--- Select Statement Example ---")

	ch1, ch2 := make(chan string), make(chan string)

	sendAfter := func(delay time.Duration, out chan<- string, text string) {
		time.Sleep(delay)
		out <- text
	}
	go sendAfter(100*time.Millisecond, ch1, "Channel 1 message")
	go sendAfter(200*time.Millisecond, ch2, "Channel 2 message")

	// Two messages are expected in total, one per channel.
	for remaining := 2; remaining > 0; remaining-- {
		select {
		case msg := <-ch1:
			fmt.Printf("Received from ch1: %s\n", msg)
		case msg := <-ch2:
			fmt.Printf("Received from ch2: %s\n", msg)
		}
	}
}
// 8. Select with Timeout
// SelectTimeoutExample waits up to 200ms for a message that only arrives
// after 500ms, so the timeout branch is the one that runs.
func SelectTimeoutExample() {
	fmt.Println("\n--- Select with Timeout Example ---")

	// Buffered with capacity 1 so the sender can always deliver its message
	// and exit. With an unbuffered channel the sender would block forever
	// once the timeout branch has been taken, leaking the goroutine.
	ch := make(chan string, 1)
	go func() {
		time.Sleep(500 * time.Millisecond)
		ch <- "Delayed message"
	}()

	select {
	case msg := <-ch:
		fmt.Printf("Received: %s\n", msg)
	case <-time.After(200 * time.Millisecond):
		fmt.Println("Timeout occurred")
	}
}
// 9. Atomic Operations
// AtomicExample increments a shared int64 from 1000 goroutines using
// atomic.AddInt64 and prints the final value once all of them are done.
func AtomicExample() {
	fmt.Println("\n--- Atomic Operations Example ---")

	const goroutines = 1000
	var (
		counter int64
		wg      sync.WaitGroup
	)

	wg.Add(goroutines)
	for n := 0; n < goroutines; n++ {
		go func() {
			atomic.AddInt64(&counter, 1)
			wg.Done()
		}()
	}
	wg.Wait()

	fmt.Printf("Final counter value (atomic): %d\n", atomic.LoadInt64(&counter))
}
// 10. Condition Variables
// CondExample shows a goroutine waiting on a sync.Cond until the main
// goroutine sets a flag and signals it.
//
// The worker re-checks ready in a loop around Wait, as sync.Cond requires,
// so the example is correct whether the signal arrives before or after the
// worker starts waiting. The function returns only after the worker has
// finished.
func CondExample() {
	fmt.Println("\n--- Condition Variable Example ---")

	var (
		mu    sync.Mutex
		ready bool // guarded by mu
	)
	c := sync.NewCond(&mu)
	cond = c // keep the package-level variable populated for existing users

	// Closed by the worker when it is done; replaces a fixed sleep.
	done := make(chan struct{})

	// Worker goroutine.
	go func() {
		defer close(done)
		mu.Lock()
		defer mu.Unlock()
		for !ready {
			c.Wait()
		}
		fmt.Println("Worker: Condition met, proceeding...")
	}()

	// Main goroutine: give the worker time to start waiting, then flip the
	// flag under the lock and wake it.
	time.Sleep(100 * time.Millisecond)
	mu.Lock()
	ready = true
	fmt.Println("Main: Signaling condition")
	c.Signal()
	mu.Unlock()

	<-done
}

// cond is the condition variable created by the most recent CondExample call.
var cond *sync.Cond
// 11. ErrGroup (Error Group)
// ErrGroupExample runs three goroutines that may fail, waits for all of them,
// and reports the first error that occurred.
//
// The standard library's sync package has no ErrGroup type (errgroup lives in
// golang.org/x/sync, which this file does not import), so the same behaviour
// is built from a sync.WaitGroup plus a sync.Once that records only the first
// non-nil error.
func ErrGroupExample() {
	fmt.Println("\n--- ErrGroup Example ---")

	var (
		wg       sync.WaitGroup
		errOnce  sync.Once
		firstErr error // written at most once via errOnce, read after wg.Wait
	)

	// run starts fn in its own goroutine and remembers its error if it is the
	// first one reported.
	run := func(fn func() error) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := fn(); err != nil {
				errOnce.Do(func() { firstErr = err })
			}
		}()
	}

	// Launch multiple goroutines.
	for i := 1; i <= 3; i++ {
		id := i // per-iteration copy for the closure
		run(func() error {
			time.Sleep(time.Duration(id) * 100 * time.Millisecond)
			if id == 2 {
				return fmt.Errorf("error in goroutine %d", id)
			}
			fmt.Printf("Goroutine %d completed\n", id)
			return nil
		})
	}

	wg.Wait()
	if firstErr != nil {
		fmt.Printf("ErrGroup finished with error: %v\n", firstErr)
	}
}
// 12. Worker Pool Pattern
// WorkerPoolExample starts three workers that share one jobs channel, feeds
// them five jobs, closes the channel, and then prints the five results in the
// order they arrive.
func WorkerPoolExample() {
	fmt.Println("\n--- Worker Pool Example ---")

	const (
		workers   = 3
		jobCount  = 5
		queueSize = 100
	)
	jobs := make(chan int, queueSize)
	results := make(chan int, queueSize)

	for id := 1; id <= workers; id++ {
		go worker(id, jobs, results)
	}

	for j := 1; j <= jobCount; j++ {
		jobs <- j
		fmt.Printf("Sent job %d\n", j)
	}
	close(jobs) // lets the workers' loops end once the queue is drained

	for received := 0; received < jobCount; received++ {
		fmt.Printf("Received result %d\n", <-results)
	}
}
// worker receives jobs until the jobs channel is closed and drained. For each
// job it prints a line, sleeps 100ms to simulate work, and sends job*2 on
// results.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, more := <-jobs
		if !more {
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(100 * time.Millisecond)
		results <- job * 2
	}
}
// main runs every synchronization example in order, framed by a banner line
// at the start and at the end.
func main() {
	// The banner and the blank line are printed separately: passing a string
	// that ends in "\n" to Println is flagged by go vet ("redundant newline"),
	// which go test runs by default. The output is unchanged.
	fmt.Println("=== Web Go Thread Synchronization Examples ===")
	fmt.Println()

	MutexExample()
	RWMutexExample()
	OnceExample()
	WaitGroupExample()
	ChannelSyncExample()
	BufferedChannelExample()
	SelectExample()
	SelectTimeoutExample()
	AtomicExample()
	CondExample()
	ErrGroupExample()
	WorkerPoolExample()

	fmt.Println("\n=== All Thread Synchronization Examples Completed ===")
}
💻 Worker Pools go
🔴 complex
⭐⭐⭐⭐
Implementar worker pools para procesamiento concurrente eficiente de tareas
⏱️ 30 min
🏷️ go, web, multithreading
Prerequisites:
Advanced Go, channels, sync package
// Web Go Worker Pool Examples
// Implementing worker pools for efficient concurrent task processing
package main
import (
"fmt"
"sync"
"time"
)
// 1. Basic Worker Pool
// Job is one unit of work handed to a worker pool.
type Job struct {
// ID identifies the job; workers echo it in their log lines and results.
ID int
// Data is the payload attached to the job. The workers visible in this file
// do not read it.
Data string
}
// Result is what a worker reports after handling a job.
type Result struct {
// JobID is the ID of the job this result belongs to.
JobID int
// Output is a human-readable description of the outcome.
Output string
// Error is never set by the workers visible in this file; presumably it is
// reserved for per-job failures — confirm against callers.
Error error
}
// WorkerPool is a fixed-size pool of goroutines that read Jobs from a shared
// queue and publish one Result per job.
type WorkerPool struct {
	workerCount int
	jobs        chan Job
	results     chan Result
	wg          sync.WaitGroup
}

// NewWorkerPool returns a pool configured for workerCount workers. Both the
// job queue and the result queue are buffered with room for 100 entries. No
// goroutine is started until Start is called.
func NewWorkerPool(workerCount int) *WorkerPool {
	const queueSize = 100

	wp := new(WorkerPool)
	wp.workerCount = workerCount
	wp.jobs = make(chan Job, queueSize)
	wp.results = make(chan Result, queueSize)
	return wp
}

// Start launches the pool's workers, numbered from 0.
func (wp *WorkerPool) Start() {
	for id := 0; id < wp.workerCount; id++ {
		wp.wg.Add(1)
		go wp.worker(id)
	}
}

// worker handles jobs until the job queue is closed and drained. Each job
// takes 100ms of simulated work and yields exactly one Result.
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()

	for {
		job, open := <-wp.jobs
		if !open {
			return
		}

		fmt.Printf("Worker %d processing job %d\n", id, job.ID)
		time.Sleep(100 * time.Millisecond) // simulated work

		var res Result
		res.JobID = job.ID
		res.Output = fmt.Sprintf("Job %d processed by worker %d", job.ID, id)
		wp.results <- res
	}
}

// Submit queues job for processing. It blocks while the job queue is full.
func (wp *WorkerPool) Submit(job Job) {
	wp.jobs <- job
}

// Stop closes the job queue, waits for every worker to finish the jobs still
// queued, and then closes the results channel. Submit must not be called
// after Stop.
func (wp *WorkerPool) Stop() {
	close(wp.jobs)
	wp.wg.Wait()
	close(wp.results)
}

// Results returns the receive-only channel on which results are published.
// It is closed by Stop.
func (wp *WorkerPool) Results() <-chan Result {
	return wp.results
}
// BasicWorkerPoolExample runs five jobs through a three-worker pool. The pool
// is stopped from a helper goroutine after one second, which closes the
// results channel and ends the draining loop.
func BasicWorkerPoolExample() {
	fmt.Println("--- Basic Worker Pool Example ---")

	pool := NewWorkerPool(3)
	pool.Start()

	for id := 1; id <= 5; id++ {
		pool.Submit(Job{ID: id, Data: fmt.Sprintf("Task %d", id)})
	}

	// Stop the pool once the jobs have had time to finish.
	stopLater := func() {
		time.Sleep(time.Second)
		pool.Stop()
	}
	go stopLater()

	// Drain results until Stop closes the channel; the values are discarded.
	results := pool.Results()
	for {
		if _, open := <-results; !open {
			break
		}
	}

	fmt.Println("All jobs completed")
}
// 2. Worker Pool with Error Handling
// JobWithErr is a job that can be told to fail, for demonstrating error
// reporting.
type JobWithErr struct {
	ID         int
	ShouldFail bool
}

// WorkerPoolWithError is a fixed-size worker pool that reports successful
// jobs on one channel and failed jobs on another.
type WorkerPoolWithError struct {
	workerCount int
	jobs        chan JobWithErr
	results     chan Result
	errors      chan error
	wg          sync.WaitGroup
}

// NewWorkerPoolWithError returns a pool configured for workerCount workers.
// The job, result and error queues are each buffered with room for 100
// entries. No goroutine is started until Start is called.
func NewWorkerPoolWithError(workerCount int) *WorkerPoolWithError {
	const queueSize = 100

	wp := new(WorkerPoolWithError)
	wp.workerCount = workerCount
	wp.jobs = make(chan JobWithErr, queueSize)
	wp.results = make(chan Result, queueSize)
	wp.errors = make(chan error, queueSize)
	return wp
}

// Start launches the pool's workers, numbered from 0.
func (wp *WorkerPoolWithError) Start() {
	for id := 0; id < wp.workerCount; id++ {
		wp.wg.Add(1)
		go wp.worker(id)
	}
}

// worker handles jobs until the job queue is closed and drained. After 100ms
// of simulated work, a job either yields a Result or, when ShouldFail is set,
// an error.
func (wp *WorkerPoolWithError) worker(id int) {
	defer wp.wg.Done()

	for job := range wp.jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job.ID)
		time.Sleep(100 * time.Millisecond)

		if !job.ShouldFail {
			wp.results <- Result{
				JobID:  job.ID,
				Output: fmt.Sprintf("Job %d processed successfully", job.ID),
			}
			continue
		}
		wp.errors <- fmt.Errorf("job %d failed", job.ID)
	}
}

// Submit queues job for processing. It blocks while the job queue is full.
func (wp *WorkerPoolWithError) Submit(job JobWithErr) {
	wp.jobs <- job
}

// Stop closes the job queue, waits for every worker to finish, and then
// closes the results channel followed by the errors channel. Submit must not
// be called after Stop.
func (wp *WorkerPoolWithError) Stop() {
	close(wp.jobs)
	wp.wg.Wait()
	close(wp.results)
	close(wp.errors)
}
// WorkerPoolWithErrorExample runs five jobs through a two-worker pool in
// which every third job fails, printing successes and errors as they arrive.
// The pool is stopped from a helper goroutine after one second; the loop ends
// once both the results and the errors channel have been closed.
func WorkerPoolWithErrorExample() {
	fmt.Println("\n--- Worker Pool with Error Handling ---")

	pool := NewWorkerPoolWithError(2)
	pool.Start()

	for id := 1; id <= 5; id++ {
		pool.Submit(JobWithErr{ID: id, ShouldFail: id%3 == 0})
	}

	go func() {
		time.Sleep(time.Second)
		pool.Stop()
	}()

	// A closed channel is replaced by nil so that its select case can never
	// fire again; a receive from a nil channel blocks forever.
	for pool.results != nil || pool.errors != nil {
		select {
		case result, open := <-pool.results:
			if open {
				fmt.Printf("Success: %s\n", result.Output)
			} else {
				pool.results = nil
			}
		case err, open := <-pool.errors:
			if open {
				fmt.Printf("Error: %v\n", err)
			} else {
				pool.errors = nil
			}
		}
	}

	fmt.Println("Worker pool stopped")
}
// 3. Dynamic Worker Pool
// DynamicWorkerPool is a worker pool whose size can be changed at run time
// between minWorkers and maxWorkers.
//
// workerCount is guarded by mu and is kept accurate synchronously: ScaleUp
// and ScaleDown adjust it before they return, and a worker that retires
// because the job queue was closed subtracts itself before Stop's wait is
// released. GetWorkerCount therefore reflects a scale operation as soon as
// that operation has returned.
type DynamicWorkerPool struct {
	minWorkers  int
	maxWorkers  int
	jobs        chan Job
	results     chan Result
	workerCount int // guarded by mu
	mu          sync.Mutex
	wg          sync.WaitGroup
	quit        chan struct{} // one value per worker that ScaleDown retires
}

// NewDynamicWorkerPool returns a pool that already runs minWorkers workers
// and may grow to at most maxWorkers. The job and result queues are each
// buffered with room for 100 entries.
func NewDynamicWorkerPool(minWorkers, maxWorkers int) *DynamicWorkerPool {
	pool := &DynamicWorkerPool{
		minWorkers: minWorkers,
		maxWorkers: maxWorkers,
		jobs:       make(chan Job, 100),
		results:    make(chan Result, 100),
		quit:       make(chan struct{}),
	}

	// Start minimum workers. addWorker requires mu to be held.
	pool.mu.Lock()
	defer pool.mu.Unlock()
	for i := 0; i < minWorkers; i++ {
		pool.addWorker()
	}
	return pool
}

// addWorker starts one more worker goroutine and counts it. The caller must
// hold dp.mu.
func (dp *DynamicWorkerPool) addWorker() {
	dp.wg.Add(1)
	dp.workerCount++

	go func() {
		// Done runs last, after any count adjustment below, so that once
		// Stop's wg.Wait returns the count is already final.
		defer dp.wg.Done()

		for {
			select {
			case job, ok := <-dp.jobs:
				if !ok {
					// The queue was closed: this worker retires on its own
					// and has to subtract itself from the count.
					dp.mu.Lock()
					dp.workerCount--
					dp.mu.Unlock()
					return
				}
				fmt.Printf("Worker processing job %d\n", job.ID)
				time.Sleep(100 * time.Millisecond)
				dp.results <- Result{
					JobID:  job.ID,
					Output: fmt.Sprintf("Job %d processed", job.ID),
				}
			case <-dp.quit:
				// ScaleDown has already subtracted this worker.
				return
			}
		}
	}()
}

// ScaleUp starts up to count additional workers without exceeding
// maxWorkers. If the pool is already at its maximum it prints a notice and
// does nothing.
func (dp *DynamicWorkerPool) ScaleUp(count int) {
	dp.mu.Lock()
	defer dp.mu.Unlock()

	currentWorkers := dp.workerCount
	available := dp.maxWorkers - currentWorkers
	if available <= 0 {
		fmt.Println("Already at max workers")
		return
	}

	toAdd := count
	if toAdd > available {
		toAdd = available
	}
	for i := 0; i < toAdd; i++ {
		dp.addWorker()
	}

	fmt.Printf("Scaled up from %d to %d workers\n", currentWorkers, dp.workerCount)
}

// ScaleDown retires up to count workers without dropping below minWorkers.
// If the pool is already at its minimum it prints a notice and does nothing.
//
// Each retirement is a blocking send on the quit channel, so ScaleDown
// returns only after that many workers have accepted the signal; a busy
// worker accepts it once its current job is finished. ScaleDown must not be
// called after Stop, because no worker would be left to receive the signal.
func (dp *DynamicWorkerPool) ScaleDown(count int) {
	dp.mu.Lock()
	defer dp.mu.Unlock()

	currentWorkers := dp.workerCount
	available := currentWorkers - dp.minWorkers
	if available <= 0 {
		fmt.Println("Already at min workers")
		return
	}

	toRemove := count
	if toRemove > available {
		toRemove = available
	}
	if toRemove < 0 {
		// A negative request must never increase the recorded count.
		toRemove = 0
	}

	// Signal workers to quit, counting each one as soon as it has accepted.
	for i := 0; i < toRemove; i++ {
		dp.quit <- struct{}{}
		dp.workerCount--
	}

	fmt.Printf("Scaled down from %d to %d workers\n", currentWorkers, dp.workerCount)
}

// Submit queues job for processing. It blocks while the job queue is full.
func (dp *DynamicWorkerPool) Submit(job Job) {
	dp.jobs <- job
}

// Stop closes the job queue, waits for the remaining workers to drain it and
// exit, and then closes the results channel. Submit, ScaleUp and ScaleDown
// must not be called after Stop.
func (dp *DynamicWorkerPool) Stop() {
	close(dp.jobs)
	dp.wg.Wait()
	close(dp.results)
}

// GetWorkerCount returns the number of workers currently counted as running.
func (dp *DynamicWorkerPool) GetWorkerCount() int {
	dp.mu.Lock()
	defer dp.mu.Unlock()
	return dp.workerCount
}
// DynamicWorkerPoolExample creates a pool of two workers (maximum five),
// grows it by two, submits ten jobs, shrinks it again after the first five
// results, and finally stops the pool and drains what is left.
func DynamicWorkerPoolExample() {
	fmt.Println("\n--- Dynamic Worker Pool Example ---")

	pool := NewDynamicWorkerPool(2, 5)
	fmt.Printf("Initial workers: %d\n", pool.GetWorkerCount())

	pool.ScaleUp(2)
	fmt.Printf("After scale up: %d\n", pool.GetWorkerCount())

	for id := 1; id <= 10; id++ {
		job := Job{ID: id}
		job.Data = fmt.Sprintf("Task %d", id)
		pool.Submit(job)
	}

	// Take the first five results, discarding their contents.
	for taken := 0; taken < 5; taken++ {
		<-pool.results
	}

	pool.ScaleDown(2)
	fmt.Printf("After scale down: %d\n", pool.GetWorkerCount())

	// Stop in the background so this goroutine can keep draining results
	// while the workers finish.
	go func() {
		time.Sleep(500 * time.Millisecond)
		pool.Stop()
	}()
	for {
		if _, open := <-pool.results; !open {
			break
		}
	}

	fmt.Println("Dynamic worker pool stopped")
}
// 4. Worker Pool with Context
// ContextWorkerPool is a fixed-size worker pool that can be cancelled: once
// the cancel channel is closed, workers stop picking up new jobs and exit.
type ContextWorkerPool struct {
	workerCount int
	jobs        chan Job
	results     chan Result
	cancel      chan struct{}
	wg          sync.WaitGroup
}

// NewContextWorkerPool returns a pool configured for workerCount workers.
// The job and result queues are each buffered with room for 100 entries. No
// goroutine is started until Start is called.
func NewContextWorkerPool(workerCount int) *ContextWorkerPool {
	const queueSize = 100

	cp := new(ContextWorkerPool)
	cp.workerCount = workerCount
	cp.jobs = make(chan Job, queueSize)
	cp.results = make(chan Result, queueSize)
	cp.cancel = make(chan struct{})
	return cp
}

// Start launches the pool's workers, numbered from 0.
func (cp *ContextWorkerPool) Start() {
	for id := 0; id < cp.workerCount; id++ {
		cp.wg.Add(1)
		go cp.worker(id)
	}
}

// worker handles jobs until the job queue is closed and drained or the pool
// is cancelled. Cancellation is checked while waiting for a job and once
// more right after a job has been picked up; a job whose simulated work has
// already started always runs to completion.
func (cp *ContextWorkerPool) worker(id int) {
	defer cp.wg.Done()

	for {
		var (
			job  Job
			open bool
		)
		select {
		case <-cp.cancel:
			fmt.Printf("Worker %d: received cancel signal\n", id)
			return
		case job, open = <-cp.jobs:
		}
		if !open {
			return
		}

		fmt.Printf("Worker %d processing job %d\n", id, job.ID)

		// Non-blocking check: drop the job if the pool was cancelled in the
		// meantime.
		select {
		case <-cp.cancel:
			fmt.Printf("Worker %d: job %d cancelled\n", id, job.ID)
			return
		default:
		}

		time.Sleep(100 * time.Millisecond)
		cp.results <- Result{
			JobID:  job.ID,
			Output: fmt.Sprintf("Job %d processed", job.ID),
		}
	}
}

// Submit queues job for processing. It blocks while the job queue is full.
func (cp *ContextWorkerPool) Submit(job Job) {
	cp.jobs <- job
}

// Cancel tells every worker to stop by closing the cancel channel. It must be
// called at most once; closing an already closed channel panics.
func (cp *ContextWorkerPool) Cancel() {
	close(cp.cancel)
}

// Stop closes the job queue, waits for every worker to exit, and then closes
// the results channel. Submit must not be called after Stop.
func (cp *ContextWorkerPool) Stop() {
	close(cp.jobs)
	cp.wg.Wait()
	close(cp.results)
}
// ContextWorkerPoolExample submits five jobs to a three-worker pool, lets it
// run for 200ms, cancels the pool, waits another 100ms, and then stops it.
// Results are left in the buffered results channel and never read.
func ContextWorkerPoolExample() {
	fmt.Println("\n--- Context Worker Pool Example ---")

	const (
		runWindow   = 200 * time.Millisecond
		gracePeriod = 100 * time.Millisecond
	)

	pool := NewContextWorkerPool(3)
	pool.Start()

	for id := 1; id <= 5; id++ {
		job := Job{ID: id}
		job.Data = fmt.Sprintf("Task %d", id)
		pool.Submit(job)
	}

	// Let some of the work complete before cancelling the rest.
	time.Sleep(runWindow)
	fmt.Println("Cancelling worker pool...")
	pool.Cancel()

	time.Sleep(gracePeriod)
	pool.Stop()

	fmt.Println("Context worker pool cancelled and stopped")
}
// main runs every worker pool example in order, framed by a banner line at
// the start and at the end.
func main() {
	// The banner and the blank line are printed separately: passing a string
	// that ends in "\n" to Println is flagged by go vet ("redundant newline"),
	// which go test runs by default. The output is unchanged.
	fmt.Println("=== Web Go Worker Pool Examples ===")
	fmt.Println()

	BasicWorkerPoolExample()
	WorkerPoolWithErrorExample()
	DynamicWorkerPoolExample()
	ContextWorkerPoolExample()

	fmt.Println("\n=== All Worker Pool Examples Completed ===")
}