تطبيق التزامن: مُجدول المهام
تطبيق التزامن: مُجدول المهام
تخيل مركز فرز الطرود: شريط ناقل يُوصل الطرود (المهام) باستمرار، وعدة فارزين (goroutines) يعملون في نفس الوقت — بعضهم يمسح، بعضهم يُصنف، بعضهم يُحمّل. كل مرحلة تمرر الطرود عبر خط أنابيب (channel)، والمُجدول يراقب التقدم الإجمالي، ويُعالج أي مشكلات فورًا. هكذا يعمل مُجدول المهام المتزامن — عدة عمال يتعاونون لإكمال عدد كبير من المهام، بكفاءة وموثوقية.
في هذا الدرس، سنُطبّق بشكل شامل الـ goroutines و القنوات و select و حزمة sync و context.Context التي تعلمناها لبناء مُجدول مهام متزامن كامل الوظائف من الصفر.
متطلبات المشروع
نحتاج لبناء مُجدول مهام يستوفي المتطلبات التالية:
- تنفيذ متزامن: دعم عدة عمال لمعالجة المهام في نفس الوقت
- تقديم المهام: الأنظمة الخارجية يمكنها تقديم المهام ديناميكيًا
- التحكم في المهلة: المهام الفردية تُلغى تلقائيًا بعد انتهاء المهلة
- إعادة المحاولة عند الفشل: المهام الفاشلة تُعاد محاولتها تلقائيًا، حتى N مرة
- جمع النتائج: جمع جميع نتائج المهام بشكل متزامن وآمن
- الإغلاق السلس: بعد استقبال إشارة الإلغاء، انتظر اكتمال المهام الجارية قبل الخروج
تصميم النظام
البنية العامة مقسمة إلى ثلاث طبقات:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ تقديم المهام │────▶│ المُجدول │────▶│ تجمع العمال │
│ (المنتج) │ │ المحرك │ │ (العمال) │
└─────────────┘ └──────────────┘ └─────────────┘
│ │
▼ ▼
┌──────────────┐ ┌─────────────┐
│ جمع │◀────│ تنفيذ │
│ النتائج │ │ المهام │
└──────────────┘ └─────────────┘
المكونات الأساسية:
| المكون | المسؤولية | التنفيذ |
|---|---|---|
Task |
تمثل مهمة قابلة للتنفيذ | هيكل + نوع دالة |
Result |
تخزن نتيجة تنفيذ المهمة | هيكل + قناة |
Worker |
ينفذ مهام محددة | goroutine |
Scheduler |
يُنسق توزيع المهام وتنفيذها | قناة + select |
ResultCollector |
يجمع النتائج بشكل متزامن وآمن | sync.Mutex |
تنفيذ الكود الكامل
1. تعريف هياكل البيانات
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Task تمثل مهمة يجب تنفيذها
type Task struct {
ID int // مُعرّف المهمة الفريد
Payload string // بيانات المهمة
Execute func(ctx context.Context) (string, error) // دالة تنفيذ المهمة
}
// Result تمثل نتيجة تنفيذ مهمة
type Result struct {
TaskID int // مُعرّف المهمة المقابل
Output string // الإخراج عند التنفيذ الناجح
Error error // الخطأ عند التنفيذ الفاشل
Attempts int // عدد المحاولات الفعلي
}
// ResultCollector يجمع نتائج المهام بشكل متزامن وآمن
type ResultCollector struct {
mu sync.Mutex
results []Result
}
// Add إضافة نتيجة (متزامن)
func (rc *ResultCollector) Add(r Result) {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.results = append(rc.results, r)
}
// All إرجاع جميع النتائج المجمعة
func (rc *ResultCollector) All() []Result {
rc.mu.Lock()
defer rc.mu.Unlock()
// إرجاع نسخة لتجنب التعديل الخارجي
out := make([]Result, len(rc.results))
copy(out, rc.results)
return out
}
2. تنفيذ العامل
// Worker يستقبل المهام من قناة المهام، ينفذها، ويرسل النتائج إلى قناة النتائج
func Worker(
ctx context.Context,
id int,
tasks <-chan Task,
results chan<- Result,
maxRetries int,
wg *sync.WaitGroup,
) {
defer wg.Done()
for task := range tasks {
// التحقق من الإلغاء
select {
case <-ctx.Done():
fmt.Printf("[العامل %d] السياق أُلغي، خروج\n", id)
return
default:
}
fmt.Printf("[العامل %d] بدء المهمة #%d\n", id, task.ID)
var output string
var err error
attempts := 0
// منطق التنفيذ مع إعادة المحاولة
for attempts < maxRetries {
attempts++
// إنشاء سياق فرعي مع مهلة لكل مهمة (أقصى 2 ثانية)
taskCtx, taskCancel := context.WithTimeout(ctx, 2*time.Second)
output, err = task.Execute(taskCtx)
taskCancel()
if err == nil {
break // نجاح، خروج من حلقة إعادة المحاولة
}
fmt.Printf("[العامل %d] المهمة #%d المحاولة %d فشلت: %v\n",
id, task.ID, attempts, err)
if attempts < maxRetries {
// الانتظار قبل إعادة المحاولة، مع الاستماع للإلغاء
select {
case <-ctx.Done():
fmt.Printf("[العامل %d] استقبل إشارة الإلغاء أثناء انتظار إعادة المحاولة\n", id)
results <- Result{TaskID: task.ID, Error: ctx.Err(), Attempts: attempts}
return
case <-time.After(time.Duration(attempts) * 500 * time.Millisecond):
// ارتداد خطي: المحاولة الأولى 500ms، الثانية 1s، الثالثة 1.5s ...
}
}
}
results <- Result{
TaskID: task.ID,
Output: output,
Error: err,
Attempts: attempts,
}
if err != nil {
fmt.Printf("[العامل %d] المهمة #%d فشلت نهائيًا (حاولت %d مرات)\n", id, task.ID, attempts)
} else {
fmt.Printf("[العامل %d] المهمة #%d اكتملت\n", id, task.ID)
}
}
fmt.Printf("[العامل %d] قناة المهام أُغلقت، خروج\n", id)
}
3. تنفيذ المُجدول
// Scheduler محرك جدولة المهام
type Scheduler struct {
workerCount int // عدد العمال
maxRetries int // الحد الأقصى لإعادة المحاولة
taskTimeout time.Duration // مهلة لكل مهمة
}
// NewScheduler إنشاء مُجدول جديد
func NewScheduler(workerCount, maxRetries int, taskTimeout time.Duration) *Scheduler {
return &Scheduler{
workerCount: workerCount,
maxRetries: maxRetries,
taskTimeout: taskTimeout,
}
}
// Run بدء المُجدول، معالجة قائمة المهام المعطاة، وإرجاع جميع النتائج
func (s *Scheduler) Run(ctx context.Context, taskList []Task) []Result {
// إنشاء القنوات
tasks := make(chan Task, len(taskList))
results := make(chan Result, len(taskList))
// جامع النتائج
collector := &ResultCollector{}
// بدء goroutine جمع النتائج
var collectorWg sync.WaitGroup
collectorWg.Add(1)
go func() {
defer collectorWg.Done()
for r := range results {
collector.Add(r)
}
}()
// بدء تجمع العمال
var workerWg sync.WaitGroup
for i := 1; i <= s.workerCount; i++ {
workerWg.Add(1)
go Worker(ctx, i, tasks, results, s.maxRetries, &workerWg)
}
// تقديم جميع المهام
go func() {
for _, task := range taskList {
select {
case tasks <- task:
fmt.Printf("[المُجدول] قدم المهمة #%d\n", task.ID)
case <-ctx.Done():
fmt.Println("[المُجدول] السياق أُلغي، إيقاف تقديم المهام")
return
}
}
close(tasks) // إغلاق قناة المهام، إخطار العمال بأنه لا مزيد من المهام
}()
// انتظار اكتمال جميع العمال
workerWg.Wait()
close(results) // إغلاق قناة النتائج
// انتظار المعالج لإكمال جميع النتائج
collectorWg.Wait()
return collector.All()
}
4. محاكاة المهام والدالة الرئيسية
// createDemoTasks إنشاء مجموعة مهام محاكاة؛ بعضها سيفشل، وبعضها سينتهي وقته
func createDemoTasks(count int) []Task {
tasks := make([]Task, count)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < count; i++ {
id := i + 1
behavior := rng.Intn(4) // 0-3، يحدد سلوك المهمة
switch behavior {
case 0:
// اكتمال عادي
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("مهمة عادية-%d", id),
Execute: func(ctx context.Context) (string, error) {
time.Sleep(time.Duration(100+rng.Intn(400)) * time.Millisecond)
return fmt.Sprintf("المهمة #%d نجحت", id), nil
},
}
case 1:
// تفشل أحيانًا (المحاولة الأولى تفشل، الثانية تنجح)
failCount := 0
var mu sync.Mutex
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("مهمة غير مستقرة-%d", id),
Execute: func(ctx context.Context) (string, error) {
mu.Lock()
failCount++
current := failCount
mu.Unlock()
time.Sleep(time.Duration(50+rng.Intn(200)) * time.Millisecond)
if current <= 1 {
return nil, fmt.Errorf("المهمة #%d فشلت محاكاة", id)
}
return fmt.Sprintf("المهمة #%d نجحت في المحاولة %d", id, current), nil
},
}
case 2:
// تنفيذ يستغرق وقتًا طويلًا (سينتهي وقته)
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("مهمة بطيئة-%d", id),
Execute: func(ctx context.Context) (string, error) {
select {
case <-time.After(5 * time.Second): // يتجاوز بكثير مهلة 2 ثانية
return fmt.Sprintf("المهمة #%d اكتملت", id), nil
case <-ctx.Done():
return "", fmt.Errorf("المهمة #%d أُلغيت: %w", id, ctx.Err())
}
},
}
default:
// تفشل دائمًا
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("مهمة فاشلة-%d", id),
Execute: func(ctx context.Context) (string, error) {
time.Sleep(time.Duration(50+rng.Intn(150)) * time.Millisecond)
return nil, fmt.Errorf("المهمة #%d خطأ غير قابل للإصلاح", id)
},
}
}
}
return tasks
}
func main() {
fmt.Println("=== مُجدول المهام المتزامن ===")
fmt.Println()
// إنشاء سياق قابل للإلغاء
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// الاستماع لإشارة نظام التشغيل (Ctrl+C) لتفعيل الإغلاق السلس
go func() {
// محاكاة: إرسال إشارة إلغاء بعد 8 ثوانٍ (استخدم signal.Notify في الإنتاج)
time.Sleep(8 * time.Second)
fmt.Println("\n[الرئيسي] استقبل إشارة إلغاء، بدء الإغلاق السلس...")
cancel()
}()
// إنشاء المُجدول: 3 عمال، أقصى 3 محاولات، مهلة 2 ثانية لكل مهمة
scheduler := NewScheduler(3, 3, 2*time.Second)
// توليد 10 مهام محاكاة
taskList := createDemoTasks(10)
// تنفيذ الجدولة
fmt.Printf("[الرئيسي] تقديم %d مهمة، بدء %d عمال\n\n", len(taskList), 3)
start := time.Now()
results := scheduler.Run(ctx, taskList)
elapsed := time.Since(start)
// طباعة ملخص النتائج
fmt.Println("\n=============================")
fmt.Println(" ملخص نتائج التنفيذ ")
fmt.Println("=============================")
successCount := 0
failCount := 0
cancelCount := 0
for _, r := range results {
status := "✓"
detail := r.Output
if r.Error != nil {
if ctx.Err() != nil {
status = "⊘"
detail = "أُلغيت"
cancelCount++
} else {
status = "✗"
detail = r.Error.Error()
failCount++
}
} else {
successCount++
}
fmt.Printf(" %s المهمة #%02d | المحاولات: %d | %s\n",
status, r.TaskID, r.Attempts, detail)
}
fmt.Println("-----------------------------")
fmt.Printf(" الإجمالي: %d | نجح: %d | فشل: %d | أُلغي: %d\n",
len(results), successCount, failCount, cancelCount)
fmt.Printf(" الوقت: %v\n", elapsed)
}
5. تأثير التشغيل
go run main.go
مثال على الإخراج (النتائج عشوائية في كل تشغيل):
=== مُجدول المهام المتزامن ===
[الرئيسي] تقديم 10 مهام، بدء 3 عمال
[المُجدول] قدم المهمة #1
[المُجدول] قدم المهمة #2
[المُجدول] قدم المهمة #3
[المُجدول] قدم المهمة #4
[المُجدول] قدم المهمة #5
[المُجدول] قدم المهمة #6
[المُجدول] قدم المهمة #7
[المُجدول] قدم المهمة #8
[المُجدول] قدم المهمة #9
[المُجدول] قدم المهمة #10
[العامل 1] بدء المهمة #1
[العامل 2] بدء المهمة #2
[العامل 3] بدء المهمة #3
[العامل 1] المهمة #1 اكتملت
[العامل 1] بدء المهمة #4
[العامل 2] المهمة #2 المحاولة 1 فشلت: المهمة #2 فشلت محاكاة
[العامل 3] المهمة #3 اكتملت
[العامل 3] بدء المهمة #5
[العامل 2] بدء المهمة #6
[العامل 2] المهمة #6 المحاولة 1 فشلت: المهمة #6 خطأ غير قابل للإصلاح
[العامل 3] المهمة #5 اكتملت
[العامل 3] بدء المهمة #7
[العامل 2] المهمة #6 المحاولة 2 فشلت: المهمة #6 خطأ غير قابل للإصلاح
[العامل 1] المهمة #4 المحاولة 1 فشلت: المهمة #4 فشلت محاكاة
[العامل 2] المهمة #6 المحاولة 3 فشلت: المهمة #6 خطأ غير قابل لل.AutoScaleMode
[العامل 2] المهمة #6 فشلت نهائيًا (حاولت 3 مرات)
[العامل 2] بدء المهمة #8
...
[العامل 1] قناة المهام أُغلقت، خروج
[العامل 2] قناة المهام أُغلقت، خروج
[العامل 3] قناة المهام أُغلقت، خروج
=============================
ملخص نتائج التنفيذ
=============================
✓ المهمة #01 | المحاولات: 1 | المهمة #1 نجحت
⊘ المهمة #02 | المحاولات: 2 | أُلغيت
✓ المهمة #03 | المحاولات: 1 | المهمة #3 نجحت
⊘ المهمة #04 | المحاولات: 2 | أُلغيت
✓ المهمة #05 | المحاولات: 1 | المهمة #5 نجحت
✗ المهمة #06 | المحاولات: 3 | المهمة #6 خطأ غير قابل للإصلاح
✓ المهمة #07 | المحاولات: 1 | المهمة #7 نجحت
⊘ المهمة #08 | المحاولات: 1 | أُلغيت
✓ المهمة #09 | المحاولات: 1 | المهمة #9 نجحت
⊘ المهمة #10 | المحاولات: 1 | أُلغيت
-----------------------------
الإجمالي: 10 | نجح: 5 | فشل: 1 | أُلغي: 4
الوقت: 8.012s
تحليل الكود
أنماط التزامن الأساسية
1. Fan-out / Fan-in
هذا هو النمط الأساسي للمُجدول:
┌─ العامل 1 ─┐
المهام ─────┼─ العامل 2 ─┼───── النتائج
└─ العامل 3 ─┘
- Fan-out: عدة عمال يقرأون من نفس قناة
tasks، توزيع المهام تلقائيًا - Fan-in: جميع العمال يكتبون النتائج في نفس قناة
results، تجميع النتائج
2. إلغاء متسلسل عبر context.Context
السياق الجذري (الرئيسي)
└─ WithCancel
├─ WithTimeout للعامل 1
│ └─ taskCtx (مهلة لكل مهمة 2 ثانية)
├─ WithTimeout للعامل 2
│ └─ taskCtx (مهلة لكل مهمة 2 ثانية)
└─ WithTimeout للعامل 3
└─ taskCtx (مهلة لكل مهمة 2 ثانية)
عندما يستدعي البرنامج الرئيسي cancel()، تتلقى جميع السياقات الفرعية إشارة الإلغاء، مما يُحقق الإلغاء المتسلسل.
3. مزامنة WaitGroup
var wg sync.WaitGroup
wg.Add(1) // كل goroutine مُطلق يزيد العداد بـ 1
go func() {
defer wg.Done() // خروج العامل يقلل العداد بـ 1
// ...
}()
wg.Wait() // يحظر حتى يصل العداد إلى الصفر
4. تعدد select
العمال يستمعون إلى عدة مصادر إشارة في نفس الوقت:
select {
case <-ctx.Done(): // السياق أُلغي
return
case <-time.After(d): // ارتداد إعادة المحاولة
// الاستمرار في إعادة المحاولة
}
استراتيجية ارتداد إعادة المحاولة
هذا المثال يستخدم ارتداد خطي: وقت الانتظار لكل محاولة = attempts × 500ms. في الإنتاج، يُوصى بـالارتداد الأُسّي:
backoff := time.Duration(1<<uint(attempts)) * 100 * time.Millisecond
// الأولى: 200ms، الثانية: 400ms، الثالثة: 800ms
يمكنك أيضًا إضافة اضطراب عشوائي لمنع تأثير "القطيع الهائج" عندما تُعاد عدة مهام في نفس الوقت.
❓ أسئلة شائعة
1. لماذا يستخدم Worker range tasks بدلًا من التحقق المباشر من ctx.Done()؟
range tasks يخرج تلقائيًا من الحلقة عند إغلاق القناة، وهو نمط المنتج-المستهلك القياسي. ctx.Done() يُستخدم للاستجابة لإشارات الإلغاء أثناء الانتظار أو إعادة المحاولة. يعملان معًا:
- القناة أُغلقت → خروج عادي (جميع المهام قُدمت)
- السياق أُلغي → خروج مبكر (طلب خارجي للإنهاء)
إذا استخدمت ctx.Done() فقط، تحتاج لمعالجة إضافية لإغلاق القناة؛ إذا استخدمت range فقط، لا تستطيع الاستجابة للإلغاء.
2. كيف تُحدد حجم المخزن لقناة النتائج؟
هذا المثال يستخدم make(chan Result, len(taskList)) مع إجمالي عدد المهام كحجم مخزن، مما يضمن إمكانية كتابة جميع النتائج دون حظر. إذا كان عدد المهام كبيرًا جدًا (ملايين)، يمكنك:
- استخدام قناة غير مخزنة + goroutine جمع منفصلة
- معالجة دفعات، إعادة تعيين القناة بعد كل دفعة
- استخدام
sync.Mapأو slice مقفل بدلًا من القناة
3. كيف تنفذ ارتداد أُسّي حقيقي؟
func exponentialBackoff(attempt int) time.Duration {
base := 100 * time.Millisecond
max := 10 * time.Second
backoff := base * time.Duration(1<<uint(attempt))
if backoff > max {
backoff = max
}
// إضافة اضطراب عشوائي: ±25%
jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
return backoff - backoff/4 + jitter
}
4. كيف تحدد عدد المهام المتزامنة (التحكم في التزامن)؟
هذا المثال يحدد التزامن طبيعيًا عبر عدد العمال. نهج آخر هو استخدام سمافور:
sem := make(chan struct{}, 10) // أقصى 10 متزامنة
for _, task := range taskList {
sem <- struct{}{} // الحصول على تصريح
go func(t Task) {
defer func() { <-sem }() // تحرير تصريح
t.Execute(ctx)
}(task)
}
📖 ملخص
هذا الدرس طبّق بشكل شامل تقنيات التزامن الأساسية في Go:
| التقنية | التطبيق | الكود الرئيسي |
|---|---|---|
| goroutine | العمال ينفذون المهام بالتوازي | go Worker(...) |
| channel | توزيع المهام وجمع النتائج | tasks <-chan Task |
| select | تعدد: إلغاء، مهلة، ارتداد | select { case <-ctx.Done(): ... } |
| sync.WaitGroup | انتظار اكتمال جميع العمال | wg.Wait() |
| sync.Mutex | جمع نتائج متزامن وآمن | rc.mu.Lock() |
| context.Context | إلغاء متسلسل والتحكم في المهلة | context.WithTimeout(ctx, 2*time.Second) |
مراجعة مبادئ التصميم:
- استخدم القنوات لنقل البيانات، لا تتواصل بمشاركة الذاكرة
- استخدم السياق لنشر إشارات الإلغاء عبر سلسلة الاستدعاءات
- WaitGroup يُستخدم لانتظار مجموعة goroutines للانتهاء
- Mutex يُستخدم لحماية الحالة المشتركة (جامع النتائج في هذا المثال)
- إغلاق القناة يعمل كإشارة بث، يُخطّر جميع المستهلكين
📝 تمارين
التمرين 1: إضافة طابور أولويات
عدّل المُجدول لدعم أولويات المهام (عالية/متوسطة/منخفضة). المهام عالية الأولوية يجب أن تُعالج أولاً بواسطة العمال.
تلميح: يمكنك استخدام قنوات متعددة أو تنفيذ هيكل طابور أولويات.
التمرين 2: تنفيذ تحديد المعدل
أضف تحديد المعدل إلى المُجدول، مثل أقصى 5 مهام في الثانية (خوارزمية دلو الرموز).
تلميح: استخدم time.Ticker أو المكتبة الخارجية golang.org/x/time/rate.
limiter := rate.NewLimiter(5, 1) // 5 في الثانية، اندفاع 1
for task := range tasks {
limiter.Wait(ctx) // انتظار الرمز
task.Execute(ctx)
}
التمرين 3: إضافة استدعاء تقدم المهمة
أضف دالة استدعاء OnProgress إلى المُجدول تُستدعى عند اكتمال كل مهمة، وتُبلغ عن التقدم الحالي (مكتمل/إجمالي).
type Scheduler struct {
// ... حقول أخرى
OnProgress func(completed, total int)
}



