A step-by-step guide to implementing robust concurrency patterns.
In production, a "Hello World" worker pool isn't enough. You need to handle graceful shutdowns, context cancellation, and error tracking. Here is how to write it properly.
Don't just pass raw integers. Create a struct that holds everything a worker needs to know to complete the task.
type Job struct {
	ID      string
	Payload []byte
	// Add context if you need per-job deadlines
	Ctx context.Context
}
The worker needs to be robust. It must signal when it's done using a WaitGroup so we don't kill it mid-job during a shutdown.
func worker(id int, jobs <-chan Job, wg *sync.WaitGroup) {
	defer wg.Done() // Always signal completion

	// The loop ends once the jobs channel is closed and drained
	for job := range jobs {
		// Process the job
		fmt.Printf("Worker %d processing job %s\n", id, job.ID)

		// Simulate complex work with error handling
		if err := process(job); err != nil {
			// Log which worker and job failed, then move on to the next job
			log.Printf("Worker %d: job %s failed: %v", id, job.ID, err)
			continue
		}
	}
}
Start your workers before you start accepting traffic. This usually happens in your main() or application initialization.
func startWorkers(numWorkers int) (chan<- Job, *sync.WaitGroup) {
	jobs := make(chan Job, 100) // Buffered channel
	var wg sync.WaitGroup

	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, &wg)
	}

	return jobs, &wg
}
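Here is a rough sketch of how a caller might use startWorkers. The processed counter is an assumption added only so the example can show that every submitted job was handled; the worker here is trimmed down to a stand-in.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Job is a reduced version of the struct shown earlier.
type Job struct {
	ID      string
	Payload []byte
}

// processed is a hypothetical counter (requires Go 1.19+ for atomic.Int64).
var processed atomic.Int64

// worker drains the channel and counts each job instead of doing real work.
func worker(id int, jobs <-chan Job, wg *sync.WaitGroup) {
	defer wg.Done()
	for range jobs {
		processed.Add(1)
	}
}

func startWorkers(numWorkers int) (chan<- Job, *sync.WaitGroup) {
	jobs := make(chan Job, 100) // Buffered channel
	var wg sync.WaitGroup
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, &wg)
	}
	return jobs, &wg
}

func main() {
	// Start the pool first, then begin submitting work.
	jobs, wg := startWorkers(4)

	for i := 1; i <= 10; i++ {
		jobs <- Job{ID: fmt.Sprintf("job-%d", i)}
	}

	close(jobs) // no more jobs
	wg.Wait()   // let the workers drain the channel
	fmt.Println("processed:", processed.Load())
}
```

Returning the channel as send-only (chan<- Job) means callers can submit and close it but cannot accidentally receive from it.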
This is the most critical part for production. When the server shuts down, you must stop accepting new jobs and wait for existing jobs to finish.
// In your main function or shutdown handler:
// 1. Close the channel to signal "no more jobs"
close(jobs)
// 2. Wait for all workers to finish their current tasks
wg.Wait()
fmt.Println("All workers finished. Safe to exit.")
Wrap wg.Wait() in a timeout using a select block. If a worker hangs forever, you don't want your server to hang forever during shutdown.
Putting it all together in a runnable example. To keep it short, this version sends plain ints instead of the Job struct.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// 1. Setup
	jobs := make(chan int, 100)
	var wg sync.WaitGroup

	// 2. Start Workers
	for w := 1; w <= 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := range jobs {
				fmt.Printf("Worker %d started job %d\n", id, j)
				time.Sleep(500 * time.Millisecond)
				fmt.Printf("Worker %d finished job %d\n", id, j)
			}
		}(w)
	}

	// 3. Send Work
	for j := 1; j <= 5; j++ {
		jobs <- j
	}

	// 4. Graceful Shutdown
	close(jobs) // Signal workers to stop
	wg.Wait()   // Wait for them to finish
	fmt.Println("Shutdown complete")
}