404 Not Found

404 Not Found


nginx

sync包

第16课:sync包 — 并发安全的守护者

生活类比

想象一个公共卫生间


核心概念

1. 为什么需要 sync 包?

Go 的 goroutine 轻量且强大,但多个 goroutine 同时访问共享数据时会产生竞态条件(Race Condition)

GO
// 危险!多个goroutine同时修改count
var count int
for i := 0; i < 1000; i++ {
    go func() {
        count++ // 数据竞争!
    }()
}

sync 包提供了同步原语,确保并发安全。

2. 核心类型一览

类型 用途 特点
sync.Mutex 互斥锁 同一时刻只有一个 goroutine 能持有
sync.RWMutex 读写锁 多读单写,读操作不互斥
sync.WaitGroup 等待组 等待一组 goroutine 完成(第15课已学)
sync.Once 单次执行 确保函数只执行一次
sync.Pool 对象池 复用对象,减少内存分配
sync.Map 并发安全Map 无需加锁的并发安全键值存储
sync.Cond 条件变量 goroutine 间的条件通知
atomic 原子操作 最底层、最高效的并发安全操作

基本语法与用法

💡 互斥锁 Mutex

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    count := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()         // 加锁
            count++           // 安全地修改共享变量
            mu.Unlock()       // 解锁
        }()
    }

    wg.Wait()
    fmt.Println("count =", count) // 输出: count = 1000
}
💡 提示:使用 defer mu.Unlock() 可以确保即使函数 panic 也能释放锁,避免死锁。

💡 读写锁 RWMutex

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    var rwmu sync.RWMutex
    data := make(map[string]string)
    var wg sync.WaitGroup

    // 写操作:使用写锁
    wg.Add(1)
    go func() {
        defer wg.Done()
        rwmu.Lock() // 写锁:独占
        data["key"] = "value"
        rwmu.Unlock()
    }()

    // 读操作:使用读锁(可并发)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            rwmu.RLock() // 读锁:共享
            _ = data["key"]
            rwmu.RUnlock()
        }()
    }

    wg.Wait()
    fmt.Println("data:", data)
}
💡 提示:读多写少的场景用 RWMutex 性能更好;如果读写频率差不多,直接用 Mutex 更简单。

💡 单次执行 Once

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    var wg sync.WaitGroup

    setup := func() {
        fmt.Println("初始化操作(只会执行一次)")
    }

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            once.Do(setup) // 只有第一个goroutine会执行setup
            fmt.Printf("goroutine %d 完成\n", id)
        }(i)
    }

    wg.Wait()
}
💡 提示Once 保证即使多个 goroutine 同时调用,传入的函数也只会执行一次。常用于单例初始化。

💡 对象池 Pool

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    pool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("创建新对象")
            return make([]byte, 1024)
        },
    }

    // 第一次Get:调用New创建
    buf1 := pool.Get().([]byte)
    fmt.Println("获取对象,长度:", len(buf1))

    // 用完归还
    pool.Put(buf1)

    // 第二次Get:复用之前归还的对象
    buf2 := pool.Get().([]byte)
    fmt.Println("复用对象,长度:", len(buf2))
}
💡 提示Pool 中的对象可能在任意 GC 周期被回收,不要假设 Put 的对象一定能被 Get 到。

💡 原子操作 atomic

GO
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var count int64 = 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&count, 1) // 原子递增
        }()
    }

    wg.Wait()
    fmt.Println("count =", atomic.LoadInt64(&count)) // 原子读取
}
💡 提示atomic 操作比 Mutex 更轻量,适合简单的计数器、标志位等场景。

💡 竞态检测

BASH
go run -race main.go    # 运行时检测竞态
go test -race ./...     # 测试时检测竞态
💡 提示:开发阶段建议始终开启 -race,它能帮你发现隐藏的数据竞争问题。


实战示例

示例:线程安全的计数器(难度⭐)

一个封装了互斥锁的安全计数器:

GO
package main

import (
    "fmt"
    "sync"
)

// SafeCounter 线程安全的计数器
type SafeCounter struct {
    mu    sync.Mutex
    count int
}

// Inc 递增计数
func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

// Value 获取当前值
func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    counter := &SafeCounter{}
    var wg sync.WaitGroup

    // 启动100个goroutine,每个递增100次
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                counter.Inc()
            }
        }()
    }

    wg.Wait()
    fmt.Println("最终计数:", counter.Value()) // 输出: 最终计数: 10000
}
▶ 试一试

示例:使用 sync.Map 实现并发安全的缓存(难度⭐⭐)

GO
package main

import (
    "fmt"
    "sync"
)

// Cache 并发安全的缓存
type Cache struct {
    store sync.Map
}

// Set 设置缓存
func (c *Cache) Set(key string, value interface{}) {
    c.store.Store(key, value)
}

// Get 获取缓存
func (c *Cache) Get(key string) (interface{}, bool) {
    return c.store.Load(key)
}

// Delete 删除缓存
func (c *Cache) Delete(key string) {
    c.store.Delete(key)
}

// GetOrSet 如果不存在则设置(原子操作,避免重复计算)
func (c *Cache) GetOrSet(key string, factory func() interface{}) interface{} {
    if val, ok := c.store.Load(key); ok {
        return val
    }
    // 使用 LoadOrStore 保证原子性
    val, _ := c.store.LoadOrStore(key, factory())
    return val
}

func main() {
    cache := &Cache{}
    var wg sync.WaitGroup

    // 并发写入
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", id)
            cache.Set(key, id*10)
            fmt.Printf("写入: %s = %d\n", key, id*10)
        }(i)
    }

    wg.Wait()

    // 并发读取
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", id)
            if val, ok := cache.Get(key); ok {
                fmt.Printf("读取: %s = %v\n", key, val)
            }
        }(i)
    }

    wg.Wait()

    // 使用 GetOrSet 避免重复计算
    result := cache.GetOrSet("computed", func() interface{} {
        fmt.Println("执行复杂计算...")
        return 42
    })
    fmt.Println("computed =", result)

    // 再次获取,不会重复计算
    result2 := cache.GetOrSet("computed", func() interface{} {
        fmt.Println("这行不会执行")
        return 99
    })
    fmt.Println("computed =", result2)
}
▶ 试一试

示例:带超时的工作池(综合运用 sync)(难度⭐⭐⭐)

GO
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

// Task 任务
type Task struct {
    ID   int
    Data string
}

// Result 结果
type Result struct {
    TaskID   int
    Output   string
    WorkerID int
    Duration time.Duration
}

// WorkerPool 工作池
type WorkerPool struct {
    workerCount int
    taskCh      chan Task
    resultCh    chan Result
    wg          sync.WaitGroup
    processed   int64 // 原子操作的计数器
    errors      int64
    once        sync.Once // 确保只初始化一次
    pool        sync.Pool // 复用 Result 对象
}

// NewWorkerPool 创建工作池
func NewWorkerPool(workerCount, taskBufferSize int) *WorkerPool {
    wp := &WorkerPool{
        workerCount: workerCount,
        taskCh:      make(chan Task, taskBufferSize),
        resultCh:    make(chan Result, taskBufferSize),
    }

    // 初始化对象池
    wp.pool = sync.Pool{
        New: func() interface{} {
            return &Result{}
        },
    }

    return wp
}

// Start 启动工作池(只执行一次)
func (wp *WorkerPool) Start(ctx context.Context) {
    wp.once.Do(func() {
        for i := 0; i < wp.workerCount; i++ {
            wp.wg.Add(1)
            go wp.worker(ctx, i)
        }
        fmt.Printf("工作池已启动 %d 个 worker\n", wp.workerCount)
    })
}

// worker 工作者goroutine
func (wp *WorkerPool) worker(ctx context.Context, id int) {
    defer wp.wg.Done()

    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: 收到退出信号\n", id)
            return
        case task, ok := <-wp.taskCh:
            if !ok {
                fmt.Printf("Worker %d: 任务通道已关闭\n", id)
                return
            }
            // 从对象池获取Result
            result := wp.pool.Get().(*Result)
            result.TaskID = task.ID
            result.WorkerID = id

            // 模拟处理任务
            start := time.Now()
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            result.Output = fmt.Sprintf("处理完成: %s", task.Data)
            result.Duration = time.Since(start)

            atomic.AddInt64(&wp.processed, 1)
            wp.resultCh <- *result

            // 归还Result对象到池中(注意:这里发送的是值拷贝)
            wp.pool.Put(result)
        }
    }
}

// Submit 提交任务
func (wp *WorkerPool) Submit(task Task) {
    wp.taskCh <- task
}

// Close 关闭工作池
func (wp *WorkerPool) Close() {
    close(wp.taskCh)
    wp.wg.Wait()
    close(wp.resultCh)
}

// Stats 获取统计信息
func (wp *WorkerPool) Stats() (processed, errors int64) {
    return atomic.LoadInt64(&wp.processed), atomic.LoadInt64(&wp.errors)
}

func main() {
    rand.Seed(time.Now().UnixNano())

    // 创建带超时的context
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 创建工作池
    pool := NewWorkerPool(3, 20)
    pool.Start(ctx)

    // 提交任务
    var submitWg sync.WaitGroup
    submitWg.Add(1)
    go func() {
        defer submitWg.Done()
        for i := 1; i <= 10; i++ {
            pool.Submit(Task{
                ID:   i,
                Data: fmt.Sprintf("任务数据-%d", i),
            })
            fmt.Printf("提交任务 #%d\n", i)
        }
    }()

    // 收集结果
    var collectWg sync.WaitGroup
    collectWg.Add(1)
    go func() {
        defer collectWg.Done()
        for result := range pool.resultCh {
            fmt.Printf("任务#%d 由Worker%d处理,耗时%v -> %s\n",
                result.TaskID, result.WorkerID, result.Duration, result.Output)
        }
    }()

    // 等待任务提交完成
    submitWg.Wait()

    // 关闭工作池
    pool.Close()

    // 等待结果收集完成
    collectWg.Wait()

    // 输出统计
    processed, _ := pool.Stats()
    fmt.Printf("\n统计: 共处理 %d 个任务\n", processed)
}
▶ 试一试

实际应用场景

场景1:并发HTTP请求的限流控制

在爬虫或API调用中,需要限制并发数避免被封禁:

GO
package main

import (
    "fmt"
    "sync"
    "time"
)

// RateLimiter 简单的并发限流器
type RateLimiter struct {
    sem     chan struct{}
    mu      sync.Mutex
    active  int
    maxConc int
}

// NewRateLimiter 创建限流器,maxConc为最大并发数
func NewRateLimiter(maxConc int) *RateLimiter {
    return &RateLimiter{
        sem:     make(chan struct{}, maxConc),
        maxConc: maxConc,
    }
}

// Acquire 获取许可
func (r *RateLimiter) Acquire() {
    r.sem <- struct{}{}
    r.mu.Lock()
    r.active++
    r.mu.Unlock()
}

// Release 释放许可
func (r *RateLimiter) Release() {
    <-r.sem
    r.mu.Lock()
    r.active--
    r.mu.Unlock()
}

// ActiveCount 当前活跃数
func (r *RateLimiter) ActiveCount() int {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.active
}

func main() {
    limiter := NewRateLimiter(3) // 最多3个并发
    var wg sync.WaitGroup

    urls := []string{
        "https://api.example.com/page1",
        "https://api.example.com/page2",
        "https://api.example.com/page3",
        "https://api.example.com/page4",
        "https://api.example.com/page5",
        "https://api.example.com/page6",
    }

    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()

            limiter.Acquire()        // 获取许可(超过3个时会阻塞)
            defer limiter.Release()  // 释放许可

            fmt.Printf("[并发数:%d] 开始请求: %s\n", limiter.ActiveCount(), u)
            time.Sleep(time.Duration(100+len(u)*10) * time.Millisecond) // 模拟请求
            fmt.Printf("[并发数:%d] 完成请求: %s\n", limiter.ActiveCount(), u)
        }(url)
    }

    wg.Wait()
    fmt.Println("所有请求已完成")
}

场景2:配置的懒加载(单例模式)

使用 sync.Once 确保配置只加载一次:

GO
package main

import (
    "fmt"
    "sync"
)

// Config 应用配置
type Config struct {
    DatabaseURL string
    APIKey      string
    MaxRetries  int
}

var (
    config *Config
    once   sync.Once
)

// GetConfig 获取配置(懒加载,只初始化一次)
func GetConfig() *Config {
    once.Do(func() {
        fmt.Println("正在加载配置(只会执行一次)...")
        // 模拟从文件或环境变量加载配置
        config = &Config{
            DatabaseURL: "postgres://localhost:5432/mydb",
            APIKey:      "sk-xxxxxxxxxxxx",
            MaxRetries:  3,
        }
    })
    return config
}

func main() {
    var wg sync.WaitGroup

    // 多个goroutine同时获取配置
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            cfg := GetConfig()
            fmt.Printf("Goroutine %d: DB=%s, Retries=%d\n",
                id, cfg.DatabaseURL, cfg.MaxRetries)
        }(i)
    }

    wg.Wait()
    fmt.Println("配置加载完成")
}

❓ 常见问题

Q1:Mutex 和 RWMutex 该怎么选?

场景 推荐
读多写少(如缓存) RWMutex — 多个读操作可以并发
读写均衡或写多 Mutex — 更简单,开销更低
简单计数器 atomic — 最轻量
GO
// ❌ 读多写少场景用Mutex,性能差
var mu sync.Mutex
mu.Lock()
val := cache[key] // 读操作也互斥
mu.Unlock()

// ✅ 用RWMutex,读操作可以并发
var rwmu sync.RWMutex
rwmu.RLock()
val := cache[key] // 多个goroutine可以同时读
rwmu.RUnlock()

Q2:sync.Map 和加锁的 map 怎么选?

GO
// 场景1:键值对相对固定,读多写少 -> sync.Map
var m sync.Map
m.Store("key", "value")
val, _ := m.Load("key")

// 场景2:频繁增删、需要遍历 -> 加锁的map
type SafeMap struct {
    mu sync.RWMutex
    m  map[string]string
}

// sync.Map 适合的场景:
// 1. 键只写入一次,但读取多次(如缓存)
// 2. 多个goroutine读写不同的键(无交集)

Q3:Pool 中的对象什么时候会被回收?

Pool 中的对象可能在任意 GC 周期被清除。不要将 Pool 用作长期存储:

GO
// ❌ 错误用法:把Pool当缓存
pool := &sync.Pool{New: func() interface{} { return expensiveObject() }}
obj := pool.Get()
// ... 使用
pool.Put(obj)
// 下次GC后,obj可能已被回收

// ✅ 正确用法:复用临时对象,减少分配
pool := &sync.Pool{
    New: func() interface{} {
        return make([]byte, 0, 4096) // 预分配buffer
    },
}
buf := pool.Get().([]byte)[:0] // 获取并重置
// ... 使用buf
pool.Put(buf) // 归还

Q4:如何避免死锁?

GO
// ❌ 死锁:同一个goroutine重复加锁
var mu sync.Mutex
mu.Lock()
mu.Lock() // 永远阻塞!死锁!

// ✅ 方案1:使用defer确保释放
mu.Lock()
defer mu.Unlock()
// ... 即使panic也会释放

// ✅ 方案2:注意加锁顺序,避免交叉锁
// 两个goroutine分别持有A锁等B锁、B锁等A锁 -> 死锁
// 解决:统一按 A->B 的顺序加锁

// ✅ 方案3:使用带超时的锁(Go 1.18+推荐用context)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// 配合select使用channel实现超时控制

📖 小节

概念 关键点
sync.Mutex 互斥锁,一次只允许一个goroutine持有
sync.RWMutex 读写锁,多读单写,适合读多写少
sync.Once 保证函数只执行一次,适合单例/初始化
sync.Pool 对象池,复用临时对象,减少GC压力
sync.Map 并发安全Map,特定场景优于加锁map
atomic 原子操作,最轻量的并发安全方案
-race 竞态检测器,开发阶段必须开启
死锁预防 defer Unlock、统一加锁顺序、避免嵌套锁

选择原则

  1. 简单计数器/标志位 → atomic
  2. 通用互斥 → Mutex
  3. 读多写少 → RWMutex
  4. 单次初始化 → Once
  5. 对象复用 → Pool
  6. 并发读写键值 → sync.Map

📝 作业

练习1:实现一个线程安全的栈

实现一个支持 PushPopSize 操作的并发安全栈(LIFO)。

GO
package main

import (
    "errors"
    "fmt"
    "sync"
)

// ThreadSafeStack 并发安全的栈
type ThreadSafeStack struct {
    // 你的代码:
    // - 选择合适的数据结构
    // - 选择合适的同步原语
}

// NewStack 创建新栈
func NewStack() *ThreadSafeStack {
    // 你的代码
    return nil
}

// Push 入栈
func (s *ThreadSafeStack) Push(val int) {
    // 你的代码
}

// Pop 出栈(栈为空时返回错误)
func (s *ThreadSafeStack) Pop() (int, error) {
    // 你的代码
    return 0, errors.New("栈为空")
}

// Size 获取栈中元素数量
func (s *ThreadSafeStack) Size() int {
    // 你的代码
    return 0
}

func main() {
    stack := NewStack()
    var wg sync.WaitGroup

    // 并发入栈
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(val int) {
            defer wg.Done()
            stack.Push(val)
        }(i)
    }
    wg.Wait()

    fmt.Println("栈大小:", stack.Size()) // 应输出 100

    // 并发出栈
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if val, err := stack.Pop(); err == nil {
                fmt.Println("弹出:", val)
            }
        }()
    }
    wg.Wait()

    fmt.Println("剩余大小:", stack.Size()) // 应输出 50
}

练习2:实现带过期的缓存

使用 sync.RWMutex 实现一个支持 TTL(过期时间)的缓存:

GO
package main

import (
    "fmt"
    "sync"
    "time"
)

// TTLCache 带过期时间的缓存
type TTLCache struct {
    // 你的代码:
    // - 存储结构
    // - 同步锁
    // - TTL设置
}

// NewTTLCache 创建缓存,ttl为过期时间
func NewTTLCache(ttl time.Duration) *TTLCache {
    // 你的代码
    return nil
}

// Set 设置键值对
func (c *TTLCache) Set(key string, value interface{}) {
    // 你的代码
}

// Get 获取值(已过期返回false)
func (c *TTLCache) Get(key string) (interface{}, bool) {
    // 你的代码
    return nil, false
}

// Delete 删除键
func (c *TTLCache) Delete(key string) {
    // 你的代码
}

// Size 返回有效键值对数量
func (c *TTLCache) Size() int {
    // 你的代码
    return 0
}

func main() {
    cache := NewTTLCache(2 * time.Second)

    cache.Set("name", "Go语言")
    cache.Set("version", "1.21")

    if val, ok := cache.Get("name"); ok {
        fmt.Println("name =", val)
    }

    fmt.Println("缓存大小:", cache.Size())

    time.Sleep(3 * time.Second)

    if _, ok := cache.Get("name"); !ok {
        fmt.Println("name 已过期")
    }

    fmt.Println("过期后大小:", cache.Size())
}

练习3:并行Map-Reduce

使用 sync 包实现一个简单的并行 Map-Reduce:

GO
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// ParallelMapReduce 并行处理数据并汇总结果
// 参数:
//   - data: 输入数据切片
//   - mapFn: 映射函数,将输入转为数值
//   - reduceFn: 归约函数,将两个结果合并
//
// 要求:
//   - 将数据分成多段,每段由一个goroutine处理
//   - 使用sync.WaitGroup等待所有goroutine完成
//   - 使用sync.Mutex保护归约结果
//   - 使用atomic统计处理的元素总数
func ParallelMapReduce(
    data []int,
    mapFn func(int) int,
    reduceFn func(int, int) int,
) int {
    // 你的代码:
    // 1. 确定分段数量(建议使用4段)
    // 2. 启动goroutine处理每段
    // 3. mapFn处理每个元素
    // 4. reduceFn合并各段结果
    // 5. 返回最终结果
    return 0
}

func main() {
    data := make([]int, 100)
    for i := range data {
        data[i] = i + 1 // 1到100
    }

    // 计算所有元素的平方和
    result := ParallelMapReduce(
        data,
        func(x int) int { return x * x }, // map: 求平方
        func(a, b int) int { return a + b }, // reduce: 求和
    )

    fmt.Println("1² + 2² + 3² + ... + 100² =", result)
    // 预期输出: 338350
}

下一课

学完了 sync 包,你已经掌握了 Go 并发编程的核心工具。接下来,我们将通过综合练习来巩固所学知识。

👉 第17课:并发编程实战练习

Web-Tutorial.com

Web-Tutorial 技术团队

由多位开发者共同维护的编程教程平台。每篇教程由对应领域的开发者编写和审核,确保内容准确可靠。如发现任何问题,欢迎向我们反馈。

100%

🙏 帮我们做得更好

我们是刚上线的编程教程站,几个人的小团队,精力有限。页面虽经检查,难免还有疏漏——链接失效、排版错乱、内容有误、语言生硬……

如果您发现了,麻烦告诉我们,我们会在收到反馈后第一时间进行修复,再次感谢您的光临 🙏