并发实战:网页爬虫
并发实战:网页爬虫
生活类比
想象你是一个旅行社的经理,需要同时派出多支小队去不同城市收集旅游信息:
- 限流:公司车辆有限,最多同时派出 3 支小队,避免资源耗尽
- 去重:已经去过的地方不再重复派队
- 重试:如果某支小队遇到暴雨无法到达目的地,休息一会儿再试一次
- 优雅退出:到了下班时间,正在路上的小队完成当前任务后返回,不再接新任务
这正是一个并发网页爬虫的工作方式。让我们用 Go 来实现它。
项目需求
我们要开发一个并发网页爬虫,具备以下能力:
- 并发抓取:多个 goroutine 同时抓取网页
- 限流:控制最大并发数,避免目标服务器过载
- 去重:同一 URL 不重复抓取
- 错误重试:抓取失败时自动重试,带指数退避
- 优雅退出:收到中断信号后,等待正在执行的任务完成再退出
系统设计
┌─────────────┐
│ Seed URLs │ 种子URL队列
└──────┬──────┘
▼
┌─────────────┐
│ URL Queue │ 待抓取URL通道
│ (channel) │
└──────┬──────┘
▼
┌──────────────────────────────────────┐
│ Worker Pool │
│ ┌─────────┐ ┌─────────┐ ┌────────┐ │
│ │Worker 1 │ │Worker 2 │ │Worker N│ │ 有限个工作者
│ └────┬────┘ └────┬────┘ └───┬────┘ │
└───────┼───────────┼──────────┼───────┘
▼ ▼ ▼
┌─────────────────────────────────────┐
│ Results Channel │ 抓取结果汇总
│ ┌──────────┐ ┌──────────┐ │
│ │Dedup Map │ │ Retry │ │ 去重 + 重试
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────┘
完整代码
package main
import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"
)
// ============================================================
// 数据结构定义
// ============================================================
// CrawlTask 表示一个抓取任务
type CrawlTask struct {
URL string // 目标地址
Depth int // 当前深度
}
// CrawlResult 表示一次抓取的结果
type CrawlResult struct {
URL string // 抓取地址
Title string // 页面标题(简化提取)
BodyLength int // 页面内容长度
Depth int // 抓取深度
Err error // 错误信息(如有)
Latency time.Duration // 抓取耗时
}
// CrawlerConfig 爬虫配置
type CrawlerConfig struct {
MaxConcurrency int // 最大并发数
MaxDepth int // 最大抓取深度
MaxRetries int // 最大重试次数
RequestTimeout time.Duration // 单次请求超时
RateInterval time.Duration // 限流间隔
}
// DefaultConfig 返回默认配置
func DefaultConfig() CrawlerConfig {
return CrawlerConfig{
MaxConcurrency: 3,
MaxDepth: 2,
MaxRetries: 3,
RequestTimeout: 10 * time.Second,
RateInterval: 500 * time.Millisecond,
}
}
// ============================================================
// 去重器(线程安全的已访问URL集合)
// ============================================================
// Deduplicator 负责URL去重
type Deduplicator struct {
visited map[string]bool
mu sync.Mutex
}
// NewDeduplicator 创建去重器
func NewDeduplicator() *Deduplicator {
return &Deduplicator{
visited: make(map[string]bool),
}
}
// Mark 标记URL为已访问,返回true表示是新URL
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false // 已经访问过
}
d.visited[url] = true
return true // 新URL
}
// Count 返回已访问URL数量
func (d *Deduplicator) Count() int {
d.mu.Lock()
defer d.mu.Unlock()
return len(d.visited)
}
// ============================================================
// 限流器(基于信号量的并发控制)
// ============================================================
// RateLimiter 基于channel的信号量限流器
type RateLimiter struct {
semaphore chan struct{}
interval time.Duration
}
// NewRateLimiter 创建限流器
// maxConcurrency: 最大并发数
// interval: 两次请求之间的最小间隔
func NewRateLimiter(maxConcurrency int, interval time.Duration) *RateLimiter {
return &RateLimiter{
semaphore: make(chan struct{}, maxConcurrency),
interval: interval,
}
}
// Acquire 获取一个执行许可
func (rl *RateLimiter) Acquire() {
rl.semaphore <- struct{}{} // 发送到缓冲channel,满时阻塞
}
// Release 释放执行许可
func (rl *RateLimiter) Release() {
time.Sleep(rl.interval) // 限流:保持最小间隔
<-rl.semaphore // 从缓冲channel接收,腾出空间
}
// ============================================================
// 网页抓取器
// ============================================================
// Fetcher 负责实际的HTTP请求
type Fetcher struct {
client *http.Client
}
// NewFetcher 创建抓取器
func NewFetcher(timeout time.Duration) *Fetcher {
return &Fetcher{
client: &http.Client{
Timeout: timeout,
},
}
}
// Fetch 抓取指定URL,返回标题和内容长度
func (f *Fetcher) Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error) {
// 创建带context的请求,支持取消
req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
if err != nil {
return "", 0, fmt.Errorf("创建请求失败: %w", err)
}
// 设置User-Agent,模拟浏览器
req.Header.Set("User-Agent", "GoCrawler/1.0")
resp, err := f.client.Do(req)
if err != nil {
return "", 0, fmt.Errorf("请求失败: %w", err)
}
defer resp.Body.Close()
// 检查HTTP状态码
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("HTTP状态码: %d", resp.StatusCode)
}
// 读取响应体(限制大小,防止内存溢出)
const maxSize = 1 << 20 // 1MB
body, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
if err != nil {
return "", 0, fmt.Errorf("读取响应失败: %w", err)
}
// 简单提取<title>标签内容
title = extractTitle(string(body))
return title, len(body), nil
}
// extractTitle 从HTML中提取标题
func extractTitle(html string) string {
re := regexp.MustCompile(`(?i)<title>(.*?)</title>`)
matches := re.FindStringSubmatch(html)
if len(matches) > 1 {
return strings.TrimSpace(matches[1])
}
return "(无标题)"
}
// ============================================================
// 链接提取器
// ============================================================
// ExtractLinks 从HTML中提取所有链接(简化版)
func ExtractLinks(body string, baseURL string) []string {
re := regexp.MustCompile(`(?i)href=["']([^"']+)["']`)
matches := re.FindAllStringSubmatch(body, -1)
var links []string
base, err := url.Parse(baseURL)
if err != nil {
return links
}
for _, match := range matches {
if len(match) < 2 {
continue
}
href := match[1]
// 解析相对URL
parsed, err := url.Parse(href)
if err != nil {
continue
}
// 转换为绝对URL
absolute := base.ResolveReference(parsed)
// 只保留HTTP/HTTPS链接
if absolute.Scheme == "http" || absolute.Scheme == "https" {
// 去掉fragment(#锚点)
absolute.Fragment = ""
links = append(links, absolute.String())
}
}
return links
}
// ============================================================
// 爬虫引擎
// ============================================================
// Crawler 爬虫主引擎
type Crawler struct {
config CrawlerConfig
fetcher *Fetcher
dedup *Deduplicator
limiter *RateLimiter
taskCh chan CrawlTask // 任务队列
resultCh chan CrawlResult // 结果通道
wg sync.WaitGroup // 等待所有worker完成
}
// NewCrawler 创建爬虫
func NewCrawler(config CrawlerConfig) *Crawler {
return &Crawler{
config: config,
fetcher: NewFetcher(config.RequestTimeout),
dedup: NewDeduplicator(),
limiter: NewRateLimiter(config.MaxConcurrency, config.RateInterval),
taskCh: make(chan CrawlTask, 100),
resultCh: make(chan CrawlResult, 100),
}
}
// CrawlWithRetry 带重试的抓取(指数退避)
func (c *Crawler) CrawlWithRetry(ctx context.Context, task CrawlTask) CrawlResult {
var lastErr error
for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
// 检查context是否已取消
select {
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
default:
}
// 第一次不需要等待
if attempt > 0 {
// 指数退避:1s, 2s, 4s ...
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
fmt.Printf(" ↻ 重试 %d/%d: %s (等待 %v)\n",
attempt, c.config.MaxRetries, task.URL, backoff)
select {
case <-time.After(backoff):
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
}
}
start := time.Now()
title, bodyLen, err := c.fetcher.Fetch(ctx, task.URL)
latency := time.Since(start)
if err == nil {
// 成功
return CrawlResult{
URL: task.URL,
Title: title,
BodyLength: bodyLen,
Depth: task.Depth,
Latency: latency,
}
}
lastErr = err
}
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: fmt.Errorf("重试%d次后失败: %w", c.config.MaxRetries, lastErr),
}
}
// Worker 工作协程
func (c *Crawler) Worker(ctx context.Context, id int) {
defer c.wg.Done()
fmt.Printf("[Worker %d] 启动\n", id)
for task := range c.taskCh {
// 检查context
select {
case <-ctx.Done():
fmt.Printf("[Worker %d] 收到退出信号,停止\n", id)
return
default:
}
// 限流:获取许可
c.limiter.Acquire()
fmt.Printf("[Worker %d] 抓取: %s (深度 %d)\n", id, task.URL, task.Depth)
// 执行抓取(带重试)
result := c.CrawlWithRetry(ctx, task)
// 释放许可
c.limiter.Release()
// 发送结果
select {
case c.resultCh <- result:
case <-ctx.Done():
return
}
}
fmt.Printf("[Worker %d] 退出\n", id)
}
// Start 启动爬虫
func (c *Crawler) Start(ctx context.Context, seedURLs []string) {
// 启动Worker池
for i := 0; i < c.config.MaxConcurrency; i++ {
c.wg.Add(1)
go c.Worker(ctx, i)
}
// 投递种子URL
go func() {
for _, rawURL := range seedURLs {
if c.dedup.Mark(rawURL) {
c.taskCh <- CrawlTask{URL: rawURL, Depth: 0}
}
}
}()
// 启动结果处理和链接发现协程
go c.processResults(ctx)
}
// processResults 处理抓取结果,发现新链接
func (c *Crawler) processResults(ctx context.Context) {
for result := range c.resultCh {
if result.Err != nil {
fmt.Printf("✗ 失败: %s — %v\n", result.URL, result.Err)
continue
}
fmt.Printf("✓ 成功: %s\n", result.URL)
fmt.Printf(" 标题: %s\n", result.Title)
fmt.Printf(" 大小: %d 字节 | 耗时: %v\n", result.BodyLength, result.Latency)
// 如果未达到最大深度,可以继续发现链接
// (此处简化处理,实际项目中需要重新抓取页面获取HTML来提取链接)
if result.Depth < c.config.MaxDepth {
// 演示:将新链接投递到任务队列
// 实际项目中,这里会从result的HTML中提取链接
fmt.Printf(" 深度 %d/%d,可继续发现子链接\n", result.Depth, c.config.MaxDepth)
}
}
}
// Wait 等待所有任务完成
func (c *Crawler) Wait() {
// 关闭任务通道,通知worker不再接收新任务
close(c.taskCh)
// 等待所有worker退出
c.wg.Wait()
// 关闭结果通道
close(c.resultCh)
}
// Stats 返回爬虫统计信息
func (c *Crawler) Stats() (visited int) {
return c.dedup.Count()
}
// ============================================================
// 主程序
// ============================================================
func main() {
fmt.Println("========================================")
fmt.Println(" Go 并发网页爬虫 v1.0")
fmt.Println("========================================")
fmt.Println()
// 加载配置
config := DefaultConfig()
// 可以通过命令行参数或配置文件覆盖
// config.MaxConcurrency = 5
// 创建带取消功能的context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 捕获系统中断信号(Ctrl+C),实现优雅退出
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigCh
fmt.Printf("\n⚠ 收到信号: %v,正在优雅退出...\n", sig)
cancel() // 取消context,通知所有worker
}()
// 创建爬虫
crawler := NewCrawler(config)
// 种子URL列表
seedURLs := []string{
"https://httpbin.org/html",
"https://httpbin.org/links/5",
"https://httpbin.org/range/100",
"https://httpbin.org/delay/1",
}
fmt.Printf("配置: 并发=%d, 深度=%d, 重试=%d\n",
config.MaxConcurrency, config.MaxDepth, config.MaxRetries)
fmt.Printf("种子URL: %d 个\n", len(seedURLs))
fmt.Println("----------------------------------------")
// 启动爬虫
crawler.Start(ctx, seedURLs)
// 等待一段时间后自动关闭(实际项目中可能用其他条件判断)
go func() {
time.Sleep(30 * time.Second)
fmt.Println("\n⏱ 超时,触发关闭...")
cancel()
}()
// 等待所有任务完成
crawler.Wait()
// 输出统计信息
fmt.Println("----------------------------------------")
fmt.Printf("抓取完成!共访问 %d 个URL\n", crawler.Stats())
}
代码解析
1. 限流机制:信号量 Channel
// 缓冲channel作为信号量
semaphore: make(chan struct{}, maxConcurrency)
// 获取许可:channel满时自动阻塞
rl.semaphore <- struct{}{}
// 释放许可
<-rl.semaphore
缓冲大小为 N 的 channel 最多允许 N 个 goroutine 同时持有许可,第 N+1 个 goroutine 会被阻塞。这比 time.Ticker 更精确地控制并发数。
2. 去重机制:互斥锁保护的 Map
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false
}
d.visited[url] = true
return true
}
多个 goroutine 可能同时发现同一个 URL,Mark 方法用互斥锁保证原子性——只有一个 goroutine 能"抢到"这个 URL。
3. 错误重试:指数退避
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
| 重试次数 | 等待时间 |
|---|---|
| 第1次 | 1秒 |
| 第2次 | 2秒 |
| 第3次 | 4秒 |
指数退避避免在服务器故障时"雪崩式"重试,给服务器恢复的时间。
4. 优雅退出:Context + Signal
ctx, cancel := context.WithCancel(context.Background())
// 监听系统信号
go func() {
<-sigCh
cancel() // 取消context
}()
// Worker中检查context
select {
case <-ctx.Done():
return // 退出
default:
// 继续工作
}
cancel() 会通知所有正在监听 ctx.Done() 的 goroutine,实现协调退出。
5. 整体协作流程
main()
│
├─ 创建 context(可取消)
├─ 启动 N 个 Worker goroutine
├─ 投递种子URL到 taskCh
├─ 启动 processResults goroutine
│
├─ 等待...
│ ├─ Worker 从 taskCh 取任务
│ ├─ Worker 限流 → 抓取 → 重试
│ ├─ Worker 结果发到 resultCh
│ └─ processResults 处理结果、发现新链接
│
└─ cancel() 触发 → 所有Worker退出 → 程序结束
❓ 常见问题
Q1: 为什么用 channel 而不是 sync.Mutex 来做并发控制?
channel 不仅能做互斥,还能做通信。信号量 channel 天然地将"并发数限制"和"任务传递"结合在一起——当 channel 满时,新的 goroutine 自动等待;当有 goroutine 释放时,等待的自动唤醒。这比单独用 Mutex + WaitGroup 更简洁。此外,channel 支持 select,可以方便地与 context.Done() 组合,实现带取消的阻塞。
Q2: 去重为什么不在抓取前做,而是在投递任务时做?
实际上两者都需要。在投递任务时去重可以避免往 channel 中塞入重复的任务,节省内存和处理时间。但如果多个 goroutine 几乎同时发现同一个新链接,仅靠投递时去重可能仍有"竞态"——所以 Mark 方法必须是线程安全的。在大型爬虫中,通常使用 布隆过滤器(Bloom Filter)来高效处理海量 URL 去重。
Q3: 为什么不直接关闭程序,而要"优雅退出"?
强制关闭会导致:正在写入的文件损坏、数据库连接未释放、目标服务器收到未完成的请求。优雅退出让每个 worker 完成当前任务后退出,确保数据一致性。context.WithCancel 是 Go 中实现优雅退出的标准模式。
Q4: 这个爬虫能处理 JavaScript 渲染的页面吗?
不能。net/http 只获取原始 HTML,不执行 JavaScript。如果需要抓取 SPA(单页应用),需要集成 headless 浏览器如 Chromedp 或 Rod。本教程的架构可以扩展——只需替换 Fetcher 的实现即可。
📖 小节
本节通过一个完整的网页爬虫项目,综合运用了多个 Go 并发核心概念:
| 概念 | 应用场景 | 关键代码 |
|---|---|---|
| goroutine | 多个Worker并行工作 | go c.Worker(ctx, i) |
| channel | 任务队列、结果传递、信号量 | taskCh, resultCh, semaphore |
| sync.Mutex | 保护去重Map的并发安全 | d.mu.Lock() |
| context | 取消传播、优雅退出 | ctx.WithCancel |
| select | 多路复用、超时控制 | select { case <-ctx.Done() ... } |
| sync.WaitGroup | 等待所有Worker完成 | c.wg.Wait() |
这些组件协同工作,形成一个健壮的并发系统。掌握这种"channel + context + WaitGroup"的组合模式,是编写生产级 Go 并发程序的基础。
📝 作业
作业 1:添加深度链接提取
当前代码中 processResults 只打印了深度信息,没有实际提取子链接。请修改代码,在抓取成功后从 HTML 中提取链接,并将新的 URL 投递到 taskCh。
提示:需要将 Fetch 方法改为同时返回 HTML 内容,然后在 processResults 中调用 ExtractLinks。
作业 2:实现结果持久化
添加一个 Saver 结构体,将抓取结果以 JSON 格式写入文件。要求:
- 使用单独的 goroutine 从
resultCh读取并写入文件 - 文件每行一条记录(JSON Lines 格式)
- 支持并发安全写入
作业 3:实现域名级别限流
当前的限流是全局的。请实现一个按域名分组的限流器,使得:
- 同一域名(如
example.com)每秒最多请求 2 次 - 不同域名之间互不影响
- 提示:维护一个
map[string]*RateLimiter
下一课
完成本节的实战练习后,请继续学习 第19课:字符串处理,掌握 Go 中字符串的底层结构、常用操作和性能优化技巧。



