Concurrency Practice: Task Scheduler

Imagine a package sorting center: a conveyor belt continuously delivers packages (tasks), and multiple sorters (goroutines) work simultaneously — some scan, some classify, some load. Each stage passes packages through a pipeline (channel), and the scheduler monitors overall progress, handling any issues promptly. This is how a concurrent task scheduler works — multiple workers collaborate to complete a large number of tasks, both efficiently and reliably.

In this lesson, we'll combine the goroutines, channels, select, sync package, and context.Context covered in earlier lessons to build a fully functional concurrent task scheduler from scratch.


Project Requirements

We need to build a task scheduler that meets the following requirements:

  1. Concurrent execution: Support multiple workers processing tasks simultaneously
  2. Task submission: External systems can dynamically submit tasks
  3. Timeout control: Individual tasks are automatically cancelled after a timeout
  4. Failure retry: Failed tasks are automatically retried, up to N times
  5. Result collection: Concurrently and safely collect all task results
  6. Graceful shutdown: After receiving a cancellation signal, stop starting new tasks, tell running tasks to stop, and wait for every worker to exit before returning

System Design

The overall architecture is divided into three layers:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Task Submit  │────▶│  Scheduler   │────▶│  Worker Pool │
│  (Producer)  │     │   Engine     │     │  (Workers)   │
└──────────────┘     └──────────────┘     └──────────────┘
                           │                     │
                           ▼                     ▼
                     ┌──────────────┐     ┌──────────────┐
                     │   Result     │◀────│    Task      │
                     │  Collector   │     │  Execution   │
                     └──────────────┘     └──────────────┘

Core Components:

| Component | Responsibility | Implementation |
|---|---|---|
| Task | Represents an executable task | Struct + function type |
| Result | Stores a task's execution result | Struct + channel |
| Worker | Executes tasks | goroutine |
| Scheduler | Coordinates task distribution and execution | channel + select |
| ResultCollector | Collects results safely across goroutines | sync.Mutex |

Example 1: Complete Code

1. Define Data Structures

GO
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task represents a task to be executed
type Task struct {
	ID      int                          // Unique task identifier
	Payload string                       // Task data
	Execute func(ctx context.Context) (string, error) // Task execution function
}

// Result represents a task's execution result
type Result struct {
	TaskID   int    // Corresponding task ID
	Output   string // Output on successful execution
	Error    error  // Error on failed execution
	Attempts int    // Actual number of attempts
}

// ResultCollector safely collects task results concurrently
type ResultCollector struct {
	mu      sync.Mutex
	results []Result
}

// Add adds a result (concurrent-safe)
func (rc *ResultCollector) Add(r Result) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.results = append(rc.results, r)
}

// All returns all collected results
func (rc *ResultCollector) All() []Result {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	// Return a copy to avoid external modification
	out := make([]Result, len(rc.results))
	copy(out, rc.results)
	return out
}
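To see why Add takes the lock, here is a small stand-alone check. It is a sketch that repeats a trimmed copy of Result and ResultCollector so the file compiles on its own; fillConcurrently is a hypothetical helper written for this demonstration. It appends from 100 goroutines at once. Running it with go run -race should report no data race, whereas deleting the Lock/Unlock pair in Add would typically make the race detector complain about the concurrent append.

```go
package main

import (
	"fmt"
	"sync"
)

// Trimmed copy of the lesson's types so this file compiles on its own
type Result struct {
	TaskID int
}

type ResultCollector struct {
	mu      sync.Mutex
	results []Result
}

func (rc *ResultCollector) Add(r Result) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.results = append(rc.results, r)
}

func (rc *ResultCollector) All() []Result {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	out := make([]Result, len(rc.results))
	copy(out, rc.results)
	return out
}

// fillConcurrently (hypothetical helper) adds n results from n goroutines at the same time
func fillConcurrently(n int) *ResultCollector {
	rc := &ResultCollector{}
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			rc.Add(Result{TaskID: id})
		}(i)
	}
	wg.Wait()
	return rc
}

func main() {
	rc := fillConcurrently(100)
	all := rc.All()
	fmt.Println("collected:", len(all)) // collected: 100

	// All returns a copy, so editing it leaves the collector untouched
	all[0].TaskID = -1
	fmt.Println("collector unchanged:", rc.All()[0].TaskID != -1) // collector unchanged: true
}
```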

2. Implement Worker

GO
// Worker receives tasks from the task channel, runs each one (retrying on
// failure), and sends one Result per processed task to the result channel
func Worker(
	ctx context.Context,
	id int,
	tasks <-chan Task,
	results chan<- Result,
	maxAttempts int,           // total attempts per task, including the first one
	taskTimeout time.Duration, // timeout applied to each individual attempt
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	for task := range tasks {
		// Stop picking up new tasks once the run has been cancelled
		select {
		case <-ctx.Done():
			fmt.Printf("[Worker %d] Context cancelled, exiting\n", id)
			return
		default:
		}

		fmt.Printf("[Worker %d] Starting task #%d\n", id, task.ID)

		var output string
		var err error
		attempts := 0

		// Execution logic with retry
		for attempts < maxAttempts {
			attempts++

			// Each attempt gets its own sub-context that expires after taskTimeout
			taskCtx, taskCancel := context.WithTimeout(ctx, taskTimeout)
			output, err = task.Execute(taskCtx)
			taskCancel() // release the timer now instead of deferring inside the loop

			if err == nil {
				break // Success, exit retry loop
			}

			fmt.Printf("[Worker %d] Task #%d attempt %d failed: %v\n",
				id, task.ID, attempts, err)

			if attempts < maxAttempts {
				// Wait before retrying, while listening for cancellation
				select {
				case <-ctx.Done():
					fmt.Printf("[Worker %d] Received cancellation signal during retry wait\n", id)
					results <- Result{TaskID: task.ID, Error: ctx.Err(), Attempts: attempts}
					return
				case <-time.After(time.Duration(attempts) * 500 * time.Millisecond):
					// Linear backoff: 1st wait 500ms, 2nd wait 1s, 3rd wait 1.5s ...
				}
			}
		}

		// If the whole run was cancelled while this task was failing, report the
		// cancellation itself (ctx.Err()) rather than the task's last error
		if err != nil && ctx.Err() != nil {
			err = ctx.Err()
		}

		results <- Result{
			TaskID:   task.ID,
			Output:   output,
			Error:    err,
			Attempts: attempts,
		}

		if err != nil {
			fmt.Printf("[Worker %d] Task #%d ultimately failed (attempted %d times)\n", id, task.ID, attempts)
		} else {
			fmt.Printf("[Worker %d] Task #%d completed\n", id, task.ID)
		}
	}

	fmt.Printf("[Worker %d] Task channel closed, exiting\n", id)
}

3. Implement Scheduler

GO
// Scheduler is the task scheduling engine
type Scheduler struct {
	workerCount int           // Number of workers
	maxAttempts int           // Maximum attempts per task (first try plus retries)
	taskTimeout time.Duration // Timeout for each attempt
}

// NewScheduler creates a new scheduler
func NewScheduler(workerCount, maxAttempts int, taskTimeout time.Duration) *Scheduler {
	if workerCount < 1 {
		workerCount = 1
	}
	if maxAttempts < 1 {
		maxAttempts = 1 // every task must run at least once
	}
	return &Scheduler{
		workerCount: workerCount,
		maxAttempts: maxAttempts,
		taskTimeout: taskTimeout,
	}
}

// Run starts the scheduler, processes the given task list, and returns all results
func (s *Scheduler) Run(ctx context.Context, taskList []Task) []Result {
	// Create channels
	tasks := make(chan Task, len(taskList))
	results := make(chan Result, len(taskList))

	// Result collector
	collector := &ResultCollector{}

	// Start result collection goroutine
	var collectorWg sync.WaitGroup
	collectorWg.Add(1)
	go func() {
		defer collectorWg.Done()
		for r := range results {
			collector.Add(r)
		}
	}()

	// Start worker pool; the configured timeout is passed down to every worker
	var workerWg sync.WaitGroup
	for i := 1; i <= s.workerCount; i++ {
		workerWg.Add(1)
		go Worker(ctx, i, tasks, results, s.maxAttempts, s.taskTimeout, &workerWg)
	}

	// Submit all tasks
	go func() {
		// Close the task channel on every exit path, including cancellation;
		// otherwise idle workers would block in range forever and Run would hang
		defer close(tasks)
		for _, task := range taskList {
			select {
			case tasks <- task:
				fmt.Printf("[Scheduler] Submitted task #%d\n", task.ID)
			case <-ctx.Done():
				fmt.Println("[Scheduler] Context cancelled, stopping task submission")
				return
			}
		}
	}()

	// Wait for all workers to complete
	workerWg.Wait()
	close(results) // Safe: no worker can send anymore

	// Wait for collector to process all results
	collectorWg.Wait()

	return collector.All()
}

4. Simulate Tasks and Main Function

GO
// createDemoTasks creates a set of simulated tasks; some will fail, some will timeout
func createDemoTasks(count int) []Task {
	tasks := make([]Task, count)
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	for i := 0; i < count; i++ {
		id := i + 1
		behavior := rng.Intn(4) // 0-3, determines task behavior

		switch behavior {
		case 0:
			// Normal completion. Random delays are drawn here, on the calling
			// goroutine: *rand.Rand is not safe for concurrent use, and Execute
			// runs on the worker goroutines.
			delay := time.Duration(100+rng.Intn(400)) * time.Millisecond
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Normal task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(delay)
					return fmt.Sprintf("Task #%d succeeded", id), nil
				},
			}
		case 1:
			// Occasionally fails (1st attempt fails, 2nd succeeds)
			delay := time.Duration(50+rng.Intn(200)) * time.Millisecond
			calls := 0
			var mu sync.Mutex
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Unstable task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					mu.Lock()
					calls++
					current := calls
					mu.Unlock()

					time.Sleep(delay)

					if current <= 1 {
						return "", fmt.Errorf("Task #%d simulated failure", id) // "" not nil: the result type is string
					}
					return fmt.Sprintf("Task #%d succeeded on attempt %d", id, current), nil
				},
			}
		case 2:
			// Execution takes too long (will trigger timeout)
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Slow task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					select {
					case <-time.After(5 * time.Second): // Far exceeds 2-second timeout
						return fmt.Sprintf("Task #%d completed", id), nil
					case <-ctx.Done():
						return "", fmt.Errorf("Task #%d cancelled: %w", id, ctx.Err())
					}
				},
			}
		default:
			// Always fails
			delay := time.Duration(50+rng.Intn(150)) * time.Millisecond
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Failing task-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(delay)
					return "", fmt.Errorf("Task #%d unrecoverable error", id)
				},
			}
		}
	}

	return tasks
}

func main() {
	fmt.Println("=== Concurrent Task Scheduler ===")
	fmt.Println()

	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Trigger graceful shutdown from a separate goroutine. A real program would
	// react to Ctrl+C (SIGINT); here the signal is simulated with a timer.
	go func() {
		// Simulate: cancel after 8 seconds (in production, use signal.NotifyContext or signal.Notify)
		time.Sleep(8 * time.Second)
		fmt.Println("\n[Main] Received cancellation signal, starting graceful shutdown...")
		cancel()
	}()

	// Create scheduler: 3 workers, at most 3 attempts per task, 2-second timeout per attempt
	const workerCount = 3
	scheduler := NewScheduler(workerCount, 3, 2*time.Second)

	// Generate 10 simulated tasks
	taskList := createDemoTasks(10)

	// Execute scheduling
	fmt.Printf("[Main] Submitting %d tasks, starting %d workers\n\n", len(taskList), workerCount)
	start := time.Now()
	results := scheduler.Run(ctx, taskList)
	elapsed := time.Since(start)

	// Output result summary
	fmt.Println("\n=============================")
	fmt.Println("     Execution Result Summary    ")
	fmt.Println("=============================")

	successCount := 0
	failCount := 0
	cancelCount := 0

	for _, r := range results {
		status := "✓"
		detail := r.Output
		switch {
		case r.Error == nil:
			successCount++
		case r.Error == context.Canceled:
			// Classify each result by its own error. Checking ctx.Err() here
			// would label every failure as cancelled once the run was cancelled.
			status = "⊘"
			detail = "Cancelled"
			cancelCount++
		default:
			status = "✗"
			detail = r.Error.Error()
			failCount++
		}
		fmt.Printf("  %s Task #%02d | Attempts: %d | %s\n",
			status, r.TaskID, r.Attempts, detail)
	}

	fmt.Println("-----------------------------")
	fmt.Printf("  Total: %d | Success: %d | Failed: %d | Cancelled: %d\n",
		len(results), successCount, failCount, cancelCount)
	fmt.Printf("  Time: %v\n", elapsed)
}

5. Running the Program

BASH
go run main.go

Example output (abridged and illustrative: task behaviors are random, so the interleaving, counts, and timing differ on every run):

TEXT
=== Concurrent Task Scheduler ===

[Main] Submitting 10 tasks, starting 3 workers

[Scheduler] Submitted task #1
[Scheduler] Submitted task #2
[Scheduler] Submitted task #3
[Scheduler] Submitted task #4
[Scheduler] Submitted task #5
[Scheduler] Submitted task #6
[Scheduler] Submitted task #7
[Scheduler] Submitted task #8
[Scheduler] Submitted task #9
[Scheduler] Submitted task #10
[Worker 1] Starting task #1
[Worker 2] Starting task #2
[Worker 3] Starting task #3
[Worker 1] Task #1 completed
[Worker 1] Starting task #4
[Worker 2] Task #2 attempt 1 failed: Task #2 simulated failure
[Worker 3] Task #3 completed
[Worker 3] Starting task #5
[Worker 2] Starting task #6
[Worker 2] Task #6 attempt 1 failed: Task #6 unrecoverable error
[Worker 3] Task #5 completed
[Worker 3] Starting task #7
[Worker 2] Task #6 attempt 2 failed: Task #6 unrecoverable error
[Worker 1] Task #4 attempt 1 failed: Task #4 simulated failure
[Worker 2] Task #6 attempt 3 failed: Task #6 unrecoverable error
[Worker 2] Task #6 ultimately failed (attempted 3 times)
[Worker 2] Starting task #8
...
[Worker 1] Task channel closed, exiting
[Worker 2] Task channel closed, exiting
[Worker 3] Task channel closed, exiting

=============================
     Execution Result Summary    
=============================
  ✓ Task #01 | Attempts: 1 | Task #1 succeeded
  ⊘ Task #02 | Attempts: 2 | Cancelled
  ✓ Task #03 | Attempts: 1 | Task #3 succeeded
  ⊘ Task #04 | Attempts: 2 | Cancelled
  ✓ Task #05 | Attempts: 1 | Task #5 succeeded
  ✗ Task #06 | Attempts: 3 | Task #6 unrecoverable error
  ✓ Task #07 | Attempts: 1 | Task #7 succeeded
  ⊘ Task #08 | Attempts: 1 | Cancelled
  ✓ Task #09 | Attempts: 1 | Task #9 succeeded
  ⊘ Task #10 | Attempts: 1 | Cancelled
-----------------------------
  Total: 10 | Success: 5 | Failed: 1 | Cancelled: 4
  Time: 8.012s

Code Analysis

Core Concurrency Patterns

1. Fan-out / Fan-in

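This is the scheduler's core pattern. Fan-out: several workers receive from the single tasks channel, so tasks are spread across them. Fan-in: all workers send to the single results channel, so their output merges back into one stream: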

TEXT
           ┌─ Worker 1 ─┐
Tasks ─────┼─ Worker 2 ─┼───── Results
           └─ Worker 3 ─┘
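Stripped of retries and timeouts, the pattern fits in a few lines. In this sketch, squaring integers stands in for real tasks and sumOfSquares is a hypothetical name. Three goroutines read from one jobs channel (fan-out) and write to one results channel (fan-in), and a separate goroutine closes results only after every worker has returned, the same wait-then-close idea that Scheduler.Run uses.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares fans jobs out to `workers` goroutines and fans their
// results back into one channel, which the caller sums.
func sumOfSquares(nums []int, workers int) int {
	jobs := make(chan int)
	results := make(chan int)

	// Fan-out: every worker reads from the same jobs channel
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range jobs {
				results <- n * n // Fan-in: every worker writes to the same results channel
			}
		}()
	}

	// Producer: send all jobs, then close so the workers' range loops end
	go func() {
		defer close(jobs)
		for _, n := range nums {
			jobs <- n
		}
	}()

	// Close results only after all workers are done sending
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for sq := range results {
		sum += sq
	}
	return sum
}

func main() {
	fmt.Println(sumOfSquares([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 3)) // 385
}
```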

2. context.Context Cascading Cancellation

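Root Context (context.Background, created in main)
  └─ WithCancel: ctx, shared by the task submitter and all workers
       ├─ Worker 1: taskCtx = WithTimeout(ctx, timeout), a new one for each attempt
       ├─ Worker 2: taskCtx = WithTimeout(ctx, timeout), a new one for each attempt
       └─ Worker 3: taskCtx = WithTimeout(ctx, timeout), a new one for each attempt

When main calls cancel(), ctx and every taskCtx derived from it are cancelled together, so in-flight attempts, retry waits, and the task submitter all see the signal at once. The reverse does not hold: when a taskCtx hits its timeout (2 seconds in this demo), only that one attempt is affected.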

3. WaitGroup Synchronization

GO
var wg sync.WaitGroup
wg.Add(1)        // Each worker launched increments count by 1
go func() {
    defer wg.Done()  // Worker exit decrements count by 1
    // ...
}()
wg.Wait()         // Blocks until count reaches zero

4. select Multiplexing

Workers listen to multiple signal sources simultaneously:

GO
select {
case <-ctx.Done():       // Context cancelled
    return
case <-time.After(d):    // Retry backoff
    // Continue retrying
}

Retry Backoff Strategy

This example uses linear backoff: wait time per retry = attempts × 500ms. In production, exponential backoff is recommended:

GO
backoff := time.Duration(1<<uint(attempts)) * 100 * time.Millisecond
// 1st: 200ms, 2nd: 400ms, 3rd: 800ms

You can also add random jitter to prevent "thundering herd" effects when multiple tasks retry simultaneously.


❓ FAQ

1. Why does Worker use range tasks instead of directly checking ctx.Done()?

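range tasks automatically exits the loop when the channel closes, which is the standard producer-consumer pattern. ctx.Done() lets the worker respond to cancellation before starting each task and while waiting to retry. They work together: range handles the normal end of work (the producer closes the channel), while ctx.Done() handles an early shutdown.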

If you only use ctx.Done(), you need additional handling for channel closure; if you only use range, you can't respond to cancellation.

2. How to determine the buffer size for the results channel?

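This example uses make(chan Result, len(taskList)), sizing the buffer to the total task count so that every result can be written without blocking. If the task count is very large (millions), that buffer becomes a large up-front allocation, and it is not actually required here: the collector goroutine drains results while the workers are still running, so a small fixed buffer (or even an unbuffered channel) works without deadlocking; workers simply block briefly whenever the buffer is full.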
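To check that the results buffer does not have to match the task count, this sketch sends thousands of results through channels with a buffer of 1. collectAll and its parameters are hypothetical. Because a collector goroutine drains results while the workers are still sending, nothing deadlocks.

```go
package main

import (
	"fmt"
	"sync"
)

// collectAll runs `workers` goroutines (must be at least 1) that together
// produce n results, funnelling them through channels with the given buffer size.
func collectAll(n, workers, buffer int) []int {
	jobs := make(chan int, buffer)
	results := make(chan int, buffer)

	var collected []int
	done := make(chan struct{})
	go func() { // collector: drains results concurrently with the workers
		defer close(done)
		for r := range results {
			collected = append(collected, r)
		}
	}()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- j
			}
		}()
	}

	for i := 0; i < n; i++ {
		jobs <- i // blocks while the small buffer is full; workers keep draining it
	}
	close(jobs)
	wg.Wait()
	close(results)
	<-done // the collector has appended everything
	return collected
}

func main() {
	fmt.Println(len(collectAll(10000, 3, 1))) // 10000
}
```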

3. How to implement true exponential backoff?

GO
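// exponentialBackoff returns the wait before retry number `attempt` (starting at 1):
// 100ms × 2^attempt, capped at 10s, then spread by ±25% random jitter.
// Assumes "math/rand" and "time" are imported.
func exponentialBackoff(attempt int) time.Duration {
    base := 100 * time.Millisecond
    maxBackoff := 10 * time.Second // avoid shadowing the built-in max (Go 1.21+)
    if attempt < 0 {
        attempt = 0
    }
    if attempt > 16 {
        attempt = 16 // 100ms × 2^16 already exceeds the cap; also prevents shift overflow
    }
    backoff := base * time.Duration(1<<uint(attempt))
    if backoff > maxBackoff {
        backoff = maxBackoff
    }
    // Add random jitter: ±25%, i.e. a value in [0.75×backoff, 1.25×backoff)
    jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
    return backoff - backoff/4 + jitter
}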

4. How to limit the number of simultaneously executing tasks (concurrency control)?

This example naturally limits concurrency through the number of workers. Another approach is to use a semaphore:

GO
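sem := make(chan struct{}, 10) // at most 10 tasks run at once
var wg sync.WaitGroup
for _, task := range taskList {
    sem <- struct{}{} // acquire a permit (blocks while 10 are running)
    wg.Add(1)
    go func(t Task) {
        defer wg.Done()
        defer func() { <-sem }() // release the permit
        t.Execute(ctx)           // result and error ignored here for brevity
    }(task)
}
wg.Wait() // without this, the caller could return before the last tasks finish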
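To confirm that a buffered channel used as a semaphore really caps concurrency, this sketch records the highest number of goroutines inside the guarded section at the same moment. runLimited is a hypothetical name, and the typed atomic.Int64 it uses requires Go 1.19 or newer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited starts one goroutine per job but lets at most `limit` run the
// job body at the same time. It returns the peak concurrency observed.
func runLimited(jobs, limit int) int64 {
	sem := make(chan struct{}, limit)
	var running, peak atomic.Int64
	var wg sync.WaitGroup

	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // acquire a permit
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the permit (runs after the body below)

			now := running.Add(1)
			for { // record the peak with a compare-and-swap loop
				old := peak.Load()
				if now <= old || peak.CompareAndSwap(old, now) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // simulated work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", runLimited(50, 10))
}
```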

📖 Summary

This lesson combined Go's core concurrency tools:

| Technology | Application | Key Code |
|---|---|---|
| goroutine | Workers execute tasks concurrently | go Worker(...) |
| channel | Task distribution and result collection | tasks <-chan Task |
| select | Multiplexing cancellation, timeouts, and backoff waits | select { case <-ctx.Done(): ... } |
| sync.WaitGroup | Waiting for all workers to finish | wg.Wait() |
| sync.Mutex | Concurrency-safe result collection | rc.mu.Lock() |
| context.Context | Cascading cancellation and timeout control | context.WithTimeout(ctx, 2*time.Second) |

Design Principles Review:


📝 Exercises

Exercise 1: Add Priority Queue

Modify the scheduler to support task priorities (high/medium/low). High-priority tasks should be processed by workers first.

Hint: You can use multiple channels or implement a priority queue structure.
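One way to follow the multiple-channels hint is a two-step select: try the high-priority channel alone first, and only fall back to waiting on both when it is empty. This is a sketch with hypothetical names (next, high, low). Note that it gives a strong preference rather than a strict guarantee: once the worker is blocked in the second select, whichever channel becomes ready first wins.

```go
package main

import "fmt"

// next returns the next task, preferring the high-priority channel.
// ok is false once both channels are closed and drained.
func next(high, low <-chan string) (task string, ok bool) {
	for high != nil || low != nil {
		// Step 1: non-blocking check of the high-priority channel
		select {
		case t, open := <-high:
			if !open {
				high = nil // a nil channel is never selected
				continue
			}
			return t, true
		default:
		}
		// Step 2: nothing urgent is waiting, so block on both
		select {
		case t, open := <-high:
			if !open {
				high = nil
				continue
			}
			return t, true
		case t, open := <-low:
			if !open {
				low = nil
				continue
			}
			return t, true
		}
	}
	return "", false
}

func main() {
	high := make(chan string, 2)
	low := make(chan string, 2)
	low <- "low-1"
	high <- "high-1"
	low <- "low-2"
	high <- "high-2"
	close(high)
	close(low)
	for {
		t, ok := next(high, low)
		if !ok {
			break
		}
		fmt.Println(t) // high-1, high-2, low-1, low-2 (one per line)
	}
}
```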

Exercise 2: Implement Rate Limiting

Add rate limiting to the scheduler, e.g., a maximum of 5 tasks per second (token bucket algorithm).

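Hint: Use time.Ticker, or the golang.org/x/time/rate package (maintained by the Go project but outside the standard library).

GO
// Requires: import "golang.org/x/time/rate"
limiter := rate.NewLimiter(5, 1) // 5 per second, burst of 1
for task := range tasks {
    // Wait blocks until a token is available. It returns an error if ctx is
    // cancelled (or its deadline would pass first), so stop rather than run unthrottled
    if err := limiter.Wait(ctx); err != nil {
        return
    }
    task.Execute(ctx)
}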

Exercise 3: Add Task Progress Callback

Add an OnProgress callback function to the scheduler that is called when each task completes, reporting current progress (completed/total).

GO
type Scheduler struct {
    // ... other fields
    OnProgress func(completed, total int)
}
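One possible approach, sketched with a hypothetical helper (collectWithProgress): invoke the callback from the single goroutine that already drains the results channel. Because only that goroutine increments the counter and calls the callback, no extra locking is needed, and completed values arrive in strictly increasing order.

```go
package main

import "fmt"

// collectWithProgress drains results and reports progress after each one.
// It runs in a single goroutine, so `completed` needs no synchronization.
func collectWithProgress(results <-chan string, total int, onProgress func(completed, total int)) []string {
	var out []string
	completed := 0
	for r := range results {
		out = append(out, r)
		completed++
		if onProgress != nil { // the callback is optional
			onProgress(completed, total)
		}
	}
	return out
}

func main() {
	results := make(chan string, 3)
	results <- "a"
	results <- "b"
	results <- "c"
	close(results)

	collectWithProgress(results, 3, func(completed, total int) {
		fmt.Printf("progress: %d/%d\n", completed, total) // 1/3, 2/3, 3/3
	})
}
```

A slow callback slows down result collection, so it should do little more than log or update a counter.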


Web-Tutorial.com

Web-Tutorial Tech Team

A team of developers maintaining programming tutorials. Each tutorial is written and reviewed by developers with expertise in that field. We work to keep our content accurate and reliable — if you spot an issue, please let us know.

