Loading...
Loading...
Go concurrency patterns for production services: context cancellation, errgroup, worker pools, bounded parallelism, fan-in/fan-out, and common race/deadlock pitfalls
npx skill4agent add bobmatnyc/claude-mpm-skills golang-concurrency-patternscontext.Contextcontexterrgroup.WithContexttime.Afterctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// do work
}
}
}()go func() {
for {
doWork() // leaks forever
}
}()type req struct {
key string
reply chan<- int
}
func mapOwner(ctx context.Context, in <-chan req) {
m := map[string]int{}
for {
select {
case <-ctx.Done():
return
case r := <-in:
r.reply <- m[r.key]
}
}
}type SafeMap struct {
mu sync.RWMutex
m map[string]int
}
func (s *SafeMap) Get(k string) (int, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.m[k]
return v, ok
}errgrouperrgroup.WithContextg, ctx := errgroup.WithContext(ctx)
for _, id := range ids {
id := id // capture
g.Go(func() error {
return process(ctx, id)
})
}
if err := g.Wait(); err != nil {
return err
}var wg sync.WaitGroup
for _, id := range ids {
wg.Add(1)
go func() {
defer wg.Done()
_ = process(context.Background(), id) // ignores caller ctx + captures id
}()
}
wg.Wait()limit := make(chan struct{}, 8) // max 8 concurrent
g, ctx := errgroup.WithContext(ctx)
for _, id := range ids {
id := id
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case limit <- struct{}{}:
}
defer func() { <-limit }()
return process(ctx, id)
})
}
return g.Wait()type Job struct{ ID string }
func runPool(ctx context.Context, jobs <-chan Job, workers int) error {
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < workers; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case j, ok := <-jobs:
if !ok {
return nil
}
if err := handleJob(ctx, j); err != nil {
return err
}
}
}
})
}
return g.Wait()
}func stageA(ctx context.Context, out chan<- int) {
defer close(out)
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
return
case out <- i:
}
}
}func stageB(in <-chan int) {
close(in) // compile error in<-chan; also wrong ownership model
}time.Tickertime.Aftertime.NewTickertime.Aftert := time.NewTicker(1 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
poll()
}
}for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
poll()
}
}errgroup.WithContextsync.WaitGroupgo test -race ./...
go test -run TestName -race -count=1 ./...func TestSomething(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := doThing(ctx); err != nil {
t.Fatal(err)
}
}context.WithTimeoutok == false<-limitgo test -racectx.Done()time.Tickercontext.Background()errgroup