並行処理演習:タスクスケジューラ
並行処理演習:タスクスケジューラ
荷物仕分けセンターを想像してください:コンベアベルトが継続的に荷物(タスク)を届け、複数の仕分け担当者(goroutine)が同時に作業します——スキャンする人、分類する人、積み込む人がいます。各ステージはパイプライン(チャネル)を通じて荷物を渡し、スケジューラは全体の進捗を監視し、問題があれば迅速に対応します。これが並列タスクスケジューラの仕組みです——複数のワーカーが協力して大量のタスクを、効率的かつ確実に完了します。
このレッスンでは、goroutine、チャネル、select、syncパッケージ、context.Contextを総合的に活用して、完全に機能する並列タスクスケジューラをゼロから構築します。
プロジェクト要件
以下の要件を満たすタスクスケジューラを構築する必要があります:
- 並列実行:複数のワーカーが同時にタスクを処理することをサポート
- タスク提出:外部システムが動的にタスクを提出できる
- タイムアウト制御:個々のタスクがタイムアウト後に自動的にキャンセルされる
- 失敗リトライ:失敗したタスクが自動的にリトライされ、最大N回まで
- 結果収集:すべてのタスク結果を並列かつ安全に収集
- グレースフルシャットダウン:キャンセルシグナル受信後、実行中のタスクが完了するまで待機してから終了
システム設計
全体のアーキテクチャは3層に分かれています:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ タスク提出 │────▶│ スケジューラ │────▶│ ワーカープール│
│ (プロデューサ)│ │ エンジン │ │ (ワーカー) │
└─────────────┘ └──────────────┘ └─────────────┘
│ │
▼ ▼
┌──────────────┐ ┌─────────────┐
│ 結果 │◀────│ タスク │
│ コレクター │ │ 実行 │
└──────────────┘ └─────────────┘
コアコンポーネント:
| コンポーネント | 責任 | 実装 |
|---|---|---|
Task |
実行可能なタスクを表す | 構造体 + 関数型 |
Result |
タスクの実行結果を格納 | 構造体 + チャネル |
Worker |
特定のタスクを実行 | goroutine |
Scheduler |
タスクの配布と実行を調整 | チャネル + select |
ResultCollector |
結果を並列かつ安全に収集 | sync.Mutex |
例 1: 完全なコード
1. データ構造の定義
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Taskは実行されるタスクを表します
type Task struct {
ID int // ユニークなタスク識別子
Payload string // タスクデータ
Execute func(ctx context.Context) (string, error) // タスク実行関数
}
// Resultはタスクの実行結果を表します
type Result struct {
TaskID int // 対応するタスクID
Output string // 実行成功時の出力
Error error // 実行失敗時のエラー
Attempts int // 実際の試行回数
}
// ResultCollectorはタスク結果を並列かつ安全に収集します
type ResultCollector struct {
mu sync.Mutex
results []Result
}
// Addは結果を追加します(並行処理安全)
func (rc *ResultCollector) Add(r Result) {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.results = append(rc.results, r)
}
// Allは収集されたすべての結果を返します
func (rc *ResultCollector) All() []Result {
rc.mu.Lock()
defer rc.mu.Unlock()
// 外部からの変更を避けるためコピーを返す
out := make([]Result, len(rc.results))
copy(out, rc.results)
return out
}
2. ワーカーの実装
// Workerはタスクチャネルからタスクを受信し、実行し、結果を結果チャネルに送信する
func Worker(
ctx context.Context,
id int,
tasks <-chan Task,
results chan<- Result,
maxRetries int,
wg *sync.WaitGroup,
) {
defer wg.Done()
for task := range tasks {
// キャンセルされたかチェック
select {
case <-ctx.Done():
fmt.Printf("[ワーカー %d] コンテキストがキャンセルされました、終了します\n", id)
return
default:
}
fmt.Printf("[ワーカー %d] タスク #%d を開始\n", id, task.ID)
var output string
var err error
attempts := 0
// リトライ付き実行ロジック
for attempts < maxRetries {
attempts++
// 各タスクにタイムアウト付きのサブコンテキストを作成(最大2秒)
taskCtx, taskCancel := context.WithTimeout(ctx, 2*time.Second)
output, err = task.Execute(taskCtx)
taskCancel()
if err == nil {
break // 成功、リトループを終了
}
fmt.Printf("[ワーカー %d] タスク #%d 試行%d 回目失敗:%v\n",
id, task.ID, attempts, err)
if attempts < maxRetries {
// リトライ前に待機しつつ、キャンセルをリッスン
select {
case <-ctx.Done():
fmt.Printf("[ワーカー %d] リトライ待機中にキャンセルシグナルを受信\n", id)
results <- Result{TaskID: task.ID, Error: ctx.Err(), Attempts: attempts}
return
case <-time.After(time.Duration(attempts) * 500 * time.Millisecond):
// 線形バックオフ:1回目500ms、2回目1s、3回目1.5s ...
}
}
}
results <- Result{
TaskID: task.ID,
Output: output,
Error: err,
Attempts: attempts,
}
if err != nil {
fmt.Printf("[ワーカー %d] タスク #%d が最終的に失敗(%d回試行)\n", id, task.ID, attempts)
} else {
fmt.Printf("[ワーカー %d] タスク #%d が完了\n", id, task.ID)
}
}
fmt.Printf("[ワーカー %d] タスクチャネルが閉じられました、終了します\n", id)
}
3. スケジューラの実装
// Schedulerはタスクスケジューリングエンジンです
type Scheduler struct {
workerCount int // ワーカー数
maxRetries int // 最大リトライ回数
taskTimeout time.Duration // タスクごとのタイムアウト
}
// NewSchedulerは新しいスケジューラを作成します
func NewScheduler(workerCount, maxRetries int, taskTimeout time.Duration) *Scheduler {
return &Scheduler{
workerCount: workerCount,
maxRetries: maxRetries,
taskTimeout: taskTimeout,
}
}
// Runはスケジューラを開始し、与えられたタスクリストを処理し、すべての結果を返します
func (s *Scheduler) Run(ctx context.Context, taskList []Task) []Result {
// チャネルを作成
tasks := make(chan Task, len(taskList))
results := make(chan Result, len(taskList))
// 結果コレクター
collector := &ResultCollector{}
// 結果収集goroutineを起動
var collectorWg sync.WaitGroup
collectorWg.Add(1)
go func() {
defer collectorWg.Done()
for r := range results {
collector.Add(r)
}
}()
// ワーカープールを起動
var workerWg sync.WaitGroup
for i := 1; i <= s.workerCount; i++ {
workerWg.Add(1)
go Worker(ctx, i, tasks, results, s.maxRetries, &workerWg)
}
// すべてのタスクを提出
go func() {
for _, task := range taskList {
select {
case tasks <- task:
fmt.Printf("[スケジューラ] タスク #%d を提出しました\n", task.ID)
case <-ctx.Done():
fmt.Println("[スケジューラ] コンテキストがキャンセルされました、タスク提出を停止")
return
}
}
close(tasks) // タスクチャネルを閉じ、ワーカーにこれ以上タスクがないことを通知
}()
// すべてのワーカーの完了を待機
workerWg.Wait()
close(results) // 結果チャネルを閉じる
// コレクターがすべての結果を処理するまで待機
collectorWg.Wait()
return collector.All()
}
4. タスクとメイン関数のシミュレーション
// createDemoTasksはシミュレートされたタスクのセットを作成します;一部は失敗し、一部はタイムアウトします
func createDemoTasks(count int) []Task {
tasks := make([]Task, count)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < count; i++ {
id := i + 1
behavior := rng.Intn(4) // 0-3、タスクの動作を決定
switch behavior {
case 0:
// 正常完了
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("通常タスク-%d", id),
Execute: func(ctx context.Context) (string, error) {
time.Sleep(time.Duration(100+rng.Intn(400)) * time.Millisecond)
return fmt.Sprintf("タスク #%d が成功しました", id), nil
},
}
case 1:
// 時々失敗(1回目失敗、2回目成功)
failCount := 0
var mu sync.Mutex
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("不安定タスク-%d", id),
Execute: func(ctx context.Context) (string, error) {
mu.Lock()
failCount++
current := failCount
mu.Unlock()
time.Sleep(time.Duration(50+rng.Intn(200)) * time.Millisecond)
if current <= 1 {
return nil, fmt.Errorf("タスク #%d のシミュレート失敗", id)
}
return fmt.Sprintf("タスク #%d が%d回目の試行で成功", id, current), nil
},
}
case 2:
// 実行に時間がかかりすぎる(タイムアウトをトリガー)
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("遅いタスク-%d", id),
Execute: func(ctx context.Context) (string, error) {
select {
case <-time.After(5 * time.Second): // 2秒のタイムアウトを大幅に超過
return fmt.Sprintf("タスク #%d が完了", id), nil
case <-ctx.Done():
return "", fmt.Errorf("タスク #%d がキャンセル:%w", id, ctx.Err())
}
},
}
default:
// 常に失敗
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("失敗タスク-%d", id),
Execute: func(ctx context.Context) (string, error) {
time.Sleep(time.Duration(50+rng.Intn(150)) * time.Millisecond)
return nil, fmt.Errorf("タスク #%d の回復不能なエラー", id)
},
}
}
}
return tasks
}
func main() {
fmt.Println("=== 並列タスクスケジューラ ===")
fmt.Println()
// キャンセル可能なコンテキストを作成
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// OS割り込みシグナルをリッスン(Ctrl+C)してグレースフルシャットダウンをトリガー
go func() {
// シミュレーション:8秒後にキャンセルシグナルを送信(本番環境ではsignal.Notifyを使用)
time.Sleep(8 * time.Second)
fmt.Println("\n[メイン] キャンセルシグナルを受信、グレースフルシャットダウンを開始...")
cancel()
}()
// スケジューラを作成:3ワーカー、最大3回リトライ、タスクごとに2秒のタイムアウト
scheduler := NewScheduler(3, 3, 2*time.Second)
// 10個のシミュレートされたタスクを生成
taskList := createDemoTasks(10)
// スケジューリングを実行
fmt.Printf("[メイン] %d個のタスクを提出、%d個のワーカーを起動\n\n", len(taskList), 3)
start := time.Now()
results := scheduler.Run(ctx, taskList)
elapsed := time.Since(start)
// 結果サマリーを出力
fmt.Println("\n=============================")
fmt.Println(" 実行結果サマリー ")
fmt.Println("=============================")
successCount := 0
failCount := 0
cancelCount := 0
for _, r := range results {
status := "✓"
detail := r.Output
if r.Error != nil {
if ctx.Err() != nil {
status = "⊘"
detail = "キャンセル済み"
cancelCount++
} else {
status = "✗"
detail = r.Error.Error()
failCount++
}
} else {
successCount++
}
fmt.Printf(" %s タスク #%02d | 試行回数:%d | %s\n",
status, r.TaskID, r.Attempts, detail)
}
fmt.Println("-----------------------------")
fmt.Printf(" 合計:%d | 成功:%d | 失敗:%d | キャンセル:%d\n",
len(results), successCount, failCount, cancelCount)
fmt.Printf(" 時間:%v\n", elapsed)
}
5. 実行結果
go run main.go
出力例(実行ごとに結果はランダムです):
=== 並列タスクスケジューラ ===
[メイン] 10個のタスクを提出、3個のワーカーを起動
[スケジューラ] タスク #1 を提出しました
[スケジューラ] タスク #2 を提出しました
[スケジューラ] タスク #3 を提出しました
[スケジューラ] タスク #4 を提出しました
[スケジューラ] タスク #5 を提出しました
[スケジューラ] タスク #6 を提出しました
[スケジューラ] タスク #7 を提出しました
[スケジューラ] タスク #8 を提出しました
[スケジューラ] タスク #9 を提出しました
[スケジューラ] タスク #10 を提出しました
[ワーカー 1] タスク #1 を開始
[ワーカー 2] タスク #2 を開始
[ワーカー 3] タスク #3 を開始
[ワーカー 1] タスク #1 が完了
[ワーカー 1] タスク #4 を開始
[ワーカー 2] タスク #2 試行1回目失敗:タスク #2 のシミュレート失敗
[ワーカー 3] タスク #3 が完了
[ワーカー 3] タスク #5 を開始
[ワーカー 2] タスク #6 を開始
[ワーカー 2] タスク #6 試行1回目失敗:タスク #6 の回復不能なエラー
[ワーカー 3] タスク #5 が完了
[ワーカー 3] タスク #7 を開始
[ワーカー 2] タスク #6 試行2回目失敗:タスク #6 の回復不能なエラー
[ワーカー 1] タスク #4 試行1回目失敗:タスク #4 のシミュレート失敗
[ワーカー 2] タスク #6 試行3回目失敗:タスク #6 の回復不能なエラー
[ワーカー 2] タスク #6 が最終的に失敗(3回試行)
[ワーカー 2] タスク #8 を開始
...
[ワーカー 1] タスクチャネルが閉じられました、終了します
[ワーカー 2] タスクチャネルが閉じられました、終了します
[ワーカー 3] タスクチャネルが閉じられました、終了します
============================
実行結果サマリー
============================
✓ タスク #01 | 試行回数:1 | タスク #1 が成功しました
⊘ タスク #02 | 試行回数:2 | キャンセル済み
✓ タスク #03 | 試行回数:1 | タスク #3 が成功しました
⊘ タスク #04 | 試行回数:2 | キャンセル済み
✓ タスク #05 | 試行回数:1 | タスク #5 が成功しました
✗ タスク #06 | 試行回数:3 | タスク #6 の回復不能なエラー
✓ タスク #07 | 試行回数:1 | タスク #7 が成功しました
⊘ タスク #08 | 試行回数:1 | キャンセル済み
✓ タスク #09 | 試行回数:1 | タスク #9 が成功しました
⊘ タスク #10 | 試行回数:1 | キャンセル済み
-----------------------------
合計:10 | 成功:5 | 失敗:1 | キャンセル:4
時間:8.012s
コード分析
コア並行処理パターン
1. Fan-out / Fan-in
これはスケジューラのコアパターンです:
┌─ ワーカー 1 ─┐
タスク ─────┼─ ワーカー 2 ─┼───── 結果
└─ ワーカー 3 ─┘
- Fan-out:複数のワーカーが同じ
tasksチャネルから読み取り、タスクを自動的に分配 - Fan-in:すべてのワーカーが同じ
resultsチャネルに結果を書き込み、結果を集約
2. context.Contextのカスケードキャンセル
ルートコンテキスト(main)
└─ WithCancel
├─ ワーカー1のWithTimeout
│ └─ taskCtx(タスクごとのタイムアウト2秒)
├─ ワーカー2のWithTimeout
│ └─ taskCtx(タスクごとのタイムアウト2秒)
└─ ワーカー3のWithTimeout
└─ taskCtx(タスクごとのタイムアウト2秒)
メインプログラムがcancel()を呼び出すと、すべての子コンテキストがキャンセルシグナルを受信し、カスケードキャンセルが実現されます。
3. WaitGroupの同期
var wg sync.WaitGroup
wg.Add(1) // 起動された各ワーカーがカウントを1増加
go func() {
defer wg.Done() // ワーカー終了時にカウントを1減少
// ...
}()
wg.Wait() // カウントがゼロになるまでブロック
4. selectのマルチプレキシング
ワーカーは複数のシグナルソースを同時にリッスンします:
select {
case <-ctx.Done(): // コンテキストキャンセル
return
case <-time.After(d): // リトライバックオフ
// リトライを続行
}
リトライバックオフ戦略
この例では線形バックオフを使用しています:リトライごとの待機時間 = attempts × 500ms。本番環境では指数バックオフを推奨します:
backoff := time.Duration(1<<uint(attempts)) * 100 * time.Millisecond
// 1回目:200ms、2回目:400ms、3回目:800ms
複数のタスクが同時にリトライする際の「サンダーヘッド」効果を防ぐために、ランダムなジッターを追加することもできます。
❓ よくある質問
1. なぜワーカーはctx.Done()を直接チェックするのではなくrange tasksを使用するのですか?
range tasksはチャネルが閉じられると自動的にループを終了します。これは標準的なプロデューサー・コンシューマーパターンです。ctx.Done()は待機中やリトライ中にキャンセルシグナルに応答するために使用されます。両方が連携します:
- チャネル閉じ → 正常終了(すべてのタスクが提出された)
- コンテキストキャンセル → 早期終了(外部からの終了要求)
ctx.Done()のみを使用する場合、チャネル閉じの追加処理が必要です。rangeのみを使用する場合、キャンセルに応答できません。
2. 結果チャネルのバッファサイズはどうやって決めるのですか?
この例ではmake(chan Result, len(taskList))で総タスク数をバッファサイズとして使用し、すべての結果がブロックなしで書き込めることを保証しています。タスク数が非常に多い(数百万)場合:
- バッファなしチャネル + 別の収集goroutineを使用
- バッチ処理し、各バッチ後にチャネルをリセット
- チャネルの代わりに
sync.Mapまたはロック付きスライスを使用
3. 真の指数バックオフを実装するには?
func exponentialBackoff(attempt int) time.Duration {
base := 100 * time.Millisecond
max := 10 * time.Second
backoff := base * time.Duration(1<<uint(attempt))
if backoff > max {
backoff = max
}
// ランダムなジッターを追加:±25%
jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
return backoff - backoff/4 + jitter
}
4. 同時に実行されるタスクの数を制限するには(並行数制御)?
この例ではワーカーの数で自然に並行数を制限しています。別のアプローチはセマフォを使用することです:
sem := make(chan struct{}, 10) // 最大同時実行数10
for _, task := range taskList {
sem <- struct{}{} // 許可を取得
go func(t Task) {
defer func() { <-sem }() // 許可を解放
t.Execute(ctx)
}(task)
}
📖 まとめ
このレッスンではGoのコア並行処理技術を総合的に活用しました:
| 技術 | 応用 | 主要コード |
|---|---|---|
| goroutine | ワーカーがタスクを並列に実行 | go Worker(...) |
| チャネル | タスクの配布と結果の収集 | tasks <-chan Task |
| select | マルチプレキシング:キャンセル、タイムアウト、バックオフ | select { case <-ctx.Done(): ... } |
| sync.WaitGroup | すべてのワーカーの完了を待機 | wg.Wait() |
| sync.Mutex | 並行処理安全な結果収集 | rc.mu.Lock() |
| context.Context | カスケードキャンセルとタイムアウト制御 | context.WithTimeout(ctx, 2*time.Second) |
設計原則の復習:
- チャネルを使用してデータを渡し、メモリを共有して通信しない
- contextを使用して呼び出しチェーン全体にキャンセルシグナルを伝播
- WaitGroupはグループのgoroutineの完了待機に使用
- Mutexは共有状態の保護に使用(この例では結果コレクター)
- チャネルを閉じることはブロードキャストシグナルとして機能し、すべてのコンシューマーに通知
📝 演習
演習1:優先度キューの追加
スケジューラを修正してタスクの優先度(高/中/低)をサポートしてください。高優先度のタスクがワーカーによって最初に処理されるようにします。
ヒント:複数のチャネルを使用するか、優先度キュー構造を実装してください。
演習2:レート制限の実装
スケジューラにレート制限を追加してください。例えば、1秒あたり最大5タスク(トークンバケットアルゴリズム)。
ヒント:time.Tickerまたはサードパーティライブラリgolang.org/x/time/rateを使用してください。
limiter := rate.NewLimiter(5, 1) // 1秒あたり5、バースト1
for task := range tasks {
limiter.Wait(ctx) // トークンを待機
task.Execute(ctx)
}
演習3:タスク進捗コールバックの追加
スケジューラにOnProgressコールバック関数を追加し、各タスク完了時に呼び出されて現在の進捗(完了数/合計数)を報告するようにしてください。
type Scheduler struct {
// ... 他のフィールド
OnProgress func(completed, total int)
}



