Go 有哪些并发设计模式?

Go 的并发模型是其最强大的特性之一。但大多数开发者只会用go func()和chan,对 Go 并发生态中的高级模式知之甚少。
2026 年,Go 的并发编程已经发展出一套完整的设计模式:从sync包的原子操作到errgroup的错误传播,从sync.Map到conc并发库,Go 为不同场景提供了最优解。
本文深入解析 Go 并发编程的 12 种设计模式,让你的代码在并发场景下既安全又高效。
并发基础回顾:Go 的并发哲学
CSP vs 传统线程模型
Go 使用的是 CSP(Communicating Sequential Processes) 模型,而不是传统的共享内存模型:
传统模型(Java/Python): Thread A ──────→ 共享内存 ←────── Thread B ↓↑ 加锁访问 Go 模型(CSP): Goroutine A ────→ Channel ────→ Goroutine B ↓ 无需锁
Go 并发原语一览
| 原语 | 用途 | 适用场景 |
|---|---|---|
| goroutine | 并发执行 | I/O 并行、后台任务 |
| channel | 通信同步 | 数据流、信号传递 |
| sync.Mutex | 互斥锁 | 临界区保护 |
| sync.RWMutex | 读写锁 | 读多写少场景 |
| sync.WaitGroup | 等待一组完成 | 并行任务收集 |
| sync.Once | 单次执行 | 初始化、懒加载 |
| sync.Map | 并发安全 Map | 高并发键值存储 |
| sync.Pool | 对象池 | 减少 GC 压力 |
| context | 取消信号传播 | 超时、取消、截止时间 |
模式一:errgroup 并行错误处理
golang.org/x/sync/errgroup是处理一组并发任务的最佳工具,它会自动收集所有Goroutine的错误。
基础用法
package main
import (
"context"
"fmt"
"net/http"
"golang.org/x/sync/errgroup"
)
func fetchURLs(urls []string) error {
g, ctx := errgroup.WithContext(context.Background())
for _, url := range urls {
url := url // 避免闭包捕获
g.Go(func() error {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return fmt.Errorf("creating request for %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fetching %s: %w", url, err)
}
defer resp.Body.Close()
fmt.Printf("Fetched %s: status %d\n", url, resp.StatusCode)
returnnil
})
}
// 等待所有任务完成,或者第一个错误发生
return g.Wait()
}
限制并发数
errgroup本身不限制并发数,需要配合semaphore使用:
func fetchURLsWithLimit(urls []string, limit int) error {
g, ctx := errgroup.WithContext(context.Background())
// 创建一个带限制的 semaphore
sem := make(chanstruct{}, limit)
for _, url := range urls {
url := url
g.Go(func() error {
// 获取信号量
sem <- struct{}{}
deferfunc() { <-sem }()
// 检查上下文是否已取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
returnnil
})
}
return g.Wait()
}
错误传播机制
当任何一个 Goroutine 返回错误时:
- 上下文被取消;
- 其他正在运行的
Goroutine会收到ctx.Err(); g.Wait()返回第一个错误。
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
time.Sleep(2 * time.Second)
return errors.New("task 1 failed")
})
g.Go(func() error {
<-ctx.Done()
return ctx.Err() // 收到取消信号
})
err := g.Wait()
// err = "task 1 failed"
模式二:Pipeline 流水线模式
Pipeline是 Go 并发中最强大的模式之一,它将处理过程分解为多个阶段,每个阶段通过Channel连接。
经典 Pipeline
func Pipeline(nums []int) (<-chan int, <-chan error) {
// Stage 1: 生成数字
out1 := make(chanint)
stage1Err := make(chan error, 1)
gofunc() {
deferclose(out1)
for _, n := range nums {
select {
case out1 <- n:
case <-time.After(100 * time.Millisecond):
// 超时处理
}
}
stage1Err <- nil
}()
// Stage 2: 平方
out2 := make(chanint)
stage2Err := make(chan error, 1)
gofunc() {
deferclose(out2)
for n := range out1 {
out2 <- n * n
}
stage2Err <- nil
}()
// Stage 3: 过滤偶数
out3 := make(chanint)
stage3Err := make(chan error, 1)
gofunc() {
deferclose(out3)
for n := range out2 {
if n%2 == 0 {
out3 <- n
}
}
stage3Err <- nil
}()
// 汇总错误
errCh := make(chan error, 3)
gofunc() {
errCh <- <-stage1Err
errCh <- <-stage2Err
errCh <- <-stage3Err
close(errgr)
}()
return out3, errCh
}
使用 iterator 简化 Pipeline(Go 1.23+)
Go 1.23 引入了iter包,可以更优雅地处理Pipeline:
import "iter"
func SquarePipeline(nums []int) iter.Seq[int] {
returnfunc(yield func(int) bool) {
for _, n := range nums {
if !yield(n * n) {
return
}
}
}
}
func FilterEven(seq iter.Seq[int]) iter.Seq[int] {
returnfunc(yield func(int) bool) {
for n := range seq {
if n%2 == 0 {
if !yield(n) {
return
}
}
}
}
}
// 使用
for n := range FilterEven(SquarePipeline([]int{1, 2, 3, 4})) {
fmt.Println(n) // 4, 16
}
模式三:Worker Pool 模式
Worker Pool控制并发数量,避免创建过多Goroutine。
基础实现
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
WorkerID int
}
func WorkerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
gofunc(workerID int) {
defer wg.Done()
for job := range jobs {
// 模拟处理
output := process(job.Data)
results <- Result{
JobID: job.ID,
Output: output,
WorkerID: workerID,
}
}
}(i)
}
wg.Wait()
close(results)
}
func process(data string) string {
time.Sleep(100 * time.Millisecond)
return strings.ToUpper(data)
}
带错误处理的 Worker Pool
type WorkerPoolWithError struct {
jobs chan Job
results chan Result
errors chan error
done chanstruct{}
}
func NewWorkerPool(workers int) *WorkerPoolWithError {
wp := &WorkerPoolWithError{
jobs: make(chan Job, workers*2),
results: make(chan Result, workers*2),
errors: make(chan error, workers),
done: make(chanstruct{}),
}
for i := 0; i < workers; i++ {
go wp.worker(i)
}
return wp
}
func (wp *WorkerPoolWithError) worker(id int) {
for job := range wp.jobs {
result, err := wp.process(job)
if err != nil {
select {
case wp.errors <- fmt.Errorf("worker %d: %w", id, err):
default:
// 错误队列满,跳过
}
continue
}
wp.results <- result
}
}
func (wp *WorkerPoolWithError) Submit(job Job) {
wp.jobs <- job
}
func (wp *WorkerPoolWithError) Close() {
close(wp.jobs)
<-wp.done
}
模式四:Fan-Out / Fan-In 模式
Fan-Out将任务分发到多个Goroutine,Fan-In将结果合并。
Fan-Out
func fanOut(ctx context.Context, items []Item) <-chan Result {
results := make(chan Result, len(items))
for _, item := range items {
item := item
gofunc() {
select {
case results <- processItem(ctx, item):
case <-ctx.Done():
results <- Result{Err: ctx.Err()}
}
}()
}
return results
}
Fan-In
func fanIn(chans ...<-chan Result) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup
for _, ch := range chans {
wg.Add(1)
gofunc(ch <-chan Result) {
defer wg.Done()
for r := range ch {
results <- r
}
}(ch)
}
gofunc() {
wg.Wait()
close(results)
}()
return results
}
实际应用
func fetchAll(ctx context.Context, urls []string) ([]string, error) {
// 创建多个 channel
chans := make([]<-chan string, len(urls))
for i, url := range urls {
chans[i] = fetchURL(ctx, url)
}
// 合并结果
return fanInStrings(chans...), nil
}
模式五:Semaphore 信号量模式
信号量控制同时访问资源的 Goroutine 数量。
简单实现
type Semaphore struct {
ch chanstruct{}
}
func NewSemaphore(max int) *Semaphore {
return &Semaphore{
ch: make(chanstruct{}, max),
}
}
func (s *Semaphore) Acquire(ctx context.Context) error {
select {
case s.ch <- struct{}{}:
returnnil
case <-ctx.Done():
return ctx.Err()
}
}
func (s *Semaphore) Release() {
<-s.ch
}
// 使用
sem := NewSemaphore(10) // 最多 10 个并发
for _, task := range tasks {
if err := sem.Acquire(ctx); err != nil {
return err
}
gofunc() {
defer sem.Release()
// 执行任务
}()
}
golang.org/x/sync/semaphore
Go 标准库提供了官方的信号量实现:
import "golang.org/x/sync/semaphore"
func processWithLimit(ctx context.Context, items []Item, limit int64) error {
sem := semaphore.NewWeighted(limit)
for _, item := range items {
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
gofunc(item Item) {
defer sem.Release(1)
process(item)
}(item)
}
return sem.Acquire(ctx, limit) // 等待所有完成
}
模式六:sync.Once 单次执行
sync.Once保证代码只执行一次,常用于懒初始化。
基础用法
type Database struct {
conn *Connection
initOnce sync.Once
err error
}
func (db *Database) GetConnection() (*Connection, error) {
db.initOnce.Do(func() {
conn, err := connect("localhost:5432")
if err != nil {
db.err = err
return
}
db.conn = conn
})
return db.conn, db.err
}
改进:支持错误传播
sync.Once不返回错误,需要包装:
type Lazy[T any] struct {
once sync.Once
value T
err error
fn func() (T, error)
}
func NewLazy[T any](fn func() (T, error)) *Lazy[T] {
return &Lazy[T]{fn: fn}
}
func (l *Lazy[T]) Get() (T, error) {
l.once.Do(func() {
l.value, l.err = l.fn()
})
return l.value, l.err
}
模式七:sync.Pool 对象池
sync.Pool减少 GC 压力,复用临时对象。
基础用法
var bufPool = sync.Pool{
New: func() interface{} {
b := make([]byte, 4096)
return &b
},
}
func process(data []byte) []byte {
// 获取 buffer
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
// 处理数据
// ...
return result
}
真实场景:减少 GC 压力
type Serializer struct {
bufPool sync.Pool
}
func NewSerializer() *Serializer {
return &Serializer{
bufPool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
}
func (s *Serializer) Serialize(v interface{}) ([]byte, error) {
buf := s.bufPool.Get().(*bytes.Buffer)
deferfunc() {
buf.Reset()
s.bufPool.Put(buf)
}()
enc := json.NewEncoder(buf)
err := enc.Encode(v)
return buf.Bytes(), err
}
模式八:sync.Map 并发安全 Map
sync.Map优化了读多写少场景的并发访问。
基础用法
var m sync.Map
// 存储
m.Store("key1", "value1")
// 读取
if v, ok := m.Load("key1"); ok {
fmt.Println(v)
}
// 读取或计算
v, _ := m.LoadOrStore("key2", "computed")
不适合的场景
sync.Map 不是万能的,以下场景不适合:
<pre”>// ❌ 不适合:写多读少 for i := 0; i < 10000; i++ { m.Store(i, i) // sync.Map 写性能差 } // ❌ 不适合:需要原子操作 m.Store(1, m.Load(1).(int)+1) // 非原子 // ✅ 适合:读多写少 for i := 0; i < 10000; i++ { m.Load(i) // 读性能好 }
高性能替代:分片 Map
type ShardedMap struct {
shards []*sync.RWMutex
maps []map[string]interface{}
numShards int
}
func NewShardedMap(shards int) *ShardedMap {
sm := &ShardedMap{
numShards: shards,
shards: make([]*sync.RWMutex, shards),
maps: make([]map[string]interface{}, shards),
}
for i := range sm.shards {
sm.shards[i] = &sync.RWMutex{}
sm.maps[i] = make(map[string]interface{})
}
return sm
}
func (sm *ShardedMap) shard(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
returnint(h.Sum32()) % sm.numShards
}
func (sm *ShardedMap) Store(key, value interface{}) {
idx := sm.shard(key)
sm.shards[idx].Lock()
sm.maps[idx][key] = value
sm.shards[idx].Unlock()
}
func (sm *ShardedMap) Load(key string) (interface{}, bool) {
idx := sm.shard(key)
sm.shards[idx].RLock()
v, ok := sm.maps[idx][key]
sm.shards[idx].RUnlock()
return v, ok
}
模式九:Context 取消传播
context.Context是 Go 中最重要的取消信号传播机制。
四种工厂函数
// 1. Background - 根上下文 ctx := context.Background() // 2. WithCancel - 可取消 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 记得调用 // 3. WithTimeout - 超时取消 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 4. WithDeadline - 截止时间 ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(24*time.Hour)) defer cancel()
在 Goroutine 中使用
func longRunningTask(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err() // 收到取消信号
default:
// 执行工作
time.Sleep(100 * time.Millisecond)
}
}
}
链式取消
func parent() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
child(ctx)
// 取消所有子任务
cancel()
}
func child(ctx context.Context) {
grandchild(ctx)
}
func grandchild(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println("grandchild cancelled")
}
}
模式十:select 多路复用
select让你同时监听多个Channel。
基础用法
select {
case msg := <-ch1:
fmt.Println("Received from ch1:", msg)
case msg := <-ch2:
fmt.Println("Received from ch2:", msg)
case <-time.After(time.Second):
fmt.Println("Timeout")
default:
fmt.Println("No message available")
}
超时处理
func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return"", err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
returnstring(body), nil
}
优雅退出
func worker(jobs <-chan Job, done <-chan struct{}) {
for {
select {
case job := <-jobs:
process(job)
case <-done:
// 收到退出信号,处理完当前任务后退出
return
}
}
}
模式十一:conc 并发库
conc是Sourcegraph开发的并发工具库,提供了更高级的抽象。
iter 并行遍历
import "github.com/sourcegraph/conc/iter"
func fetchAll(urls []string) ([]*Result, error) {
results := make([]*Result, len(urls))
iter.MapPtr(&results, urls, func(url *string) *Result {
resp, err := http.Get(*url)
if err != nil {
return &Result{Err: err}
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
return &Result{Body: string(body)}
})
return results, nil
}
waiter 等待多个任务
import "github.com/sourcegraph/conc/wait"
var wg wait.WaitGroup{}
for _, task := range tasks {
wg.Go(func() {
process(task)
})
}
wg.Wait()
错误处理
import "github.com/sourcegraph/conc/errs"
func processAll(items []Item) error {
var errs errs.Group
iter.ForEach(&items, func(item *Item) {
if err := process(item); err != nil {
errs.Add(err)
}
})
return errs.Error()
}
模式十二:Channel 高级用法
单向 Channel
// 生产者:只能发送
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
// 消费者:只能接收
func consumer(ch <-chan int) {
for n := range ch {
fmt.Println(n)
}
}
// 使用
ch := make(chanint)
go producer(ch)
consumer(ch)
Buffered Channel 缓冲
// 无缓冲:发送和接收必须同时进行 ch1 := make(chan int) // 有缓冲:可以积累一定数据 ch2 := make(chan int, 100) // 适合:生产者快于消费者 // 缓冲可以吸收突发流量
nil Channel 特性
var ch chan int // nil channel
select {
case <-ch: // 永远不会选中
default:
fmt.Println("ch is nil, default case")
}
性能注意事项
1. 不要创建过多 Goroutine
// ❌ 差:每个任务一个 Goroutine
for _, task := range hugeSlice {
go process(task) // 可能创建数百万 Goroutine
}
// ✅ 好:使用 Worker Pool
pool := NewWorkerPool(runtime.NumCPU())
for _, task := range hugeSlice {
pool.Submit(task)
}
2. 避免内存泄漏
// ❌ Channel 未关闭,导致内存泄漏
func bad() <-chan int {
ch := make(chanint)
gofunc() {
for i := 0; i < 1000; i++ {
ch <- i
}
// 没有 close
}()
return ch // 调用者可能忘记关闭
}
// ✅ 好:明确生命周期
func good() (<-chan int, func()) {
ch := make(chanint)
done := make(chanstruct{})
gofunc() {
for i := 0; i < 1000; i++ {
select {
case ch <- i:
case <-done:
return
}
}
close(ch)
}()
return ch, func() { close(done) }
}
3. 减少锁竞争
// ❌ 差:全局锁 var m sync.Mutex m.Lock() count++ m.Unlock() // ✅ 好:使用原子操作 var count atomic.Int64 count.Add(1)
最后
Go 的并发模型是其最优雅的设计之一。掌握这 12 种设计模式,你就能写出既安全又高效的并发代码。
核心要点:
- errgroup:处理并发任务和错误传播
- Pipeline:构建数据处理流水线
- Worker Pool:控制并发数量
- Fan-Out/Fan-In:任务分发和结果合并
- Semaphore:资源访问控制
- sync.Once:单次执行
- sync.Pool:对象复用,减少 GC
- sync.Map:并发安全 Map
- Context:取消信号传播
- select:多路复用
- conc:高级并发抽象
- Channel 模式:单向、缓冲、nil
记住:并发不是越多越好。合理使用 Worker Pool、Semaphore 控制并发数量,使用 Channel 和 Context 管理任务生命周期,才能写出健壮的并发程序。
以上关于Go 有哪些并发设计模式?的文章就介绍到这了,更多相关内容请搜索码云笔记以前的文章或继续浏览下面的相关文章,希望大家以后多多支持码云笔记。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 admin@mybj123.com 进行投诉反馈,一经查实,立即处理!
重要:如软件存在付费、会员、充值等,均属软件开发者或所属公司行为,与本站无关,网友需自行判断
码云笔记 » Go 有哪些并发设计模式?
微信
支付宝