Concurrency Practice: Web Crawler
Analogy
Imagine you're a travel agency manager who needs to send multiple teams to different cities simultaneously to gather travel information:
- Rate limiting: The company has limited vehicles, so at most 3 teams can be dispatched at a time to avoid resource exhaustion
- Deduplication: Teams don't revisit places they've already been to
- Retry: If a team encounters a heavy rainstorm and can't reach their destination, they rest and try again
- Graceful shutdown: When it's time to close, teams already on the road complete their current tasks before returning, and no new tasks are assigned
This is exactly how a concurrent web crawler works. Let's implement it in Go.
Project Requirements
We'll develop a concurrent web crawler with the following capabilities:
- Concurrent crawling: Multiple goroutines crawl web pages simultaneously
- Rate limiting: Control maximum concurrency to avoid overloading the target server
- Deduplication: The same URL is not crawled twice
- Error retry: Automatic retry on crawl failure with exponential backoff
- Graceful shutdown: After receiving an interrupt signal, wait for running tasks to complete before exiting
System Design
┌─────────────┐
│  Seed URLs  │   Seed URL queue
└──────┬──────┘
       ▼
┌─────────────┐
│  URL Queue  │   URL-to-crawl channel
│  (channel)  │
└──────┬──────┘
       ▼
┌──────────────────────────────────────┐
│             Worker Pool              │
│  ┌─────────┐ ┌─────────┐ ┌────────┐  │
│  │Worker 1 │ │Worker 2 │ │Worker N│  │   Limited number of workers
│  └────┬────┘ └────┬────┘ └───┬────┘  │
└───────┼───────────┼──────────┼───────┘
        ▼           ▼          ▼
┌──────────────────────────────────────┐
│           Results Channel            │   Crawl result aggregation
│    ┌──────────┐      ┌──────────┐    │
│    │Dedup Map │      │  Retry   │    │   Deduplication + Retry
│    └──────────┘      └──────────┘    │
└──────────────────────────────────────┘
Example 1: Complete Code
package main
import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"
)
// ============================================================
// Data Structure Definitions
// ============================================================
// CrawlTask represents a crawl task
type CrawlTask struct {
URL string // Target URL
Depth int // Current depth
}
// CrawlResult represents the result of a crawl
type CrawlResult struct {
URL string // Crawled URL
Title string // Page title (simplified extraction)
BodyLength int // Page content length
Depth int // Crawl depth
Err error // Error info (if any)
Latency time.Duration // Crawl duration
}
// CrawlerConfig is the crawler configuration
type CrawlerConfig struct {
MaxConcurrency int // Maximum concurrency
MaxDepth int // Maximum crawl depth
MaxRetries int // Maximum retry count
RequestTimeout time.Duration // Per-request timeout
RateInterval time.Duration // Rate limiting interval
}
// DefaultConfig returns the default configuration
func DefaultConfig() CrawlerConfig {
return CrawlerConfig{
MaxConcurrency: 3,
MaxDepth: 2,
MaxRetries: 3,
RequestTimeout: 10 * time.Second,
RateInterval: 500 * time.Millisecond,
}
}
// ============================================================
// Deduplicator (thread-safe visited URL set)
// ============================================================
// Deduplicator handles URL deduplication
type Deduplicator struct {
visited map[string]bool
mu sync.Mutex
}
// NewDeduplicator creates a deduplicator
func NewDeduplicator() *Deduplicator {
return &Deduplicator{
visited: make(map[string]bool),
}
}
// Mark marks a URL as visited; returns true if it's a new URL
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false // Already visited
}
d.visited[url] = true
return true // New URL
}
// Count returns the number of visited URLs
func (d *Deduplicator) Count() int {
d.mu.Lock()
defer d.mu.Unlock()
return len(d.visited)
}
// ============================================================
// Rate Limiter (semaphore-based concurrency control)
// ============================================================
// RateLimiter is a channel-based semaphore rate limiter
type RateLimiter struct {
semaphore chan struct{}
interval time.Duration
}
// NewRateLimiter creates a rate limiter
// maxConcurrency: maximum number of permits held at the same time
// interval: cool-down during which a permit stays held after its request finishes
func NewRateLimiter(maxConcurrency int, interval time.Duration) *RateLimiter {
return &RateLimiter{
semaphore: make(chan struct{}, maxConcurrency),
interval: interval,
}
}
// Acquire acquires an execution permit
func (rl *RateLimiter) Acquire() {
rl.semaphore <- struct{}{} // Send to buffered channel; blocks when full
}
// Release releases an execution permit
func (rl *RateLimiter) Release() {
time.Sleep(rl.interval) // Cool-down: keep holding the permit for interval before freeing it
<-rl.semaphore // Receive from buffered channel, freeing space
}
// ============================================================
// Web Fetcher
// ============================================================
// Fetcher handles actual HTTP requests
type Fetcher struct {
client *http.Client
}
// NewFetcher creates a fetcher
func NewFetcher(timeout time.Duration) *Fetcher {
return &Fetcher{
client: &http.Client{
Timeout: timeout,
},
}
}
// Fetch fetches the specified URL and returns the title and content length
func (f *Fetcher) Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error) {
// Create a request with context for cancellation support
req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
if err != nil {
return "", 0, fmt.Errorf("failed to create request: %w", err)
}
// Identify the crawler with its own User-Agent string
req.Header.Set("User-Agent", "GoCrawler/1.0")
resp, err := f.client.Do(req)
if err != nil {
return "", 0, fmt.Errorf("request failed: %w", err)
}
defer resp.Body.Close()
// Check HTTP status code
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("HTTP status code: %d", resp.StatusCode)
}
// Read response body (limit size to prevent memory overflow)
const maxSize = 1 << 20 // 1MB
body, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
if err != nil {
return "", 0, fmt.Errorf("failed to read response: %w", err)
}
// Simple extraction of <title> tag content
title = extractTitle(string(body))
return title, len(body), nil
}
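// titleRe is compiled once at package level rather than on every call.
// (?i) ignores case; (?s) lets . match newlines inside a multi-line title.
var titleRe = regexp.MustCompile(`(?is)<title[^>]*>(.*?)</title>`)
// extractTitle extracts the title from HTML (regex-based, simplified)
func extractTitle(html string) string {
matches := titleRe.FindStringSubmatch(html)
if len(matches) > 1 {
return strings.TrimSpace(matches[1])
}
return "(no title)"
}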
// ============================================================
// Link Extractor
// ============================================================
// ExtractLinks extracts all links from HTML (simplified version)
func ExtractLinks(body string, baseURL string) []string {
re := regexp.MustCompile(`(?i)href=["']([^"']+)["']`)
matches := re.FindAllStringSubmatch(body, -1)
var links []string
base, err := url.Parse(baseURL)
if err != nil {
return links
}
for _, match := range matches {
if len(match) < 2 {
continue
}
href := match[1]
// Parse relative URL
parsed, err := url.Parse(href)
if err != nil {
continue
}
// Convert to absolute URL
absolute := base.ResolveReference(parsed)
// Only keep HTTP/HTTPS links
if absolute.Scheme == "http" || absolute.Scheme == "https" {
// Remove fragment (#anchor)
absolute.Fragment = ""
links = append(links, absolute.String())
}
}
return links
}
// ============================================================
// Crawler Engine
// ============================================================
// Crawler is the main crawler engine
type Crawler struct {
config CrawlerConfig
fetcher *Fetcher
dedup *Deduplicator
limiter *RateLimiter
taskCh chan CrawlTask // Task queue
resultCh chan CrawlResult // Result channel
wg sync.WaitGroup // Wait for all workers to complete
}
// NewCrawler creates a crawler
func NewCrawler(config CrawlerConfig) *Crawler {
return &Crawler{
config: config,
fetcher: NewFetcher(config.RequestTimeout),
dedup: NewDeduplicator(),
limiter: NewRateLimiter(config.MaxConcurrency, config.RateInterval),
taskCh: make(chan CrawlTask, 100),
resultCh: make(chan CrawlResult, 100),
}
}
// CrawlWithRetry performs a crawl with retry (exponential backoff)
func (c *Crawler) CrawlWithRetry(ctx context.Context, task CrawlTask) CrawlResult {
var lastErr error
for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
// Check if context is cancelled
select {
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
default:
}
// No wait needed for the first attempt
if attempt > 0 {
// Exponential backoff: 1s, 2s, 4s ...
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
fmt.Printf(" ↻ Retry %d/%d: %s (waiting %v)\n",
attempt, c.config.MaxRetries, task.URL, backoff)
select {
case <-time.After(backoff):
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
}
}
start := time.Now()
title, bodyLen, err := c.fetcher.Fetch(ctx, task.URL)
latency := time.Since(start)
if err == nil {
// Success
return CrawlResult{
URL: task.URL,
Title: title,
BodyLength: bodyLen,
Depth: task.Depth,
Latency: latency,
}
}
lastErr = err
}
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: fmt.Errorf("failed after %d retries: %w", c.config.MaxRetries, lastErr),
}
}
// Worker is the worker goroutine
func (c *Crawler) Worker(ctx context.Context, id int) {
defer c.wg.Done()
fmt.Printf("[Worker %d] Started\n", id)
for task := range c.taskCh {
// Check context
select {
case <-ctx.Done():
fmt.Printf("[Worker %d] Received exit signal, stopping\n", id)
return
default:
}
// Rate limiting: acquire permit
c.limiter.Acquire()
fmt.Printf("[Worker %d] Crawling: %s (depth %d)\n", id, task.URL, task.Depth)
// Execute crawl (with retry)
result := c.CrawlWithRetry(ctx, task)
// Release permit
c.limiter.Release()
// Send result
select {
case c.resultCh <- result:
case <-ctx.Done():
return
}
}
fmt.Printf("[Worker %d] Exiting\n", id)
}
// Start starts the crawler
func (c *Crawler) Start(ctx context.Context, seedURLs []string) {
// Start worker pool
for i := 0; i < c.config.MaxConcurrency; i++ {
c.wg.Add(1)
go c.Worker(ctx, i)
}
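// Start result processing first so workers never stall on a full resultCh
go c.processResults(ctx)
// Submit seed URLs synchronously: Wait() closes taskCh, and a send from a
// background goroutine that raced with that close would panic
for _, rawURL := range seedURLs {
if !c.dedup.Mark(rawURL) {
continue
}
select {
case c.taskCh <- CrawlTask{URL: rawURL, Depth: 0}:
case <-ctx.Done():
return
}
}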
}
// processResults processes crawl results and discovers new links
func (c *Crawler) processResults(ctx context.Context) {
for result := range c.resultCh {
if result.Err != nil {
fmt.Printf("✗ Failed: %s — %v\n", result.URL, result.Err)
continue
}
fmt.Printf("✓ Success: %s\n", result.URL)
fmt.Printf(" Title: %s\n", result.Title)
fmt.Printf(" Size: %d bytes | Time: %v\n", result.BodyLength, result.Latency)
// Below the depth limit, child links could be followed.
// Simplified here: CrawlResult carries no HTML, so nothing is enqueued yet.
if result.Depth < c.config.MaxDepth {
// A fuller version would have Fetch return the body and call ExtractLinks on it here
fmt.Printf(" Depth %d/%d, can continue discovering child links\n", result.Depth, c.config.MaxDepth)
}
}
}
// Wait waits for all tasks to complete
func (c *Crawler) Wait() {
// Close task channel, notify workers no more tasks
close(c.taskCh)
// Wait for all workers to exit
c.wg.Wait()
// Close result channel
close(c.resultCh)
}
// Stats returns crawler statistics
func (c *Crawler) Stats() (visited int) {
return c.dedup.Count()
}
// ============================================================
// Main Program
// ============================================================
func main() {
fmt.Println("========================================")
fmt.Println(" Go Concurrent Web Crawler v1.0")
fmt.Println("========================================")
fmt.Println()
// Load configuration
config := DefaultConfig()
// Can be overridden via command-line arguments or config file
// config.MaxConcurrency = 5
// Create a cancellable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Capture system interrupt signal (Ctrl+C) for graceful shutdown
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigCh
fmt.Printf("\n⚠ Received signal: %v, shutting down gracefully...\n", sig)
cancel() // Cancel context, notify all workers
}()
// Create crawler
crawler := NewCrawler(config)
// Seed URL list
seedURLs := []string{
"https://httpbin.org/html",
"https://httpbin.org/links/5",
"https://httpbin.org/range/100",
"https://httpbin.org/delay/1",
}
fmt.Printf("Config: concurrency=%d, depth=%d, retries=%d\n",
config.MaxConcurrency, config.MaxDepth, config.MaxRetries)
fmt.Printf("Seed URLs: %d\n", len(seedURLs))
fmt.Println("----------------------------------------")
// Start crawler
crawler.Start(ctx, seedURLs)
// Auto-shutdown after a while (in production, use other conditions)
go func() {
time.Sleep(30 * time.Second)
fmt.Println("\n⏱ Timeout, triggering shutdown...")
cancel()
}()
// Wait for all tasks to complete
crawler.Wait()
// Output statistics
fmt.Println("----------------------------------------")
fmt.Printf("Crawling complete! Visited %d URLs\n", crawler.Stats())
}
Code Analysis
1. Rate Limiting: Semaphore Channel
// Buffered channel as semaphore
semaphore: make(chan struct{}, maxConcurrency)
// Acquire permit: blocks when channel is full
rl.semaphore <- struct{}{}
// Release permit
<-rl.semaphore
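A channel with buffer size N allows at most N goroutines to hold permits simultaneously; the (N+1)th goroutine blocks until a permit is released. This bounds the number of in-flight requests directly, whereas a time.Ticker only paces how often requests start and puts no limit on how many overlap.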
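The bound can be observed directly. The following is a minimal sketch, not part of the crawler: maxInFlight is a hypothetical helper that gates goroutines with a semaphore channel and records the highest number that were ever running together. The 10 ms sleep stands in for a real request.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxInFlight runs `tasks` goroutines gated by a semaphore of size `limit`
// and reports the highest number that were inside the gate at the same time.
// (Hypothetical helper for illustration only.)
func maxInFlight(tasks, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var current, peak int64

	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while `limit` permits are out
			defer func() { <-sem }() // release

			n := atomic.AddInt64(&current, 1)
			// Raise the recorded peak if this goroutine pushed it higher.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulated request
			atomic.AddInt64(&current, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	// The peak never exceeds the semaphore size, however many tasks are queued.
	fmt.Println("peak concurrency:", maxInFlight(20, 3))
}
```

Whatever the scheduler does, the reported peak cannot exceed the buffer size, because a goroutine only counts itself after its send on sem has succeeded.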
2. Deduplication: Mutex-protected Map
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false
}
d.visited[url] = true
return true
}
Multiple goroutines may discover the same URL simultaneously; the Mark method uses a mutex to ensure atomicity — only one goroutine can "claim" that URL.
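To see the "only one claim wins" property in isolation, here is a small sketch that repeats the Deduplicator from the listing and adds a hypothetical claims helper, which lets many goroutines race to Mark the same URL and counts the winners.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduplicator mirrors the type from the listing above.
type Deduplicator struct {
	mu      sync.Mutex
	visited map[string]bool
}

func NewDeduplicator() *Deduplicator {
	return &Deduplicator{visited: make(map[string]bool)}
}

// Mark returns true only for the first caller that presents a given URL.
func (d *Deduplicator) Mark(u string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.visited[u] {
		return false
	}
	d.visited[u] = true
	return true
}

// claims starts n goroutines that all try to Mark the same URL and returns
// how many of them succeeded. (Hypothetical helper for illustration.)
func claims(n int, u string) int {
	d := NewDeduplicator()
	var wg sync.WaitGroup
	var mu sync.Mutex
	won := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if d.Mark(u) {
				mu.Lock()
				won++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return won
}

func main() {
	fmt.Println("successful claims:", claims(50, "https://example.com/"))
	// prints "successful claims: 1"
}
```

The check and the write happen under one lock, so no second goroutine can slip in between them; without the mutex the map access would also be a data race.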
3. Error Retry: Exponential Backoff
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
| Retry # | Wait Time |
|---|---|
| 1st | 1 second |
| 2nd | 2 seconds |
| 3rd | 4 seconds |
Exponential backoff prevents "avalanche-style" retries during server failures, giving the server time to recover.
4. Graceful Shutdown: Context + Signal
ctx, cancel := context.WithCancel(context.Background())
// Listen for system signals
go func() {
<-sigCh
cancel() // Cancel context
}()
// Worker checks context
select {
case <-ctx.Done():
return // Exit
default:
// Continue working
}
cancel() closes the channel returned by ctx.Done(), so every goroutine selecting on it is notified at once, achieving coordinated shutdown.
5. Overall Collaboration Flow
main()
│
├─ Create context (cancellable)
├─ Start N Worker goroutines
├─ Submit seed URLs to taskCh
├─ Start processResults goroutine
│
├─ Wait...
│ ├─ Worker takes tasks from taskCh
│ ├─ Worker rate limits → crawl → retry
│ ├─ Worker sends results to resultCh
│ └─ processResults processes results, discovers new links
│
└─ cancel() triggers → All workers exit → Program ends
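One detail of this flow deserves care: in the listing, main returns as soon as Wait has closed resultCh, so the processResults goroutine may not have finished printing. A common remedy is a done channel that the consumer closes when its range loop ends. The following is a reduced sketch of that shutdown order, using integers instead of crawl results; runPipeline and all names in it are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline fans inputs out to `workers` goroutines, collects every result
// in a single consumer goroutine, and returns only after the consumer has
// drained the results channel. (Hypothetical helper for illustration.)
func runPipeline(inputs []int, workers int) int {
	tasks := make(chan int)
	results := make(chan int)
	done := make(chan struct{})
	sum := 0

	// Consumer: plays the role of processResults.
	go func() {
		defer close(done)
		for r := range results {
			sum += r
		}
	}()

	// Workers: square each task and send the result on.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range tasks {
				results <- t * t
			}
		}()
	}

	for _, in := range inputs {
		tasks <- in
	}
	close(tasks)   // 1. no more tasks
	wg.Wait()      // 2. every worker has sent its last result
	close(results) // 3. consumer's range loop can end
	<-done         // 4. consumer has finished; sum is safe to read
	return sum
}

func main() {
	fmt.Println(runPipeline([]int{1, 2, 3, 4}, 3)) // prints 30
}
```

The four numbered steps are the whole point: each close happens only after everything that could still send on that channel has stopped.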
❓ FAQ
Q1: Why use a channel instead of sync.Mutex for concurrency control?
A sync.Mutex admits exactly one goroutine at a time, whereas a buffered channel of capacity N works as a counting semaphore that admits up to N. When the buffer is full, further senders wait automatically; each receive frees a slot and wakes one of them. This is more concise than building the same thing from a Mutex and a counter. Additionally, channel operations can appear in a select, so a blocking acquire can be combined with ctx.Done() and made cancellable. (The Acquire method in the listing uses a plain send, so it is not cancellable as written.)
Q2: Why deduplicate at task submission time instead of before crawling?
Actually, both are needed. Deduplicating at task submission keeps duplicate tasks out of the channel, saving memory and processing time. However, several goroutines may discover the same new link at almost the same moment, so the check-and-set inside Mark must be atomic, which is why it runs under a mutex. In large-scale crawlers, a Bloom filter is typically used to deduplicate massive URL sets efficiently.
Q3: Why not just kill the program? Why "graceful shutdown"?
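Force-killing can cause: corrupted files being written, unreleased database connections, incomplete requests reaching the target server. Graceful shutdown lets each worker stop at a well-defined point before exiting, ensuring data consistency. context.WithCancel is a common building block for this in Go. Note that the listing passes the same ctx to Fetch, so cancellation also aborts in-flight HTTP requests; to let running requests finish, they would need a context that is not cancelled by the shutdown signal.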
Q4: Can this crawler handle JavaScript-rendered pages?
No. net/http only fetches raw HTML without executing JavaScript. If you need to crawl SPAs (Single Page Applications), you'll need to integrate a headless browser like Chromedp or Rod. The architecture in this tutorial is extensible — just replace the Fetcher implementation.
📖 Summary
This section applied several core Go concurrency concepts in one complete web crawler project:
| Concept | Application | Key Code |
|---|---|---|
| goroutine | Multiple workers working in parallel | go c.Worker(ctx, i) |
| channel | Task queue, result passing, semaphore | taskCh, resultCh, semaphore |
| sync.Mutex | Protect deduplication map's concurrency safety | d.mu.Lock() |
| context | Cancellation propagation, graceful shutdown | context.WithCancel |
| select | Multiplexing, timeout control | select { case <-ctx.Done() ... } |
| sync.WaitGroup | Wait for all workers to complete | c.wg.Wait() |
These components work together to form a robust concurrent system. Mastering this "channel + context + WaitGroup" combination pattern is the foundation for writing production-grade Go concurrent programs.
📝 Exercises
Exercise 1: Add Deep Link Extraction
Currently, processResults only prints depth information without actually extracting child links. Modify the code to extract links from HTML after a successful crawl and post new URLs to taskCh.
Hint: You'll need to modify the Fetch method to also return the HTML content, then call ExtractLinks in processResults.
Exercise 2: Implement Result Persistence
Add a Saver struct that writes crawl results to a file in JSON format. Requirements:
- Use a separate goroutine to read from resultCh and write to the file
- One record per line (JSON Lines format)
- Support concurrent-safe writing
Exercise 3: Implement Domain-level Rate Limiting
The current rate limiting is global. Implement a per-domain grouped rate limiter that:
- Allows at most 2 requests per second to the same domain (e.g., example.com)
- Different domains don't affect each other
- Hint: maintain a map[string]*RateLimiter
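The map in the hint needs its own lock and lazy creation of entries. The sketch below shows just that registry part under simplifying assumptions: DomainLimiters is a hypothetical type, and it stores a plain semaphore channel per host instead of a *RateLimiter so the example stays self-contained. The "2 requests per second" timing is deliberately left out.

```go
package main

import (
	"fmt"
	"net/url"
	"sync"
)

// DomainLimiters hands out one semaphore per host.
// (Hypothetical helper; a real solution would store *RateLimiter values.)
type DomainLimiters struct {
	mu      sync.Mutex
	perHost map[string]chan struct{}
	size    int // permits per host
}

func NewDomainLimiters(size int) *DomainLimiters {
	return &DomainLimiters{perHost: make(map[string]chan struct{}), size: size}
}

// For returns the semaphore for rawURL's host, creating it on first use.
func (d *DomainLimiters) For(rawURL string) (chan struct{}, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return nil, err
	}
	host := u.Hostname()

	d.mu.Lock()
	defer d.mu.Unlock()
	sem, ok := d.perHost[host]
	if !ok {
		sem = make(chan struct{}, d.size)
		d.perHost[host] = sem
	}
	return sem, nil
}

func main() {
	dl := NewDomainLimiters(2)
	a, _ := dl.For("https://example.com/a")
	b, _ := dl.For("https://example.com/b")
	c, _ := dl.For("https://example.org/")
	// Same host shares one limiter; a different host gets its own.
	fmt.Println(a == b, a == c) // prints "true false"
}
```

Keying on Hostname() rather than the full URL is what makes two pages on the same site share a limiter.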
Next Lesson
After completing this practical exercise, continue with Lesson 19: String Processing to master Go's string internals, common operations, and performance optimization techniques.



