sync Package

Lesson 16: sync Package — The Guardian of Concurrency Safety

Analogy

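Imagine a public restroom: only one person can use a stall at a time, and the lock on the door enforces that. A mutex does the same job for shared data.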


Core Concepts

1. Why Do We Need the sync Package?

Go's goroutines are lightweight and powerful, but when multiple goroutines access the same data at the same time and at least one of them writes, a data race occurs:

GO
// Dangerous! Multiple goroutines modifying count simultaneously
var count int
for i := 0; i < 1000; i++ {
    go func() {
        count++ // Data race!
    }()
}

The sync package provides synchronization primitives to ensure concurrency safety.

2. Core Types Overview

Type | Purpose | Characteristics
sync.Mutex | Mutual exclusion lock | Only one goroutine can hold it at a time
sync.RWMutex | Read-write lock | Multiple readers or a single writer; reads don't block each other
sync.WaitGroup | Wait group | Waits for a group of goroutines to complete (covered in Lesson 15)
sync.Once | Single execution | Ensures a function executes only once
sync.Pool | Object pool | Reuses temporary objects to reduce memory allocations
sync.Map | Concurrent-safe map | Key-value storage that is safe for concurrent use without extra locking; optimized for read-mostly workloads
sync.Cond | Condition variable | Lets goroutines wait for, and signal, a change in shared state
sync/atomic package | Atomic operations | Lowest-level primitives; usually the cheapest option for single-variable updates
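
sync.Cond is listed above but not demonstrated elsewhere in this lesson, so here is a minimal sketch of how it is typically used. The Queue type and its Put and Take methods are hypothetical names chosen for illustration; only sync.NewCond, Wait, and Signal come from the standard library.

```go
package main

import (
    "fmt"
    "sync"
)

// Queue is a hypothetical blocking FIFO queue built on sync.Cond.
type Queue struct {
    mu    sync.Mutex
    cond  *sync.Cond
    items []int
}

// NewQueue creates a queue whose condition variable shares the queue's mutex.
func NewQueue() *Queue {
    q := &Queue{}
    q.cond = sync.NewCond(&q.mu)
    return q
}

// Put appends an item and wakes one waiting consumer.
func (q *Queue) Put(v int) {
    q.mu.Lock()
    q.items = append(q.items, v)
    q.mu.Unlock()
    q.cond.Signal()
}

// Take blocks until an item is available, then removes and returns it.
func (q *Queue) Take() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    for len(q.items) == 0 { // re-check the condition after every wake-up
        q.cond.Wait() // releases mu while waiting, re-acquires it before returning
    }
    v := q.items[0]
    q.items = q.items[1:]
    return v
}

func main() {
    q := NewQueue()
    done := make(chan int)
    go func() { done <- q.Take() }() // consumer blocks until Put is called
    q.Put(42)
    fmt.Println("took:", <-done) // prints "took: 42"
}
```

Note that Wait is called inside a loop: a woken goroutine should always re-check the condition, because another goroutine may have consumed the item first. For simple producer/consumer hand-offs, a channel is often the easier choice.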

Basic Syntax and Usage

💡 Mutex

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    count := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            mu.Lock()         // Lock
            count++           // Safely modify shared variable
            mu.Unlock()       // Unlock
        }()
    }

    wg.Wait()
    fmt.Println("count =", count) // Output: count = 1000
}
💡 Tip: Using defer mu.Unlock() ensures the lock is released even if the function panics, preventing deadlocks.

💡 RWMutex (Read-Write Lock)

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    var rwmu sync.RWMutex
    data := make(map[string]string)
    var wg sync.WaitGroup

    // Write operation: uses write lock
    wg.Add(1)
    go func() {
        defer wg.Done()
        rwmu.Lock() // Write lock: exclusive
        data["key"] = "value"
        rwmu.Unlock()
    }()

    // Read operation: uses read lock (can be concurrent)
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            rwmu.RLock() // Read lock: shared
            _ = data["key"]
            rwmu.RUnlock()
        }()
    }

    wg.Wait()
    fmt.Println("data:", data)
}
💡 Tip: For read-heavy, write-light scenarios, RWMutex performs better; if read and write frequencies are similar, Mutex is simpler.

💡 Once (Single Execution)

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    var wg sync.WaitGroup

    setup := func() {
        fmt.Println("Initialization (runs only once)")
    }

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            once.Do(setup) // Only the first goroutine will execute setup
            fmt.Printf("goroutine %d completed\n", id)
        }(i)
    }

    wg.Wait()
}
💡 Tip: Once guarantees that even if multiple goroutines call it simultaneously, the provided function will only execute once. Commonly used for singleton initialization.
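
One detail worth seeing in code: Once records that the function ran, not that it succeeded. The sketch below uses a hypothetical lazyResource type, with made-up failingLoad and okLoad loaders, to show that a failed initialization is remembered and never retried.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// lazyResource is a hypothetical holder that loads its value at most once.
type lazyResource struct {
    once  sync.Once
    value string
    err   error
    calls int // how many times a loader actually ran
}

// Get runs load on the first call only and returns the cached outcome afterwards.
func (r *lazyResource) Get(load func() (string, error)) (string, error) {
    r.once.Do(func() {
        r.calls++
        r.value, r.err = load()
    })
    return r.value, r.err
}

// failingLoad and okLoad are illustrative loaders, not part of any library.
func failingLoad() (string, error) { return "", errors.New("config file missing") }
func okLoad() (string, error)      { return "loaded", nil }

func main() {
    r := &lazyResource{}
    _, err := r.Get(failingLoad)
    fmt.Println("first call error:", err)

    // The second loader never runs: Once already considers the work done.
    v, err := r.Get(okLoad)
    fmt.Printf("second call: value=%q err=%v calls=%d\n", v, err, r.calls)
}
```

If initialization must be retried after a failure, a Mutex guarding a "loaded" flag is usually a better fit than Once.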

💡 Pool (Object Pool)

GO
package main

import (
    "fmt"
    "sync"
)

func main() {
    pool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("Creating new object")
            return make([]byte, 1024)
        },
    }

    // First Get: calls New to create
    buf1 := pool.Get().([]byte)
    fmt.Println("Got object, length:", len(buf1))

    // Return after use
    pool.Put(buf1)

    // Second Get: usually reuses the object returned above (reuse is not guaranteed)
    buf2 := pool.Get().([]byte)
    fmt.Println("Reused object, length:", len(buf2))
}
💡 Tip: Objects in Pool may be garbage collected during any GC cycle. Don't assume a Put object can always be Get-ed back.
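
A common way to apply Pool in practice is to pool pointers to bytes.Buffer and reset each buffer when it is taken out. The following is a sketch under that assumption; bufPool and render are hypothetical names.

```go
package main

import (
    "bytes"
    "fmt"
    "sync"
)

// bufPool hands out reusable *bytes.Buffer values.
// Storing a pointer avoids an extra allocation each time the value is boxed into an interface.
var bufPool = sync.Pool{
    New: func() interface{} { return new(bytes.Buffer) },
}

// render is a hypothetical helper that builds a string using a pooled buffer.
func render(name string) string {
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset()            // a pooled buffer may still hold data from an earlier use
    defer bufPool.Put(buf) // return it once the result has been copied out

    buf.WriteString("hello, ")
    buf.WriteString(name)
    return buf.String() // String copies the bytes, so the result outlives the buffer
}

func main() {
    fmt.Println(render("gopher")) // prints "hello, gopher"
    fmt.Println(render("world"))  // prints "hello, world"
}
```

Resetting on Get rather than on Put keeps the code correct even if some caller forgets to clean up before returning an object.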

💡 Atomic Operations

GO
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var count int64 = 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&count, 1) // Atomic increment
        }()
    }

    wg.Wait()
    fmt.Println("count =", atomic.LoadInt64(&count)) // Atomic read
}
💡 Tip: atomic operations are lighter than Mutex, suitable for simple counters, flags, etc.
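
Beyond counters, atomic is often used for flags. As a small sketch, the hypothetical tryClaim helper below uses atomic.CompareAndSwapInt32 so that exactly one of many racing goroutines wins the right to do some one-off work.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// tryClaim flips flag from 0 to 1 and reports whether this caller performed the flip.
func tryClaim(flag *int32) bool {
    return atomic.CompareAndSwapInt32(flag, 0, 1)
}

// countWinners starts n goroutines that race to claim the same flag
// and returns how many of them succeeded.
func countWinners(n int) int64 {
    var flag int32
    var winners int64
    var wg sync.WaitGroup

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if tryClaim(&flag) {
                atomic.AddInt64(&winners, 1)
            }
        }()
    }

    wg.Wait()
    return atomic.LoadInt64(&winners)
}

func main() {
    fmt.Println("winners:", countWinners(100)) // prints "winners: 1"
}
```

Compare-and-swap only protects a single variable. As soon as two or more fields must change together, a Mutex is the safer tool.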

💡 Race Detection

BASH
go run -race main.go    # Detect races at runtime
go test -race ./...     # Detect races during testing
💡 Tip: It's recommended to always enable -race during development; it helps you discover hidden data race issues.


Practical Examples

Example: Thread-safe Counter (Difficulty ⭐)

A safe counter encapsulated with a mutex:

GO
package main

import (
    "fmt"
    "sync"
)

// SafeCounter is a thread-safe counter
type SafeCounter struct {
    mu    sync.Mutex
    count int
}

// Inc increments the counter
func (c *SafeCounter) Inc() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

// Value returns the current value
func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.count
}

func main() {
    counter := &SafeCounter{}
    var wg sync.WaitGroup

    // Start 100 goroutines, each incrementing 100 times
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                counter.Inc()
            }
        }()
    }

    wg.Wait()
    fmt.Println("Final count:", counter.Value()) // Output: Final count: 10000
}

Example: Concurrent-safe Cache with sync.Map (Difficulty ⭐⭐)

GO
package main

import (
    "fmt"
    "sync"
)

// Cache is a concurrent-safe cache
type Cache struct {
    store sync.Map
}

// Set stores a value in the cache
func (c *Cache) Set(key string, value interface{}) {
    c.store.Store(key, value)
}

// Get retrieves a value from the cache
func (c *Cache) Get(key string) (interface{}, bool) {
    return c.store.Load(key)
}

// Delete removes a key from the cache
func (c *Cache) Delete(key string) {
    c.store.Delete(key)
}

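// GetOrSet returns the existing value for key, or stores and returns the one
// built by factory. The store itself is atomic, but factory may still run in
// several goroutines that miss at the same time; only the first result is kept.
func (c *Cache) GetOrSet(key string, factory func() interface{}) interface{} {
    if val, ok := c.store.Load(key); ok {
        return val
    }
    // LoadOrStore keeps whichever value was stored first
    val, _ := c.store.LoadOrStore(key, factory())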
    return val
}

func main() {
    cache := &Cache{}
    var wg sync.WaitGroup

    // Concurrent writes
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", id)
            cache.Set(key, id*10)
            fmt.Printf("Write: %s = %d\n", key, id*10)
        }(i)
    }

    wg.Wait()

    // Concurrent reads
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", id)
            if val, ok := cache.Get(key); ok {
                fmt.Printf("Read: %s = %v\n", key, val)
            }
        }(i)
    }

    wg.Wait()

    // Use GetOrSet to avoid redundant computation
    result := cache.GetOrSet("computed", func() interface{} {
        fmt.Println("Performing complex computation...")
        return 42
    })
    fmt.Println("computed =", result)

    // Get again, won't recompute
    result2 := cache.GetOrSet("computed", func() interface{} {
        fmt.Println("This line won't execute")
        return 99
    })
    fmt.Println("computed =", result2)
}
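
When the factory is expensive and must run at most once per key even under concurrent misses, one possible approach is to store a small entry holding a sync.Once for each key. The sketch below is an illustration of that idea, not part of the Cache type above; OnceCache, entry, GetOrCompute, and factoryCalls are hypothetical names.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// entry pairs a value with the sync.Once that guards its computation.
type entry struct {
    once sync.Once
    val  interface{}
}

// OnceCache is a hypothetical cache that computes each key at most once.
type OnceCache struct {
    store sync.Map // maps string keys to *entry
}

// GetOrCompute returns the value for key, running factory only for the first caller.
func (c *OnceCache) GetOrCompute(key string, factory func() interface{}) interface{} {
    e, _ := c.store.LoadOrStore(key, &entry{}) // all callers end up sharing one entry
    ent := e.(*entry)
    ent.once.Do(func() { ent.val = factory() }) // later callers wait here, then reuse val
    return ent.val
}

// factoryCalls asks n goroutines for the same key and reports how often factory ran.
func factoryCalls(n int) int64 {
    var c OnceCache
    var calls int64
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            c.GetOrCompute("computed", func() interface{} {
                atomic.AddInt64(&calls, 1)
                return 42
            })
        }()
    }
    wg.Wait()
    return atomic.LoadInt64(&calls)
}

func main() {
    fmt.Println("factory calls:", factoryCalls(10)) // prints "factory calls: 1"
}
```

The trade-off is one small allocation per lookup for the candidate entry, which is normally negligible next to an expensive factory.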

Example: Worker Pool with Timeout (Comprehensive sync Usage) (Difficulty ⭐⭐⭐)

GO
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

// Task represents a task
type Task struct {
    ID   int
    Data string
}

// Result represents a result
type Result struct {
    TaskID   int
    Output   string
    WorkerID int
    Duration time.Duration
}

// WorkerPool is a worker pool
type WorkerPool struct {
    workerCount int
    taskCh      chan Task
    resultCh    chan Result
    wg          sync.WaitGroup
    processed   int64 // atomic counter
    errors      int64
    once        sync.Once // ensures single initialization
    pool        sync.Pool // reuse Result objects
}

// NewWorkerPool creates a worker pool
func NewWorkerPool(workerCount, taskBufferSize int) *WorkerPool {
    wp := &WorkerPool{
        workerCount: workerCount,
        taskCh:      make(chan Task, taskBufferSize),
        resultCh:    make(chan Result, taskBufferSize),
    }

    // Initialize object pool
    wp.pool = sync.Pool{
        New: func() interface{} {
            return &Result{}
        },
    }

    return wp
}

// Start starts the worker pool (executes only once)
func (wp *WorkerPool) Start(ctx context.Context) {
    wp.once.Do(func() {
        for i := 0; i < wp.workerCount; i++ {
            wp.wg.Add(1)
            go wp.worker(ctx, i)
        }
        fmt.Printf("Worker pool started with %d workers\n", wp.workerCount)
    })
}

// worker is the worker goroutine
func (wp *WorkerPool) worker(ctx context.Context, id int) {
    defer wp.wg.Done()

    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: received exit signal\n", id)
            return
        case task, ok := <-wp.taskCh:
            if !ok {
                fmt.Printf("Worker %d: task channel closed\n", id)
                return
            }
            // Get Result from pool
            result := wp.pool.Get().(*Result)
            result.TaskID = task.ID
            result.WorkerID = id

            // Simulate task processing
            start := time.Now()
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            result.Output = fmt.Sprintf("Processed: %s", task.Data)
            result.Duration = time.Since(start)

            atomic.AddInt64(&wp.processed, 1)
            wp.resultCh <- *result // sends a value copy, so the pooled object can be reused safely

            // Return the Result object to the pool
            wp.pool.Put(result)
        }
    }
}

// Submit submits a task
func (wp *WorkerPool) Submit(task Task) {
    wp.taskCh <- task
}

// Close closes the worker pool
func (wp *WorkerPool) Close() {
    close(wp.taskCh)
    wp.wg.Wait()
    close(wp.resultCh)
}

// Stats returns statistics
func (wp *WorkerPool) Stats() (processed, errors int64) {
    return atomic.LoadInt64(&wp.processed), atomic.LoadInt64(&wp.errors)
}

func main() {
    rand.Seed(time.Now().UnixNano()) // deprecated since Go 1.20, which seeds the global source automatically

    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Create worker pool
    pool := NewWorkerPool(3, 20)
    pool.Start(ctx)

    // Submit tasks
    var submitWg sync.WaitGroup
    submitWg.Add(1)
    go func() {
        defer submitWg.Done()
        for i := 1; i <= 10; i++ {
            pool.Submit(Task{
                ID:   i,
                Data: fmt.Sprintf("task-data-%d", i),
            })
            fmt.Printf("Submitted task #%d\n", i)
        }
    }()

    // Collect results
    var collectWg sync.WaitGroup
    collectWg.Add(1)
    go func() {
        defer collectWg.Done()
        for result := range pool.resultCh {
            fmt.Printf("Task#%d processed by Worker%d, took %v -> %s\n",
                result.TaskID, result.WorkerID, result.Duration, result.Output)
        }
    }()

    // Wait for task submission to complete
    submitWg.Wait()

    // Close worker pool
    pool.Close()

    // Wait for result collection to complete
    collectWg.Wait()

    // Output statistics
    processed, _ := pool.Stats()
    fmt.Printf("\nStats: processed %d tasks total\n", processed)
}

Practical Use Cases

Case 1: Rate Limiting for Concurrent HTTP Requests

In crawlers or API calls, you need to limit concurrency to avoid being banned:

GO
package main

import (
    "fmt"
    "sync"
    "time"
)

// RateLimiter is a simple concurrency rate limiter
type RateLimiter struct {
    sem     chan struct{}
    mu      sync.Mutex
    active  int
    maxConc int
}

// NewRateLimiter creates a rate limiter; maxConc is the max concurrency
func NewRateLimiter(maxConc int) *RateLimiter {
    return &RateLimiter{
        sem:     make(chan struct{}, maxConc),
        maxConc: maxConc,
    }
}

// Acquire acquires a permit
func (r *RateLimiter) Acquire() {
    r.sem <- struct{}{}
    r.mu.Lock()
    r.active++
    r.mu.Unlock()
}

// Release releases a permit
func (r *RateLimiter) Release() {
    // Decrement before freeing the slot so ActiveCount never exceeds maxConc
    r.mu.Lock()
    r.active--
    r.mu.Unlock()
    <-r.sem
}

// ActiveCount returns the current active count
func (r *RateLimiter) ActiveCount() int {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.active
}

func main() {
    limiter := NewRateLimiter(3) // max 3 concurrent
    var wg sync.WaitGroup

    urls := []string{
        "https://api.example.com/page1",
        "https://api.example.com/page2",
        "https://api.example.com/page3",
        "https://api.example.com/page4",
        "https://api.example.com/page5",
        "https://api.example.com/page6",
    }

    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()

            limiter.Acquire()        // acquire permit (blocks while 3 requests are already running)
            defer limiter.Release()  // release permit

            fmt.Printf("[Concurrent:%d] Start request: %s\n", limiter.ActiveCount(), u)
            time.Sleep(time.Duration(100+len(u)*10) * time.Millisecond) // simulate request
            fmt.Printf("[Concurrent:%d] Complete request: %s\n", limiter.ActiveCount(), u)
        }(url)
    }

    wg.Wait()
    fmt.Println("All requests completed")
}

Case 2: Lazy Configuration Loading (Singleton Pattern)

Use sync.Once to ensure configuration is loaded only once:

GO
package main

import (
    "fmt"
    "sync"
)

// Config is the application configuration
type Config struct {
    DatabaseURL string
    APIKey      string
    MaxRetries  int
}

var (
    config *Config
    once   sync.Once
)

// GetConfig gets the configuration (lazy loading, initializes only once)
func GetConfig() *Config {
    once.Do(func() {
        fmt.Println("Loading configuration (runs only once)...")
        // Simulate loading config from file or environment variables
        config = &Config{
            DatabaseURL: "postgres://localhost:5432/mydb",
            APIKey:      "sk-xxxxxxxxxxxx",
            MaxRetries:  3,
        }
    })
    return config
}

func main() {
    var wg sync.WaitGroup

    // Multiple goroutines getting config simultaneously
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            cfg := GetConfig()
            fmt.Printf("Goroutine %d: DB=%s, Retries=%d\n",
                id, cfg.DatabaseURL, cfg.MaxRetries)
        }(i)
    }

    wg.Wait()
    fmt.Println("Configuration loading complete")
}

❓ FAQ

Q1: How to choose between Mutex and RWMutex?

Scenario | Recommendation
Read-heavy, write-light (e.g., cache) | RWMutex: multiple reads can be concurrent
Balanced read/write or write-heavy | Mutex: simpler, lower overhead
Simple counter | atomic: lightest weight
GO
// ❌ Using Mutex for read-heavy, write-light scenarios — poor performance
var mu sync.Mutex
mu.Lock()
val := cache[key] // read operation also exclusive
mu.Unlock()

// ✅ Use RWMutex — read operations can be concurrent
var rwmu sync.RWMutex
rwmu.RLock()
val := cache[key] // multiple goroutines can read simultaneously
rwmu.RUnlock()

Q2: How to choose between sync.Map and a locked map?

GO
// Scenario 1: Key-value pairs are relatively static, read-heavy, write-light -> sync.Map
var m sync.Map
m.Store("key", "value")
val, _ := m.Load("key")

// Scenario 2: Frequent add/delete, needs iteration -> locked map
type SafeMap struct {
    mu sync.RWMutex
    m  map[string]string
}

// sync.Map is suitable for:
// 1. Keys are written once but read many times (e.g., cache)
// 2. Multiple goroutines read/write different keys (no overlap)
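
The SafeMap struct in Scenario 2 has no methods yet. Here is one way it might be filled in, as a sketch: the method names Set, Get, Delete, and Keys are assumptions, and Keys shows the kind of consistent iteration that is awkward to get from sync.Map.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

// SafeMap is a plain map guarded by an RWMutex.
type SafeMap struct {
    mu sync.RWMutex
    m  map[string]string
}

// NewSafeMap returns an empty, ready-to-use SafeMap.
func NewSafeMap() *SafeMap {
    return &SafeMap{m: make(map[string]string)}
}

// Set stores a value under the write lock.
func (s *SafeMap) Set(key, value string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.m[key] = value
}

// Get reads a value under the shared read lock.
func (s *SafeMap) Get(key string) (string, bool) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.m[key]
    return v, ok
}

// Delete removes a key under the write lock.
func (s *SafeMap) Delete(key string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.m, key)
}

// Keys returns a sorted snapshot of all keys taken under a single read lock.
func (s *SafeMap) Keys() []string {
    s.mu.RLock()
    defer s.mu.RUnlock()
    keys := make([]string, 0, len(s.m))
    for k := range s.m {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    return keys
}

func main() {
    sm := NewSafeMap()
    sm.Set("b", "2")
    sm.Set("a", "1")
    sm.Delete("b")
    sm.Set("c", "3")
    fmt.Println(sm.Keys()) // prints "[a c]"
}
```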

Q3: When are objects in Pool garbage collected?

Objects in Pool may be cleared during any GC cycle. Don't use Pool as long-term storage:

GO
// ❌ Wrong usage: using Pool as a cache
pool := &sync.Pool{New: func() interface{} { return expensiveObject() }}
obj := pool.Get()
// ... use
pool.Put(obj)
// After the next GC, obj may have been collected

// ✅ Correct usage: reuse temporary objects, reduce allocations
pool := &sync.Pool{
    New: func() interface{} {
        return make([]byte, 0, 4096) // pre-allocate buffer
    },
}
buf := pool.Get().([]byte)[:0] // get and reset
// ... use buf
pool.Put(buf) // return

Q4: How to avoid deadlocks?

GO
// ❌ Deadlock: same goroutine locks twice
var mu sync.Mutex
mu.Lock()
mu.Lock() // Blocks forever! Deadlock!

// ✅ Solution 1: Use defer to ensure release
mu.Lock()
defer mu.Unlock()
// ... releases even on panic

// ✅ Solution 2: Pay attention to lock ordering, avoid cross-locking
// Two goroutines: one holds lock A waiting for B, the other holds B waiting for A -> deadlock
// Solution: always lock in A->B order

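// ✅ Solution 3: Bound the wait. sync.Mutex has no timed Lock; Go 1.18 added
// TryLock for a single non-blocking attempt. For a real timeout, use a channel
// as the lock and select on it together with a context (needs the context and
// time packages).
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Use channel with select for timeout control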
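
The channel-plus-select idea from Solution 3 can be sketched as follows. ChanMutex, LockContext, and lockWithinMillis are hypothetical names, and this is a minimal illustration rather than a drop-in replacement for sync.Mutex.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// ChanMutex is a hypothetical lock built from a buffered channel of capacity 1.
// A value sitting in the channel means the lock is held.
type ChanMutex chan struct{}

// NewChanMutex returns an unlocked ChanMutex.
func NewChanMutex() ChanMutex { return make(ChanMutex, 1) }

// LockContext acquires the lock, or returns ctx.Err() if the context ends first.
func (m ChanMutex) LockContext(ctx context.Context) error {
    select {
    case m <- struct{}{}:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Unlock releases the lock. It must only be called by the current holder.
func (m ChanMutex) Unlock() { <-m }

// lockWithinMillis tries to acquire m, giving up after ms milliseconds.
func lockWithinMillis(m ChanMutex, ms int) error {
    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
    defer cancel()
    return m.LockContext(ctx)
}

func main() {
    m := NewChanMutex()
    fmt.Println("first lock:", lockWithinMillis(m, 50))   // succeeds: <nil>
    fmt.Println("second lock:", lockWithinMillis(m, 50))  // times out: context deadline exceeded
    m.Unlock()
    fmt.Println("after unlock:", lockWithinMillis(m, 50)) // succeeds: <nil>
}
```

A timeout turns a silent hang into an error the caller can handle, but it does not remove the underlying ordering problem. Consistent lock ordering (Solution 2) is still the primary fix.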

📖 Summary

Concept | Key Points
sync.Mutex | Mutual exclusion lock; only one goroutine can hold it at a time
sync.RWMutex | Read-write lock; multiple readers or a single writer; ideal for read-heavy scenarios
sync.Once | Guarantees a function executes only once; suitable for singletons and initialization
sync.Pool | Object pool; reuses temporary objects to reduce GC pressure
sync.Map | Concurrent-safe map; better than a locked map only in specific scenarios
atomic | Atomic operations; the lightest concurrency-safe option for single variables
-race | Race detector; should be enabled during development and testing
Deadlock prevention | defer Unlock, consistent lock ordering, avoid nested locks

Selection Principles:

  1. Simple counter/flag → atomic
  2. General mutual exclusion → Mutex
  3. Read-heavy, write-light → RWMutex
  4. Single initialization → Once
  5. Object reuse → Pool
  6. Concurrent key-value read/write → sync.Map

📝 Exercises

Exercise 1: Implement a Thread-safe Stack

Implement a concurrent-safe stack (LIFO) supporting Push, Pop, and Size operations.

GO
package main

import (
    "errors"
    "fmt"
    "sync"
)

// ThreadSafeStack is a concurrent-safe stack
type ThreadSafeStack struct {
    // Your code:
    // - Choose appropriate data structure
    // - Choose appropriate synchronization primitive
}

// NewStack creates a new stack
func NewStack() *ThreadSafeStack {
    // Your code
    return nil
}

// Push pushes a value onto the stack
func (s *ThreadSafeStack) Push(val int) {
    // Your code
}

// Pop pops a value from the stack (returns error when empty)
func (s *ThreadSafeStack) Pop() (int, error) {
    // Your code
    return 0, errors.New("stack is empty")
}

// Size returns the number of elements in the stack
func (s *ThreadSafeStack) Size() int {
    // Your code
    return 0
}

func main() {
    stack := NewStack()
    var wg sync.WaitGroup

    // Concurrent push
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(val int) {
            defer wg.Done()
            stack.Push(val)
        }(i)
    }
    wg.Wait()

    fmt.Println("Stack size:", stack.Size()) // Should output 100

    // Concurrent pop
    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if val, err := stack.Pop(); err == nil {
                fmt.Println("Popped:", val)
            }
        }()
    }
    wg.Wait()

    fmt.Println("Remaining size:", stack.Size()) // Should output 50
}

Exercise 2: Implement a Cache with TTL

Use sync.RWMutex to implement a cache with TTL (time-to-live) support:

GO
package main

import (
    "fmt"
    "sync"
    "time"
)

// TTLCache is a cache with expiration time
type TTLCache struct {
    // Your code:
    // - Storage structure
    // - Sync lock
    // - TTL setting
}

// NewTTLCache creates a cache; ttl is the expiration time
func NewTTLCache(ttl time.Duration) *TTLCache {
    // Your code
    return nil
}

// Set sets a key-value pair
func (c *TTLCache) Set(key string, value interface{}) {
    // Your code
}

// Get retrieves a value (returns false if expired)
func (c *TTLCache) Get(key string) (interface{}, bool) {
    // Your code
    return nil, false
}

// Delete deletes a key
func (c *TTLCache) Delete(key string) {
    // Your code
}

// Size returns the number of valid key-value pairs
func (c *TTLCache) Size() int {
    // Your code
    return 0
}

func main() {
    cache := NewTTLCache(2 * time.Second)

    cache.Set("name", "Go Language")
    cache.Set("version", "1.21")

    if val, ok := cache.Get("name"); ok {
        fmt.Println("name =", val)
    }

    fmt.Println("Cache size:", cache.Size())

    time.Sleep(3 * time.Second)

    if _, ok := cache.Get("name"); !ok {
        fmt.Println("name has expired")
    }

    fmt.Println("Size after expiration:", cache.Size())
}

Exercise 3: Parallel Map-Reduce

Use the sync package to implement a simple parallel Map-Reduce:

GO
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// ParallelMapReduce processes data in parallel and aggregates results
// Parameters:
//   - data: input data slice
//   - mapFn: mapping function, converts input to a value
//   - reduceFn: reduction function, merges two results
//
// Requirements:
//   - Split data into segments, each processed by a goroutine
//   - Use sync.WaitGroup to wait for all goroutines to complete
//   - Use sync.Mutex to protect the reduction result
//   - Use atomic to count total processed elements
func ParallelMapReduce(
    data []int,
    mapFn func(int) int,
    reduceFn func(int, int) int,
) int {
    // Your code:
    // 1. Determine segment count (suggest 4 segments)
    // 2. Launch goroutines to process each segment
    // 3. mapFn processes each element
    // 4. reduceFn merges segment results
    // 5. Return final result
    return 0
}

func main() {
    data := make([]int, 100)
    for i := range data {
        data[i] = i + 1 // 1 to 100
    }

    // Compute the sum of squares of all elements
    result := ParallelMapReduce(
        data,
        func(x int) int { return x * x }, // map: square
        func(a, b int) int { return a + b }, // reduce: sum
    )

    fmt.Println("1² + 2² + 3² + ... + 100² =", result)
    // Expected output: 338350
}

Next Lesson

Having completed the sync package, you've mastered the core tools of concurrent programming in Go. Next, we'll consolidate what we've learned through comprehensive exercises.

👉 Lesson 17: Concurrency Practice Exercises

Web-Tutorial.com

Web-Tutorial Tech Team

A team of developers maintaining programming tutorials. Each tutorial is written and reviewed by developers with expertise in that field. We work to keep our content accurate and reliable — if you spot an issue, please let us know.
