Prática de Concorrência: Agendador de Tarefas
Prática de Concorrência: Agendador de Tarefas
Imagine um centro de classificação de pacotes: uma esteira transportadora entrega pacotes (tarefas) continuamente, e múltiplos classificadores (goroutines) trabalham simultaneamente — alguns escaneiam, alguns classificam, alguns carregam. Cada estágio passa pacotes através de um pipeline (channel), e o agendador monitora o progresso geral, lidando com quaisquer problemas prontamente. É assim que um agendador de tarefas concorrente funciona — múltiplos workers colaboram para completar um grande número de tarefas, de forma eficiente e confiável.
Nesta lição, aplicaremos abrangentemente as goroutines, channels, select, pacote sync e context.Context que aprendemos para construir do zero um agendador de tarefas concorrente totalmente funcional.
Requisitos do Projeto
Precisamos construir um agendador de tarefas que atenda aos seguintes requisitos:
- Execução concorrente: Suportar múltiplos workers processando tarefas simultaneamente
- Submissão de tarefas: Sistemas externos podem submeter tarefas dinamicamente
- Controle de timeout: Tarefas individuais são automaticamente canceladas após um timeout
- Tentativa em caso de falha: Tarefas com falha são automaticamente tentadas novamente, até N vezes
- Coleta de resultados: Coletar concorrente e seguramente todos os resultados das tarefas
- Desligamento gracioso: Após receber sinal de cancelamento, esperar tarefas em execução completarem antes de sair
Design do Sistema
A arquitetura geral é dividida em três camadas:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Submissão │────▶│ Agendador │────▶│ Pool de │
│ de Tarefas │ │ Engine │ │ Workers │
│(Produtor) │ │ │ │ │
└─────────────┘ └──────────────┘ └─────────────┘
│ │
▼ ▼
┌──────────────┐ ┌─────────────┐
│ Coletor de │◀────│ Execução │
│ Resultados │ │ de Tarefas │
└──────────────┘ └─────────────┘
Componentes Principais:
| Componente | Responsabilidade | Implementação |
|---|---|---|
Task |
Representa uma tarefa executável | Struct + tipo função |
Result |
Armazena resultado da execução da tarefa | Struct + channel |
Worker |
Executa tarefas específicas | goroutine |
Scheduler |
Coordena distribuição e execução de tarefas | channel + select |
ResultCollector |
Coleta resultados concorrente e seguramente | sync.Mutex |
Exemplo 1: Código Completo
1. Definindo Estruturas de Dados
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// Task representa uma tarefa a ser executada
type Task struct {
ID int // Identificador único da tarefa
Payload string // Dados da tarefa
Execute func(ctx context.Context) (string, error) // Função de execução da tarefa
}
// Result representa o resultado da execução de uma tarefa
type Result struct {
TaskID int // ID da tarefa correspondente
Output string // Saída em caso de sucesso
Error error // Erro em caso de falha
Attempts int // Número real de tentativas
}
// ResultCollector coleta resultados de tarefas de forma concorrente e segura
type ResultCollector struct {
mu sync.Mutex
results []Result
}
// Add adiciona um resultado (concorrente seguro)
func (rc *ResultCollector) Add(r Result) {
rc.mu.Lock()
defer rc.mu.Unlock()
rc.results = append(rc.results, r)
}
// All retorna todos os resultados coletados
func (rc *ResultCollector) All() []Result {
rc.mu.Lock()
defer rc.mu.Unlock()
// Retorna uma cópia para evitar modificação externa
out := make([]Result, len(rc.results))
copy(out, rc.results)
return out
}
2. Implementando o Worker
// Worker recebe tarefas do channel de tarefas, executa e envia resultados para o channel de resultados
func Worker(
ctx context.Context,
id int,
tasks <-chan Task,
results chan<- Result,
maxRetries int,
wg *sync.WaitGroup,
) {
defer wg.Done()
for task := range tasks {
// Verifica se foi cancelado
select {
case <-ctx.Done():
fmt.Printf("[Worker %d] Contexto cancelado, saindo\n", id)
return
default:
}
fmt.Printf("[Worker %d] Iniciando tarefa #%d\n", id, task.ID)
var output string
var err error
attempts := 0
// Lógica de execução com tentativa
for attempts < maxRetries {
attempts++
// Cria um sub-contexto com timeout para cada tarefa (máximo 2 segundos)
taskCtx, taskCancel := context.WithTimeout(ctx, 2*time.Second)
output, err = task.Execute(taskCtx)
taskCancel()
if err == nil {
break // Sucesso, sai do loop de tentativa
}
fmt.Printf("[Worker %d] Tarefa #%d tentativa %d falhou: %v\n",
id, task.ID, attempts, err)
if attempts < maxRetries {
// Espera antes de tentar novamente, enquanto escuta cancelamento
select {
case <-ctx.Done():
fmt.Printf("[Worker %d] Sinal de cancelamento recebido durante espera de tentativa\n", id)
results <- Result{TaskID: task.ID, Error: ctx.Err(), Attempts: attempts}
return
case <-time.After(time.Duration(attempts) * 500 * time.Millisecond):
// Backoff linear: 1ª espera 500ms, 2ª espera 1s, 3ª espera 1.5s ...
}
}
}
results <- Result{
TaskID: task.ID,
Output: output,
Error: err,
Attempts: attempts,
}
if err != nil {
fmt.Printf("[Worker %d] Tarefa #%d falhou definitivamente (tentou %d vezes)\n", id, task.ID, attempts)
} else {
fmt.Printf("[Worker %d] Tarefa #%d concluída\n", id, task.ID)
}
}
fmt.Printf("[Worker %d] Channel de tarefas fechado, saindo\n", id)
}
3. Implementando o Agendador
// Scheduler é o engine de agendamento de tarefas
type Scheduler struct {
workerCount int // Número de workers
maxRetries int // Contagem máxima de tentativas
taskTimeout time.Duration // Timeout por tarefa
}
// NewScheduler cria um novo agendador
func NewScheduler(workerCount, maxRetries int, taskTimeout time.Duration) *Scheduler {
return &Scheduler{
workerCount: workerCount,
maxRetries: maxRetries,
taskTimeout: taskTimeout,
}
}
// Run inicia o agendador, processa a lista de tarefas fornecida e retorna todos os resultados
func (s *Scheduler) Run(ctx context.Context, taskList []Task) []Result {
// Cria channels
tasks := make(chan Task, len(taskList))
results := make(chan Result, len(taskList))
// Coletor de resultados
collector := &ResultCollector{}
// Inicia goroutine de coleta de resultados
var collectorWg sync.WaitGroup
collectorWg.Add(1)
go func() {
defer collectorWg.Done()
for r := range results {
collector.Add(r)
}
}()
// Inicia pool de workers
var workerWg sync.WaitGroup
for i := 1; i <= s.workerCount; i++ {
workerWg.Add(1)
go Worker(ctx, i, tasks, results, s.maxRetries, &workerWg)
}
// Submete todas as tarefas
go func() {
for _, task := range taskList {
select {
case tasks <- task:
fmt.Printf("[Agendador] Tarefa #%d submetida\n", task.ID)
case <-ctx.Done():
fmt.Println("[Agendador] Contexto cancelado, parando submissão de tarefas")
return
}
}
close(tasks) // Fecha channel de tarefas, notifica workers que não há mais tarefas
}()
// Espera todos os workers completarem
workerWg.Wait()
close(results) // Fecha channel de resultados
// Espera coletor processar todos os resultados
collectorWg.Wait()
return collector.All()
}
4. Simulando Tarefas e Função Principal
// createDemoTasks cria um conjunto de tarefas simuladas; algumas vão falhar, algumas vão exceder o tempo
func createDemoTasks(count int) []Task {
tasks := make([]Task, count)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < count; i++ {
id := i + 1
behavior := rng.Intn(4) // 0-3, determina comportamento da tarefa
switch behavior {
case 0:
// Conclusão normal
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("Tarefa normal-%d", id),
Execute: func(ctx context.Context) (string, error) {
time.Sleep(time.Duration(100+rng.Intn(400)) * time.Millisecond)
return fmt.Sprintf("Tarefa #%d bem-sucedida", id), nil
},
}
case 1:
// Falha ocasionalmente (1ª tentativa falha, 2ª sucede)
failCount := 0
var mu sync.Mutex
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("Tarefa instável-%d", id),
Execute: func(ctx context.Context) (string, error) {
mu.Lock()
failCount++
current := failCount
mu.Unlock()
time.Sleep(time.Duration(50+rng.Intn(200)) * time.Millisecond)
if current <= 1 {
return nil, fmt.Errorf("Tarefa #%d simulou falha", id)
}
return fmt.Sprintf("Tarefa #%d bem-sucedida na tentativa %d", id, current), nil
},
}
case 2:
// Execução demora demais (vai acionar timeout)
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("Tarefa lenta-%d", id),
Execute: func(ctx context.Context) (string, error) {
select {
case <-time.After(5 * time.Second): // Excede muito o timeout de 2 segundos
return fmt.Sprintf("Tarefa #%d concluída", id), nil
case <-ctx.Done():
return "", fmt.Errorf("Tarefa #%d cancelada: %w", id, ctx.Err())
}
},
}
default:
// Sempre falha
tasks[i] = Task{
ID: id,
Payload: fmt.Sprintf("Tarefa com falha-%d", id),
Execute: func(ctx context.Context) (string, error) {
time.Sleep(time.Duration(50+rng.Intn(150)) * time.Millisecond)
return nil, fmt.Errorf("Tarefa #%d erro irrecuperável", id)
},
}
}
}
return tasks
}
func main() {
fmt.Println("=== Agendador de Tarefas Concorrente ===")
fmt.Println()
// Cria um contexto cancelável
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Escuta sinal de interrupção do SO (Ctrl+C) para acionar desligamento gracioso
go func() {
// Simula: envia sinal de cancelamento após 8 segundos (use signal.Notify em produção)
time.Sleep(8 * time.Second)
fmt.Println("\n[Principal] Sinal de cancelamento recebido, iniciando desligamento gracioso...")
cancel()
}()
// Cria agendador: 3 workers, máximo 3 tentativas, timeout de 2 segundos por tarefa
scheduler := NewScheduler(3, 3, 2*time.Second)
// Gera 10 tarefas simuladas
taskList := createDemoTasks(10)
// Executa agendamento
fmt.Printf("[Principal] Submetendo %d tarefas, iniciando %d workers\n\n", len(taskList), 3)
start := time.Now()
results := scheduler.Run(ctx, taskList)
elapsed := time.Since(start)
// Exibe resumo dos resultados
fmt.Println("\n=============================")
fmt.Println(" Resumo dos Resultados ")
fmt.Println("=============================")
successCount := 0
failCount := 0
cancelCount := 0
for _, r := range results {
status := "✓"
detail := r.Output
if r.Error != nil {
if ctx.Err() != nil {
status = "⊘"
detail = "Cancelada"
cancelCount++
} else {
status = "✗"
detail = r.Error.Error()
failCount++
}
} else {
successCount++
}
fmt.Printf(" %s Tarefa #%02d | Tentativas: %d | %s\n",
status, r.TaskID, r.Attempts, detail)
}
fmt.Println("-----------------------------")
fmt.Printf(" Total: %d | Sucesso: %d | Falha: %d | Canceladas: %d\n",
len(results), successCount, failCount, cancelCount)
fmt.Printf(" Tempo: %v\n", elapsed)
}
5. Efeito da Execução
go run main.go
Exemplo de saída (resultados são aleatórios a cada execução):
=== Agendador de Tarefas Concorrente ===
[Principal] Submetendo 10 tarefas, iniciando 3 workers
[Agendador] Tarefa #1 submetida
[Agendador] Tarefa #2 submetida
[Agendador] Tarefa #3 submetida
[Agendador] Tarefa #4 submetida
[Agendador] Tarefa #5 submetida
[Agendador] Tarefa #6 submetida
[Agendador] Tarefa #7 submetida
[Agendador] Tarefa #8 submetida
[Agendador] Tarefa #9 submetida
[Agendador] Tarefa #10 submetida
[Worker 1] Iniciando tarefa #1
[Worker 2] Iniciando tarefa #2
[Worker 3] Iniciando tarefa #3
[Worker 1] Tarefa #1 concluída
[Worker 1] Iniciando tarefa #4
[Worker 2] Tarefa #2 tentativa 1 falhou: Tarefa #2 simulou falha
[Worker 3] Tarefa #3 concluída
[Worker 3] Iniciando tarefa #5
[Worker 2] Iniciando tarefa #6
[Worker 2] Tarefa #6 tentativa 1 falhou: Tarefa #6 erro irrecuperável
[Worker 3] Tarefa #5 concluída
[Worker 3] Iniciando tarefa #7
[Worker 2] Tarefa #6 tentativa 2 falhou: Tarefa #6 erro irrecuperável
[Worker 1] Tarefa #4 tentativa 1 falhou: Tarefa #4 simulou falha
[Worker 2] Tarefa #6 tentativa 3 falhou: Tarefa #6 erro irrecuperável
[Worker 2] Tarefa #6 falhou definitivamente (tentou 3 vezes)
[Worker 2] Iniciando tarefa #8
...
[Worker 1] Channel de tarefas fechado, saindo
[Worker 2] Channel de tarefas fechado, saindo
[Worker 3] Channel de tarefas fechado, saindo
=============================
Resumo dos Resultados
=============================
✓ Tarefa #01 | Tentativas: 1 | Tarefa #1 bem-sucedida
⊘ Tarefa #02 | Tentativas: 2 | Cancelada
✓ Tarefa #03 | Tentativas: 1 | Tarefa #3 bem-sucedida
⊘ Tarefa #04 | Tentativas: 2 | Cancelada
✓ Tarefa #05 | Tentativas: 1 | Tarefa #5 bem-sucedida
✗ Tarefa #06 | Tentativas: 3 | Tarefa #6 erro irrecuperável
✓ Tarefa #07 | Tentativas: 1 | Tarefa #7 bem-sucedida
⊘ Tarefa #08 | Tentativas: 1 | Cancelada
✓ Tarefa #09 | Tentativas: 1 | Tarefa #9 bem-sucedida
⊘ Tarefa #10 | Tentativas: 1 | Cancelada
-----------------------------
Total: 10 | Sucesso: 5 | Falha: 1 | Canceladas: 4
Tempo: 8.012s
Análise do Código
Padrões Principais de Concorrência
1. Fan-out / Fan-in
Este é o padrão central do agendador:
┌─ Worker 1 ─┐
Tarefas ───┼─ Worker 2 ─┼───── Resultados
└─ Worker 3 ─┘
- Fan-out: Múltiplos workers leem do mesmo channel de
tarefas, distribuindo automaticamente as tarefas - Fan-in: Todos os workers escrevem resultados no mesmo channel de
resultados, agregando resultados
2. Cancelamento Cascata com context.Context
Contexto Raiz (principal)
└─ WithCancel
├─ WithTimeout do Worker 1
│ └─ taskCtx (timeout de 2s por tarefa)
├─ WithTimeout do Worker 2
│ └─ taskCtx (timeout de 2s por tarefa)
└─ WithTimeout do Worker 3
└─ taskCtx (timeout de 2s por tarefa)
Quando o programa principal chama cancel(), todos os contextos filhos recebem o sinal de cancelamento, alcançando cancelamento cascata.
3. Sincronização com WaitGroup
var wg sync.WaitGroup
wg.Add(1) // Cada worker lançado incrementa a contagem em 1
go func() {
defer wg.Done() // Saída do worker decrementa a contagem em 1
// ...
}()
wg.Wait() // Bloqueia até a contagem chegar a zero
4. Multiplexação com select
Workers escutam múltiplas fontes de sinal simultaneamente:
select {
case <-ctx.Done(): // Contexto cancelado
return
case <-time.After(d): // Backoff de tentativa
// Continua tentando
}
Estratégia de Backoff de Tentativa
Este exemplo usa backoff linear: tempo de espera por tentativa = attempts × 500ms. Em produção, recomenda-se backoff exponencial:
backoff := time.Duration(1<<uint(attempts)) * 100 * time.Millisecond
// 1ª: 200ms, 2ª: 400ms, 3ª: 800ms
Você também pode adicionar jitter aleatório para prevenir efeitos de "avalanche" quando múltiplas tarefas tentam novamente simultaneamente.
❓ Perguntas Frequentes
1. Por que o Worker usa range tasks em vez de verificar diretamente ctx.Done()?
range tasks sai automaticamente do loop quando o channel é fechado, que é o padrão produtor-consumidor. ctx.Done() é usado para responder a sinais de cancelamento durante espera ou tentativa. Eles trabalham juntos:
- Channel fechado → Saída normal (todas as tarefas foram submetidas)
- Contexto cancelado → Saída antecipada (solicitação externa para terminar)
Se você usar apenas ctx.Done(), precisa de tratamento adicional para fechamento de channel; se usar apenas range, não pode responder a cancelamento.
2. Como determinar o tamanho do buffer para o channel de resultados?
Este exemplo usa make(chan Result, len(taskList)) com o total de tarefas como tamanho do buffer, garantindo que todos os resultados possam ser escritos sem bloquear. Se a contagem de tarefas for muito grande (milhões), você pode:
- Usar um channel sem buffer + uma goroutine de coleta separada
- Processar em lotes, resetando o channel após cada lote
- Usar
sync.Mapou um slice com lock em vez de um channel
3. Como implementar verdadeiro backoff exponencial?
func exponentialBackoff(attempt int) time.Duration {
base := 100 * time.Millisecond
max := 10 * time.Second
backoff := base * time.Duration(1<<uint(attempt))
if backoff > max {
backoff = max
}
// Adiciona jitter aleatório: ±25%
jitter := time.Duration(rand.Int63n(int64(backoff) / 2))
return backoff - backoff/4 + jitter
}
4. Como limitar o número de tarefas executando simultaneamente (controle de concorrência)?
Este exemplo limita naturalmente a concorrência através do número de workers. Outra abordagem é usar um semáforo:
sem := make(chan struct{}, 10) // máximo 10 concorrentes
for _, task := range taskList {
sem <- struct{}{} // adquire permissão
go func(t Task) {
defer func() { <-sem }() // libera permissão
t.Execute(ctx)
}(task)
}
📖 Resumo
Esta lição aplicou abrangentemente as tecnologias centrais de concorrência do Go:
| Tecnologia | Aplicação | Código-chave |
|---|---|---|
| goroutine | Workers executam tarefas em paralelo | go Worker(...) |
| channel | Distribuição de tarefas e coleta de resultados | tasks <-chan Task |
| select | Multiplexação: cancelamento, timeout, backoff | select { case <-ctx.Done(): ... } |
| sync.WaitGroup | Espera todos os workers completarem | wg.Wait() |
| sync.Mutex | Coleta de resultados concorrente segura | rc.mu.Lock() |
| context.Context | Cancelamento cascata e controle de timeout | context.WithTimeout(ctx, 2*time.Second) |
Revisão dos Princípios de Design:
- Use channels para passar dados, não comunique compartilhando memória
- Use context para propagar sinais de cancelamento por toda a cadeia de chamadas
- WaitGroup é usado para esperar um grupo de goroutines completar
- Mutex é usado para proteger estado compartilhado (o coletor de resultados neste exemplo)
- Fechar um channel funciona como um sinal de broadcast, notificando todos os consumidores
📝 Exercícios
Exercício 1: Adicionar Fila de Prioridade
Modifique o agendador para suportar prioridades de tarefas (alta/média/baixa). Tarefas de alta prioridade devem ser processadas pelos workers primeiro.
Dica: Você pode usar múltiplos channels ou implementar uma estrutura de fila de prioridade.
Exercício 2: Implementar Limitação de Taxa
Adicione limitação de taxa ao agendador, por exemplo, máximo de 5 tarefas por segundo (algoritmo de balde de tokens).
Dica: Use time.Ticker ou a biblioteca de terceiros golang.org/x/time/rate.
limiter := rate.NewLimiter(5, 1) // 5 por segundo, burst de 1
for task := range tasks {
limiter.Wait(ctx) // espera por token
task.Execute(ctx)
}
Exercício 3: Adicionar Callback de Progresso de Tarefa
Adicione uma função de callback OnProgress ao agendador que é chamada quando cada tarefa é concluída, reportando o progresso atual (concluídas/total).
type Scheduler struct {
// ... outros campos
OnProgress func(completed, total int)
}



