Select与并发模式
第15课:Select与并发模式
生活类比
想象你是一个餐厅服务员,同时负责三张桌子:
- 桌A的顾客在点菜
- 桌B的顾客在买单
- 桌C的顾客在要水
你不会傻等某一张桌子,而是同时关注所有桌子,哪桌先举手就先去服务哪桌。这就是 select 的工作方式——同时监听多个通道,哪个先准备好就执行哪个。
核心概念
| 概念 | 说明 |
|---|---|
select |
同时监听多个通道操作的语句 |
case |
每个通道操作对应一个分支 |
default |
所有通道都未就绪时执行(可选) |
| 阻塞行为 | 没有 default 时,select 会阻塞直到某个 case 就绪 |
| 随机选择 | 多个 case 同时就绪时,随机选一个执行 |
基本语法与用法
标准 select 语句
GO
select {
case msg := <-ch1:
// 从 ch1 接收到数据
fmt.Println(msg)
case ch2 <- "hello":
// 成功向 ch2 发送数据
case <-ch3:
// ch3 已关闭或收到数据
default:
// 所有通道都未就绪时执行
}
超时控制(最常用模式)
GO
select {
case msg := <-ch:
fmt.Println("收到:", msg)
case <-time.After(3 * time.Second):
fmt.Println("超时了!")
}
非阻塞通道操作
GO
select {
case msg := <-ch:
fmt.Println("收到:", msg)
default:
fmt.Println("没有数据,立即返回")
}
💡 Tips:
select中至少要有一个case,不能全是空的- 没有
default且所有case都未就绪时,select会永久阻塞 time.After()返回一个通道,在指定时间后接收到值- 多个
case同时就绪时,Go 会随机选择一个执行,这是为了避免饥饿
示例:基本 Select 用法(难度⭐)
演示如何同时监听两个通道:
GO
package main
import (
"fmt"
"time"
)
// 模拟两个数据源
func source(name string, ch chan<- string, delay time.Duration) {
for i := 1; i <= 3; i++ {
time.Sleep(delay)
ch <- fmt.Sprintf("[%s] 消息 %d", name, i)
}
close(ch)
}
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go source("源A", ch1, 500*time.Millisecond)
go source("源B", ch2, 800*time.Millisecond)
// 用 select 同时监听两个通道
// 需要监听 6 次(两个源各 3 条消息)
for i := 0; i < 6; i++ {
select {
case msg, ok := <-ch1:
if ok {
fmt.Println("从 ch1:", msg)
}
case msg, ok := <-ch2:
if ok {
fmt.Println("从 ch2:", msg)
}
}
}
fmt.Println("所有消息处理完毕")
}
BASH
go run main.go
TEXT
从 ch1: [源A] 消息 1
从 ch2: [源B] 消息 1
从 ch1: [源A] 消息 2
从 ch1: [源A] 消息 3
从 ch2: [源B] 消息 2
从 ch2: [源B] 消息 3
所有消息处理完毕
示例:超时控制与 quit 退出(难度⭐⭐)
使用 select 实现带超时和优雅退出的 worker:
GO
package main
import (
"fmt"
"math/rand"
"time"
)
// worker 模拟一个耗时任务
func worker(id int, jobs <-chan int, results chan<- string, done chan<- int) {
for job := range jobs {
// 模拟不确定的处理时间
duration := time.Duration(rand.Intn(500)+200) * time.Millisecond
time.Sleep(duration)
results <- fmt.Sprintf("Worker %d 完成任务 %d (耗时 %v)", id, job, duration)
}
done <- id
}
func main() {
jobs := make(chan int, 10)
results := make(chan string, 10)
done := make(chan int, 3)
// 启动 3 个 worker
for i := 1; i <= 3; i++ {
go worker(i, jobs, results, done)
}
// 分发 6 个任务
for j := 1; j <= 6; j++ {
jobs <- j
}
close(jobs)
// 收集结果,带超时控制
timeout := time.After(2 * time.Second)
finished := 0
for finished < 6 {
select {
case result := <-results:
fmt.Println(result)
finished++
case workerID := <-done:
fmt.Printf(">>> Worker %d 已退出\n", workerID)
case <-timeout:
fmt.Println("⏰ 超时!部分任务未完成")
return
}
}
fmt.Println("所有任务已完成")
}
BASH
go run main.go
TEXT
Worker 2 完成任务 2 (耗时 234ms)
Worker 1 完成任务 1 (耗时 345ms)
Worker 3 完成任务 3 (耗时 289ms)
Worker 2 完成任务 4 (耗时 412ms)
Worker 1 完成任务 5 (耗时 198ms)
>>> Worker 1 已退出
Worker 3 完成任务 6 (耗时 367ms)
所有任务已完成
示例:Fan-in / Fan-out 与 Pipeline 模式(难度⭐⭐⭐)
展示经典的并发模式:Pipeline 管道 + Fan-out 扇出 + Fan-in 扇入:
GO
package main
import (
"fmt"
"sync"
"time"
)
// 阶段1:数据生成器(Pipeline 起点)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2:平方计算(Pipeline 中间阶段)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
time.Sleep(100 * time.Millisecond) // 模拟计算耗时
out <- n * n
}
close(out)
}()
return out
}
// Fan-out:将一个通道分发给多个 worker 处理
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = square(in)
}
return channels
}
// Fan-in:将多个通道合并为一个
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 每个通道启动一个 goroutine 转发数据
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
// 所有输入通道关闭后,关闭输出通道
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 阶段1:生成数据
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
source := generator(nums...)
// 阶段2:Fan-out 分发给 3 个 worker 并行计算
workers := fanOut(source, 3)
// 阶段3:Fan-in 合并结果
merged := fanIn(workers...)
// 收集所有结果
results := make(map[int]bool)
for val := range merged {
results[val] = true
fmt.Printf("收到结果: %d\n", val)
}
fmt.Println("\n去重后的结果:")
for val := range results {
fmt.Printf(" %d\n", val)
}
}
BASH
go run main.go
TEXT
收到结果: 1
收到结果: 4
收到结果: 9
收到结果: 16
收到结果: 25
收到结果: 36
收到结果: 49
收到结果: 64
收到结果: 81
收到结果: 100
去重后的结果:
1
4
9
16
25
36
49
64
81
100
场景一:HTTP 请求竞速(Race)
同时向多个服务器发送请求,使用第一个返回的结果:
GO
package main
import (
"fmt"
"time"
)
// 模拟向不同服务器发送请求
func fetchFromServer(name string, delay time.Duration) <-chan string {
ch := make(chan string, 1)
go func() {
time.Sleep(delay)
ch <- fmt.Sprintf("来自 %s 的响应", name)
}()
return ch
}
// 竞速请求:返回最快响应的结果
func race(urls map[string]time.Duration, timeout time.Duration) (string, error) {
// 为每个服务器创建一个通道
ch := make(chan string, len(urls))
for name, delay := range urls {
go func(n string, d time.Duration) {
ch <- fetchResult(n, d)
}(name, delay)
}
// 等待第一个响应或超时
select {
case result := <-ch:
return result, nil
case <-time.After(timeout):
return "", fmt.Errorf("所有服务器均超时")
}
}
func fetchResult(name string, delay time.Duration) string {
time.Sleep(delay)
return fmt.Sprintf("来自 %s 的数据 (延迟 %v)", name, delay)
}
func main() {
servers := map[string]time.Duration{
"服务器A-北京": 800 * time.Millisecond,
"服务器B-上海": 300 * time.Millisecond,
"服务器C-广州": 500 * time.Millisecond,
}
result, err := race(servers, 2*time.Second)
if err != nil {
fmt.Println("错误:", err)
} else {
fmt.Println("竞速结果:", result)
}
}
BASH
go run main.go
TEXT
竞速结果: 来自 服务器B-上海 的数据 (延迟 300ms)
场景二:优雅退出(Graceful Shutdown)
使用 quit 通道实现服务的优雅停止:
GO
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// server 模拟一个持续运行的服务
func server(id int, quit <-chan struct{}) {
fmt.Printf("[服务 %d] 已启动\n", id)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Printf("[服务 %d] 处理请求中...\n", id)
case <-quit:
fmt.Printf("[服务 %d] 收到退出信号,正在清理...", id)
time.Sleep(200 * time.Millisecond) // 模拟清理工作
fmt.Printf("[服务 %d] 已停止\n", id)
return
}
}
}
func main() {
// 监听系统中断信号 (Ctrl+C)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// 创建 quit 通道
quit := make(chan struct{})
// 启动多个服务
for i := 1; i <= 3; i++ {
go server(i, quit)
}
fmt.Println("主程序运行中,按 Ctrl+C 优雅退出...")
// 等待系统信号
sig := <-sigChan
fmt.Printf("\n收到信号: %v,开始优雅退出...\n", sig)
// 通知所有服务退出
close(quit)
// 给服务时间完成清理
time.Sleep(1 * time.Second)
fmt.Println("所有服务已关闭,程序退出")
}
BASH
go run main.go
# 按 Ctrl+C 触发优雅退出
TEXT
主程序运行中,按 Ctrl+C 优雅退出...
[服务 1] 已启动
[服务 2] 已启动
[服务 3] 已启动
[服务 1] 处理请求中...
[服务 2] 处理请求中...
[服务 3] 处理请求中...
[服务 1] 处理请求中...
^C
收到信号: interrupt,开始优雅退出...
[服务 1] 收到退出信号,正在清理...[服务 1] 已停止
[服务 2] 收到退出信号,正在清理...[服务 2] 已停止
[服务 3] 收到退出信号,正在清理...[服务 3] 已停止
所有服务已关闭,程序退出
❓ 常见问题
Q1:select 中多个 case 同时就绪会怎样?
Go 会随机选择一个执行,而不是按顺序选第一个。这保证了公平性,避免某个通道被饿死。
GO
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "A"
ch2 <- "B"
// 结果是随机的,可能是 A 也可能是 B
select {
case v := <-ch1:
fmt.Println(v)
case v := <-ch2:
fmt.Println(v)
}
Q2:select 的 default 分支什么时候用?
当你需要非阻塞操作时使用 default。如果没有 default,select 会阻塞直到某个 case 就绪。
GO
// 非阻塞接收
select {
case msg := <-ch:
fmt.Println("收到:", msg)
default:
fmt.Println("通道暂时没有数据")
}
Q3:如何实现无限循环监听多个通道?
用 for-select 组合模式:
GO
for {
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
case <-quit:
fmt.Println("退出循环")
return
}
}
Q4:time.After 在循环中使用会内存泄漏吗?
会!每次循环 time.After 都会创建一个新定时器,旧的不会被回收。正确做法是用 time.NewTimer:
GO
// ❌ 错误:每次循环创建新定时器
for {
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(5 * time.Second):
fmt.Println("超时")
}
}
// ✅ 正确:复用定时器
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
select {
case msg := <-ch:
fmt.Println(msg)
if !timer.Stop() {
<-timer.C
}
timer.Reset(5 * time.Second)
case <-timer.C:
fmt.Println("超时")
return
}
}
📖 小节
| 模式 | 用途 | 关键代码 |
|---|---|---|
| 超时控制 | 限制操作等待时间 | case <-time.After(d) |
| 非阻塞操作 | 立即返回不等待 | 带 default 的 select |
| 优雅退出 | 监听 quit 通道 | case <-quit |
| Fan-out | 一个输入分发给多个 worker | 多个 goroutine 读同一通道 |
| Fan-in | 多个输入合并为一个通道 | sync.WaitGroup + 转发 |
| Pipeline | 多阶段流水线处理 | 通道串联 |
关键要点:
select是 Go 并发的核心工具,实现多路复用- 多个 case 同时就绪时随机选择,保证公平
for-select组合是监听多个通道的标准模式- 注意
time.After在循环中的内存泄漏问题 - Fan-in/Fan-out/Pipeline 是构建并发管道的经典模式
📝 作业
练习1:倒计时器
创建一个程序,使用 select 和 time.Ticker 实现 5 秒倒计时,每秒打印一次剩余时间,到 0 时打印 "发射!"。
GO
// 提示:
// ticker := time.NewTicker(1 * time.Second)
// select {
// case <-ticker.C:
// // 更新倒计时
// }
练习2:多路合并排序
实现一个函数,接收多个已排序的整数通道,使用 Fan-in 模式将它们合并为一个有序的输出通道。
GO
// 函数签名:
func mergeSorted(channels ...<-chan int) <-chan int {
// 实现合并逻辑
}
练习3:带取消的 Pipeline
构建一个三阶段 Pipeline(生成 → 过滤偶数 → 乘以10),支持通过 context 取消整个管道:
GO
// 提示:
// ctx, cancel := context.WithCancel(context.Background())
// 在每个阶段的 select 中检查 ctx.Done()
下一课: 第16课:Sync与并发安全 — 学习 sync.Mutex、sync.WaitGroup、sync.Once 等同步原语,保护共享资源的安全访问。



