Redis Pipelining

パイプラインを使用すると、コマンドをバッチ送信でき、ネットワークラウンドトリップを削減してパフォーマンスを大幅に向上できます。

パイプラインとは

従来のアプローチ

各コマンドには1回のネットワークラウンドトリップが必要です:

クライアント                   Redisサーバー
  |                         |
  |----- SET key1 -------->|
  |<----- OK --------------|
  |                         |
  |----- SET key2 -------->|
  |<----- OK --------------|
  |                         |
  |----- SET key3 -------->|
  |<----- OK --------------|

3コマンド = 6回のネットワーク転送(3リクエスト + 3レスポンス)

パイプラインアプローチ

複数のコマンドを一度に送信:

クライアント                   Redisサーバー
  |                         |
  |----- SET key1 -------->|
  |----- SET key2 -------->|
  |----- SET key3 -------->|
  |                         |
  |<----- OK --------------|
  |<----- OK --------------|
  |<----- OK --------------|

3コマンド = 2回のネットワーク転送(1バッチリクエスト + 1バッチレスポンス)

💡 パフォーマンス向上: パイプラインはネットワークラウンドトリップを削減し、特にネットワークレイテンシが高い場合にパフォーマンスを大幅に向上させます。

パイプラインの仕組み

ネットワークレイテンシの影響

1msのネットワークラウンドトリップを想定:

従来のアプローチ:
- 100コマンド = 100ラウンドトリップ = 100msのネットワークレイテンシ
- 加えてRedis処理時間(0.1ms/コマンドと仮定)= 10ms
- 合計時間 = 100ms + 10ms = 110ms

パイプライン:
- 100コマンド = 1ラウンドトリップ = 1msのネットワークレイテンシ
- 加えてRedis処理時間 = 10ms
- 合計時間 = 1ms + 10ms = 11ms

パフォーマンス向上:110ms → 11ms、10倍の改善!

パイプラインの特性

⚠️ 補足: パイプラインはトランザクションではなく、アトミック性を保証しません。アトミック性が必要な場合はMULTI/EXECを使用してください。

パイプラインの使用

redis-cliのパイプラインモード

BASH
# redis-cliのパイプラインモードを使用
echo -e "SET key1 value1\nSET key2 value2\nSET key3 value3" | redis-cli

# またはファイルから読み取り
cat commands.txt | redis-cli

Pythonでのパイプライン

PYTHON
import redis

r = redis.Redis(host='localhost', port=6379)

# パイプラインを作成
pipe = r.pipeline()

# パイプラインにコマンドを追加
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.set('key3', 'value3')

# パイプラインを実行
results = pipe.execute()
print(results)  # [True, True, True]

Javaでのパイプライン(Jedis)

JAVA
Jedis jedis = new Jedis("localhost", 6379);

Pipeline pipeline = jedis.pipelined();

pipeline.set("key1", "value1");
pipeline.set("key2", "value2");
pipeline.set("key3", "value3");

List<Object> results = pipeline.syncAndReturnAll();

Node.jsでのパイプライン(ioredis)

JAVASCRIPT
const Redis = require('ioredis');
const redis = new Redis();

const pipeline = redis.pipeline();

pipeline.set('key1', 'value1');
pipeline.set('key2', 'value2');
pipeline.set('key3', 'value3');

const results = await pipeline.exec();

パイプラインとトランザクションの比較

パイプライン + トランザクション

パイプラインはトランザクションと組み合わせることができます:

PYTHON
import redis

r = redis.Redis()

# トランザクションを有効にしたパイプラインを作成
pipe = r.pipeline(transaction=True)

pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.incr('counter')

# トランザクションを実行
results = pipe.execute()

パイプラインとトランザクションの比較

項目 パイプライン トランザクション
アトミック性 ❌ なし ✅ あり
分離性 ❌ なし ✅ あり
パフォーマンス向上 ✅ 大幅 ✅ 大幅
バッチ送信 ✅ あり ✅ あり
使用例 バッチ操作、パフォーマンス アトミック性が必要なバッチ操作
💡 選択: アトミック性が不要な場合はパイプラインを使用します。アトミック性が必要な場合はトランザクションまたはパイプライン+トランザクションを使用します。

パイプラインのユースケース

ユースケース1:バッチ設定

PYTHON
import redis

r = redis.Redis()
pipe = r.pipeline()

# 1000個のキーをバッチ設定
for i in range(1000):
    pipe.set(f'key:{i}', f'value:{i}')

results = pipe.execute()
print(f'Set {len(results)} keys')

ユースケース2:バッチ取得

PYTHON
import redis

r = redis.Redis()
pipe = r.pipeline()

# 1000個のキーをバッチ取得
keys = [f'key:{i}' for i in range(1000)]
for key in keys:
    pipe.get(key)

values = pipe.execute()
print(f'Got {len(values)} values')

ユースケース3:バッチ削除

PYTHON
import redis

r = redis.Redis()
pipe = r.pipeline()

# バッチ削除
keys = ['key1', 'key2', 'key3', 'key4', 'key5']
for key in keys:
    pipe.delete(key)

results = pipe.execute()
print(f'Deleted {sum(results)} keys')

ユースケース4:データインポート

PYTHON
import redis
import json

r = redis.Redis()
pipe = r.pipeline()

# JSONファイルからデータをインポート
with open('data.json', 'r') as f:
    data = json.load(f)
    
    for item in data:
        key = f"user:{item['id']}"
        value = json.dumps(item)
        pipe.set(key, value)

pipe.execute()
print('Import complete')

ユースケース5:バッチカウンター更新

PYTHON
import redis

r = redis.Redis()
pipe = r.pipeline()

# バッチでカウンターを更新
counters = {
    'article:1:views': 10,
    'article:2:views': 20,
    'article:3:views': 30,
}

for key, increment in counters.items():
    pipe.incrby(key, increment)

pipe.execute()
print('Counters updated')

パイプラインのパフォーマンス比較

テストコード

PYTHON
import redis
import time

r = redis.Redis()

# テストデータ
n = 10000

# 方法1:通常のアプローチ
start = time.time()
for i in range(n):
    r.set(f'key:{i}', f'value:{i}')
end = time.time()
print(f'Regular: {end - start:.2f} seconds')

# 方法2:パイプライン
start = time.time()
pipe = r.pipeline()
for i in range(n):
    pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()
end = time.time()
print(f'Pipeline: {end - start:.2f} seconds')

サンプル結果

Regular: 5.23 seconds
Pipeline: 0.51 seconds

Performance improvement: about 10x
▶ 試してみよう

パイプラインのベストプラクティス

1. 適切なバッチサイズを設定

PYTHON
# ❌ 一度に送信するコマンドが多すぎる
pipe = r.pipeline()
for i in range(100000):
    pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()  # 大量のメモリを使用する可能性がある

# ✅ バッチで実行
batch_size = 1000
for batch in range(0, 100000, batch_size):
    pipe = r.pipeline()
    for i in range(batch, batch + batch_size):
        pipe.set(f'key:{i}', f'value:{i}')
    pipe.execute()

2. エラーハンドリング

PYTHON
import redis

r = redis.Redis()
pipe = r.pipeline()

pipe.set('key1', 'value1')
pipe.incr('key1')  # これは失敗する(key1は数値ではない)
pipe.set('key2', 'value2')

try:
    results = pipe.execute()
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f'Command {i} failed: {result}')
except Exception as e:
    print(f'Pipeline execution failed: {e}')

3. アトミック性のためにトランザクションを使用

PYTHON
import redis

r = redis.Redis()

# アトミック性が必要な場合はトランザクションを使用
pipe = r.pipeline(transaction=True)

pipe.set('account:a', '100')
pipe.set('account:b', '50')

try:
    results = pipe.execute()
    print('Transaction executed successfully')
except Exception as e:
    print(f'Transaction failed: {e}')

4. パイプラインのパフォーマンスを監視

PYTHON
import redis
import time

r = redis.Redis()

start = time.time()
pipe = r.pipeline()

for i in range(10000):
    pipe.set(f'key:{i}', f'value:{i}')

results = pipe.execute()
end = time.time()

print(f'Execution time: {end - start:.2f} seconds')
print(f'QPS: {10000 / (end - start):.0f}')

パイプラインの制限

1. メモリ使用量

PYTHON
# パイプラインはすべてのコマンドと結果をキャッシュする
# バッチが大きすぎると大量のメモリを使用する

# 解決策:バッチで実行

2. 非アトミック

PYTHON
# パイプライン内のコマンドは他のクライアントに中断される可能性がある
# アトミック性が必要な場合はトランザクションを使用

3. 中間結果を使用できない

PYTHON
# パイプライン内で前のコマンドの結果を使用できない

# ❌ 誤った例
pipe = r.pipeline()
pipe.incr('counter')
# 後続のコマンドでincrの結果を取得できない
pipe.set('result', ???)  # incrの結果を使用できない
pipe.execute()

# ✅ 解決策:Luaスクリプトを使用

❓ よくある質問

Q パイプラインとトランザクションの違いは何ですか?
A パイプラインはネットワークラウンドトリップを削減しますが、アトミック性を保証しません。トランザクションはアトミック性を保証し、ネットワークラウンドトリップも削減します。
Q パイプラインによりどの程度のパフォーマンス向上が見込めますか?
A ネットワークレイテンシに依存します。レイテンシが高いほど改善効果が大きくなります。通常は5〜10倍です。
Q どのバッチサイズを使用すべきですか?
A バッチあたり100〜1000コマンドを推奨します。大きすぎるとメモリを多く使用し、小さすぎると改善効果が小さくなります。
Q パイプライン内のコマンドが失敗するとどうなりますか?
A 単一のコマンドの失敗は他のコマンドに影響しません。返された結果でエラーを確認してください。
Q いつパイプラインを使用すべきですか?
A バッチ操作、データのインポート/エクスポート、パフォーマンス向上が必要なあらゆるシナリオで使用します。

📖 まとめ

📝 練習問題

  1. 基本パイプライン: パイプラインを使用して100個のキーをバッチ設定し、通常のアプローチとパフォーマンスを比較しましょう
  2. バッチ操作: パイプラインでバッチ取得とバッチ削除を実装しましょう
  3. パフォーマンス比較: 異なるバッチサイズ(10、100、1000)でパフォーマンスをテストしましょう
  4. パイプライン + トランザクション: トランザクション付きパイプラインを使用してアトミックなバッチ操作を実装しましょう

次のレッスン

次のレッスンでは、PythonとRedisについて学びます。PythonでのRedis操作を解説します。

100%