PythonとRedis

PythonはRedisのクライアント言語として最も人気のあるものの1つです。このレッスンでは、PythonでのRedisの使用について解説します。

redis-pyのインストール

redis-pyはPythonで最も一般的に使用されるRedisクライアントです。

インストール

BASH
pip install redis

インストールの確認

PYTHON
import redis
print(redis.__version__)  # バージョン番号を表示

基本接続

シンプルな接続

PYTHON
import redis

# Redisに接続
r = redis.Redis(host='localhost', port=6379, db=0)

# 接続をテスト
print(r.ping())  # True

パスワード付き接続

PYTHON
import redis

r = redis.Redis(
    host='localhost',
    port=6379,
    password='your_password',
    db=0
)

接続パラメータ

PYTHON
import redis

r = redis.Redis(
    host='localhost',       # ホストアドレス
    port=6379,              # ポート
    password='password',    # パスワード
    db=0,                   # データベース番号
    decode_responses=True,  # 自動的に文字列にデコード
    socket_timeout=5,       # タイムアウト(秒)
    socket_connect_timeout=5,
    retry_on_timeout=True,
    max_connections=10
)
💡 decode_responses: Trueに設定すると、返されたバイトが自動的にstrにデコードされます。

基本操作

文字列操作

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# SETとGET
r.set('name', 'Alice')
print(r.get('name'))  # 'Alice'

# 有効期限付きSET
r.set('session', 'data', ex=3600)  # 1時間後に期限切れ

# MSETとMGET
r.mset({'key1': 'value1', 'key2': 'value2'})
print(r.mget('key1', 'key2'))  # ['value1', 'value2']

# INCR
r.set('counter', 0)
print(r.incr('counter'))  # 1
print(r.incrby('counter', 10))  # 11

# APPEND
r.append('name', ' Smith')
print(r.get('name'))  # 'Alice Smith'

# STRLEN
print(r.strlen('name'))  # 11

# DEL
r.delete('name')
print(r.get('name'))  # None

ハッシュ操作

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# HSETとHGET
r.hset('user:1', 'name', 'Alice')
r.hset('user:1', 'age', 25)
print(r.hget('user:1', 'name'))  # 'Alice'

# HMSETとHMGET
r.hset('user:2', mapping={'name': 'Bob', 'age': 30, 'city': 'Beijing'})
print(r.hmget('user:2', 'name', 'age'))  # ['Bob', '30']

# HGETALL
print(r.hgetall('user:2'))  # {'name': 'Bob', 'age': '30', 'city': 'Beijing'}

# HKEYSとHVALS
print(r.hkeys('user:2'))  # ['name', 'age', 'city']
print(r.hvals('user:2'))  # ['Bob', '30', 'Beijing']

# HDEL
r.hdel('user:2', 'city')

# HINCRBY
r.hincrby('user:2', 'age', 1)

リスト操作

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# LPUSHとRPUSH
r.lpush('mylist', 'value1', 'value2')
r.rpush('mylist', 'value3')

# LRANGE
print(r.lrange('mylist', 0, -1))  # ['value2', 'value1', 'value3']

# LPOPとRPOP
print(r.lpop('mylist'))  # 'value2'
print(r.rpop('mylist'))  # 'value3'

# LLEN
print(r.llen('mylist'))  # 1

# LINDEX
print(r.lindex('mylist', 0))  # 'value1'

# LSET
r.lset('mylist', 0, 'new_value')

セット操作

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# SADD
r.sadd('myset', 'a', 'b', 'c')

# SMEMBERS
print(r.smembers('myset'))  # {'a', 'b', 'c'}

# SISMEMBER
print(r.sismember('myset', 'a'))  # True

# SREM
r.srem('myset', 'a')

# SCARD
print(r.scard('myset'))  # 2

# SINTER、SUNION、SDIFF
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
print(r.sinter('set1', 'set2'))  # {'b', 'c'}
print(r.sunion('set1', 'set2'))  # {'a', 'b', 'c', 'd'}
print(r.sdiff('set1', 'set2'))  # {'a'}

ソート済みセット操作

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# ZADD
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})

# ZRANGE
print(r.zrange('leaderboard', 0, -1, withscores=True))
# [('player1', 100.0), ('player3', 150.0), ('player2', 200.0)]

# ZREVRANGE(高い方から低い方へ)
print(r.zrevrange('leaderboard', 0, 2, withscores=True))

# ZSCORE
print(r.zscore('leaderboard', 'player1'))  # 100.0

# ZRANKとZREVRANK
print(r.zrank('leaderboard', 'player1'))  # 0
print(r.zrevrank('leaderboard', 'player2'))  # 0

# ZINCRBY
r.zincrby('leaderboard', 50, 'player1')

# ZREM
r.zrem('leaderboard', 'player1')

コネクションプール

コネクションプールは接続を再利用してパフォーマンスを向上させます。

コネクションプールの作成

PYTHON
import redis

# コネクションプールを作成
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=100,
    decode_responses=True
)

# プールから接続を取得
r = redis.Redis(connection_pool=pool)

# 接続を使用
r.set('key', 'value')
print(r.get('key'))

コネクションプールの利点

💡 推奨: 本番環境ではコネクションプールを使用してください。

パイプライン

PYTHON
import redis

r = redis.Redis(decode_responses=True)

# パイプラインを作成
pipe = r.pipeline()

# コマンドを追加
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')
pipe.get('key1')

# パイプラインを実行
results = pipe.execute()
print(results)  # [True, True, True, 'value1']

# トランザクション付きパイプライン
pipe = r.pipeline(transaction=True)
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()

Pub/Sub

メッセージの公開

PYTHON
import redis

r = redis.Redis()

# メッセージを公開
r.publish('news', 'Hello World')

メッセージのサブスクライブ

PYTHON
import redis

r = redis.Redis()

# チャンネルにサブスクライブ
pubsub = r.pubsub()
pubsub.subscribe('news')

# メッセージをリスン
for message in pubsub.listen():
    if message['type'] == 'message':
        print(f"Channel: {message['channel']}")
        print(f"Message: {message['data']}")

パターンサブスクリプション

PYTHON
import redis

r = redis.Redis()

pubsub = r.pubsub()
pubsub.psubscribe('news:*')

for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"Pattern: {message['pattern']}")
        print(f"Channel: {message['channel']}")
        print(f"Message: {message['data']}")

実践的な例

例1:キャッシュデコレーター

PYTHON
import redis
import json
import functools

r = redis.Redis(decode_responses=True)

def cache(expire=3600):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # キャッシュキーを生成
            cache_key = f"{func.__name__}:{args}:{kwargs}"
            
            # キャッシュから取得を試みる
            cached = r.get(cache_key)
            if cached:
                return json.loads(cached)
            
            # 関数を実行
            result = func(*args, **kwargs)
            
            # キャッシュに保存
            r.set(cache_key, json.dumps(result), ex=expire)
            
            return result
        return wrapper
    return decorator

@cache(expire=300)
def get_user(user_id):
    # データベースクエリをシミュレート
    print(f"Querying database: user_id={user_id}")
    return {'id': user_id, 'name': f'User{user_id}'}

# テスト
print(get_user(1))  # データベースにクエリ
print(get_user(1))  # キャッシュから取得
▶ 試してみよう

例2:分散ロック

PYTHON
import redis
import time
import uuid

r = redis.Redis()

class DistributedLock:
    def __init__(self, key, expire=10):
        self.key = f"lock:{key}"
        self.expire = expire
        self.identifier = str(uuid.uuid4())
    
    def acquire(self):
        # ロックの取得を試みる
        return r.set(self.key, self.identifier, nx=True, ex=self.expire)
    
    def release(self):
        # ロックを解放(Luaスクリプトでアトミック性を確保)
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return r.eval(script, 1, self.key, self.identifier)
    
    def __enter__(self):
        while not self.acquire():
            time.sleep(0.1)
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# ロックを使用
with DistributedLock('resource') as lock:
    print('Executing business logic')
    time.sleep(1)
▶ 試してみよう

例3:レートリミッター

PYTHON
import redis
import time

r = redis.Redis()

def rate_limit(key, limit=10, period=60):
    """
    レートリミッター
    :param key: レート制限キー
    :param limit: 時間枠内の最大リクエスト数
    :param period: 時間枠(秒)
    :return: リクエストが許可されるかどうか
    """
    current = int(time.time())
    window_start = current - period
    
    # スライディングウィンドウにSorted Setを使用
    pipe = r.pipeline()
    
    # ウィンドウ外のレコードを削除
    pipe.zremrangebyscore(key, 0, window_start)
    
    # 現在のウィンドウ内のリクエスト数をカウント
    pipe.zcard(key)
    
    # 現在のリクエストを追加
    pipe.zadd(key, {str(current): current})
    
    # 有効期限を設定
    pipe.expire(key, period)
    
    results = pipe.execute()
    count = results[1]
    
    return count < limit

# テスト
for i in range(15):
    if rate_limit('api:user:1', limit=10, period=60):
        print(f'Request {i}: allowed')
    else:
        print(f'Request {i}: denied')
▶ 試してみよう

例4:メッセージキュー

PYTHON
import redis
import json

r = redis.Redis(decode_responses=True)

class MessageQueue:
    def __init__(self, name):
        self.name = f"queue:{name}"
    
    def push(self, message):
        # エンキュー
        r.rpush(self.name, json.dumps(message))
    
    def pop(self, timeout=0):
        # デキュー(ブロッキング)
        result = r.blpop(self.name, timeout=timeout)
        if result:
            return json.loads(result[1])
        return None
    
    def size(self):
        return r.llen(self.name)

# プロデューサー
queue = MessageQueue('tasks')
queue.push({'task': 'send_email', 'to': 'user@example.com'})
queue.push({'task': 'generate_report', 'report_id': 123})

# コンシューマー
while True:
    task = queue.pop(timeout=5)
    if task:
        print(f"Processing task: {task}")
    else:
        break
▶ 試してみよう

エラーハンドリング

PYTHON
import redis
from redis.exceptions import RedisError, ConnectionError

try:
    r = redis.Redis(host='localhost', port=6379)
    r.set('key', 'value')
except ConnectionError:
    print('Connection failed')
except RedisError as e:
    print(f'Redis error: {e}')

❓ よくある質問

Q redis-pyはスレッドセーフですか?
A Redisインスタンスはスレッドセーフで、スレッド間で共有できます。ただし、コネクションプールの使用を推奨します。
Q 切断を処理するにはどうすればよいですか?
A redis-pyは自動的に再接続します。retry_on_timeout=Trueを設定できます。
Q decode_responses=Trueは何をしますか?
A バイトを自動的にstrにデコードし、手動デコードを不要にします。
Q redis-pyとaioredisの選び方は?
A 同期コードにはredis-py、非同期コードにはaioredis(またはredis-py 4.2+の非同期サポート)を使用します。
Q プールの最大接続数はどのように設定しますか?
A 同時実行数に基づいて設定します。通常は同時スレッド/プロセス数の2〜3倍です。

📖 まとめ

📝 練習問題

  1. 基本操作: redis-pyを使用して文字列、ハッシュ、リスト、セットの操作をしましょう
  2. コネクションプール: コネクションプールを作成し、接続の再利用をテストしましょう
  3. パイプライン: パイプラインを使用して1000個のキーをバッチ設定し、パフォーマンスを比較しましょう
  4. 実践: シンプルなキャッシュデコレーターまたは分散ロックを実装しましょう

次のレッスン

次のレッスンでは、JavaとRedisについて学びます。JavaでのRedis操作を解説します。

100%