تطبيق التزامن: زاحف الويب
تطبيق التزامن: زاحف الويب
تشبيه
تخيل أنك مدير وكالة سفر وتحتاج لإرسال فرق متعددة إلى مدن مختلفة في نفس الوقت لجمع معلومات السفر:
- تحديد المعدل: الشركة لديها مركبات محدودة، لذا يمكن إرسال 3 فرق كحد أقصى في كل مرة لتجنب نفاد الموارد
- إزالة التكرار: الفرق لا تعيد زيارة الأماكن التي زارتها بالفعل
- إعادة المحاولة: إذا واجهت فريق عاصفة مطرية شديدة ولا تستطيع الوصول إلى وجهتها، يستريحون ويحاولون مرة أخرى
- الإغلاق السلس: عند وقت الإغلاق، الفرق الموجودة بالطريق تُكمل مهامها الحالية قبل العودة، ولا تُوكل مهام جديدة
هذا بالضبط كيف يعمل زاحف الويب المتزامن. لننفذه في Go.
متطلبات المشروع
سنطور زاحف ويب متزامن بالقدرات التالية:
- زحف متزامن: عدة goroutines تحف صفحات الويب في نفس الوقت
- تحديد المعدل: التحكم في الحد الأقصى للتزامن لتجنب إغراق الخادم المستهدف
- إزالة التكرار: نفس الـ URL لا يُزحف مرتين
- إعادة المحاولة عند الخطأ: إعادة محاولة تلقائية عند فشل الزحف مع ارتداد أُسّي
- الإغلاق السلس: بعد استقبال إشارة المقاطعة، انتظر اكتمال المهام الجارية قبل الخروج
تصميم النظام
┌─────────────┐
│ بذور URLs │ طابور URL البذور
└──────┬──────┘
▼
┌─────────────┐
│ طابور URLs │ قناة URLs المراد زحفها
│ (channel) │
└──────┬──────┘
▼
┌──────────────────────────────────────┐
│ تجمع العمال │
│ ┌─────────┐ ┌─────────┐ ┌────────┐ │
│ │العامل 1 │ │العامل 2 │ │العامل N│ │ عدد محدود من العمال
│ └────┬────┘ └────┬────┘ └───┬────┘ │
└───────┼───────────┼──────────┼───────┘
▼ ▼ ▼
┌─────────────────────────────────────┐
│ قناة النتائج │ تجميع نتائج الزحف
│ ┌──────────┐ ┌──────────┐ │
│ │خريطة │ │ إعادة │ │ إزالة التكرار + إعادة المحاولة
│ │إزالة التكرار│ │ المحاولة │ │
│ └──────────┘ └──────────┘ │
└─────────────────────────────────────┘
الكود الكامل
package main
import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"
)
// ============================================================
// تعريفات هياكل البيانات
// ============================================================
// CrawlTask تمثل مهمة زحف
type CrawlTask struct {
URL string // URL المستهدف
Depth int // العمق الحالي
}
// CrawlResult تمثل نتيجة الزحف
type CrawlResult struct {
URL string // URL المزحف
Title string // عنوان الصفحة (استخراج مبسط)
BodyLength int // طول محتوى الصفحة
Depth int // عمق الزحف
Err error // معلومات الخطأ (إن وجد)
Latency time.Duration // مدة الزحف
}
// CrawlerConfig تكوين الزاحف
type CrawlerConfig struct {
MaxConcurrency int // الحد الأقصى للتزامن
MaxDepth int // الحد الأقصى لعمق الزحف
MaxRetries int // الحد الأقصى لإعادة المحاولة
RequestTimeout time.Duration // مهلة لكل طلب
RateInterval time.Duration // فترة تحديد المعدل
}
// DefaultConfig إرجاع التكوين الافتراضي
func DefaultConfig() CrawlerConfig {
return CrawlerConfig{
MaxConcurrency: 3,
MaxDepth: 2,
MaxRetries: 3,
RequestTimeout: 10 * time.Second,
RateInterval: 500 * time.Millisecond,
}
}
// ============================================================
// مُزيل التكرار (مجموعة URLs مزورة آمنة)
// ============================================================
// Deduplicator يعالج إزالة تكرار URLs
type Deduplicator struct {
visited map[string]bool
mu sync.Mutex
}
// NewDeduplicator إنشاء مُزيل تكرار
func NewDeduplicator() *Deduplicator {
return &Deduplicator{
visited: make(map[string]bool),
}
}
// Mark تمييز URL كمُزار؛ يُرجع true إذا كان URL جديدًا
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false // مُزار بالفعل
}
d.visited[url] = true
return true // URL جديد
}
// Count إرجاع عدد URLs المُزارة
func (d *Deduplicator) Count() int {
d.mu.Lock()
defer d.mu.Unlock()
return len(d.visited)
}
// ============================================================
// محدد المعدل (تحكم في التزامن بالسمافور)
// ============================================================
// RateLimiter محدد معدّل سمافور قائم على القناة
type RateLimiter struct {
semaphore chan struct{}
interval time.Duration
}
// NewRateLimiter إنشاء محدد معدّل
// maxConcurrency: الحد الأقصى للتزامن
// interval: الحد الأدنى للوقت بين طلبين
func NewRateLimiter(maxConcurrency int, interval time.Duration) *RateLimiter {
return &RateLimiter{
semaphore: make(chan struct{}, maxConcurrency),
interval: interval,
}
}
// Acquire الحصول على تصريح تنفيذ
func (rl *RateLimiter) Acquire() {
rl.semaphore <- struct{}{} // إرسال إلى قناة مخزنة؛ يحظر عند الامتلاء
}
// Release تحرير تصريح تنفيذ
func (rl *RateLimiter) Release() {
time.Sleep(rl.interval) // تحديد المعدل: الحفاظ على الحد الأدنى للفترة
<-rl.semaphore // استقبال من قناة مخزنة، تحرير مساحة
}
// ============================================================
// جالب الويب
// ============================================================
// Fetcher يعالج طلبات HTTP الفعلية
type Fetcher struct {
client *http.Client
}
// NewFetcher إنشاء جالب
func NewFetcher(timeout time.Duration) *Fetcher {
return &Fetcher{
client: &http.Client{
Timeout: timeout,
},
}
}
// Fetch جلب URL المحدد وإرجاع العنوان وطول المحتوى
func (f *Fetcher) Fetch(ctx context.Context, rawURL string) (title string, bodyLen int, err error) {
// إنشاء طلب مع سياق لدعم الإلغاء
req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
if err != nil {
return "", 0, fmt.Errorf("فشل إنشاء الطلب: %w", err)
}
// تعيين User-Agent لمحاكاة المتصفح
req.Header.Set("User-Agent", "GoCrawler/1.0")
resp, err := f.client.Do(req)
if err != nil {
return "", 0, fmt.Errorf("فشل الطلب: %w", err)
}
defer resp.Body.Close()
// التحقق من رمز حالة HTTP
if resp.StatusCode != http.StatusOK {
return "", 0, fmt.Errorf("رمز حالة HTTP: %d", resp.StatusCode)
}
// قراءة جسم الاستجابة (تحديد الحجم لمنع فيض الذاكرة)
const maxSize = 1 << 20 // 1MB
body, err := io.ReadAll(io.LimitReader(resp.Body, maxSize))
if err != nil {
return "", 0, fmt.Errorf("فشل قراءة الاستجابة: %w", err)
}
// استخراج مبسط لمحتوى وسم <title>
title = extractTitle(string(body))
return title, len(body), nil
}
// extractTitle استخراج العنوان من HTML
func extractTitle(html string) string {
re := regexp.MustCompile(`(?i)<title>(.*?)</title>`)
matches := re.FindStringSubmatch(html)
if len(matches) > 1 {
return strings.TrimSpace(matches[1])
}
return "(بدون عنوان)"
}
// ============================================================
// مستخرج الروابط
// ============================================================
// ExtractLinks استخراج جميع الروابط من HTML (نسخة مبسطة)
func ExtractLinks(body string, baseURL string) []string {
re := regexp.MustCompile(`(?i)href=["']([^"']+)["']`)
matches := re.FindAllStringSubmatch(body, -1)
var links []string
base, err := url.Parse(baseURL)
if err != nil {
return links
}
for _, match := range matches {
if len(match) < 2 {
continue
}
href := match[1]
// تحليل URL نسبي
parsed, err := url.Parse(href)
if err != nil {
continue
}
// تحويل إلى URL مطلق
absolute := base.ResolveReference(parsed)
// الاحتفاظ فقط بروابط HTTP/HTTPS
if absolute.Scheme == "http" || absolute.Scheme == "https" {
// إزالة الجزء (#anchor)
absolute.Fragment = ""
links = append(links, absolute.String())
}
}
return links
}
// ============================================================
// محرك الزاحف
// ============================================================
// Crawler هو محرك الزاحف الرئيسي
type Crawler struct {
config CrawlerConfig
fetcher *Fetcher
dedup *Deduplicator
limiter *RateLimiter
taskCh chan CrawlTask // طابور المهام
resultCh chan CrawlResult // قناة النتائج
wg sync.WaitGroup // انتظار اكتمال جميع العمال
}
// NewCrawler إنشاء زاحف
func NewCrawler(config CrawlerConfig) *Crawler {
return &Crawler{
config: config,
fetcher: NewFetcher(config.RequestTimeout),
dedup: NewDeduplicator(),
limiter: NewRateLimiter(config.MaxConcurrency, config.RateInterval),
taskCh: make(chan CrawlTask, 100),
resultCh: make(chan CrawlResult, 100),
}
}
// CrawlWithRetry تنفيذ زحف مع إعادة المحاولة (ارتداد أُسّي)
func (c *Crawler) CrawlWithRetry(ctx context.Context, task CrawlTask) CrawlResult {
var lastErr error
for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
// التحقق مما إذا كان السياق ملغيًا
select {
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
default:
}
// لا حاجة للانتظار في المحاولة الأولى
if attempt > 0 {
// ارتداد أُسّي: 1 ثانية، 2 ثانية، 4 ثوانٍ ...
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
fmt.Printf(" ↻ إعادة المحاولة %d/%d: %s (انتظار %v)\n",
attempt, c.config.MaxRetries, task.URL, backoff)
select {
case <-time.After(backoff):
case <-ctx.Done():
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: ctx.Err(),
}
}
}
start := time.Now()
title, bodyLen, err := c.fetcher.Fetch(ctx, task.URL)
latency := time.Since(start)
if err == nil {
// نجاح
return CrawlResult{
URL: task.URL,
Title: title,
BodyLength: bodyLen,
Depth: task.Depth,
Latency: latency,
}
}
lastErr = err
}
return CrawlResult{
URL: task.URL,
Depth: task.Depth,
Err: fmt.Errorf("فشل بعد %d محاولات: %w", c.config.MaxRetries, lastErr),
}
}
// Worker goroutine العامل
func (c *Crawler) Worker(ctx context.Context, id int) {
defer c.wg.Done()
fmt.Printf("[العامل %d] بدأ\n", id)
for task := range c.taskCh {
// التحقق من السياق
select {
case <-ctx.Done():
fmt.Printf("[العامل %d] استقبل إشارة الخروج، توقف\n", id)
return
default:
}
// تحديد المعدل: الحصول على تصريح
c.limiter.Acquire()
fmt.Printf("[العامل %d] زحف: %s (عمق %d)\n", id, task.URL, task.Depth)
// تنفيذ الزحف (مع إعادة المحاولة)
result := c.CrawlWithRetry(ctx, task)
// تحرير التصريح
c.limiter.Release()
// إرسال النتيجة
select {
case c.resultCh <- result:
case <-ctx.Done():
return
}
}
fmt.Printf("[العامل %d] خروج\n", id)
}
// Start بدء الزاحف
func (c *Crawler) Start(ctx context.Context, seedURLs []string) {
// بدء تجمع العمال
for i := 0; i < c.config.MaxConcurrency; i++ {
c.wg.Add(1)
go c.Worker(ctx, i)
}
// تقديم بذور URLs
go func() {
for _, rawURL := range seedURLs {
if c.dedup.Mark(rawURL) {
c.taskCh <- CrawlTask{URL: rawURL, Depth: 0}
}
}
}()
// بدء goroutine معالجة النتائج واكتشاف الروابط
go c.processResults(ctx)
}
// processResults معالجة نتائج الزحف واكتشاف روابط جديدة
func (c *Crawler) processResults(ctx context.Context) {
for result := range c.resultCh {
if result.Err != nil {
fmt.Printf("✗ فشل: %s — %v\n", result.URL, result.Err)
continue
}
fmt.Printf("✓ نجح: %s\n", result.URL)
fmt.Printf(" العنوان: %s\n", result.Title)
fmt.Printf(" الحجم: %d بايت | الوقت: %v\n", result.BodyLength, result.Latency)
// إذا لم يُبلغ الحد الأقصى للعمق، يمكن الاستمرار في اكتشاف الروابط
// (مبسط هنا؛ في الإنتاج، ستعيد زحف الصفحة للحصول على HTML لاستخراج الروابط)
if result.Depth < c.config.MaxDepth {
// عرض توضيحي: نشر روابط جديدة في طابور المهام
// في الإنتاج، ستستخرج الروابط من HTML النتيجة هنا
fmt.Printf(" العمق %d/%d، يمكن الاستمرار في اكتشاف الروابط الفرعية\n", result.Depth, c.config.MaxDepth)
}
}
}
// Wait انتظار اكتمال جميع المهام
func (c *Crawler) Wait() {
// إغلاق قناة المهام، إخطار العمال بأنه لا مزيد من المهام
close(c.taskCh)
// انتظار خروج جميع العمال
c.wg.Wait()
// إغلاق قناة النتائج
close(c.resultCh)
}
// Stats إرجاع إحصائيات الزاحف
func (c *Crawler) Stats() (visited int) {
return c.dedup.Count()
}
// ============================================================
// البرنامج الرئيسي
// ============================================================
func main() {
fmt.Println("========================================")
fmt.Println(" زاحف ويب Go المتزامن v1.0")
fmt.println("========================================")
fmt.Println()
// تحميل التكوين
config := DefaultConfig()
// يمكن تجاوزه عبر وسيطات سطر الأوامر أو ملف التكوين
// config.MaxConcurrency = 5
// إنشاء سياق قابل للإلغاء
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// التقاط إشارة مقاطعة النظام (Ctrl+C) للإغلاق السلس
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigCh
fmt.Printf("\n⚠ استقبلت إشارة: %v، جارٍ الإغلاق السلس...\n", sig)
cancel() // إلغاء السياق، إخطار جميع العمال
}()
// إنشاء الزاحف
crawler := NewCrawler(config)
// قائمة بذور URLs
seedURLs := []string{
"https://httpbin.org/html",
"https://httpbin.org/links/5",
"https://httpbin.org/range/100",
"https://httpbin.org/delay/1",
}
fmt.Printf("التكوين: التزامن=%d، العمق=%d، المحاولات=%d\n",
config.MaxConcurrency, config.MaxDepth, config.MaxRetries)
fmt.Printf("بذور URLs: %d\n", len(seedURLs))
fmt.Println("----------------------------------------")
// بدء الزاحف
crawler.Start(ctx, seedURLs)
// إغلاق تلقائي بعد فترة (في الإنتاج، استخدم شروطًا أخرى)
go func() {
time.Sleep(30 * time.Second)
fmt.Println("\n⏱ انتهت المهلة، تفعيل الإغلاق...")
cancel()
}()
// انتظار اكتمال جميع المهام
crawler.Wait()
// طباعة الإحصائيات
fmt.Println("----------------------------------------")
fmt.Printf("اكتمل الزحف! زُرت %d URLs\n", crawler.Stats())
}
تحليل الكود
1. تحديد المعدل: قناة السمافور
// قناة مخزنة كسمافور
semaphore: make(chan struct{}, maxConcurrency)
// الحصول على تصريح: يحظر عند امتلاء القناة
rl.semaphore <- struct{}{}
// تحرير تصريح
<-rl.semaphore
قناة بحجم مخزن N تسمح لـ N goroutines كحد أقصى بامتلاك تصاريح في نفس الوقت؛ الـ goroutine رقم N+1 ستحظر. هذا يتحكم في التزامن بدقة أكبر من time.Ticker.
2. إزالة التكرار: خريطة محمية بقفل
func (d *Deduplicator) Mark(url string) bool {
d.mu.Lock()
defer d.mu.Unlock()
if d.visited[url] {
return false
}
d.visited[url] = true
return true
}
عدة goroutines قد تكتشف نفس الـ URL في نفس الوقت؛ طريقة Mark تستخدم قفلًا لضمان الذرية — فقط goroutine واحد يمكنه "المطالبة" بذلك الـ URL.
3. إعادة المحاولة عند الخطأ: ارتداد أُسّي
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * time.Second
| المحاولة # | وقت الانتظار |
|---|---|
| الأولى | 1 ثانية |
| الثانية | 2 ثانية |
| الثالثة | 4 ثوانٍ |
الارتداد الأُسّي يمنع إعادة المحاولة "الانهيارية" أثناء أعطال الخادم، مما يُعطي الخادم وقتًا للتعافي.
4. الإغلاق السلس: السياق + الإشارة
ctx, cancel := context.WithCancel(context.Background())
// الاستماع لإشارات النظام
go func() {
<-sigCh
cancel() // إلغاء السياق
}()
// العامل يتحقق من السياق
select {
case <-ctx.Done():
return // خروج
default:
// الاستمرار في العمل
}
cancel() تُخطّر جميع goroutines التي تستمع إلى ctx.Done()، مما يُحقق إغلاقًا منسقًا.
5. تدفق التعاون الإجمالي
main()
│
├─ إنشاء سياق (قابل للإلغاء)
├─ بدء N Worker goroutines
├─ تقديم بذور URLs إلى taskCh
├─ بدء goroutine processResults
│
├─ انتظار...
│ ├─ العامل يأخذ مهام من taskCh
│ ├─ العامل يحدد المعدل → زحف → إعادة محاولة
│ ├─ العامل يرسل النتائج إلى resultCh
│ └─ processResults تعالج النتائج، تكتشف روابط جديدة
│
└─ cancel() يُفعّل → جميع العمال يخرجون → البرنامج ينتهي
❓ أسئلة شائعة
س1: لماذا تستخدم قناة بدلًا من sync.Mutex للتحكم في التزامن؟
القنوات لا توفر فقط الإقصاء المتبادل بل أيضًا التواصل. قناة السمافور تجمع طبيعيًا بين "تحديد التزامن" و"تمرير المهام" — عندما تمتلئ القناة، تنتظر goroutines الجديدة تلقائيًا؛ عندما تُحرر goroutine، تُيقظ المنتظرة تلقائيًا. هذا أكثر إيجازًا من استخدام Mutex + WaitGroup منفصلة. علاوة على ذلك، القنوات تدعم select، مما يُسهل الدمج مع context.Done() لحظر قابل للإلغاء.
س2: لماذا تُزيل التكرار عند تقديم المهام بدلًا من قبل الزحف؟
في الواقع، كلاهما مطلوب. إزالة التكرار عند تقديم المهام تمنع دفع مهام مكررة في القناة، مما يوفر الذاكرة ووقت المعالجة. ومع ذلك، إذا اكتشفت عدة goroutines نفس الرابط الجديد تقريبًا في نفس الوقت، فإن إزالة التكرار عند التقديم وحدها قد لا تزال بها "مسابقات" — لذا يجب أن تكون طريقة Mark آمنة للخيوط. في الزاحفات كبيرة النطاق، عادةً يُستخدم Bloom Filter للمعالجة الفعالة لإزالة تكرار URLs الضخمة.
س3: لماذا لا تقتل البرنامج ببساطة؟ لماذا "الإغلاق السلس"؟
القتل القسري قد يُسبب: ملفات تالفة تُكتب، اتصالات قاعدة بيانات غير مُحررة، طلبات غير مكتملة تصل إلى الخادم المستهدف. الإغلاق السلس يسمح لكل عامل بإكمال مهمته الحالية قبل الخروج، مما يضمن اتساق البيانات. context.WithCancel هو نمط Go القياسي لتنفيذ الإغلاق السلس.
س4: هل يمكن لهذا الزاحف التعامل مع صفحات المُصاغة بـ JavaScript؟
لا. net/http يجلب HTML الخام فقط دون تنفيذ JavaScript. إذا كنت تحتاج لزحف SPAs (تطبيقات الصفحة الواحدة)، ستحتاج لدمج متصفح بدون واجهة مثل Chromedp أو Rod. البنية في هذا الدرس قابلة للتوسيع — فقط استبدل تنفيذ Fetcher.
📖 ملخص
هذا القسم طبّق بشكل شامل عدة مفاهيم أساسية في التزامن عبر Go من خلال مشروع زاحف ويب كامل:
| المفهوم | التطبيق | الكود الرئيسي |
|---|---|---|
| goroutine | عدة عمال يعملون بالتوازي | go c.Worker(ctx, i) |
| channel | طابور المهام، تمرير النتائج، السمافور | taskCh، resultCh، semaphore |
| sync.Mutex | حماية سلامة خريطة إزالة التكرار | d.mu.Lock() |
| context | نشر الإلغاء، إغلاق سلس | ctx.WithCancel |
| select | تعدد، التحكم في المهلة | select { case <-ctx.Done() ... } |
| sync.WaitGroup | انتظار اكتمال جميع العمال | c.wg.Wait() |
هذه المكونات تعمل معًا لتكوين نظام متزامن قوي. إتقان نمط "القناة + السياق + WaitGroup" هذا هو الأساس لكتابة برامج Go متزامنة جاهزة للإنتاج.
📝 تمارين
التمرين 1: إضافة استخراج روابط عميقة
حاليًا، processResults تطبع فقط معلومات العمق دون استخراج الروابط الفرعية فعليًا. عدّل الكود لاستخراج الروابط من HTML بعد الزحف الناجح ونشر URLs جديدة في taskCh.
تلميح: ستحتاج لتعديل طريقة Fetch لإرجاع محتوى HTML أيضًا، ثم استدعاء ExtractLinks في processResults.
التمرين 2: تنفيذ استمرارية النتائج
أضف هيكل Saver يكتب نتائج الزحف إلى ملف بتنسيق JSON. المتطلبات:
- استخدم goroutine منفصلة تقرأ من
resultChوتكتب في الملف - سجل واحد لكل سطر (تنسيق JSON Lines)
- دعم الكتابة المتزامنة الآمنة
التمرين 3: تنفيذ تحديد المعدل على مستوى النطاق
التحديد الحالي للمعدل عالمي. نفّذ محدد معدّل مُجمّع حسب النطاق يسمح:
- بأقصى 2 طلب في الثانية لنفس النطاق (مثل
example.com) - النطاقات المختلفة لا تؤثر على بعضها
- تلميح: حافظ على
map[string]*RateLimiter
الدرس التالي
بعد إكمال هذا التمرين العملي، تابع مع الدرس 19: معالجة النصوص لإتقان بنية النصوص الداخلية في Go والعمليات الشائعة وتقنيات تحسين الأداء.



