sync包
第16课:sync包 — 并发安全的守护者
生活类比
想象一个公共卫生间:
- Mutex(互斥锁):门上有一把锁,一个人进去就把门锁上,其他人必须等待。一次只能有一个人使用。
- RWMutex(读写锁):图书馆的阅览室——多人可以同时阅读(读锁),但如果有人要修改资料,必须等所有读者离开,并且独占房间(写锁)。
- Once(单次执行):公司入职时,工牌只发放一次。不管你问多少次,第一次办理后就不再重复。
- Pool(对象池):会议室的椅子——用完放回仓库,下次开会再拿出来用,而不是每次都买新的。
- sync.Map(并发Map):一个有多把钥匙的安全信箱,多个邮递员可以同时投递不同的格子,互不干扰。
核心概念
1. 为什么需要 sync 包?
Go 的 goroutine 轻量且强大,但多个 goroutine 同时访问共享数据时会产生竞态条件(Race Condition):
GO
// 危险!多个goroutine同时修改count
var count int
for i := 0; i < 1000; i++ {
go func() {
count++ // 数据竞争!
}()
}
sync 包提供了同步原语,确保并发安全。
2. 核心类型一览
| 类型 | 用途 | 特点 |
|---|---|---|
sync.Mutex |
互斥锁 | 同一时刻只有一个 goroutine 能持有 |
sync.RWMutex |
读写锁 | 多读单写,读操作不互斥 |
sync.WaitGroup |
等待组 | 等待一组 goroutine 完成(第15课已学) |
sync.Once |
单次执行 | 确保函数只执行一次 |
sync.Pool |
对象池 | 复用对象,减少内存分配 |
sync.Map |
并发安全Map | 无需加锁的并发安全键值存储 |
sync.Cond |
条件变量 | goroutine 间的条件通知 |
atomic 包 |
原子操作 | 最底层、最高效的并发安全操作 |
基本语法与用法
💡 互斥锁 Mutex
GO
package main
import (
"fmt"
"sync"
)
func main() {
var mu sync.Mutex
count := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock() // 加锁
count++ // 安全地修改共享变量
mu.Unlock() // 解锁
}()
}
wg.Wait()
fmt.Println("count =", count) // 输出: count = 1000
}
💡 提示:使用
defer mu.Unlock() 可以确保即使函数 panic 也能释放锁,避免死锁。
💡 读写锁 RWMutex
GO
package main
import (
"fmt"
"sync"
)
func main() {
var rwmu sync.RWMutex
data := make(map[string]string)
var wg sync.WaitGroup
// 写操作:使用写锁
wg.Add(1)
go func() {
defer wg.Done()
rwmu.Lock() // 写锁:独占
data["key"] = "value"
rwmu.Unlock()
}()
// 读操作:使用读锁(可并发)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
rwmu.RLock() // 读锁:共享
_ = data["key"]
rwmu.RUnlock()
}()
}
wg.Wait()
fmt.Println("data:", data)
}
💡 提示:读多写少的场景用
RWMutex 性能更好;如果读写频率差不多,直接用 Mutex 更简单。
💡 单次执行 Once
GO
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
var wg sync.WaitGroup
setup := func() {
fmt.Println("初始化操作(只会执行一次)")
}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
once.Do(setup) // 只有第一个goroutine会执行setup
fmt.Printf("goroutine %d 完成\n", id)
}(i)
}
wg.Wait()
}
💡 提示:
Once 保证即使多个 goroutine 同时调用,传入的函数也只会执行一次。常用于单例初始化。
💡 对象池 Pool
GO
package main
import (
"fmt"
"sync"
)
func main() {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("创建新对象")
return make([]byte, 1024)
},
}
// 第一次Get:调用New创建
buf1 := pool.Get().([]byte)
fmt.Println("获取对象,长度:", len(buf1))
// 用完归还
pool.Put(buf1)
// 第二次Get:复用之前归还的对象
buf2 := pool.Get().([]byte)
fmt.Println("复用对象,长度:", len(buf2))
}
💡 提示:
Pool 中的对象可能在任意 GC 周期被回收,不要假设 Put 的对象一定能被 Get 到。
💡 原子操作 atomic
GO
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var count int64 = 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&count, 1) // 原子递增
}()
}
wg.Wait()
fmt.Println("count =", atomic.LoadInt64(&count)) // 原子读取
}
💡 提示:
atomic 操作比 Mutex 更轻量,适合简单的计数器、标志位等场景。
💡 竞态检测
BASH
go run -race main.go # 运行时检测竞态
go test -race ./... # 测试时检测竞态
💡 提示:开发阶段建议始终开启
-race,它能帮你发现隐藏的数据竞争问题。
实战示例
示例:线程安全的计数器(难度⭐)
一个封装了互斥锁的安全计数器:
GO
package main
import (
"fmt"
"sync"
)
// SafeCounter 线程安全的计数器
type SafeCounter struct {
mu sync.Mutex
count int
}
// Inc 递增计数
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
// Value 获取当前值
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func main() {
counter := &SafeCounter{}
var wg sync.WaitGroup
// 启动100个goroutine,每个递增100次
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
counter.Inc()
}
}()
}
wg.Wait()
fmt.Println("最终计数:", counter.Value()) // 输出: 最终计数: 10000
}
示例:使用 sync.Map 实现并发安全的缓存(难度⭐⭐)
GO
package main
import (
"fmt"
"sync"
)
// Cache 并发安全的缓存
type Cache struct {
store sync.Map
}
// Set 设置缓存
func (c *Cache) Set(key string, value interface{}) {
c.store.Store(key, value)
}
// Get 获取缓存
func (c *Cache) Get(key string) (interface{}, bool) {
return c.store.Load(key)
}
// Delete 删除缓存
func (c *Cache) Delete(key string) {
c.store.Delete(key)
}
// GetOrSet 如果不存在则设置(原子操作,避免重复计算)
func (c *Cache) GetOrSet(key string, factory func() interface{}) interface{} {
if val, ok := c.store.Load(key); ok {
return val
}
// 使用 LoadOrStore 保证原子性
val, _ := c.store.LoadOrStore(key, factory())
return val
}
func main() {
cache := &Cache{}
var wg sync.WaitGroup
// 并发写入
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", id)
cache.Set(key, id*10)
fmt.Printf("写入: %s = %d\n", key, id*10)
}(i)
}
wg.Wait()
// 并发读取
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", id)
if val, ok := cache.Get(key); ok {
fmt.Printf("读取: %s = %v\n", key, val)
}
}(i)
}
wg.Wait()
// 使用 GetOrSet 避免重复计算
result := cache.GetOrSet("computed", func() interface{} {
fmt.Println("执行复杂计算...")
return 42
})
fmt.Println("computed =", result)
// 再次获取,不会重复计算
result2 := cache.GetOrSet("computed", func() interface{} {
fmt.Println("这行不会执行")
return 99
})
fmt.Println("computed =", result2)
}
示例:带超时的工作池(综合运用 sync)(难度⭐⭐⭐)
GO
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Task 任务
type Task struct {
ID int
Data string
}
// Result 结果
type Result struct {
TaskID int
Output string
WorkerID int
Duration time.Duration
}
// WorkerPool 工作池
type WorkerPool struct {
workerCount int
taskCh chan Task
resultCh chan Result
wg sync.WaitGroup
processed int64 // 原子操作的计数器
errors int64
once sync.Once // 确保只初始化一次
pool sync.Pool // 复用 Result 对象
}
// NewWorkerPool 创建工作池
func NewWorkerPool(workerCount, taskBufferSize int) *WorkerPool {
wp := &WorkerPool{
workerCount: workerCount,
taskCh: make(chan Task, taskBufferSize),
resultCh: make(chan Result, taskBufferSize),
}
// 初始化对象池
wp.pool = sync.Pool{
New: func() interface{} {
return &Result{}
},
}
return wp
}
// Start 启动工作池(只执行一次)
func (wp *WorkerPool) Start(ctx context.Context) {
wp.once.Do(func() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(ctx, i)
}
fmt.Printf("工作池已启动 %d 个 worker\n", wp.workerCount)
})
}
// worker 工作者goroutine
func (wp *WorkerPool) worker(ctx context.Context, id int) {
defer wp.wg.Done()
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: 收到退出信号\n", id)
return
case task, ok := <-wp.taskCh:
if !ok {
fmt.Printf("Worker %d: 任务通道已关闭\n", id)
return
}
// 从对象池获取Result
result := wp.pool.Get().(*Result)
result.TaskID = task.ID
result.WorkerID = id
// 模拟处理任务
start := time.Now()
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
result.Output = fmt.Sprintf("处理完成: %s", task.Data)
result.Duration = time.Since(start)
atomic.AddInt64(&wp.processed, 1)
wp.resultCh <- *result
// 归还Result对象到池中(注意:这里发送的是值拷贝)
wp.pool.Put(result)
}
}
}
// Submit 提交任务
func (wp *WorkerPool) Submit(task Task) {
wp.taskCh <- task
}
// Close 关闭工作池
func (wp *WorkerPool) Close() {
close(wp.taskCh)
wp.wg.Wait()
close(wp.resultCh)
}
// Stats 获取统计信息
func (wp *WorkerPool) Stats() (processed, errors int64) {
return atomic.LoadInt64(&wp.processed), atomic.LoadInt64(&wp.errors)
}
func main() {
rand.Seed(time.Now().UnixNano())
// 创建带超时的context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 创建工作池
pool := NewWorkerPool(3, 20)
pool.Start(ctx)
// 提交任务
var submitWg sync.WaitGroup
submitWg.Add(1)
go func() {
defer submitWg.Done()
for i := 1; i <= 10; i++ {
pool.Submit(Task{
ID: i,
Data: fmt.Sprintf("任务数据-%d", i),
})
fmt.Printf("提交任务 #%d\n", i)
}
}()
// 收集结果
var collectWg sync.WaitGroup
collectWg.Add(1)
go func() {
defer collectWg.Done()
for result := range pool.resultCh {
fmt.Printf("任务#%d 由Worker%d处理,耗时%v -> %s\n",
result.TaskID, result.WorkerID, result.Duration, result.Output)
}
}()
// 等待任务提交完成
submitWg.Wait()
// 关闭工作池
pool.Close()
// 等待结果收集完成
collectWg.Wait()
// 输出统计
processed, _ := pool.Stats()
fmt.Printf("\n统计: 共处理 %d 个任务\n", processed)
}
实际应用场景
场景1:并发HTTP请求的限流控制
在爬虫或API调用中,需要限制并发数避免被封禁:
GO
package main
import (
"fmt"
"sync"
"time"
)
// RateLimiter 简单的并发限流器
type RateLimiter struct {
sem chan struct{}
mu sync.Mutex
active int
maxConc int
}
// NewRateLimiter 创建限流器,maxConc为最大并发数
func NewRateLimiter(maxConc int) *RateLimiter {
return &RateLimiter{
sem: make(chan struct{}, maxConc),
maxConc: maxConc,
}
}
// Acquire 获取许可
func (r *RateLimiter) Acquire() {
r.sem <- struct{}{}
r.mu.Lock()
r.active++
r.mu.Unlock()
}
// Release 释放许可
func (r *RateLimiter) Release() {
<-r.sem
r.mu.Lock()
r.active--
r.mu.Unlock()
}
// ActiveCount 当前活跃数
func (r *RateLimiter) ActiveCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.active
}
func main() {
limiter := NewRateLimiter(3) // 最多3个并发
var wg sync.WaitGroup
urls := []string{
"https://api.example.com/page1",
"https://api.example.com/page2",
"https://api.example.com/page3",
"https://api.example.com/page4",
"https://api.example.com/page5",
"https://api.example.com/page6",
}
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
limiter.Acquire() // 获取许可(超过3个时会阻塞)
defer limiter.Release() // 释放许可
fmt.Printf("[并发数:%d] 开始请求: %s\n", limiter.ActiveCount(), u)
time.Sleep(time.Duration(100+len(u)*10) * time.Millisecond) // 模拟请求
fmt.Printf("[并发数:%d] 完成请求: %s\n", limiter.ActiveCount(), u)
}(url)
}
wg.Wait()
fmt.Println("所有请求已完成")
}
场景2:配置的懒加载(单例模式)
使用 sync.Once 确保配置只加载一次:
GO
package main
import (
"fmt"
"sync"
)
// Config 应用配置
type Config struct {
DatabaseURL string
APIKey string
MaxRetries int
}
var (
config *Config
once sync.Once
)
// GetConfig 获取配置(懒加载,只初始化一次)
func GetConfig() *Config {
once.Do(func() {
fmt.Println("正在加载配置(只会执行一次)...")
// 模拟从文件或环境变量加载配置
config = &Config{
DatabaseURL: "postgres://localhost:5432/mydb",
APIKey: "sk-xxxxxxxxxxxx",
MaxRetries: 3,
}
})
return config
}
func main() {
var wg sync.WaitGroup
// 多个goroutine同时获取配置
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
cfg := GetConfig()
fmt.Printf("Goroutine %d: DB=%s, Retries=%d\n",
id, cfg.DatabaseURL, cfg.MaxRetries)
}(i)
}
wg.Wait()
fmt.Println("配置加载完成")
}
❓ 常见问题
Q1:Mutex 和 RWMutex 该怎么选?
| 场景 | 推荐 |
|---|---|
| 读多写少(如缓存) | RWMutex — 多个读操作可以并发 |
| 读写均衡或写多 | Mutex — 更简单,开销更低 |
| 简单计数器 | atomic — 最轻量 |
GO
// ❌ 读多写少场景用Mutex,性能差
var mu sync.Mutex
mu.Lock()
val := cache[key] // 读操作也互斥
mu.Unlock()
// ✅ 用RWMutex,读操作可以并发
var rwmu sync.RWMutex
rwmu.RLock()
val := cache[key] // 多个goroutine可以同时读
rwmu.RUnlock()
Q2:sync.Map 和加锁的 map 怎么选?
GO
// 场景1:键值对相对固定,读多写少 -> sync.Map
var m sync.Map
m.Store("key", "value")
val, _ := m.Load("key")
// 场景2:频繁增删、需要遍历 -> 加锁的map
type SafeMap struct {
mu sync.RWMutex
m map[string]string
}
// sync.Map 适合的场景:
// 1. 键只写入一次,但读取多次(如缓存)
// 2. 多个goroutine读写不同的键(无交集)
Q3:Pool 中的对象什么时候会被回收?
Pool 中的对象可能在任意 GC 周期被清除。不要将 Pool 用作长期存储:
GO
// ❌ 错误用法:把Pool当缓存
pool := &sync.Pool{New: func() interface{} { return expensiveObject() }}
obj := pool.Get()
// ... 使用
pool.Put(obj)
// 下次GC后,obj可能已被回收
// ✅ 正确用法:复用临时对象,减少分配
pool := &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 4096) // 预分配buffer
},
}
buf := pool.Get().([]byte)[:0] // 获取并重置
// ... 使用buf
pool.Put(buf) // 归还
Q4:如何避免死锁?
GO
// ❌ 死锁:同一个goroutine重复加锁
var mu sync.Mutex
mu.Lock()
mu.Lock() // 永远阻塞!死锁!
// ✅ 方案1:使用defer确保释放
mu.Lock()
defer mu.Unlock()
// ... 即使panic也会释放
// ✅ 方案2:注意加锁顺序,避免交叉锁
// 两个goroutine分别持有A锁等B锁、B锁等A锁 -> 死锁
// 解决:统一按 A->B 的顺序加锁
// ✅ 方案3:使用带超时的锁(Go 1.18+推荐用context)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 配合select使用channel实现超时控制
📖 小节
| 概念 | 关键点 |
|---|---|
sync.Mutex |
互斥锁,一次只允许一个goroutine持有 |
sync.RWMutex |
读写锁,多读单写,适合读多写少 |
sync.Once |
保证函数只执行一次,适合单例/初始化 |
sync.Pool |
对象池,复用临时对象,减少GC压力 |
sync.Map |
并发安全Map,特定场景优于加锁map |
atomic |
原子操作,最轻量的并发安全方案 |
-race |
竞态检测器,开发阶段必须开启 |
| 死锁预防 | defer Unlock、统一加锁顺序、避免嵌套锁 |
选择原则:
- 简单计数器/标志位 →
atomic - 通用互斥 →
Mutex - 读多写少 →
RWMutex - 单次初始化 →
Once - 对象复用 →
Pool - 并发读写键值 →
sync.Map
📝 作业
练习1:实现一个线程安全的栈
实现一个支持 Push、Pop、Size 操作的并发安全栈(LIFO)。
GO
package main
import (
"errors"
"fmt"
"sync"
)
// ThreadSafeStack 并发安全的栈
type ThreadSafeStack struct {
// 你的代码:
// - 选择合适的数据结构
// - 选择合适的同步原语
}
// NewStack 创建新栈
func NewStack() *ThreadSafeStack {
// 你的代码
return nil
}
// Push 入栈
func (s *ThreadSafeStack) Push(val int) {
// 你的代码
}
// Pop 出栈(栈为空时返回错误)
func (s *ThreadSafeStack) Pop() (int, error) {
// 你的代码
return 0, errors.New("栈为空")
}
// Size 获取栈中元素数量
func (s *ThreadSafeStack) Size() int {
// 你的代码
return 0
}
func main() {
stack := NewStack()
var wg sync.WaitGroup
// 并发入栈
for i := 0; i < 100; i++ {
wg.Add(1)
go func(val int) {
defer wg.Done()
stack.Push(val)
}(i)
}
wg.Wait()
fmt.Println("栈大小:", stack.Size()) // 应输出 100
// 并发出栈
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if val, err := stack.Pop(); err == nil {
fmt.Println("弹出:", val)
}
}()
}
wg.Wait()
fmt.Println("剩余大小:", stack.Size()) // 应输出 50
}
练习2:实现带过期的缓存
使用 sync.RWMutex 实现一个支持 TTL(过期时间)的缓存:
GO
package main
import (
"fmt"
"sync"
"time"
)
// TTLCache 带过期时间的缓存
type TTLCache struct {
// 你的代码:
// - 存储结构
// - 同步锁
// - TTL设置
}
// NewTTLCache 创建缓存,ttl为过期时间
func NewTTLCache(ttl time.Duration) *TTLCache {
// 你的代码
return nil
}
// Set 设置键值对
func (c *TTLCache) Set(key string, value interface{}) {
// 你的代码
}
// Get 获取值(已过期返回false)
func (c *TTLCache) Get(key string) (interface{}, bool) {
// 你的代码
return nil, false
}
// Delete 删除键
func (c *TTLCache) Delete(key string) {
// 你的代码
}
// Size 返回有效键值对数量
func (c *TTLCache) Size() int {
// 你的代码
return 0
}
func main() {
cache := NewTTLCache(2 * time.Second)
cache.Set("name", "Go语言")
cache.Set("version", "1.21")
if val, ok := cache.Get("name"); ok {
fmt.Println("name =", val)
}
fmt.Println("缓存大小:", cache.Size())
time.Sleep(3 * time.Second)
if _, ok := cache.Get("name"); !ok {
fmt.Println("name 已过期")
}
fmt.Println("过期后大小:", cache.Size())
}
练习3:并行Map-Reduce
使用 sync 包实现一个简单的并行 Map-Reduce:
GO
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// ParallelMapReduce 并行处理数据并汇总结果
// 参数:
// - data: 输入数据切片
// - mapFn: 映射函数,将输入转为数值
// - reduceFn: 归约函数,将两个结果合并
//
// 要求:
// - 将数据分成多段,每段由一个goroutine处理
// - 使用sync.WaitGroup等待所有goroutine完成
// - 使用sync.Mutex保护归约结果
// - 使用atomic统计处理的元素总数
func ParallelMapReduce(
data []int,
mapFn func(int) int,
reduceFn func(int, int) int,
) int {
// 你的代码:
// 1. 确定分段数量(建议使用4段)
// 2. 启动goroutine处理每段
// 3. mapFn处理每个元素
// 4. reduceFn合并各段结果
// 5. 返回最终结果
return 0
}
func main() {
data := make([]int, 100)
for i := range data {
data[i] = i + 1 // 1到100
}
// 计算所有元素的平方和
result := ParallelMapReduce(
data,
func(x int) int { return x * x }, // map: 求平方
func(a, b int) int { return a + b }, // reduce: 求和
)
fmt.Println("1² + 2² + 3² + ... + 100² =", result)
// 预期输出: 338350
}
下一课
学完了 sync 包,你已经掌握了 Go 并发编程的核心工具。接下来,我们将通过综合练习来巩固所学知识。



