Concurrency Practice: Task Scheduler
Imagine a package sorting center: a conveyor belt continuously delivers packages (tasks), and multiple sorters (goroutines) work simultaneously — some scan, some classify, some load. Each stage passes packages through a pipeline (channel), and the scheduler monitors overall progress, handling any issues promptly. This is how a concurrent task scheduler works — multiple workers collaborate to complete a large number of tasks, both efficiently and reliably.
In this lesson, we'll combine the goroutines, channels, select, sync package, and context.Context covered in earlier lessons to build a complete concurrent task scheduler from scratch.
Project Requirements
We need to build a task scheduler that meets the following requirements:
- Concurrent execution: Support multiple workers processing tasks simultaneously
- Task submission: External systems can dynamically submit tasks
- Timeout control: Individual tasks are automatically cancelled after a timeout
- Failure retry: Failed tasks are automatically retried, up to N times
- Result collection: Concurrently and safely collect all task results
- Graceful shutdown: After receiving a cancellation signal, stop submitting new tasks and wait for each worker to wind down its current task before exiting
System Design
The overall architecture is divided into three layers:
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│ Task Submit │────▶│  Scheduler   │────▶│ Worker Pool │
│ (Producer)  │     │    Engine    │     │  (Workers)  │
└─────────────┘     └──────────────┘     └─────────────┘
                           │                    │
                           ▼                    ▼
                    ┌──────────────┐     ┌─────────────┐
                    │    Result    │◀────│    Task     │
                    │  Collector   │     │  Execution  │
                    └──────────────┘     └─────────────┘
Core Components:
| Component | Responsibility | Implementation |
|---|---|---|
| Task | Represents an executable task | Struct + function type |
| Result | Stores task execution result | Struct + channel |
| Worker | Executes specific tasks | goroutine |
| Scheduler | Coordinates task distribution and execution | channel + select |
| ResultCollector | Concurrently and safely collects results | sync.Mutex |
Example 1: Complete Code
1. Define Data Structures
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task represents a task to be executed
type Task struct {
	ID      int                                       // Unique task identifier
	Payload string                                    // Task data
	Execute func(ctx context.Context) (string, error) // Task execution function
}

// Result represents a task's execution result
type Result struct {
	TaskID   int    // Corresponding task ID
	Output   string // Output on successful execution
	Error    error  // Error on failed execution
	Attempts int    // Actual number of attempts
}

// ResultCollector safely collects task results concurrently
type ResultCollector struct {
	mu      sync.Mutex
	results []Result
}

// Add adds a result (concurrent-safe)
func (rc *ResultCollector) Add(r Result) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.results = append(rc.results, r)
}

// All returns all collected results
func (rc *ResultCollector) All() []Result {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	// Return a copy to avoid external modification
	out := make([]Result, len(rc.results))
	copy(out, rc.results)
	return out
}
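As a quick sanity check, the sketch below exercises a collector like the one above from many goroutines at once. The types are repeated (with `Result` trimmed down) so the snippet compiles on its own, and `collectConcurrently` is a hypothetical helper introduced only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Result is a trimmed-down copy of the lesson's Result type.
type Result struct {
	TaskID int
	Output string
}

// ResultCollector mirrors the mutex-guarded collector defined above.
type ResultCollector struct {
	mu      sync.Mutex
	results []Result
}

func (rc *ResultCollector) Add(r Result) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.results = append(rc.results, r)
}

func (rc *ResultCollector) All() []Result {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	out := make([]Result, len(rc.results))
	copy(out, rc.results)
	return out
}

// collectConcurrently (hypothetical helper) adds n results from n goroutines
// and returns how many were stored.
func collectConcurrently(n int) int {
	rc := &ResultCollector{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			rc.Add(Result{TaskID: id, Output: "ok"})
		}(i)
	}
	wg.Wait()
	return len(rc.All())
}

func main() {
	fmt.Println(collectConcurrently(100)) // prints 100
}
```

Without the mutex, concurrent `append` calls on the shared slice would be a data race and results could be lost; running the snippet with `go run -race` is a simple way to confirm the locked version is clean.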
2. Implement Worker
// Worker receives tasks from the task channel, executes them, and sends results to the result channel
func Worker(
	ctx context.Context,
	id int,
	tasks <-chan Task,
	results chan<- Result,
	maxRetries int,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	for task := range tasks {
		// Check if cancelled
		select {
		case <-ctx.Done():
			fmt.Printf("[Worker %d] Context cancelled, exiting\n", id)
			return
		default:
		}

		fmt.Printf("[Worker %d] Starting task #%d\n", id, task.ID)

		var output string
		var err error
		attempts := 0

		// Execution logic with retry
		for attempts < maxRetries {
			attempts++

			// Create a sub-context with timeout for each task (max 2 seconds)
			taskCtx, taskCancel := context.WithTimeout(ctx, 2*time.Second)
			output, err = task.Execute(taskCtx)
			taskCancel()

			if err == nil {
				break // Success, exit retry loop
			}

			fmt.Printf("[Worker %d] Task #%d attempt %d failed: %v\n",
				id, task.ID, attempts, err)

			if attempts < maxRetries {
				// Wait before retrying, while listening for cancellation
				select {
				case <-ctx.Done():
					fmt.Printf("[Worker %d] Received cancellation signal during retry wait\n", id)
					results <- Result{TaskID: task.ID, Error: ctx.Err(), Attempts: attempts}
					return
				case <-time.After(time.Duration(attempts) * 500 * time.Millisecond):
					// Linear backoff: 1st wait 500ms, 2nd wait 1s, 3rd wait 1.5s ...
				}
			}
		}

		results <- Result{
			TaskID:   task.ID,
			Output:   output,
			Error:    err,
			Attempts: attempts,
		}

		if err != nil {
			fmt.Printf("[Worker %d] Task #%d ultimately failed (attempted %d times)\n", id, task.ID, attempts)
		} else {
			fmt.Printf("[Worker %d] Task #%d completed\n", id, task.ID)
		}
	}

	fmt.Printf("[Worker %d] Task channel closed, exiting\n", id)
}
3. Implement Scheduler
// Scheduler is the task scheduling engine
type Scheduler struct {
	workerCount int           // Number of workers
	maxRetries  int           // Maximum number of attempts per task
	taskTimeout time.Duration // Per-task timeout (stored only; Worker currently hardcodes 2s)
}

// NewScheduler creates a new scheduler
func NewScheduler(workerCount, maxRetries int, taskTimeout time.Duration) *Scheduler {
	return &Scheduler{
		workerCount: workerCount,
		maxRetries:  maxRetries,
		taskTimeout: taskTimeout,
	}
}
// Run starts the scheduler, processes the given task list, and returns all results
func (s *Scheduler) Run(ctx context.Context, taskList []Task) []Result {
	// Create channels
	tasks := make(chan Task, len(taskList))
	results := make(chan Result, len(taskList))

	// Result collector
	collector := &ResultCollector{}

	// Start result collection goroutine
	var collectorWg sync.WaitGroup
	collectorWg.Add(1)
	go func() {
		defer collectorWg.Done()
		for r := range results {
			collector.Add(r)
		}
	}()

	// Start worker pool
	var workerWg sync.WaitGroup
	for i := 1; i <= s.workerCount; i++ {
		workerWg.Add(1)
		go Worker(ctx, i, tasks, results, s.maxRetries, &workerWg)
	}

	// Submit all tasks
	go func() {
		// Always close the task channel, even when submission stops early on
		// cancellation; otherwise workers blocked in `range tasks` would never exit.
		defer close(tasks)
		for _, task := range taskList {
			select {
			case tasks <- task:
				fmt.Printf("[Scheduler] Submitted task #%d\n", task.ID)
			case <-ctx.Done():
				fmt.Println("[Scheduler] Context cancelled, stopping task submission")
				return
			}
		}
	}()

	// Wait for all workers to complete
	workerWg.Wait()
	close(results) // Close result channel

	// Wait for collector to process all results
	collectorWg.Wait()

	return collector.All()
}
4. Simulate Tasks and Main Function
// createDemoTasks creates a set of simulated tasks; some will fail, some will time out
func createDemoTasks(count int) []Task {
	tasks := make([]Task, count)
	// rng is only used here, on a single goroutine. The Execute closures run on
	// worker goroutines, so they use the top-level rand functions instead:
	// a *rand.Rand is not safe for concurrent use.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	for i := 0; i < count; i++ {
		id := i + 1
		behavior := rng.Intn(4) // 0-3, determines task behavior

		switch behavior {
		case 0:
			// Normal completion
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Normal task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(time.Duration(100+rand.Intn(400)) * time.Millisecond)
					return fmt.Sprintf("Task #%d succeeded", id), nil
				},
			}
		case 1:
			// Occasionally fails (1st attempt fails, 2nd succeeds)
			failCount := 0
			var mu sync.Mutex
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Unstable task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					mu.Lock()
					failCount++
					current := failCount
					mu.Unlock()
					time.Sleep(time.Duration(50+rand.Intn(200)) * time.Millisecond)
					if current <= 1 {
						return "", fmt.Errorf("Task #%d simulated failure", id)
					}
					return fmt.Sprintf("Task #%d succeeded on attempt %d", id, current), nil
				},
			}
		case 2:
			// Execution takes too long (will trigger timeout)
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Slow task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					select {
					case <-time.After(5 * time.Second): // Far exceeds 2-second timeout
						return fmt.Sprintf("Task #%d completed", id), nil
					case <-ctx.Done():
						return "", fmt.Errorf("Task #%d cancelled: %w", id, ctx.Err())
					}
				},
			}
		default:
			// Always fails
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Failing task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(time.Duration(50+rand.Intn(150)) * time.Millisecond)
					return "", fmt.Errorf("Task #%d unrecoverable error", id)
				},
			}
		}
	}
	return tasks
}
func main() {
	fmt.Println("=== Concurrent Task Scheduler ===")
	fmt.Println()

	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Simulate an interrupt: cancel after 8 seconds to trigger graceful shutdown
	// (in production, use signal.Notify to react to Ctrl+C instead)
	go func() {
		time.Sleep(8 * time.Second)
		fmt.Println("\n[Main] Received cancellation signal, starting graceful shutdown...")
		cancel()
	}()

	// Create scheduler: 3 workers, max 3 attempts, 2-second per-task timeout
	scheduler := NewScheduler(3, 3, 2*time.Second)

	// Generate 10 simulated tasks
	taskList := createDemoTasks(10)

	// Execute scheduling
	fmt.Printf("[Main] Submitting %d tasks, starting %d workers\n\n", len(taskList), 3)
	start := time.Now()
	results := scheduler.Run(ctx, taskList)
	elapsed := time.Since(start)

	// Output result summary
	fmt.Println("\n=============================")
	fmt.Println(" Execution Result Summary ")
	fmt.Println("=============================")

	successCount := 0
	failCount := 0
	cancelCount := 0
	for _, r := range results {
		status := "✓"
		detail := r.Output
		if r.Error != nil {
			if ctx.Err() != nil {
				status = "⊘"
				detail = "Cancelled"
				cancelCount++
			} else {
				status = "✗"
				detail = r.Error.Error()
				failCount++
			}
		} else {
			successCount++
		}
		fmt.Printf(" %s Task #%02d | Attempts: %d | %s\n",
			status, r.TaskID, r.Attempts, detail)
	}

	fmt.Println("-----------------------------")
	fmt.Printf(" Total: %d | Success: %d | Failed: %d | Cancelled: %d\n",
		len(results), successCount, failCount, cancelCount)
	fmt.Printf(" Time: %v\n", elapsed)
}
5. Running Effect
go run main.go
Example output (results are random each run):
=== Concurrent Task Scheduler ===
[Main] Submitting 10 tasks, starting 3 workers
[Scheduler] Submitted task #1
[Scheduler] Submitted task #2
[Scheduler] Submitted task #3
[Scheduler] Submitted task #4
[Scheduler] Submitted task #5
[Scheduler] Submitted task #6
[Scheduler] Submitted task #7
[Scheduler] Submitted task #8
[Scheduler] Submitted task #9
[Scheduler] Submitted task #10
[Worker 1] Starting task #1
[Worker 2] Starting task #2
[Worker 3] Starting task #3
[Worker 1] Task #1 completed
[Worker 1] Starting task #4
[Worker 2] Task #2 attempt 1 failed: Task #2 simulated failure
[Worker 3] Task #3 completed
[Worker 3] Starting task #5
[Worker 2] Starting task #6
[Worker 2] Task #6 attempt 1 failed: Task #6 unrecoverable error
[Worker 3] Task #5 completed
[Worker 3] Starting task #7
[Worker 2] Task #6 attempt 2 failed: Task #6 unrecoverable error
[Worker 1] Task #4 attempt 1 failed: Task #4 simulated failure
[Worker 2] Task #6 attempt 3 failed: Task #6 unrecoverable error
[Worker 2] Task #6 ultimately failed (attempted 3 times)
[Worker 2] Starting task #8
...
[Worker 1] Task channel closed, exiting
[Worker 2] Task channel closed, exiting
[Worker 3] Task channel closed, exiting
=============================
Execution Result Summary
=============================
✓ Task #01 | Attempts: 1 | Task #1 succeeded
⊘ Task #02 | Attempts: 2 | Cancelled
✓ Task #03 | Attempts: 1 | Task #3 succeeded
⊘ Task #04 | Attempts: 2 | Cancelled
✓ Task #05 | Attempts: 1 | Task #5 succeeded
✗ Task #06 | Attempts: 3 | Task #6 unrecoverable error
✓ Task #07 | Attempts: 1 | Task #7 succeeded
⊘ Task #08 | Attempts: 1 | Cancelled
✓ Task #09 | Attempts: 1 | Task #9 succeeded
⊘ Task #10 | Attempts: 1 | Cancelled
-----------------------------
Total: 10 | Success: 5 | Failed: 1 | Cancelled: 4
Time: 8.012s
Code Analysis
Core Concurrency Patterns
1. Fan-out / Fan-in
This is the core pattern of the scheduler:
           ┌─ Worker 1 ─┐
Tasks ─────┼─ Worker 2 ─┼───── Results
           └─ Worker 3 ─┘
- Fan-out: Multiple workers read from the same tasks channel, automatically distributing tasks
- Fan-in: All workers write results to the same results channel, aggregating results
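Stripped of retries and timeouts, the pattern fits in a few lines. The following is a minimal sketch rather than the scheduler's actual code; `squareAll` is a hypothetical helper, and squaring integers stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll (hypothetical helper) fans the inputs out to `workers` goroutines
// and fans the squared values back in through a single results channel.
// It assumes workers >= 1.
func squareAll(inputs []int, workers int) int {
	tasks := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range tasks { // fan-out: workers compete for tasks
				results <- n * n // fan-in: every worker writes to results
			}
		}()
	}

	// Producer: send all inputs, then close to signal "no more tasks".
	go func() {
		for _, n := range inputs {
			tasks <- n
		}
		close(tasks)
	}()

	// Close results only after every worker has exited.
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 3)) // prints 30
}
```

The order in which workers pick up tasks is not deterministic, which is why the example aggregates into a sum instead of relying on result order.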
2. context.Context Cascading Cancellation
Root Context (main)
└─ WithCancel
   ├─ Worker 1's WithTimeout
   │  └─ taskCtx (per-task timeout 2s)
   ├─ Worker 2's WithTimeout
   │  └─ taskCtx (per-task timeout 2s)
   └─ Worker 3's WithTimeout
      └─ taskCtx (per-task timeout 2s)
When the main program calls cancel(), all child contexts receive the cancellation signal, achieving cascading cancellation.
3. WaitGroup Synchronization
var wg sync.WaitGroup
wg.Add(1) // Each worker launched increments count by 1
go func() {
	defer wg.Done() // Worker exit decrements count by 1
	// ...
}()
wg.Wait() // Blocks until count reaches zero
4. select Multiplexing
Workers listen to multiple signal sources simultaneously:
select {
case <-ctx.Done(): // Context cancelled
	return
case <-time.After(d): // Retry backoff
	// Continue retrying
}
Retry Backoff Strategy
This example uses linear backoff: wait time per retry = attempts × 500ms. In production, exponential backoff is recommended:
backoff := time.Duration(1<<uint(attempts)) * 100 * time.Millisecond
// 1st: 200ms, 2nd: 400ms, 3rd: 800ms
You can also add random jitter to prevent "thundering herd" effects when multiple tasks retry simultaneously.
❓ FAQ
1. Why does Worker use range tasks instead of directly checking ctx.Done()?
range tasks automatically exits the loop when the channel closes, which is the standard producer-consumer pattern. ctx.Done() is used to respond to cancellation signals during waiting or retrying. They work together:
- Channel closed → Normal exit (all tasks have been submitted)
- Context cancelled → Early exit (external request to terminate)
If you only use ctx.Done(), you need additional handling for channel closure; if you only use range, you can't respond to cancellation.
2. How to determine the buffer size for the results channel?
This example uses make(chan Result, len(taskList)) with the total task count as the buffer size, ensuring all results can be written without blocking. If the task count is very large (millions), you can:
- Use an unbuffered channel + a separate collection goroutine
- Process in batches, resetting the channel after each batch
- Use sync.Map or a locked slice instead of a channel
3. How to implement true exponential backoff?
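func exponentialBackoff(attempt int) time.Duration {
	const (
		base       = 100 * time.Millisecond
		maxBackoff = 10 * time.Second
	)
	if attempt < 0 {
		attempt = 0
	}
	// Shift only while the result stays below the cap; larger attempt values
	// would exceed it (base<<7 = 12.8s) and eventually overflow.
	backoff := maxBackoff
	if attempt < 7 {
		backoff = base << uint(attempt)
	}
	// Add random jitter: ±25%
	jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
	return backoff - backoff/4 + jitter
}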
4. How to limit the number of simultaneously executing tasks (concurrency control)?
This example naturally limits concurrency through the number of workers. Another approach is to use a semaphore:
sem := make(chan struct{}, 10) // max 10 concurrent
for _, task := range taskList {
	sem <- struct{}{} // acquire permit
	go func(t Task) {
		defer func() { <-sem }() // release permit
		t.Execute(ctx)
	}(task)
}
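The snippet above does not wait for the launched goroutines, so a caller would normally pair the semaphore with a WaitGroup. The sketch below does that and also records the highest concurrency it observes; `runLimited` is a hypothetical helper, and the sleep stands in for real task work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited (hypothetical helper) runs n jobs with at most `limit` in flight
// and returns the highest concurrency it observed.
func runLimited(n, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var inFlight, peak int64

	for i := 0; i < n; i++ {
		sem <- struct{}{} // acquire a permit; blocks once limit is reached
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the permit

			cur := atomic.AddInt64(&inFlight, 1)
			for { // record the peak with a compare-and-swap loop
				old := atomic.LoadInt64(&peak)
				if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // stand-in for real work
			atomic.AddInt64(&inFlight, -1)
		}()
	}

	wg.Wait() // without this, the caller could return before jobs finish
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println(runLimited(20, 4) <= 4) // prints true
}
```

Compared with a fixed worker pool, this approach starts one goroutine per task, which is convenient for short task lists but creates more goroutines overall.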
📖 Summary
This lesson comprehensively applied Go's core concurrency technologies:
| Technology | Application | Key Code |
|---|---|---|
| goroutine | Workers execute tasks in parallel | go Worker(...) |
| channel | Task distribution and result collection | tasks <-chan Task |
| select | Multiplexing: cancellation, timeout, backoff | select { case <-ctx.Done(): ... } |
| sync.WaitGroup | Wait for all workers to complete | wg.Wait() |
| sync.Mutex | Concurrent-safe result collection | rc.mu.Lock() |
| context.Context | Cascading cancellation and timeout control | context.WithTimeout(ctx, 2*time.Second) |
Design Principles Review:
- Share memory by communicating: pass data over channels instead of communicating through shared memory
- Use context to propagate cancellation signals throughout the call chain
- WaitGroup is used to wait for a group of goroutines to complete
- Mutex is used to protect shared state (the result collector in this example)
- Closing a channel acts as a broadcast signal, notifying all consumers
📝 Exercises
Exercise 1: Add Priority Queue
Modify the scheduler to support task priorities (high/medium/low). High-priority tasks should be processed by workers first.
Hint: You can use multiple channels or implement a priority queue structure.
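As one possible starting point for the multiple-channel approach, the sketch below uses two queues and a two-step select: first a non-blocking check of the high-priority channel, then a blocking wait on both. The helper name `next` and the use of string tasks are assumptions for illustration, and a third (medium) level is left out for brevity.

```go
package main

import "fmt"

// next (hypothetical helper) returns a task from high if one is ready,
// otherwise it waits for whichever queue delivers first.
// ok is false once both channels are closed and drained.
func next(high, low <-chan string) (task string, ok bool) {
	for high != nil || low != nil {
		// Step 1: prefer high priority without blocking.
		select {
		case t, open := <-high:
			if open {
				return t, true
			}
			high = nil // closed: a nil channel is never selected again
			continue
		default:
		}
		// Step 2: block until either queue has something.
		select {
		case t, open := <-high:
			if open {
				return t, true
			}
			high = nil
		case t, open := <-low:
			if open {
				return t, true
			}
			low = nil
		}
	}
	return "", false
}

func main() {
	high := make(chan string, 2)
	low := make(chan string, 2)
	low <- "low-1"
	high <- "high-1"
	low <- "low-2"
	high <- "high-2"
	close(high)
	close(low)

	for {
		t, ok := next(high, low)
		if !ok {
			break
		}
		fmt.Println(t) // high-1, high-2, low-1, low-2 (one per line)
	}
}
```

A strict preference like this can starve low-priority tasks while high-priority work keeps arriving, so a complete solution may want an aging or fairness rule.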
Exercise 2: Implement Rate Limiting
Add rate limiting to the scheduler, e.g., a maximum of 5 tasks per second (token bucket algorithm).
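Hint: Use time.Ticker or the golang.org/x/time/rate package (maintained by the Go team, but outside the standard library).
limiter := rate.NewLimiter(5, 1) // 5 per second, burst of 1
for task := range tasks {
	// Wait blocks until a token is available; it returns an error if ctx is cancelled
	if err := limiter.Wait(ctx); err != nil {
		break
	}
	task.Execute(ctx)
}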
Exercise 3: Add Task Progress Callback
Add an OnProgress callback function to the scheduler that is called when each task completes, reporting current progress (completed/total).
type Scheduler struct {
	// ... other fields
	OnProgress func(completed, total int)
}



