Loading...
Loading...
Go concurrency patterns for high-throughput web applications including worker pools, rate limiting, race detection, and safe shared state management. Use when implementing background task processing, rate limiters, or concurrent request handling.
npx skill4agent add existential-birds/beagle go-concurrency-web

| Topic | Reference |
|---|---|
| Worker Pools & errgroup | references/worker-pools.md |
| Rate Limiting | references/rate-limiting.md |
| Race Detection & Fixes | references/race-detection.md |
Key primitives: `context.Context`, `sync.WaitGroup`, `sync.Mutex`, `sync.RWMutex`, `sync/atomic`, `errgroup`.

// Worker pool for background tasks (e.g., sending emails)
// WorkerPool runs queued Jobs on a fixed set of worker goroutines.
// The zero value is not usable; construct with NewWorkerPool.
type WorkerPool struct {
	// jobs is the buffered queue of pending work; closed by Shutdown.
	jobs chan Job
	// wg counts live worker goroutines so Shutdown can wait for them.
	wg sync.WaitGroup
	logger *slog.Logger
}

// Job is one unit of background work; ID is used only for logging.
type Job struct {
	ID string
	Execute func(ctx context.Context) error
}
// NewWorkerPool creates a pool whose job queue holds up to queueSize
// pending jobs and immediately starts numWorkers worker goroutines.
// Stop the pool and wait for it to drain with Shutdown.
func NewWorkerPool(numWorkers int, queueSize int, logger *slog.Logger) *WorkerPool {
	pool := &WorkerPool{
		jobs:   make(chan Job, queueSize),
		logger: logger,
	}
	// Register all workers up front; each worker calls Done on exit.
	pool.wg.Add(numWorkers)
	for id := 0; id < numWorkers; id++ {
		go pool.worker(id)
	}
	return pool
}
// worker drains wp.jobs until the channel is closed by Shutdown,
// logging each job it starts and any error it returns. Jobs run with
// context.Background() because they outlive the originating request.
func (wp *WorkerPool) worker(id int) {
	defer wp.wg.Done()
	for j := range wp.jobs {
		wp.logger.Info("processing job", "worker", id, "job_id", j.ID)
		err := j.Execute(context.Background())
		if err != nil {
			wp.logger.Error("job failed", "worker", id, "job_id", j.ID, "err", err)
		}
	}
}
// Submit enqueues job for a worker to pick up. It blocks when the
// queue is full, and panics if called after Shutdown (send on a
// closed channel).
func (wp *WorkerPool) Submit(job Job) {
	wp.jobs <- job
}
// Shutdown closes the job queue and blocks until every already-queued
// job has been processed. Do not call Submit after Shutdown.
func (wp *WorkerPool) Shutdown() {
	close(wp.jobs)
	wp.wg.Wait()
}func (s *Server) handleCreateUser(w http.ResponseWriter, r *http.Request) {
	// Create the user synchronously, then hand the welcome email to the
	// worker pool so the HTTP response is not delayed by email delivery.
	user, err := s.userService.Create(r.Context(), decodeUser(r))
	if err != nil {
		handleError(w, r, err)
		return
	}
	// Dispatch background task — never spawn raw goroutines in handlers
	s.workers.Submit(Job{
		ID: "welcome-email-" + user.ID,
		Execute: func(ctx context.Context) error {
			// ctx here is the job's context, not r.Context(), which is
			// canceled as soon as this handler returns.
			return s.emailService.SendWelcome(ctx, user)
		},
	})
	writeJSON(w, http.StatusCreated, user)
}errgroupgolang.org/x/time/rate429 Too Many RequestsRetry-Aftergo test -race ./...
go build -race -o myserver ./cmd/server// BAD — shared state without protection
type Server struct {
	requestCount int // data race!
}
// net/http serves each request on its own goroutine, so the
// unsynchronized increment below is a write-write race (-race catches it).
func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
	s.requestCount++ // concurrent writes = race condition
}
// GOOD — use atomic or mutex
type Server struct {
	// typed atomic (Go 1.19+): safe for concurrent Add/Load, no mutex needed
	requestCount atomic.Int64
}
func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
	s.requestCount.Add(1)
}
// GOOD — use mutex for complex state
type Server struct {
	mu sync.RWMutex
	// cache is guarded by mu; map access is racy without it.
	cache map[string]*CachedItem
}
func (s *Server) handleGetCached(w http.ResponseWriter, r *http.Request) {
	// Read lock: concurrent readers proceed in parallel; writers block.
	// Release before doing slow work so the lock is held only for the lookup.
	s.mu.RLock()
	item, ok := s.cache[r.PathValue("key")]
	s.mu.RUnlock()
	// ...
}r.Context()*Server*sql.DBsync.Map// BAD — no limit on concurrent goroutines
func (s *Server) handleWebhook(w http.ResponseWriter, r *http.Request) {
	go func() {
		// What if 10,000 requests arrive at once?
		// Also broken: r.Context() is canceled when the handler returns,
		// and reading r from this goroutine after return is unsafe.
		s.processWebhook(r.Context(), decodeWebhook(r))
	}()
	w.WriteHeader(http.StatusAccepted)
}
// GOOD — use a worker pool
func (s *Server) handleWebhook(w http.ResponseWriter, r *http.Request) {
	// Decode before returning 202 so the job captures the payload,
	// not the request object.
	webhook := decodeWebhook(r)
	s.workers.Submit(Job{
		ID: "webhook-" + webhook.ID,
		Execute: func(ctx context.Context) error {
			return s.processWebhook(ctx, webhook)
		},
	})
	w.WriteHeader(http.StatusAccepted)
}// BAD — loses cancellation signal
func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) {
	// context.Background() never cancels, so s.search keeps running
	// even after the client disconnects.
	results, err := s.search(context.Background(), r.URL.Query().Get("q"))
	// ...
}
// GOOD — use request context
func (s *Server) handleSearch(w http.ResponseWriter, r *http.Request) {
	// r.Context() is canceled when the client disconnects or the server
	// shuts down, letting s.search stop early and free resources.
	results, err := s.search(r.Context(), r.URL.Query().Get("q"))
	// ...
}// BAD — goroutine blocks forever if nobody reads the channel
func fetchWithTimeout(ctx context.Context, url string) (*Response, error) {
	// Unbuffered channel: the send below needs a waiting receiver.
	ch := make(chan *Response)
	go func() {
		resp, _ := http.Get(url) // blocks forever if ctx cancels
		ch <- resp // stuck here if nobody reads
	}()
	select {
	case resp := <-ch:
		return resp, nil
	case <-ctx.Done():
		// Once we take this branch, nothing ever receives from ch,
		// so the goroutine above blocks on its send forever.
		return nil, ctx.Err() // goroutine leaked!
	}
}
// GOOD — use buffered channel so goroutine can exit
// NOTE(review): this fixes the goroutine leak, but http.Get still
// ignores ctx — the request itself runs to completion; prefer
// http.NewRequestWithContext to cancel the request too. Confirm intent.
func fetchWithTimeout(ctx context.Context, url string) (*Response, error) {
	ch := make(chan *Response, 1) // buffered — goroutine can always send
	go func() {
		resp, _ := http.Get(url)
		// Buffered send never blocks, so the goroutine always exits
		// even when the select below has already returned.
		ch <- resp
	}()
	select {
	case resp := <-ch:
		return resp, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}time.Sleep// BAD — sleeping to wait for goroutines
go doWork()
// Sleeping gives no guarantee doWork finished — and wastes the full
// duration when it finishes early.
time.Sleep(5 * time.Second) // hoping it finishes
// GOOD — use sync primitives
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	doWork()
}()
// Wait blocks exactly until the goroutine calls Done — no guessing.
wg.Wait()