Исходный код vk_bot.state.storage

import json
from abc import ABC, abstractmethod
from typing import Any

try:
    from redis import Redis

    redis_installed = True
except ImportError:
    redis_installed = False

try:
    import psycopg
    from psycopg import sql

    postgres_installed = True
except ImportError:
    postgres_installed = False


[документация] class BaseStorage(ABC): """Abstract base class for state storage backends."""
[документация] @abstractmethod def get_state(self, user_id: int) -> str | None: pass
[документация] @abstractmethod def set_state(self, user_id: int, state: str) -> None: pass
[документация] @abstractmethod def get_data(self, user_id: int) -> dict[str, Any]: pass
[документация] @abstractmethod def set_data(self, user_id: int, data: dict[str, Any]) -> None: pass
[документация] @abstractmethod def update_data(self, user_id: int, **kwargs: Any) -> None: pass
[документация] @abstractmethod def delete(self, user_id: int) -> None: pass
[документация] class MemoryStorage(BaseStorage): """In-memory state storage. Data is lost on restart. Suitable for development. """ def __init__(self) -> None: self._states: dict[int, str] = {} self._data: dict[int, dict[str, Any]] = {}
[документация] def get_state(self, user_id: int) -> str | None: return self._states.get(user_id)
[документация] def set_state(self, user_id: int, state: str) -> None: self._states[user_id] = state
[документация] def get_data(self, user_id: int) -> dict[str, Any]: return self._data.get(user_id, {}).copy()
[документация] def set_data(self, user_id: int, data: dict[str, Any]) -> None: self._data[user_id] = data
[документация] def update_data(self, user_id: int, **kwargs: Any) -> None: if user_id not in self._data: self._data[user_id] = {} self._data[user_id].update(kwargs)
[документация] def delete(self, user_id: int) -> None: self._states.pop(user_id, None) self._data.pop(user_id, None)
[документация] class RedisStorage(BaseStorage): """Redis-backed state storage. Persistent storage. Suitable for production. """ def __init__( self, host: str = "localhost", port: int = 6379, db: int = 0, password: str | None = None, ) -> None: if not redis_installed: raise ImportError("Redis is not installed.") self.redis = Redis( host=host, port=port, db=db, password=password, decode_responses=True ) def _state_key(self, user_id: int) -> str: return f"vkbot:state:{user_id}" def _data_key(self, user_id: int) -> str: return f"vkbot:data:{user_id}"
[документация] def get_state(self, user_id: int) -> str | None: return self.redis.get(self._state_key(user_id))
[документация] def set_state(self, user_id: int, state: str) -> None: self.redis.set(self._state_key(user_id), state)
[документация] def get_data(self, user_id: int) -> dict[str, Any]: data = self.redis.get(self._data_key(user_id)) return json.loads(data) if data else {}
[документация] def set_data(self, user_id: int, data: dict[str, Any]) -> None: self.redis.set(self._data_key(user_id), json.dumps(data))
[документация] def update_data(self, user_id: int, **kwargs: Any) -> None: current = self.get_data(user_id) current.update(kwargs) self.set_data(user_id, current)
[документация] def delete(self, user_id: int) -> None: self.redis.delete(self._state_key(user_id)) self.redis.delete(self._data_key(user_id))
[документация] class PostgresStorage(BaseStorage): """PostgreSQL-backed state storage using psycopg3 (synchronous). Persistent storage with full transaction support. Suitable for production. Requires the ``postgres`` extra: ``pip install vk-bot[postgres]``. Args: dsn: PostgreSQL connection string, e.g. ``postgresql://user:pass@localhost/dbname``. table_prefix: Prefix for the tables created by this storage. """ def __init__(self, dsn: str, table_prefix: str = "vk_bot") -> None: if not postgres_installed: raise ImportError( "psycopg is not installed. Install with: pip install vk-bot[postgres]" ) self._dsn = dsn self._table_prefix = table_prefix self._conn: psycopg.Connection | None = None @property def _states_table(self) -> str: return f"{self._table_prefix}_states" @property def _data_table(self) -> str: return f"{self._table_prefix}_data" def _get_conn(self) -> "psycopg.Connection": if self._conn is None or self._conn.closed: self._conn = psycopg.connect(self._dsn) self._init_tables() return self._conn def _init_tables(self) -> None: conn = self._conn if conn is None: raise RuntimeError("Database connection is not initialized") with conn.transaction(): conn.execute( sql.SQL(""" CREATE TABLE IF NOT EXISTS {} ( user_id BIGINT PRIMARY KEY, state TEXT NOT NULL, updated_at TIMESTAMPTZ DEFAULT now() ) """).format(sql.Identifier(self._states_table)) ) conn.execute( sql.SQL(""" CREATE TABLE IF NOT EXISTS {} ( user_id BIGINT PRIMARY KEY, data JSONB NOT NULL DEFAULT '{{}}'::jsonb, updated_at TIMESTAMPTZ DEFAULT now() ) """).format(sql.Identifier(self._data_table)) )
[документация] def close(self) -> None: """Close the database connection.""" if self._conn and not self._conn.closed: self._conn.close() self._conn = None
[документация] def get_state(self, user_id: int) -> str | None: conn = self._get_conn() with conn.transaction(): row = conn.execute( sql.SQL("SELECT state FROM {} WHERE user_id = %s").format( sql.Identifier(self._states_table) ), (user_id,), ).fetchone() return row[0] if row else None
[документация] def set_state(self, user_id: int, state: str) -> None: conn = self._get_conn() with conn.transaction(): conn.execute( sql.SQL(""" INSERT INTO {} (user_id, state, updated_at) VALUES (%s, %s, now()) ON CONFLICT (user_id) DO UPDATE SET state = EXCLUDED.state, updated_at = now() """).format(sql.Identifier(self._states_table)), (user_id, state), )
[документация] def get_data(self, user_id: int) -> dict[str, Any]: conn = self._get_conn() with conn.transaction(): row = conn.execute( sql.SQL("SELECT data FROM {} WHERE user_id = %s").format( sql.Identifier(self._data_table) ), (user_id,), ).fetchone() return dict(row[0]) if row else {}
[документация] def set_data(self, user_id: int, data: dict[str, Any]) -> None: conn = self._get_conn() with conn.transaction(): conn.execute( sql.SQL(""" INSERT INTO {} (user_id, data, updated_at) VALUES (%s, %s::jsonb, now()) ON CONFLICT (user_id) DO UPDATE SET data = EXCLUDED.data, updated_at = now() """).format(sql.Identifier(self._data_table)), (user_id, json.dumps(data)), )
[документация] def update_data(self, user_id: int, **kwargs: Any) -> None: current = self.get_data(user_id) current.update(kwargs) self.set_data(user_id, current)
[документация] def delete(self, user_id: int) -> None: conn = self._get_conn() with conn.transaction(): conn.execute( sql.SQL("DELETE FROM {} WHERE user_id = %s").format( sql.Identifier(self._states_table) ), (user_id,), ) conn.execute( sql.SQL("DELETE FROM {} WHERE user_id = %s").format( sql.Identifier(self._data_table) ), (user_id,), )