Concurrency Practice: Web Crawler

Analogy

Imagine you manage a travel agency and need to send several teams to different cities at the same time to gather travel information. Each team works independently and reports back when it is done.

This is exactly how a concurrent web crawler works. Let's implement it in Go.


Project Requirements

We'll develop a concurrent web crawler with the following capabilities:

  1. Concurrent crawling: Multiple goroutines crawl web pages simultaneously
  2. Rate limiting: Control maximum concurrency to avoid overloading the target server
  3. Deduplication: The same URL is not crawled twice
  4. Error retry: Automatic retry on crawl failure with exponential backoff
  5. Graceful shutdown: After receiving an interrupt signal, wait for running tasks to complete before exiting

System Design

┌─────────────┐
│  Seed URLs   │  Seed URL queue
└──────┬──────┘
       ▼
┌─────────────┐
│  URL Queue   │  URL-to-crawl channel
│  (channel)   │
└──────┬──────┘
       ▼
┌──────────────────────────────────────┐
│          Worker Pool                 │
│  ┌─────────┐ ┌─────────┐ ┌────────┐ │
│  │Worker 1 │ │Worker 2 │ │Worker N│ │  Limited number of workers
│  └────┬────┘ └────┬────┘ └───┬────┘ │
└───────┼───────────┼──────────┼───────┘
        ▼           ▼          ▼
┌─────────────────────────────────────┐
│         Results Channel             │  Crawl result aggregation
│    ┌──────────┐  ┌──────────┐       │
│    │Dedup Map │  │ Retry    │       │  Deduplication + Retry
│    └──────────┘  └──────────┘       │
└─────────────────────────────────────┘

Example 1: Complete Code

GO
package main

import (
	"context"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"regexp"
	"strings"
	"sync"
	"syscall"
	"time"
)

// ============================================================
// Data Structure Definitions
// ============================================================

// CrawlTask represents a crawl task
type CrawlTask struct {
	URL   string // Target URL
	Depth int    // Current depth
}

// CrawlResult represents the result of a crawl
type CrawlResult struct {
	URL        string        // Crawled URL
	Title      string        // Page title (simplified extraction)
	BodyLength int           // Page content length
	Depth      int           // Crawl depth
	Err        error         // Error info (if any)
	Latency    time.Duration // Crawl duration
}

// CrawlerConfig is the crawler configuration
type CrawlerConfig struct {
	MaxConcurrency int           // Maximum concurrency
	MaxDepth       int           // Maximum crawl depth
	MaxRetries     int           // Maximum retry count
	RequestTimeout time.Duration // Per-request timeout
	RateInterval   time.Duration // Rate limiting interval
}

// DefaultConfig returns the default configuration
func DefaultConfig() CrawlerConfig {
	return CrawlerConfig{
		MaxConcurrency: 3,
		MaxDepth:       2,
		MaxRetries:     3,
		RequestTimeout: 10 * time.Second,
		RateInterval:   500 * time.Millisecond,
	}
}

// ============================================================
// Deduplicator (thread-safe visited URL set)
// ============================================================

// Deduplicator handles URL deduplication
type Deduplicator struct {
	visited map[string]bool
	mu      sync.Mutex
}

// NewDeduplicator creates a deduplicator
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{
		visited: make(map[string]bool),
	}
}

// Mark marks a URL as visited; returns true if it's a new URL
func (d *Deduplicator) Mark(url string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.visited[url] {
		return false // Already visited
	}
	d.visited[url] = true
	return true // New URL
}

// Count returns the number of visited URLs
func (d *Deduplicator) Count() int {
	d.mu.Lock()
	defer d.mu.Unlock()
	return len(d.visited)
}

// ============================================================
// Rate Limiter (semaphore-based concurrency control)
// ============================================================

// RateLimiter is a channel-based semaphore rate limiter
type RateLimiter struct {
	semaphore chan struct{}
	interval  time.Duration
}

// NewRateLimiter creates a rate limiter
// maxConcurrency: maximum number of permits held at once
// interval: cooldown a holder waits before giving its permit back
func NewRateLimiter(maxConcurrency int, interval time.Duration) *RateLimiter {
	return &RateLimiter{
		semaphore: make(chan struct{}, maxConcurrency),
		interval:  interval,
	}
}

// Acquire acquires an execution permit
func (rl *RateLimiter) Acquire() {
	rl.semaphore <- struct{}{} // Send to buffered channel; blocks when full
}

// Release releases an execution permit after a cooldown
func (rl *RateLimiter) Release() {
	time.Sleep(rl.interval) // Cooldown: keep the permit for interval before freeing it
	<-rl.semaphore          // Receive from buffered channel, freeing space
}

// ============================================================
// Web Fetcher
// ============================================================

// Fetcher handles actual HTTP requests
type Fetcher struct {
	client *http.Client
}

// NewFetcher creates a fetcher
func NewFetcher(timeout time.Duration) *Fetcher {
	return &Fetcher{
		client: &http.Client{
			Timeout: timeout,
		},
	}
}

// Fetch fetches the specified URL and returns the title and content length
func (f *Fetcher) Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error) {
	// Create a request with context for cancellation support
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
	if err != nil {
		return "", 0, fmt.Errorf("failed to create request: %w", err)
	}

	// Set a User-Agent that identifies the crawler
	req.Header.Set("User-Agent", "GoCrawler/1.0")

	resp, err := f.client.Do(req)
	if err != nil {
		return "", 0, fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	// Check HTTP status code
	if resp.StatusCode != http.StatusOK {
		return "", 0, fmt.Errorf("HTTP status code: %d", resp.StatusCode)
	}

	// Read response body (limit size to prevent memory overflow)
	const maxSize = 1 << 20 // 1MB
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
	if err != nil {
		return "", 0, fmt.Errorf("failed to read response: %w", err)
	}

	// Simple extraction of <title> tag content
	title = extractTitle(string(body))

	return title, len(body), nil
}

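// titleRe matches the contents of the first <title> element. It is compiled
// once at package level; the s flag lets a title span multiple lines.
var titleRe = regexp.MustCompile(`(?is)<title[^>]*>(.*?)</title>`)

// extractTitle extracts the title from HTML
func extractTitle(html string) string {
	matches := titleRe.FindStringSubmatch(html)
	if len(matches) > 1 {
		return strings.TrimSpace(matches[1])
	}
	return "(no title)"
}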

// ============================================================
// Link Extractor
// ============================================================

// linkRe matches href attribute values; compiled once at package level
var linkRe = regexp.MustCompile(`(?i)href=["']([^"']+)["']`)

// ExtractLinks extracts all links from HTML (simplified version)
func ExtractLinks(body string, baseURL string) []string {
	matches := linkRe.FindAllStringSubmatch(body, -1)

	var links []string
	base, err := url.Parse(baseURL)
	if err != nil {
		return links
	}

	for _, match := range matches {
		if len(match) < 2 {
			continue
		}
		href := match[1]

		// Parse relative URL
		parsed, err := url.Parse(href)
		if err != nil {
			continue
		}

		// Convert to absolute URL
		absolute := base.ResolveReference(parsed)

		// Only keep HTTP/HTTPS links
		if absolute.Scheme == "http" || absolute.Scheme == "https" {
			// Remove fragment (#anchor)
			absolute.Fragment = ""
			links = append(links, absolute.String())
		}
	}

	return links
}

// ============================================================
// Crawler Engine
// ============================================================

// Crawler is the main crawler engine
type Crawler struct {
	config   CrawlerConfig
	fetcher  *Fetcher
	dedup    *Deduplicator
	limiter  *RateLimiter
	taskCh   chan CrawlTask   // Task queue
	resultCh chan CrawlResult // Result channel
	wg       sync.WaitGroup   // Tracks worker goroutines
	done     chan struct{}    // Closed once processResults has drained resultCh
}

// NewCrawler creates a crawler
func NewCrawler(config CrawlerConfig) *Crawler {
	return &Crawler{
		config:   config,
		fetcher:  NewFetcher(config.RequestTimeout),
		dedup:    NewDeduplicator(),
		limiter:  NewRateLimiter(config.MaxConcurrency, config.RateInterval),
		taskCh:   make(chan CrawlTask, 100),
		resultCh: make(chan CrawlResult, 100),
		done:     make(chan struct{}),
	}
}

// CrawlWithRetry performs a crawl with retry (exponential backoff)
func (c *Crawler) CrawlWithRetry(ctx context.Context, task CrawlTask) CrawlResult {
	var lastErr error

	for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
		// Check if context is cancelled
		select {
		case <-ctx.Done():
			return CrawlResult{
				URL:   task.URL,
				Depth: task.Depth,
				Err:   ctx.Err(),
			}
		default:
		}

		// No wait needed for the first attempt
		if attempt > 0 {
			// Exponential backoff: 1s, 2s, 4s ...
			backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
			fmt.Printf("  ↻ Retry %d/%d: %s (waiting %v)\n",
				attempt, c.config.MaxRetries, task.URL, backoff)

			// Sleep for the backoff, but wake up early if cancelled
			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return CrawlResult{
					URL:   task.URL,
					Depth: task.Depth,
					Err:   ctx.Err(),
				}
			}
		}

		start := time.Now()
		title, bodyLen, err := c.fetcher.Fetch(ctx, task.URL)
		latency := time.Since(start)

		if err == nil {
			// Success
			return CrawlResult{
				URL:        task.URL,
				Title:      title,
				BodyLength: bodyLen,
				Depth:      task.Depth,
				Latency:    latency,
			}
		}

		lastErr = err
	}

	return CrawlResult{
		URL:   task.URL,
		Depth: task.Depth,
		Err:   fmt.Errorf("failed after %d retries: %w", c.config.MaxRetries, lastErr),
	}
}

// Worker is the worker goroutine
func (c *Crawler) Worker(ctx context.Context, id int) {
	defer c.wg.Done()

	fmt.Printf("[Worker %d] Started\n", id)

	for task := range c.taskCh {
		// Check context
		select {
		case <-ctx.Done():
			fmt.Printf("[Worker %d] Received exit signal, stopping\n", id)
			return
		default:
		}

		// Rate limiting: acquire permit
		c.limiter.Acquire()

		fmt.Printf("[Worker %d] Crawling: %s (depth %d)\n", id, task.URL, task.Depth)

		// Execute crawl (with retry)
		result := c.CrawlWithRetry(ctx, task)

		// Release permit
		c.limiter.Release()

		// Send result
		select {
		case c.resultCh <- result:
		case <-ctx.Done():
			return
		}
	}

	fmt.Printf("[Worker %d] Exiting\n", id)
}

// Start starts the crawler
func (c *Crawler) Start(ctx context.Context, seedURLs []string) {
	// Start worker pool
	for i := 0; i < c.config.MaxConcurrency; i++ {
		c.wg.Add(1)
		go c.Worker(ctx, i)
	}

	// Start result processing and link discovery goroutine
	go c.processResults(ctx)

	// Submit seed URLs synchronously, so that Wait cannot close taskCh
	// while a send is still pending (sending on a closed channel panics)
	for _, rawURL := range seedURLs {
		if !c.dedup.Mark(rawURL) {
			continue
		}
		select {
		case c.taskCh <- CrawlTask{URL: rawURL, Depth: 0}:
		case <-ctx.Done():
			return
		}
	}
}

// processResults processes crawl results and discovers new links
func (c *Crawler) processResults(ctx context.Context) {
	defer close(c.done) // Tell Wait that every result has been handled

	for result := range c.resultCh {
		if result.Err != nil {
			fmt.Printf("✗ Failed: %s: %v\n", result.URL, result.Err)
			continue
		}

		fmt.Printf("✓ Success: %s\n", result.URL)
		fmt.Printf("  Title: %s\n", result.Title)
		fmt.Printf("  Size: %d bytes | Time: %v\n", result.BodyLength, result.Latency)

		// If max depth not reached, child links could be followed.
		// (Simplified here: CrawlResult does not carry the HTML, so no links
		// are extracted or queued; see the exercises.)
		if result.Depth < c.config.MaxDepth {
			fmt.Printf("  Depth %d/%d, can continue discovering child links\n", result.Depth, c.config.MaxDepth)
		}
	}
}

// Wait waits for all tasks to complete; call it after Start has returned
func (c *Crawler) Wait() {
	// Close task channel, notify workers no more tasks
	close(c.taskCh)
	// Wait for all workers to exit
	c.wg.Wait()
	// Close result channel
	close(c.resultCh)
	// Wait until processResults has printed the remaining results
	<-c.done
}

// Stats returns crawler statistics
func (c *Crawler) Stats() (visited int) {
	return c.dedup.Count()
}

// ============================================================
// Main Program
// ============================================================

func main() {
	fmt.Println("========================================")
	fmt.Println("  Go Concurrent Web Crawler v1.0")
	fmt.Println("========================================")
	fmt.Println()

	// Load configuration
	config := DefaultConfig()
	// Can be overridden via command-line arguments or config file
	// config.MaxConcurrency = 5

	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Capture system interrupt signal (Ctrl+C) for graceful shutdown
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		sig := <-sigCh
		fmt.Printf("\n⚠ Received signal: %v, shutting down gracefully...\n", sig)
		cancel() // Cancel context, notify all workers
	}()

	// Create crawler
	crawler := NewCrawler(config)

	// Seed URL list
	seedURLs := []string{
		"https://httpbin.org/html",
		"https://httpbin.org/links/5",
		"https://httpbin.org/range/100",
		"https://httpbin.org/delay/1",
	}

	fmt.Printf("Config: concurrency=%d, depth=%d, retries=%d\n",
		config.MaxConcurrency, config.MaxDepth, config.MaxRetries)
	fmt.Printf("Seed URLs: %d\n", len(seedURLs))
	fmt.Println("----------------------------------------")

	// Start crawler
	crawler.Start(ctx, seedURLs)

	// Auto-shutdown after a while (in production, use other conditions)
	go func() {
		time.Sleep(30 * time.Second)
		fmt.Println("\n⏱ Timeout, triggering shutdown...")
		cancel()
	}()

	// Wait for all tasks to complete
	crawler.Wait()

	// Output statistics
	fmt.Println("----------------------------------------")
	fmt.Printf("Crawling complete! Visited %d URLs\n", crawler.Stats())
}

Code Analysis

1. Rate Limiting: Semaphore Channel

GO
// Buffered channel as semaphore
semaphore: make(chan struct{}, maxConcurrency)

// Acquire permit: blocks when channel is full
rl.semaphore <- struct{}{}

// Release permit
<-rl.semaphore

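A channel with buffer size N allows at most N goroutines to hold permits at once; the (N+1)th goroutine blocks until a permit is released. This caps the number of requests in flight, which a time.Ticker cannot do: a ticker only spaces out when requests start, not how many run concurrently.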
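The cap can be checked directly. The following is a minimal sketch, separate from the crawler: maxInFlight is a hypothetical helper that pushes a batch of goroutines through a semaphore and records the highest number that were inside the guarded section at the same time.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxInFlight (hypothetical helper) pushes `tasks` goroutines through a
// semaphore of size `limit` and reports the largest number that were inside
// the guarded section at once.
func maxInFlight(limit, tasks int) int32 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var current, peak int32

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			sem <- struct{}{}        // acquire: blocks while `limit` permits are held
			defer func() { <-sem }() // release on exit

			n := atomic.AddInt32(&current, 1)
			for { // raise peak to n if n is larger
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulated fetch
			atomic.AddInt32(&current, -1)
		}()
	}

	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	// 20 tasks, but never more than 3 inside the guarded section.
	fmt.Println("peak concurrency:", maxInFlight(3, 20))
}
```

The reported peak never exceeds the buffer size, no matter how many goroutines are started.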

2. Deduplication: Mutex-protected Map

GO
func (d *Deduplicator) Mark(url string) bool {
    d.mu.Lock()
    defer d.mu.Unlock()
    if d.visited[url] {
        return false
    }
    d.visited[url] = true
    return true
}

Multiple goroutines may discover the same URL simultaneously; the Mark method uses a mutex to ensure atomicity — only one goroutine can "claim" that URL.
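To see the "claim" behaviour in isolation, here is a small sketch that reuses the Deduplicator from the tutorial and lets many goroutines race to mark the same URL. claimCount is a hypothetical helper introduced only for this demonstration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Deduplicator is the mutex-protected visited set from the tutorial.
type Deduplicator struct {
	mu      sync.Mutex
	visited map[string]bool
}

func NewDeduplicator() *Deduplicator {
	return &Deduplicator{visited: make(map[string]bool)}
}

// Mark returns true only for the first caller that marks a given URL.
func (d *Deduplicator) Mark(rawURL string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.visited[rawURL] {
		return false
	}
	d.visited[rawURL] = true
	return true
}

// claimCount (hypothetical helper) lets `goroutines` goroutines race to mark
// the same URL and counts how many of them got `true` back.
func claimCount(rawURL string, goroutines int) int {
	d := NewDeduplicator()
	var wg sync.WaitGroup
	var wins int32

	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if d.Mark(rawURL) {
				atomic.AddInt32(&wins, 1)
			}
		}()
	}

	wg.Wait()
	return int(atomic.LoadInt32(&wins))
}

func main() {
	fmt.Println("winners:", claimCount("https://example.com/", 50)) // prints "winners: 1"
}
```

Because the check and the write happen under one lock, exactly one of the 50 goroutines wins.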

3. Error Retry: Exponential Backoff

GO
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second

Retry #   Wait Time
1st       1 second
2nd       2 seconds
3rd       4 seconds

Exponential backoff prevents "avalanche-style" retries during server failures, giving the server time to recover.
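The same schedule can be computed with plain integer doubling instead of math.Pow. The sketch below is one possible variant, not the tutorial's implementation: backoffFor, baseDelay, and maxDelay are names made up for this example, and the 30-second cap is an assumption added so that a large MaxRetries cannot produce an unbounded wait.

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay = time.Second      // wait before the 1st retry
	maxDelay  = 30 * time.Second // assumed cap; the tutorial code has none
)

// backoffFor (hypothetical helper) returns the wait before the given retry
// (1-based): baseDelay doubled attempt-1 times, never more than maxDelay.
func backoffFor(attempt int) time.Duration {
	if attempt < 1 {
		return 0 // the first try is not a retry and does not wait
	}
	d := baseDelay
	for i := 1; i < attempt; i++ {
		if d >= maxDelay/2 {
			return maxDelay // doubling again would reach or pass the cap
		}
		d *= 2
	}
	return d
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("retry %d: wait %v\n", attempt, backoffFor(attempt))
	}
}
```

For retries 1 to 3 this yields the same 1s, 2s, 4s as the table; from the 6th retry on, the wait stays at the cap.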

4. Graceful Shutdown: Context + Signal

GO
ctx, cancel := context.WithCancel(context.Background())

// Listen for system signals
go func() {
    <-sigCh
    cancel() // Cancel context
}()

// Worker checks context
select {
case <-ctx.Done():
    return // Exit
default:
    // Continue working
}

cancel() notifies all goroutines listening to ctx.Done(), achieving coordinated shutdown.
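As a self-contained illustration of that broadcast, the sketch below starts a few workers that loop until the context is cancelled. It uses signal.NotifyContext from the standard library (Go 1.16 and later) as a shorter alternative to wiring a signal channel by hand. runWorkers and stopAfter are hypothetical helpers, and the 200 ms timeout exists only so the demo ends on its own.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

// runWorkers (hypothetical helper) starts n workers that loop until ctx is
// cancelled, waits for all of them to return, and reports how many observed
// the cancellation.
func runWorkers(ctx context.Context, n int) int {
	var wg sync.WaitGroup
	var stopped int32

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ticker := time.NewTicker(5 * time.Millisecond)
			defer ticker.Stop()
			for {
				select {
				case <-ctx.Done():
					atomic.AddInt32(&stopped, 1)
					return
				case <-ticker.C:
					// one simulated unit of work
				}
			}
		}()
	}

	wg.Wait()
	return int(atomic.LoadInt32(&stopped))
}

// stopAfter (hypothetical helper) runs the workers under a context that is
// cancelled automatically after the given number of milliseconds.
func stopAfter(workers, millis int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(millis)*time.Millisecond)
	defer cancel()
	return runWorkers(ctx, workers)
}

func main() {
	// Cancelled by Ctrl+C / SIGTERM or by the 200 ms timeout, whichever comes first.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()

	fmt.Println("workers stopped:", runWorkers(ctx, 3))
}
```

A single cancellation reaches every worker, because they all select on the same ctx.Done() channel.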

5. Overall Collaboration Flow

main()
  │
  ├─ Create context (cancellable)
  ├─ Start N Worker goroutines
  ├─ Submit seed URLs to taskCh
  ├─ Start processResults goroutine
  │
  ├─ Wait...
  │   ├─ Worker takes tasks from taskCh
  │   ├─ Worker rate limits → crawl → retry
  │   ├─ Worker sends results to resultCh
  │   └─ processResults processes results, discovers new links
  │
  └─ taskCh closed and drained (or cancel() triggered) → All workers exit → resultCh closed → Program ends

❓ FAQ

Q1: Why use a channel instead of sync.Mutex for concurrency control?

A sync.Mutex admits exactly one holder at a time, whereas a buffered channel of capacity N works as a counting semaphore that admits up to N. When the channel is full, further goroutines block on send; when a holder receives from it, one blocked sender proceeds. Channels also work with select, so a blocking acquire can be combined with ctx.Done() to make it cancellable.
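The select point is worth a concrete look. The crawler's Acquire blocks unconditionally; the sketch below shows a cancellable variant under the assumption that the caller has a context at hand. acquire and tryAcquireTwice are hypothetical names, not part of the tutorial code.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// acquire (hypothetical helper) takes a permit from sem, or gives up with
// ctx.Err() once ctx is cancelled.
func acquire(ctx context.Context, sem chan struct{}) error {
	select {
	case sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// tryAcquireTwice (hypothetical helper) acquires the only permit of a
// size-1 semaphore, then tries again with a 20 ms deadline while the permit
// is still held.
func tryAcquireTwice() (first, second error) {
	sem := make(chan struct{}, 1)
	first = acquire(context.Background(), sem)

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	second = acquire(ctx, sem) // the permit is taken, so this waits for the deadline
	return first, second
}

func main() {
	first, second := tryAcquireTwice()
	fmt.Println("first:", first)   // first: <nil>
	fmt.Println("second:", second) // second: context deadline exceeded
}
```

With a mutex, the second caller would block until the lock is released; there is no way to abandon the wait.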

Q2: Why deduplicate at task submission time instead of before crawling?

Actually, both matter. Deduplicating at submission keeps duplicate tasks out of the channel, saving memory and processing time. Because several goroutines may discover the same new link at almost the same moment, the check-and-set inside Mark must be atomic, which is why it holds a mutex. Large-scale crawlers typically use a Bloom filter to deduplicate massive URL sets in bounded memory, at the cost of a small false-positive rate.
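For readers who have not met a Bloom filter, here is a deliberately small sketch of the idea. It is an illustration under stated assumptions, not a production design: the Bloom type, its sizes, and the double-hashing scheme built on hash/fnv are choices made for this example, and the filter is not safe for concurrent use without a lock.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Bloom is a minimal Bloom filter: m bits, k probe positions per item.
// It can report false positives but never false negatives.
type Bloom struct {
	bits []uint64
	m    uint64 // number of bits (must be > 0)
	k    uint64 // number of probes per item (must be > 0)
}

func NewBloom(m, k uint64) *Bloom {
	return &Bloom{bits: make([]uint64, (m+63)/64), m: m, k: k}
}

// positions derives k bit positions from two FNV hashes (double hashing).
func (b *Bloom) positions(s string) []uint64 {
	h1 := fnv.New64a()
	h1.Write([]byte(s))
	h2 := fnv.New64()
	h2.Write([]byte(s))
	a, step := h1.Sum64(), h2.Sum64()|1 // odd step so it is never zero

	out := make([]uint64, b.k)
	for i := uint64(0); i < b.k; i++ {
		out[i] = (a + i*step) % b.m
	}
	return out
}

// Add sets the k bits that belong to s.
func (b *Bloom) Add(s string) {
	for _, p := range b.positions(s) {
		b.bits[p/64] |= uint64(1) << (p % 64)
	}
}

// MayContain reports false if s was definitely never added,
// and true if it was added or collides with earlier items.
func (b *Bloom) MayContain(s string) bool {
	for _, p := range b.positions(s) {
		if b.bits[p/64]&(uint64(1)<<(p%64)) == 0 {
			return false
		}
	}
	return true
}

func main() {
	seen := NewBloom(1<<16, 4)
	u := "https://example.com/page"

	fmt.Println("before Add:", seen.MayContain(u)) // before Add: false
	seen.Add(u)
	fmt.Println("after Add: ", seen.MayContain(u)) // after Add:  true
}
```

The memory footprint is fixed at m bits however many URLs are added; the trade-off is that a crawler will occasionally skip a URL it has not actually visited.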

Q3: Why not just kill the program? Why "graceful shutdown"?

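Force-killing can leave partially written files, unreleased database connections, and half-finished requests on the target server. With graceful shutdown, cancel() tells every worker to stop, and Wait blocks until they have all returned before the program exits. Note that in this code the same context is also passed to Fetch, so an in-flight HTTP request is aborted on cancellation rather than run to completion; pass the workers a separate context if current tasks must be allowed to finish. context.WithCancel is the standard Go building block for this pattern.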

Q4: Can this crawler handle JavaScript-rendered pages?

No. net/http only fetches raw HTML without executing JavaScript. If you need to crawl SPAs (Single Page Applications), you'll need to integrate a headless browser like Chromedp or Rod. The architecture in this tutorial is extensible — just replace the Fetcher implementation.
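One way to make the Fetcher replaceable is to put an interface in front of it. The sketch below is an assumption about how that could look, not code from the tutorial: PageFetcher, staticFetcher, and describe are hypothetical names. The interface mirrors the signature of the existing Fetch method, so the tutorial's *Fetcher would satisfy it unchanged, and a headless-browser fetcher would be another implementation of the same interface.

```go
package main

import (
	"context"
	"fmt"
)

// PageFetcher is a hypothetical interface with the same method signature as
// the tutorial's (*Fetcher).Fetch.
type PageFetcher interface {
	Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error)
}

// staticFetcher is a stand-in implementation that returns canned data.
// A headless-browser fetcher would implement the same method.
type staticFetcher struct {
	title string
	body  string
}

func (s staticFetcher) Fetch(ctx context.Context, rawURL string) (string, int, error) {
	if err := ctx.Err(); err != nil {
		return "", 0, err // honour cancellation like the real fetcher
	}
	return s.title, len(s.body), nil
}

// describe (hypothetical helper) depends only on the interface, so it works
// with any fetcher.
func describe(f PageFetcher, rawURL string) string {
	title, n, err := f.Fetch(context.Background(), rawURL)
	if err != nil {
		return fmt.Sprintf("%s: error: %v", rawURL, err)
	}
	return fmt.Sprintf("%s: %q (%d bytes)", rawURL, title, n)
}

func main() {
	f := staticFetcher{title: "Hello", body: "<html></html>"}
	fmt.Println(describe(f, "https://example.com/"))
	// prints: https://example.com/: "Hello" (13 bytes)
}
```

In the crawler, this would mean changing the fetcher field of Crawler from *Fetcher to the interface type; a canned implementation like staticFetcher is also handy for tests that should not touch the network.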


📖 Summary

This section comprehensively applied multiple Go concurrency core concepts through a complete web crawler project:

Concept          Application                                      Key Code
goroutine        Multiple workers running concurrently            go c.Worker(ctx, i)
channel          Task queue, result passing, semaphore            taskCh, resultCh, semaphore
sync.Mutex       Protect the deduplication map from data races    d.mu.Lock()
context          Cancellation propagation, graceful shutdown      context.WithCancel
select           Multiplexing, timeout control                    select { case <-ctx.Done() ... }
sync.WaitGroup   Wait for all workers to complete                 c.wg.Wait()

These components work together to form a robust concurrent system. Mastering this "channel + context + WaitGroup" combination pattern is the foundation for writing production-grade Go concurrent programs.


📝 Exercises

Exercise 1: Implement Link Extraction

Currently, processResults only prints depth information without actually extracting child links. Modify the code to extract links from HTML after a successful crawl and post new URLs to taskCh.

Hint: You'll need to modify the Fetch method to also return the HTML content, then call ExtractLinks in processResults.

Exercise 2: Implement Result Persistence

Add a Saver struct that writes crawl results to a file in JSON format. Requirements:

Exercise 3: Implement Domain-level Rate Limiting

The current rate limiting is global. Implement a per-domain grouped rate limiter that:


Next Lesson

After completing this practical exercise, continue with Lesson 19: String Processing to master Go's string internals, common operations, and performance optimization techniques.

Web-Tutorial.com

Web-Tutorial Tech Team

A team of developers maintaining programming tutorials. Each tutorial is written and reviewed by developers with expertise in that field. We work to keep our content accurate and reliable — if you spot an issue, please let us know.
