Prática de Concorrência: Agendador de Tarefas

Prática de Concorrência: Agendador de Tarefas

Imagine um centro de classificação de pacotes: uma esteira transportadora entrega pacotes (tarefas) continuamente, e múltiplos classificadores (goroutines) trabalham simultaneamente — alguns escaneiam, alguns classificam, alguns carregam. Cada estágio passa pacotes através de um pipeline (channel), e o agendador monitora o progresso geral, lidando com quaisquer problemas prontamente. É assim que um agendador de tarefas concorrente funciona — múltiplos workers colaboram para completar um grande número de tarefas, de forma eficiente e confiável.

Nesta lição, aplicaremos abrangentemente as goroutines, channels, select, pacote sync e context.Context que aprendemos para construir do zero um agendador de tarefas concorrente totalmente funcional.


Requisitos do Projeto

Precisamos construir um agendador de tarefas que atenda aos seguintes requisitos:

  1. Execução concorrente: Suportar múltiplos workers processando tarefas simultaneamente
  2. Submissão de tarefas: Sistemas externos podem submeter tarefas dinamicamente
  3. Controle de timeout: Tarefas individuais são automaticamente canceladas após um timeout
  4. Tentativa em caso de falha: Tarefas com falha são automaticamente tentadas novamente, até N vezes
  5. Coleta de resultados: Coletar concorrente e seguramente todos os resultados das tarefas
  6. Desligamento gracioso: Após receber sinal de cancelamento, esperar tarefas em execução completarem antes de sair

Design do Sistema

A arquitetura geral é dividida em três camadas:

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│ Submissão   │────▶│  Agendador   │────▶│ Pool de     │
│ de Tarefas  │     │   Engine     │     │  Workers    │
│(Produtor)   │     │              │     │             │
└─────────────┘     └──────────────┘     └─────────────┘
                           │                     │
                           ▼                     ▼
                     ┌──────────────┐     ┌─────────────┐
                     │  Coletor de  │◀────│  Execução   │
                     │  Resultados  │     │  de Tarefas │
                     └──────────────┘     └─────────────┘

Componentes Principais:

Componente Responsabilidade Implementação
Task Representa uma tarefa executável Struct + tipo função
Result Armazena resultado da execução da tarefa Struct + channel
Worker Executa tarefas específicas goroutine
Scheduler Coordena distribuição e execução de tarefas channel + select
ResultCollector Coleta resultados concorrente e seguramente sync.Mutex

Exemplo 1: Código Completo

1. Definindo Estruturas de Dados

GO
package main

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task representa uma tarefa a ser executada
type Task struct {
	ID      int                          // Identificador único da tarefa
	Payload string                       // Dados da tarefa
	Execute func(ctx context.Context) (string, error) // Função de execução da tarefa
}

// Result representa o resultado da execução de uma tarefa
type Result struct {
	TaskID   int    // ID da tarefa correspondente
	Output   string // Saída em caso de sucesso
	Error    error  // Erro em caso de falha
	Attempts int    // Número real de tentativas
}

// ResultCollector coleta resultados de tarefas de forma concorrente e segura
type ResultCollector struct {
	mu      sync.Mutex
	results []Result
}

// Add adiciona um resultado (concorrente seguro)
func (rc *ResultCollector) Add(r Result) {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	rc.results = append(rc.results, r)
}

// All retorna todos os resultados coletados
func (rc *ResultCollector) All() []Result {
	rc.mu.Lock()
	defer rc.mu.Unlock()
	// Retorna uma cópia para evitar modificação externa
	out := make([]Result, len(rc.results))
	copy(out, rc.results)
	return out
}

2. Implementando o Worker

GO
// Worker recebe tarefas do channel de tarefas, executa e envia resultados para o channel de resultados
func Worker(
	ctx context.Context,
	id int,
	tasks <-chan Task,
	results chan<- Result,
	maxRetries int,
	wg *sync.WaitGroup,
) {
	defer wg.Done()

	for task := range tasks {
		// Verifica se foi cancelado
		select {
		case <-ctx.Done():
			fmt.Printf("[Worker %d] Contexto cancelado, saindo\n", id)
			return
		default:
		}

		fmt.Printf("[Worker %d] Iniciando tarefa #%d\n", id, task.ID)

		var output string
		var err error
		attempts := 0

		// Lógica de execução com tentativa
		for attempts < maxRetries {
			attempts++

			// Cria um sub-contexto com timeout para cada tarefa (máximo 2 segundos)
			taskCtx, taskCancel := context.WithTimeout(ctx, 2*time.Second)
			output, err = task.Execute(taskCtx)
			taskCancel()

			if err == nil {
				break // Sucesso, sai do loop de tentativa
			}

			fmt.Printf("[Worker %d] Tarefa #%d tentativa %d falhou: %v\n",
				id, task.ID, attempts, err)

			if attempts < maxRetries {
				// Espera antes de tentar novamente, enquanto escuta cancelamento
				select {
				case <-ctx.Done():
					fmt.Printf("[Worker %d] Sinal de cancelamento recebido durante espera de tentativa\n", id)
					results <- Result{TaskID: task.ID, Error: ctx.Err(), Attempts: attempts}
					return
				case <-time.After(time.Duration(attempts) * 500 * time.Millisecond):
					// Backoff linear: 1ª espera 500ms, 2ª espera 1s, 3ª espera 1.5s ...
				}
			}
		}

		results <- Result{
			TaskID:   task.ID,
			Output:   output,
			Error:    err,
			Attempts: attempts,
		}

		if err != nil {
			fmt.Printf("[Worker %d] Tarefa #%d falhou definitivamente (tentou %d vezes)\n", id, task.ID, attempts)
		} else {
			fmt.Printf("[Worker %d] Tarefa #%d concluída\n", id, task.ID)
		}
	}

	fmt.Printf("[Worker %d] Channel de tarefas fechado, saindo\n", id)
}

3. Implementando o Agendador

GO
// Scheduler é o engine de agendamento de tarefas
type Scheduler struct {
	workerCount int            // Número de workers
	maxRetries  int            // Contagem máxima de tentativas
	taskTimeout time.Duration  // Timeout por tarefa
}

// NewScheduler cria um novo agendador
func NewScheduler(workerCount, maxRetries int, taskTimeout time.Duration) *Scheduler {
	return &Scheduler{
		workerCount: workerCount,
		maxRetries:  maxRetries,
		taskTimeout: taskTimeout,
	}
}

// Run inicia o agendador, processa a lista de tarefas fornecida e retorna todos os resultados
func (s *Scheduler) Run(ctx context.Context, taskList []Task) []Result {
	// Cria channels
	tasks := make(chan Task, len(taskList))
	results := make(chan Result, len(taskList))

	// Coletor de resultados
	collector := &ResultCollector{}

	// Inicia goroutine de coleta de resultados
	var collectorWg sync.WaitGroup
	collectorWg.Add(1)
	go func() {
		defer collectorWg.Done()
		for r := range results {
			collector.Add(r)
		}
	}()

	// Inicia pool de workers
	var workerWg sync.WaitGroup
	for i := 1; i <= s.workerCount; i++ {
		workerWg.Add(1)
		go Worker(ctx, i, tasks, results, s.maxRetries, &workerWg)
	}

	// Submete todas as tarefas
	go func() {
		for _, task := range taskList {
			select {
			case tasks <- task:
				fmt.Printf("[Agendador] Tarefa #%d submetida\n", task.ID)
			case <-ctx.Done():
				fmt.Println("[Agendador] Contexto cancelado, parando submissão de tarefas")
				return
			}
		}
		close(tasks) // Fecha channel de tarefas, notifica workers que não há mais tarefas
	}()

	// Espera todos os workers completarem
	workerWg.Wait()
	close(results) // Fecha channel de resultados

	// Espera coletor processar todos os resultados
	collectorWg.Wait()

	return collector.All()
}

4. Simulando Tarefas e Função Principal

GO
// createDemoTasks cria um conjunto de tarefas simuladas; algumas vão falhar, algumas vão exceder o tempo
func createDemoTasks(count int) []Task {
	tasks := make([]Task, count)
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	for i := 0; i < count; i++ {
		id := i + 1
		behavior := rng.Intn(4) // 0-3, determina comportamento da tarefa

		switch behavior {
		case 0:
			// Conclusão normal
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Tarefa normal-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(time.Duration(100+rng.Intn(400)) * time.Millisecond)
					return fmt.Sprintf("Tarefa #%d bem-sucedida", id), nil
				},
			}
		case 1:
			// Falha ocasionalmente (1ª tentativa falha, 2ª sucede)
			failCount := 0
			var mu sync.Mutex
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Tarefa instável-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					mu.Lock()
					failCount++
					current := failCount
					mu.Unlock()

					time.Sleep(time.Duration(50+rng.Intn(200)) * time.Millisecond)

					if current <= 1 {
						return nil, fmt.Errorf("Tarefa #%d simulou falha", id)
					}
					return fmt.Sprintf("Tarefa #%d bem-sucedida na tentativa %d", id, current), nil
				},
			}
		case 2:
			// Execução demora demais (vai acionar timeout)
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Tarefa lenta-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					select {
					case <-time.After(5 * time.Second): // Excede muito o timeout de 2 segundos
						return fmt.Sprintf("Tarefa #%d concluída", id), nil
					case <-ctx.Done():
						return "", fmt.Errorf("Tarefa #%d cancelada: %w", id, ctx.Err())
					}
				},
			}
		default:
			// Sempre falha
			tasks[i] = Task{
				ID:      id,
				Payload: fmt.Sprintf("Tarefa com falha-%d", id),
				Execute: func(ctx context.Context) (string, error) {
					time.Sleep(time.Duration(50+rng.Intn(150)) * time.Millisecond)
					return nil, fmt.Errorf("Tarefa #%d erro irrecuperável", id)
				},
			}
		}
	}

	return tasks
}

func main() {
	fmt.Println("=== Agendador de Tarefas Concorrente ===")
	fmt.Println()

	// Cria um contexto cancelável
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Escuta sinal de interrupção do SO (Ctrl+C) para acionar desligamento gracioso
	go func() {
		// Simula: envia sinal de cancelamento após 8 segundos (use signal.Notify em produção)
		time.Sleep(8 * time.Second)
		fmt.Println("\n[Principal] Sinal de cancelamento recebido, iniciando desligamento gracioso...")
		cancel()
	}()

	// Cria agendador: 3 workers, máximo 3 tentativas, timeout de 2 segundos por tarefa
	scheduler := NewScheduler(3, 3, 2*time.Second)

	// Gera 10 tarefas simuladas
	taskList := createDemoTasks(10)

	// Executa agendamento
	fmt.Printf("[Principal] Submetendo %d tarefas, iniciando %d workers\n\n", len(taskList), 3)
	start := time.Now()
	results := scheduler.Run(ctx, taskList)
	elapsed := time.Since(start)

	// Exibe resumo dos resultados
	fmt.Println("\n=============================")
	fmt.Println("   Resumo dos Resultados     ")
	fmt.Println("=============================")

	successCount := 0
	failCount := 0
	cancelCount := 0

	for _, r := range results {
		status := "✓"
		detail := r.Output
		if r.Error != nil {
			if ctx.Err() != nil {
				status = "⊘"
				detail = "Cancelada"
				cancelCount++
			} else {
				status = "✗"
				detail = r.Error.Error()
				failCount++
			}
		} else {
			successCount++
		}
		fmt.Printf("  %s Tarefa #%02d | Tentativas: %d | %s\n",
			status, r.TaskID, r.Attempts, detail)
	}

	fmt.Println("-----------------------------")
	fmt.Printf("  Total: %d | Sucesso: %d | Falha: %d | Canceladas: %d\n",
		len(results), successCount, failCount, cancelCount)
	fmt.Printf("  Tempo: %v\n", elapsed)
}

5. Efeito da Execução

BASH
go run main.go

Exemplo de saída (resultados são aleatórios a cada execução):

TEXT
=== Agendador de Tarefas Concorrente ===

[Principal] Submetendo 10 tarefas, iniciando 3 workers

[Agendador] Tarefa #1 submetida
[Agendador] Tarefa #2 submetida
[Agendador] Tarefa #3 submetida
[Agendador] Tarefa #4 submetida
[Agendador] Tarefa #5 submetida
[Agendador] Tarefa #6 submetida
[Agendador] Tarefa #7 submetida
[Agendador] Tarefa #8 submetida
[Agendador] Tarefa #9 submetida
[Agendador] Tarefa #10 submetida
[Worker 1] Iniciando tarefa #1
[Worker 2] Iniciando tarefa #2
[Worker 3] Iniciando tarefa #3
[Worker 1] Tarefa #1 concluída
[Worker 1] Iniciando tarefa #4
[Worker 2] Tarefa #2 tentativa 1 falhou: Tarefa #2 simulou falha
[Worker 3] Tarefa #3 concluída
[Worker 3] Iniciando tarefa #5
[Worker 2] Iniciando tarefa #6
[Worker 2] Tarefa #6 tentativa 1 falhou: Tarefa #6 erro irrecuperável
[Worker 3] Tarefa #5 concluída
[Worker 3] Iniciando tarefa #7
[Worker 2] Tarefa #6 tentativa 2 falhou: Tarefa #6 erro irrecuperável
[Worker 1] Tarefa #4 tentativa 1 falhou: Tarefa #4 simulou falha
[Worker 2] Tarefa #6 tentativa 3 falhou: Tarefa #6 erro irrecuperável
[Worker 2] Tarefa #6 falhou definitivamente (tentou 3 vezes)
[Worker 2] Iniciando tarefa #8
...
[Worker 1] Channel de tarefas fechado, saindo
[Worker 2] Channel de tarefas fechado, saindo
[Worker 3] Channel de tarefas fechado, saindo

=============================
   Resumo dos Resultados     
=============================
  ✓ Tarefa #01 | Tentativas: 1 | Tarefa #1 bem-sucedida
  ⊘ Tarefa #02 | Tentativas: 2 | Cancelada
  ✓ Tarefa #03 | Tentativas: 1 | Tarefa #3 bem-sucedida
  ⊘ Tarefa #04 | Tentativas: 2 | Cancelada
  ✓ Tarefa #05 | Tentativas: 1 | Tarefa #5 bem-sucedida
  ✗ Tarefa #06 | Tentativas: 3 | Tarefa #6 erro irrecuperável
  ✓ Tarefa #07 | Tentativas: 1 | Tarefa #7 bem-sucedida
  ⊘ Tarefa #08 | Tentativas: 1 | Cancelada
  ✓ Tarefa #09 | Tentativas: 1 | Tarefa #9 bem-sucedida
  ⊘ Tarefa #10 | Tentativas: 1 | Cancelada
-----------------------------
  Total: 10 | Sucesso: 5 | Falha: 1 | Canceladas: 4
  Tempo: 8.012s

Análise do Código

Padrões Principais de Concorrência

1. Fan-out / Fan-in

Este é o padrão central do agendador:

           ┌─ Worker 1 ─┐
Tarefas ───┼─ Worker 2 ─┼───── Resultados
           └─ Worker 3 ─┘

2. Cancelamento Cascata com context.Context

Contexto Raiz (principal)
  └─ WithCancel
       ├─ WithTimeout do Worker 1
       │    └─ taskCtx (timeout de 2s por tarefa)
       ├─ WithTimeout do Worker 2
       │    └─ taskCtx (timeout de 2s por tarefa)
       └─ WithTimeout do Worker 3
            └─ taskCtx (timeout de 2s por tarefa)

Quando o programa principal chama cancel(), todos os contextos filhos recebem o sinal de cancelamento, alcançando cancelamento cascata.

3. Sincronização com WaitGroup

GO
var wg sync.WaitGroup
wg.Add(1)        // Cada worker lançado incrementa a contagem em 1
go func() {
    defer wg.Done()  // Saída do worker decrementa a contagem em 1
    // ...
}()
wg.Wait()         // Bloqueia até a contagem chegar a zero

4. Multiplexação com select

Workers escutam múltiplas fontes de sinal simultaneamente:

GO
select {
case <-ctx.Done():       // Contexto cancelado
    return
case <-time.After(d):    // Backoff de tentativa
    // Continua tentando
}

Estratégia de Backoff de Tentativa

Este exemplo usa backoff linear: tempo de espera por tentativa = attempts × 500ms. Em produção, recomenda-se backoff exponencial:

GO
backoff := time.Duration(1<<uint(attempts)) * 100 * time.Millisecond
// 1ª: 200ms, 2ª: 400ms, 3ª: 800ms

Você também pode adicionar jitter aleatório para prevenir efeitos de "avalanche" quando múltiplas tarefas tentam novamente simultaneamente.


❓ Perguntas Frequentes

1. Por que o Worker usa range tasks em vez de verificar diretamente ctx.Done()?

range tasks sai automaticamente do loop quando o channel é fechado, que é o padrão produtor-consumidor. ctx.Done() é usado para responder a sinais de cancelamento durante espera ou tentativa. Eles trabalham juntos:

Se você usar apenas ctx.Done(), precisa de tratamento adicional para fechamento de channel; se usar apenas range, não pode responder a cancelamento.

2. Como determinar o tamanho do buffer para o channel de resultados?

Este exemplo usa make(chan Result, len(taskList)) com o total de tarefas como tamanho do buffer, garantindo que todos os resultados possam ser escritos sem bloquear. Se a contagem de tarefas for muito grande (milhões), você pode:

3. Como implementar verdadeiro backoff exponencial?

GO
func exponentialBackoff(attempt int) time.Duration {
    base := 100 * time.Millisecond
    max := 10 * time.Second
    backoff := base * time.Duration(1<<uint(attempt))
    if backoff > max {
        backoff = max
    }
    // Adiciona jitter aleatório: ±25%
    jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
    return backoff - backoff/4 + jitter
}

4. Como limitar o número de tarefas executando simultaneamente (controle de concorrência)?

Este exemplo limita naturalmente a concorrência através do número de workers. Outra abordagem é usar um semáforo:

GO
sem := make(chan struct{}, 10) // máximo 10 concorrentes
for _, task := range taskList {
    sem <- struct{}{} // adquire permissão
    go func(t Task) {
        defer func() { <-sem }() // libera permissão
        t.Execute(ctx)
    }(task)
}

📖 Resumo

Esta lição aplicou abrangentemente as tecnologias centrais de concorrência do Go:

Tecnologia Aplicação Código-chave
goroutine Workers executam tarefas em paralelo go Worker(...)
channel Distribuição de tarefas e coleta de resultados tasks <-chan Task
select Multiplexação: cancelamento, timeout, backoff select { case <-ctx.Done(): ... }
sync.WaitGroup Espera todos os workers completarem wg.Wait()
sync.Mutex Coleta de resultados concorrente segura rc.mu.Lock()
context.Context Cancelamento cascata e controle de timeout context.WithTimeout(ctx, 2*time.Second)

Revisão dos Princípios de Design:


📝 Exercícios

Exercício 1: Adicionar Fila de Prioridade

Modifique o agendador para suportar prioridades de tarefas (alta/média/baixa). Tarefas de alta prioridade devem ser processadas pelos workers primeiro.

Dica: Você pode usar múltiplos channels ou implementar uma estrutura de fila de prioridade.

Exercício 2: Implementar Limitação de Taxa

Adicione limitação de taxa ao agendador, por exemplo, máximo de 5 tarefas por segundo (algoritmo de balde de tokens).

Dica: Use time.Ticker ou a biblioteca de terceiros golang.org/x/time/rate.

GO
limiter := rate.NewLimiter(5, 1) // 5 por segundo, burst de 1
for task := range tasks {
    limiter.Wait(ctx) // espera por token
    task.Execute(ctx)
}

Exercício 3: Adicionar Callback de Progresso de Tarefa

Adicione uma função de callback OnProgress ao agendador que é chamada quando cada tarefa é concluída, reportando o progresso atual (concluídas/total).

GO
type Scheduler struct {
    // ... outros campos
    OnProgress func(completed, total int)
}

Próxima Lição: Prática de Web Crawler →

Web-Tutorial.com

Equipe Técnica Web-Tutorial

Uma plataforma de tutoriais mantida por diversos desenvolvedores. Cada tutorial é escrito e revisado por profissionais da área correspondente. Trabalhamos para manter nosso conteúdo preciso e confiável — se encontrar algum problema, avise-nos.

100%