404 Not Found

404 Not Found


nginx

并发实战:网页爬虫

并发实战:网页爬虫

生活类比

想象你是一个旅行社的经理,需要同时派出多支小队去不同城市收集旅游信息:

这正是一个并发网页爬虫的工作方式。让我们用 Go 来实现它。


项目需求

我们要开发一个并发网页爬虫,具备以下能力:

  1. 并发抓取:多个 goroutine 同时抓取网页
  2. 限流:控制最大并发数,避免目标服务器过载
  3. 去重:同一 URL 不重复抓取
  4. 错误重试:抓取失败时自动重试,带指数退避
  5. 优雅退出:收到中断信号后,等待正在执行的任务完成再退出

系统设计

┌─────────────┐
│  Seed URLs   │  种子URL队列
└──────┬──────┘
       ▼
┌─────────────┐
│  URL Queue   │  待抓取URL通道
│  (channel)   │
└──────┬──────┘
       ▼
┌──────────────────────────────────────┐
│          Worker Pool                 │
│  ┌─────────┐ ┌─────────┐ ┌────────┐ │
│  │Worker 1 │ │Worker 2 │ │Worker N│ │  有限个工作者
│  └────┬────┘ └────┬────┘ └───┬────┘ │
└───────┼───────────┼──────────┼───────┘
        ▼           ▼          ▼
┌─────────────────────────────────────┐
│         Results Channel             │  抓取结果汇总
│    ┌──────────┐  ┌──────────┐       │
│    │Dedup Map │  │ Retry    │       │  去重 + 重试
│    └──────────┘  └──────────┘       │
└─────────────────────────────────────┘

完整代码

GO
package main

import (
	"context"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"regexp"
	"strings"
	"sync"
	"syscall"
	"time"
)

// ============================================================
// 数据结构定义
// ============================================================

// CrawlTask 表示一个抓取任务
type CrawlTask struct {
	URL   string // 目标地址
	Depth int    // 当前深度
}

// CrawlResult 表示一次抓取的结果
type CrawlResult struct {
	URL        string        // 抓取地址
	Title      string        // 页面标题(简化提取)
	BodyLength int           // 页面内容长度
	Depth      int           // 抓取深度
	Err        error         // 错误信息(如有)
	Latency    time.Duration // 抓取耗时
}

// CrawlerConfig 爬虫配置
type CrawlerConfig struct {
	MaxConcurrency int           // 最大并发数
	MaxDepth       int           // 最大抓取深度
	MaxRetries     int           // 最大重试次数
	RequestTimeout time.Duration // 单次请求超时
	RateInterval   time.Duration // 限流间隔
}

// DefaultConfig 返回默认配置
func DefaultConfig() CrawlerConfig {
	return CrawlerConfig{
		MaxConcurrency: 3,
		MaxDepth:       2,
		MaxRetries:     3,
		RequestTimeout: 10 * time.Second,
		RateInterval:   500 * time.Millisecond,
	}
}

// ============================================================
// 去重器(线程安全的已访问URL集合)
// ============================================================

// Deduplicator 负责URL去重
type Deduplicator struct {
	visited map[string]bool
	mu      sync.Mutex
}

// NewDeduplicator 创建去重器
func NewDeduplicator() *Deduplicator {
	return &Deduplicator{
		visited: make(map[string]bool),
	}
}

// Mark 标记URL为已访问,返回true表示是新URL
func (d *Deduplicator) Mark(url string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.visited[url] {
		return false // 已经访问过
	}
	d.visited[url] = true
	return true // 新URL
}

// Count 返回已访问URL数量
func (d *Deduplicator) Count() int {
	d.mu.Lock()
	defer d.mu.Unlock()
	return len(d.visited)
}

// ============================================================
// 限流器(基于信号量的并发控制)
// ============================================================

// RateLimiter 基于channel的信号量限流器
type RateLimiter struct {
	semaphore chan struct{}
	interval  time.Duration
}

// NewRateLimiter 创建限流器
// maxConcurrency: 最大并发数
// interval: 两次请求之间的最小间隔
func NewRateLimiter(maxConcurrency int, interval time.Duration) *RateLimiter {
	return &RateLimiter{
		semaphore: make(chan struct{}, maxConcurrency),
		interval:  interval,
	}
}

// Acquire 获取一个执行许可
func (rl *RateLimiter) Acquire() {
	rl.semaphore <- struct{}{} // 发送到缓冲channel,满时阻塞
}

// Release 释放执行许可
func (rl *RateLimiter) Release() {
	time.Sleep(rl.interval) // 限流:保持最小间隔
	<-rl.semaphore          // 从缓冲channel接收,腾出空间
}

// ============================================================
// 网页抓取器
// ============================================================

// Fetcher 负责实际的HTTP请求
type Fetcher struct {
	client *http.Client
}

// NewFetcher 创建抓取器
func NewFetcher(timeout time.Duration) *Fetcher {
	return &Fetcher{
		client: &http.Client{
			Timeout: timeout,
		},
	}
}

// Fetch 抓取指定URL,返回标题和内容长度
func (f *Fetcher) Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error) {
	// 创建带context的请求,支持取消
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
	if err != nil {
		return "", 0, fmt.Errorf("创建请求失败: %w", err)
	}

	// 设置User-Agent,模拟浏览器
	req.Header.Set("User-Agent", "GoCrawler/1.0")

	resp, err := f.client.Do(req)
	if err != nil {
		return "", 0, fmt.Errorf("请求失败: %w", err)
	}
	defer resp.Body.Close()

	// 检查HTTP状态码
	if resp.StatusCode != http.StatusOK {
		return "", 0, fmt.Errorf("HTTP状态码: %d", resp.StatusCode)
	}

	// 读取响应体(限制大小,防止内存溢出)
	const maxSize = 1 << 20 // 1MB
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
	if err != nil {
		return "", 0, fmt.Errorf("读取响应失败: %w", err)
	}

	// 简单提取<title>标签内容
	title = extractTitle(string(body))

	return title, len(body), nil
}

// extractTitle 从HTML中提取标题
func extractTitle(html string) string {
	re := regexp.MustCompile(`(?i)<title>(.*?)</title>`)
	matches := re.FindStringSubmatch(html)
	if len(matches) > 1 {
		return strings.TrimSpace(matches[1])
	}
	return "(无标题)"
}

// ============================================================
// 链接提取器
// ============================================================

// ExtractLinks 从HTML中提取所有链接(简化版)
func ExtractLinks(body string, baseURL string) []string {
	re := regexp.MustCompile(`(?i)href=["']([^"']+)["']`)
	matches := re.FindAllStringSubmatch(body, -1)

	var links []string
	base, err := url.Parse(baseURL)
	if err != nil {
		return links
	}

	for _, match := range matches {
		if len(match) < 2 {
			continue
		}
		href := match[1]

		// 解析相对URL
		parsed, err := url.Parse(href)
		if err != nil {
			continue
		}

		// 转换为绝对URL
		absolute := base.ResolveReference(parsed)

		// 只保留HTTP/HTTPS链接
		if absolute.Scheme == "http" || absolute.Scheme == "https" {
			// 去掉fragment(#锚点)
			absolute.Fragment = ""
			links = append(links, absolute.String())
		}
	}

	return links
}

// ============================================================
// 爬虫引擎
// ============================================================

// Crawler 爬虫主引擎
type Crawler struct {
	config    CrawlerConfig
	fetcher   *Fetcher
	dedup     *Deduplicator
	limiter   *RateLimiter
	taskCh    chan CrawlTask   // 任务队列
	resultCh  chan CrawlResult // 结果通道
	wg        sync.WaitGroup  // 等待所有worker完成
}

// NewCrawler 创建爬虫
func NewCrawler(config CrawlerConfig) *Crawler {
	return &Crawler{
		config:   config,
		fetcher:  NewFetcher(config.RequestTimeout),
		dedup:    NewDeduplicator(),
		limiter:  NewRateLimiter(config.MaxConcurrency, config.RateInterval),
		taskCh:   make(chan CrawlTask, 100),
		resultCh: make(chan CrawlResult, 100),
	}
}

// CrawlWithRetry 带重试的抓取(指数退避)
func (c *Crawler) CrawlWithRetry(ctx context.Context, task CrawlTask) CrawlResult {
	var lastErr error

	for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
		// 检查context是否已取消
		select {
		case <-ctx.Done():
			return CrawlResult{
				URL:   task.URL,
				Depth: task.Depth,
				Err:   ctx.Err(),
			}
		default:
		}

		// 第一次不需要等待
		if attempt > 0 {
			// 指数退避:1s, 2s, 4s ...
			backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
			fmt.Printf("  ↻ 重试 %d/%d: %s (等待 %v)\n",
				attempt, c.config.MaxRetries, task.URL, backoff)

			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return CrawlResult{
					URL:   task.URL,
					Depth: task.Depth,
					Err:   ctx.Err(),
				}
			}
		}

		start := time.Now()
		title, bodyLen, err := c.fetcher.Fetch(ctx, task.URL)
		latency := time.Since(start)

		if err == nil {
			// 成功
			return CrawlResult{
				URL:        task.URL,
				Title:      title,
				BodyLength: bodyLen,
				Depth:      task.Depth,
				Latency:    latency,
			}
		}

		lastErr = err
	}

	return CrawlResult{
		URL:   task.URL,
		Depth: task.Depth,
		Err:   fmt.Errorf("重试%d次后失败: %w", c.config.MaxRetries, lastErr),
	}
}

// Worker 工作协程
func (c *Crawler) Worker(ctx context.Context, id int) {
	defer c.wg.Done()

	fmt.Printf("[Worker %d] 启动\n", id)

	for task := range c.taskCh {
		// 检查context
		select {
		case <-ctx.Done():
			fmt.Printf("[Worker %d] 收到退出信号,停止\n", id)
			return
		default:
		}

		// 限流:获取许可
		c.limiter.Acquire()

		fmt.Printf("[Worker %d] 抓取: %s (深度 %d)\n", id, task.URL, task.Depth)

		// 执行抓取(带重试)
		result := c.CrawlWithRetry(ctx, task)

		// 释放许可
		c.limiter.Release()

		// 发送结果
		select {
		case c.resultCh <- result:
		case <-ctx.Done():
			return
		}
	}

	fmt.Printf("[Worker %d] 退出\n", id)
}

// Start 启动爬虫
func (c *Crawler) Start(ctx context.Context, seedURLs []string) {
	// 启动Worker池
	for i := 0; i < c.config.MaxConcurrency; i++ {
		c.wg.Add(1)
		go c.Worker(ctx, i)
	}

	// 投递种子URL
	go func() {
		for _, rawURL := range seedURLs {
			if c.dedup.Mark(rawURL) {
				c.taskCh <- CrawlTask{URL: rawURL, Depth: 0}
			}
		}
	}()

	// 启动结果处理和链接发现协程
	go c.processResults(ctx)
}

// processResults 处理抓取结果,发现新链接
func (c *Crawler) processResults(ctx context.Context) {
	for result := range c.resultCh {
		if result.Err != nil {
			fmt.Printf("✗ 失败: %s — %v\n", result.URL, result.Err)
			continue
		}

		fmt.Printf("✓ 成功: %s\n", result.URL)
		fmt.Printf("  标题: %s\n", result.Title)
		fmt.Printf("  大小: %d 字节 | 耗时: %v\n", result.BodyLength, result.Latency)

		// 如果未达到最大深度,可以继续发现链接
		// (此处简化处理,实际项目中需要重新抓取页面获取HTML来提取链接)
		if result.Depth < c.config.MaxDepth {
			// 演示:将新链接投递到任务队列
			// 实际项目中,这里会从result的HTML中提取链接
			fmt.Printf("  深度 %d/%d,可继续发现子链接\n", result.Depth, c.config.MaxDepth)
		}
	}
}

// Wait 等待所有任务完成
func (c *Crawler) Wait() {
	// 关闭任务通道,通知worker不再接收新任务
	close(c.taskCh)
	// 等待所有worker退出
	c.wg.Wait()
	// 关闭结果通道
	close(c.resultCh)
}

// Stats 返回爬虫统计信息
func (c *Crawler) Stats() (visited int) {
	return c.dedup.Count()
}

// ============================================================
// 主程序
// ============================================================

func main() {
	fmt.Println("========================================")
	fmt.Println("  Go 并发网页爬虫 v1.0")
	fmt.Println("========================================")
	fmt.Println()

	// 加载配置
	config := DefaultConfig()
	// 可以通过命令行参数或配置文件覆盖
	// config.MaxConcurrency = 5

	// 创建带取消功能的context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 捕获系统中断信号(Ctrl+C),实现优雅退出
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		sig := <-sigCh
		fmt.Printf("\n⚠ 收到信号: %v,正在优雅退出...\n", sig)
		cancel() // 取消context,通知所有worker
	}()

	// 创建爬虫
	crawler := NewCrawler(config)

	// 种子URL列表
	seedURLs := []string{
		"https://httpbin.org/html",
		"https://httpbin.org/links/5",
		"https://httpbin.org/range/100",
		"https://httpbin.org/delay/1",
	}

	fmt.Printf("配置: 并发=%d, 深度=%d, 重试=%d\n",
		config.MaxConcurrency, config.MaxDepth, config.MaxRetries)
	fmt.Printf("种子URL: %d 个\n", len(seedURLs))
	fmt.Println("----------------------------------------")

	// 启动爬虫
	crawler.Start(ctx, seedURLs)

	// 等待一段时间后自动关闭(实际项目中可能用其他条件判断)
	go func() {
		time.Sleep(30 * time.Second)
		fmt.Println("\n⏱ 超时,触发关闭...")
		cancel()
	}()

	// 等待所有任务完成
	crawler.Wait()

	// 输出统计信息
	fmt.Println("----------------------------------------")
	fmt.Printf("抓取完成!共访问 %d 个URL\n", crawler.Stats())
}

代码解析

1. 限流机制:信号量 Channel

GO
// 缓冲channel作为信号量
semaphore: make(chan struct{}, maxConcurrency)

// 获取许可:channel满时自动阻塞
rl.semaphore <- struct{}{}

// 释放许可
<-rl.semaphore

缓冲大小为 N 的 channel 最多允许 N 个 goroutine 同时持有许可,第 N+1 个 goroutine 会被阻塞。这比 time.Ticker 更精确地控制并发数。

2. 去重机制:互斥锁保护的 Map

GO
func (d *Deduplicator) Mark(url string) bool {
    d.mu.Lock()
    defer d.mu.Unlock()
    if d.visited[url] {
        return false
    }
    d.visited[url] = true
    return true
}

多个 goroutine 可能同时发现同一个 URL,Mark 方法用互斥锁保证原子性——只有一个 goroutine 能"抢到"这个 URL。

3. 错误重试:指数退避

GO
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
重试次数 等待时间
第1次 1秒
第2次 2秒
第3次 4秒

指数退避避免在服务器故障时"雪崩式"重试,给服务器恢复的时间。

4. 优雅退出:Context + Signal

GO
ctx, cancel := context.WithCancel(context.Background())

// 监听系统信号
go func() {
    <-sigCh
    cancel() // 取消context
}()

// Worker中检查context
select {
case <-ctx.Done():
    return // 退出
default:
    // 继续工作
}

cancel() 会通知所有正在监听 ctx.Done() 的 goroutine,实现协调退出。

5. 整体协作流程

main()
  │
  ├─ 创建 context(可取消)
  ├─ 启动 N 个 Worker goroutine
  ├─ 投递种子URL到 taskCh
  ├─ 启动 processResults goroutine
  │
  ├─ 等待...
  │   ├─ Worker 从 taskCh 取任务
  │   ├─ Worker 限流 → 抓取 → 重试
  │   ├─ Worker 结果发到 resultCh
  │   └─ processResults 处理结果、发现新链接
  │
  └─ cancel() 触发 → 所有Worker退出 → 程序结束

❓ 常见问题

Q1: 为什么用 channel 而不是 sync.Mutex 来做并发控制?

channel 不仅能做互斥,还能做通信。信号量 channel 天然地将"并发数限制"和"任务传递"结合在一起——当 channel 满时,新的 goroutine 自动等待;当有 goroutine 释放时,等待的自动唤醒。这比单独用 Mutex + WaitGroup 更简洁。此外,channel 支持 select,可以方便地与 context.Done() 组合,实现带取消的阻塞。

Q2: 去重为什么不在抓取前做,而是在投递任务时做?

实际上两者都需要。在投递任务时去重可以避免往 channel 中塞入重复的任务,节省内存和处理时间。但如果多个 goroutine 几乎同时发现同一个新链接,仅靠投递时去重可能仍有"竞态"——所以 Mark 方法必须是线程安全的。在大型爬虫中,通常使用 布隆过滤器(Bloom Filter)来高效处理海量 URL 去重。

Q3: 为什么不直接关闭程序,而要"优雅退出"?

强制关闭会导致:正在写入的文件损坏、数据库连接未释放、目标服务器收到未完成的请求。优雅退出让每个 worker 完成当前任务后退出,确保数据一致性。context.WithCancel 是 Go 中实现优雅退出的标准模式。

Q4: 这个爬虫能处理 JavaScript 渲染的页面吗?

不能。net/http 只获取原始 HTML,不执行 JavaScript。如果需要抓取 SPA(单页应用),需要集成 headless 浏览器如 ChromedpRod。本教程的架构可以扩展——只需替换 Fetcher 的实现即可。


📖 小节

本节通过一个完整的网页爬虫项目,综合运用了多个 Go 并发核心概念:

概念 应用场景 关键代码
goroutine 多个Worker并行工作 go c.Worker(ctx, i)
channel 任务队列、结果传递、信号量 taskCh, resultCh, semaphore
sync.Mutex 保护去重Map的并发安全 d.mu.Lock()
context 取消传播、优雅退出 ctx.WithCancel
select 多路复用、超时控制 select { case <-ctx.Done() ... }
sync.WaitGroup 等待所有Worker完成 c.wg.Wait()

这些组件协同工作,形成一个健壮的并发系统。掌握这种"channel + context + WaitGroup"的组合模式,是编写生产级 Go 并发程序的基础。


📝 作业

作业 1:添加深度链接提取

当前代码中 processResults 只打印了深度信息,没有实际提取子链接。请修改代码,在抓取成功后从 HTML 中提取链接,并将新的 URL 投递到 taskCh

提示:需要将 Fetch 方法改为同时返回 HTML 内容,然后在 processResults 中调用 ExtractLinks

作业 2:实现结果持久化

添加一个 Saver 结构体,将抓取结果以 JSON 格式写入文件。要求:

作业 3:实现域名级别限流

当前的限流是全局的。请实现一个按域名分组的限流器,使得:


下一课

完成本节的实战练习后,请继续学习 第19课:字符串处理,掌握 Go 中字符串的底层结构、常用操作和性能优化技巧。

Web-Tutorial.com

Web-Tutorial 技术团队

由多位开发者共同维护的编程教程平台。每篇教程由对应领域的开发者编写和审核,确保内容准确可靠。如发现任何问题,欢迎向我们反馈。

100%

🙏 帮我们做得更好

我们是刚上线的编程教程站,几个人的小团队,精力有限。页面虽经检查,难免还有疏漏——链接失效、排版错乱、内容有误、语言生硬……

如果您发现了,麻烦告诉我们,我们会在收到反馈后第一时间进行修复,再次感谢您的光临 🙏