Lesson 15: Select and Concurrency Patterns

Analogy

Imagine you're a restaurant waiter responsible for three tables at once.

You wouldn't stand at one table and ignore the others. Instead, you watch all of them and serve whichever raises a hand first. That's how select works: it waits on multiple channel operations at once and runs whichever becomes ready first.

Core Concepts

| Concept | Description |
| --- | --- |
| select | A statement that listens to multiple channel operations simultaneously |
| case | Each channel operation corresponds to a branch |
| default | Executes when no channels are ready (optional) |
| Blocking behavior | Without default, select blocks until a case is ready |
| Random selection | When multiple cases are ready simultaneously, one is chosen randomly |

Basic Syntax and Usage

Standard select Statement

GO
select {
case msg := <-ch1:
    // Received data from ch1
    fmt.Println(msg)
case ch2 <- "hello":
    // Successfully sent data to ch2
case <-ch3:
    // ch3 is closed or data received
default:
    // Executes when no channels are ready
}

Timeout Control (Most Common Pattern)

GO
select {
case msg := <-ch:
    fmt.Println("Received:", msg)
case <-time.After(3 * time.Second):
    fmt.Println("Timed out!")
}

Non-blocking Channel Operation

GO
select {
case msg := <-ch:
    fmt.Println("Received:", msg)
default:
    fmt.Println("No data available, returning immediately")
}
💡 Tip:

  • An empty select {} (no cases at all) is legal Go; it blocks the goroutine forever
  • Without default, select blocks until one of its cases becomes ready; if none ever does, it blocks forever
  • time.After() returns a channel that delivers the current time once the specified duration has elapsed
  • When multiple cases are ready simultaneously, Go chooses one randomly to prevent starvation
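
The non-blocking form works for sends as well as receives. The following is a minimal sketch, assuming a hypothetical helper named trySend (not part of the standard library) that attempts a send and reports whether it went through:

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts to send v on ch and
// reports whether the send succeeded, without ever blocking.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The buffer is full, or no receiver is waiting on an unbuffered channel.
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full
}
```

This shape is handy when dropping a value is preferable to stalling the sender, for example when publishing optional status updates.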

Example: Basic Select Usage (Difficulty ⭐)

Demonstrates how to listen to two channels simultaneously:

GO
package main

import (
	"fmt"
	"time"
)

// Simulate two data sources
func source(name string, ch chan<- string, delay time.Duration) {
	for i := 1; i <= 3; i++ {
		time.Sleep(delay)
		ch <- fmt.Sprintf("[%s] Message %d", name, i)
	}
	close(ch)
}

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go source("SourceA", ch1, 500*time.Millisecond)
	go source("SourceB", ch2, 800*time.Millisecond)

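	// Use select to listen to both channels simultaneously.
	// A receive from a closed channel is always ready (with ok == false),
	// so a fixed count of 6 iterations could be used up by the channel
	// that closes first. Instead, set a closed channel to nil (a nil
	// channel is never ready) and loop until both are nil.
	for ch1 != nil || ch2 != nil {
		select {
		case msg, ok := <-ch1:
			if !ok {
				ch1 = nil
				continue
			}
			fmt.Println("From ch1:", msg)
		case msg, ok := <-ch2:
			if !ok {
				ch2 = nil
				continue
			}
			fmt.Println("From ch2:", msg)
		}
	}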
	fmt.Println("All messages processed")
}
▶ Try it Yourself
BASH
go run main.go
TEXT
From ch1: [SourceA] Message 1
From ch2: [SourceB] Message 1
From ch1: [SourceA] Message 2
From ch1: [SourceA] Message 3
From ch2: [SourceB] Message 2
From ch2: [SourceB] Message 3
All messages processed
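
One detail matters whenever select reads from channels that get closed: a receive on a closed channel never blocks, so its case stays ready forever. A common idiom is to set the channel variable to nil once it is closed, because a nil channel is never ready. The sketch below isolates that idiom in a hypothetical helper named mergeTwo:

```go
package main

import "fmt"

// mergeTwo is a hypothetical helper: it reads from a and b until both
// are closed and returns every value it received.
func mergeTwo(a, b <-chan string) []string {
	var got []string
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // closed: a nil channel is never ready, so this case is disabled
				continue
			}
			got = append(got, v)
		case v, ok := <-b:
			if !ok {
				b = nil
				continue
			}
			got = append(got, v)
		}
	}
	return got
}

func main() {
	a := make(chan string, 2)
	b := make(chan string, 1)
	a <- "a1"
	a <- "a2"
	b <- "b1"
	close(a)
	close(b)
	fmt.Println(len(mergeTwo(a, b))) // 3
}
```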

Example: Timeout Control and Done Channel (Difficulty ⭐⭐)

Use select to collect results from a pool of workers under an overall timeout, while each worker reports on a done channel when it exits:

GO
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// worker simulates a time-consuming task
func worker(id int, jobs <-chan int, results chan<- string, done chan<- int) {
	for job := range jobs {
		// Simulate unpredictable processing time
		duration := time.Duration(rand.Intn(500)+200) * time.Millisecond
		time.Sleep(duration)
		results <- fmt.Sprintf("Worker %d completed task %d (took %v)", id, job, duration)
	}
	done <- id
}

func main() {
	jobs := make(chan int, 10)
	results := make(chan string, 10)
	done := make(chan int, 3)

	// Start 3 workers
	for i := 1; i <= 3; i++ {
		go worker(i, jobs, results, done)
	}

	// Distribute 6 tasks
	for j := 1; j <= 6; j++ {
		jobs <- j
	}
	close(jobs)

	// Collect results with timeout control
	timeout := time.After(2 * time.Second)
	finished := 0

	for finished < 6 {
		select {
		case result := <-results:
			fmt.Println(result)
			finished++
		case workerID := <-done:
			fmt.Printf(">>> Worker %d has exited\n", workerID)
		case <-timeout:
			fmt.Println("⏰ Timeout! Some tasks incomplete")
			return
		}
	}
	fmt.Println("All tasks completed")
}
▶ Try it Yourself
BASH
go run main.go
Sample output (durations are random between 200ms and 699ms, so every run differs):
TEXT
Worker 2 completed task 2 (took 234ms)
Worker 3 completed task 3 (took 289ms)
Worker 1 completed task 1 (took 345ms)
Worker 3 completed task 5 (took 300ms)
>>> Worker 3 has exited
Worker 2 completed task 4 (took 412ms)
>>> Worker 2 has exited
Worker 1 completed task 6 (took 367ms)
All tasks completed

Example: Fan-in / Fan-out and Pipeline Pattern (Difficulty ⭐⭐⭐)

Demonstrates three classic concurrency patterns working together: Pipeline, Fan-out, and Fan-in.

GO
package main

import (
	"fmt"
	"sync"
	"time"
)

// Stage 1: Data generator (Pipeline start)
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// Stage 2: Square computation (Pipeline middle stage)
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			time.Sleep(100 * time.Millisecond) // simulate computation time
			out <- n * n
		}
		close(out)
	}()
	return out
}

// Fan-out: distribute one channel to multiple workers
func fanOut(in <-chan int, workers int) []<-chan int {
	channels := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		channels[i] = square(in)
	}
	return channels
}

// Fan-in: merge multiple channels into one
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start a goroutine for each channel to forward data
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(c <-chan int) {
			defer wg.Done()
			for val := range c {
				out <- val
			}
		}(ch)
	}

	// Close the output channel after all input channels are closed
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// Stage 1: Generate data
	nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	source := generator(nums...)

	// Stage 2: Fan-out to 3 workers for parallel computation
	workers := fanOut(source, 3)

	// Stage 3: Fan-in to merge results
	merged := fanIn(workers...)

	// Collect all results
	results := make(map[int]bool)
	for val := range merged {
		results[val] = true
		fmt.Printf("Received result: %d\n", val)
	}

	fmt.Println("\nDeduplicated results:")
	for val := range results {
		fmt.Printf("  %d\n", val)
	}
}
▶ Try it Yourself
BASH
go run main.go
Sample output (arrival order and map iteration order both vary between runs):
TEXT
Received result: 1
Received result: 4
Received result: 9
Received result: 16
Received result: 25
Received result: 36
Received result: 49
Received result: 64
Received result: 81
Received result: 100

Deduplicated results:
  1
  4
  9
  16
  25
  36
  49
  64
  81
  100
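
Because several workers run in parallel, results arrive in no fixed order, and Go also randomizes map iteration order. When stable output matters, one option is to collect the results into a slice and sort it. The sketch below assumes two hypothetical helpers, squareAll (a condensed fan-out plus fan-in) and collectSorted:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll is a hypothetical, condensed fan-out/fan-in: `workers`
// goroutines read from one input channel and write squares to one output.
func squareAll(nums []int, workers int) <-chan int {
	in := make(chan int)
	out := make(chan int)

	go func() {
		for _, n := range nums {
			in <- n
		}
		close(in)
	}()

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for n := range in {
				out <- n * n
			}
		}()
	}

	// Close out only after every worker has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// collectSorted drains ch and returns the values in ascending order.
func collectSorted(ch <-chan int) []int {
	var vals []int
	for v := range ch {
		vals = append(vals, v)
	}
	sort.Ints(vals)
	return vals
}

func main() {
	fmt.Println(collectSorted(squareAll([]int{3, 1, 2, 4}, 3))) // [1 4 9 16]
}
```

Sorting at the end keeps the workers free of coordination; the cost is that nothing can be printed until the whole pipeline has finished.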

Scenario 1: HTTP Request Race

Send requests to multiple servers simultaneously, using the first response:

GO
package main

import (
	"fmt"
	"time"
)

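// fetchFromServer simulates one request and returns its own result channel.
// (race below does not call it; race shares a single channel across goroutines.)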
func fetchFromServer(name string, delay time.Duration) <-chan string {
	ch := make(chan string, 1)
	go func() {
		time.Sleep(delay)
		ch <- fmt.Sprintf("Response from %s", name)
	}()
	return ch
}

// Race requests: return the fastest response
func race(urls map[string]time.Duration, timeout time.Duration) (string, error) {
	// One shared channel, buffered so the slower goroutines can still
	// send their result and exit after race has returned
	ch := make(chan string, len(urls))
	for name, delay := range urls {
		go func(n string, d time.Duration) {
			ch <- fetchResult(n, d)
		}(name, delay)
	}

	// Wait for the first response or timeout
	select {
	case result := <-ch:
		return result, nil
	case <-time.After(timeout):
		return "", fmt.Errorf("all servers timed out")
	}
}

func fetchResult(name string, delay time.Duration) string {
	time.Sleep(delay)
	return fmt.Sprintf("Data from %s (latency %v)", name, delay)
}

func main() {
	servers := map[string]time.Duration{
		"ServerA-Beijing":  800 * time.Millisecond,
		"ServerB-Shanghai": 300 * time.Millisecond,
		"ServerC-Guangzhou": 500 * time.Millisecond,
	}

	result, err := race(servers, 2*time.Second)
	if err != nil {
		fmt.Println("Error:", err)
	} else {
		fmt.Println("Race result:", result)
	}
}
BASH
go run main.go
TEXT
Race result: Data from ServerB-Shanghai (latency 300ms)
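
In the race above, the slower goroutines keep running until their own delay elapses, even though their results are discarded. With real requests it is usually worth cancelling them. The following is a sketch of one way to do that with the standard context package; fetch, raceFirst, and demo are hypothetical names, and fetch only simulates a request:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetch is a hypothetical stand-in for a real request. It returns early
// with ctx.Err() if the context is cancelled before the delay elapses.
func fetch(ctx context.Context, name string, delay time.Duration) (string, error) {
	select {
	case <-time.After(delay):
		return "Data from " + name, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// raceFirst returns the first successful response and cancels the rest.
func raceFirst(servers map[string]time.Duration, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // stops the slower goroutines as soon as raceFirst returns

	ch := make(chan string, len(servers)) // buffered so no sender can block
	for name, delay := range servers {
		go func(n string, d time.Duration) {
			if res, err := fetch(ctx, n, d); err == nil {
				ch <- res
			}
		}(name, delay)
	}

	select {
	case res := <-ch:
		return res, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demo runs the race with three simulated servers.
func demo() (string, error) {
	return raceFirst(map[string]time.Duration{
		"slow":   300 * time.Millisecond,
		"fast":   20 * time.Millisecond,
		"medium": 150 * time.Millisecond,
	}, time.Second)
}

func main() {
	fmt.Println(demo()) // Data from fast <nil>
}
```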

Scenario 2: Graceful Shutdown

Use a quit channel to implement graceful service shutdown:

GO
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// server simulates a continuously running service
func server(id int, quit <-chan struct{}) {
	fmt.Printf("[Service %d] Started\n", id)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Printf("[Service %d] Processing request...\n", id)
		case <-quit:
			fmt.Printf("[Service %d] Received exit signal, cleaning up...\n", id)
			time.Sleep(200 * time.Millisecond) // simulate cleanup
			fmt.Printf("[Service %d] Stopped\n", id)
			return
		}
	}
}

func main() {
	// Listen for system interrupt signals (Ctrl+C)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Create quit channel
	quit := make(chan struct{})

	// Start multiple services
	for i := 1; i <= 3; i++ {
		go server(i, quit)
	}

	fmt.Println("Main program running, press Ctrl+C for graceful exit...")

	// Wait for system signal
	sig := <-sigChan
	fmt.Printf("\nReceived signal: %v, starting graceful exit...\n", sig)

	// Notify all services to exit: closing the channel wakes every receiver
	close(quit)

	// Give services time to complete cleanup
	time.Sleep(1 * time.Second)
	fmt.Println("All services closed, program exiting")
}
BASH
go run main.go
# Press Ctrl+C to trigger graceful exit
TEXT
Main program running, press Ctrl+C for graceful exit...
[Service 1] Started
[Service 2] Started
[Service 3] Started
[Service 1] Processing request...
[Service 2] Processing request...
[Service 3] Processing request...
[Service 1] Processing request...
^C
Received signal: interrupt, starting graceful exit...
[Service 1] Received exit signal, cleaning up...
[Service 2] Received exit signal, cleaning up...
[Service 3] Received exit signal, cleaning up...
[Service 1] Stopped
[Service 2] Stopped
[Service 3] Stopped
All services closed, program exiting
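
The fixed one-second sleep in main is only a guess at how long cleanup takes. A sync.WaitGroup lets main wait exactly until every service has returned. The sketch below is one possible shape, not the tutorial's own code: service and runServices are hypothetical names, and a short sleep stands in for waiting on the signal channel so that the program terminates on its own:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// service ticks until quit is closed, then records its id and returns.
func service(id int, quit <-chan struct{}, stopped chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// handle one unit of work
		case <-quit:
			stopped <- id // real cleanup would happen here
			return
		}
	}
}

// runServices is a hypothetical harness: it starts n services, closes
// quit after a short run, and returns how many services cleaned up.
func runServices(n int) int {
	quit := make(chan struct{})
	stopped := make(chan int, n) // buffered so services never block on it
	var wg sync.WaitGroup

	wg.Add(n)
	for i := 1; i <= n; i++ {
		go service(i, quit, stopped, &wg)
	}

	time.Sleep(50 * time.Millisecond) // stands in for waiting on sigChan
	close(quit)                       // broadcast: every receiver sees the close
	wg.Wait()                         // returns only after every service has returned
	return len(stopped)
}

func main() {
	fmt.Println("services stopped:", runServices(3)) // services stopped: 3
}
```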

❓ FAQ

Q1: What happens when multiple cases in select are ready simultaneously?

When several cases can proceed, Go picks one of them by uniform pseudo-random selection instead of taking the first in source order. As a result, no ready channel is systematically starved by the cases listed above it.

GO
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "A"
ch2 <- "B"

// Result is random, could be A or B
select {
case v := <-ch1:
    fmt.Println(v)
case v := <-ch2:
    fmt.Println(v)
}
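
To observe the random choice, the experiment can be repeated many times while counting which case runs. This is a small sketch with a hypothetical countPicks function; the exact counts differ on every run, but both should land near half of the trials:

```go
package main

import "fmt"

// countPicks runs a two-way select `trials` times with both channels
// ready and reports how often each case was chosen.
func countPicks(trials int) (first, second int) {
	ch1 := make(chan struct{}, 1)
	ch2 := make(chan struct{}, 1)
	for i := 0; i < trials; i++ {
		ch1 <- struct{}{}
		ch2 <- struct{}{}
		select {
		case <-ch1:
			first++
			<-ch2 // drain the other channel so both are empty for the next trial
		case <-ch2:
			second++
			<-ch1
		}
	}
	return first, second
}

func main() {
	a, b := countPicks(10000)
	fmt.Println(a, b) // two counts that sum to 10000; exact values vary per run
}
```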

Q2: When should the default branch in select be used?

Use default when you need non-blocking operations. Without default, select blocks until a case is ready.

GO
// Non-blocking receive
select {
case msg := <-ch:
    fmt.Println("Received:", msg)
default:
    fmt.Println("No data in channel right now")
}

Q3: How do you implement an infinite loop listening to multiple channels?

Use the for-select combination pattern:

GO
for {
    select {
    case msg := <-ch1:
        fmt.Println(msg)
    case msg := <-ch2:
        fmt.Println(msg)
    case <-quit:
        fmt.Println("Exiting loop")
        return
    }
}
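
One pitfall with this pattern: a bare break inside a case leaves only the select, not the enclosing for. The loop above avoids the problem by using return. When the function has more work to do after the loop, a labeled break is an alternative, sketched here with a hypothetical sumUntilQuit function:

```go
package main

import "fmt"

// sumUntilQuit adds up values from nums until quit is closed,
// then leaves the loop with a labeled break.
func sumUntilQuit(nums <-chan int, quit <-chan struct{}) int {
	total := 0
loop:
	for {
		select {
		case n := <-nums:
			total += n
		case <-quit:
			break loop // a bare break would exit only the select
		}
	}
	return total
}

func main() {
	nums := make(chan int)
	quit := make(chan struct{})
	go func() {
		for i := 1; i <= 3; i++ {
			nums <- i // unbuffered: returns only once the value was received
		}
		close(quit)
	}()
	fmt.Println(sumUntilQuit(nums, quit)) // 6
}
```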

Q4: Does time.After in a loop cause memory leaks?

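It depends on the Go version, and it is wasteful either way. Each call to time.After allocates a new timer. Before Go 1.23, that timer could not be garbage collected until it fired, so a busy loop with a long timeout could keep many timers alive at once. Since Go 1.23, unreferenced timers are collected right away, but allocating one per iteration still costs time and memory. Reusing a single time.NewTimer avoids both problems:

GO
// ❌ Wasteful: allocates a new timer on every iteration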
for {
    select {
    case msg := <-ch:
        fmt.Println(msg)
    case <-time.After(5 * time.Second):
        fmt.Println("Timed out")
    }
}

// ✅ Better: reuse one timer (note that this version also returns on timeout)
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
    select {
    case msg := <-ch:
        fmt.Println(msg)
        if !timer.Stop() {
            <-timer.C
        }
        timer.Reset(5 * time.Second)
    case <-timer.C:
        fmt.Println("Timed out")
        return
    }
}
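
The reuse pattern can be packaged as an idle timeout. The sketch below is a runnable variant under stated assumptions: countUntilIdle and demo are hypothetical names, and the stop-and-drain step is kept so the code also behaves correctly on Go versions before 1.23:

```go
package main

import (
	"fmt"
	"time"
)

// countUntilIdle is a hypothetical helper: it counts messages from ch and
// returns once no message has arrived for the duration idle.
func countUntilIdle(ch <-chan string, idle time.Duration) int {
	timer := time.NewTimer(idle)
	defer timer.Stop()

	n := 0
	for {
		select {
		case <-ch:
			n++
			// Stop and drain before Reset: required before Go 1.23,
			// harmless on newer versions.
			if !timer.Stop() {
				<-timer.C
			}
			timer.Reset(idle)
		case <-timer.C:
			return n
		}
	}
}

// demo feeds three buffered messages and then lets the timer expire.
func demo() int {
	ch := make(chan string, 3)
	ch <- "a"
	ch <- "b"
	ch <- "c"
	return countUntilIdle(ch, 50*time.Millisecond)
}

func main() {
	fmt.Println(demo()) // 3
}
```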

📖 Summary

| Pattern | Purpose | Key Code |
| --- | --- | --- |
| Timeout control | Limit operation wait time | case <-time.After(d) |
| Non-blocking operation | Return immediately without waiting | select with default |
| Graceful exit | Listen to quit channel | case <-quit |
| Fan-out | One input distributed to multiple workers | Multiple goroutines reading from one channel |
| Fan-in | Multiple inputs merged into one channel | sync.WaitGroup + forwarding |
| Pipeline | Multi-stage processing | Chained channels |

Key Takeaways:

  1. select is Go's core concurrency tool for multiplexing
  2. When multiple cases are ready simultaneously, one is chosen randomly for fairness
  3. for-select is the standard pattern for listening to multiple channels
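  4. In loops, prefer a reused time.NewTimer over time.After, which allocates a new timer on every iteration (and, before Go 1.23, kept each one alive until it fired)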
  5. Fan-in/Fan-out/Pipeline are classic patterns for building concurrent pipelines

📝 Exercises

Exercise 1: Countdown Timer

Create a program using select and time.Ticker to implement a 5-second countdown, printing the remaining time each second, and printing "Launch!" at 0.

GO
// Hints:
// ticker := time.NewTicker(1 * time.Second)
// select {
// case <-ticker.C:
//     // update countdown
// }

Exercise 2: Multi-way Merge Sort

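Implement a function that receives multiple sorted integer channels and merges them into a single sorted output channel. Plain Fan-in forwards values in arrival order and does not keep them sorted, so the merge must always emit the smallest head value among the inputs.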

GO
// Function signature:
func mergeSorted(channels ...<-chan int) <-chan int {
    // Implement merge logic
    return nil // placeholder so the stub compiles; replace with your output channel
}

Exercise 3: Pipeline with Cancellation

Build a three-stage Pipeline (generate → filter even numbers → multiply by 10) that supports cancelling the entire pipeline via context:

GO
// Hints:
// ctx, cancel := context.WithCancel(context.Background())
// Check ctx.Done() in each stage's select

Next Lesson: Lesson 16: Sync and Concurrency Safety — Learn about sync.Mutex, sync.WaitGroup, sync.Once, and other synchronization primitives for safe access to shared resources.

Web-Tutorial.com

Web-Tutorial Tech Team

A team of developers maintaining programming tutorials. Each tutorial is written and reviewed by developers with expertise in that field. We work to keep our content accurate and reliable — if you spot an issue, please let us know.
