Python with Redis

Python is one of the most popular client languages for Redis. This lesson covers using Redis with Python.

Installing redis-py

redis-py is the most commonly used Redis client for Python.

Installation

BASH
pip install redis

Verify Installation

PYTHON
import redis
print(redis.__version__)  # Prints the version number

Basic Connection

Simple Connection

PYTHON
import redis

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0)

# Test connection
print(r.ping())  # True

Connecting with Password

PYTHON
import redis

r = redis.Redis(
    host='localhost',
    port=6379,
    password='your_password',
    db=0
)

Connection Parameters

PYTHON
import redis

r = redis.Redis(
    host='localhost',       # Host address
    port=6379,              # Port
    password='password',    # Password
    db=0,                   # Database number
    decode_responses=True,  # Auto-decode to string
    socket_timeout=5,       # Timeout (seconds)
    socket_connect_timeout=5,
    retry_on_timeout=True,
    max_connections=10
)
💡 decode_responses: When set to True, returned bytes are automatically decoded to str.

Basic Operations

String Operations

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# SET and GET
r.set('name', 'Alice')
print(r.get('name'))  # 'Alice'

# SET with expiration
r.set('session', 'data', ex=3600)  # Expires after 1 hour

# MSET and MGET
r.mset({'key1': 'value1', 'key2': 'value2'})
print(r.mget('key1', 'key2'))  # ['value1', 'value2']

# INCR
r.set('counter', 0)
print(r.incr('counter'))  # 1
print(r.incrby('counter', 10))  # 11

# APPEND
r.append('name', ' Smith')
print(r.get('name'))  # 'Alice Smith'

# STRLEN
print(r.strlen('name'))  # 11

# DEL
r.delete('name')
print(r.get('name'))  # None

Hash Operations

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# HSET and HGET
r.hset('user:1', 'name', 'Alice')
r.hset('user:1', 'age', 25)
print(r.hget('user:1', 'name'))  # 'Alice'

# HSET with mapping= (replaces the deprecated HMSET) and HMGET
r.hset('user:2', mapping={'name': 'Bob', 'age': 30, 'city': 'Beijing'})
print(r.hmget('user:2', 'name', 'age'))  # ['Bob', '30']

# HGETALL
print(r.hgetall('user:2'))  # {'name': 'Bob', 'age': '30', 'city': 'Beijing'}

# HKEYS and HVALS
print(r.hkeys('user:2'))  # ['name', 'age', 'city']
print(r.hvals('user:2'))  # ['Bob', '30', 'Beijing']

# HDEL
r.hdel('user:2', 'city')

# HINCRBY
r.hincrby('user:2', 'age', 1)

List Operations

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# LPUSH and RPUSH
r.lpush('mylist', 'value1', 'value2')
r.rpush('mylist', 'value3')

# LRANGE
print(r.lrange('mylist', 0, -1))  # ['value2', 'value1', 'value3']

# LPOP and RPOP
print(r.lpop('mylist'))  # 'value2'
print(r.rpop('mylist'))  # 'value3'

# LLEN
print(r.llen('mylist'))  # 1

# LINDEX
print(r.lindex('mylist', 0))  # 'value1'

# LSET
r.lset('mylist', 0, 'new_value')

Set Operations

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# SADD
r.sadd('myset', 'a', 'b', 'c')

# SMEMBERS
print(r.smembers('myset'))  # {'a', 'b', 'c'}

# SISMEMBER
print(r.sismember('myset', 'a'))  # True

# SREM
r.srem('myset', 'a')

# SCARD
print(r.scard('myset'))  # 2

# SINTER, SUNION, SDIFF
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
print(r.sinter('set1', 'set2'))  # {'b', 'c'}
print(r.sunion('set1', 'set2'))  # {'a', 'b', 'c', 'd'}
print(r.sdiff('set1', 'set2'))  # {'a'}

Sorted Set Operations

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# ZADD
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})

# ZRANGE
print(r.zrange('leaderboard', 0, -1, withscores=True))
# [('player1', 100.0), ('player3', 150.0), ('player2', 200.0)]

# ZREVRANGE (high to low)
print(r.zrevrange('leaderboard', 0, 2, withscores=True))

# ZSCORE
print(r.zscore('leaderboard', 'player1'))  # 100.0

# ZRANK and ZREVRANK
print(r.zrank('leaderboard', 'player1'))  # 0
print(r.zrevrank('leaderboard', 'player2'))  # 0

# ZINCRBY
r.zincrby('leaderboard', 50, 'player1')

# ZREM
r.zrem('leaderboard', 'player1')

Connection Pool

Connection pools reuse connections for better performance.

Creating a Connection Pool

PYTHON
import redis

# Create connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=100,
    decode_responses=True
)

# Get a connection from the pool
r = redis.Redis(connection_pool=pool)

# Use the connection
r.set('key', 'value')
print(r.get('key'))

Connection Pool Advantages

💡 Recommended: Use connection pools in production environments.

Pipelining

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# Create pipeline
pipe = r.pipeline()

# Add commands
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')
pipe.get('key1')

# Execute pipeline
results = pipe.execute()
print(results)  # [True, True, True, 'value1']

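# Explicit transaction=True (MULTI/EXEC). This is also the default for r.pipeline();
# pass transaction=False when you only want batching without a transaction.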
pipe = r.pipeline(transaction=True)
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()

Pub/Sub

Publishing Messages

PYTHON
import redis

r = redis.Redis()

# Publish a message
r.publish('news', 'Hello World')

Subscribing to Messages

PYTHON
import redis

r = redis.Redis()

# Subscribe to a channel
pubsub = r.pubsub()
pubsub.subscribe('news')

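# Listen for messages (listen() blocks; this client was created without
# decode_responses=True, so 'channel' and 'data' arrive as bytes)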
for message in pubsub.listen():
    if message['type'] == 'message':
        print(f"Channel: {message['channel']}")
        print(f"Message: {message['data']}")

Pattern Subscription

PYTHON
import redis

r = redis.Redis()

pubsub = r.pubsub()
pubsub.psubscribe('news:*')

for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"Pattern: {message['pattern']}")
        print(f"Channel: {message['channel']}")
        print(f"Message: {message['data']}")

Practical Examples

Example 1: Cache Decorator

PYTHON
import redis
import json
import functools

r = redis.Redis(decode_responses=True)

def cache(expire=3600):
    """Cache a function's return value in Redis as JSON.

    :param expire: time-to-live of each cache entry, in seconds
    :return: a decorator that adds caching to the function it wraps

    The wrapped function's result must be JSON-serializable. A cache miss
    returns the original result object, while a cache hit returns the
    JSON-decoded copy, so tuples come back as lists and non-string dict
    keys come back as strings.
    """
    def decorator(func):
        # Qualify the key with module and qualified name so two functions
        # that share a bare name do not read each other's cache entries.
        prefix = f"{func.__module__}.{func.__qualname__}"

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Sort keyword arguments so f(a=1, b=2) and f(b=2, a=1)
            # resolve to the same cache key.
            cache_key = f"{prefix}:{args}:{sorted(kwargs.items())}"

            # A missing key is returned as None; test for that explicitly
            # instead of relying on the truthiness of the stored payload.
            cached = r.get(cache_key)
            if cached is not None:
                return json.loads(cached)

            # Cache miss: run the function and store its result with a TTL
            result = func(*args, **kwargs)
            r.set(cache_key, json.dumps(result), ex=expire)

            return result
        return wrapper
    return decorator

@cache(expire=300)
def get_user(user_id):
    """Return a fake user record, standing in for a real database query."""
    print(f"Querying database: user_id={user_id}")
    record = {'id': user_id, 'name': f'User{user_id}'}
    return record

# Test
print(get_user(1))  # Queries database
print(get_user(1))  # Gets from cache
▶ Try it Yourself

Example 2: Distributed Lock

PYTHON
import redis
import time
import uuid

r = redis.Redis()

class DistributedLock:
    """Redis-backed lock that can be used as a context manager.

    The lock is one Redis key holding a random token unique to this
    instance. The key is set with a TTL, so the lock disappears on its own
    after `expire` seconds even if it is never released; work that runs
    longer than `expire` is therefore no longer protected.

    :param key: name of the resource to guard (stored as "lock:<key>")
    :param expire: TTL of the lock key, in seconds
    :param timeout: maximum seconds `with` waits for the lock;
        None (the default) waits indefinitely
    :param retry_interval: seconds to sleep between acquisition attempts
    :raises TimeoutError: from `with`, when `timeout` is set and elapses
    """

    # Compare-and-delete in a Lua script for atomicity: the key is removed
    # only if it still holds this instance's token.
    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, key, expire=10, timeout=None, retry_interval=0.1):
        self.key = f"lock:{key}"
        self.expire = expire
        self.timeout = timeout
        self.retry_interval = retry_interval
        # Random token identifying this holder, checked again on release
        self.identifier = str(uuid.uuid4())

    def acquire(self):
        """Try once to take the lock.

        :return: True if the lock was taken, False if the key already exists
        """
        # NX: only set when the key does not exist; EX: attach the TTL
        return bool(r.set(self.key, self.identifier, nx=True, ex=self.expire))

    def release(self):
        """Delete the lock key if it still holds this instance's token.

        :return: the script result (1 if the key was deleted, otherwise 0)
        """
        return r.eval(self._RELEASE_SCRIPT, 1, self.key, self.identifier)

    def __enter__(self):
        # A monotonic clock keeps the deadline immune to wall-clock changes
        deadline = None
        if self.timeout is not None:
            deadline = time.monotonic() + self.timeout

        while not self.acquire():
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"could not acquire {self.key} within {self.timeout}s"
                )
            time.sleep(self.retry_interval)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# Using the lock
with DistributedLock('resource') as lock:
    print('Executing business logic')
    time.sleep(1)
▶ Try it Yourself

Example 3: Rate Limiter

PYTHON
import redis
import time

r = redis.Redis()

def rate_limit(key, limit=10, period=60):
    """
    Sliding-window rate limiter backed by a Redis sorted set.

    Each call records the request as a sorted-set member scored by its
    timestamp. The request is recorded whether or not it ends up being
    allowed, so denied requests also count toward the window.

    :param key: rate limit key
    :param limit: max requests in the time window
    :param period: time window (seconds)
    :return: whether the request is allowed
    """
    import uuid  # local import keeps this function self-contained

    now = time.time()
    window_start = now - period

    # Members of a sorted set are unique. Using only the timestamp as the
    # member would collapse all requests from the same instant into one
    # entry, so a random suffix makes every request its own member.
    member = f"{now}:{uuid.uuid4().hex}"

    # Queue all four commands and send them in one round trip
    pipe = r.pipeline()

    # Remove records outside the window
    pipe.zremrangebyscore(key, 0, window_start)

    # Count requests in the current window (before adding this one)
    pipe.zcard(key)

    # Add the current request
    pipe.zadd(key, {member: now})

    # Let the whole key expire once no request has arrived for a full window
    pipe.expire(key, period)

    results = pipe.execute()
    count = results[1]

    return count < limit

# Test
for i in range(15):
    if rate_limit('api:user:1', limit=10, period=60):
        print(f'Request {i}: allowed')
    else:
        print(f'Request {i}: denied')
▶ Try it Yourself

Example 4: Message Queue

PYTHON
import redis
import json

r = redis.Redis(decode_responses=True)

class MessageQueue:
    """FIFO queue of JSON-encoded messages kept in a Redis list."""

    def __init__(self, name):
        # All queues share the "queue:" key prefix
        self.name = "queue:{}".format(name)

    def push(self, message):
        """Serialize `message` to JSON and append it to the tail of the list."""
        payload = json.dumps(message)
        r.rpush(self.name, payload)

    def pop(self, timeout=0):
        """Blocking dequeue from the head of the list.

        `timeout` is forwarded to BLPOP. Returns the decoded message, or
        None when BLPOP returns nothing.
        """
        item = r.blpop(self.name, timeout=timeout)
        if not item:
            return None
        # BLPOP's reply is indexed at 1 for the payload
        return json.loads(item[1])

    def size(self):
        """Return the number of messages currently in the list."""
        length = r.llen(self.name)
        return length

# Producer
queue = MessageQueue('tasks')
queue.push({'task': 'send_email', 'to': 'user@example.com'})
queue.push({'task': 'generate_report', 'report_id': 123})

# Consumer
while True:
    task = queue.pop(timeout=5)
    if task:
        print(f"Processing task: {task}")
    else:
        break
▶ Try it Yourself

Error Handling

PYTHON
import redis
from redis.exceptions import RedisError, ConnectionError

try:
    r = redis.Redis(host='localhost', port=6379)
    r.set('key', 'value')
except ConnectionError:
    print('Connection failed')
except RedisError as e:
    print(f'Redis error: {e}')

❓ FAQ

Q Is redis-py thread-safe?
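A Redis client instances are thread-safe and can be shared across threads; each client draws connections from a connection pool, so share one client (or one pool) rather than creating a new client per request. Pipeline and PubSub objects, however, should not be shared between threads.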
Q How do I handle disconnections?
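A redis-py re-establishes a dropped connection automatically the next time a command is sent. You can set retry_on_timeout=True to retry a command that timed out, and catch redis.exceptions.ConnectionError to handle outages that persist.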
Q What does decode_responses=True do?
A It automatically decodes bytes to str, avoiding manual decoding.
Q How do I choose between redis-py and aioredis?
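A Use redis-py for both. For synchronous code use redis.Redis; for async code use redis.asyncio, which is the former aioredis library merged into redis-py in version 4.2. The standalone aioredis package is no longer maintained separately.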
Q How do I set the max connections in a pool?
A Set it based on concurrency — typically 2-3 times the number of concurrent threads/processes.

📖 Summary

📝 Exercises

  1. Basic operations: Use redis-py for string, hash, list, and set operations
  2. Connection pool: Create a connection pool and test connection reuse
  3. Pipeline: Use pipelining to batch-set 1000 keys, compare performance
  4. Practical: Implement a simple cache decorator or distributed lock

Next Lesson

In the next lesson, we will learn Java with Redis, covering Java Redis operations.

100%

🙏 帮我们做得更好

我们是刚上线的编程教程站,几个人的小团队,精力有限。页面虽经检查,难免还有疏漏——链接失效、排版错乱、内容有误、语言生硬……

如果您发现了,麻烦告诉我们,我们会在收到反馈后第一时间进行修复,再次感谢您的光临 🙏