Go 有哪些并发设计模式?

AI 概述
本文详解Go语言12种高级并发设计模式,对比CSP与传统线程模型,梳理goroutine、channel、sync包等基础原语。逐一讲解errgroup、流水线、工作池、扇入扇出、信号量、sync.Once、sync.Pool、sync.Map、Context、select、conc库及Channel高阶用法,附可落地代码示例。同时给出并发开发性能与避坑要点,帮助开发者写出安全、高效、健壮的Go并发程序。
目录
文章目录隐藏
  1. 并发基础回顾:Go 的并发哲学
  2. 模式一:errgroup 并行错误处理
  3. 模式二:Pipeline 流水线模式
  4. 模式三:Worker Pool 模式
  5. 模式四:Fan-Out / Fan-In 模式
  6. 模式五:Semaphore 信号量模式
  7. 模式六:sync.Once 单次执行
  8. 模式七:sync.Pool 对象池
  9. 模式八:sync.Map 并发安全 Map
  10. 模式九:Context 取消传播
  11. 模式十:select 多路复用
  12. 模式十一:conc 并发库
  13. 模式十二:Channel 高级用法
  14. 性能注意事项
  15. 最后

Go 有哪些并发设计模式?

Go 的并发模型是其最强大的特性之一。但大多数开发者只会用go func()chan,对 Go 并发生态中的高级模式知之甚少。

2026 年,Go 的并发编程已经发展出一套完整的设计模式:从sync包的原子操作到errgroup的错误传播,从sync.Mapconc并发库,Go 为不同场景提供了最优解。

本文深入解析 Go 并发编程的 12 种设计模式,让你的代码在并发场景下既安全又高效。

并发基础回顾:Go 的并发哲学

CSP vs 传统线程模型

Go 使用的是 CSP(Communicating Sequential Processes) 模型,而不是传统的共享内存模型:

传统模型(Java/Python):
Thread A ──────→ 共享内存  ←────── Thread B
                   ↓↑
              加锁访问

Go 模型(CSP):
Goroutine A ────→ Channel ────→ Goroutine B
                  ↓
              无需锁

Go 并发原语一览

原语 用途 适用场景
goroutine 并发执行 I/O 并行、后台任务
channel 通信同步 数据流、信号传递
sync.Mutex 互斥锁 临界区保护
sync.RWMutex 读写锁 读多写少场景
sync.WaitGroup 等待一组完成 并行任务收集
sync.Once 单次执行 初始化、懒加载
sync.Map 并发安全 Map 高并发键值存储
sync.Pool 对象池 减少 GC 压力
context 取消信号传播 超时、取消、截止时间

模式一:errgroup 并行错误处理

golang.org/x/sync/errgroup是处理一组并发任务的最佳工具,它会自动收集所有Goroutine的错误。

基础用法

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func fetchURLs(urls []string) error {
    g, ctx := errgroup.WithContext(context.Background())

    for _, url := range urls {
        url := url // 避免闭包捕获
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return fmt.Errorf("creating request for %s: %w", url, err)
            }

            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            defer resp.Body.Close()

            fmt.Printf("Fetched %s: status %d\n", url, resp.StatusCode)
            returnnil
        })
    }

    // 等待所有任务完成,或者第一个错误发生
    return g.Wait()
}

限制并发数

errgroup本身不限制并发数,需要配合semaphore使用:

func fetchURLsWithLimit(urls []string, limit int) error {
    g, ctx := errgroup.WithContext(context.Background())

    // 创建一个带限制的 semaphore
    sem := make(chanstruct{}, limit)

    for _, url := range urls {
        url := url
        g.Go(func() error {
            // 获取信号量
            sem <- struct{}{}
            deferfunc() { <-sem }()

            // 检查上下文是否已取消
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
            }

            resp, err := http.Get(url)
            if err != nil {
                return err
            }
            defer resp.Body.Close()
            returnnil
        })
    }

    return g.Wait()
}

错误传播机制

当任何一个 Goroutine 返回错误时:

  1. 上下文被取消;
  2. 其他正在运行的Goroutine会收到ctx.Err()
  3. g.Wait()返回第一个错误。
g, ctx := errgroup.WithContext(context.Background())

g.Go(func() error {
    time.Sleep(2 * time.Second)
    return errors.New("task 1 failed")
})

g.Go(func() error {
    <-ctx.Done()
    return ctx.Err() // 收到取消信号
})

err := g.Wait()
// err = "task 1 failed"

模式二:Pipeline 流水线模式

Pipeline是 Go 并发中最强大的模式之一,它将处理过程分解为多个阶段,每个阶段通过Channel连接。

经典 Pipeline

func Pipeline(nums []int) (<-chan int, <-chan error) {
    // Stage 1: 生成数字
    out1 := make(chanint)
    stage1Err := make(chan error, 1)

    gofunc() {
        deferclose(out1)
        for _, n := range nums {
            select {
            case out1 <- n:
            case <-time.After(100 * time.Millisecond):
                // 超时处理
            }
        }
        stage1Err <- nil
    }()

    // Stage 2: 平方
    out2 := make(chanint)
    stage2Err := make(chan error, 1)

    gofunc() {
        deferclose(out2)
        for n := range out1 {
            out2 <- n * n
        }
        stage2Err <- nil
    }()

    // Stage 3: 过滤偶数
    out3 := make(chanint)
    stage3Err := make(chan error, 1)

    gofunc() {
        deferclose(out3)
        for n := range out2 {
            if n%2 == 0 {
                out3 <- n
            }
        }
        stage3Err <- nil
    }()

    // 汇总错误
    errCh := make(chan error, 3)
    gofunc() {
        errCh <- <-stage1Err
        errCh <- <-stage2Err
        errCh <- <-stage3Err
        close(errgr)
    }()

    return out3, errCh
}

使用 iterator 简化 Pipeline(Go 1.23+)

Go 1.23 引入了iter包,可以更优雅地处理Pipeline

import "iter"

func SquarePipeline(nums []int) iter.Seq[int] {
    returnfunc(yield func(int) bool) {
        for _, n := range nums {
            if !yield(n * n) {
                return
            }
        }
    }
}

func FilterEven(seq iter.Seq[int]) iter.Seq[int] {
    returnfunc(yield func(int) bool) {
        for n := range seq {
            if n%2 == 0 {
                if !yield(n) {
                    return
                }
            }
        }
    }
}

// 使用
for n := range FilterEven(SquarePipeline([]int{1, 2, 3, 4})) {
    fmt.Println(n) // 4, 16
}

模式三:Worker Pool 模式

Worker Pool控制并发数量,避免创建过多Goroutine

基础实现

type Job struct {
    ID   int
    Data string
}

type Result struct {
    JobID    int
    Output   string
    WorkerID int
}

func WorkerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        gofunc(workerID int) {
            defer wg.Done()
            for job := range jobs {
                // 模拟处理
                output := process(job.Data)
                results <- Result{
                    JobID:    job.ID,
                    Output:   output,
                    WorkerID: workerID,
                }
            }
        }(i)
    }

    wg.Wait()
    close(results)
}

func process(data string) string {
    time.Sleep(100 * time.Millisecond)
    return strings.ToUpper(data)
}

带错误处理的 Worker Pool

type WorkerPoolWithError struct {
    jobs    chan Job
    results chan Result
    errors  chan error
    done    chanstruct{}
}

func NewWorkerPool(workers int) *WorkerPoolWithError {
    wp := &WorkerPoolWithError{
        jobs:    make(chan Job, workers*2),
        results: make(chan Result, workers*2),
        errors:  make(chan error, workers),
        done:    make(chanstruct{}),
    }

    for i := 0; i < workers; i++ {
        go wp.worker(i)
    }

    return wp
}

func (wp *WorkerPoolWithError) worker(id int) {
    for job := range wp.jobs {
        result, err := wp.process(job)
        if err != nil {
            select {
            case wp.errors <- fmt.Errorf("worker %d: %w", id, err):
            default:
                // 错误队列满,跳过
            }
            continue
        }
        wp.results <- result
    }
}

func (wp *WorkerPoolWithError) Submit(job Job) {
    wp.jobs <- job
}

func (wp *WorkerPoolWithError) Close() {
    close(wp.jobs)
    <-wp.done
}

模式四:Fan-Out / Fan-In 模式

Fan-Out将任务分发到多个GoroutineFan-In将结果合并。

Fan-Out

func fanOut(ctx context.Context, items []Item) <-chan Result {
    results := make(chan Result, len(items))

    for _, item := range items {
        item := item
        gofunc() {
            select {
            case results <- processItem(ctx, item):
            case <-ctx.Done():
                results <- Result{Err: ctx.Err()}
            }
        }()
    }

    return results
}

Fan-In

func fanIn(chans ...<-chan Result) <-chan Result {
    results := make(chan Result)

    var wg sync.WaitGroup
    for _, ch := range chans {
        wg.Add(1)
        gofunc(ch <-chan Result) {
            defer wg.Done()
            for r := range ch {
                results <- r
            }
        }(ch)
    }

    gofunc() {
        wg.Wait()
        close(results)
    }()

    return results
}

实际应用

func fetchAll(ctx context.Context, urls []string) ([]string, error) {
    // 创建多个 channel
    chans := make([]<-chan string, len(urls))
    for i, url := range urls {
        chans[i] = fetchURL(ctx, url)
    }

    // 合并结果
    return fanInStrings(chans...), nil
}

模式五:Semaphore 信号量模式

信号量控制同时访问资源的 Goroutine 数量。

简单实现

type Semaphore struct {
    ch chanstruct{}
}

func NewSemaphore(max int) *Semaphore {
    return &Semaphore{
        ch: make(chanstruct{}, max),
    }
}

func (s *Semaphore) Acquire(ctx context.Context) error {
    select {
    case s.ch <- struct{}{}:
        returnnil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (s *Semaphore) Release() {
    <-s.ch
}

// 使用
sem := NewSemaphore(10) // 最多 10 个并发

for _, task := range tasks {
    if err := sem.Acquire(ctx); err != nil {
        return err
    }

    gofunc() {
        defer sem.Release()
        // 执行任务
    }()
}

golang.org/x/sync/semaphore

Go 标准库提供了官方的信号量实现:

import "golang.org/x/sync/semaphore"

func processWithLimit(ctx context.Context, items []Item, limit int64) error {
    sem := semaphore.NewWeighted(limit)

    for _, item := range items {
        if err := sem.Acquire(ctx, 1); err != nil {
            return err
        }

        gofunc(item Item) {
            defer sem.Release(1)
            process(item)
        }(item)
    }

    return sem.Acquire(ctx, limit) // 等待所有完成
}

模式六:sync.Once 单次执行

sync.Once保证代码只执行一次,常用于懒初始化。

基础用法

type Database struct {
    conn   *Connection
    initOnce sync.Once
    err     error
}

func (db *Database) GetConnection() (*Connection, error) {
    db.initOnce.Do(func() {
        conn, err := connect("localhost:5432")
        if err != nil {
            db.err = err
            return
        }
        db.conn = conn
    })

    return db.conn, db.err
}

改进:支持错误传播

sync.Once不返回错误,需要包装:

type Lazy[T any] struct {
    once  sync.Once
    value T
    err   error
    fn    func() (T, error)
}

func NewLazy[T any](fn func() (T, error)) *Lazy[T] {
    return &Lazy[T]{fn: fn}
}

func (l *Lazy[T]) Get() (T, error) {
    l.once.Do(func() {
        l.value, l.err = l.fn()
    })
    return l.value, l.err
}

模式七:sync.Pool 对象池

sync.Pool减少 GC 压力,复用临时对象。

基础用法

var bufPool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, 4096)
        return &b
    },
}

func process(data []byte) []byte {
    // 获取 buffer
    buf := bufPool.Get().(*[]byte)
    defer bufPool.Put(buf)

    // 处理数据
    // ...

    return result
}

真实场景:减少 GC 压力

type Serializer struct {
    bufPool sync.Pool
}

func NewSerializer() *Serializer {
    return &Serializer{
        bufPool: sync.Pool{
            New: func() interface{} {
                return &bytes.Buffer{}
            },
        },
    }
}

func (s *Serializer) Serialize(v interface{}) ([]byte, error) {
    buf := s.bufPool.Get().(*bytes.Buffer)
    deferfunc() {
        buf.Reset()
        s.bufPool.Put(buf)
    }()

    enc := json.NewEncoder(buf)
    err := enc.Encode(v)
    return buf.Bytes(), err
}

模式八:sync.Map 并发安全 Map

sync.Map优化了读多写少场景的并发访问。

基础用法

var m sync.Map

// 存储
m.Store("key1", "value1")

// 读取
if v, ok := m.Load("key1"); ok {
    fmt.Println(v)
}

// 读取或计算
v, _ := m.LoadOrStore("key2", "computed")

不适合的场景

sync.Map 不是万能的,以下场景不适合:
<pre”>// ❌ 不适合:写多读少 for i := 0; i < 10000; i++ {     m.Store(i, i) // sync.Map 写性能差 } // ❌ 不适合:需要原子操作 m.Store(1, m.Load(1).(int)+1) // 非原子 // ✅ 适合:读多写少 for i := 0; i < 10000; i++ {     m.Load(i) // 读性能好 }

高性能替代:分片 Map

type ShardedMap struct {
    shards []*sync.RWMutex
    maps   []map[string]interface{}
    numShards int
}

func NewShardedMap(shards int) *ShardedMap {
    sm := &ShardedMap{
        numShards: shards,
        shards:    make([]*sync.RWMutex, shards),
        maps:      make([]map[string]interface{}, shards),
    }
    for i := range sm.shards {
        sm.shards[i] = &sync.RWMutex{}
        sm.maps[i] = make(map[string]interface{})
    }
    return sm
}

func (sm *ShardedMap) shard(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    returnint(h.Sum32()) % sm.numShards
}

func (sm *ShardedMap) Store(key, value interface{}) {
    idx := sm.shard(key)
    sm.shards[idx].Lock()
    sm.maps[idx][key] = value
    sm.shards[idx].Unlock()
}

func (sm *ShardedMap) Load(key string) (interface{}, bool) {
    idx := sm.shard(key)
    sm.shards[idx].RLock()
    v, ok := sm.maps[idx][key]
    sm.shards[idx].RUnlock()
    return v, ok
}

模式九:Context 取消传播

context.Context是 Go 中最重要的取消信号传播机制。

四种工厂函数

// 1. Background - 根上下文
ctx := context.Background()

// 2. WithCancel - 可取消
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 记得调用

// 3. WithTimeout - 超时取消
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 4. WithDeadline - 截止时间
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(24*time.Hour))
defer cancel()

在 Goroutine 中使用

func longRunningTask(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err() // 收到取消信号
        default:
            // 执行工作
            time.Sleep(100 * time.Millisecond)
        }
    }
}

链式取消

func parent() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    child(ctx)

    // 取消所有子任务
    cancel()
}

func child(ctx context.Context) {
    grandchild(ctx)
}

func grandchild(ctx context.Context) {
    select {
    case <-ctx.Done():
        fmt.Println("grandchild cancelled")
    }
}

模式十:select 多路复用

select让你同时监听多个Channel

基础用法

select {
case msg := <-ch1:
    fmt.Println("Received from ch1:", msg)
case msg := <-ch2:
    fmt.Println("Received from ch2:", msg)
case <-time.After(time.Second):
    fmt.Println("Timeout")
default:
    fmt.Println("No message available")
}

超时处理

func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return"", err
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    returnstring(body), nil
}

优雅退出

func worker(jobs <-chan Job, done <-chan struct{}) {
    for {
        select {
        case job := <-jobs:
            process(job)
        case <-done:
            // 收到退出信号,处理完当前任务后退出
            return
        }
    }
}

模式十一:conc 并发库

concSourcegraph开发的并发工具库,提供了更高级的抽象。

iter 并行遍历

import "github.com/sourcegraph/conc/iter"

func fetchAll(urls []string) ([]*Result, error) {
    results := make([]*Result, len(urls))

    iter.MapPtr(&results, urls, func(url *string) *Result {
        resp, err := http.Get(*url)
        if err != nil {
            return &Result{Err: err}
        }
        defer resp.Body.Close()

        body, _ := io.ReadAll(resp.Body)
        return &Result{Body: string(body)}
    })

    return results, nil
}

waiter 等待多个任务

import "github.com/sourcegraph/conc/wait"

var wg wait.WaitGroup{}

for _, task := range tasks {
    wg.Go(func() {
        process(task)
    })
}

wg.Wait()

错误处理

import "github.com/sourcegraph/conc/errs"

func processAll(items []Item) error {
    var errs errs.Group

    iter.ForEach(&items, func(item *Item) {
        if err := process(item); err != nil {
            errs.Add(err)
        }
    })

    return errs.Error()
}

模式十二:Channel 高级用法

单向 Channel

// 生产者:只能发送
func producer(ch chan<- int) {
    for i := 0; i < 10; i++ {
        ch <- i
    }
    close(ch)
}

// 消费者:只能接收
func consumer(ch <-chan int) {
    for n := range ch {
        fmt.Println(n)
    }
}

// 使用
ch := make(chanint)
go producer(ch)
consumer(ch)

Buffered Channel 缓冲

// 无缓冲:发送和接收必须同时进行
ch1 := make(chan int)

// 有缓冲:可以积累一定数据
ch2 := make(chan int, 100)

// 适合:生产者快于消费者
// 缓冲可以吸收突发流量

nil Channel 特性

var ch chan int // nil channel

select {
case <-ch:  // 永远不会选中
default:
    fmt.Println("ch is nil, default case")
}

性能注意事项

1. 不要创建过多 Goroutine

// ❌ 差:每个任务一个 Goroutine
for _, task := range hugeSlice {
    go process(task) // 可能创建数百万 Goroutine
}

// ✅ 好:使用 Worker Pool
pool := NewWorkerPool(runtime.NumCPU())
for _, task := range hugeSlice {
    pool.Submit(task)
}

2. 避免内存泄漏

// ❌ Channel 未关闭,导致内存泄漏
func bad() <-chan int {
    ch := make(chanint)
    gofunc() {
        for i := 0; i < 1000; i++ {
            ch <- i
        }
        // 没有 close
    }()
    return ch // 调用者可能忘记关闭
}

// ✅ 好:明确生命周期
func good() (<-chan int, func()) {
    ch := make(chanint)
    done := make(chanstruct{})

    gofunc() {
        for i := 0; i < 1000; i++ {
            select {
            case ch <- i:
            case <-done:
                return
            }
        }
        close(ch)
    }()

    return ch, func() { close(done) }
}

3. 减少锁竞争

// ❌ 差:全局锁
var m sync.Mutex
m.Lock()
count++
m.Unlock()

// ✅ 好:使用原子操作
var count atomic.Int64
count.Add(1)

最后

Go 的并发模型是其最优雅的设计之一。掌握这 12 种设计模式,你就能写出既安全又高效的并发代码。

核心要点:

  1. errgroup:处理并发任务和错误传播
  2. Pipeline:构建数据处理流水线
  3. Worker Pool:控制并发数量
  4. Fan-Out/Fan-In:任务分发和结果合并
  5. Semaphore:资源访问控制
  6. sync.Once:单次执行
  7. sync.Pool:对象复用,减少 GC
  8. sync.Map:并发安全 Map
  9. Context:取消信号传播
  10. select:多路复用
  11. conc:高级并发抽象
  12. Channel 模式:单向、缓冲、nil

记住:并发不是越多越好。合理使用 Worker Pool、Semaphore 控制并发数量,使用 Channel 和 Context 管理任务生命周期,才能写出健壮的并发程序。

以上关于Go 有哪些并发设计模式?的文章就介绍到这了,更多相关内容请搜索码云笔记以前的文章或继续浏览下面的相关文章,希望大家以后多多支持码云笔记。

「点点赞赏,手留余香」

29

给作者打赏,鼓励TA抓紧创作!

微信微信 支付宝支付宝

还没有人赞赏,快来当第一个赞赏的人吧!

声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权/违法违规/事实不符,请将相关资料发送至 admin@mybj123.com 进行投诉反馈,一经查实,立即处理!
重要:如软件存在付费、会员、充值等,均属软件开发者或所属公司行为,与本站无关,网友需自行判断
码云笔记 » Go 有哪些并发设计模式?

发表回复