Select وأنماط التزامن
الدرس 15: Select وأنماط التزامن
تشبيه
تخيل أنك نادل في مطعم مسؤول عن ثلاث طاولات في نفس الوقت:
- طاولة أ: الزبون يطلب
- طاولة ب: الزبون يدفع الفاتورة
- طاولة ج: الزبون يطلب ماءً
لن تنتظر بطريقة ساذجة طاولة واحدة فقط — بدلاً من ذلك، تراقب جميع الطاولات في نفس الوقت وتخدم أيّ منها يرفع يده أولاً. هكذا يعمل select — يستمع إلى قنوات متعددة في نفس الوقت وينفذ أيّ منها جاهزًا أولاً.
المفاهيم الأساسية
| المفهوم | الوصف |
|---|---|
select |
جملة تستمع إلى عمليات قنوات متعددة في نفس الوقت |
case |
كل عملية قناة تقابل فرعًا |
default |
يُنفذ عندما لا تكون أي قناة جاهزة (اختياري) |
| السلوك المحظور | بدون default، يحظر select حتى يكون case جاهزًا |
| الاختيار العشوائي | عندما تكون عدة cases جاهزة في نفس الوقت، يُختار واحد عشوائيًا |
البنية الأساسية والاستخدام
جملة select القياسية
select {
case msg := <-ch1:
// تم استقبال بيانات من ch1
fmt.Println(msg)
case ch2 <- "مرحبًا":
// تم إرسال بيانات بنجاح إلى ch2
case <-ch3:
// ch3 مغلقة أو تم استقبال بيانات
default:
// يُنفذ عندما لا تكون أي قناة جاهزة
}
التحكم في المهلة (النمط الأكثر شيوعًا)
select {
case msg := <-ch:
fmt.Println("تم الاستقبال:", msg)
case <-time.After(3 * time.Second):
fmt.Println("انتهت المهلة!")
}
عملية قناة غير محظورة
select {
case msg := <-ch:
fmt.Println("تم الاستقبال:", msg)
default:
fmt.Println("لا توجد بيانات متاحة، إرجاع فوري"
}
- يجب أن يحتوي
selectعلىcaseواحد على الأقل؛ لا يمكن أن يكون فارغًا تمامًا - بدون
defaultومع عدم جاهزية جميعcase، سيحظرselectبشكل دائم time.After()يُعيد قناة تستقبل قيمة بعد المدة المحددة- عندما تكون عدة
casesجاهزة في نفس الوقت، يختار Go واحدًا عشوائيًا لمنع الحرمان
مثال: استخدام Select الأساسي (الصعوبة ⭐)
يُوضّح كيفية الاستماع إلى قناتين في نفس الوقت:
package main
import (
"fmt"
"time"
)
// محاكاة مصدري بيانات
func source(name string, ch chan<- string, delay time.Duration) {
for i := 1; i <= 3; i++ {
time.Sleep(delay)
ch <- fmt.Sprintf("[%s] رسالة %d", name, i)
}
close(ch)
}
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go source("المصدرأ", ch1, 500*time.Millisecond)
go source("المصدرب", ch2, 800*time.Millisecond)
// استخدام select للاستماع إلى القناتين في نفس الوقت
// نحتاج للاستماع 6 مرات (3 رسائل من كل مصدر)
for i := 0; i < 6; i++ {
select {
case msg, ok := <-ch1:
if ok {
fmt.Println("من ch1:", msg)
}
case msg, ok := <-ch2:
if ok {
fmt.Println("من ch2:", msg)
}
}
}
fmt.Println("تمت معالجة جميع الرسائل")
}
go run main.go
من ch1: [المصدرأ] رسالة 1
من ch2: [المصدرب] رسالة 1
من ch1: [المصدرأ] رسالة 2
من ch1: [المصدرأ] رسالة 3
من ch2: [المصدرب] رسالة 2
من ch2: [المصدرب] رسالة 3
تمت معالجة جميع الرسائل
مثال: التحكم في المهلة وقناة الخروج (الصعوبة ⭐⭐)
استخدم select لتنفيذ عامل مع مهلة وإغلاق سلس:
package main
import (
"fmt"
"math/rand"
"time"
)
// worker محاكاة مهمة تستغرق وقتًا
func worker(id int, jobs <-chan int, results chan<- string, done chan<- int) {
for job := range jobs {
// محاكاة وقت معالجة غير متوقع
duration := time.Duration(rand.Intn(500)+200) * time.Millisecond
time.Sleep(duration)
results <- fmt.Sprintf("العامل %d أكمل المهمة %d (استغرق %v)", id, job, duration)
}
done <- id
}
func main() {
jobs := make(chan int, 10)
results := make(chan string, 10)
done := make(chan int, 3)
// بدء 3 عمال
for i := 1; i <= 3; i++ {
go worker(i, jobs, results, done)
}
// توزيع 6 مهام
for j := 1; j <= 6; j++ {
jobs <- j
}
close(jobs)
// جمع النتائج مع التحكم في المهلة
timeout := time.After(2 * time.Second)
finished := 0
for finished < 6 {
select {
case result := <-results:
fmt.Println(result)
finished++
case workerID := <-done:
fmt.Printf(">>> العامل %d خرج\n", workerID)
case <-timeout:
fmt.Println("⏰ انتهت المهلة! بعض المهام غير مكتملة")
return
}
}
fmt.Println("اكتملت جميع المهام")
}
go run main.go
العامل 2 أكمل المهمة 2 (استغرق 234ms)
العامل 1 أكمل المهمة 1 (استغرق 345ms)
العامل 3 أكمل المهمة 3 (استغرق 289ms)
العامل 2 أكمل المهمة 4 (استغرق 412ms)
العامل 1 أكمل المهمة 5 (استغرق 198ms)
>>> العامل 1 خرج
العامل 3 أكمل المهمة 6 (استغرق 367ms)
اكتملت جميع المهام
مثال: Fan-in / Fan-out ونمط خط الأنابيب (الصعوبة ⭐⭐⭐)
يُوضّح أنماط التزامن الكلاسيكية: خط أنابيب + Fan-out + Fan-in:
package main
import (
"fmt"
"sync"
"time"
)
// المرحلة 1: مولّد البيانات (بداية خط الأنابيب)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// المرحلة 2: حساب المربعات (مرحلة وسطى في خط الأنابيب)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
time.Sleep(100 * time.Millisecond) // محاكاة وقت الحساب
out <- n * n
}
close(out)
}()
return out
}
// Fan-out: توزيع قناة واحدة على عدة عمال
func fanOut(in <-chan int, workers int) []<-chan int {
channels := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
channels[i] = square(in)
}
return channels
}
// Fan-in: دمج عدة قنوات في واحدة
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// بدء goroutine لكل قناة لتحويل البيانات
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(ch)
}
// إغلاق قناة المخرجات بعد إغلاق جميع قنوات المدخلات
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// المرحلة 1: توليد البيانات
nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
source := generator(nums...)
// المرحلة 2: Fan-out إلى 3 عمال للحساب المتوازي
workers := fanOut(source, 3)
// المرحلة 3: Fan-in لدمج النتائج
merged := fanIn(workers...)
// جمع جميع النتائج
results := make(map[int]bool)
for val := range merged {
results[val] = true
fmt.Printf("تم استقبال النتيجة: %d\n", val)
}
fmt.Println("\nالنتائج بعد إزالة التكرار:")
for val := range results {
fmt.Printf(" %d\n", val)
}
}
go run main.go
تم استقبال النتيجة: 1
تم استقبال النتيجة: 4
تم استقبال النتيجة: 9
تم استقبال النتيجة: 16
تم استقبال النتيجة: 25
تم استقبال النتيجة: 36
تم استقبال النتيجة: 49
تم استقبال النتيجة: 64
تم استقبال النتيجة: 81
تم استقبال النتيجة: 100
النتائج بعد إزالة التكرار:
1
4
9
16
25
36
49
64
81
100
السيناريو 1: سباق طلبات HTTP
أرسل طلبات إلى عدة خوادم في نفس الوقت، واستخدم أول استجابة:
package main
import (
"fmt"
"time"
)
// محاكاة إرسال طلبات إلى خوادم مختلفة
func fetchFromServer(name string, delay time.Duration) <-chan string {
ch := make(chan string, 1)
go func() {
time.Sleep(delay)
ch <- fmt.Sprintf("استجابة من %s", name)
}()
return ch
}
// سباق الطلبات: إرجاع أسرع استجابة
func race(urls map[string]time.Duration, timeout time.Duration) (string, error) {
// إنشاء قناة لكل خادم
ch := make(chan string, len(urls))
for name, delay := range urls {
go func(n string, d time.Duration) {
ch <- fetchResult(n, d)
}(name, delay)
}
// انتظار أول استجابة أو انتهاء المهلة
select {
case result := <-ch:
return result, nil
case <-time.After(timeout):
return "", fmt.Errorf("انتهت مهلة جميع الخوادم")
}
}
func fetchResult(name string, delay time.Duration) string {
time.Sleep(delay)
return fmt.Sprintf("بيانات من %s (زمن الاستجابة %v)", name, delay)
}
func main() {
servers := map[string]time.Duration{
"الخادمأ-بكين": 800 * time.Millisecond,
"الخادمب-شنغهاي": 300 * time.Millisecond,
"الخادمج-قوانغتشو": 500 * time.Millisecond,
}
result, err := race(servers, 2*time.Second)
if err != nil {
fmt.Println("خطأ:", err)
} else {
fmt.Println("نتيجة السباق:", result)
}
}
go run main.go
نتيجة السباق: بيانات من الخادمب-شنغهاي (زمن الاستجابة 300ms)
السيناريو 2: الإغلاق السلس
استخدم قناة quit لتنفيذ إغلاق سلس للخدمة:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
// server محاكاة خدمة تعمل باستمرار
func server(id int, quit <-chan struct{}) {
fmt.Printf("[الخدمة %d] بدأت\n", id)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fmt.Printf("[الخدمة %d] معالجة الطلب...\n", id)
case <-quit:
fmt.Printf("[الخدمة %d] استقبلت إشارة الخروج، جارٍ التنظيف...", id)
time.Sleep(200 * time.Millisecond) // محاكاة التنظيف
fmt.Printf("[الخدمة %d] توقفت\n", id)
return
}
}
}
func main() {
// الاستماع لإشارات النظام (Ctrl+C)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// إنشاء قناة الخروج
quit := make(chan struct{})
// بدء خدمات متعددة
for i := 1; i <= 3; i++ {
go server(i, quit)
}
fmt.Println("البرنامج الرئيسي يعمل، اضغط Ctrl+C للخروج السلس...")
// انتظار إشارة النظام
sig := <-sigChan
fmt.Printf("\nاستقبلت إشارة: %v، بدء الخروج السلس...\n", sig)
// إخطار جميع الخدمات بالخروج
close(quit)
// إعطاء الخدمات وقتًا لإكمال التنظيف
time.Sleep(1 * time.Second)
fmt.Println("أُغلقت جميع الخدمات، البرنامج ينتهي")
}
go run main.go
# اضغط Ctrl+C لتفعيل الخروج السلس
البرنامج الرئيسي يعمل، اضغط Ctrl+C للخروج السلس...
[الخدمة 1] بدأت
[الخدمة 2] بدأت
[الخدمة 3] بدأت
[الخدمة 1] معالجة الطلب...
[الخدمة 2] معالجة الطلب...
[الخدمة 3] معالجة الطلب...
[الخدمة 1] معالجة الطلب...
^C
استقبلت إشارة: interrupt، بدء الخروج السلس...
[الخدمة 1] استقبلت إشارة الخروج، جارٍ التنظيف...[الخدمة 1] توقفت
[الخدمة 2] استقبلت إشارة الخروج، جارٍ التنظيف...[الخدمة 2] توقفت
[الخدمة 3] استقبلت إشارة الخروج، جارٍ التنظيف...[الخدمة 3] توقفت
أُغلقت جميع الخدمات، البرنامج ينتهي
❓ أسئلة شائعة
س1: ماذا يحدث عندما تكون عدة cases في select جاهزة في نفس الوقت؟
يختار Go واحدًا عشوائيًا للتنفيذ، بدلًا من اختيار الأول بالترتيب. هذا يضمن العدالة ويمنع حرمان أي قناة.
ch1 := make(chan string, 1)
ch2 := make(chan string, 1)
ch1 <- "أ"
ch2 <- "ب"
// النتيجة عشوائية، قد تكون أ أو ب
select {
case v := <-ch1:
fmt.Println(v)
case v := <-ch2:
fmt.Println(v)
}
س2: متى يجب استخدام فرع default في select؟
استخدم default عندما تحتاج إلى عمليات غير محظورة. بدون default، يحظر select حتى يكون case جاهزًا.
// استقبال غير محظور
select {
case msg := <-ch:
fmt.Println("تم الاستقبال:", msg)
default:
fmt.Println("لا توجد بيانات في القناة حاليًا")
}
س3: كيف تنفذ حلقة لا نهائية تستمع إلى قنوات متعددة؟
استخدم نمط for-select المشترك:
for {
select {
case msg := <-ch1:
fmt.Println(msg)
case msg := <-ch2:
fmt.Println(msg)
case <-quit:
fmt.Println("الخروج من الحلقة")
return
}
}
س4: هل time.After في حلقة يسبب تسرب ذاكرة؟
نعم! كل تكرار في الحلقة مع time.After يُنشئ مؤقتًا جديدًا لن يُجمع كقمامة. النهج الصحيح هو استخدام time.NewTimer:
// ❌ خاطئ: يُنشئ مؤقتًا جديدًا في كل تكرار
for {
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(5 * time.Second):
fmt.Println("انتهت المهلة")
}
}
// ✅ صحيح: إعادة استخدام المؤقت
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
select {
case msg := <-ch:
fmt.Println(msg)
if !timer.Stop() {
<-timer.C
}
timer.Reset(5 * time.Second)
case <-timer.C:
fmt.Println("انتهت المهلة")
return
}
}
📖 ملخص
| النمط | الغرض | الكود الرئيسي |
|---|---|---|
| التحكم في المهلة | تحديد وقت انتظار العملية | case <-time.After(d) |
| عملية غير محظورة | إرجاع فوري دون انتظار | select مع default |
| خروج سلس | الاستماع لقناة الخروج | case <-quit |
| Fan-out | مدخل واحد يُوزع على عدة عمال | عدة goroutines تقرأ من قناة واحدة |
| Fan-in | عدة مدخلات تُدمج في قناة واحدة | sync.WaitGroup + تحويل |
| خط أنابيب | معالجة متعددة المراحل | قنوات مترابطة |
النقاط الأساسية:
selectهو أداة التزامن الأساسية في Go للتعدد- عندما تكون عدة cases جاهزة في نفس الوقت، يُختار واحد عشوائيًا للعدالة
for-selectهو النمط القياسي للاستماع إلى قنوات متعددة- انتبه لتسرب ذاكرة
time.Afterفي الحلقات - Fan-in/Fan-out/Pipeline هي أنماط كلاسيكية لبناء خطوط أنابيب متزامنة
📝 تمارين
التمرين 1: مؤقت تنازلي
أنشئ برنامجًا يستخدم select و time.Ticker لتنفيذ تنازلي 5 ثوانٍ، يطبع الوقت المتبقي كل ثانية، ويطبع "إطلاق!" عند 0.
// تلميحات:
// ticker := time.NewTicker(1 * time.Second)
// select {
// case <-ticker.C:
// // تحديث التنازلي
// }
التمرين 2: دمج ترتيب متعدد المسارات
نفّذ دالة تستقبل عدة قنوات أعداد صحيحة مرتبة وتستخدم نمط Fan-in لدمجها في قناة إخراج مرتبة واحدة.
// توقيع الدالة:
func mergeSorted(channels ...<-chan int) <-chan int {
// نفّذ منطق الدمج
}
التمرين 3: خط أنابيب مع إلغاء
ابنِ خط أنابيب ثلاثي المراحل (توليد → تصفية الأعداد الزوجية → ضرب بـ 10) يدعم إلغاء خط الأنابيب بالكامل عبر context:
// تلميحات:
// ctx, cancel := context.WithCancel(context.Background())
// تحقق من ctx.Done() في select لكل مرحلة
الدرس التالي: الدرس 16: Sync وسلامة التزامن — تعرّف على sync.Mutex، sync.WaitGroup، sync_ONCE، وغيرها من أدوات المزامنة للوصول الآمن إلى الموارد المشتركة.



