تطبيق التزامن: مُجدول المهام

تطبيق التزامن: مُجدول المهام

تخيل مركز فرز الطرود: شريط ناقل يُوصل الطرود (المهام) باستمرار، وعدة فارزين (goroutines) يعملون في نفس الوقت — بعضهم يمسح، بعضهم يُصنف، بعضهم يُحمّل. كل مرحلة تمرر الطرود عبر خط أنابيب (channel)، والمُجدول يراقب التقدم الإجمالي، ويُعالج أي مشكلات فورًا. هكذا يعمل مُجدول المهام المتزامن — عدة عمال يتعاونون لإكمال عدد كبير من المهام، بكفاءة وموثوقية.

في هذا الدرس، سنُطبّق بشكل شامل الـ goroutines و القنوات و select و حزمة sync و context.Context التي تعلمناها لبناء مُجدول مهام متزامن كامل الوظائف من الصفر.


متطلبات المشروع

نحتاج لبناء مُجدول مهام يستوفي المتطلبات التالية:

  1. تنفيذ متزامن: دعم عدة عمال لمعالجة المهام في نفس الوقت
  2. تقديم المهام: الأنظمة الخارجية يمكنها تقديم المهام ديناميكيًا
  3. التحكم في المهلة: المهام الفردية تُلغى تلقائيًا بعد انتهاء المهلة
  4. إعادة المحاولة عند الفشل: المهام الفاشلة تُعاد محاولتها تلقائيًا، حتى N مرة
  5. جمع النتائج: جمع جميع نتائج المهام بشكل متزامن وآمن
  6. الإغلاق السلس: بعد استقبال إشارة الإلغاء، انتظر اكتمال المهام الجارية قبل الخروج

تصميم النظام

البنية العامة مقسمة إلى ثلاث طبقات:

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│ تقديم المهام │────▶│   المُجدول    │────▶│ تجمع العمال  │
│  (المنتج)    │     │   المحرك     │     │  (العمال)    │
└─────────────┘     └──────────────┘     └─────────────┘
                           │                     │
                           ▼                     ▼
                     ┌──────────────┐     ┌─────────────┐
                     │   جمع       │◀────│    تنفيذ    │
                     │  النتائج     │     │   المهام     │
                     └──────────────┘     └─────────────┘

المكونات الأساسية:

المكون المسؤولية التنفيذ
Task تمثل مهمة قابلة للتنفيذ هيكل + نوع دالة
Result تخزن نتيجة تنفيذ المهمة هيكل + قناة
Worker ينفذ مهام محددة goroutine
Scheduler يُنسق توزيع المهام وتنفيذها قناة + select
ResultCollector يجمع النتائج بشكل متزامن وآمن sync.Mutex

تنفيذ الكود الكامل

1. تعريف هياكل البيانات

GO
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task تمثل مهمة يجب تنفيذها
type Task struct {
	ID      int                          // مُعرّف المهمة الفريد
	Payload string                       // بيانات المهمة
	Execute func(ctx context.Context) (string, error) // دالة تنفيذ المهمة
}

// Result تمثل نتيجة تنفيذ مهمة
type Result struct {
	TaskID   int    // مُعرّف المهمة المقابل
	Output   string // الإخراج عند التنفيذ الناجح
	Error    error  // الخطأ عند التنفيذ الفاشل
	Attempts int    // عدد المحاولات الفعلي
}

// ResultCollector يجمع نتائج المهام بشكل متزامن وآمن
type ResultCollector struct {
	mu      sync.Mutex
	results []Result
}

// Add إضافة نتيجة (متزامن)
func (rc *ResultCollector) Add(r Result) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.results = append(rc.results, r)
}

// All إرجاع جميع النتائج المجمعة
func (rc *ResultCollector) All() []Result {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	// إرجاع نسخة لتجنب التعديل الخارجي
	out := make([]Result, len(rc.results))
	copy(out, rc.results)
	return out
}

2. تنفيذ العامل

GO
// Worker يستقبل المهام من قناة المهام، ينفذها، ويرسل النتائج إلى قناة النتائج
func Worker(
	ctx context.Context,
	id int,
	tasks <-chan Task,
	results chan<- Result,
	maxRetries int,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	for task := range tasks {
		// التحقق من الإلغاء
		select {
		case <-ctx.Done():
			fmt.Printf("[العامل %d] السياق أُلغي، خروج\n", id)
			return
		default:
		}

		fmt.Printf("[العامل %d] بدء المهمة #%d\n", id, task.ID)

		var output string
		var err error
		attempts := 0

		// منطق التنفيذ مع إعادة المحاولة
		for attempts < maxRetries {
			attempts++

			// إنشاء سياق فرعي مع مهلة لكل مهمة (أقصى 2 ثانية)
			taskCtx, taskCancel := context.WithTimeout(ctx, 2*time.Second)
			output, err = task.Execute(taskCtx)
			taskCancel()

			if err == nil {
				break // نجاح، خروج من حلقة إعادة المحاولة
			}

			fmt.Printf("[العامل %d] المهمة #%d المحاولة %d فشلت: %v\n",
				id, task.ID, attempts, err)

			if attempts < maxRetries {
				// الانتظار قبل إعادة المحاولة، مع الاستماع للإلغاء
				select {
				case <-ctx.Done():
					fmt.Printf("[العامل %d] استقبل إشارة الإلغاء أثناء انتظار إعادة المحاولة\n", id)
					results <- Result{TaskID: task.ID, Error: ctx.Err(), Attempts: attempts}
					return
				case <-time.After(time.Duration(attempts) * 500 * time.Millisecond):
					// ارتداد خطي: المحاولة الأولى 500ms، الثانية 1s، الثالثة 1.5s ...
				}
			}
		}

		results <- Result{
			TaskID:   task.ID,
			Output:   output,
			Error:    err,
			Attempts: attempts,
		}

		if err != nil {
			fmt.Printf("[العامل %d] المهمة #%d فشلت نهائيًا (حاولت %d مرات)\n", id, task.ID, attempts)
		} else {
			fmt.Printf("[العامل %d] المهمة #%d اكتملت\n", id, task.ID)
		}
	}

	fmt.Printf("[العامل %d] قناة المهام أُغلقت، خروج\n", id)
}

3. تنفيذ المُجدول

GO
// Scheduler محرك جدولة المهام
type Scheduler struct {
	workerCount int            // عدد العمال
	maxRetries  int            // الحد الأقصى لإعادة المحاولة
	taskTimeout time.Duration  // مهلة لكل مهمة
}

// NewScheduler إنشاء مُجدول جديد
func NewScheduler(workerCount, maxRetries int, taskTimeout time.Duration) *Scheduler {
	return &Scheduler{
		workerCount: workerCount,
		maxRetries:  maxRetries,
		taskTimeout: taskTimeout,
	}
}

// Run بدء المُجدول، معالجة قائمة المهام المعطاة، وإرجاع جميع النتائج
func (s *Scheduler) Run(ctx context.Context, taskList []Task) []Result {
	// إنشاء القنوات
	tasks := make(chan Task, len(taskList))
	results := make(chan Result, len(taskList))

	// جامع النتائج
	collector := &ResultCollector{}

	// بدء goroutine جمع النتائج
	var collectorWg sync.WaitGroup
	collectorWg.Add(1)
	go func() {
		defer collectorWg.Done()
		for r := range results {
			collector.Add(r)
		}
	}()

	// بدء تجمع العمال
	var workerWg sync.WaitGroup
	for i := 1; i <= s.workerCount; i++ {
		workerWg.Add(1)
		go Worker(ctx, i, tasks, results, s.maxRetries, &workerWg)
	}

	// تقديم جميع المهام
	go func() {
		for _, task := range taskList {
			select {
			case tasks <- task:
				fmt.Printf("[المُجدول] قدم المهمة #%d\n", task.ID)
			case <-ctx.Done():
				fmt.Println("[المُجدول] السياق أُلغي، إيقاف تقديم المهام")
				return
			}
		}
		close(tasks) // إغلاق قناة المهام، إخطار العمال بأنه لا مزيد من المهام
	}()

	// انتظار اكتمال جميع العمال
	workerWg.Wait()
	close(results) // إغلاق قناة النتائج

	// انتظار المعالج لإكمال جميع النتائج
	collectorWg.Wait()

	return collector.All()
}

4. محاكاة المهام والدالة الرئيسية

GO
// createDemoTasks إنشاء مجموعة مهام محاكاة؛ بعضها سيفشل، وبعضها سينتهي وقته
func createDemoTasks(count int) []Task {
	tasks := make([]Task, count)
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	for i := 0; i < count; i++ {
		id := i + 1
		behavior := rng.Intn(4) // 0-3، يحدد سلوك المهمة

		switch behavior {
		case 0:
			// اكتمال عادي
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("مهمة عادية-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(time.Duration(100+rng.Intn(400)) * time.Millisecond)
					return fmt.Sprintf("المهمة #%d نجحت", id), nil
				},
			}
		case 1:
			// تفشل أحيانًا (المحاولة الأولى تفشل، الثانية تنجح)
			failCount := 0
			var mu sync.Mutex
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("مهمة غير مستقرة-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					mu.Lock()
					failCount++
					current := failCount
					mu.Unlock()

					time.Sleep(time.Duration(50+rng.Intn(200)) * time.Millisecond)

					if current <= 1 {
						return nil, fmt.Errorf("المهمة #%d فشلت محاكاة", id)
					}
					return fmt.Sprintf("المهمة #%d نجحت في المحاولة %d", id, current), nil
				},
			}
		case 2:
			// تنفيذ يستغرق وقتًا طويلًا (سينتهي وقته)
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("مهمة بطيئة-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					select {
					case <-time.After(5 * time.Second): // يتجاوز بكثير مهلة 2 ثانية
						return fmt.Sprintf("المهمة #%d اكتملت", id), nil
					case <-ctx.Done():
						return "", fmt.Errorf("المهمة #%d أُلغيت: %w", id, ctx.Err())
					}
				},
			}
		default:
			// تفشل دائمًا
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("مهمة فاشلة-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(time.Duration(50+rng.Intn(150)) * time.Millisecond)
					return nil, fmt.Errorf("المهمة #%d خطأ غير قابل للإصلاح", id)
				},
			}
		}
	}

	return tasks
}

func main() {
	fmt.Println("=== مُجدول المهام المتزامن ===")
	fmt.Println()

	// إنشاء سياق قابل للإلغاء
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// الاستماع لإشارة نظام التشغيل (Ctrl+C) لتفعيل الإغلاق السلس
	go func() {
		// محاكاة: إرسال إشارة إلغاء بعد 8 ثوانٍ (استخدم signal.Notify في الإنتاج)
		time.Sleep(8 * time.Second)
		fmt.Println("\n[الرئيسي] استقبل إشارة إلغاء، بدء الإغلاق السلس...")
		cancel()
	}()

	// إنشاء المُجدول: 3 عمال، أقصى 3 محاولات، مهلة 2 ثانية لكل مهمة
	scheduler := NewScheduler(3, 3, 2*time.Second)

	// توليد 10 مهام محاكاة
	taskList := createDemoTasks(10)

	// تنفيذ الجدولة
	fmt.Printf("[الرئيسي] تقديم %d مهمة، بدء %d عمال\n\n", len(taskList), 3)
	start := time.Now()
	results := scheduler.Run(ctx, taskList)
	elapsed := time.Since(start)

	// طباعة ملخص النتائج
	fmt.Println("\n=============================")
	fmt.Println("     ملخص نتائج التنفيذ     ")
	fmt.Println("=============================")

	successCount := 0
	failCount := 0
	cancelCount := 0

	for _, r := range results {
		status := "✓"
		detail := r.Output
		if r.Error != nil {
			if ctx.Err() != nil {
				status = "⊘"
				detail = "أُلغيت"
				cancelCount++
			} else {
				status = "✗"
				detail = r.Error.Error()
				failCount++
			}
		} else {
			successCount++
		}
		fmt.Printf("  %s المهمة #%02d | المحاولات: %d | %s\n",
			status, r.TaskID, r.Attempts, detail)
	}

	fmt.Println("-----------------------------")
	fmt.Printf("  الإجمالي: %d | نجح: %d | فشل: %d | أُلغي: %d\n",
		len(results), successCount, failCount, cancelCount)
	fmt.Printf("  الوقت: %v\n", elapsed)
}

5. تأثير التشغيل

BASH
go run main.go

مثال على الإخراج (النتائج عشوائية في كل تشغيل):

TEXT
=== مُجدول المهام المتزامن ===

[الرئيسي] تقديم 10 مهام، بدء 3 عمال

[المُجدول] قدم المهمة #1
[المُجدول] قدم المهمة #2
[المُجدول] قدم المهمة #3
[المُجدول] قدم المهمة #4
[المُجدول] قدم المهمة #5
[المُجدول] قدم المهمة #6
[المُجدول] قدم المهمة #7
[المُجدول] قدم المهمة #8
[المُجدول] قدم المهمة #9
[المُجدول] قدم المهمة #10
[العامل 1] بدء المهمة #1
[العامل 2] بدء المهمة #2
[العامل 3] بدء المهمة #3
[العامل 1] المهمة #1 اكتملت
[العامل 1] بدء المهمة #4
[العامل 2] المهمة #2 المحاولة 1 فشلت: المهمة #2 فشلت محاكاة
[العامل 3] المهمة #3 اكتملت
[العامل 3] بدء المهمة #5
[العامل 2] بدء المهمة #6
[العامل 2] المهمة #6 المحاولة 1 فشلت: المهمة #6 خطأ غير قابل للإصلاح
[العامل 3] المهمة #5 اكتملت
[العامل 3] بدء المهمة #7
[العامل 2] المهمة #6 المحاولة 2 فشلت: المهمة #6 خطأ غير قابل للإصلاح
[العامل 1] المهمة #4 المحاولة 1 فشلت: المهمة #4 فشلت محاكاة
[العامل 2] المهمة #6 المحاولة 3 فشلت: المهمة #6 خطأ غير قابل لل.AutoScaleMode
[العامل 2] المهمة #6 فشلت نهائيًا (حاولت 3 مرات)
[العامل 2] بدء المهمة #8
...
[العامل 1] قناة المهام أُغلقت، خروج
[العامل 2] قناة المهام أُغلقت، خروج
[العامل 3] قناة المهام أُغلقت، خروج

=============================
     ملخص نتائج التنفيذ     
=============================
  ✓ المهمة #01 | المحاولات: 1 | المهمة #1 نجحت
  ⊘ المهمة #02 | المحاولات: 2 | أُلغيت
  ✓ المهمة #03 | المحاولات: 1 | المهمة #3 نجحت
  ⊘ المهمة #04 | المحاولات: 2 | أُلغيت
  ✓ المهمة #05 | المحاولات: 1 | المهمة #5 نجحت
  ✗ المهمة #06 | المحاولات: 3 | المهمة #6 خطأ غير قابل للإصلاح
  ✓ المهمة #07 | المحاولات: 1 | المهمة #7 نجحت
  ⊘ المهمة #08 | المحاولات: 1 | أُلغيت
  ✓ المهمة #09 | المحاولات: 1 | المهمة #9 نجحت
  ⊘ المهمة #10 | المحاولات: 1 | أُلغيت
-----------------------------
  الإجمالي: 10 | نجح: 5 | فشل: 1 | أُلغي: 4
  الوقت: 8.012s

تحليل الكود

أنماط التزامن الأساسية

1. Fan-out / Fan-in

هذا هو النمط الأساسي للمُجدول:

           ┌─ العامل 1 ─┐
المهام ─────┼─ العامل 2 ─┼───── النتائج
           └─ العامل 3 ─┘

2. إلغاء متسلسل عبر context.Context

السياق الجذري (الرئيسي)
  └─ WithCancel
       ├─ WithTimeout للعامل 1
       │    └─ taskCtx (مهلة لكل مهمة 2 ثانية)
       ├─ WithTimeout للعامل 2
       │    └─ taskCtx (مهلة لكل مهمة 2 ثانية)
       └─ WithTimeout للعامل 3
            └─ taskCtx (مهلة لكل مهمة 2 ثانية)

عندما يستدعي البرنامج الرئيسي cancel()، تتلقى جميع السياقات الفرعية إشارة الإلغاء، مما يُحقق الإلغاء المتسلسل.

3. مزامنة WaitGroup

GO
var wg sync.WaitGroup
wg.Add(1)        // كل goroutine مُطلق يزيد العداد بـ 1
go func() {
    defer wg.Done()  // خروج العامل يقلل العداد بـ 1
    // ...
}()
wg.Wait()         // يحظر حتى يصل العداد إلى الصفر

4. تعدد select

العمال يستمعون إلى عدة مصادر إشارة في نفس الوقت:

GO
select {
case <-ctx.Done():       // السياق أُلغي
    return
case <-time.After(d):    // ارتداد إعادة المحاولة
    // الاستمرار في إعادة المحاولة
}

استراتيجية ارتداد إعادة المحاولة

هذا المثال يستخدم ارتداد خطي: وقت الانتظار لكل محاولة = attempts × 500ms. في الإنتاج، يُوصى بـالارتداد الأُسّي:

GO
backoff := time.Duration(1<<uint(attempts)) * 100 * time.Millisecond
// الأولى: 200ms، الثانية: 400ms، الثالثة: 800ms

يمكنك أيضًا إضافة اضطراب عشوائي لمنع تأثير "القطيع الهائج" عندما تُعاد عدة مهام في نفس الوقت.


❓ أسئلة شائعة

1. لماذا يستخدم Worker range tasks بدلًا من التحقق المباشر من ctx.Done()؟

range tasks يخرج تلقائيًا من الحلقة عند إغلاق القناة، وهو نمط المنتج-المستهلك القياسي. ctx.Done() يُستخدم للاستجابة لإشارات الإلغاء أثناء الانتظار أو إعادة المحاولة. يعملان معًا:

إذا استخدمت ctx.Done() فقط، تحتاج لمعالجة إضافية لإغلاق القناة؛ إذا استخدمت range فقط، لا تستطيع الاستجابة للإلغاء.

2. كيف تُحدد حجم المخزن لقناة النتائج؟

هذا المثال يستخدم make(chan Result, len(taskList)) مع إجمالي عدد المهام كحجم مخزن، مما يضمن إمكانية كتابة جميع النتائج دون حظر. إذا كان عدد المهام كبيرًا جدًا (ملايين)، يمكنك:

3. كيف تنفذ ارتداد أُسّي حقيقي؟

GO
func exponentialBackoff(attempt int) time.Duration {
    base := 100 * time.Millisecond
    max := 10 * time.Second
    backoff := base * time.Duration(1<<uint(attempt))
    if backoff > max {
        backoff = max
    }
    // إضافة اضطراب عشوائي: ±25%
    jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
    return backoff - backoff/4 + jitter
}

4. كيف تحدد عدد المهام المتزامنة (التحكم في التزامن)؟

هذا المثال يحدد التزامن طبيعيًا عبر عدد العمال. نهج آخر هو استخدام سمافور:

GO
sem := make(chan struct{}, 10) // أقصى 10 متزامنة
for _, task := range taskList {
    sem <- struct{}{} // الحصول على تصريح
    go func(t Task) {
        defer func() { <-sem }() // تحرير تصريح
        t.Execute(ctx)
    }(task)
}

📖 ملخص

هذا الدرس طبّق بشكل شامل تقنيات التزامن الأساسية في Go:

التقنية التطبيق الكود الرئيسي
goroutine العمال ينفذون المهام بالتوازي go Worker(...)
channel توزيع المهام وجمع النتائج tasks <-chan Task
select تعدد: إلغاء، مهلة، ارتداد select { case <-ctx.Done(): ... }
sync.WaitGroup انتظار اكتمال جميع العمال wg.Wait()
sync.Mutex جمع نتائج متزامن وآمن rc.mu.Lock()
context.Context إلغاء متسلسل والتحكم في المهلة context.WithTimeout(ctx, 2*time.Second)

مراجعة مبادئ التصميم:


📝 تمارين

التمرين 1: إضافة طابور أولويات

عدّل المُجدول لدعم أولويات المهام (عالية/متوسطة/منخفضة). المهام عالية الأولوية يجب أن تُعالج أولاً بواسطة العمال.

تلميح: يمكنك استخدام قنوات متعددة أو تنفيذ هيكل طابور أولويات.

التمرين 2: تنفيذ تحديد المعدل

أضف تحديد المعدل إلى المُجدول، مثل أقصى 5 مهام في الثانية (خوارزمية دلو الرموز).

تلميح: استخدم time.Ticker أو المكتبة الخارجية golang.org/x/time/rate.

GO
limiter := rate.NewLimiter(5, 1) // 5 في الثانية، اندفاع 1
for task := range tasks {
    limiter.Wait(ctx) // انتظار الرمز
    task.Execute(ctx)
}

التمرين 3: إضافة استدعاء تقدم المهمة

أضف دالة استدعاء OnProgress إلى المُجدول تُستدعى عند اكتمال كل مهمة، وتُبلغ عن التقدم الحالي (مكتمل/إجمالي).

GO
type Scheduler struct {
    // ... حقول أخرى
    OnProgress func(completed, total int)
}

الدرس التالي: تطبيق زاحف الويب →

Web-Tutorial.com

فريق Web-Tutorial التقني

منصة دروس برمجية يديرها عدة مطورين. كل درس يتم كتابته ومراجعته بواسطة مطورين متخصصين في المجال. نعمل على ضمان دقة وموثوقية المحتوى — إذا لاحظت أي مشكلة، فيرجى إخبارنا.

100%