Lesson 15: Select and Concurrency Patterns
Analogy
Imagine you're a restaurant waiter simultaneously responsible for three tables:
- Table A's customer is ordering
- Table B's customer is paying the bill
- Table C's customer is asking for water
You wouldn't foolishly wait for just one table — instead, you watch all tables at once and serve whichever raises their hand first. That's how select works — it listens to multiple channels simultaneously and executes whichever is ready first.
Core Concepts
| Concept | Description |
|---|---|
| select | A statement that listens to multiple channel operations simultaneously |
| case | Each channel operation corresponds to a branch |
| default | Executes when no channels are ready (optional) |
| Blocking behavior | Without default, select blocks until a case is ready |
| Random selection | When multiple cases are ready simultaneously, one is chosen randomly |
Basic Syntax and Usage
Standard select Statement
select {
case msg := <-ch1:
// Received data from ch1
fmt.Println(msg)
case ch2 <- "hello":
// Successfully sent data to ch2
case <-ch3:
// ch3 is closed or data received
default:
// Executes when no channels are ready
}
Timeout Control (Most Common Pattern)
select {
case msg := <-ch:
fmt.Println("Received:", msg)
case <-time.After(3 * time.Second):
fmt.Println("Timed out!")
}
Non-blocking Channel Operation
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No data available, returning immediately")
}
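The same default branch works for sends as well as receives. The sketch below wraps both directions in two hypothetical helpers, tryRecv and trySend, which are assumptions made for illustration.

```go
package main

import "fmt"

// tryRecv (hypothetical helper) returns immediately; ok is false when
// no value is waiting on ch.
// Note: a closed channel yields (0, true) here, because receiving from
// a closed channel never blocks.
func tryRecv(ch <-chan int) (v int, ok bool) {
	select {
	case v = <-ch:
		return v, true
	default:
		return 0, false
	}
}

// trySend (hypothetical helper) returns false instead of blocking when
// ch has no free buffer slot and no receiver is waiting.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)

	_, ok := tryRecv(ch)
	fmt.Println("recv on empty channel:", ok) // false

	fmt.Println("first send:", trySend(ch, 1))  // true: the buffer had room
	fmt.Println("second send:", trySend(ch, 2)) // false: the buffer is full

	v, ok := tryRecv(ch)
	fmt.Println("recv after send:", v, ok) // 1 true
}
```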
- select usually has at least one case; an empty select {} still compiles, but it blocks forever
- Without default and with no case ready, select blocks until a case becomes ready (permanently, if none ever does)
- time.After() returns a channel that receives a value after the specified duration
- When multiple cases are ready simultaneously, Go chooses one at random to prevent starvation
Example: Basic Select Usage (Difficulty ⭐)
Demonstrates how to listen to two channels simultaneously:
package main
import (
"fmt"
"time"
)
// Simulate two data sources
func source(name string, ch chan<- string, delay time.Duration) {
for i := 1; i <= 3; i++ {
time.Sleep(delay)
ch <- fmt.Sprintf("[%s] Message %d", name, i)
}
close(ch)
}
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go source("SourceA", ch1, 500*time.Millisecond)
go source("SourceB", ch2, 800*time.Millisecond)
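// Use select to listen to both channels simultaneously.
// A closed channel is always ready to receive, so once a channel closes,
// set it to nil (select never picks a nil channel) and stop when both are nil.
for ch1 != nil || ch2 != nil {
select {
case msg, ok := <-ch1:
if !ok {
ch1 = nil
continue
}
fmt.Println("From ch1:", msg)
case msg, ok := <-ch2:
if !ok {
ch2 = nil
continue
}
fmt.Println("From ch2:", msg)
}
}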
fmt.Println("All messages processed")
}
go run main.go
From ch1: [SourceA] Message 1
From ch2: [SourceB] Message 1
From ch1: [SourceA] Message 2
From ch1: [SourceA] Message 3
From ch2: [SourceB] Message 2
From ch2: [SourceB] Message 3
All messages processed
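A receive from a closed channel never blocks, so a select loop has to stop selecting on a channel once it closes. A minimal sketch of the usual idiom, setting the channel variable to nil, follows; the helper name drainBoth is an assumption made for illustration.

```go
package main

import "fmt"

// drainBoth (hypothetical helper) receives from a and b until both are
// closed, and returns every value it saw.
func drainBoth(a, b <-chan string) []string {
	var got []string
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // closed: a nil channel is never selected again
				continue
			}
			got = append(got, v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			got = append(got, v)
		}
	}
	return got
}

func main() {
	a := make(chan string, 2)
	b := make(chan string, 1)
	a <- "a1"
	a <- "a2"
	b <- "b1"
	close(a)
	close(b)

	fmt.Println(len(drainBoth(a, b)), "messages received") // 3 messages received
}
```

Counting loop iterations instead would also count the zero-value receives from a closed channel, which is why the loop condition tracks the channels themselves.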
Example: Timeout Control and Done Channel (Difficulty ⭐⭐)
Use select to collect worker results under an overall timeout, while each worker reports its exit on a done channel:
package main
import (
"fmt"
"math/rand"
"time"
)
// worker simulates a time-consuming task
func worker(id int, jobs <-chan int, results chan<- string, done chan<- int) {
for job := range jobs {
// Simulate unpredictable processing time
duration := time.Duration(rand.Intn(500)+200) * time.Millisecond
time.Sleep(duration)
results <- fmt.Sprintf("Worker %d completed task %d (took %v)", id, job, duration)
}
done <- id
}
func main() {
jobs := make(chan int, 10)
results := make(chan string, 10)
done := make(chan int, 3)
// Start 3 workers
for i := 1; i <= 3; i++ {
go worker(i, jobs, results, done)
}
// Distribute 6 tasks
for j := 1; j <= 6; j++ {
jobs <- j
}
close(jobs)
// Collect results with timeout control
timeout := time.After(2 * time.Second)
finished := 0
for finished < 6 {
select {
case result := <-results:
fmt.Println(result)
finished++
case workerID := <-done:
fmt.Printf(">>> Worker %d has exited\n", workerID)
case <-timeout:
fmt.Println("⏰ Timeout! Some tasks incomplete")
return
}
}
fmt.Println("All tasks completed")
}
Sample run (task assignment, ordering, and durations vary between runs):
go run main.go
Worker 2 completed task 2 (took 234ms)
Worker 3 completed task 3 (took 289ms)
Worker 1 completed task 1 (took 345ms)
Worker 3 completed task 5 (took 298ms)
>>> Worker 3 has exited
Worker 2 completed task 4 (took 412ms)
>>> Worker 2 has exited
Worker 1 completed task 6 (took 367ms)
All tasks completed
Example: Fan-in / Fan-out and Pipeline Pattern (Difficulty ⭐⭐⭐)
Demonstrates classic concurrency patterns: Pipeline + Fan-out + Fan-in:
package main
import (
"fmt"
"sync"
"time"
)
// Stage 1: Data generator (Pipeline start)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// Stage 2: Square computation (Pipeline middle stage)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
time.Sleep(100 * time.Millisecond) // simulate computation time
out <- n * n
}
close(out)
}()
return out
}
// Fan-out: distribute one channel to multiple workers
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = square(in)
}
return channels
}
// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start a goroutine for each channel to forward data
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
// Close the output channel after all input channels are closed
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// Stage 1: Generate data
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
source := generator(nums...)
// Stage 2: Fan-out to 3 workers for parallel computation
workers := fanOut(source, 3)
// Stage 3: Fan-in to merge results
merged := fanIn(workers...)
// Collect all results
results := make(map[int]bool)
for val := range merged {
results[val] = true
fmt.Printf("Received result: %d\n", val)
}
fmt.Println("\nDeduplicated results:")
for val := range results {
fmt.Printf(" %d\n", val)
}
}
Sample run (the order of results varies between runs, because three workers run concurrently and Go randomizes map iteration order):
go run main.go
Received result: 1
Received result: 4
Received result: 9
Received result: 16
Received result: 25
Received result: 36
Received result: 49
Received result: 64
Received result: 81
Received result: 100
Deduplicated results:
1
4
9
16
25
36
49
64
81
100
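Because fan-in delivers results in arrival order, a program that needs stable output has to impose its own order. The sketch below is one way to do that under stated assumptions: the helper squareAll is hypothetical, and it collapses generator, fan-out, and fan-in into one function so the sort step is easy to see.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll (hypothetical helper) fans nums out to `workers` goroutines,
// fans the squares back in, and returns them sorted.
func squareAll(nums []int, workers int) []int {
	in := make(chan int)
	out := make(chan int)

	// Fan-out: every worker reads from the same input channel.
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for n := range in {
				out <- n * n
			}
		}()
	}

	// Feed the input, then close it so the workers' range loops end.
	go func() {
		for _, n := range nums {
			in <- n
		}
		close(in)
	}()

	// Close the output once every worker has finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	// Fan-in: arrival order is nondeterministic, so sort before returning.
	var results []int
	for v := range out {
		results = append(results, v)
	}
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(squareAll([]int{3, 1, 2, 5, 4}, 3)) // [1 4 9 16 25]
}
```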
Scenario 1: HTTP Request Race
Send requests to multiple servers simultaneously, using the first response:
package main
import (
"fmt"
"time"
)
// Alternative style, not used by race below: each request returns its own buffered channel
func fetchFromServer(name string, delay time.Duration) <-chan string {
ch := make(chan string, 1)
go func() {
time.Sleep(delay)
ch <- fmt.Sprintf("Response from %s", name)
}()
return ch
}
// Race requests: return the fastest response
func race(urls map[string]time.Duration, timeout time.Duration) (string, error) {
// One buffered channel shared by all requests; the buffer lets slower goroutines send and exit after race returns
ch := make(chan string, len(urls))
for name, delay := range urls {
go func(n string, d time.Duration) {
ch <- fetchResult(n, d)
}(name, delay)
}
// Wait for the first response or timeout
select {
case result := <-ch:
return result, nil
case <-time.After(timeout):
return "", fmt.Errorf("all servers timed out")
}
}
func fetchResult(name string, delay time.Duration) string {
time.Sleep(delay)
return fmt.Sprintf("Data from %s (latency %v)", name, delay)
}
func main() {
servers := map[string]time.Duration{
"ServerA-Beijing": 800 * time.Millisecond,
"ServerB-Shanghai": 300 * time.Millisecond,
"ServerC-Guangzhou": 500 * time.Millisecond,
}
result, err := race(servers, 2*time.Second)
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Race result:", result)
}
}
go run main.go
Race result: Data from ServerB-Shanghai (latency 300ms)
Scenario 2: Graceful Shutdown
Use a quit channel to implement graceful service shutdown:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// server simulates a continuously running service
func server(id int, quit <-chan struct{}) {
fmt.Printf("[Service %d] Started\n", id)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Printf("[Service %d] Processing request...\n", id)
case <-quit:
fmt.Printf("[Service %d] Received exit signal, cleaning up...\n", id)
time.Sleep(200 * time.Millisecond) // simulate cleanup
fmt.Printf("[Service %d] Stopped\n", id)
return
}
}
}
func main() {
// Listen for system interrupt signals (Ctrl+C)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Create quit channel
quit := make(chan struct{})
// Start multiple services
for i := 1; i <= 3; i++ {
go server(i, quit)
}
fmt.Println("Main program running, press Ctrl+C for graceful exit...")
// Wait for system signal
sig := <-sigChan
fmt.Printf("\nReceived signal: %v, starting graceful exit...\n", sig)
// Notify all services to exit
close(quit)
// Give services time to complete cleanup
time.Sleep(1 * time.Second)
fmt.Println("All services closed, program exiting")
}
go run main.go
# Press Ctrl+C to trigger graceful exit
Main program running, press Ctrl+C for graceful exit...
[Service 1] Started
[Service 2] Started
[Service 3] Started
[Service 1] Processing request...
[Service 2] Processing request...
[Service 3] Processing request...
[Service 1] Processing request...
^C
Received signal: interrupt, starting graceful exit...
[Service 1] Received exit signal, cleaning up...
[Service 2] Received exit signal, cleaning up...
[Service 3] Received exit signal, cleaning up...
[Service 1] Stopped
[Service 2] Stopped
[Service 3] Stopped
All services closed, program exiting
❓ FAQ
Q1: What happens when multiple cases in select are ready simultaneously?
Go randomly selects one to execute, rather than picking the first one in order. This ensures fairness and prevents any channel from being starved.
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "A"
ch2 <- "B"
// Result is random, could be A or B
select {
case v := <-ch1:
fmt.Println(v)
case v := <-ch2:
fmt.Println(v)
}
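One run of the snippet above shows only a single pick. To observe the random choice, the select can be repeated many times with both channels ready. The helper countPicks below is a hypothetical name used for this sketch.

```go
package main

import "fmt"

// countPicks (hypothetical helper) runs a two-way select `rounds` times
// with both channels ready, and counts how often each case is chosen.
func countPicks(rounds int) (a, b int) {
	ch1 := make(chan struct{}, 1)
	ch2 := make(chan struct{}, 1)
	for i := 0; i < rounds; i++ {
		ch1 <- struct{}{}
		ch2 <- struct{}{}
		select {
		case <-ch1:
			a++
			<-ch2 // drain the other channel for the next round
		case <-ch2:
			b++
			<-ch1
		}
	}
	return a, b
}

func main() {
	a, b := countPicks(10000)
	// Expect roughly half each; the exact counts differ from run to run.
	fmt.Println("ch1 chosen:", a, "ch2 chosen:", b)
}
```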
Q2: When should the default branch in select be used?
Use default when you need non-blocking operations. Without default, select blocks until a case is ready.
// Non-blocking receive
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No data in channel right now")
}
Q3: How do you implement an infinite loop listening to multiple channels?
Use the for-select combination pattern:
for {
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
case <-quit:
fmt.Println("Exiting loop")
return
}
}
Q4: Does time.After in a loop cause memory leaks?
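It can on older Go versions. Before Go 1.23, each time.After call creates a timer that is not garbage collected until it fires, so a busy loop can pile up many pending timers. Since Go 1.23, the garbage collector can reclaim unreferenced timers, so this is no longer a leak. On older versions, or to avoid allocating a timer on every iteration, reuse one timer with time.NewTimer: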
// ❌ Wrong: creates a new timer each iteration
for {
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(5 * time.Second):
fmt.Println("Timed out")
}
}
// ✅ Correct: reuse the timer
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
select {
case msg := <-ch:
fmt.Println(msg)
// Stop and drain the timer before Reset (required before Go 1.23)
if !timer.Stop() {
<-timer.C
}
timer.Reset(5 * time.Second)
case <-timer.C:
fmt.Println("Timed out")
return
}
}
📖 Summary
| Pattern | Purpose | Key Code |
|---|---|---|
| Timeout control | Limit operation wait time | case <-time.After(d) |
| Non-blocking operation | Return immediately without waiting | select with default |
| Graceful exit | Listen to quit channel | case <-quit |
| Fan-out | One input distributed to multiple workers | Multiple goroutines reading from one channel |
| Fan-in | Multiple inputs merged into one channel | sync.WaitGroup + forwarding |
| Pipeline | Multi-stage processing | Chained channels |
Key Takeaways:
- select is Go's core concurrency tool for multiplexing
- When multiple cases are ready simultaneously, one is chosen randomly for fairness
- for-select is the standard pattern for listening to multiple channels
- Watch out for time.After creating a new timer on every loop iteration
- Fan-in/Fan-out/Pipeline are classic patterns for building concurrent pipelines
📝 Exercises
Exercise 1: Countdown Timer
Create a program using select and time.Ticker to implement a 5-second countdown, printing the remaining time each second, and printing "Launch!" at 0.
// Hints:
// ticker := time.NewTicker(1 * time.Second)
// select {
// case <-ticker.C:
// // update countdown
// }
Exercise 2: Multi-way Merge Sort
Implement a function that receives multiple sorted integer channels and uses the Fan-in pattern to merge them into a single sorted output channel.
// Function signature:
func mergeSorted(channels ...<-chan int) <-chan int {
// Implement merge logic
return nil // placeholder so the stub compiles
}
Exercise 3: Pipeline with Cancellation
Build a three-stage Pipeline (generate → filter even numbers → multiply by 10) that supports cancelling the entire pipeline via context:
// Hints:
// ctx, cancel := context.WithCancel(context.Background())
// Check ctx.Done() in each stage's select
Next Lesson: Lesson 16: Sync and Concurrency Safety — Learn about sync.Mutex, sync.WaitGroup, sync.Once, and other synchronization primitives for safe access to shared resources.



