並行処理演習:Webクローラー
並行処理演習:Webクローラー
例え話
旅行代理店のマネージャーで、複数のチームを異なる都市に同時に送って旅行情報を収集する必要があると想像してください:
- レート制限:会社の車両は限られているため、同時に最大3チームしか派遣できず、リソースの枯渇を回避
- 重複排除:チームはすでに訪れた場所を再訪しない
- リトライ:チームが大雨に遭遇して目的地に到達できない場合、休憩してから再試行
- グレースフルシャットダウン:閉店時間になると、すでに路上にいるチームは現在のタスクを完了してから戻り、新しいタスクは割り当てられない
これがまさに並列Webクローラーの仕組みです。Goで実装してみましょう。
プロジェクト要件
以下の機能を持つ並列Webクローラーを開発します:
- 並列クロール:複数のgoroutineが同時にWebページをクロール
- レート制限:最大同時実行数を制御し、対象サーバーへの過負荷を回避
- 重複排除:同じURLが2回クロールされない
- エラーリトライ:クロール失敗時に指数バックオフで自動リトライ
- グレースフルシャットダウン:割り込みシグナル受信後、実行中のタスクが完了するまで待機してから終了
システム設計
┌─────────────┐
│ シードURL │ シードURLキュー
└──────┬──────┘
▼
┌─────────────┐
│ URLキュー │ クロール対象URLチャネル
│ (チャネル) │
└──────┬──────┘
▼
┌──────────────────────────────────────┐
│ ワーカープール │
│ ┌─────────┐ ┌─────────┐ ┌────────┐ │
│ │ワーカー1 │ │ワーカー2 │ │ワーカーN│ │ 限定されたワーカー数
│ └────┬────┘ └────┬────┘ └───┬────┘ │
└───────┼───────────┼──────────┼───────┘
▼ ▼ ▼
┌─────────────────────────────────────┐
│ 結果チャネル │ クロール結果集約
│ ┌──────────┐ ┌──────────┐ │
│ │重複排除 │ │ リトライ │ │ 重複排除 + リトライ
│ │マップ │ │ │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────┘
例 1: 完全なコード
package main
import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"
)
// ============================================================
// データ構造定義
// ============================================================
// CrawlTaskはクロールタスクを表します
type CrawlTask struct {
URL string // 対象URL
Depth int // 現在の深度
}
// CrawlResultはクロールの結果を表します
type CrawlResult struct {
URL string // クロールされたURL
Title string // ページタイトル(簡略化された抽出)
BodyLength int // ページコンテンツの長さ
Depth int // クロール深度
Err error // エラー情報(存在する場合)
Latency time.Duration // クロール所要時間
}
// CrawlerConfigはクローラーの設定です
type CrawlerConfig struct {
MaxConcurrency int // 最大同時実行数
MaxDepth int // 最大クロール深度
MaxRetries int // 最大リトライ回数
RequestTimeout time.Duration // リクエストごとのタイムアウト
RateInterval time.Duration // レート制限間隔
}
// DefaultConfigはデフォルトの設定を返します
func DefaultConfig() CrawlerConfig {
return CrawlerConfig{
MaxConcurrency: 3,
MaxDepth: 2,
MaxRetries: 3,
RequestTimeout: 10 * time.Second,
RateInterval: 500 * time.Millisecond,
}
}
// ============================================================
// 重複排除(スレッドセーフな訪問済みURLセット)
// ============================================================
// DeduplicatorはURLの重複排除を処理します
type Deduplicator struct {
visited map[string]bool
mu sync.Mutex
}
// NewDeduplicatorは重複排除器を作成します
func NewDeduplicator() *Deduplicator {
return &Deduplicator{
visited: make(map[string]bool),
}
}
// MarkはURLを訪問済みとしてマークします;新しいURLの場合はtrueを返します
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false // すでに訪問済み
}
d.visited[url] = true
return true // 新しいURL
}
// Countは訪問済みURLの数を返します
func (d *Deduplicator) Count() int {
d.mu.Lock()
defer d.mu.Unlock()
return len(d.visited)
}
// ============================================================
// レートリミッター(セマフォベースの並行数制御)
// ============================================================
// RateLimiterはチャネルベースのセマフォレートリミッターです
type RateLimiter struct {
semaphore chan struct{}
interval time.Duration
}
// NewRateLimiterはレートリミッターを作成します
// maxConcurrency:最大同時実行数
// interval:2つのリクエスト間の最小間隔
func NewRateLimiter(maxConcurrency int, interval time.Duration) *RateLimiter {
return &RateLimiter{
semaphore: make(chan struct{}, maxConcurrency),
interval: interval,
}
}
// Acquireは実行許可を取得します
func (rl *RateLimiter) Acquire() {
rl.semaphore <- struct{}{} // バッファ付きチャネルに送信;満杯の場合はブロック
}
// Releaseは実行許可を解放します
func (rl *RateLimiter) Release() {
time.Sleep(rl.interval) // レート制限:最小間隔を維持
<-rl.semaphore // バッファ付きチャネルから受信し、空きを作成
}
// ============================================================
// Webフェッチャー
// ============================================================
// Fetcherは実際のHTTPリクエストを処理します
type Fetcher struct {
client *http.Client
}
// NewFetcherはフェッチャーを作成します
func NewFetcher(timeout time.Duration) *Fetcher {
return &Fetcher{
client: &http.Client{
Timeout: timeout,
},
}
}
// Fetchは指定されたURLを取得し、タイトルとコンテンツの長さを返します
func (f *Fetcher) Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error) {
// キャンセルサポートのためのコンテキスト付きリクエストを作成
req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
if err != nil {
return "", 0, fmt.Errorf("リクエスト作成失敗:%w", err)
}
// ブラウザをシミュレートするためにUser-Agentを設定
req.Header.Set("User-Agent", "GoCrawler/1.0")
resp, err := f.client.Do(req)
if err != nil {
return "", 0, fmt.Errorf("リクエスト失敗:%w", err)
}
defer resp.Body.Close()
// HTTPステータスコードをチェック
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("HTTPステータスコード:%d", resp.StatusCode)
}
// レスポンスボディを読み取る(メモリオーバーフロー防止のためサイズ制限)
const maxSize = 1 << 20 // 1MB
body, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
if err != nil {
return "", 0, fmt.Errorf("レスポンス読み取り失敗:%w", err)
}
// <title>タグのコンテンツを簡略抽出
title = extractTitle(string(body))
return title, len(body), nil
}
// extractTitleはHTMLからタイトルを抽出します
func extractTitle(html string) string {
re := regexp.MustCompile(`(?i)<title>(.*?)</title>`)
matches := re.FindStringSubmatch(html)
if len(matches) > 1 {
return strings.TrimSpace(matches[1])
}
return "(タイトルなし)"
}
// ============================================================
// リンク抽出器
// ============================================================
// ExtractLinksはHTMLからすべてのリンクを抽出します(簡略版)
func ExtractLinks(body string, baseURL string) []string {
re := regexp.MustCompile(`(?i)href=["']([^"']+)["']`)
matches := re.FindAllStringSubmatch(body, -1)
var links []string
base, err := url.Parse(baseURL)
if err != nil {
return links
}
for _, match := range matches {
if len(match) < 2 {
continue
}
href := match[1]
// 相対URLを解析
parsed, err := url.Parse(href)
if err != nil {
continue
}
// 絶対URLに変換
absolute := base.ResolveReference(parsed)
// HTTP/HTTPSリンクのみを保持
if absolute.Scheme == "http" || absolute.Scheme == "https" {
// フラグメント(#アンカー)を削除
absolute.Fragment = ""
links = append(links, absolute.String())
}
}
return links
}
// ============================================================
// クローラーエンジン
// ============================================================
// Crawlerはメインのクローラーエンジンです
type Crawler struct {
config CrawlerConfig
fetcher *Fetcher
dedup *Deduplicator
limiter *RateLimiter
taskCh chan CrawlTask // タスクキュー
resultCh chan CrawlResult // 結果チャネル
wg sync.WaitGroup // すべてのワーカーの完了を待機
}
// NewCrawlerはクローラーを作成します
func NewCrawler(config CrawlerConfig) *Crawler {
return &Crawler{
config: config,
fetcher: NewFetcher(config.RequestTimeout),
dedup: NewDeduplicator(),
limiter: NewRateLimiter(config.MaxConcurrency, config.RateInterval),
taskCh: make(chan CrawlTask, 100),
resultCh: make(chan CrawlResult, 100),
}
}
// CrawlWithRetryはリトライ付きでクロールを実行します(指数バックオフ)
func (c *Crawler) CrawlWithRetry(ctx context.Context, task CrawlTask) CrawlResult {
var lastErr error
for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
// コンテキストがキャンセルされたかチェック
select {
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
default:
}
// 最初の試行では待機不要
if attempt > 0 {
// 指数バックオフ:1秒、2秒、4秒 ...
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
fmt.Printf(" ↻ リトライ %d/%d:%s(%v 待機中)\n",
attempt, c.config.MaxRetries, task.URL, backoff)
select {
case <-time.After(backoff):
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
}
}
start := time.Now()
title, bodyLen, err := c.fetcher.Fetch(ctx, task.URL)
latency := time.Since(start)
if err == nil {
// 成功
return CrawlResult{
URL: task.URL,
Title: title,
BodyLength: bodyLen,
Depth: task.Depth,
Latency: latency,
}
}
lastErr = err
}
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: fmt.Errorf("%d回のリトライ後に失敗:%w", c.config.MaxRetries, lastErr),
}
}
// Workerはワーカーgoroutineです
func (c *Crawler) Worker(ctx context.Context, id int) {
defer c.wg.Done()
fmt.Printf("[ワーカー %d] 開始\n", id)
for task := range c.taskCh {
// コンテキストをチェック
select {
case <-ctx.Done():
fmt.Printf("[ワーカー %d] 終了シグナルを受信、停止します\n", id)
return
default:
}
// レート制限:許可を取得
c.limiter.Acquire()
fmt.Printf("[ワーカー %d] クロール中:%s(深度 %d)\n", id, task.URL, task.Depth)
// クロールを実行(リトライ付き)
result := c.CrawlWithRetry(ctx, task)
// 許可を解放
c.limiter.Release()
// 結果を送信
select {
case c.resultCh <- result:
case <-ctx.Done():
return
}
}
fmt.Printf("[ワーカー %d] 終了\n", id)
}
// Startはクローラーを開始します
func (c *Crawler) Start(ctx context.Context, seedURLs []string) {
// ワーカープールを起動
for i := 0; i < c.config.MaxConcurrency; i++ {
c.wg.Add(1)
go c.Worker(ctx, i)
}
// シードURLを提出
go func() {
for _, rawURL := range seedURLs {
if c.dedup.Mark(rawURL) {
c.taskCh <- CrawlTask{URL: rawURL, Depth: 0}
}
}
}()
// 結果処理とリンク発見のgoroutineを起動
go c.processResults(ctx)
}
// processResultsはクロール結果を処理し、新しいリンクを発見します
func (c *Crawler) processResults(ctx context.Context) {
for result := range c.resultCh {
if result.Err != nil {
fmt.Printf("✗ 失敗:%s — %v\n", result.URL, result.Err)
continue
}
fmt.Printf("✓ 成功:%s\n", result.URL)
fmt.Printf(" タイトル:%s\n", result.Title)
fmt.Printf(" サイズ:%dバイト | 時間:%v\n", result.BodyLength, result.Latency)
// 最大深度に達していない場合、リンクの発見を続けることができます
// (ここでは簡略化;本番環境では、リンク抽出のためにページを再クロールします)
if result.Depth < c.config.MaxDepth {
// デモ:新しいリンクをタスクキューに投稿
// 本番環境では、ここで結果のHTMLからリンクを抽出します
fmt.Printf(" 深度 %d/%d、子リンクの発見を続けることができます\n", result.Depth, c.config.MaxDepth)
}
}
}
// Waitはすべてのタスクの完了を待機します
func (c *Crawler) Wait() {
// タスクチャネルを閉じ、ワーカーにこれ以上タスクがないことを通知
close(c.taskCh)
// すべてのワーカーが終了するまで待機
c.wg.Wait()
// 結果チャネルを閉じる
close(c.resultCh)
}
// Statsはクローラーの統計を返します
func (c *Crawler) Stats() (visited int) {
return c.dedup.Count()
}
// ============================================================
// メインプログラム
// ============================================================
func main() {
fmt.Println("========================================")
fmt.Println(" Go並列Webクローラー v1.0")
fmt.Println("========================================")
fmt.Println()
// 設定を読み込む
config := DefaultConfig()
// コマンドライン引数や設定ファイルで上書き可能
// config.MaxConcurrency = 5
// キャンセル可能なコンテキストを作成
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// システム割り込みシグナル(Ctrl+C)をキャプチャしてグレースフルシャットダウン
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigCh
fmt.Printf("\n⚠ シグナルを受信:%v、グレースフルにシャットダウン中...\n", sig)
cancel() // コンテキストをキャンセルし、すべてのワーカーに通知
}()
// クローラーを作成
crawler := NewCrawler(config)
// シードURLリスト
seedURLs := []string{
"https://httpbin.org/html",
"https://httpbin.org/links/5",
"https://httpbin.org/range/100",
"https://httpbin.org/delay/1",
}
fmt.Printf("設定:同時実行数=%d、深度=%d、リトライ=%d\n",
config.MaxConcurrency, config.MaxDepth, config.MaxRetries)
fmt.Printf("シードURL:%d\n", len(seedURLs))
fmt.Println("----------------------------------------")
// クローラーを開始
crawler.Start(ctx, seedURLs)
// 一定時間後に自動シャットダウン(本番環境では他の条件を使用)
go func() {
time.Sleep(30 * time.Second)
fmt.Println("\n⏱ タイムアウト、シャットダウンをトリガー...")
cancel()
}()
// すべてのタスクの完了を待機
crawler.Wait()
// 統計を出力
fmt.Println("----------------------------------------")
fmt.Printf("クロール完了!%d個のURLを訪問しました\n", crawler.Stats())
}
コード分析
1. レート制限:セマフォチャネル
// バッファ付きチャネルをセマフォとして使用
semaphore: make(chan struct{}, maxConcurrency)
// 許可を取得:チャネルが満杯の場合はブロック
rl.semaphore <- struct{}{}
// 許可を解放
<-rl.semaphore
バッファサイズNのチャネルは、同時に最大N個のgoroutineが許可を保持することを可能にします。N+1番目のgoroutineはブロックされます。これはtime.Tickerよりも正確に並行数を制御します。
2. 重複排除:ミューテックス保護マップ
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false
}
d.visited[url] = true
return true
}
複数のgoroutineが同時に同じURLを発見する可能性があります。Markメソッドはミューテックスを使用してアトミック性を確保し、1つのgoroutineのみがそのURLを「要求」できます。
3. エラーリトライ:指数バックオフ
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
| リトライ回数 | 待機時間 |
|---|---|
| 1回目 | 1秒 |
| 2回目 | 2秒 |
| 3回目 | 4秒 |
指数バックオフはサーバー障害時の「雪崩式」リトライを防ぎ、サーバーに回復時間を与えます。
4. グレースフルシャットダウン:Context + シグナル
ctx, cancel := context.WithCancel(context.Background())
// システムシグナルをリッスン
go func() {
<-sigCh
cancel() // コンテキストをキャンセル
}()
// ワーカーがコンテキストをチェック
select {
case <-ctx.Done():
return // 終了
default:
// 作業を続行
}
cancel()はctx.Done()をリッスンしているすべてのgoroutineに通知し、協調的なシャットダウンを実現します。
5. 全体の協調フロー
main()
│
├─ コンテキストを作成(キャンセル可能)
├─ N個のワーカーgoroutineを起動
├─ シードURLをtaskChに提出
├─ processResults goroutineを起動
│
├─ 待機...
│ ├─ ワーカーがtaskChからタスクを取得
│ ├─ ワーカーがレート制限 → クロール → リトライ
│ ├─ ワーカーが結果をresultChに送信
│ └─ processResultsが結果を処理し、新しいリンクを発見
│
└─ cancel()トリガー → すべてのワーカーが終了 → プログラム終了
❓ よくある質問
Q1:並行数制御にsync.Mutexではなくチャネルを使用するのはなぜですか?
チャネルは相互排除だけでなく通信も提供します。セマフォチャネルは「並行数制限」と「タスク渡し」を自然に組み合わせます——チャネルが満杯の時、新しいgoroutineは自動的に待機し、goroutineが解放すると待機中のものが自動的に覚醒されます。これはMutex + WaitGroupを別々に使用するよりも簡潔です。さらに、チャネルはselectをサポートし、context.Done()と組み合わせたキャンセル可能なブロッキングを容易にします。
Q2:なぜタスク提出時に重複排除するのですか?クロール前ではなく?
実際には、両方が必要です。タスク提出時に重複排除すると、重複タスクがチャネルにプッシュされるのを防ぎ、メモリと処理時間を節約できます。ただし、複数のgoroutineがほぼ同時に同じ新しいリンクを発見した場合、提出時のみの重複排除では「レース」が発生する可能性があるため、Markメソッドはスレッドセーフである必要があります。大規模なクローラーでは、大量のURL重複排除を効率的に処理するためにブルームフィルターが一般的に使用されます。
Q3:なぜプログラムを強制終了するだけではだめなのですか?「グレースフルシャットダウン」が必要な理由は?
強制終了は以下を引き起こす可能性があります:書き込み中のファイルの破損、解放されていないデータベース接続、対象サーバーに到達した未完了のリクエスト。グレースフルシャットダウンでは、各ワーカーが現在のタスクを完了してから終了し、データの一貫性を保証します。context.WithCancelはGoのグレースフルシャットダウン実装の標準パターンです。
Q4:このクローラーはJavaScriptでレンダリングされたページを処理できますか?
いいえ。net/httpは生のHTMLのみを取得し、JavaScriptを実行しません。SPA(シングルページアプリケーション)をクロールする必要がある場合は、ChromedpやRodのようなヘッドレスブラウザを統合する必要があります。このチュートリアルのアーキテクチャは拡張可能です——Fetcherの実装を置き換えるだけです。
📖 まとめ
このセクションでは、完全なWebクローラープロジェクトを通じて、複数のGo並行処理コアコンセプトを総合的に活用しました:
| コンセプト | 応用 | 主要コード |
|---|---|---|
| goroutine | 複数のワーカーが並列に作業 | go c.Worker(ctx, i) |
| チャネル | タスクキュー、結果渡し、セマフォ | taskCh、resultCh、semaphore |
| sync.Mutex | 重複排除マップの並行処理安全性を保護 | d.mu.Lock() |
| context | キャンセル伝播、グレースフルシャットダウン | ctx.WithCancel |
| select | マルチプレキシング、タイムアウト制御 | select { case <-ctx.Done() ... } |
| sync.WaitGroup | すべてのワーカーの完了を待機 | c.wg.Wait() |
これらのコンポーネントが連携して、堅牢な並行処理システムを形成します。この「チャネル + context + WaitGroup」の組み合わせパターンをマスターすることが、本番環境グレードのGo並行処理プログラムを書くための基盤です。
📝 演習
演習1:深層リンク抽出の追加
現在、processResultsは深度情報を出力するだけで、実際には子リンクを抽出していません。クロール成功後にHTMLからリンクを抽出し、新しいURLをtaskChに投稿するようにコードを修正してください。
ヒント:Fetchメソッドを修正してHTMLコンテンツも返すようにし、processResultsでExtractLinksを呼び出す必要があります。
演習2:結果の永続化の実装
クロール結果をJSON形式でファイルに書き込むSaver構造体を追加してください。要件:
- 別のgoroutineを使用して
resultChから読み取り、ファイルに書き込み - 1行に1レコード(JSON Lines形式)
- 並行処理安全な書き込みをサポート
演習3:ドメインレベルのレート制限の実装
現在のレート制限はグローバルです。ドメインごとのグループ化されたレートリミッターを実装してください:
- 同じドメイン(例:
example.com)へのリクエストは1秒あたり最大2つ - 異なるドメインはお互いに影響しない
- ヒント:
map[string]*RateLimiterを維持
次のレッスン
この実践演習を完了した後、レッスン19:文字列処理に進んで、Goの文字列内部構造、一般的な操作、パフォーマンス最適化テクニックをマスターしてください。



