Go常见并发模式详解
Go提供简洁的并发模式,实现高效并行处理。
Worker Pool工作池
基本实现
Go
func workerPool(tasks []Task, workers int) {
taskCh := make(chan Task, len(tasks))
resultCh := make(chan Result, len(tasks))
// 启动worker
for i := 0; i < workers; i++ {
go worker(taskCh, resultCh)
}
// 分发任务
for _, task := range tasks {
taskCh <- task
}
close(taskCh)
// 收集结果
for i := 0; i < len(tasks); i++ {
result := <-resultCh
processResult(result)
}
}
func worker(taskCh <-chan Task, resultCh chan<- Result) {
for task := range taskCh {
result := process(task)
resultCh <- result
}
}
控制并发数
Go
// 使用带缓冲channel限制并发
sem := make(chan struct{}, 10) // 最大10并发
for _, task := range tasks {
sem <- struct{}{} // 获取令牌
go func(t Task) {
defer func() { <-sem }() // 释放令牌
process(t)
}(task)
}
// 等待全部完成
for i := 0; i < cap(sem); i++ {
sem <- struct{}{}
}
Pipeline流水线
串行处理
Go
func pipeline() {
// 阶段1:生成数据
gen := func(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2:处理数据
sq := func(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段3:消费数据
for n := range sq(gen(1, 2, 3)) {
fmt.Println(n) // 1, 4, 9
}
}
可取消的pipeline
Go
func pipelineWithContext(ctx context.Context) {
in := gen(1, 2, 3)
out := make(chan int)
go func() {
defer close(out)
for {
select {
case n, ok := <-in:
if !ok {
return
}
out <- n * n
case <-ctx.Done():
return // 取消
}
}
}()
}
Fan-in/Fan-out扇入扇出
Fan-out分发
Go
func fanOut(in <-chan int, n int) []<-chan int {
outputs := make([]<-chan int, n)
for i := 0; i < n; i++ {
outputs[i] = process(in) // 每个worker独立处理
}
return outputs
}
func process(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
Fan-in合并
Go
func fanIn(inputs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(inputs))
for _, in := range inputs {
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// 使用
merged := fanIn(fanOut(source, 3)...)
errgroup并发错误处理
基本用法
Go
import "golang.org/x/sync/errgroup"
func processAll(tasks []Task) error {
g, ctx := errgroup.WithContext(context.Background())
for _, task := range tasks {
g.Go(func() error {
return processTask(ctx, task)
})
}
return g.Wait() // 任一错误即返回
}
限制并发数
Go
func processWithLimit(tasks []Task) error {
g := &errgroup.Group{}
sem := make(chan struct{}, 5) // 最大5并发
for _, task := range tasks {
sem <- struct{}{} // 获取令牌
g.Go(func() error {
defer func() { <-sem }() // 释放
return process(task)
})
}
return g.Wait()
}
Graceful Shutdown优雅退出
基本实现
Go
func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
// 启动服务
go serve()
// 等待信号
<-stop
fmt.Println("收到退出信号")
// 清理资源
cleanup()
}
超时退出
Go
func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
go serve()
select {
case <-stop:
fmt.Println("收到信号")
case <-time.After(30 * time.Second):
fmt.Println("超时")
}
// 限时清理
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := shutdown(ctx); err != nil {
fmt.Println("清理超时")
}
}
并发模式对比
| 模式 | 用途 | 特点 |
|---|---|---|
| Worker Pool | 任务并行 | 控制并发数 |
| Pipeline | 串行处理 | 分阶段解耦 |
| Fan-out | 分发处理 | 多worker并行 |
| Fan-in | 合合结果 | 统一输出 |
| errgroup | 错误处理 | 任一错误即停 |
| Semaphore | 并发限制 | 令牌控制 |
| Graceful | 优雅退出 | 清理资源 |
模式选择指南
- 需要控制并发数:Worker Pool或Semaphore
- 数据分阶段处理:Pipeline
- 多worker并行处理:Fan-out/Fan-in
- 需要错误收集:errgroup
- 服务需要退出:Graceful Shutdown
要点总结
- Worker Pool控制并发worker数量
- Pipeline分阶段串行处理数据
- Fan-out分发任务到多worker
- Fan-in合并多worker结果
- errgroup统一处理并发错误
- Semaphore用channel限制并发
- Graceful Shutdown清理后退出
- 模式组合实现复杂并发逻辑
📝 发现内容有误?点击此处直接编辑