PythonとRedis
PythonはRedisのクライアント言語として最も人気のあるものの1つです。このレッスンでは、PythonでのRedisの使用について解説します。
redis-pyのインストール
redis-pyはPythonで最も一般的に使用されるRedisクライアントです。
インストール
BASH
pip install redis
インストールの確認
PYTHON
import redis
print(redis.__version__) # バージョン番号を表示
基本接続
シンプルな接続
PYTHON
import redis
# Redisに接続
r = redis.Redis(host='localhost', port=6379, db=0)
# 接続をテスト
print(r.ping()) # True
パスワード付き接続
PYTHON
import redis
r = redis.Redis(
host='localhost',
port=6379,
password='your_password',
db=0
)
接続パラメータ
PYTHON
import redis
r = redis.Redis(
host='localhost', # ホストアドレス
port=6379, # ポート
password='password', # パスワード
db=0, # データベース番号
decode_responses=True, # 自動的に文字列にデコード
socket_timeout=5, # タイムアウト(秒)
socket_connect_timeout=5,
retry_on_timeout=True,
max_connections=10
)
💡 decode_responses: Trueに設定すると、返されたバイトが自動的にstrにデコードされます。
基本操作
文字列操作
PYTHON
import redis
r = redis.Redis(decode_responses=True)
# SETとGET
r.set('name', 'Alice')
print(r.get('name')) # 'Alice'
# 有効期限付きSET
r.set('session', 'data', ex=3600) # 1時間後に期限切れ
# MSETとMGET
r.mset({'key1': 'value1', 'key2': 'value2'})
print(r.mget('key1', 'key2')) # ['value1', 'value2']
# INCR
r.set('counter', 0)
print(r.incr('counter')) # 1
print(r.incrby('counter', 10)) # 11
# APPEND
r.append('name', ' Smith')
print(r.get('name')) # 'Alice Smith'
# STRLEN
print(r.strlen('name')) # 11
# DEL
r.delete('name')
print(r.get('name')) # None
ハッシュ操作
PYTHON
import redis
r = redis.Redis(decode_responses=True)
# HSETとHGET
r.hset('user:1', 'name', 'Alice')
r.hset('user:1', 'age', 25)
print(r.hget('user:1', 'name')) # 'Alice'
# HMSETとHMGET
r.hset('user:2', mapping={'name': 'Bob', 'age': 30, 'city': 'Beijing'})
print(r.hmget('user:2', 'name', 'age')) # ['Bob', '30']
# HGETALL
print(r.hgetall('user:2')) # {'name': 'Bob', 'age': '30', 'city': 'Beijing'}
# HKEYSとHVALS
print(r.hkeys('user:2')) # ['name', 'age', 'city']
print(r.hvals('user:2')) # ['Bob', '30', 'Beijing']
# HDEL
r.hdel('user:2', 'city')
# HINCRBY
r.hincrby('user:2', 'age', 1)
リスト操作
PYTHON
import redis
r = redis.Redis(decode_responses=True)
# LPUSHとRPUSH
r.lpush('mylist', 'value1', 'value2')
r.rpush('mylist', 'value3')
# LRANGE
print(r.lrange('mylist', 0, -1)) # ['value2', 'value1', 'value3']
# LPOPとRPOP
print(r.lpop('mylist')) # 'value2'
print(r.rpop('mylist')) # 'value3'
# LLEN
print(r.llen('mylist')) # 1
# LINDEX
print(r.lindex('mylist', 0)) # 'value1'
# LSET
r.lset('mylist', 0, 'new_value')
セット操作
PYTHON
import redis
r = redis.Redis(decode_responses=True)
# SADD
r.sadd('myset', 'a', 'b', 'c')
# SMEMBERS
print(r.smembers('myset')) # {'a', 'b', 'c'}
# SISMEMBER
print(r.sismember('myset', 'a')) # True
# SREM
r.srem('myset', 'a')
# SCARD
print(r.scard('myset')) # 2
# SINTER、SUNION、SDIFF
r.sadd('set1', 'a', 'b', 'c')
r.sadd('set2', 'b', 'c', 'd')
print(r.sinter('set1', 'set2')) # {'b', 'c'}
print(r.sunion('set1', 'set2')) # {'a', 'b', 'c', 'd'}
print(r.sdiff('set1', 'set2')) # {'a'}
ソート済みセット操作
PYTHON
import redis
r = redis.Redis(decode_responses=True)
# ZADD
r.zadd('leaderboard', {'player1': 100, 'player2': 200, 'player3': 150})
# ZRANGE
print(r.zrange('leaderboard', 0, -1, withscores=True))
# [('player1', 100.0), ('player3', 150.0), ('player2', 200.0)]
# ZREVRANGE(高い方から低い方へ)
print(r.zrevrange('leaderboard', 0, 2, withscores=True))
# ZSCORE
print(r.zscore('leaderboard', 'player1')) # 100.0
# ZRANKとZREVRANK
print(r.zrank('leaderboard', 'player1')) # 0
print(r.zrevrank('leaderboard', 'player2')) # 0
# ZINCRBY
r.zincrby('leaderboard', 50, 'player1')
# ZREM
r.zrem('leaderboard', 'player1')
コネクションプール
コネクションプールは接続を再利用してパフォーマンスを向上させます。
コネクションプールの作成
PYTHON
import redis
# コネクションプールを作成
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=100,
decode_responses=True
)
# プールから接続を取得
r = redis.Redis(connection_pool=pool)
# 接続を使用
r.set('key', 'value')
print(r.get('key'))
コネクションプールの利点
- 接続を再利用し、接続オーバーヘッドを削減
- 最大接続数を制御
- パフォーマンス向上
💡 推奨: 本番環境ではコネクションプールを使用してください。
パイプライン
PYTHON
import redis
r = redis.Redis(decode_responses=True)
# パイプラインを作成
pipe = r.pipeline()
# コマンドを追加
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')
pipe.get('key1')
# パイプラインを実行
results = pipe.execute()
print(results) # [True, True, True, 'value1']
# トランザクション付きパイプライン
pipe = r.pipeline(transaction=True)
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
Pub/Sub
メッセージの公開
PYTHON
import redis
r = redis.Redis()
# メッセージを公開
r.publish('news', 'Hello World')
メッセージのサブスクライブ
PYTHON
import redis
r = redis.Redis()
# チャンネルにサブスクライブ
pubsub = r.pubsub()
pubsub.subscribe('news')
# メッセージをリスン
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Channel: {message['channel']}")
print(f"Message: {message['data']}")
パターンサブスクリプション
PYTHON
import redis
r = redis.Redis()
pubsub = r.pubsub()
pubsub.psubscribe('news:*')
for message in pubsub.listen():
if message['type'] == 'pmessage':
print(f"Pattern: {message['pattern']}")
print(f"Channel: {message['channel']}")
print(f"Message: {message['data']}")
実践的な例
例1:キャッシュデコレーター
PYTHON
import redis
import json
import functools
r = redis.Redis(decode_responses=True)
def cache(expire=3600):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# キャッシュキーを生成
cache_key = f"{func.__name__}:{args}:{kwargs}"
# キャッシュから取得を試みる
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 関数を実行
result = func(*args, **kwargs)
# キャッシュに保存
r.set(cache_key, json.dumps(result), ex=expire)
return result
return wrapper
return decorator
@cache(expire=300)
def get_user(user_id):
# データベースクエリをシミュレート
print(f"Querying database: user_id={user_id}")
return {'id': user_id, 'name': f'User{user_id}'}
# テスト
print(get_user(1)) # データベースにクエリ
print(get_user(1)) # キャッシュから取得
例2:分散ロック
PYTHON
import redis
import time
import uuid
r = redis.Redis()
class DistributedLock:
def __init__(self, key, expire=10):
self.key = f"lock:{key}"
self.expire = expire
self.identifier = str(uuid.uuid4())
def acquire(self):
# ロックの取得を試みる
return r.set(self.key, self.identifier, nx=True, ex=self.expire)
def release(self):
# ロックを解放(Luaスクリプトでアトミック性を確保)
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
return r.eval(script, 1, self.key, self.identifier)
def __enter__(self):
while not self.acquire():
time.sleep(0.1)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# ロックを使用
with DistributedLock('resource') as lock:
print('Executing business logic')
time.sleep(1)
例3:レートリミッター
PYTHON
import redis
import time
r = redis.Redis()
def rate_limit(key, limit=10, period=60):
"""
レートリミッター
:param key: レート制限キー
:param limit: 時間枠内の最大リクエスト数
:param period: 時間枠(秒)
:return: リクエストが許可されるかどうか
"""
current = int(time.time())
window_start = current - period
# スライディングウィンドウにSorted Setを使用
pipe = r.pipeline()
# ウィンドウ外のレコードを削除
pipe.zremrangebyscore(key, 0, window_start)
# 現在のウィンドウ内のリクエスト数をカウント
pipe.zcard(key)
# 現在のリクエストを追加
pipe.zadd(key, {str(current): current})
# 有効期限を設定
pipe.expire(key, period)
results = pipe.execute()
count = results[1]
return count < limit
# テスト
for i in range(15):
if rate_limit('api:user:1', limit=10, period=60):
print(f'Request {i}: allowed')
else:
print(f'Request {i}: denied')
例4:メッセージキュー
PYTHON
import redis
import json
r = redis.Redis(decode_responses=True)
class MessageQueue:
def __init__(self, name):
self.name = f"queue:{name}"
def push(self, message):
# エンキュー
r.rpush(self.name, json.dumps(message))
def pop(self, timeout=0):
# デキュー(ブロッキング)
result = r.blpop(self.name, timeout=timeout)
if result:
return json.loads(result[1])
return None
def size(self):
return r.llen(self.name)
# プロデューサー
queue = MessageQueue('tasks')
queue.push({'task': 'send_email', 'to': 'user@example.com'})
queue.push({'task': 'generate_report', 'report_id': 123})
# コンシューマー
while True:
task = queue.pop(timeout=5)
if task:
print(f"Processing task: {task}")
else:
break
エラーハンドリング
PYTHON
import redis
from redis.exceptions import RedisError, ConnectionError
try:
r = redis.Redis(host='localhost', port=6379)
r.set('key', 'value')
except ConnectionError:
print('Connection failed')
except RedisError as e:
print(f'Redis error: {e}')
❓ よくある質問
Q redis-pyはスレッドセーフですか?
A Redisインスタンスはスレッドセーフで、スレッド間で共有できます。ただし、コネクションプールの使用を推奨します。
Q 切断を処理するにはどうすればよいですか?
A redis-pyは自動的に再接続します。retry_on_timeout=Trueを設定できます。
Q decode_responses=Trueは何をしますか?
A バイトを自動的にstrにデコードし、手動デコードを不要にします。
Q redis-pyとaioredisの選び方は?
A 同期コードにはredis-py、非同期コードにはaioredis(またはredis-py 4.2+の非同期サポート)を使用します。
Q プールの最大接続数はどのように設定しますか?
A 同時実行数に基づいて設定します。通常は同時スレッド/プロセス数の2〜3倍です。
📖 まとめ
- redis-pyはPythonで最も一般的に使用されるRedisクライアント
- すべてのRedisコマンドをサポート:文字列、ハッシュ、リスト、セット、ソート済みセット
- コネクションプールで接続を再利用し、パフォーマンス向上
- パイプラインでバッチ操作、ネットワークラウンドトリップを削減
- Pub/Subでメッセージプッシュ
- 実践的な例:キャッシュデコレーター、分散ロック、レートリミッター、メッセージキュー
📝 練習問題
- 基本操作: redis-pyを使用して文字列、ハッシュ、リスト、セットの操作をしましょう
- コネクションプール: コネクションプールを作成し、接続の再利用をテストしましょう
- パイプライン: パイプラインを使用して1000個のキーをバッチ設定し、パフォーマンスを比較しましょう
- 実践: シンプルなキャッシュデコレーターまたは分散ロックを実装しましょう
次のレッスン
次のレッスンでは、JavaとRedisについて学びます。JavaでのRedis操作を解説します。



