sync Package
Lesson 16: sync Package — The Guardian of Concurrency Safety
Analogy
Everyday scenes make each primitive easier to picture:
- Mutex (mutual exclusion lock): A public restroom. The door has a lock; when one person enters, they lock the door and others must wait. Only one person can use it at a time.
- RWMutex (read-write lock): A library reading room — many people can read simultaneously (read lock), but if someone wants to modify materials, they must wait for all readers to leave and have exclusive access (write lock).
- Once (single execution): When joining a company, an employee badge is issued only once. No matter how many times you ask, it won't be repeated after the first time.
- Pool (object pool): Meeting room chairs — put them back in storage after use, bring them out again for the next meeting, rather than buying new ones each time.
- sync.Map (concurrent map): A secure mailbox with multiple keys; multiple mail carriers can deliver to different compartments simultaneously without interfering with each other.
Core Concepts
1. Why Do We Need the sync Package?
Go's goroutines are lightweight and powerful, but when multiple goroutines access the same data at once and at least one of them writes, a data race occurs:
// Dangerous! Multiple goroutines modifying count simultaneously
var count int
for i := 0; i < 1000; i++ {
go func() {
count++ // Data race!
}()
}
The sync package provides synchronization primitives to ensure concurrency safety.
2. Core Types Overview
| Type | Purpose | Characteristics |
|---|---|---|
| sync.Mutex | Mutual exclusion lock | Only one goroutine can hold it at a time |
| sync.RWMutex | Read-write lock | Multiple readers, single writer; reads don't conflict |
| sync.WaitGroup | Wait group | Wait for a group of goroutines to complete (covered in Lesson 15) |
| sync.Once | Single execution | Ensures a function executes only once |
| sync.Pool | Object pool | Reuse objects, reduce memory allocations |
| sync.Map | Concurrent-safe map | Concurrent key-value storage; best for read-mostly or disjoint-key workloads |
| sync.Cond | Condition variable | Condition notification between goroutines |
| sync/atomic package | Atomic operations | Lowest-level, lightweight concurrency-safe operations on single values |
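The overview lists sync.Cond, which the rest of this lesson does not demonstrate. The following is a minimal sketch of how a condition variable might be used, assuming a hypothetical `queue` type and helper `sumViaQueue` that exist only for this illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical FIFO used only to illustrate sync.Cond.
type queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// put appends an item and wakes one waiting goroutine.
func (q *queue) put(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// take blocks until an item is available, then removes and returns it.
func (q *queue) take() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 { // re-check the condition after every wake-up
		q.cond.Wait() // unlocks mu while sleeping, re-locks before returning
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

// sumViaQueue sends 1..n through the queue and returns the consumer's total.
func sumViaQueue(n int) int {
	q := newQueue()
	done := make(chan int)
	go func() {
		total := 0
		for i := 0; i < n; i++ {
			total += q.take()
		}
		done <- total
	}()
	for i := 1; i <= n; i++ {
		q.put(i)
	}
	return <-done
}

func main() {
	fmt.Println("sum =", sumViaQueue(10)) // sum = 55
}
```

Wait is called inside a loop because a woken goroutine must confirm the condition still holds once it has the lock again. For many producer/consumer cases a buffered channel is simpler; Cond is mainly useful when several goroutines wait on a condition over shared state.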
Basic Syntax and Usage
💡 Mutex
package main
import (
"fmt"
"sync"
)
func main() {
var mu sync.Mutex
count := 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock() // Lock
count++ // Safely modify shared variable
mu.Unlock() // Unlock
}()
}
wg.Wait()
fmt.Println("count =", count) // Output: count = 1000
}
Tip: In longer functions, call defer mu.Unlock() right after mu.Lock(). The deferred call ensures the lock is released even if the function returns early or panics, preventing deadlocks.
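To make the effect of a deferred Unlock concrete, here is a small sketch. The `guarded` type and its methods are hypothetical names chosen for this illustration; the point is only that the lock is usable again after a panic has been recovered.

```go
package main

import (
	"fmt"
	"sync"
)

// guarded is a hypothetical type used only for this illustration.
type guarded struct {
	mu    sync.Mutex
	value int
}

// update runs fn while holding the lock. Because Unlock is deferred,
// the lock is released even when fn panics.
func (g *guarded) update(fn func(int) int) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.value = fn(g.value)
}

// safeUpdate recovers from a panic in fn and reports whether one happened.
func (g *guarded) safeUpdate(fn func(int) int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	g.update(fn)
	return false
}

// get returns the current value under the lock.
func (g *guarded) get() int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.value
}

func main() {
	g := &guarded{}
	fmt.Println("panicked:", g.safeUpdate(func(int) int { panic("boom") }))
	// The lock was released during unwinding, so this call does not block.
	g.safeUpdate(func(v int) int { return v + 1 })
	fmt.Println("value:", g.get())
}
```

If update called g.mu.Unlock() as a plain statement after fn instead of deferring it, the panic would skip the unlock and the second safeUpdate call would block forever.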
💡 RWMutex (Read-Write Lock)
package main
import (
"fmt"
"sync"
)
func main() {
var rwmu sync.RWMutex
data := make(map[string]string)
var wg sync.WaitGroup
// Write operation: uses write lock
wg.Add(1)
go func() {
defer wg.Done()
rwmu.Lock() // Write lock: exclusive
data["key"] = "value"
rwmu.Unlock()
}()
// Read operation: uses read lock (can be concurrent)
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
rwmu.RLock() // Read lock: shared
_ = data["key"]
rwmu.RUnlock()
}()
}
wg.Wait()
fmt.Println("data:", data)
}
Tip: RWMutex performs better when reads far outnumber writes; if read and write frequencies are similar, Mutex is simpler.
💡 Once (Single Execution)
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
var wg sync.WaitGroup
setup := func() {
fmt.Println("Initialization (runs only once)")
}
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
once.Do(setup) // Only the first goroutine will execute setup
fmt.Printf("goroutine %d completed\n", id)
}(i)
}
wg.Wait()
}
Once guarantees that even if multiple goroutines call it simultaneously, the provided function will only execute once. Commonly used for singleton initialization.
💡 Pool (Object Pool)
package main
import (
"fmt"
"sync"
)
func main() {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Creating new object")
return make([]byte, 1024)
},
}
// First Get: calls New to create
buf1 := pool.Get().([]byte)
fmt.Println("Got object, length:", len(buf1))
// Return after use
pool.Put(buf1)
// Second Get: reuses the previously returned object
buf2 := pool.Get().([]byte)
fmt.Println("Reused object, length:", len(buf2))
}
Note: Objects held in a Pool may be dropped during any GC cycle. Don't assume an object you Put can always be retrieved again with Get.
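Because a pooled object may be either freshly created or reused, code that takes one should reset it before use. The sketch below assumes a hypothetical `render` helper and a package-level `bufPool`; neither appears in the lesson's own examples.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// bufPool hands out *bytes.Buffer values. Storing a pointer avoids an extra
// allocation when the value is converted to an interface in Put.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// render is a hypothetical helper that builds a greeting in a pooled buffer.
func render(name string) string {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset() // a reused buffer may still hold data from its last user
	defer bufPool.Put(buf)

	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String() // String copies the bytes, so the result outlives Put
}

func main() {
	fmt.Println(render("gopher")) // hello, gopher
	fmt.Println(render("sync"))   // hello, sync
}
```

The code behaves the same whether Get returns a recycled buffer or calls New, which is the property a Pool user should rely on.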
💡 Atomic Operations
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var count int64 = 0
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&count, 1) // Atomic increment
}()
}
wg.Wait()
fmt.Println("count =", atomic.LoadInt64(&count)) // Atomic read
}
atomic operations are lighter than Mutex, suitable for simple counters, flags, etc.
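The counter above covers the increment case. For the flag case, a compare-and-swap can let exactly one goroutine win. This is a sketch under the assumption of a hypothetical `closer` type that must be shut down once; the names are illustrative only.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// closer is a hypothetical resource that must be shut down exactly once.
type closer struct {
	closed int32 // 0 = open, 1 = closed; accessed only through sync/atomic
	runs   int32 // how many times the shutdown body actually ran
}

// Close runs the shutdown body only for the caller that flips the flag.
func (c *closer) Close() bool {
	if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
		return false // another goroutine already closed it
	}
	atomic.AddInt32(&c.runs, 1)
	return true
}

// IsClosed reports the flag with an atomic load.
func (c *closer) IsClosed() bool {
	return atomic.LoadInt32(&c.closed) == 1
}

// closeConcurrently calls Close from n goroutines and returns how many
// times the shutdown body ran.
func closeConcurrently(n int) int32 {
	c := &closer{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close()
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&c.runs)
}

func main() {
	fmt.Println("shutdown ran", closeConcurrently(50), "time(s)")
}
```

Unlike sync.Once, losing callers here return immediately without waiting for the winner to finish, so this pattern fits cases where callers only need to know whether they were first.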
💡 Race Detection
go run -race main.go # Detect races at runtime
go test -race ./... # Detect races during testing
Always enable -race during development; it helps you discover hidden data races.
Practical Examples
Example: Thread-safe Counter (Difficulty ⭐)
A safe counter encapsulated with a mutex:
package main
import (
"fmt"
"sync"
)
// SafeCounter is a thread-safe counter
type SafeCounter struct {
mu sync.Mutex
count int
}
// Inc increments the counter
func (c *SafeCounter) Inc() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
// Value returns the current value
func (c *SafeCounter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func main() {
counter := &SafeCounter{}
var wg sync.WaitGroup
// Start 100 goroutines, each incrementing 100 times
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
counter.Inc()
}
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Value()) // Output: Final count: 10000
}
Example: Concurrent-safe Cache with sync.Map (Difficulty ⭐⭐)
package main
import (
"fmt"
"sync"
)
// Cache is a concurrent-safe cache
type Cache struct {
store sync.Map
}
// Set stores a value in the cache
func (c *Cache) Set(key string, value interface{}) {
c.store.Store(key, value)
}
// Get retrieves a value from the cache
func (c *Cache) Get(key string) (interface{}, bool) {
return c.store.Load(key)
}
// Delete removes a key from the cache
func (c *Cache) Delete(key string) {
c.store.Delete(key)
}
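// GetOrSet returns the cached value for key, or stores and returns the one
// produced by factory. The store itself is atomic, but factory may still run
// more than once if several goroutines miss on the same key at the same time.
func (c *Cache) GetOrSet(key string, factory func() interface{}) interface{} {
	if val, ok := c.store.Load(key); ok {
		return val
	}
	// LoadOrStore keeps whichever value was stored first and returns it
	val, _ := c.store.LoadOrStore(key, factory())
	return val
}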
func main() {
cache := &Cache{}
var wg sync.WaitGroup
// Concurrent writes
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", id)
cache.Set(key, id*10)
fmt.Printf("Write: %s = %d\n", key, id*10)
}(i)
}
wg.Wait()
// Concurrent reads
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", id)
if val, ok := cache.Get(key); ok {
fmt.Printf("Read: %s = %v\n", key, val)
}
}(i)
}
wg.Wait()
// Use GetOrSet to avoid redundant computation
result := cache.GetOrSet("computed", func() interface{} {
fmt.Println("Performing complex computation...")
return 42
})
fmt.Println("computed =", result)
// Get again, won't recompute
result2 := cache.GetOrSet("computed", func() interface{} {
fmt.Println("This line won't execute")
return 99
})
fmt.Println("computed =", result2)
}
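In the GetOrSet method above, factory() is evaluated before LoadOrStore is called, so goroutines that miss on the same key at the same moment may each run it. One possible way to run the computation once per key is to store a small entry that carries its own sync.Once. The sketch below uses hypothetical names (`OnceCache`, `GetOrCompute`, `countFactoryCalls`) that are not part of the example above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// entry pairs a value with the Once that guards its computation.
type entry struct {
	once sync.Once
	val  interface{}
}

// OnceCache is a hypothetical variant of Cache that computes each key once.
type OnceCache struct {
	store sync.Map // key -> *entry
}

// GetOrCompute returns the value for key, running factory at most once per key.
func (c *OnceCache) GetOrCompute(key string, factory func() interface{}) interface{} {
	// Publishing the cheap, empty entry is atomic; the expensive work
	// happens inside Once, so late arrivals wait instead of recomputing.
	e, _ := c.store.LoadOrStore(key, &entry{})
	ent := e.(*entry)
	ent.once.Do(func() { ent.val = factory() })
	return ent.val
}

// countFactoryCalls requests one key from n goroutines and returns how many
// times the factory actually ran.
func countFactoryCalls(n int) int64 {
	var calls int64
	c := &OnceCache{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.GetOrCompute("computed", func() interface{} {
				atomic.AddInt64(&calls, 1)
				return 42
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&calls)
}

func main() {
	fmt.Println("factory calls:", countFactoryCalls(100)) // factory calls: 1
}
```

The trade-off is one small allocation per LoadOrStore call and that callers block while the first computation is in progress.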
Example: Worker Pool with Timeout (Comprehensive sync Usage) (Difficulty ⭐⭐⭐)
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// Task represents a task
type Task struct {
ID int
Data string
}
// Result represents a result
type Result struct {
TaskID int
Output string
WorkerID int
Duration time.Duration
}
// WorkerPool is a worker pool
type WorkerPool struct {
workerCount int
taskCh chan Task
resultCh chan Result
wg sync.WaitGroup
processed int64 // atomic counter
errors int64
once sync.Once // ensures single initialization
pool sync.Pool // reuse Result objects
}
// NewWorkerPool creates a worker pool
func NewWorkerPool(workerCount, taskBufferSize int) *WorkerPool {
wp := &WorkerPool{
workerCount: workerCount,
taskCh: make(chan Task, taskBufferSize),
resultCh: make(chan Result, taskBufferSize),
}
// Initialize object pool
wp.pool = sync.Pool{
New: func() interface{} {
return &Result{}
},
}
return wp
}
// Start starts the worker pool (executes only once)
func (wp *WorkerPool) Start(ctx context.Context) {
wp.once.Do(func() {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go wp.worker(ctx, i)
}
fmt.Printf("Worker pool started with %d workers\n", wp.workerCount)
})
}
// worker is the worker goroutine
func (wp *WorkerPool) worker(ctx context.Context, id int) {
defer wp.wg.Done()
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: received exit signal\n", id)
return
case task, ok := <-wp.taskCh:
if !ok {
fmt.Printf("Worker %d: task channel closed\n", id)
return
}
// Get Result from pool
result := wp.pool.Get().(*Result)
result.TaskID = task.ID
result.WorkerID = id
// Simulate task processing
start := time.Now()
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
result.Output = fmt.Sprintf("Processed: %s", task.Data)
result.Duration = time.Since(start)
atomic.AddInt64(&wp.processed, 1)
wp.resultCh <- *result
// Return Result object to pool (note: sends a value copy)
wp.pool.Put(result)
}
}
}
// Submit submits a task
func (wp *WorkerPool) Submit(task Task) {
wp.taskCh <- task
}
// Close closes the worker pool
func (wp *WorkerPool) Close() {
close(wp.taskCh)
wp.wg.Wait()
close(wp.resultCh)
}
// Stats returns statistics
func (wp *WorkerPool) Stats() (processed, errors int64) {
return atomic.LoadInt64(&wp.processed), atomic.LoadInt64(&wp.errors)
}
func main() {
rand.Seed(time.Now().UnixNano()) // only needed before Go 1.20; later versions seed automatically and deprecate Seed
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create worker pool
pool := NewWorkerPool(3, 20)
pool.Start(ctx)
// Submit tasks
var submitWg sync.WaitGroup
submitWg.Add(1)
go func() {
defer submitWg.Done()
for i := 1; i <= 10; i++ {
pool.Submit(Task{
ID: i,
Data: fmt.Sprintf("task-data-%d", i),
})
fmt.Printf("Submitted task #%d\n", i)
}
}()
// Collect results
var collectWg sync.WaitGroup
collectWg.Add(1)
go func() {
defer collectWg.Done()
for result := range pool.resultCh {
fmt.Printf("Task#%d processed by Worker%d, took %v -> %s\n",
result.TaskID, result.WorkerID, result.Duration, result.Output)
}
}()
// Wait for task submission to complete
submitWg.Wait()
// Close worker pool
pool.Close()
// Wait for result collection to complete
collectWg.Wait()
// Output statistics
processed, _ := pool.Stats()
fmt.Printf("\nStats: processed %d tasks total\n", processed)
}
Practical Use Cases
Case 1: Rate Limiting for Concurrent HTTP Requests
In crawlers or API calls, you need to limit concurrency to avoid being banned:
package main
import (
"fmt"
"sync"
"time"
)
// RateLimiter is a simple concurrency rate limiter
type RateLimiter struct {
sem chan struct{}
mu sync.Mutex
active int
maxConc int
}
// NewRateLimiter creates a rate limiter; maxConc is the max concurrency
func NewRateLimiter(maxConc int) *RateLimiter {
return &RateLimiter{
sem: make(chan struct{}, maxConc),
maxConc: maxConc,
}
}
// Acquire acquires a permit
func (r *RateLimiter) Acquire() {
r.sem <- struct{}{}
r.mu.Lock()
r.active++
r.mu.Unlock()
}
// Release releases a permit
func (r *RateLimiter) Release() {
	// Decrement before freeing the slot so active never exceeds maxConc
	r.mu.Lock()
	r.active--
	r.mu.Unlock()
	<-r.sem
}
// ActiveCount returns the current active count
func (r *RateLimiter) ActiveCount() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.active
}
func main() {
limiter := NewRateLimiter(3) // max 3 concurrent
var wg sync.WaitGroup
urls := []string{
"https://api.example.com/page1",
"https://api.example.com/page2",
"https://api.example.com/page3",
"https://api.example.com/page4",
"https://api.example.com/page5",
"https://api.example.com/page6",
}
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
limiter.Acquire() // acquire permit (blocks when more than 3)
defer limiter.Release() // release permit
fmt.Printf("[Concurrent:%d] Start request: %s\n", limiter.ActiveCount(), u)
time.Sleep(time.Duration(100+len(u)*10) * time.Millisecond) // simulate request
fmt.Printf("[Concurrent:%d] Complete request: %s\n", limiter.ActiveCount(), u)
}(url)
}
wg.Wait()
fmt.Println("All requests completed")
}
Case 2: Lazy Configuration Loading (Singleton Pattern)
Use sync.Once to ensure configuration is loaded only once:
package main
import (
"fmt"
"sync"
)
// Config is the application configuration
type Config struct {
DatabaseURL string
APIKey string
MaxRetries int
}
var (
config *Config
once sync.Once
)
// GetConfig gets the configuration (lazy loading, initializes only once)
func GetConfig() *Config {
once.Do(func() {
fmt.Println("Loading configuration (runs only once)...")
// Simulate loading config from file or environment variables
config = &Config{
DatabaseURL: "postgres://localhost:5432/mydb",
APIKey: "sk-xxxxxxxxxxxx",
MaxRetries: 3,
}
})
return config
}
func main() {
var wg sync.WaitGroup
// Multiple goroutines getting config simultaneously
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
cfg := GetConfig()
fmt.Printf("Goroutine %d: DB=%s, Retries=%d\n",
id, cfg.DatabaseURL, cfg.MaxRetries)
}(i)
}
wg.Wait()
fmt.Println("Configuration loading complete")
}
❓ FAQ
Q1: How to choose between Mutex and RWMutex?
| Scenario | Recommendation |
|---|---|
| Read-heavy, write-light (e.g., cache) | RWMutex — multiple reads can be concurrent |
| Balanced read/write or write-heavy | Mutex — simpler, lower overhead |
| Simple counter | atomic — lightest weight |
// ❌ Using Mutex for read-heavy, write-light scenarios — poor performance
var mu sync.Mutex
mu.Lock()
val := cache[key] // read operation also exclusive
mu.Unlock()
// ✅ Use RWMutex — read operations can be concurrent
var rwmu sync.RWMutex
rwmu.RLock()
val := cache[key] // multiple goroutines can read simultaneously
rwmu.RUnlock()
Q2: How to choose between sync.Map and a locked map?
// Scenario 1: Key-value pairs are relatively static, read-heavy, write-light -> sync.Map
var m sync.Map
m.Store("key", "value")
val, _ := m.Load("key")
// Scenario 2: Frequent add/delete, needs iteration -> locked map
type SafeMap struct {
mu sync.RWMutex
m map[string]string
}
// sync.Map is suitable for:
// 1. Keys are written once but read many times (e.g., cache)
// 2. Multiple goroutines read/write different keys (no overlap)
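The SafeMap struct in Scenario 2 is shown without methods. A minimal sketch of how it might be completed follows; the method names (`Store`, `Load`, `Delete`, `Keys`) and the `NewSafeMap` constructor are assumptions chosen to mirror sync.Map, not part of the original snippet.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// SafeMap wraps a plain map with an RWMutex.
type SafeMap struct {
	mu sync.RWMutex
	m  map[string]string
}

// NewSafeMap returns an empty, ready-to-use SafeMap.
func NewSafeMap() *SafeMap {
	return &SafeMap{m: make(map[string]string)}
}

// Store sets a key under the write lock.
func (s *SafeMap) Store(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
}

// Load reads a key under the read lock, so readers can run concurrently.
func (s *SafeMap) Load(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.m[key]
	return v, ok
}

// Delete removes a key under the write lock.
func (s *SafeMap) Delete(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.m, key)
}

// Keys returns a sorted snapshot of the keys, so callers can iterate
// without holding the lock.
func (s *SafeMap) Keys() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.m))
	for k := range s.m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	sm := NewSafeMap()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sm.Store(fmt.Sprintf("key-%d", id), "value")
		}(i)
	}
	wg.Wait()
	sm.Delete("key-1")
	fmt.Println(sm.Keys()) // [key-0 key-2]
}
```

Compared with sync.Map, this version keeps static types for keys and values and makes a consistent snapshot for iteration straightforward.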
Q3: When are objects in Pool garbage collected?
Objects in Pool may be cleared during any GC cycle. Don't use Pool as long-term storage:
// ❌ Wrong usage: using Pool as a cache
pool := &sync.Pool{New: func() interface{} { return expensiveObject() }}
obj := pool.Get()
// ... use
pool.Put(obj)
// After the next GC, obj may have been collected
// ✅ Correct usage: reuse temporary objects, reduce allocations
pool := &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 4096) // pre-allocate buffer
},
}
buf := pool.Get().([]byte)[:0] // get and reset
// ... use buf
pool.Put(buf) // return
Q4: How to avoid deadlocks?
// ❌ Deadlock: same goroutine locks twice
var mu sync.Mutex
mu.Lock()
mu.Lock() // Blocks forever! Deadlock!
// ✅ Solution 1: Use defer to ensure release
mu.Lock()
defer mu.Unlock()
// ... releases even on panic
// ✅ Solution 2: Pay attention to lock ordering, avoid cross-locking
// Two goroutines: one holds lock A waiting for B, the other holds B waiting for A -> deadlock
// Solution: always lock in A->B order
// ✅ Solution 3: Bound the wait. sync.Mutex has no Lock with a timeout
// (Go 1.18 added the non-blocking TryLock), so pair a context deadline with a channel
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// Use channel with select for timeout control
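Solution 3 can be sketched with a buffered channel of capacity 1 acting as the lock, so that acquisition can be combined with ctx.Done() in a select. The `chanLock` type and the `acquireWithin` helper are hypothetical names for this illustration, not a standard library API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// chanLock is a hypothetical mutex built on a buffered channel of size 1.
type chanLock struct {
	ch chan struct{}
}

func newChanLock() *chanLock {
	return &chanLock{ch: make(chan struct{}, 1)}
}

// LockContext acquires the lock, or returns ctx.Err() if ctx is done first.
func (l *chanLock) LockContext(ctx context.Context) error {
	select {
	case l.ch <- struct{}{}: // the slot was free: lock acquired
		return nil
	case <-ctx.Done(): // deadline passed or context cancelled
		return ctx.Err()
	}
}

// Unlock releases the lock by emptying the slot.
func (l *chanLock) Unlock() {
	<-l.ch
}

// acquireWithin tries to take the lock, giving up after ms milliseconds.
func acquireWithin(l *chanLock, ms int) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	return l.LockContext(ctx) == nil
}

func main() {
	lock := newChanLock()
	fmt.Println("first acquire:", acquireWithin(lock, 50))  // true
	fmt.Println("second acquire:", acquireWithin(lock, 50)) // false: still held
	lock.Unlock()
	fmt.Println("after unlock:", acquireWithin(lock, 50)) // true
}
```

A caller that times out can log, retry, or fail fast instead of blocking forever, which turns a potential deadlock into a visible error.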
📖 Summary
| Concept | Key Points |
|---|---|
| sync.Mutex | Mutual exclusion lock, only one goroutine can hold at a time |
| sync.RWMutex | Read-write lock, multiple reads, single write; ideal for read-heavy scenarios |
| sync.Once | Guarantees function executes only once; suitable for singletons/initialization |
| sync.Pool | Object pool, reuse temporary objects, reduce GC pressure |
| sync.Map | Concurrent-safe map; better than locked map in specific scenarios |
| atomic | Atomic operations; lightest concurrency-safe solution |
| -race | Race detector; must be enabled during development |
| Deadlock prevention | defer Unlock, consistent lock ordering, avoid nested locks |
Selection Principles:
- Simple counter/flag → atomic
- General mutual exclusion → Mutex
- Read-heavy, write-light → RWMutex
- Single initialization → Once
- Object reuse → Pool
- Concurrent key-value read/write → sync.Map
📝 Exercises
Exercise 1: Implement a Thread-safe Stack
Implement a concurrent-safe stack (LIFO) supporting Push, Pop, and Size operations.
package main
import (
"errors"
"fmt"
"sync"
)
// ThreadSafeStack is a concurrent-safe stack
type ThreadSafeStack struct {
// Your code:
// - Choose appropriate data structure
// - Choose appropriate synchronization primitive
}
// NewStack creates a new stack
func NewStack() *ThreadSafeStack {
// Your code
return nil
}
// Push pushes a value onto the stack
func (s *ThreadSafeStack) Push(val int) {
// Your code
}
// Pop pops a value from the stack (returns error when empty)
func (s *ThreadSafeStack) Pop() (int, error) {
// Your code
return 0, errors.New("stack is empty")
}
// Size returns the number of elements in the stack
func (s *ThreadSafeStack) Size() int {
// Your code
return 0
}
func main() {
stack := NewStack()
var wg sync.WaitGroup
// Concurrent push
for i := 0; i < 100; i++ {
wg.Add(1)
go func(val int) {
defer wg.Done()
stack.Push(val)
}(i)
}
wg.Wait()
fmt.Println("Stack size:", stack.Size()) // Should output 100
// Concurrent pop
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if val, err := stack.Pop(); err == nil {
fmt.Println("Popped:", val)
}
}()
}
wg.Wait()
fmt.Println("Remaining size:", stack.Size()) // Should output 50
}
Exercise 2: Implement a Cache with TTL
Use sync.RWMutex to implement a cache with TTL (time-to-live) support:
package main
import (
"fmt"
"sync"
"time"
)
// TTLCache is a cache with expiration time
type TTLCache struct {
// Your code:
// - Storage structure
// - Sync lock
// - TTL setting
}
// NewTTLCache creates a cache; ttl is the expiration time
func NewTTLCache(ttl time.Duration) *TTLCache {
// Your code
return nil
}
// Set sets a key-value pair
func (c *TTLCache) Set(key string, value interface{}) {
// Your code
}
// Get retrieves a value (returns false if expired)
func (c *TTLCache) Get(key string) (interface{}, bool) {
// Your code
return nil, false
}
// Delete deletes a key
func (c *TTLCache) Delete(key string) {
// Your code
}
// Size returns the number of valid key-value pairs
func (c *TTLCache) Size() int {
// Your code
return 0
}
func main() {
cache := NewTTLCache(2 * time.Second)
cache.Set("name", "Go Language")
cache.Set("version", "1.21")
if val, ok := cache.Get("name"); ok {
fmt.Println("name =", val)
}
fmt.Println("Cache size:", cache.Size())
time.Sleep(3 * time.Second)
if _, ok := cache.Get("name"); !ok {
fmt.Println("name has expired")
}
fmt.Println("Size after expiration:", cache.Size())
}
Exercise 3: Parallel Map-Reduce
Use the sync package to implement a simple parallel Map-Reduce:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// ParallelMapReduce processes data in parallel and aggregates results
// Parameters:
// - data: input data slice
// - mapFn: mapping function, converts input to a value
// - reduceFn: reduction function, merges two results
//
// Requirements:
// - Split data into segments, each processed by a goroutine
// - Use sync.WaitGroup to wait for all goroutines to complete
// - Use sync.Mutex to protect the reduction result
// - Use atomic to count total processed elements
func ParallelMapReduce(
data []int,
mapFn func(int) int,
reduceFn func(int, int) int,
) int {
// Your code:
// 1. Determine segment count (suggest 4 segments)
// 2. Launch goroutines to process each segment
// 3. mapFn processes each element
// 4. reduceFn merges segment results
// 5. Return final result
return 0
}
func main() {
data := make([]int, 100)
for i := range data {
data[i] = i + 1 // 1 to 100
}
// Compute the sum of squares of all elements
result := ParallelMapReduce(
data,
func(x int) int { return x * x }, // map: square
func(a, b int) int { return a + b }, // reduce: sum
)
fmt.Println("1² + 2² + 3² + ... + 100² =", result)
// Expected output: 338350
}
Next Lesson
Having completed the sync package, you've mastered the core tools of Go concurrency programming. Next, we'll consolidate what we've learned through comprehensive exercises.



