Selectと並行処理パターン
レッスン15:Selectと並行処理パターン
例え話
あなたがレストランのウェイターで、同時に3つのテーブルを担当していると想像してください:
- テーブルAの客が注文している
- テーブルBの客が会計をしている
- テーブルCの客が水を求めている
愚かにも1つのテーブルだけを待つことはしません——代わりにすべてのテーブルを同時に監視し、最初に手を挙げたテーブルに対応します。これがselectの仕組みです——複数のチャネルを同時にリッスンし、準備ができているものを実行します。
コアコンセプト
| コンセプト | 説明 |
|---|---|
select |
複数のチャネル操作を同時にリッスンする文 |
case |
各チャネル操作がブランチに対応 |
default |
どのチャネルも準備できていない時に実行(オプション) |
| ブロッキング動作 | defaultがない場合、selectはcaseが準備完了するまでブロック |
| ランダム選択 | 複数のcaseが同時に準備完了している場合、ランダムに1つ選択 |
基本構文と使い方
標準のselect文
select {
case msg := <-ch1:
// ch1からデータを受信
fmt.Println(msg)
case ch2 <- "hello":
// ch2へのデータ送信に成功
case <-ch3:
// ch3が閉じられたかデータを受信
default:
// どのチャネルも準備できていない時に実行
}
タイムアウト制御(最も一般的なパターン)
select {
case msg := <-ch:
fmt.Println("受信:", msg)
case <-time.After(3 * time.Second):
fmt.Println("タイムアウトしました!")
}
ノンブロッキングチャネル操作
select {
case msg := <-ch:
fmt.Println("受信:", msg)
default:
fmt.Println("データがありません、即座に返却")
}
selectには少なくとも1つのcaseが必要です;完全に空にすることはできませんdefaultがなく、すべてのcaseが準備できていない場合、selectは永久にブロックしますtime.After()は指定された時間後に値を受信するチャネルを返します- 複数の
caseが同時に準備完了している場合、Goは飢餓を防ぐためにランダムに1つ選択します
例:基本的なselectの使い方(難易度⭐)
2つのチャネルを同時にリッスンする方法を示します:
package main
import (
"fmt"
"time"
)
// 2つのデータソースをシミュレート
func source(name string, ch chan<- string, delay time.Duration) {
for i := 1; i <= 3; i++ {
time.Sleep(delay)
ch <- fmt.Sprintf("[%s] メッセージ %d", name, i)
}
close(ch)
}
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go source("ソースA", ch1, 500*time.Millisecond)
go source("ソースB", ch2, 800*time.Millisecond)
// selectを使用して2つのチャネルを同時にリッスン
// 6回リッスンする必要がある(各ソースから3メッセージずつ)
for i := 0; i < 6; i++ {
select {
case msg, ok := <-ch1:
if ok {
fmt.Println("ch1から:", msg)
}
case msg, ok := <-ch2:
if ok {
fmt.Println("ch2から:", msg)
}
}
}
fmt.Println("すべてのメッセージを処理しました")
}
go run main.go
ch1から:[ソースA] メッセージ 1
ch2から:[ソースB] メッセージ 1
ch1から:[ソースA] メッセージ 2
ch1から:[ソースA] メッセージ 3
ch2から:[ソースB] メッセージ 2
ch2から:[ソースB] メッセージ 3
すべてのメッセージを処理しました
例:タイムアウト制御とquitチャネル(難易度⭐⭐)
selectを使用して、タイムアウトとグレースフルシャットダウンを持つワーカーを実装します:
package main
import (
"fmt"
"math/rand"
"time"
)
// workerは時間のかかるタスクをシミュレートする
func worker(id int, jobs <-chan int, results chan<- string, done chan<- int) {
for job := range jobs {
// 予測不可能な処理時間をシミュレート
duration := time.Duration(rand.Intn(500)+200) * time.Millisecond
time.Sleep(duration)
results <- fmt.Sprintf("ワーカー %d がタスク %d を完了(所要時間 %v)", id, job, duration)
}
done <- id
}
func main() {
jobs := make(chan int, 10)
results := make(chan string, 10)
done := make(chan int, 3)
// 3つのワーカーを起動
for i := 1; i <= 3; i++ {
go worker(i, jobs, results, done)
}
// 6つのタスクを分配
for j := 1; j <= 6; j++ {
jobs <- j
}
close(jobs)
// タイムアウト制御で結果を収集
timeout := time.After(2 * time.Second)
finished := 0
for finished < 6 {
select {
case result := <-results:
fmt.Println(result)
finished++
case workerID := <-done:
fmt.Printf(">>> ワーカー %d が終了しました\n", workerID)
case <-timeout:
fmt.Println("⏰ タイムアウト!一部のタスクが未完了です")
return
}
}
fmt.Println("すべてのタスクが完了しました")
}
go run main.go
ワーカー 2 がタスク 2 を完了(所要時間 234ms)
ワーカー 1 がタスク 1 を完了(所要時間 345ms)
ワーカー 3 がタスク 3 を完了(所要時間 289ms)
ワーカー 2 がタスク 4 を完了(所要時間 412ms)
ワーカー 1 がタスク 5 を完了(所要時間 198ms)
>>> ワーカー 1 が終了しました
ワーカー 3 がタスク 6 を完了(所要時間 367ms)
すべてのタスクが完了しました
例:Fan-in / Fan-outとパイプラインパターン(難易度⭐⭐⭐)
古典的な並行処理パターンを示します:Pipeline + Fan-out + Fan-in:
package main
import (
"fmt"
"sync"
"time"
)
// ステージ1:データジェネレーター(パイプラインの開始)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// ステージ2:二乗計算(パイプラインの中間ステージ)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
time.Sleep(100 * time.Millisecond) // 計算時間をシミュレート
out <- n * n
}
close(out)
}()
return out
}
// Fan-out:1つのチャネルを複数のワーカーに分配
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = square(in)
}
return channels
}
// Fan-in:複数のチャネルを1つにマージ
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 各チャネルに対してデータを転送するgoroutineを起動
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
// すべての入力チャネルが閉じられた後に出力チャネルを閉じる
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// ステージ1:データを生成
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
source := generator(nums...)
// ステージ2:3つのワーカーにFan-outして並列計算
workers := fanOut(source, 3)
// ステージ3:Fan-inで結果をマージ
merged := fanIn(workers...)
// すべての結果を収集
results := make(map[int]bool)
for val := range merged {
results[val] = true
fmt.Printf("結果を受信:%d\n", val)
}
fmt.Println("\n重複を除いた結果:")
for val := range results {
fmt.Printf(" %d\n", val)
}
}
go run main.go
結果を受信:1
結果を受信:4
結果を受信:9
結果を受信:16
結果を受信:25
結果を受信:36
結果を受信:49
結果を受信:64
結果を受信:81
結果を受信:100
重複を除いた結果:
1
4
9
16
25
36
49
64
81
100
シナリオ1:HTTPリクエストレース
複数のサーバーに同時にリクエストを送信し、最初の応答を使用します:
package main
import (
"fmt"
"time"
)
// 異なるサーバーへのリクエストをシミュレート
func fetchFromServer(name string, delay time.Duration) <-chan string {
ch := make(chan string, 1)
go func() {
time.Sleep(delay)
ch <- fmt.Sprintf("%sからの応答", name)
}()
return ch
}
// レースリクエスト:最も速い応答を返す
func race(urls map[string]time.Duration, timeout time.Duration) (string, error) {
// 各サーバーに対してチャネルを作成
ch := make(chan string, len(urls))
for name, delay := range urls {
go func(n string, d time.Duration) {
ch <- fetchResult(n, d)
}(name, delay)
}
// 最初の応答またはタイムアウトを待機
select {
case result := <-ch:
return result, nil
case <-time.After(timeout):
return "", fmt.Errorf("すべてのサーバーがタイムアウトしました")
}
}
func fetchResult(name string, delay time.Duration) string {
time.Sleep(delay)
return fmt.Sprintf("%sからのデータ(レイテンシ %v)", name, delay)
}
func main() {
servers := map[string]time.Duration{
"サーバーA-北京": 800 * time.Millisecond,
"サーバーB-上海": 300 * time.Millisecond,
"サーバーC-広州": 500 * time.Millisecond,
}
result, err := race(servers, 2*time.Second)
if err != nil {
fmt.Println("エラー:", err)
} else {
fmt.Println("レース結果:", result)
}
}
go run main.go
レース結果:サーバーB-上海からのデータ(レイテンシ 300ms)
シナリオ2:グレースフルシャットダウン
quitチャネルを使用して、サービスのグレースフルシャットダウンを実装します:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// serverは継続的に実行されるサービスをシミュレートする
func server(id int, quit <-chan struct{}) {
fmt.Printf("[サービス %d] 開始\n", id)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Printf("[サービス %d] リクエスト処理中...\n", id)
case <-quit:
fmt.Printf("[サービス %d] 終了シグナルを受信、クリーンアップ中...", id)
time.Sleep(200 * time.Millisecond) // クリーンアップをシミュレート
fmt.Printf("[サービス %d] 停止\n", id)
return
}
}
}
func main() {
// システム割り込みシグナルをリッスン(Ctrl+C)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// quitチャネルを作成
quit := make(chan struct{})
// 複数のサービスを起動
for i := 1; i <= 3; i++ {
go server(i, quit)
}
fmt.Println("メインプログラム実行中、Ctrl+Cでグレースフルに終了します...")
// システムシグナルを待機
sig := <-sigChan
fmt.Printf("\nシグナルを受信:%v、グレースフルシャットダウンを開始...\n", sig)
// すべてのサービスに終了を通知
close(quit)
// サービスがクリーンアップを完了する時間を与える
time.Sleep(1 * time.Second)
fmt.Println("すべてのサービスが閉じられ、プログラムを終了します")
}
go run main.go
# Ctrl+Cでグレースフルシャットダウンをトリガー
メインプログラム実行中、Ctrl+Cでグレースフルに終了します...
[サービス 1] 開始
[サービス 2] 開始
[サービス 3] 開始
[サービス 1] リクエスト処理中...
[サービス 2] リクエスト処理中...
[サービス 3] リクエスト処理中...
[サービス 1] リクエスト処理中...
^C
シグナルを受信:interrupt、グレースフルシャットダウンを開始...
[サービス 1] 終了シグナルを受信、クリーンアップ中...[サービス 1] 停止
[サービス 2] 終了シグナルを受信、クリーンアップ中...[サービス 2] 停止
[サービス 3] 終了シグナルを受信、クリーンアップ中...[サービス 3] 停止
すべてのサービスが閉じられ、プログラムを終了します
❓ よくある質問
Q1:selectの複数のcaseが同時に準備完了した場合、どうなりますか?
Goは最初のものを順番に選ぶのではなく、ランダムに1つを選択して実行します。これにより公平性が保証され、特定のチャネルが飢餓状態になるのを防ぎます。
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "A"
ch2 <- "B"
// 結果はランダム、AまたはB
select {
case v := <-ch1:
fmt.Println(v)
case v := <-ch2:
fmt.Println(v)
}
Q2:selectのdefaultブランチはいつ使用すべきですか?
ノンブロッキング操作が必要な場合にdefaultを使用します。defaultがない場合、selectはcaseが準備完了するまでブロックします。
// ノンブロッキング受信
select {
case msg := <-ch:
fmt.Println("受信:", msg)
default:
fmt.Println("現在チャネルにデータがありません")
}
Q3:複数のチャネルをリッスンする無限ループを実装するには?
for-select組み合わせパターンを使用します:
for {
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
case <-quit:
fmt.Println("ループを終了")
return
}
}
Q4:ループ内のtime.Afterはメモリリークを引き起こしますか?
はい!time.Afterを使用するループの各反復で新しいタイマーが作成され、ガベージコレクションされません。正しいアプローチはtime.NewTimerを使用することです:
// ❌ 誤り:各反復で新しいタイマーを作成
for {
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(5 * time.Second):
fmt.Println("タイムアウト")
}
}
// ✅ 正しい:タイマーを再利用
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
select {
case msg := <-ch:
fmt.Println(msg)
if !timer.Stop() {
<-timer.C
}
timer.Reset(5 * time.Second)
case <-timer.C:
fmt.Println("タイムアウト")
return
}
}
📖 まとめ
| パターン | 目的 | 主要コード |
|---|---|---|
| タイムアウト制御 | 操作の待機時間を制限 | case <-time.After(d) |
| ノンブロッキング操作 | 待たずに即座に返却 | select with default |
| グレースフル終了 | quitチャネルをリッスン | case <-quit |
| Fan-out | 1つの入力を複数のワーカーに分配 | 1つのチャネルから読み取る複数のgoroutine |
| Fan-in | 複数の入力を1つのチャネルにマージ | sync.WaitGroup + 転送 |
| パイプライン | マルチステージ処理 | チェーンされたチャネル |
重要なポイント:
selectはGoのマルチプレキシングのためのコア並行処理ツールです- 複数のcaseが同時に準備完了している場合、公平性のためにランダムに1つ選択されます
for-selectは複数のチャネルをリッスンする標準パターンです- ループ内の
time.Afterのメモリリークに注意してください - Fan-in/Fan-out/Pipelineは並列パイプラインを構築する古典的なパターンです
📝 演習
演習1:カウントダウンタイマー
selectとtime.Tickerを使用して5秒のカウントダウンを実装し、毎秒残り時間を出力し、0で「発射!」と出力するプログラムを作成してください。
// ヒント:
// ticker := time.NewTicker(1 * time.Second)
// select {
// case <-ticker.C:
// // カウントダウンを更新
// }
演習2:マルチウェイマージソート
複数のソート済み整数チャネルを受信し、Fan-inパターンを使用して単一のソート済み出力チャネルにマージする関数を実装してください。
// 関数シグネチャ:
func mergeSorted(channels ...<-chan int) <-chan int {
// マージロジックを実装
}
演習3:キャンセル可能なパイプライン
3段階のパイプライン(生成 → 偶数フィルタリング → 10倍)を構築し、contextによる全体パイプラインのキャンセルをサポートしてください:
// ヒント:
// ctx, cancel := context.WithCancel(context.Background())
// 各ステージのselectでctx.Done()をチェック
次のレッスン: レッスン16:syncと並行処理の安全性 — sync.Mutex、sync.WaitGroup、sync.Onceなどの同期プリミティブについて学び、共有リソースへの安全なアクセスを実現します。



