Python com Redis

Python é uma das linguagens cliente mais populares para Redis. Esta lição aborda o uso do Redis com Python.

Instalando redis-py

redis-py é o cliente Redis mais usado para Python.

Instalação

BASH
pip install redis

Verificar Instalação

PYTHON
import redis
print(redis.__version__)  # Mostra o número da versão

Conexão Básica

Conexão Simples

PYTHON
import redis

# Conectar ao Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Testar conexão
print(r.ping())  # True

Conectando com Senha

PYTHON
import redis

r = redis.Redis(
    host='localhost',
    port=6379,
    password='sua_senha',
    db=0
)

Parâmetros de Conexão

PYTHON
import redis

r = redis.Redis(
    host='localhost',       # Endereço do host
    port=6379,              # Porta
    password='senha',       # Senha
    db=0,                   # Número do banco de dados
    decode_responses=True,  # Decodificar automaticamente para string
    socket_timeout=5,       # Timeout (segundos)
    socket_connect_timeout=5,
    retry_on_timeout=True,
    max_connections=10
)
💡 decode_responses: Quando definido como True, bytes retornados são automaticamente decodificados para str.

Operações Básicas

Operações de String

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# SET e GET
r.set('nome', 'Alice')
print(r.get('nome'))  # 'Alice'

# SET com expiração
r.set('session', 'dados', ex=3600)  # Expira após 1 hora

# MSET e MGET
r.mset({'key1': 'valor1', 'key2': 'valor2'})
print(r.mget('key1', 'key2'))  # ['valor1', 'valor2']

# INCR
r.set('counter', 0)
print(r.incr('counter'))  # 1
print(r.incrby('counter', 10))  # 11

# APPEND
r.append('nome', ' Silva')
print(r.get('nome'))  # 'Alice Silva'

# STRLEN
print(r.strlen('nome'))  # 11

# DEL
r.delete('nome')
print(r.get('nome'))  # None

Operações de Hash

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# HSET e HGET
r.hset('user:1', 'name', 'Alice')
r.hset('user:1', 'age', 25)
print(r.hget('user:1', 'name'))  # 'Alice'

# HMSET e HMGET
r.hset('user:2', mapping={'name': 'Bob', 'age': 30, 'city': 'Beijing'})
print(r.hmget('user:2', 'name', 'age'))  # ['Bob', '30']

# HGETALL
print(r.hgetall('user:2'))  # {'name': 'Bob', 'age': '30', 'city': 'Beijing'}

# HKEYS e HVALS
print(r.hkeys('user:2'))  # ['name', 'age', 'city']
print(r.hvals('user:2'))  # ['Bob', '30', 'Beijing']

# HDEL
r.hdel('user:2', 'city')

# HINCRBY
r.hincrby('user:2', 'age', 1)

Operações de Lista

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# LPUSH e RPUSH
r.lpush('mylist', 'valor1', 'valor2')
r.rpush('mylist', 'valor3')

# LRANGE
print(r.lrange('mylist', 0, -1))  # ['valor2', 'valor1', 'valor3']

# LPOP e RPOP
print(r.lpop('mylist'))  # 'valor2'
print(r.rpop('mylist'))  # 'valor3'

# LLEN
print(r.llen('mylist'))  # 1

# LINDEX
print(r.lindex('mylist', 0))  # 'valor1'

# LSET
r.lset('mylist', 0, 'novo_valor')

Operações de Conjunto

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# SADD
r.sadd('myset', 'a', 'b', 'c')

# SMEMBERS
print(r.smembers('myset'))  # {'a', 'b', 'c'}

# SISMEMBER
print(r.sismember('myset', 'a'))  # True

# SREM
r.srem('myset', 'a')

# SCARD
print(r.scard('myset'))  # 2

# SINTER, SUNION, SDIFF
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
print(r.sinter('set1', 'set2'))  # {'b', 'c'}
print(r.sunion('set1', 'set2'))  # {'a', 'b', 'c', 'd'}
print(r.sdiff('set1', 'set2'))  # {'a'}

Operações de Conjunto Ordenado

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# ZADD
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})

# ZRANGE
print(r.zrange('leaderboard', 0, -1, withscores=True))
# [('player1', 100.0), ('player3', 150.0), ('player2', 200.0)]

# ZREVRANGE (maior para menor)
print(r.zrevrange('leaderboard', 0, 2, withscores=True))

# ZSCORE
print(r.zscore('leaderboard', 'player1'))  # 100.0

# ZRANK e ZREVRANK
print(r.zrank('leaderboard', 'player1'))  # 0
print(r.zrevrank('leaderboard', 'player2'))  # 0

# ZINCRBY
r.zincrby('leaderboard', 50, 'player1')

# ZREM
r.zrem('leaderboard', 'player1')

Pool de Conexão

Pools de conexão reutilizam conexões para melhor desempenho.

Criando um Pool de Conexão

PYTHON
import redis

# Criar pool de conexão
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=100,
    decode_responses=True
)

# Obter uma conexão do pool
r = redis.Redis(connection_pool=pool)

# Usar a conexão
r.set('key', 'valor')
print(r.get('key'))

Vantagens do Pool de Conexão

💡 Recomendado: Use pools de conexão em ambientes de produção.

Pipelining

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# Criar pipeline
pipe = r.pipeline()

# Adicionar comandos
pipe.set('key1', 'valor1')
pipe.set('key2', 'valor2')
pipe.set('key3', 'valor3')
pipe.get('key1')

# Executar pipeline
resultados = pipe.execute()
print(resultados)  # [True, True, True, 'valor1']

# Pipeline com transação
pipe = r.pipeline(transaction=True)
pipe.set('key1', 'valor1')
pipe.set('key2', 'valor2')
resultados = pipe.execute()

Pub/Sub

Publicando Mensagens

PYTHON
import redis

r = redis.Redis()

# Publicar uma mensagem
r.publish('news', 'Olá Mundo')

Assinando Mensagens

PYTHON
import redis

r = redis.Redis()

# Assinar um canal
pubsub = r.pubsub()
pubsub.subscribe('news')

# Ouvir mensagens
for mensagem in pubsub.listen():
    if mensagem['type'] == 'message':
        print(f"Canal: {mensagem['channel']}")
        print(f"Mensagem: {mensagem['data']}")

Assinatura por Padrão

PYTHON
import redis

r = redis.Redis()

pubsub = r.pubsub()
pubsub.psubscribe('news:*')

for mensagem in pubsub.listen():
    if mensagem['type'] == 'pmessage':
        print(f"Padrão: {mensagem['pattern']}")
        print(f"Canal: {mensagem['channel']}")
        print(f"Mensagem: {mensagem['data']}")

Exemplos Práticos

Exemplo 1: Decorador de Cache

PYTHON
import redis
import json
import functools

r = redis.Redis(decode_responses=True)

def cache(expire=3600):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Gerar chave de cache
            cache_key = f"{func.__name__}:{args}:{kwargs}"
            
            # Tentar obter do cache
            cached = r.get(cache_key)
            if cached:
                return json.loads(cached)
            
            # Executar função
            result = func(*args, **kwargs)
            
            # Armazenar em cache
            r.set(cache_key, json.dumps(result), ex=expire)
            
            return result
        return wrapper
    return decorator

@cache(expire=300)
def get_user(user_id):
    # Simular consulta ao banco de dados
    print(f"Consultando banco de dados: user_id={user_id}")
    return {'id': user_id, 'name': f'User{user_id}'}

# Teste
print(get_user(1))  # Consulta banco de dados
print(get_user(1))  # Obtém do cache
▶ Experimente

Exemplo 2: Lock Distribuído

PYTHON
import redis
import time
import uuid

r = redis.Redis()

class DistributedLock:
    def __init__(self, key, expire=10):
        self.key = f"lock:{key}"
        self.expire = expire
        self.identifier = str(uuid.uuid4())
    
    def acquire(self):
        # Tentar adquirir o lock
        return r.set(self.key, self.identifier, nx=True, ex=self.expire)
    
    def release(self):
        # Liberar o lock (usando script Lua para atomicidade)
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return r.eval(script, 1, self.key, self.identifier)
    
    def __enter__(self):
        while not self.acquire():
            time.sleep(0.1)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Usando o lock
with DistributedLock('resource') as lock:
    print('Executando lógica de negócio')
    time.sleep(1)
▶ Experimente

Exemplo 3: Limitador de Taxa

PYTHON
import redis
import time

r = redis.Redis()

def rate_limit(key, limit=10, period=60):
    """
    Limitador de taxa
    :param key: chave do limite de taxa
    :param limit: máximo de requisições na janela de tempo
    :param period: janela de tempo (segundos)
    :return: se a requisição é permitida
    """
    current = int(time.time())
    window_start = current - period
    
    # Usar sorted set para janela deslizante
    pipe = r.pipeline()
    
    # Remover registros fora da janela
    pipe.zremrangebyscore(key, 0, window_start)
    
    # Contar requisições na janela atual
    pipe.zcard(key)
    
    # Adicionar a requisição atual
    pipe.zadd(key, {str(current): current})
    
    # Definir expiração
    pipe.expire(key, period)
    
    results = pipe.execute()
    count = results[1]
    
    return count < limit

# Teste
for i in range(15):
    if rate_limit('api:user:1', limit=10, period=60):
        print(f'Requisição {i}: permitida')
    else:
        print(f'Requisição {i}: negada')
▶ Experimente

Exemplo 4: Fila de Mensagens

PYTHON
import redis
import json

r = redis.Redis(decode_responses=True)

class MessageQueue:
    def __init__(self, name):
        self.name = f"queue:{name}"
    
    def push(self, message):
        # Enfileirar
        r.rpush(self.name, json.dumps(message))
    
    def pop(self, timeout=0):
        # Desenfileirar (bloqueante)
        result = r.blpop(self.name, timeout=timeout)
        if result:
            return json.loads(result[1])
        return None
    
    def size(self):
        return r.llen(self.name)

# Produtor
queue = MessageQueue('tasks')
queue.push({'task': 'send_email', 'to': 'user@example.com'})
queue.push({'task': 'generate_report', 'report_id': 123})

# Consumidor
while True:
    task = queue.pop(timeout=5)
    if task:
        print(f"Processando tarefa: {task}")
    else:
        break
▶ Experimente

Tratamento de Erros

PYTHON
import redis
from redis.exceptions import RedisError, ConnectionError

try:
    r = redis.Redis(host='localhost', port=6379)
    r.set('key', 'valor')
except ConnectionError:
    print('Conexão falhou')
except RedisError as e:
    print(f'Erro Redis: {e}')

❓ Perguntas Frequentes

P redis-py é thread-safe?
R Instâncias Redis são thread-safe e podem ser usadas entre threads. No entanto, usar um pool de conexão é recomendado.
P Como lidar com desconexões?
R redis-py reconecta automaticamente. Você pode definir retry_on_timeout=True.
P O que decode_responses=True faz?
R Decodifica automaticamente bytes para str, evitando decodificação manual.
P Como escolher entre redis-py e aioredis?
R Use redis-py para código síncrono, aioredis (ou suporte assíncrono do redis-py 4.2+) para código assíncrono.
P Como definir o máximo de conexões em um pool?
R Defina com base na concorrência — tipicamente 2-3 vezes o número de threads/processos concorrentes.

📖 Resumo

📝 Atividades

  1. Operações básicas: Use redis-py para operações de string, hash, lista e conjunto
  2. Pool de conexão: Crie um pool de conexão e teste a reutilização de conexão
  3. Pipeline: Use pipelining para definir 1000 chaves em lote, compare o desempenho
  4. Prático: Implemente um decorador de cache simples ou lock distribuído

Próxima Lição

Na próxima lição, nós aprenderemos sobre Java com Redis, abordando operações Redis com Java.

100%