404 Not Found

404 Not Found


nginx

Select与并发模式

第15课:Select与并发模式

生活类比

想象你是一个餐厅服务员,同时负责三张桌子:

你不会傻等某一张桌子,而是同时关注所有桌子,哪桌先举手就先去服务哪桌。这就是 select 的工作方式——同时监听多个通道,哪个先准备好就执行哪个。

核心概念

概念 说明
select 同时监听多个通道操作的语句
case 每个通道操作对应一个分支
default 所有通道都未就绪时执行(可选)
阻塞行为 没有 default 时,select 会阻塞直到某个 case 就绪
随机选择 多个 case 同时就绪时,随机选一个执行

基本语法与用法

标准 select 语句

GO
select {
case msg := <-ch1:
    // 从 ch1 接收到数据
    fmt.Println(msg)
case ch2 <- "hello":
    // 成功向 ch2 发送数据
case <-ch3:
    // ch3 已关闭或收到数据
default:
    // 所有通道都未就绪时执行
}

超时控制(最常用模式)

GO
select {
case msg := <-ch:
    fmt.Println("收到:", msg)
case <-time.After(3 * time.Second):
    fmt.Println("超时了!")
}

非阻塞通道操作

GO
select {
case msg := <-ch:
    fmt.Println("收到:", msg)
default:
    fmt.Println("没有数据,立即返回")
}
💡 Tips:

  • select 中至少要有一个 case,不能全是空的
  • 没有 default 且所有 case 都未就绪时,select永久阻塞
  • time.After() 返回一个通道,在指定时间后接收到值
  • 多个 case 同时就绪时,Go 会随机选择一个执行,这是为了避免饥饿

示例:基本 Select 用法(难度⭐)

演示如何同时监听两个通道:

GO
package main

import (
	"fmt"
	"time"
)

// 模拟两个数据源
func source(name string, ch chan<- string, delay time.Duration) {
	for i := 1; i <= 3; i++ {
		time.Sleep(delay)
		ch <- fmt.Sprintf("[%s] 消息 %d", name, i)
	}
	close(ch)
}

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go source("源A", ch1, 500*time.Millisecond)
	go source("源B", ch2, 800*time.Millisecond)

	// 用 select 同时监听两个通道
	// 需要监听 6 次(两个源各 3 条消息)
	for i := 0; i < 6; i++ {
		select {
		case msg, ok := <-ch1:
			if ok {
				fmt.Println("从 ch1:", msg)
			}
		case msg, ok := <-ch2:
			if ok {
				fmt.Println("从 ch2:", msg)
			}
		}
	}
	fmt.Println("所有消息处理完毕")
}
▶ 试一试
BASH
go run main.go
TEXT
从 ch1: [源A] 消息 1
从 ch2: [源B] 消息 1
从 ch1: [源A] 消息 2
从 ch1: [源A] 消息 3
从 ch2: [源B] 消息 2
从 ch2: [源B] 消息 3
所有消息处理完毕

示例:超时控制与 quit 退出(难度⭐⭐)

使用 select 实现带超时和优雅退出的 worker:

GO
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// worker 模拟一个耗时任务
func worker(id int, jobs <-chan int, results chan<- string, done chan<- int) {
	for job := range jobs {
		// 模拟不确定的处理时间
		duration := time.Duration(rand.Intn(500)+200) * time.Millisecond
		time.Sleep(duration)
		results <- fmt.Sprintf("Worker %d 完成任务 %d (耗时 %v)", id, job, duration)
	}
	done <- id
}

func main() {
	jobs := make(chan int, 10)
	results := make(chan string, 10)
	done := make(chan int, 3)

	// 启动 3 个 worker
	for i := 1; i <= 3; i++ {
		go worker(i, jobs, results, done)
	}

	// 分发 6 个任务
	for j := 1; j <= 6; j++ {
		jobs <- j
	}
	close(jobs)

	// 收集结果,带超时控制
	timeout := time.After(2 * time.Second)
	finished := 0

	for finished < 6 {
		select {
		case result := <-results:
			fmt.Println(result)
			finished++
		case workerID := <-done:
			fmt.Printf(">>> Worker %d 已退出\n", workerID)
		case <-timeout:
			fmt.Println("⏰ 超时!部分任务未完成")
			return
		}
	}
	fmt.Println("所有任务已完成")
}
▶ 试一试
BASH
go run main.go
TEXT
Worker 2 完成任务 2 (耗时 234ms)
Worker 1 完成任务 1 (耗时 345ms)
Worker 3 完成任务 3 (耗时 289ms)
Worker 2 完成任务 4 (耗时 412ms)
Worker 1 完成任务 5 (耗时 198ms)
>>> Worker 1 已退出
Worker 3 完成任务 6 (耗时 367ms)
所有任务已完成

示例:Fan-in / Fan-out 与 Pipeline 模式(难度⭐⭐⭐)

展示经典的并发模式:Pipeline 管道 + Fan-out 扇出 + Fan-in 扇入:

GO
package main

import (
	"fmt"
	"sync"
	"time"
)

// 阶段1:数据生成器(Pipeline 起点)
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// 阶段2:平方计算(Pipeline 中间阶段)
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			time.Sleep(100 * time.Millisecond) // 模拟计算耗时
			out <- n * n
		}
		close(out)
	}()
	return out
}

// Fan-out:将一个通道分发给多个 worker 处理
func fanOut(in <-chan int, workers int) []<-chan int {
	channels := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		channels[i] = square(in)
	}
	return channels
}

// Fan-in:将多个通道合并为一个
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// 每个通道启动一个 goroutine 转发数据
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(c <-chan int) {
			defer wg.Done()
			for val := range c {
				out <- val
			}
		}(ch)
	}

	// 所有输入通道关闭后,关闭输出通道
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// 阶段1:生成数据
	nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	source := generator(nums...)

	// 阶段2:Fan-out 分发给 3 个 worker 并行计算
	workers := fanOut(source, 3)

	// 阶段3:Fan-in 合并结果
	merged := fanIn(workers...)

	// 收集所有结果
	results := make(map[int]bool)
	for val := range merged {
		results[val] = true
		fmt.Printf("收到结果: %d\n", val)
	}

	fmt.Println("\n去重后的结果:")
	for val := range results {
		fmt.Printf("  %d\n", val)
	}
}
▶ 试一试
BASH
go run main.go
TEXT
收到结果: 1
收到结果: 4
收到结果: 9
收到结果: 16
收到结果: 25
收到结果: 36
收到结果: 49
收到结果: 64
收到结果: 81
收到结果: 100

去重后的结果:
  1
  4
  9
  16
  25
  36
  49
  64
  81
  100

场景一:HTTP 请求竞速(Race)

同时向多个服务器发送请求,使用第一个返回的结果:

GO
package main

import (
	"fmt"
	"time"
)

// 模拟向不同服务器发送请求
func fetchFromServer(name string, delay time.Duration) <-chan string {
	ch := make(chan string, 1)
	go func() {
		time.Sleep(delay)
		ch <- fmt.Sprintf("来自 %s 的响应", name)
	}()
	return ch
}

// 竞速请求:返回最快响应的结果
func race(urls map[string]time.Duration, timeout time.Duration) (string, error) {
	// 为每个服务器创建一个通道
	ch := make(chan string, len(urls))
	for name, delay := range urls {
		go func(n string, d time.Duration) {
			ch <- fetchResult(n, d)
		}(name, delay)
	}

	// 等待第一个响应或超时
	select {
	case result := <-ch:
		return result, nil
	case <-time.After(timeout):
		return "", fmt.Errorf("所有服务器均超时")
	}
}

func fetchResult(name string, delay time.Duration) string {
	time.Sleep(delay)
	return fmt.Sprintf("来自 %s 的数据 (延迟 %v)", name, delay)
}

func main() {
	servers := map[string]time.Duration{
		"服务器A-北京": 800 * time.Millisecond,
		"服务器B-上海": 300 * time.Millisecond,
		"服务器C-广州": 500 * time.Millisecond,
	}

	result, err := race(servers, 2*time.Second)
	if err != nil {
		fmt.Println("错误:", err)
	} else {
		fmt.Println("竞速结果:", result)
	}
}
BASH
go run main.go
TEXT
竞速结果: 来自 服务器B-上海 的数据 (延迟 300ms)

场景二:优雅退出(Graceful Shutdown)

使用 quit 通道实现服务的优雅停止:

GO
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// server 模拟一个持续运行的服务
func server(id int, quit <-chan struct{}) {
	fmt.Printf("[服务 %d] 已启动\n", id)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Printf("[服务 %d] 处理请求中...\n", id)
		case <-quit:
			fmt.Printf("[服务 %d] 收到退出信号,正在清理...", id)
			time.Sleep(200 * time.Millisecond) // 模拟清理工作
			fmt.Printf("[服务 %d] 已停止\n", id)
			return
		}
	}
}

func main() {
	// 监听系统中断信号 (Ctrl+C)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// 创建 quit 通道
	quit := make(chan struct{})

	// 启动多个服务
	for i := 1; i <= 3; i++ {
		go server(i, quit)
	}

	fmt.Println("主程序运行中,按 Ctrl+C 优雅退出...")

	// 等待系统信号
	sig := <-sigChan
	fmt.Printf("\n收到信号: %v,开始优雅退出...\n", sig)

	// 通知所有服务退出
	close(quit)

	// 给服务时间完成清理
	time.Sleep(1 * time.Second)
	fmt.Println("所有服务已关闭,程序退出")
}
BASH
go run main.go
# 按 Ctrl+C 触发优雅退出
TEXT
主程序运行中,按 Ctrl+C 优雅退出...
[服务 1] 已启动
[服务 2] 已启动
[服务 3] 已启动
[服务 1] 处理请求中...
[服务 2] 处理请求中...
[服务 3] 处理请求中...
[服务 1] 处理请求中...
^C
收到信号: interrupt,开始优雅退出...
[服务 1] 收到退出信号,正在清理...[服务 1] 已停止
[服务 2] 收到退出信号,正在清理...[服务 2] 已停止
[服务 3] 收到退出信号,正在清理...[服务 3] 已停止
所有服务已关闭,程序退出

❓ 常见问题

Q1:select 中多个 case 同时就绪会怎样?

Go 会随机选择一个执行,而不是按顺序选第一个。这保证了公平性,避免某个通道被饿死。

GO
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "A"
ch2 <- "B"

// 结果是随机的,可能是 A 也可能是 B
select {
case v := <-ch1:
    fmt.Println(v)
case v := <-ch2:
    fmt.Println(v)
}

Q2:select 的 default 分支什么时候用?

当你需要非阻塞操作时使用 default。如果没有 defaultselect 会阻塞直到某个 case 就绪。

GO
// 非阻塞接收
select {
case msg := <-ch:
    fmt.Println("收到:", msg)
default:
    fmt.Println("通道暂时没有数据")
}

Q3:如何实现无限循环监听多个通道?

for-select 组合模式:

GO
for {
    select {
    case msg := <-ch1:
        fmt.Println(msg)
    case msg := <-ch2:
        fmt.Println(msg)
    case <-quit:
        fmt.Println("退出循环")
        return
    }
}

Q4:time.After 在循环中使用会内存泄漏吗?

会!每次循环 time.After 都会创建一个新定时器,旧的不会被回收。正确做法是用 time.NewTimer

GO
// ❌ 错误:每次循环创建新定时器
for {
    select {
    case msg := <-ch:
        fmt.Println(msg)
    case <-time.After(5 * time.Second):
        fmt.Println("超时")
    }
}

// ✅ 正确:复用定时器
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
    select {
    case msg := <-ch:
        fmt.Println(msg)
        if !timer.Stop() {
            <-timer.C
        }
        timer.Reset(5 * time.Second)
    case <-timer.C:
        fmt.Println("超时")
        return
    }
}

📖 小节

模式 用途 关键代码
超时控制 限制操作等待时间 case <-time.After(d)
非阻塞操作 立即返回不等待 default 的 select
优雅退出 监听 quit 通道 case <-quit
Fan-out 一个输入分发给多个 worker 多个 goroutine 读同一通道
Fan-in 多个输入合并为一个通道 sync.WaitGroup + 转发
Pipeline 多阶段流水线处理 通道串联

关键要点:

  1. select 是 Go 并发的核心工具,实现多路复用
  2. 多个 case 同时就绪时随机选择,保证公平
  3. for-select 组合是监听多个通道的标准模式
  4. 注意 time.After 在循环中的内存泄漏问题
  5. Fan-in/Fan-out/Pipeline 是构建并发管道的经典模式

📝 作业

练习1:倒计时器

创建一个程序,使用 selecttime.Ticker 实现 5 秒倒计时,每秒打印一次剩余时间,到 0 时打印 "发射!"。

GO
// 提示:
// ticker := time.NewTicker(1 * time.Second)
// select {
// case <-ticker.C:
//     // 更新倒计时
// }

练习2:多路合并排序

实现一个函数,接收多个已排序的整数通道,使用 Fan-in 模式将它们合并为一个有序的输出通道。

GO
// 函数签名:
func mergeSorted(channels ...<-chan int) <-chan int {
    // 实现合并逻辑
}

练习3:带取消的 Pipeline

构建一个三阶段 Pipeline(生成 → 过滤偶数 → 乘以10),支持通过 context 取消整个管道:

GO
// 提示:
// ctx, cancel := context.WithCancel(context.Background())
// 在每个阶段的 select 中检查 ctx.Done()

下一课: 第16课:Sync与并发安全 — 学习 sync.Mutexsync.WaitGroupsync.Once 等同步原语,保护共享资源的安全访问。

Web-Tutorial.com

Web-Tutorial 技术团队

由多位开发者共同维护的编程教程平台。每篇教程由对应领域的开发者编写和审核,确保内容准确可靠。如发现任何问题,欢迎向我们反馈。

100%

🙏 帮我们做得更好

我们是刚上线的编程教程站,几个人的小团队,精力有限。页面虽经检查,难免还有疏漏——链接失效、排版错乱、内容有误、语言生硬……

如果您发现了,麻烦告诉我们,我们会在收到反馈后第一时间进行修复,再次感谢您的光临 🙏