並行処理演習:Webクローラー

並行処理演習:Webクローラー

例え話

旅行代理店のマネージャーで、複数のチームを異なる都市に同時に送って旅行情報を収集する必要があると想像してください:

これがまさに並列Webクローラーの仕組みです。Goで実装してみましょう。


プロジェクト要件

以下の機能を持つ並列Webクローラーを開発します:

  1. 並列クロール:複数のgoroutineが同時にWebページをクロール
  2. レート制限:最大同時実行数を制御し、対象サーバーへの過負荷を回避
  3. 重複排除:同じURLが2回クロールされない
  4. エラーリトライ:クロール失敗時に指数バックオフで自動リトライ
  5. グレースフルシャットダウン:割り込みシグナル受信後、実行中のタスクが完了するまで待機してから終了

システム設計

┌─────────────┐
│  シードURL   │  シードURLキュー
└──────┬──────┘
       ▼
┌─────────────┐
│  URLキュー   │  クロール対象URLチャネル
│  (チャネル)  │
└──────┬──────┘
       ▼
┌──────────────────────────────────────┐
│          ワーカープール               │
│  ┌─────────┐ ┌─────────┐ ┌────────┐ │
│  │ワーカー1 │ │ワーカー2 │ │ワーカーN│ │  限定されたワーカー数
│  └────┬────┘ └────┬────┘ └───┬────┘ │
└───────┼───────────┼──────────┼───────┘
        ▼           ▼          ▼
┌─────────────────────────────────────┐
│         結果チャネル                 │  クロール結果集約
│    ┌──────────┐  ┌──────────┐       │
│    │重複排除   │  │ リトライ  │       │  重複排除 + リトライ
│    │マップ     │  │          │       │
│    └──────────┘  └──────────┘       │
└─────────────────────────────────────┘

例 1: 完全なコード

GO
package main

import (
	"context"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"regexp"
	"strings"
	"sync"
	"syscall"
	"time"
)

// ============================================================
// データ構造定義
// ============================================================

// CrawlTaskはクロールタスクを表します
type CrawlTask struct {
	URL   string // 対象URL
	Depth int    // 現在の深度
}

// CrawlResultはクロールの結果を表します
type CrawlResult struct {
	URL        string        // クロールされたURL
	Title      string        // ページタイトル(簡略化された抽出)
	BodyLength int           // ページコンテンツの長さ
	Depth      int           // クロール深度
	Err        error         // エラー情報(存在する場合)
	Latency    time.Duration // クロール所要時間
}

// CrawlerConfigはクローラーの設定です
type CrawlerConfig struct {
	MaxConcurrency int           // 最大同時実行数
	MaxDepth       int           // 最大クロール深度
	MaxRetries     int           // 最大リトライ回数
	RequestTimeout time.Duration // リクエストごとのタイムアウト
	RateInterval   time.Duration // レート制限間隔
}

// DefaultConfigはデフォルトの設定を返します
func DefaultConfig() CrawlerConfig {
	return CrawlerConfig{
		MaxConcurrency: 3,
		MaxDepth:       2,
		MaxRetries:     3,
		RequestTimeout: 10 * time.Second,
		RateInterval:   500 * time.Millisecond,
	}
}

// ============================================================
// 重複排除(スレッドセーフな訪問済みURLセット)
// ============================================================

// DeduplicatorはURLの重複排除を処理します
type Deduplicator struct {
	visited map[string]bool
	mu      sync.Mutex
}

// NewDeduplicatorは重複排除器を作成します
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{
		visited: make(map[string]bool),
	}
}

// MarkはURLを訪問済みとしてマークします;新しいURLの場合はtrueを返します
func (d *Deduplicator) Mark(url string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.visited[url] {
		return false // すでに訪問済み
	}
	d.visited[url] = true
	return true // 新しいURL
}

// Countは訪問済みURLの数を返します
func (d *Deduplicator) Count() int {
	d.mu.Lock()
	defer d.mu.Unlock()
	return len(d.visited)
}

// ============================================================
// レートリミッター(セマフォベースの並行数制御)
// ============================================================

// RateLimiterはチャネルベースのセマフォレートリミッターです
type RateLimiter struct {
	semaphore chan struct{}
	interval  time.Duration
}

// NewRateLimiterはレートリミッターを作成します
// maxConcurrency:最大同時実行数
// interval:2つのリクエスト間の最小間隔
func NewRateLimiter(maxConcurrency int, interval time.Duration) *RateLimiter {
	return &RateLimiter{
		semaphore: make(chan struct{}, maxConcurrency),
		interval:  interval,
	}
}

// Acquireは実行許可を取得します
func (rl *RateLimiter) Acquire() {
	rl.semaphore <- struct{}{} // バッファ付きチャネルに送信;満杯の場合はブロック
}

// Releaseは実行許可を解放します
func (rl *RateLimiter) Release() {
	time.Sleep(rl.interval) // レート制限:最小間隔を維持
	<-rl.semaphore          // バッファ付きチャネルから受信し、空きを作成
}

// ============================================================
// Webフェッチャー
// ============================================================

// Fetcherは実際のHTTPリクエストを処理します
type Fetcher struct {
	client *http.Client
}

// NewFetcherはフェッチャーを作成します
func NewFetcher(timeout time.Duration) *Fetcher {
	return &Fetcher{
		client: &http.Client{
			Timeout: timeout,
		},
	}
}

// Fetchは指定されたURLを取得し、タイトルとコンテンツの長さを返します
func (f *Fetcher) Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error) {
	// キャンセルサポートのためのコンテキスト付きリクエストを作成
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
	if err != nil {
		return "", 0, fmt.Errorf("リクエスト作成失敗:%w", err)
	}

	// ブラウザをシミュレートするためにUser-Agentを設定
	req.Header.Set("User-Agent", "GoCrawler/1.0")

	resp, err := f.client.Do(req)
	if err != nil {
		return "", 0, fmt.Errorf("リクエスト失敗:%w", err)
	}
	defer resp.Body.Close()

	// HTTPステータスコードをチェック
	if resp.StatusCode != http.StatusOK {
		return "", 0, fmt.Errorf("HTTPステータスコード:%d", resp.StatusCode)
	}

	// レスポンスボディを読み取る(メモリオーバーフロー防止のためサイズ制限)
	const maxSize = 1 << 20 // 1MB
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
	if err != nil {
		return "", 0, fmt.Errorf("レスポンス読み取り失敗:%w", err)
	}

	// <title>タグのコンテンツを簡略抽出
	title = extractTitle(string(body))

	return title, len(body), nil
}

// extractTitleはHTMLからタイトルを抽出します
func extractTitle(html string) string {
	re := regexp.MustCompile(`(?i)<title>(.*?)</title>`)
	matches := re.FindStringSubmatch(html)
	if len(matches) > 1 {
		return strings.TrimSpace(matches[1])
	}
	return "(タイトルなし)"
}

// ============================================================
// リンク抽出器
// ============================================================

// ExtractLinksはHTMLからすべてのリンクを抽出します(簡略版)
func ExtractLinks(body string, baseURL string) []string {
	re := regexp.MustCompile(`(?i)href=["']([^"']+)["']`)
	matches := re.FindAllStringSubmatch(body, -1)

	var links []string
	base, err := url.Parse(baseURL)
	if err != nil {
		return links
	}

	for _, match := range matches {
		if len(match) < 2 {
			continue
		}
		href := match[1]

		// 相対URLを解析
		parsed, err := url.Parse(href)
		if err != nil {
			continue
		}

		// 絶対URLに変換
		absolute := base.ResolveReference(parsed)

		// HTTP/HTTPSリンクのみを保持
		if absolute.Scheme == "http" || absolute.Scheme == "https" {
			// フラグメント(#アンカー)を削除
			absolute.Fragment = ""
			links = append(links, absolute.String())
		}
	}

	return links
}

// ============================================================
// クローラーエンジン
// ============================================================

// Crawlerはメインのクローラーエンジンです
type Crawler struct {
	config    CrawlerConfig
	fetcher   *Fetcher
	dedup     *Deduplicator
	limiter   *RateLimiter
	taskCh    chan CrawlTask   // タスクキュー
	resultCh  chan CrawlResult // 結果チャネル
	wg        sync.WaitGroup  // すべてのワーカーの完了を待機
}

// NewCrawlerはクローラーを作成します
func NewCrawler(config CrawlerConfig) *Crawler {
	return &Crawler{
		config:   config,
		fetcher:  NewFetcher(config.RequestTimeout),
		dedup:    NewDeduplicator(),
		limiter:  NewRateLimiter(config.MaxConcurrency, config.RateInterval),
		taskCh:   make(chan CrawlTask, 100),
		resultCh: make(chan CrawlResult, 100),
	}
}

// CrawlWithRetryはリトライ付きでクロールを実行します(指数バックオフ)
func (c *Crawler) CrawlWithRetry(ctx context.Context, task CrawlTask) CrawlResult {
	var lastErr error

	for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
		// コンテキストがキャンセルされたかチェック
		select {
		case <-ctx.Done():
			return CrawlResult{
				URL:   task.URL,
				Depth: task.Depth,
				Err:   ctx.Err(),
			}
		default:
		}

		// 最初の試行では待機不要
		if attempt > 0 {
			// 指数バックオフ:1秒、2秒、4秒 ...
			backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
			fmt.Printf("  ↻ リトライ %d/%d:%s(%v 待機中)\n",
				attempt, c.config.MaxRetries, task.URL, backoff)

			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return CrawlResult{
					URL:   task.URL,
					Depth: task.Depth,
					Err:   ctx.Err(),
				}
			}
		}

		start := time.Now()
		title, bodyLen, err := c.fetcher.Fetch(ctx, task.URL)
		latency := time.Since(start)

		if err == nil {
			// 成功
			return CrawlResult{
				URL:        task.URL,
				Title:      title,
				BodyLength: bodyLen,
				Depth:      task.Depth,
				Latency:    latency,
			}
		}

		lastErr = err
	}

	return CrawlResult{
		URL:   task.URL,
		Depth: task.Depth,
		Err:   fmt.Errorf("%d回のリトライ後に失敗:%w", c.config.MaxRetries, lastErr),
	}
}

// Workerはワーカーgoroutineです
func (c *Crawler) Worker(ctx context.Context, id int) {
	defer c.wg.Done()

	fmt.Printf("[ワーカー %d] 開始\n", id)

	for task := range c.taskCh {
		// コンテキストをチェック
		select {
		case <-ctx.Done():
			fmt.Printf("[ワーカー %d] 終了シグナルを受信、停止します\n", id)
			return
		default:
		}

		// レート制限:許可を取得
		c.limiter.Acquire()

		fmt.Printf("[ワーカー %d] クロール中:%s(深度 %d)\n", id, task.URL, task.Depth)

		// クロールを実行(リトライ付き)
		result := c.CrawlWithRetry(ctx, task)

		// 許可を解放
		c.limiter.Release()

		// 結果を送信
		select {
		case c.resultCh <- result:
		case <-ctx.Done():
			return
		}
	}

	fmt.Printf("[ワーカー %d] 終了\n", id)
}

// Startはクローラーを開始します
func (c *Crawler) Start(ctx context.Context, seedURLs []string) {
	// ワーカープールを起動
	for i := 0; i < c.config.MaxConcurrency; i++ {
		c.wg.Add(1)
		go c.Worker(ctx, i)
	}

	// シードURLを提出
	go func() {
		for _, rawURL := range seedURLs {
			if c.dedup.Mark(rawURL) {
				c.taskCh <- CrawlTask{URL: rawURL, Depth: 0}
			}
		}
	}()

	// 結果処理とリンク発見のgoroutineを起動
	go c.processResults(ctx)
}

// processResultsはクロール結果を処理し、新しいリンクを発見します
func (c *Crawler) processResults(ctx context.Context) {
	for result := range c.resultCh {
		if result.Err != nil {
			fmt.Printf("✗ 失敗:%s — %v\n", result.URL, result.Err)
			continue
		}

		fmt.Printf("✓ 成功:%s\n", result.URL)
		fmt.Printf("  タイトル:%s\n", result.Title)
		fmt.Printf("  サイズ:%dバイト | 時間:%v\n", result.BodyLength, result.Latency)

		// 最大深度に達していない場合、リンクの発見を続けることができます
		// (ここでは簡略化;本番環境では、リンク抽出のためにページを再クロールします)
		if result.Depth < c.config.MaxDepth {
			// デモ:新しいリンクをタスクキューに投稿
			// 本番環境では、ここで結果のHTMLからリンクを抽出します
			fmt.Printf("  深度 %d/%d、子リンクの発見を続けることができます\n", result.Depth, c.config.MaxDepth)
		}
	}
}

// Waitはすべてのタスクの完了を待機します
func (c *Crawler) Wait() {
	// タスクチャネルを閉じ、ワーカーにこれ以上タスクがないことを通知
	close(c.taskCh)
	// すべてのワーカーが終了するまで待機
	c.wg.Wait()
	// 結果チャネルを閉じる
	close(c.resultCh)
}

// Statsはクローラーの統計を返します
func (c *Crawler) Stats() (visited int) {
	return c.dedup.Count()
}

// ============================================================
// メインプログラム
// ============================================================

func main() {
	fmt.Println("========================================")
	fmt.Println("  Go並列Webクローラー v1.0")
	fmt.Println("========================================")
	fmt.Println()

	// 設定を読み込む
	config := DefaultConfig()
	// コマンドライン引数や設定ファイルで上書き可能
	// config.MaxConcurrency = 5

	// キャンセル可能なコンテキストを作成
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// システム割り込みシグナル(Ctrl+C)をキャプチャしてグレースフルシャットダウン
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		sig := <-sigCh
		fmt.Printf("\n⚠ シグナルを受信:%v、グレースフルにシャットダウン中...\n", sig)
		cancel() // コンテキストをキャンセルし、すべてのワーカーに通知
	}()

	// クローラーを作成
	crawler := NewCrawler(config)

	// シードURLリスト
	seedURLs := []string{
		"https://httpbin.org/html",
		"https://httpbin.org/links/5",
		"https://httpbin.org/range/100",
		"https://httpbin.org/delay/1",
	}

	fmt.Printf("設定:同時実行数=%d、深度=%d、リトライ=%d\n",
		config.MaxConcurrency, config.MaxDepth, config.MaxRetries)
	fmt.Printf("シードURL:%d\n", len(seedURLs))
	fmt.Println("----------------------------------------")

	// クローラーを開始
	crawler.Start(ctx, seedURLs)

	// 一定時間後に自動シャットダウン(本番環境では他の条件を使用)
	go func() {
		time.Sleep(30 * time.Second)
		fmt.Println("\n⏱ タイムアウト、シャットダウンをトリガー...")
		cancel()
	}()

	// すべてのタスクの完了を待機
	crawler.Wait()

	// 統計を出力
	fmt.Println("----------------------------------------")
	fmt.Printf("クロール完了!%d個のURLを訪問しました\n", crawler.Stats())
}
▶ 試してみよう

コード分析

1. レート制限:セマフォチャネル

GO
// バッファ付きチャネルをセマフォとして使用
semaphore: make(chan struct{}, maxConcurrency)

// 許可を取得:チャネルが満杯の場合はブロック
rl.semaphore <- struct{}{}

// 許可を解放
<-rl.semaphore

バッファサイズNのチャネルは、同時に最大N個のgoroutineが許可を保持することを可能にします。N+1番目のgoroutineはブロックされます。これはtime.Tickerよりも正確に並行数を制御します。

2. 重複排除:ミューテックス保護マップ

GO
func (d *Deduplicator) Mark(url string) bool {
    d.mu.Lock()
    defer d.mu.Unlock()
    if d.visited[url] {
        return false
    }
    d.visited[url] = true
    return true
}

複数のgoroutineが同時に同じURLを発見する可能性があります。Markメソッドはミューテックスを使用してアトミック性を確保し、1つのgoroutineのみがそのURLを「要求」できます。

3. エラーリトライ:指数バックオフ

GO
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
リトライ回数 待機時間
1回目 1秒
2回目 2秒
3回目 4秒

指数バックオフはサーバー障害時の「雪崩式」リトライを防ぎ、サーバーに回復時間を与えます。

4. グレースフルシャットダウン:Context + シグナル

GO
ctx, cancel := context.WithCancel(context.Background())

// システムシグナルをリッスン
go func() {
    <-sigCh
    cancel() // コンテキストをキャンセル
}()

// ワーカーがコンテキストをチェック
select {
case <-ctx.Done():
    return // 終了
default:
    // 作業を続行
}

cancel()ctx.Done()をリッスンしているすべてのgoroutineに通知し、協調的なシャットダウンを実現します。

5. 全体の協調フロー

main()
  │
  ├─ コンテキストを作成(キャンセル可能)
  ├─ N個のワーカーgoroutineを起動
  ├─ シードURLをtaskChに提出
  ├─ processResults goroutineを起動
  │
  ├─ 待機...
  │   ├─ ワーカーがtaskChからタスクを取得
  │   ├─ ワーカーがレート制限 → クロール → リトライ
  │   ├─ ワーカーが結果をresultChに送信
  │   └─ processResultsが結果を処理し、新しいリンクを発見
  │
  └─ cancel()トリガー → すべてのワーカーが終了 → プログラム終了

❓ よくある質問

Q1:並行数制御にsync.Mutexではなくチャネルを使用するのはなぜですか?

チャネルは相互排除だけでなく通信も提供します。セマフォチャネルは「並行数制限」と「タスク渡し」を自然に組み合わせます——チャネルが満杯の時、新しいgoroutineは自動的に待機し、goroutineが解放すると待機中のものが自動的に覚醒されます。これはMutex + WaitGroupを別々に使用するよりも簡潔です。さらに、チャネルはselectをサポートし、context.Done()と組み合わせたキャンセル可能なブロッキングを容易にします。

Q2:なぜタスク提出時に重複排除するのですか?クロール前ではなく?

実際には、両方が必要です。タスク提出時に重複排除すると、重複タスクがチャネルにプッシュされるのを防ぎ、メモリと処理時間を節約できます。ただし、複数のgoroutineがほぼ同時に同じ新しいリンクを発見した場合、提出時のみの重複排除では「レース」が発生する可能性があるため、Markメソッドはスレッドセーフである必要があります。大規模なクローラーでは、大量のURL重複排除を効率的に処理するためにブルームフィルターが一般的に使用されます。

Q3:なぜプログラムを強制終了するだけではだめなのですか?「グレースフルシャットダウン」が必要な理由は?

強制終了は以下を引き起こす可能性があります:書き込み中のファイルの破損、解放されていないデータベース接続、対象サーバーに到達した未完了のリクエスト。グレースフルシャットダウンでは、各ワーカーが現在のタスクを完了してから終了し、データの一貫性を保証します。context.WithCancelはGoのグレースフルシャットダウン実装の標準パターンです。

Q4:このクローラーはJavaScriptでレンダリングされたページを処理できますか?

いいえ。net/httpは生のHTMLのみを取得し、JavaScriptを実行しません。SPA(シングルページアプリケーション)をクロールする必要がある場合は、ChromedpRodのようなヘッドレスブラウザを統合する必要があります。このチュートリアルのアーキテクチャは拡張可能です——Fetcherの実装を置き換えるだけです。


📖 まとめ

このセクションでは、完全なWebクローラープロジェクトを通じて、複数のGo並行処理コアコンセプトを総合的に活用しました:

コンセプト 応用 主要コード
goroutine 複数のワーカーが並列に作業 go c.Worker(ctx, i)
チャネル タスクキュー、結果渡し、セマフォ taskChresultChsemaphore
sync.Mutex 重複排除マップの並行処理安全性を保護 d.mu.Lock()
context キャンセル伝播、グレースフルシャットダウン ctx.WithCancel
select マルチプレキシング、タイムアウト制御 select { case <-ctx.Done() ... }
sync.WaitGroup すべてのワーカーの完了を待機 c.wg.Wait()

これらのコンポーネントが連携して、堅牢な並行処理システムを形成します。この「チャネル + context + WaitGroup」の組み合わせパターンをマスターすることが、本番環境グレードのGo並行処理プログラムを書くための基盤です。


📝 演習

演習1:深層リンク抽出の追加

現在、processResultsは深度情報を出力するだけで、実際には子リンクを抽出していません。クロール成功後にHTMLからリンクを抽出し、新しいURLをtaskChに投稿するようにコードを修正してください。

ヒントFetchメソッドを修正してHTMLコンテンツも返すようにし、processResultsExtractLinksを呼び出す必要があります。

演習2:結果の永続化の実装

クロール結果をJSON形式でファイルに書き込むSaver構造体を追加してください。要件:

演習3:ドメインレベルのレート制限の実装

現在のレート制限はグローバルです。ドメインごとのグループ化されたレートリミッターを実装してください:


次のレッスン

この実践演習を完了した後、レッスン19:文字列処理に進んで、Goの文字列内部構造、一般的な操作、パフォーマンス最適化テクニックをマスターしてください。

Web-Tutorial.com

Web-Tutorial 技術チーム

複数の開発者によって共同維持されているプログラミングチュートリアルプラットフォーム。各チュートリアルは専門分野の開発者が執筆・レビューしています。正確で信頼性の高いコンテンツを目指しています — 問題を見つけた場合はお知らせください。

100%