Redis: кэширование, очереди и сессии

rediscachingdatabasepython

Redis в серверной инфраструктуре

Redis — одна из тех технологий, которые кажутся простыми на первый взгляд, но при правильном использовании решают десятки задач. In-memory key-value хранилище, которое работает как кэш, брокер сообщений, очередь задач и хранилище сессий — и всё это с задержкой в доли миллисекунды. Разберём основные сценарии использования, от базовых команд до production-конфигурации.

Установка и базовая настройка

На Ubuntu/Debian:

# Установка из официального репозитория
sudo apt update
sudo apt install redis-server

# Проверить, что Redis работает
sudo systemctl status redis-server
redis-cli ping
# PONG

Минимальные настройки для production в /etc/redis/redis.conf:

# Привязка к localhost (по умолчанию), для сети -- к конкретному IP
bind 127.0.0.1

# Пароль для подключения
requirepass your_strong_password_here

# Максимальный объём памяти
maxmemory 256mb

# Политика вытеснения при достижении лимита
maxmemory-policy allkeys-lru

# Отключить опасные команды на production
rename-command FLUSHALL ""
rename-command FLUSHDB ""
rename-command CONFIG ""
# Применить изменения
sudo systemctl restart redis-server

# Подключение с паролем
redis-cli -a your_strong_password_here
# Или безопаснее -- без пароля в командной строке
redis-cli
AUTH your_strong_password_here

Основные типы данных и команды

Redis — не просто key-value. Это набор оптимизированных структур данных.

Strings — базовый тип

# Установить значение
SET user:1:name "Алексей"

# Получить значение
GET user:1:name
# "Алексей"

# Установить с TTL (время жизни в секундах)
SET session:abc123 "user_data_json" EX 3600

# Атомарный инкремент (счётчики, rate limiting)
SET page:views 0
INCR page:views      # 1
INCRBY page:views 10 # 11

# Установить, только если ключ не существует (распределённая блокировка)
SET lock:resource1 "owner123" NX EX 30

Hashes — объекты

# Хранение объекта
HSET user:1 name "Алексей" email "alex@example.com" role "admin"

# Получить одно поле
HGET user:1 name
# "Алексей"

# Получить все поля
HGETALL user:1
# name "Алексей" email "alex@example.com" role "admin"

# Инкремент поля
HINCRBY user:1 login_count 1

Хэши — идеальный тип для хранения объектов. Занимают меньше памяти, чем отдельные ключи, и позволяют обновлять поля атомарно.

Lists — очереди

# Добавить в конец (справа)
RPUSH queue:emails "msg1" "msg2" "msg3"

# Забрать из начала (слева) -- FIFO-очередь
LPOP queue:emails
# "msg1"

# Блокирующее чтение (ждать до 30 секунд)
BLPOP queue:emails 30

# Длина списка
LLEN queue:emails

# Диапазон элементов (0-based, -1 -- последний)
LRANGE queue:emails 0 -1

Sets и Sorted Sets

# Sets: уникальные значения, O(1) проверка принадлежности
SADD online:users "user1" "user2" "user3"
SISMEMBER online:users "user1"  # 1 (true)
SMEMBERS online:users           # все элементы
SCARD online:users              # количество: 3

# Sorted Sets: элементы с весом (score)
ZADD leaderboard 100 "player1" 250 "player2" 180 "player3"

# Топ-3 по убыванию
ZREVRANGE leaderboard 0 2 WITHSCORES
# player2 250, player3 180, player1 100

# Ранг элемента
ZREVRANK leaderboard "player2"  # 0 (первое место)

Паттерны кэширования

Cache-aside (Lazy Loading)

Самый распространённый паттерн. Приложение сначала проверяет кэш, при промахе — запрашивает базу и кладёт результат в кэш.

import redis
import json
import psycopg2

r = redis.Redis(host='localhost', port=6379, password='your_password', decode_responses=True)

def get_user(user_id: int) -> dict:
    cache_key = f"user:{user_id}"

    # 1. Проверяем кэш
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Кэш-промах -- идём в базу
    conn = psycopg2.connect("dbname=myapp")
    cur = conn.cursor()
    cur.execute("SELECT id, name, email FROM users WHERE id = %s", (user_id,))
    row = cur.fetchone()
    cur.close()
    conn.close()

    if row is None:
        return None

    user = {"id": row[0], "name": row[1], "email": row[2]}

    # 3. Кладём в кэш с TTL 5 минут
    r.set(cache_key, json.dumps(user), ex=300)

    return user

Плюсы: кэшируются только запрашиваемые данные, промахи обрабатываются прозрачно. Минусы: первый запрос всегда медленный, данные могут устареть до истечения TTL.

Write-through

При записи обновляется и база, и кэш одновременно. Гарантирует актуальность кэша:

def update_user(user_id: int, name: str, email: str) -> dict:
    # 1. Обновляем в базе
    conn = psycopg2.connect("dbname=myapp")
    cur = conn.cursor()
    cur.execute(
        "UPDATE users SET name = %s, email = %s WHERE id = %s RETURNING id, name, email",
        (name, email, user_id)
    )
    row = cur.fetchone()
    conn.commit()
    cur.close()
    conn.close()

    user = {"id": row[0], "name": row[1], "email": row[2]}

    # 2. Обновляем кэш
    cache_key = f"user:{user_id}"
    r.set(cache_key, json.dumps(user), ex=300)

    return user

Инвалидация по событию

Вместо обновления кэша — удаление. Следующий GET подтянет свежие данные через cache-aside:

def delete_user_cache(user_id: int):
    r.delete(f"user:{user_id}")

def update_user_v2(user_id: int, name: str, email: str):
    # Обновляем базу
    # ...

    # Инвалидируем кэш (не обновляем!)
    delete_user_cache(user_id)

TTL — стратегия выбора

Выбор TTL зависит от характера данных:

  • Статические данные (конфигурация, справочники): 1-24 часа
  • Профили пользователей: 5-15 минут
  • Результаты поиска: 1-5 минут
  • Сессии: 30 минут — 24 часа
  • Rate limiting: секунды — минуты

Pub/Sub: обмен сообщениями

Redis Pub/Sub — простой брокер сообщений. Подписчики получают сообщения в реальном времени.

Redis pub/sub и очереди задач

Publisher:

import redis

r = redis.Redis(host='localhost', port=6379, password='your_password')

# Публикация сообщения в канал
r.publish('notifications', json.dumps({
    'type': 'user_registered',
    'user_id': 42,
    'timestamp': '2025-08-14T10:30:00Z'
}))

r.publish('notifications', json.dumps({
    'type': 'order_completed',
    'order_id': 1001
}))

Subscriber:

import redis
import json

r = redis.Redis(host='localhost', port=6379, password='your_password')
pubsub = r.pubsub()
pubsub.subscribe('notifications')

print("Listening for notifications...")
for message in pubsub.listen():
    if message['type'] == 'message':
        data = json.loads(message['data'])
        print(f"Received: {data}")

        if data['type'] == 'user_registered':
            send_welcome_email(data['user_id'])
        elif data['type'] == 'order_completed':
            generate_invoice(data['order_id'])

Ограничения Pub/Sub:

  • Нет гарантии доставки. Если подписчик оффлайн — сообщение теряется
  • Нет персистентности. Сообщения не сохраняются на диск
  • Для надёжных очередей используйте Streams (Redis 5.0+) или внешние брокеры (RabbitMQ, NATS)

Очереди задач

Простая, но надёжная очередь на основе списков Redis:

Producer

import redis
import json
import uuid

r = redis.Redis(host='localhost', port=6379, password='your_password')

def enqueue_task(queue_name: str, task_type: str, payload: dict):
    task = {
        'id': str(uuid.uuid4()),
        'type': task_type,
        'payload': payload,
        'created_at': '2025-08-14T10:00:00Z'
    }
    r.rpush(queue_name, json.dumps(task))
    return task['id']

# Поставить задачи в очередь
enqueue_task('tasks:email', 'send_email', {
    'to': 'user@example.com',
    'subject': 'Welcome!',
    'template': 'welcome'
})

enqueue_task('tasks:email', 'send_email', {
    'to': 'admin@example.com',
    'subject': 'New signup',
    'template': 'admin_notification'
})

Worker

import redis
import json
import traceback

r = redis.Redis(host='localhost', port=6379, password='your_password', decode_responses=True)

def process_task(task: dict):
    """Обработка задачи в зависимости от типа."""
    if task['type'] == 'send_email':
        send_email(
            to=task['payload']['to'],
            subject=task['payload']['subject'],
            template=task['payload']['template']
        )
    elif task['type'] == 'generate_report':
        generate_report(task['payload'])

def worker(queue_name: str):
    print(f"Worker started. Listening on {queue_name}...")
    while True:
        # BLPOP блокируется до появления элемента (таймаут 30 сек)
        result = r.blpop(queue_name, timeout=30)
        if result is None:
            continue  # таймаут, пробуем снова

        _, raw_task = result
        task = json.loads(raw_task)
        print(f"Processing task {task['id']}: {task['type']}")

        try:
            process_task(task)
            print(f"Task {task['id']} completed")
        except Exception as e:
            print(f"Task {task['id']} failed: {e}")
            traceback.print_exc()
            # Вернуть задачу в очередь (retry)
            r.rpush(f"{queue_name}:failed", json.dumps(task))

if __name__ == '__main__':
    worker('tasks:email')

Для production-очередей лучше использовать библиотеки вроде rq (Redis Queue) или Celery с Redis-бэкендом — они добавляют retry-логику, приоритеты, мониторинг и dead letter queues.

Хранение сессий

Redis — популярный бэкенд для хранения сессий веб-приложений. Преимущества перед файловыми сессиями: скорость, общее хранилище для нескольких инстансов приложения, автоматическое истечение через TTL.

Flask + Redis Sessions

from flask import Flask, session
from redis import Redis
import json
import uuid

app = Flask(__name__)
app.secret_key = 'your-secret-key'

redis_client = Redis(host='localhost', port=6379, password='your_password', decode_responses=True)

SESSION_TTL = 1800  # 30 минут


class RedisSessionStore:
    def save(self, session_id: str, data: dict):
        redis_client.set(
            f"session:{session_id}",
            json.dumps(data),
            ex=SESSION_TTL
        )

    def load(self, session_id: str) -> dict:
        raw = redis_client.get(f"session:{session_id}")
        if raw is None:
            return {}
        # Обновить TTL при каждом доступе (sliding expiration)
        redis_client.expire(f"session:{session_id}", SESSION_TTL)
        return json.loads(raw)

    def destroy(self, session_id: str):
        redis_client.delete(f"session:{session_id}")


store = RedisSessionStore()

Подсчёт активных сессий

def get_active_sessions_count() -> int:
    """Подсчёт активных сессий через SCAN (безопасно для production)."""
    count = 0
    cursor = 0
    while True:
        cursor, keys = redis_client.scan(cursor, match="session:*", count=100)
        count += len(keys)
        if cursor == 0:
            break
    return count

Никогда не используйте KEYS * на production — эта команда блокирует Redis на время выполнения. Для перебора ключей — только SCAN.

Персистентность: RDB и AOF

Redis работает в памяти, но может сохранять данные на диск двумя способами.

RDB (Redis Database Backup)

Периодические снимки всей базы. Компактный бинарный формат.

# redis.conf

# Создать снимок, если за 900 секунд (15 минут) было минимум 1 изменение
save 900 1
# Или за 300 секунд -- минимум 10 изменений
save 300 10
# Или за 60 секунд -- минимум 10000 изменений
save 60 10000

# Имя файла дампа
dbfilename dump.rdb

# Директория для сохранения
dir /var/lib/redis

# Сжатие RDB
rdbcompression yes

Плюсы RDB: компактный, быстрый для восстановления, удобен для бэкапов. Минусы: можно потерять данные за последний интервал между снимками.

AOF (Append-Only File)

Журнал всех операций записи. Надёжнее RDB, но файл больше и восстановление медленнее.

# redis.conf

# Включить AOF
appendonly yes

# Имя файла
appendfilename "appendonly.aof"

# Частота fsync
# always -- каждая операция (максимальная надёжность, медленно)
# everysec -- раз в секунду (рекомендуется)
# no -- на усмотрение ОС (быстро, можно потерять данные)
appendfsync everysec

Рекомендация для production

Используйте оба механизма:

# RDB для бэкапов + AOF для надёжности
save 900 1
save 300 10
appendonly yes
appendfsync everysec

RDB — для быстрого восстановления и бэкапирования. AOF — для минимизации потерь данных (максимум 1 секунда при everysec).

Политики вытеснения памяти

Когда Redis достигает лимита maxmemory, он должен решить, какие ключи удалить. Политика вытеснения определяет эту стратегию:

# Максимальный объём памяти
maxmemory 512mb

# Политика вытеснения
maxmemory-policy allkeys-lru

Доступные политики:

ПолитикаОписание
noevictionНе удалять ничего, возвращать ошибку при записи
allkeys-lruУдалять наименее используемые ключи из всех
allkeys-lfuУдалять наименее частотно используемые ключи
volatile-lruLRU только среди ключей с TTL
volatile-lfuLFU только среди ключей с TTL
allkeys-randomСлучайное удаление
volatile-ttlУдалять ключи с наименьшим оставшимся TTL

Рекомендации:

  • Кэш: allkeys-lru или allkeys-lfu — пусть Redis сам решает, что удалить
  • Хранилище данных: noeviction — лучше получить ошибку, чем молча потерять данные
  • Сессии с TTL: volatile-lru — удалять только ключи с истекающим TTL

Redis Sentinel: High Availability

Для production нужна отказоустойчивость. Redis Sentinel обеспечивает автоматический failover: если master падает, один из replicas повышается до master.

Минимальная топология: 1 master, 2 replicas, 3 Sentinel-процесса.

Конфигурация replica:

# redis-replica.conf
port 6380
replicaof 127.0.0.1 6379
masterauth your_strong_password_here
requirepass your_strong_password_here

Конфигурация Sentinel:

# sentinel.conf
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster your_strong_password_here
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1

Параметр 2 в sentinel monitor — кворум. Это количество Sentinel-процессов, которые должны согласиться, что master недоступен, прежде чем начнётся failover.

# Запуск Sentinel
redis-sentinel /etc/redis/sentinel.conf

# Проверить статус
redis-cli -p 26379 sentinel master mymaster
redis-cli -p 26379 sentinel replicas mymaster

Подключение из Python через Sentinel:

from redis.sentinel import Sentinel

sentinel = Sentinel(
    [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)],
    socket_timeout=0.5
)

# Получить текущий master
master = sentinel.master_for('mymaster', password='your_password', db=0)
master.set('key', 'value')

# Получить replica для чтения
replica = sentinel.slave_for('mymaster', password='your_password', db=0)
value = replica.get('key')

Мониторинг

Команда INFO

INFO — главный инструмент диагностики Redis:

# Полная информация
redis-cli INFO

# Конкретная секция
redis-cli INFO memory
redis-cli INFO stats
redis-cli INFO replication
redis-cli INFO clients

Ключевые метрики:

# Использование памяти
redis-cli INFO memory | grep used_memory_human
# used_memory_human:45.67M

# Количество подключённых клиентов
redis-cli INFO clients | grep connected_clients
# connected_clients:12

# Cache hit rate
redis-cli INFO stats | grep keyspace
# keyspace_hits:1234567
# keyspace_misses:12345
# Hit rate = hits / (hits + misses) * 100
# = 1234567 / (1234567 + 12345) * 100 = 99.01%

# Операций в секунду
redis-cli INFO stats | grep instantaneous_ops_per_sec
# instantaneous_ops_per_sec:1523

Мониторинг в реальном времени

# Все команды в реальном времени (осторожно на production!)
redis-cli MONITOR

# Задержка (latency)
redis-cli --latency
# min: 0, max: 1, avg: 0.23 (1000 samples)

# Непрерывный мониторинг latency
redis-cli --latency-history

# Статистика по типам ключей
redis-cli --bigkeys

# Анализ распределения памяти (Redis 4.0+)
redis-cli --memkeys

Скрипт мониторинга

#!/bin/bash
# redis-health-check.sh

REDIS_CLI="redis-cli -a your_password"
ALERT_EMAIL="admin@example.com"

# Проверка доступности
if ! $REDIS_CLI ping > /dev/null 2>&1; then
    echo "CRITICAL: Redis is down!" | mail -s "Redis Alert" $ALERT_EMAIL
    exit 1
fi

# Использование памяти
USED_MEMORY=$($REDIS_CLI INFO memory | grep used_memory_bytes | cut -d: -f2 | tr -d '\r')
MAX_MEMORY=$($REDIS_CLI CONFIG GET maxmemory | tail -1 | tr -d '\r')

if [ "$MAX_MEMORY" -gt 0 ]; then
    USAGE_PCT=$((USED_MEMORY * 100 / MAX_MEMORY))
    if [ "$USAGE_PCT" -gt 90 ]; then
        echo "WARNING: Redis memory usage at ${USAGE_PCT}%" | 
            mail -s "Redis Memory Alert" $ALERT_EMAIL
    fi
fi

# Количество подключённых клиентов
CLIENTS=$($REDIS_CLI INFO clients | grep connected_clients | cut -d: -f2 | tr -d '\r')
if [ "$CLIENTS" -gt 100 ]; then
    echo "WARNING: Redis has $CLIENTS connected clients" | 
        mail -s "Redis Clients Alert" $ALERT_EMAIL
fi

echo "Redis health check passed. Memory: ${USAGE_PCT:-N/A}%, Clients: $CLIENTS"

Интеграция с Prometheus

Для полноценного мониторинга используйте redis_exporter:

# Запуск redis_exporter
docker run -d --name redis-exporter 
    -p 9121:9121 
    -e REDIS_ADDR=redis://localhost:6379 
    -e REDIS_PASSWORD=your_password 
    oliver006/redis_exporter

Основные метрики для Grafana-дашборда:

  • redis_memory_used_bytes — использование памяти
  • redis_connected_clients — количество клиентов
  • redis_commands_processed_total — общее количество команд
  • redis_keyspace_hits_total / redis_keyspace_misses_total — hit rate кэша
  • redis_evicted_keys_total — количество вытесненных ключей

Итого

Redis — универсальный инструмент, который закрывает множество задач:

  1. Кэширование: cache-aside с TTL — самый распространённый паттерн
  2. Очереди: LPUSH/BRPOP для простых задач, rq/Celery для production
  3. Сессии: быстро, с автоматическим истечением, shared между инстансами
  4. Pub/Sub: real-time нотификации (но без гарантий доставки)
  5. Персистентность: RDB + AOF для надёжности
  6. HA: Redis Sentinel для автоматического failover

Главное — не превращать Redis в основную базу данных. Он идеален как дополнение к PostgreSQL/MySQL: горячие данные в Redis, холодные — в реляционной базе. При таком подходе вы получаете скорость Redis и надёжность традиционной СУБД.

© 2026 Terminal Notes. Built with SvelteKit.