Selectと並行処理パターン

レッスン15:Selectと並行処理パターン

例え話

あなたがレストランのウェイターで、同時に3つのテーブルを担当していると想像してください:

愚かにも1つのテーブルだけを待つことはしません——代わりにすべてのテーブルを同時に監視し、最初に手を挙げたテーブルに対応します。これがselectの仕組みです——複数のチャネルを同時にリッスンし、準備ができているものを実行します。

コアコンセプト

コンセプト 説明
select 複数のチャネル操作を同時にリッスンする文
case 各チャネル操作がブランチに対応
default どのチャネルも準備できていない時に実行(オプション)
ブロッキング動作 defaultがない場合、selectはcaseが準備完了するまでブロック
ランダム選択 複数のcaseが同時に準備完了している場合、ランダムに1つ選択

基本構文と使い方

標準のselect文

GO
select {
case msg := <-ch1:
    // ch1からデータを受信
    fmt.Println(msg)
case ch2 <- "hello":
    // ch2へのデータ送信に成功
case <-ch3:
    // ch3が閉じられたかデータを受信
default:
    // どのチャネルも準備できていない時に実行
}

タイムアウト制御(最も一般的なパターン)

GO
select {
case msg := <-ch:
    fmt.Println("受信:", msg)
case <-time.After(3 * time.Second):
    fmt.Println("タイムアウトしました!")
}

ノンブロッキングチャネル操作

GO
select {
case msg := <-ch:
    fmt.Println("受信:", msg)
default:
    fmt.Println("データがありません、即座に返却")
}
💡 ヒント:

  • selectには少なくとも1つのcaseが必要です;完全に空にすることはできません
  • defaultがなく、すべてのcaseが準備できていない場合、select永久にブロックします
  • time.After()は指定された時間後に値を受信するチャネルを返します
  • 複数のcaseが同時に準備完了している場合、Goは飢餓を防ぐためにランダムに1つ選択します

例:基本的なselectの使い方(難易度⭐)

2つのチャネルを同時にリッスンする方法を示します:

GO
package main

import (
	"fmt"
	"time"
)

// 2つのデータソースをシミュレート
func source(name string, ch chan<- string, delay time.Duration) {
	for i := 1; i <= 3; i++ {
		time.Sleep(delay)
		ch <- fmt.Sprintf("[%s] メッセージ %d", name, i)
	}
	close(ch)
}

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go source("ソースA", ch1, 500*time.Millisecond)
	go source("ソースB", ch2, 800*time.Millisecond)

	// selectを使用して2つのチャネルを同時にリッスン
	// 6回リッスンする必要がある(各ソースから3メッセージずつ)
	for i := 0; i < 6; i++ {
		select {
		case msg, ok := <-ch1:
			if ok {
				fmt.Println("ch1から:", msg)
			}
		case msg, ok := <-ch2:
			if ok {
				fmt.Println("ch2から:", msg)
			}
		}
	}
	fmt.Println("すべてのメッセージを処理しました")
}
▶ 試してみよう
BASH
go run main.go
TEXT
ch1から:[ソースA] メッセージ 1
ch2から:[ソースB] メッセージ 1
ch1から:[ソースA] メッセージ 2
ch1から:[ソースA] メッセージ 3
ch2から:[ソースB] メッセージ 2
ch2から:[ソースB] メッセージ 3
すべてのメッセージを処理しました

例:タイムアウト制御とquitチャネル(難易度⭐⭐)

selectを使用して、タイムアウトとグレースフルシャットダウンを持つワーカーを実装します:

GO
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// workerは時間のかかるタスクをシミュレートする
func worker(id int, jobs <-chan int, results chan<- string, done chan<- int) {
	for job := range jobs {
		// 予測不可能な処理時間をシミュレート
		duration := time.Duration(rand.Intn(500)+200) * time.Millisecond
		time.Sleep(duration)
		results <- fmt.Sprintf("ワーカー %d がタスク %d を完了(所要時間 %v)", id, job, duration)
	}
	done <- id
}

func main() {
	jobs := make(chan int, 10)
	results := make(chan string, 10)
	done := make(chan int, 3)

	// 3つのワーカーを起動
	for i := 1; i <= 3; i++ {
		go worker(i, jobs, results, done)
	}

	// 6つのタスクを分配
	for j := 1; j <= 6; j++ {
		jobs <- j
	}
	close(jobs)

	// タイムアウト制御で結果を収集
	timeout := time.After(2 * time.Second)
	finished := 0

	for finished < 6 {
		select {
		case result := <-results:
			fmt.Println(result)
			finished++
		case workerID := <-done:
			fmt.Printf(">>> ワーカー %d が終了しました\n", workerID)
		case <-timeout:
			fmt.Println("⏰ タイムアウト!一部のタスクが未完了です")
			return
		}
	}
	fmt.Println("すべてのタスクが完了しました")
}
▶ 試してみよう
BASH
go run main.go
TEXT
ワーカー 2 がタスク 2 を完了(所要時間 234ms)
ワーカー 1 がタスク 1 を完了(所要時間 345ms)
ワーカー 3 がタスク 3 を完了(所要時間 289ms)
ワーカー 2 がタスク 4 を完了(所要時間 412ms)
ワーカー 1 がタスク 5 を完了(所要時間 198ms)
>>> ワーカー 1 が終了しました
ワーカー 3 がタスク 6 を完了(所要時間 367ms)
すべてのタスクが完了しました

例:Fan-in / Fan-outとパイプラインパターン(難易度⭐⭐⭐)

古典的な並行処理パターンを示します:Pipeline + Fan-out + Fan-in:

GO
package main

import (
	"fmt"
	"sync"
	"time"
)

// ステージ1:データジェネレーター(パイプラインの開始)
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// ステージ2:二乗計算(パイプラインの中間ステージ)
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			time.Sleep(100 * time.Millisecond) // 計算時間をシミュレート
			out <- n * n
		}
		close(out)
	}()
	return out
}

// Fan-out:1つのチャネルを複数のワーカーに分配
func fanOut(in <-chan int, workers int) []<-chan int {
	channels := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		channels[i] = square(in)
	}
	return channels
}

// Fan-in:複数のチャネルを1つにマージ
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// 各チャネルに対してデータを転送するgoroutineを起動
	wg.Add(len(channels))
	for _, ch := range channels {
		go func(c <-chan int) {
			defer wg.Done()
			for val := range c {
				out <- val
			}
		}(ch)
	}

	// すべての入力チャネルが閉じられた後に出力チャネルを閉じる
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// ステージ1:データを生成
	nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	source := generator(nums...)

	// ステージ2:3つのワーカーにFan-outして並列計算
	workers := fanOut(source, 3)

	// ステージ3:Fan-inで結果をマージ
	merged := fanIn(workers...)

	// すべての結果を収集
	results := make(map[int]bool)
	for val := range merged {
		results[val] = true
		fmt.Printf("結果を受信:%d\n", val)
	}

	fmt.Println("\n重複を除いた結果:")
	for val := range results {
		fmt.Printf("  %d\n", val)
	}
}
▶ 試してみよう
BASH
go run main.go
TEXT
結果を受信:1
結果を受信:4
結果を受信:9
結果を受信:16
結果を受信:25
結果を受信:36
結果を受信:49
結果を受信:64
結果を受信:81
結果を受信:100

重複を除いた結果:
  1
  4
  9
  16
  25
  36
  49
  64
  81
  100

シナリオ1:HTTPリクエストレース

複数のサーバーに同時にリクエストを送信し、最初の応答を使用します:

GO
package main

import (
	"fmt"
	"time"
)

// 異なるサーバーへのリクエストをシミュレート
func fetchFromServer(name string, delay time.Duration) <-chan string {
	ch := make(chan string, 1)
	go func() {
		time.Sleep(delay)
		ch <- fmt.Sprintf("%sからの応答", name)
	}()
	return ch
}

// レースリクエスト:最も速い応答を返す
func race(urls map[string]time.Duration, timeout time.Duration) (string, error) {
	// 各サーバーに対してチャネルを作成
	ch := make(chan string, len(urls))
	for name, delay := range urls {
		go func(n string, d time.Duration) {
			ch <- fetchResult(n, d)
		}(name, delay)
	}

	// 最初の応答またはタイムアウトを待機
	select {
	case result := <-ch:
		return result, nil
	case <-time.After(timeout):
		return "", fmt.Errorf("すべてのサーバーがタイムアウトしました")
	}
}

func fetchResult(name string, delay time.Duration) string {
	time.Sleep(delay)
	return fmt.Sprintf("%sからのデータ(レイテンシ %v)", name, delay)
}

func main() {
	servers := map[string]time.Duration{
		"サーバーA-北京":   800 * time.Millisecond,
		"サーバーB-上海":   300 * time.Millisecond,
		"サーバーC-広州": 500 * time.Millisecond,
	}

	result, err := race(servers, 2*time.Second)
	if err != nil {
		fmt.Println("エラー:", err)
	} else {
		fmt.Println("レース結果:", result)
	}
}
BASH
go run main.go
TEXT
レース結果:サーバーB-上海からのデータ(レイテンシ 300ms)

シナリオ2:グレースフルシャットダウン

quitチャネルを使用して、サービスのグレースフルシャットダウンを実装します:

GO
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// serverは継続的に実行されるサービスをシミュレートする
func server(id int, quit <-chan struct{}) {
	fmt.Printf("[サービス %d] 開始\n", id)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			fmt.Printf("[サービス %d] リクエスト処理中...\n", id)
		case <-quit:
			fmt.Printf("[サービス %d] 終了シグナルを受信、クリーンアップ中...", id)
			time.Sleep(200 * time.Millisecond) // クリーンアップをシミュレート
			fmt.Printf("[サービス %d] 停止\n", id)
			return
		}
	}
}

func main() {
	// システム割り込みシグナルをリッスン(Ctrl+C)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// quitチャネルを作成
	quit := make(chan struct{})

	// 複数のサービスを起動
	for i := 1; i <= 3; i++ {
		go server(i, quit)
	}

	fmt.Println("メインプログラム実行中、Ctrl+Cでグレースフルに終了します...")

	// システムシグナルを待機
	sig := <-sigChan
	fmt.Printf("\nシグナルを受信:%v、グレースフルシャットダウンを開始...\n", sig)

	// すべてのサービスに終了を通知
	close(quit)

	// サービスがクリーンアップを完了する時間を与える
	time.Sleep(1 * time.Second)
	fmt.Println("すべてのサービスが閉じられ、プログラムを終了します")
}
BASH
go run main.go
# Ctrl+Cでグレースフルシャットダウンをトリガー
TEXT
メインプログラム実行中、Ctrl+Cでグレースフルに終了します...
[サービス 1] 開始
[サービス 2] 開始
[サービス 3] 開始
[サービス 1] リクエスト処理中...
[サービス 2] リクエスト処理中...
[サービス 3] リクエスト処理中...
[サービス 1] リクエスト処理中...
^C
シグナルを受信:interrupt、グレースフルシャットダウンを開始...
[サービス 1] 終了シグナルを受信、クリーンアップ中...[サービス 1] 停止
[サービス 2] 終了シグナルを受信、クリーンアップ中...[サービス 2] 停止
[サービス 3] 終了シグナルを受信、クリーンアップ中...[サービス 3] 停止
すべてのサービスが閉じられ、プログラムを終了します

❓ よくある質問

Q1:selectの複数のcaseが同時に準備完了した場合、どうなりますか?

Goは最初のものを順番に選ぶのではなく、ランダムに1つを選択して実行します。これにより公平性が保証され、特定のチャネルが飢餓状態になるのを防ぎます。

GO
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "A"
ch2 <- "B"

// 結果はランダム、AまたはB
select {
case v := <-ch1:
    fmt.Println(v)
case v := <-ch2:
    fmt.Println(v)
}

Q2:selectのdefaultブランチはいつ使用すべきですか?

ノンブロッキング操作が必要な場合にdefaultを使用します。defaultがない場合、selectはcaseが準備完了するまでブロックします。

GO
// ノンブロッキング受信
select {
case msg := <-ch:
    fmt.Println("受信:", msg)
default:
    fmt.Println("現在チャネルにデータがありません")
}

Q3:複数のチャネルをリッスンする無限ループを実装するには?

for-select組み合わせパターンを使用します:

GO
for {
    select {
    case msg := <-ch1:
        fmt.Println(msg)
    case msg := <-ch2:
        fmt.Println(msg)
    case <-quit:
        fmt.Println("ループを終了")
        return
    }
}

Q4:ループ内のtime.Afterはメモリリークを引き起こしますか?

はい!time.Afterを使用するループの各反復で新しいタイマーが作成され、ガベージコレクションされません。正しいアプローチはtime.NewTimerを使用することです:

GO
// ❌ 誤り:各反復で新しいタイマーを作成
for {
    select {
    case msg := <-ch:
        fmt.Println(msg)
    case <-time.After(5 * time.Second):
        fmt.Println("タイムアウト")
    }
}

// ✅ 正しい:タイマーを再利用
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
    select {
    case msg := <-ch:
        fmt.Println(msg)
        if !timer.Stop() {
            <-timer.C
        }
        timer.Reset(5 * time.Second)
    case <-timer.C:
        fmt.Println("タイムアウト")
        return
    }
}

📖 まとめ

パターン 目的 主要コード
タイムアウト制御 操作の待機時間を制限 case <-time.After(d)
ノンブロッキング操作 待たずに即座に返却 select with default
グレースフル終了 quitチャネルをリッスン case <-quit
Fan-out 1つの入力を複数のワーカーに分配 1つのチャネルから読み取る複数のgoroutine
Fan-in 複数の入力を1つのチャネルにマージ sync.WaitGroup + 転送
パイプライン マルチステージ処理 チェーンされたチャネル

重要なポイント:

  1. selectはGoのマルチプレキシングのためのコア並行処理ツールです
  2. 複数のcaseが同時に準備完了している場合、公平性のためにランダムに1つ選択されます
  3. for-selectは複数のチャネルをリッスンする標準パターンです
  4. ループ内のtime.Afterのメモリリークに注意してください
  5. Fan-in/Fan-out/Pipelineは並列パイプラインを構築する古典的なパターンです

📝 演習

演習1:カウントダウンタイマー

selecttime.Tickerを使用して5秒のカウントダウンを実装し、毎秒残り時間を出力し、0で「発射!」と出力するプログラムを作成してください。

GO
// ヒント:
// ticker := time.NewTicker(1 * time.Second)
// select {
// case <-ticker.C:
//     // カウントダウンを更新
// }

演習2:マルチウェイマージソート

複数のソート済み整数チャネルを受信し、Fan-inパターンを使用して単一のソート済み出力チャネルにマージする関数を実装してください。

GO
// 関数シグネチャ:
func mergeSorted(channels ...<-chan int) <-chan int {
    // マージロジックを実装
}

演習3:キャンセル可能なパイプライン

3段階のパイプライン(生成 → 偶数フィルタリング → 10倍)を構築し、contextによる全体パイプラインのキャンセルをサポートしてください:

GO
// ヒント:
// ctx, cancel := context.WithCancel(context.Background())
// 各ステージのselectでctx.Done()をチェック

次のレッスン: レッスン16:syncと並行処理の安全性sync.Mutexsync.WaitGroupsync.Onceなどの同期プリミティブについて学び、共有リソースへの安全なアクセスを実現します。

Web-Tutorial.com

Web-Tutorial 技術チーム

複数の開発者によって共同維持されているプログラミングチュートリアルプラットフォーム。各チュートリアルは専門分野の開発者が執筆・レビューしています。正確で信頼性の高いコンテンツを目指しています — 問題を見つけた場合はお知らせください。

100%