Go 并发编程:goroutine、channel、sync 包详细使用实战
Go 并发编程:goroutine、channel、sync 包详细使用实战
🎯 概述
Go 语言的并发模型基于 CSP(Communicating Sequential Processes) 理论,核心是 goroutine 和 channel,辅以 sync 包提供的同步原语。
一、Goroutine 基础与实战
1. Goroutine 基本概念
// goroutine 是轻量级线程,由 Go 运行时管理
// 创建 goroutine 只需要在函数调用前加 go 关键字
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
// 启动一个 goroutine
go sayHello()
// 主 goroutine 继续执行
fmt.Println("Hello from main!")
// 等待一段时间,让 goroutine 有机会执行
time.Sleep(100 * time.Millisecond)
}
2. Goroutine 特性详解
package main
import (
"fmt"
"runtime"
"sync"
)
func printInfo(id int, wg *sync.WaitGroup) {
defer wg.Done()
// 获取 goroutine ID(不推荐在生产环境使用)
fmt.Printf("Goroutine %d: started
", id)
// 让出 CPU 时间片
runtime.Gosched()
fmt.Printf("Goroutine %d: finished
", id)
}
func main() {
// 设置最大 CPU 核心数
fmt.Printf("CPU cores: %d
", runtime.NumCPU())
runtime.GOMAXPROCS(4) // 使用 4 个逻辑处理器
var wg sync.WaitGroup
// 启动 10 个 goroutine
for i := 1; i <= 10; i++ {
wg.Add(1)
go printInfo(i, &wg)
}
// 等待所有 goroutine 完成
wg.Wait()
// 查看当前 goroutine 数量
fmt.Printf("Number of goroutines: %d
", runtime.NumGoroutine())
}
3. Goroutine 生命周期管理
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: Context cancelled
", id)
return
default:
// 模拟工作
fmt.Printf("Worker %d: Working...
", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())
// 启动 3 个 worker
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
// 让 workers 运行一段时间
time.Sleep(2 * time.Second)
// 取消所有 workers
fmt.Println("Cancelling all workers...")
cancel()
// 等待 workers 退出
time.Sleep(1 * time.Second)
fmt.Println("All workers stopped")
}
4. Goroutine 泄漏检测与预防
package main
import (
"fmt"
"net/http"
"runtime"
"time"
)
// 错误的 goroutine 使用(会导致泄漏)
func leakyWorker(ch chan int) {
for val := range ch {
// 如果这里 panic 或者没有正确处理 channel 关闭
// goroutine 会永远阻塞
fmt.Println("Processing:", val)
}
}
// 正确的 goroutine 使用
func safeWorker(ctx context.Context, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case val, ok := <-ch:
if !ok {
// channel 已关闭
fmt.Println("Channel closed, exiting")
return
}
fmt.Println("Processing:", val)
case <-ctx.Done():
// 收到取消信号
fmt.Println("Context cancelled, exiting")
return
}
}
}
// 使用 recover 防止 goroutine panic
func protectedWorker() {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Recovered from panic: %v
", r)
}
}()
// 可能 panic 的操作
panic("something went wrong")
}
func main() {
// 监控 goroutine 数量
go func() {
for {
fmt.Printf("Goroutines: %d
", runtime.NumGoroutine())
time.Sleep(1 * time.Second)
}
}()
// 测试 protectedWorker
go protectedWorker()
time.Sleep(2 * time.Second)
}
二、Channel 深度实战
1. Channel 基础操作
package main
import "fmt"
func main() {
// 1. 创建 channel
ch := make(chan int) // 无缓冲 channel
bufferedCh := make(chan int, 3) // 缓冲大小为 3
// 2. 发送和接收数据
go func() {
ch <- 42 // 发送数据
}()
value := <-ch // 接收数据
fmt.Println("Received:", value)
// 3. 关闭 channel
close(bufferedCh)
// 4. 判断 channel 是否关闭
val, ok := <-bufferedCh
if !ok {
fmt.Println("Channel is closed")
}
}
2. 无缓冲 vs 有缓冲 Channel
package main
import (
"fmt"
"time"
)
func unbufferedExample() {
fmt.Println("
=== 无缓冲 Channel 示例 ===")
ch := make(chan string)
go func() {
fmt.Println("Goroutine: 准备发送数据...")
ch <- "Hello" // 这里会阻塞,直到有人接收
fmt.Println("Goroutine: 数据已发送")
}()
time.Sleep(1 * time.Second) // 让 goroutine 先运行
fmt.Println("Main: 准备接收数据...")
msg := <-ch
fmt.Println("Main: 收到数据:", msg)
}
func bufferedExample() {
fmt.Println("
=== 有缓冲 Channel 示例 ===")
ch := make(chan string, 2)
// 可以发送两个数据而不会阻塞
ch <- "消息1"
ch <- "消息2"
go func() {
ch <- "消息3" // 第三个会阻塞,因为缓冲区已满
fmt.Println("消息3 已发送")
}()
time.Sleep(500 * time.Millisecond)
// 接收数据,腾出缓冲区空间
fmt.Println("收到:", <-ch)
fmt.Println("收到:", <-ch)
time.Sleep(500 * time.Millisecond)
}
func main() {
unbufferedExample()
bufferedExample()
}
3. Channel 高级模式
package main
import (
"fmt"
"sync"
"time"
)
// 1. 单向 Channel
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(200 * time.Millisecond)
}
close(ch)
}
func consumer(ch <-chan int, wg *sync.WaitGroup, id int) {
defer wg.Done()
for num := range ch {
fmt.Printf("Consumer %d: 收到 %d
", id, num)
}
}
// 2. Channel 作为函数返回值
func createWorker(id int) chan<- string {
ch := make(chan string)
go func() {
for task := range ch {
fmt.Printf("Worker %d: 处理 %s
", id, task)
time.Sleep(500 * time.Millisecond)
}
fmt.Printf("Worker %d: 停止
", id)
}()
return ch
}
// 3. 使用 nil channel 进行控制
func nilChannelExample() {
var ch chan int // nil channel
go func() {
// 从 nil channel 接收会永远阻塞
// <-ch // 这行如果取消注释,goroutine 会永远阻塞
}()
// 向 nil channel 发送也会永远阻塞
// ch <- 1 // 这行如果取消注释,会永远阻塞
}
func main() {
// 单向 Channel 示例
fmt.Println("=== 单向 Channel 示例 ===")
ch := make(chan int)
var wg sync.WaitGroup
go producer(ch)
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(ch, &wg, i)
}
wg.Wait()
// Channel 作为返回值示例
fmt.Println("
=== Channel 作为返回值 ===")
worker1 := createWorker(1)
worker2 := createWorker(2)
worker1 <- "任务A"
worker2 <- "任务B"
time.Sleep(1 * time.Second)
close(worker1)
close(worker2)
time.Sleep(500 * time.Millisecond)
}
4. Select 多路复用
package main
import (
"fmt"
"math/rand"
"time"
)
func generator(name string, delay time.Duration) <-chan string {
ch := make(chan string)
go func() {
for i := 1; ; i++ {
ch <- fmt.Sprintf("%s: 消息 %d", name, i)
time.Sleep(delay)
}
}()
return ch
}
func selectExample() {
fmt.Println("
=== Select 多路复用 ===")
// 创建多个数据源
ch1 := generator("生成器1", 300*time.Millisecond)
ch2 := generator("生成器2", 500*time.Millisecond)
// 超时控制
timeout := time.After(2 * time.Second)
// 接收数据,直到超时
for {
select {
case msg := <-ch1:
fmt.Println("收到:", msg)
case msg := <-ch2:
fmt.Println("收到:", msg)
case <-timeout:
fmt.Println("超时,退出")
return
default:
// 非阻塞检查
// fmt.Println("没有数据可读")
time.Sleep(100 * time.Millisecond)
}
}
}
func selectWithPriority() {
fmt.Println("
=== Select 优先级 ===")
highPriority := make(chan string, 10)
lowPriority := make(chan string, 10)
// 填充一些数据
go func() {
for i := 0; i < 5; i++ {
highPriority <- fmt.Sprintf("高优先级 %d", i)
lowPriority <- fmt.Sprintf("低优先级 %d", i)
time.Sleep(100 * time.Millisecond)
}
}()
// 优先处理高优先级 channel
for i := 0; i < 10; i++ {
select {
case msg := <-highPriority:
fmt.Println("处理:", msg)
case msg := <-lowPriority:
// 只有当高优先级 channel 没有数据时才处理低优先级
select {
case highMsg := <-highPriority:
fmt.Println("处理:", highMsg)
default:
fmt.Println("处理:", msg)
}
}
}
}
func main() {
selectExample()
selectWithPriority()
}
5. Pipeline 模式
package main
import (
"fmt"
"sync"
)
// 第一阶段:生成数据
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 第二阶段:平方计算
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 第三阶段:求和
func sum(in <-chan int) <-chan int {
out := make(chan int)
go func() {
total := 0
for n := range in {
total += n
}
out <- total
close(out)
}()
return out
}
// 扇出模式:一个输入,多个处理
func fanOut(in <-chan int, workers int) []<-chan int {
outputs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
output := make(chan int)
outputs[i] = output
go func(id int) {
for n := range in {
fmt.Printf("Worker %d 处理: %d
", id, n)
output <- n * 2
}
close(output)
}(i)
}
return outputs
}
// 扇入模式:多个输入,一个输出
func fanIn(inputs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(input)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func pipelineExample() {
fmt.Println("=== Pipeline 模式 ===")
// 构建 pipeline: 生成 -> 平方 -> 求和
gen := generator(1, 2, 3, 4, 5)
sq := square(gen)
result := <-sum(sq)
fmt.Printf("Pipeline 结果: %d
", result)
}
func fanInFanOutExample() {
fmt.Println("
=== Fan-In/Fan-Out 模式 ===")
// 生成数据
gen := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// 扇出到 3 个 worker
outputs := fanOut(gen, 3)
// 扇入结果
merged := fanIn(outputs...)
// 收集结果
var results []int
for n := range merged {
results = append(results, n)
}
fmt.Printf("处理结果: %v
", results)
}
func main() {
pipelineExample()
fanInFanOutExample()
}
三、sync 包同步原语实战
1. Mutex(互斥锁)
package main
import (
"fmt"
"sync"
"time"
)
type SafeCounter struct {
mu sync.Mutex
value int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// 错误的例子:忘记解锁
type BadCounter struct {
mu sync.Mutex
value int
}
func (b *BadCounter) Increment() {
b.mu.Lock()
// 忘记调用 b.mu.Unlock()
b.value++
}
func mutexExample() {
fmt.Println("=== Mutex 示例 ===")
counter := &SafeCounter{}
var wg sync.WaitGroup
// 启动 1000 个 goroutine 同时增加计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Printf("最终值: %d (期望: 1000)
", counter.Value())
}
// 尝试锁(非阻塞)
func tryLockExample() {
fmt.Println("
=== TryLock 示例 ===")
var mu sync.Mutex
// 第一个 goroutine 获取锁
go func() {
mu.Lock()
fmt.Println("Goroutine 1: 获取锁")
time.Sleep(2 * time.Second)
mu.Unlock()
fmt.Println("Goroutine 1: 释放锁")
}()
time.Sleep(500 * time.Millisecond) // 让第一个 goroutine 先获取锁
// 第二个 goroutine 尝试获取锁
go func() {
// Go 1.18+ 支持 TryLock
// if mu.TryLock() {
// defer mu.Unlock()
// fmt.Println("Goroutine 2: 成功获取锁")
// } else {
// fmt.Println("Goroutine 2: 锁被占用,跳过")
// }
// 在 Go 1.18 之前,可以使用带缓冲的 channel 模拟
fmt.Println("Goroutine 2: 尝试获取锁...")
}()
time.Sleep(3 * time.Second)
}
func main() {
mutexExample()
tryLockExample()
}
2. RWMutex(读写锁)
package main
import (
"fmt"
"sync"
"time"
)
type ConfigManager struct {
mu sync.RWMutex
configs map[string]string
version int
}
func NewConfigManager() *ConfigManager {
return &ConfigManager{
configs: make(map[string]string),
}
}
func (cm *ConfigManager) Get(key string) (string, int) {
cm.mu.RLock() // 读锁
defer cm.mu.RUnlock()
return cm.configs[key], cm.version
}
func (cm *ConfigManager) GetAll() map[string]string {
cm.mu.RLock()
defer cm.mu.RUnlock()
// 返回副本,避免外部修改
configs := make(map[string]string)
for k, v := range cm.configs {
configs[k] = v
}
return configs
}
func (cm *ConfigManager) Set(key, value string) {
cm.mu.Lock() // 写锁
defer cm.mu.Unlock()
cm.configs[key] = value
cm.version++
}
func (cm *ConfigManager) Delete(key string) {
cm.mu.Lock()
defer cm.mu.Unlock()
delete(cm.configs, key)
cm.version++
}
func rwmutexExample() {
fmt.Println("=== RWMutex 示例 ===")
cm := NewConfigManager()
var wg sync.WaitGroup
// 启动多个读 goroutine
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 5; j++ {
val, ver := cm.Get("key1")
fmt.Printf("Reader %d: key1=%s, version=%d
", id, val, ver)
time.Sleep(100 * time.Millisecond)
}
}(i)
}
// 启动写 goroutine
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 3; i++ {
cm.Set("key1", fmt.Sprintf("value%d", i))
fmt.Printf("Writer: 更新配置到 value%d
", i)
time.Sleep(500 * time.Millisecond)
}
}()
wg.Wait()
fmt.Println("所有操作完成")
}
func main() {
rwmutexExample()
}
3. WaitGroup
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 基本用法
func basicWaitGroup() {
fmt.Println("=== WaitGroup 基本用法 ===")
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 模拟工作
duration := time.Duration(rand.Intn(1000)) * time.Millisecond
time.Sleep(duration)
fmt.Printf("Worker %d 完成,耗时 %v
", id, duration)
}(i)
}
fmt.Println("等待所有 worker 完成...")
wg.Wait()
fmt.Println("所有 worker 已完成")
}
// 错误用法:在 goroutine 外部调用 Done
func wrongWaitGroupUsage() {
fmt.Println("
=== WaitGroup 错误用法 ===")
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
// 错误:应该在函数开始时调用 defer wg.Done()
// 或者在函数结束时调用 wg.Done()
time.Sleep(500 * time.Millisecond)
fmt.Printf("Worker %d 完成
", id)
// 应该在这里调用:wg.Done()
}(i)
}
wg.Wait() // 这里会永远阻塞
}
// WaitGroup 复用
func reusableWaitGroup() {
fmt.Println("
=== WaitGroup 复用 ===")
var wg sync.WaitGroup
// 第一轮任务
fmt.Println("第一轮任务开始")
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(500 * time.Millisecond)
fmt.Printf("任务 %d-1 完成
", id)
}(i)
}
wg.Wait()
// 第二轮任务(复用同一个 WaitGroup)
fmt.Println("
第二轮任务开始")
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(500 * time.Millisecond)
fmt.Printf("任务 %d-2 完成
", id)
}(i)
}
wg.Wait()
fmt.Println("所有任务完成")
}
func main() {
rand.Seed(time.Now().UnixNano())
basicWaitGroup()
reusableWaitGroup()
// wrongWaitGroupUsage() // 注释掉,因为会死锁
}
4. Once(一次性执行)
package main
import (
"fmt"
"sync"
"time"
)
type Singleton struct {
value string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{value: "我是单例"}
fmt.Println("单例已初始化")
})
return instance
}
func onceExample() {
fmt.Println("=== Once 示例 ===")
var wg sync.WaitGroup
// 多次尝试获取单例
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
singleton := GetInstance()
fmt.Printf("Goroutine %d: %p -> %v
", id, singleton, singleton.value)
}(i)
}
wg.Wait()
// 验证确实是同一个实例
s1 := GetInstance()
s2 := GetInstance()
if s1 == s2 {
fmt.Println("✓ 两个引用指向同一个实例")
}
}
// Once 的错误处理
func onceWithError() {
fmt.Println("
=== Once 错误处理 ===")
var once sync.Once
var initErr error
initialize := func() {
fmt.Println("初始化中...")
// 模拟可能失败的操作
if time.Now().Unix()%2 == 0 { // 随机失败
initErr = fmt.Errorf("初始化失败")
return
}
fmt.Println("初始化成功")
}
// 尝试初始化
once.Do(initialize)
if initErr != nil {
fmt.Printf("初始化错误: %v
", initErr)
// 注意:once 已经执行过了,即使失败了也不会再执行
}
// 再次调用,不会执行
once.Do(func() {
fmt.Println("这行不会打印")
})
}
func main() {
onceExample()
onceWithError()
}
5. Cond(条件变量)
package main
import (
"fmt"
"sync"
"time"
)
// 生产者-消费者模型使用 Cond
type Queue struct {
items []int
mu sync.Mutex
cond *sync.Cond
}
func NewQueue() *Queue {
q := &Queue{}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Enqueue(item int) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
fmt.Printf("生产: %d
", item)
// 通知一个等待的消费者
q.cond.Signal()
}
func (q *Queue) Dequeue() int {
q.mu.Lock()
defer q.mu.Unlock()
// 如果队列为空,等待
for len(q.items) == 0 {
fmt.Println("队列为空,等待中...")
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
fmt.Printf("消费: %d
", item)
return item
}
func condExample() {
fmt.Println("=== Cond 示例 ===")
queue := NewQueue()
var wg sync.WaitGroup
// 启动消费者(先于生产者启动)
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < 2; j++ {
queue.Dequeue()
time.Sleep(200 * time.Millisecond)
}
}(i)
}
time.Sleep(100 * time.Millisecond) // 确保消费者先开始等待
// 启动生产者
for i := 1; i <= 6; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(time.Duration(id) * 100 * time.Millisecond)
queue.Enqueue(id)
}(i)
}
wg.Wait()
}
// Broadcast 示例
func broadcastExample() {
fmt.Println("
=== Cond.Broadcast 示例 ===")
var mu sync.Mutex
cond := sync.NewCond(&mu)
var ready bool
// 等待的 goroutines
for i := 1; i <= 5; i++ {
go func(id int) {
mu.Lock()
for !ready {
fmt.Printf("Goroutine %d: 等待条件满足
", id)
cond.Wait()
}
mu.Unlock()
fmt.Printf("Goroutine %d: 条件已满足,继续执行
", id)
}(i)
}
time.Sleep(1 * time.Second)
// 广播通知所有等待的 goroutines
fmt.Println("
广播通知所有等待者...")
mu.Lock()
ready = true
cond.Broadcast() // 唤醒所有等待的 goroutine
mu.Unlock()
time.Sleep(1 * time.Second)
}
func main() {
condExample()
broadcastExample()
}
6. Pool(对象池)
package main
import (
"fmt"
"sync"
"time"
)
// 数据库连接模拟
type DBConnection struct {
ID int
}
func (c *DBConnection) Query(sql string) string {
return fmt.Sprintf("连接 %d 执行: %s", c.ID, sql)
}
// 连接池
type ConnectionPool struct {
pool sync.Pool
mu sync.Mutex
}
func NewConnectionPool() *ConnectionPool {
id := 0
return &ConnectionPool{
pool: sync.Pool{
New: func() interface{} {
id++
return &DBConnection{ID: id}
},
},
}
}
func (p *ConnectionPool) Get() *DBConnection {
conn := p.pool.Get().(*DBConnection)
fmt.Printf("获取连接 %d
", conn.ID)
return conn
}
func (p *ConnectionPool) Put(conn *DBConnection) {
fmt.Printf("归还连接 %d
", conn.ID)
p.pool.Put(conn)
}
func poolExample() {
fmt.Println("=== Pool 示例 ===")
pool := NewConnectionPool()
var wg sync.WaitGroup
// 模拟多个 goroutine 使用连接
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 获取连接
conn := pool.Get()
// 使用连接
result := conn.Query(fmt.Sprintf("SELECT * FROM users WHERE id = %d", id))
fmt.Printf("Goroutine %d: %s
", id, result)
time.Sleep(100 * time.Millisecond)
// 归还连接
pool.Put(conn)
}(i)
}
wg.Wait()
// 查看池中的对象是否被复用
fmt.Println("
测试对象复用:")
conn1 := pool.Get()
pool.Put(conn1)
conn2 := pool.Get()
if conn1 == conn2 {
fmt.Println("✓ 连接被复用")
} else {
fmt.Println("✗ 创建了新连接")
}
}
func main() {
poolExample()
}
7. Map(并发安全 Map)
package main
import (
"fmt"
"sync"
"time"
)
func syncMapExample() {
fmt.Println("=== sync.Map 示例 ===")
var m sync.Map
var wg sync.WaitGroup
// 并发写入
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key%d", id%3) // 只有3个不同的key
value := fmt.Sprintf("value%d", id)
m.Store(key, value)
fmt.Printf("存储: %s -> %s
", key, value)
}(i)
}
wg.Wait()
fmt.Println("
所有写入完成")
// 读取
fmt.Println("
读取数据:")
m.Range(func(key, value interface{}) bool {
fmt.Printf(" %s: %s
", key, value)
return true // 继续迭代
})
// LoadOrStore
fmt.Println("
LoadOrStore 测试:")
// 如果 key 存在,返回现有值
if actual, loaded := m.LoadOrStore("key1", "newvalue1"); loaded {
fmt.Printf("key1 已存在: %s
", actual)
} else {
fmt.Printf("key1 不存在,已存储新值
")
}
// Delete
m.Delete("key2")
fmt.Println("
删除 key2 后:")
m.Range(func(key, value interface{}) bool {
fmt.Printf(" %s: %s
", key, value)
return true
})
}
// 与普通 map+mutex 对比
func compareWithRegularMap() {
fmt.Println("
=== sync.Map vs map+mutex 对比 ===")
// sync.Map
var syncMap sync.Map
start := time.Now()
var wg1 sync.WaitGroup
for i := 0; i < 10000; i++ {
wg1.Add(1)
go func(id int) {
defer wg1.Done()
syncMap.Store(id, id*2)
}(i)
}
wg1.Wait()
syncTime := time.Since(start)
// map + mutex
regularMap := make(map[int]int)
var mu sync.RWMutex
start = time.Now()
var wg2 sync.WaitGroup
for i := 0; i < 10000; i++ {
wg2.Add(1)
go func(id int) {
defer wg2.Done()
mu.Lock()
regularMap[id] = id * 2
mu.Unlock()
}(i)
}
wg2.Wait()
regularTime := time.Since(start)
fmt.Printf("sync.Map 耗时: %v
", syncTime)
fmt.Printf("map+mutex 耗时: %v
", regularTime)
}
func main() {
syncMapExample()
compareWithRegularMap()
}
四、综合实战案例
案例1:并发 Web 爬虫
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
type Crawler struct {
visited sync.Map
semaphore chan struct{} // 限制并发数
mu sync.RWMutex
wg sync.WaitGroup
}
func NewCrawler(maxConcurrent int) *Crawler {
return &Crawler{
semaphore: make(chan struct{}, maxConcurrent),
}
}
func (c *Crawler) Crawl(url string, depth int) {
defer c.wg.Done()
if depth <= 0 {
return
}
// 检查是否已访问
if _, visited := c.visited.LoadOrStore(url, true); visited {
return
}
// 获取信号量(控制并发数)
c.semaphore <- struct{}{}
defer func() { <-c.semaphore }()
fmt.Printf("爬取: %s (深度: %d)
", url, depth)
// 模拟 HTTP 请求
resp, err := http.Get(url)
if err != nil {
fmt.Printf("错误: %v
", err)
return
}
defer resp.Body.Close()
// 模拟解析页面,找到链接
// 这里简化处理,实际应该解析 HTML
// 假设找到了一些链接
links := []string{
url + "/page1",
url + "/page2",
url + "/page3",
}
// 递归爬取子链接
for _, link := range links {
c.wg.Add(1)
go c.Crawl(link, depth-1)
}
}
func (c *Crawler) Wait() {
c.wg.Wait()
}
func main() {
fmt.Println("=== 并发 Web 爬虫 ===")
crawler := NewCrawler(5) // 最大并发数 5
startURLs := []string{
"https://example.com",
"https://example.org",
}
// 开始爬取
for _, url := range startURLs {
crawler.wg.Add(1)
go crawler.Crawl(url, 3)
}
// 等待所有爬取完成
crawler.Wait()
fmt.Println("爬取完成")
}
案例2:高性能并发计数器
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// 方案1:使用 atomic(最快)
type AtomicCounter struct {
value int64
}
func (c *AtomicCounter) Increment() {
atomic.AddInt64(&c.value, 1)
}
func (c *AtomicCounter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
// 方案2:使用 sync.Mutex(最安全)
type MutexCounter struct {
mu sync.Mutex
value int64
}
func (c *MutexCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *MutexCounter) Value() int64 {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
// 方案3:使用 channel(最符合 Go 哲学)
type ChannelCounter struct {
incCh chan struct{}
getCh chan chan int64
value int64
}
func NewChannelCounter() *ChannelCounter {
c := &ChannelCounter{
incCh: make(chan struct{}, 1000),
getCh: make(chan chan int64),
}
// 启动管理 goroutine
go func() {
for {
select {
case <-c.incCh:
c.value++
case ch := <-c.getCh:
ch <- c.value
}
}
}()
return c
}
func (c *ChannelCounter) Increment() {
c.incCh <- struct{}{}
}
func (c *ChannelCounter) Value() int64 {
ch := make(chan int64)
c.getCh <- ch
return <-ch
}
func benchmarkCounter(name string, counter interface {
Increment()
Value() int64
}, goroutines, increments int) {
fmt.Printf("
=== %s 性能测试 ===
", name)
fmt.Printf("goroutines: %d, increments per goroutine: %d
", goroutines, increments)
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < increments; j++ {
counter.Increment()
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("最终值: %d (期望: %d)
", counter.Value(), goroutines*increments)
fmt.Printf("耗时: %v
", elapsed)
fmt.Printf("操作数/秒: %.0f
", float64(goroutines*increments)/elapsed.Seconds())
}
func main() {
// 测试配置
goroutines := 100
increments := 10000
// 测试各种计数器
benchmarkCounter("Atomic Counter", &AtomicCounter{}, goroutines, increments)
benchmarkCounter("Mutex Counter", &MutexCounter{}, goroutines, increments)
benchmarkCounter("Channel Counter", NewChannelCounter(), goroutines, increments)
}
案例3:并发任务调度器
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// 任务定义
type Task struct {
ID int
Name string
Weight int // 任务权重,模拟耗时
}
func (t Task) Execute() (string, error) {
// 模拟任务执行时间
time.Sleep(time.Duration(t.Weight) * 100 * time.Millisecond)
// 模拟可能失败
if rand.Float32() < 0.1 { // 10% 失败率
return "", fmt.Errorf("任务 %d 执行失败", t.ID)
}
return fmt.Sprintf("任务 %d 完成: %s", t.ID, t.Name), nil
}
// 任务调度器
type Scheduler struct {
taskQueue chan Task
resultChan chan string
errorChan chan error
workerCount int
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func NewScheduler(workerCount, queueSize int) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
taskQueue: make(chan Task, queueSize),
resultChan: make(chan string, queueSize),
errorChan: make(chan error, queueSize),
workerCount: workerCount,
ctx: ctx,
cancel: cancel,
}
}
func (s *Scheduler) Start() {
// 启动 worker
for i := 0; i < s.workerCount; i++ {
s.wg.Add(1)
go s.worker(i)
}
// 启动结果处理器
s.wg.Add(1)
go s.processResults()
}
func (s *Scheduler) worker(id int) {
defer s.wg.Done()
for {
select {
case task := <-s.taskQueue:
fmt.Printf("Worker %d 开始执行任务 %d
", id, task.ID)
result, err := task.Execute()
if err != nil {
s.errorChan <- err
} else {
s.resultChan <- result
}
case <-s.ctx.Done():
fmt.Printf("Worker %d 停止
", id)
return
}
}
}
func (s *Scheduler) processResults() {
defer s.wg.Done()
for {
select {
case result := <-s.resultChan:
fmt.Printf("结果: %s
", result)
case err := <-s.errorChan:
fmt.Printf("错误: %v
", err)
case <-s.ctx.Done():
fmt.Println("结果处理器停止")
return
}
}
}
func (s *Scheduler) Submit(task Task) {
select {
case s.taskQueue <- task:
fmt.Printf("提交任务 %d
", task.ID)
case <-s.ctx.Done():
fmt.Println("调度器已停止,无法提交任务")
}
}
func (s *Scheduler) Stop() {
fmt.Println("停止调度器...")
s.cancel()
s.wg.Wait()
close(s.taskQueue)
close(s.resultChan)
close(s.errorChan)
fmt.Println("调度器已停止")
}
func main() {
fmt.Println("=== 并发任务调度器 ===")
// 创建调度器
scheduler := NewScheduler(3, 10) // 3个worker,队列大小10
scheduler.Start()
// 提交任务
rand.Seed(time.Now().UnixNano())
for i := 1; i <= 20; i++ {
task := Task{
ID: i,
Name: fmt.Sprintf("任务-%d", i),
Weight: rand.Intn(5) + 1, // 1-5秒
}
scheduler.Submit(task)
}
// 等待一段时间
time.Sleep(5 * time.Second)
// 停止调度器
scheduler.Stop()
}
五、最佳实践与常见陷阱
1. Goroutine 泄漏检测
package main
import (
"fmt"
"runtime"
"time"
)
// 泄漏检测器
type LeakDetector struct {
startGoroutines int
}
func NewLeakDetector() *LeakDetector {
return &LeakDetector{
startGoroutines: runtime.NumGoroutine(),
}
}
func (ld *LeakDetector) Check(name string) {
current := runtime.NumGoroutine()
if current > ld.startGoroutines {
fmt.Printf("⚠️ 可能泄漏: %s (goroutines: %d -> %d)
",
name, ld.startGoroutines, current)
} else {
fmt.Printf("✓ 正常: %s (goroutines: %d)
", name, current)
}
}
// 常见的泄漏模式
func leakExample1() {
fmt.Println("=== 泄漏示例1: 无限循环的 goroutine ===")
ld := NewLeakDetector()
// 泄漏:启动后永远不会退出的 goroutine
go func() {
for {
time.Sleep(1 * time.Second)
// 没有退出条件
}
}()
time.Sleep(100 * time.Millisecond)
ld.Check("无限循环")
}
func leakExample2() {
fmt.Println("
=== 泄漏示例2: 阻塞的 channel ===")
ld := NewLeakDetector()
ch := make(chan int)
// 泄漏:发送到没有接收者的 channel
go func() {
ch <- 42 // 永远阻塞
}()
// 泄漏:从没有发送者的 channel 接收
go func() {
<-ch // 永远阻塞
}()
time.Sleep(100 * time.Millisecond)
ld.Check("阻塞 channel")
}
// 如何避免泄漏
func safePattern() {
fmt.Println("
=== 安全模式 ===")
ld := NewLeakDetector()
// 使用 context 控制 goroutine 生命周期
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return // 正确退出
case <-time.After(1 * time.Second):
// 正常工作
}
}
}(ctx)
time.Sleep(100 * time.Millisecond)
cancel() // 取消 context
time.Sleep(100 * time.Millisecond) // 给 goroutine 时间退出
ld.Check("使用 context")
}
func main() {
leakExample1()
leakExample2()
safePattern()
}
2. 死锁检测与避免
package main
import (
"fmt"
"time"
)
// 常见的死锁模式
func deadlockExample1() {
fmt.Println("=== 死锁示例1: 互相等待 ===")
var mu1, mu2 sync.Mutex
go func() {
mu1.Lock()
fmt.Println("Goroutine 1: 获取锁1")
time.Sleep(100 * time.Millisecond)
mu2.Lock() // 等待锁2
fmt.Println("Goroutine 1: 获取锁2")
mu2.Unlock()
mu1.Unlock()
}()
go func() {
mu2.Lock()
fmt.Println("Goroutine 2: 获取锁2")
time.Sleep(100 * time.Millisecond)
mu1.Lock() // 等待锁1
fmt.Println("Goroutine 2: 获取锁1")
mu1.Unlock()
mu2.Unlock()
}()
time.Sleep(1 * time.Second)
fmt.Println("死锁发生,程序可能卡住")
}
// 避免死锁的方法
func avoidDeadlock() {
fmt.Println("
=== 避免死锁的方法 ===")
var mu1, mu2 sync.Mutex
// 方法1:总是以相同的顺序获取锁
go func() {
mu1.Lock()
defer mu1.Unlock()
mu2.Lock()
defer mu2.Unlock()
fmt.Println("Goroutine 1: 完成")
}()
go func() {
mu1.Lock() // 同样是先锁1,再锁2
defer mu1.Unlock()
mu2.Lock()
defer mu2.Unlock()
fmt.Println("Goroutine 2: 完成")
}()
time.Sleep(1 * time.Second)
fmt.Println("没有死锁")
}
// 使用 timeout 避免死锁
func timeoutExample() {
fmt.Println("
=== 使用 Timeout ===")
var mu sync.Mutex
// 第一个 goroutine 获取锁
go func() {
mu.Lock()
fmt.Println("Goroutine 1: 获取锁")
time.Sleep(2 * time.Second) // 长时间持有锁
mu.Unlock()
fmt.Println("Goroutine 1: 释放锁")
}()
time.Sleep(100 * time.Millisecond) // 让第一个先运行
// 第二个 goroutine 尝试获取锁,但设置超时
go func() {
// 使用 channel 模拟带超时的锁获取
locked := make(chan bool, 1)
go func() {
mu.Lock()
locked <- true
}()
select {
case <-locked:
fmt.Println("Goroutine 2: 获取锁成功")
mu.Unlock()
case <-time.After(1 * time.Second):
fmt.Println("Goroutine 2: 获取锁超时")
}
}()
time.Sleep(3 * time.Second)
}
func main() {
// deadlockExample1() // 注释掉,因为会导致死锁
avoidDeadlock()
timeoutExample()
}
3. 性能优化技巧
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// 1. 减少锁竞争
func reduceLockContention() {
fmt.Println("=== 减少锁竞争 ===")
// 不好的做法:所有 goroutine 竞争同一个锁
var badCounter struct {
mu sync.Mutex
value int64
}
// 好的做法:使用分段锁
type GoodCounter struct {
segments [64]struct {
mu sync.Mutex
value int64
}
}
goodCounter := &GoodCounter{}
start := time.Now()
var wg sync.WaitGroup
// 测试不好的实现
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
badCounter.mu.Lock()
badCounter.value++
badCounter.mu.Unlock()
}()
}
wg.Wait()
badTime := time.Since(start)
// 测试好的实现
start = time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
segment := &goodCounter.segments[id%len(goodCounter.segments)]
segment.mu.Lock()
segment.value++
segment.mu.Unlock()
}(i)
}
wg.Wait()
goodTime := time.Since(start)
fmt.Printf("单锁实现: %v
", badTime)
fmt.Printf("分段锁实现: %v
", goodTime)
}
// 2. 使用 sync.Pool 减少内存分配
func usePool() {
fmt.Println("
=== 使用 sync.Pool ===")
type ExpensiveObject struct {
data [1024]byte
}
pool := &sync.Pool{
New: func() interface{} {
return &ExpensiveObject{}
},
}
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 从池中获取对象
obj := pool.Get().(*ExpensiveObject)
// 使用对象
obj.data[0] = byte(i % 256)
// 归还对象
pool.Put(obj)
}()
}
wg.Wait()
fmt.Printf("使用 Pool 耗时: %v
", time.Since(start))
}
// 3. 合理设置 GOMAXPROCS
func tuneGOMAXPROCS() {
fmt.Println("
=== 调整 GOMAXPROCS ===")
tasks := 1000
work := func(id int) {
// 模拟 CPU 密集型任务
sum := 0
for i := 0; i < 1000000; i++ {
sum += i % 256
}
}
// 测试不同 GOMAXPROCS 设置
for _, procs := range []int{1, 2, 4, 8, runtime.NumCPU()} {
runtime.GOMAXPROCS(procs)
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < tasks; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
work(id)
}(i)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("GOMAXPROCS=%d: %v (%.1f tasks/sec)
",
procs, elapsed, float64(tasks)/elapsed.Seconds())
}
}
func main() {
reduceLockContention()
usePool()
tuneGOMAXPROCS()
}
六、测试与调试
1. 竞争检测
package main
import (
"fmt"
"sync"
"testing"
"time"
)
// 有数据竞争的例子
func dataRaceExample() {
var counter int
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 这里存在数据竞争
}()
}
wg.Wait()
fmt.Printf("Counter: %d
", counter)
}
// 修复数据竞争
func fixedDataRace() {
var counter int
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counter++
mu.Unlock()
}()
}
wg.Wait()
fmt.Printf("Counter: %d
", counter)
}
// 使用 -race 标志进行测试
func TestDataRace(t *testing.T) {
// 运行测试: go test -race -v
dataRaceExample() // 这会触发竞争检测器
}
func main() {
fmt.Println("运行竞争检测:")
fmt.Println(" 编译: go build -race main.go")
fmt.Println(" 测试: go test -race ./...")
fmt.Println(" 运行: ./main")
// 测试修复后的版本
fixedDataRace()
}
2. 性能分析
package main
import (
"flag"
"fmt"
"log"
"net/http"
_ "net/http/pprof"
"os"
"runtime/pprof"
"sync"
"time"
)
var cpuprofile = flag.String("cpuprofile", "", "写入 cpu profile 到文件")
var memprofile = flag.String("memprofile", "", "写入 mem profile 到文件")
func heavyWork() {
time.Sleep(10 * time.Millisecond)
// 模拟一些计算
sum := 0
for i := 0; i < 1000000; i++ {
sum += i % 256
}
}
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
heavyWork()
results <- job * 2
}
}
func main() {
flag.Parse()
// CPU 性能分析
if *cpuprofile != "" {
f, err := os.Create(*cpuprofile)
if err != nil {
log.Fatal(err)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
// 启动 pprof HTTP 服务器
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// 创建 worker pool
const numWorkers = 100
const numJobs = 1000
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 启动 workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送 jobs
for j := 0; j < numJobs; j++ {
jobs <- j
}
close(jobs)
// 收集结果
go func() {
wg.Wait()
close(results)
}()
// 读取结果
var total int
for result := range results {
total += result
}
fmt.Printf("Total: %d
", total)
// 内存性能分析
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
}
}
总结
关键要点
-
Goroutine:
- 轻量级,创建成本低
- 使用 context 管理生命周期
- 避免泄漏,确保能正常退出
-
Channel:
- 无缓冲 channel 用于同步
- 有缓冲 channel 用于解耦
- 使用 select 处理多个 channel
- 记住关闭 channel(发送方关闭)
-
sync 包:
- Mutex:保护共享资源
- RWMutex:读多写少的场景
- WaitGroup:等待一组 goroutine
- Once:确保一次性执行
- Pool:重用昂贵对象
- Map:并发安全的 map
最佳实践
- 通过通信共享内存,而不是通过共享内存来通信
- 优先使用 channel,其次考虑 sync 包
- 每个 goroutine 都应该有明确的退出路径
- 使用 -race 进行竞争检测
- 合理设置 GOMAXPROCS
- 监控 goroutine 数量,防止泄漏
调试技巧
- 使用
runtime.NumGoroutine()监控 goroutine 数量 - 使用
go test -race检测数据竞争 - 使用 pprof 进行性能分析
- 使用
GODEBUG=gctrace=1观察 GC 行为 - 使用
panic和recover处理 goroutine 中的异常
性能优化
- 减少锁竞争(使用分段锁、atomic)
- 使用 sync.Pool 减少内存分配
- 合理设置 channel 缓冲区大小
- 避免在热路径上使用 defer(在性能关键代码中)
- 使用 strings.Builder 或 bytes.Buffer 进行字符串拼接
通过掌握这些并发编程的技术和最佳实践,你将能够编写出高效、安全、可维护的 Go 并发程序。









