From ce433d65435fd56b1fe45403d9c7ce5ce85856d2 Mon Sep 17 00:00:00 2001 From: bvn13 Date: Sun, 22 Mar 2026 18:07:16 +0300 Subject: [PATCH] newsmaker --- .gitignore | 1 + Dockerfile | 12 + README.md | 118 ++++++++++ docker-compose.yaml | 7 + env.example | 6 + pyproject.toml | 26 +++ src/__init__.py | 0 src/adapters/__init__.py | 0 src/adapters/db/__init__.py | 0 src/adapters/db/command_symbols.py | 35 +++ src/adapters/db/rooms.py | 71 ++++++ src/adapters/db/schema.py | 38 ++++ src/adapters/db/sent_news.py | 27 +++ src/adapters/db/subscriptions.py | 122 ++++++++++ src/adapters/jabber/__init__.py | 0 src/adapters/jabber/command_handler.py | 206 +++++++++++++++++ src/adapters/jabber/command_responder.py | 14 ++ src/adapters/jabber/connection.py | 100 ++++++++ src/adapters/jabber/news_publisher.py | 17 ++ src/adapters/scheduled/__init__.py | 0 src/adapters/scheduled/news_checker.py | 34 +++ src/adapters/sources/__init__.py | 0 src/adapters/sources/rss/__init__.py | 0 src/adapters/sources/rss/fetcher.py | 37 +++ src/domain/__init__.py | 0 src/domain/admin/__init__.py | 0 src/domain/admin/entities.py | 14 ++ src/domain/admin/ports.py | 81 +++++++ src/domain/admin/usecases.py | 277 +++++++++++++++++++++++ src/domain/news/__init__.py | 0 src/domain/news/entities.py | 10 + src/domain/news/ports.py | 38 ++++ src/domain/news/usecases.py | 76 +++++++ src/domain/subscriptions/__init__.py | 0 src/domain/subscriptions/entities.py | 46 ++++ src/domain/subscriptions/ports.py | 41 ++++ src/main.py | 116 ++++++++++ 37 files changed, 1570 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 docker-compose.yaml create mode 100644 env.example create mode 100644 pyproject.toml create mode 100644 src/__init__.py create mode 100644 src/adapters/__init__.py create mode 100644 src/adapters/db/__init__.py create mode 100644 src/adapters/db/command_symbols.py create mode 100644 src/adapters/db/rooms.py create mode 100644 src/adapters/db/schema.py create mode 100644 src/adapters/db/sent_news.py create mode 100644 src/adapters/db/subscriptions.py create mode 100644 src/adapters/jabber/__init__.py create mode 100644 src/adapters/jabber/command_handler.py create mode 100644 src/adapters/jabber/command_responder.py create mode 100644 src/adapters/jabber/connection.py create mode 100644 src/adapters/jabber/news_publisher.py create mode 100644 src/adapters/scheduled/__init__.py create mode 100644 src/adapters/scheduled/news_checker.py create mode 100644 src/adapters/sources/__init__.py create mode 100644 src/adapters/sources/rss/__init__.py create mode 100644 src/adapters/sources/rss/fetcher.py create mode 100644 src/domain/__init__.py create mode 100644 src/domain/admin/__init__.py create mode 100644 src/domain/admin/entities.py create mode 100644 src/domain/admin/ports.py create mode 100644 src/domain/admin/usecases.py create mode 100644 src/domain/news/__init__.py create mode 100644 src/domain/news/entities.py create mode 100644 src/domain/news/ports.py create mode 100644 src/domain/news/usecases.py create mode 100644 src/domain/subscriptions/__init__.py create mode 100644 src/domain/subscriptions/entities.py create mode 100644 src/domain/subscriptions/ports.py create mode 100644 src/main.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c49bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1083944 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv + +COPY pyproject.toml uv.lock* ./ +RUN uv sync --frozen --no-dev + +COPY src/ ./src/ + +CMD ["uv", "run", "python", "src/main.py"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..003c947 --- /dev/null +++ b/README.md @@ -0,0 +1,118 @@ +# jabnews + +Jabber-бот для трансляции RSS-новостей в конференции (MUC). + +Бот подписывается на RSS-ленты и автоматически публикует новые записи в указанные jabber-комнаты. + +## Возможности + +- Управление через личные сообщения и команды в конференции +- Подписка на несколько RSS-лент с индивидуальным интервалом опроса +- Дедупликация: каждый RSS-источник опрашивается один раз, даже если подписан в нескольких комнатах +- Персональные командные символы для каждой комнаты +- Разграничение прав: владелец бота, администратор комнаты, участник + +## Команды + +### Личные сообщения + +| Команда | Описание | +|---------|----------| +| `join ` | Добавить бота в конференцию | +| `exit ` | Вывести бота из конференции | +| `list` | Список моих комнат | +| `list-all` | Все комнаты бота — только для владельца | +| `help` | Справка | + +### Команды в конференции + +По умолчанию командные символы — `!`. Их можно изменить командой `cmd`. + +| Команда | Описание | +|---------|----------| +| `! subscribe [интервал]` | Подписаться на RSS (интервал в минутах, default 15, min 5) | +| `! unsubscribe ` | Отписаться от RSS | +| `! list` | Список подписок комнаты | +| `! cmd <символы>` | Сменить командные символы | +| `! help` | Справка по командам комнаты | + +Обращение к боту по нику (`jabnews, help`) эквивалентно команде `! help`. + +## Архитектура + +Проект реализован по паттерну **Ports and Adapters (Hexagonal Architecture)**. + +``` +src/ + domain/ + subscriptions/ — shared kernel: Subscription, SubscriptionRepository + news/ — NewsItem, NewsFetcher, NewsPublisher, FetchAndPublishNews + admin/ — Room, Admin, все usecases управления + adapters/ + jabber/ — XMPP-соединение, публикация новостей, обработка команд + sources/rss/ — получение новостей через feedparser + db/ — SQLite реализации репозиториев + scheduled/ — таймер проверки новостей (каждые 5 мин) + main.py — composition root +``` + +### Направления зависимостей + +``` +domain/admin → domain/subscriptions +domain/news → domain/subscriptions +adapters/* → domain/* +main.py → adapters/* + domain/* +domain/admin ↛ domain/news +domain/news ↛ domain/admin +``` + +Доменный слой не зависит ни от slixmpp, ни от feedparser, ни от aiosqlite. + +## Запуск + +### Требования + +- Docker + Docker Compose + +### Настройка + +```bash +cp env.example .env +``` + +Заполните `.env`: + +```env +BOT_JID=bot@your-jabber-server.com +BOT_PASSWORD=secret +JID_OWNER=owner@your-jabber-server.com +BOT_NICK=jabnews +DB_PATH=/data/jabnews.db +LOG_LEVEL=INFO +``` + +### Запуск через Docker Compose + +```bash +docker compose up -d +``` + +База данных хранится в `./data/jabnews.db` на хосте. + +### Запуск локально (для разработки) + +```bash +uv sync +cp env.example .env +# заполнить .env +uv run python src/main.py +``` + +## Стек + +- Python 3.12+ +- [slixmpp](https://pypi.org/project/slixmpp/) — XMPP-клиент +- [feedparser](https://pypi.org/project/feedparser/) — парсинг RSS +- [aiosqlite](https://pypi.org/project/aiosqlite/) — асинхронная работа с SQLite +- [uv](https://github.com/astral-sh/uv) — сборка и управление зависимостями diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..e0abdce --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,7 @@ +services: + jabnews: + build: . + env_file: .env + volumes: + - ./data:/data + restart: unless-stopped diff --git a/env.example b/env.example new file mode 100644 index 0000000..b8b0888 --- /dev/null +++ b/env.example @@ -0,0 +1,6 @@ +BOT_JID=bot@your-jabber-server.com +BOT_PASSWORD=secret +JID_OWNER=owner@your-jabber-server.com +BOT_NICK=jabnews +DB_PATH=/data/jabnews.db +LOG_LEVEL=INFO diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..cdbe6e4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[project] +name = "jabnews" +version = "0.1.0" +description = "Jabber bot for broadcasting RSS news to conferences" +requires-python = ">=3.12" +dependencies = [ + "slixmpp>=1.8", + "feedparser>=6.0", + "aiosqlite>=0.20", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src"] + +[tool.uv] +dev-dependencies = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", +] + +[tool.pytest.ini_options] +asyncio_mode = "auto" diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapters/__init__.py b/src/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapters/db/__init__.py b/src/adapters/db/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapters/db/command_symbols.py b/src/adapters/db/command_symbols.py new file mode 100644 index 0000000..070bce3 --- /dev/null +++ b/src/adapters/db/command_symbols.py @@ -0,0 +1,35 @@ +import aiosqlite + +from src.domain.admin.ports import CommandSymbolsRepository + + +class SqliteCommandSymbolsRepository(CommandSymbolsRepository): + """ + Кэш: dict[room_jid, str] в памяти. + При старте прогружается весь словарь из БД через load_cache(). + При set() — обновляет и кэш и БД. + """ + + def __init__(self, db_path: str) -> None: + self._db_path = db_path + self._cache: dict[str, str] = {} + + async def load_cache(self) -> None: + """Вызывается один раз при старте из main.py до регистрации обработчиков.""" + async with aiosqlite.connect(self._db_path) as db: + async with db.execute("SELECT room_jid, symbols FROM command_symbols") as cursor: + rows = await cursor.fetchall() + self._cache = {row[0]: row[1] for row in rows} + + async def get(self, room_jid: str) -> str: + return self._cache.get(room_jid, self.DEFAULT_SYMBOLS) + + async def set(self, room_jid: str, symbols: str) -> None: + self._cache[room_jid] = symbols + async with aiosqlite.connect(self._db_path) as db: + await db.execute( + """INSERT INTO command_symbols (room_jid, symbols) VALUES (?, ?) + ON CONFLICT(room_jid) DO UPDATE SET symbols = excluded.symbols""", + (room_jid, symbols), + ) + await db.commit() diff --git a/src/adapters/db/rooms.py b/src/adapters/db/rooms.py new file mode 100644 index 0000000..3ebd200 --- /dev/null +++ b/src/adapters/db/rooms.py @@ -0,0 +1,71 @@ +from typing import List, Optional + +import aiosqlite + +from src.domain.admin.entities import Room +from src.domain.admin.ports import RoomRepository + + +class SqliteRoomRepository(RoomRepository): + + def __init__(self, db_path: str) -> None: + self._db_path = db_path + + async def add(self, room: Room) -> None: + async with aiosqlite.connect(self._db_path) as db: + await db.execute( + "INSERT OR REPLACE INTO rooms (jid, admin_jid, is_enabled) VALUES (?, ?, ?)", + (room.jid, room.admin_jid, 1 if room.is_enabled else 0), + ) + await db.commit() + + async def get(self, room_jid: str) -> Optional[Room]: + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + "SELECT jid, admin_jid, is_enabled FROM rooms WHERE jid = ?", + (room_jid,), + ) as cursor: + row = await cursor.fetchone() + if row is None: + return None + return Room(jid=row[0], admin_jid=row[1], is_enabled=bool(row[2])) + + async def disable(self, room_jid: str) -> None: + async with aiosqlite.connect(self._db_path) as db: + await db.execute( + "UPDATE rooms SET is_enabled = 0 WHERE jid = ?", (room_jid,) + ) + await db.commit() + + async def enable(self, room_jid: str) -> None: + async with aiosqlite.connect(self._db_path) as db: + await db.execute( + "UPDATE rooms SET is_enabled = 1 WHERE jid = ?", (room_jid,) + ) + await db.commit() + + async def list_by_admin(self, admin_jid: str) -> List[Room]: + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + "SELECT jid, admin_jid, is_enabled FROM rooms WHERE admin_jid = ?", + (admin_jid,), + ) as cursor: + rows = await cursor.fetchall() + return [Room(jid=r[0], admin_jid=r[1], is_enabled=bool(r[2])) for r in rows] + + async def list_all(self) -> List[Room]: + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + "SELECT jid, admin_jid, is_enabled FROM rooms ORDER BY jid" + ) as cursor: + rows = await cursor.fetchall() + return [Room(jid=r[0], admin_jid=r[1], is_enabled=bool(r[2])) for r in rows] + + async def list_enabled(self) -> List[Room]: + """Используется при старте для восстановления MUC-соединений.""" + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + "SELECT jid, admin_jid, is_enabled FROM rooms WHERE is_enabled = 1" + ) as cursor: + rows = await cursor.fetchall() + return [Room(jid=r[0], admin_jid=r[1], is_enabled=True) for r in rows] diff --git a/src/adapters/db/schema.py b/src/adapters/db/schema.py new file mode 100644 index 0000000..d387c0d --- /dev/null +++ b/src/adapters/db/schema.py @@ -0,0 +1,38 @@ +import aiosqlite + +SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS rooms ( + jid TEXT PRIMARY KEY, + admin_jid TEXT NOT NULL, + is_enabled INTEGER NOT NULL DEFAULT 1 +); + +CREATE TABLE IF NOT EXISTS command_symbols ( + room_jid TEXT PRIMARY KEY, + symbols TEXT NOT NULL DEFAULT '!' +); + +CREATE TABLE IF NOT EXISTS subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + room_jid TEXT NOT NULL, + subscription_type TEXT NOT NULL, + source TEXT NOT NULL, + interval_minutes INTEGER NOT NULL DEFAULT 15, + last_seen TEXT NOT NULL, + UNIQUE(room_jid, source) +); + +CREATE TABLE IF NOT EXISTS sent_news ( + room_jid TEXT NOT NULL, + subscription_id INTEGER NOT NULL, + news_id TEXT NOT NULL, + PRIMARY KEY (room_jid, subscription_id, news_id) +); +""" + + +async def apply_schema(db_path: str) -> None: + """Выполняет DDL при старте. Идемпотентно (IF NOT EXISTS).""" + async with aiosqlite.connect(db_path) as db: + await db.executescript(SCHEMA_SQL) + await db.commit() diff --git a/src/adapters/db/sent_news.py b/src/adapters/db/sent_news.py new file mode 100644 index 0000000..75eb31f --- /dev/null +++ b/src/adapters/db/sent_news.py @@ -0,0 +1,27 @@ +import aiosqlite + +from src.domain.news.ports import SentNewsRepository + + +class SqliteSentNewsRepository(SentNewsRepository): + + def __init__(self, db_path: str) -> None: + self._db_path = db_path + + async def is_sent(self, room_jid: str, subscription_id: int, news_id: str) -> bool: + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + """SELECT 1 FROM sent_news + WHERE room_jid = ? AND subscription_id = ? AND news_id = ?""", + (room_jid, subscription_id, news_id), + ) as cursor: + return await cursor.fetchone() is not None + + async def mark_sent(self, room_jid: str, subscription_id: int, news_id: str) -> None: + async with aiosqlite.connect(self._db_path) as db: + await db.execute( + """INSERT OR IGNORE INTO sent_news (room_jid, subscription_id, news_id) + VALUES (?, ?, ?)""", + (room_jid, subscription_id, news_id), + ) + await db.commit() diff --git a/src/adapters/db/subscriptions.py b/src/adapters/db/subscriptions.py new file mode 100644 index 0000000..a615137 --- /dev/null +++ b/src/adapters/db/subscriptions.py @@ -0,0 +1,122 @@ +from datetime import datetime, timezone +from typing import List, Optional + +import aiosqlite + +from src.domain.subscriptions.entities import Subscription, SubscriptionType +from src.domain.subscriptions.ports import SubscriptionRepository + +_DATE_FMT = "%Y-%m-%dT%H:%M:%S+00:00" + + +def _parse_dt(s: str) -> datetime: + try: + return datetime.fromisoformat(s) + except ValueError: + return datetime.strptime(s, _DATE_FMT).replace(tzinfo=timezone.utc) + + +def _fmt_dt(dt: datetime) -> str: + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.isoformat() + + +class SqliteSubscriptionRepository(SubscriptionRepository): + + def __init__(self, db_path: str) -> None: + self._db_path = db_path + + async def add(self, subscription: Subscription) -> Subscription: + """INSERT OR REPLACE с учётом UNIQUE(room_jid, source). Возвращает объект с id.""" + async with aiosqlite.connect(self._db_path) as db: + await db.execute( + """ + INSERT INTO subscriptions + (room_jid, subscription_type, source, interval_minutes, last_seen) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(room_jid, source) DO UPDATE SET + interval_minutes = excluded.interval_minutes + """, + ( + subscription.room_jid, + subscription.subscription_type.value, + subscription.source, + subscription.interval_minutes, + _fmt_dt(subscription.last_seen), + ), + ) + await db.commit() + return await self.get(subscription.room_jid, subscription.source) + + async def remove(self, room_jid: str, source: str) -> bool: + async with aiosqlite.connect(self._db_path) as db: + cursor = await db.execute( + "DELETE FROM subscriptions WHERE room_jid = ? AND source = ?", + (room_jid, source), + ) + await db.commit() + return cursor.rowcount > 0 + + async def get(self, room_jid: str, source: str) -> Optional[Subscription]: + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + """SELECT id, room_jid, subscription_type, source, interval_minutes, last_seen + FROM subscriptions WHERE room_jid = ? AND source = ?""", + (room_jid, source), + ) as cursor: + row = await cursor.fetchone() + return self._row_to_sub(row) if row else None + + async def list_by_room(self, room_jid: str) -> List[Subscription]: + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + """SELECT id, room_jid, subscription_type, source, interval_minutes, last_seen + FROM subscriptions WHERE room_jid = ? ORDER BY source""", + (room_jid,), + ) as cursor: + rows = await cursor.fetchall() + return [self._row_to_sub(r) for r in rows] + + async def get_due(self, now: datetime) -> List[Subscription]: + """ + Выбирает подписки, у которых last_seen + interval_minutes <= now + и у которых комната включена (is_enabled = 1). + """ + if now.tzinfo is None: + now = now.replace(tzinfo=timezone.utc) + now_str = _fmt_dt(now) + async with aiosqlite.connect(self._db_path) as db: + async with db.execute( + """ + SELECT s.id, s.room_jid, s.subscription_type, s.source, + s.interval_minutes, s.last_seen + FROM subscriptions s + JOIN rooms r ON s.room_jid = r.jid + WHERE r.is_enabled = 1 + AND datetime(s.last_seen, '+' || s.interval_minutes || ' minutes') <= datetime(?) + ORDER BY s.room_jid, s.source + """, + (now_str,), + ) as cursor: + rows = await cursor.fetchall() + return [self._row_to_sub(r) for r in rows] + + async def update_last_seen(self, subscription_id: int, now: datetime) -> None: + async with aiosqlite.connect(self._db_path) as db: + await db.execute( + "UPDATE subscriptions SET last_seen = ? WHERE id = ?", + (_fmt_dt(now), subscription_id), + ) + await db.commit() + + @staticmethod + def _row_to_sub(row: tuple) -> Subscription: + return Subscription( + id=row[0], + room_jid=row[1], + subscription_type=SubscriptionType(row[2]), + source=row[3], + interval_minutes=row[4], + last_seen=_parse_dt(row[5]), + ) diff --git a/src/adapters/jabber/__init__.py b/src/adapters/jabber/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapters/jabber/command_handler.py b/src/adapters/jabber/command_handler.py new file mode 100644 index 0000000..9d3e3cb --- /dev/null +++ b/src/adapters/jabber/command_handler.py @@ -0,0 +1,206 @@ +import logging + +from src.domain.admin.entities import Admin +from src.domain.admin.ports import CommandSymbolsRepository, RoomRepository +from src.domain.admin.usecases import ( + HandleCmd, + HandleExit, + HandleHelp, + HandleJoin, + HandleList, + HandleListAll, + HandleListSubscriptions, + HandleRoomHelp, + HandleSubscribe, + HandleUnsubscribe, +) +from src.domain.subscriptions.entities import Subscription + +logger = logging.getLogger(__name__) + + +class CommandHandler: + """ + Входящий адаптер — диспетчеризует XMPP-сообщения в usecases. + + Личные сообщения (mtype='chat'): + join, exit, list, list-all, help, <неизвестное> → help + + Сообщения в конференции (mtype='groupchat'): + subscribe, list, unsubscribe, cmd, help + обращение по нику бота → help + не-админ + не help → игнорировать + """ + + def __init__( + self, + owner_jid: str, + bot_nick: str, + cmd_symbols_repo: CommandSymbolsRepository, + room_repo: RoomRepository, + handle_join: HandleJoin, + handle_exit: HandleExit, + handle_list: HandleList, + handle_list_all: HandleListAll, + handle_help: HandleHelp, + handle_subscribe: HandleSubscribe, + handle_unsubscribe: HandleUnsubscribe, + handle_list_subs: HandleListSubscriptions, + handle_cmd: HandleCmd, + handle_room_help: HandleRoomHelp, + ) -> None: + self._owner_jid = owner_jid + self._bot_nick = bot_nick.lower() + self._cmd_symbols_repo = cmd_symbols_repo + self._room_repo = room_repo + self._handle_join = handle_join + self._handle_exit = handle_exit + self._handle_list = handle_list + self._handle_list_all = handle_list_all + self._handle_help = handle_help + self._handle_subscribe = handle_subscribe + self._handle_unsubscribe = handle_unsubscribe + self._handle_list_subs = handle_list_subs + self._handle_cmd = handle_cmd + self._handle_room_help = handle_room_help + + async def handle_message(self, msg) -> None: + mtype = msg["type"] + if mtype == "chat": + await self._handle_private(msg) + elif mtype == "groupchat": + await self._handle_room(msg) + + # ------------------------------------------------------------------ + # Личные сообщения + # ------------------------------------------------------------------ + + async def _handle_private(self, msg) -> None: + caller_jid = msg["from"].bare + admin = Admin(jid=caller_jid, is_owner=(caller_jid == self._owner_jid)) + body = msg["body"].strip() if msg["body"] else "" + + parts = body.split(maxsplit=1) + command = parts[0].lower() if parts else "" + arg = parts[1].strip() if len(parts) > 1 else "" + + if command == "join": + if not arg: + await self._handle_help.execute(caller_jid) + return + await self._handle_join.execute(admin, arg) + + elif command == "exit": + if not arg: + await self._handle_help.execute(caller_jid) + return + await self._handle_exit.execute(caller_jid, arg) + + elif command == "list": + await self._handle_list.execute(admin) + + elif command == "list-all": + await self._handle_list_all.execute(caller_jid) + + elif command == "help": + await self._handle_help.execute(caller_jid) + + else: + # Всё неизвестное → help + await self._handle_help.execute(caller_jid) + + # ------------------------------------------------------------------ + # Сообщения в конференции + # ------------------------------------------------------------------ + + async def _handle_room(self, msg) -> None: + room_jid = msg["from"].bare + # full JID участника в MUC: room@conf/nick → берём nick как идентификатор + # но для проверки прав нужен реальный JID — slixmpp предоставляет его через MUC + sender_nick = msg["from"].resource + body = msg["body"].strip() if msg["body"] else "" + + if not body: + return + + # Получаем реальный JID отправителя через MUC plugin + caller_jid = self._get_real_jid(msg) + + # Проверяем обращение по нику бота + bot_nick_lower = self._bot_nick + body_lower = body.lower() + if body_lower.startswith(bot_nick_lower + ",") or body_lower.startswith( + bot_nick_lower + ":" + ): + await self._handle_room_help.execute(room_jid, caller_jid or sender_nick) + return + + symbols = await self._cmd_symbols_repo.get(room_jid) + if not body.startswith(symbols): + return + + # Убрать символы и распарсить команду + command_body = body[len(symbols):].strip() + parts = command_body.split(maxsplit=1) + if not parts: + return + + command = parts[0].lower() + arg = parts[1].strip() if len(parts) > 1 else "" + + # help доступен всем + if command == "help": + await self._handle_room_help.execute(room_jid, caller_jid or sender_nick) + return + + # Остальные команды — только для админа комнаты + if not await self._is_room_admin(room_jid, caller_jid): + return + + if command == "subscribe": + await self._parse_subscribe(room_jid, arg) + + elif command == "unsubscribe": + if arg: + await self._handle_unsubscribe.execute(room_jid, arg) + + elif command == "list": + await self._handle_list_subs.execute(room_jid) + + elif command == "cmd": + if arg: + await self._handle_cmd.execute(room_jid, arg) + + async def _parse_subscribe(self, room_jid: str, arg: str) -> None: + """Парсит: [interval_minutes]""" + parts = arg.split() + if not parts: + return + source = parts[0] + interval = Subscription.DEFAULT_INTERVAL_MINUTES + if len(parts) >= 2: + try: + interval = int(parts[1]) + except ValueError: + pass + await self._handle_subscribe.execute(room_jid, source, interval) + + async def _is_room_admin(self, room_jid: str, caller_jid: str | None) -> bool: + if not caller_jid: + return False + room = await self._room_repo.get(room_jid) + if not room: + return False + return room.admin_jid == caller_jid + + def _get_real_jid(self, msg) -> str | None: + """Получает реальный JID участника через MUC plugin.""" + try: + muc = msg.get_plugin("xep_0045", None) + if muc is None: + return None + room_jid = msg["from"].bare + nick = msg["from"].resource + return str(muc.get_jid_property(room_jid, nick, "jid")) + except Exception: + return None diff --git a/src/adapters/jabber/command_responder.py b/src/adapters/jabber/command_responder.py new file mode 100644 index 0000000..483632a --- /dev/null +++ b/src/adapters/jabber/command_responder.py @@ -0,0 +1,14 @@ +from src.domain.admin.ports import CommandResponder +from .connection import JabberConnection + + +class JabberCommandResponder(CommandResponder): + + def __init__(self, connection: JabberConnection) -> None: + self._connection = connection + + async def respond_private(self, to_jid: str, text: str) -> None: + await self._connection.send_private(to_jid, text) + + async def respond_room(self, room_jid: str, text: str) -> None: + await self._connection.send_to_room(room_jid, text) diff --git a/src/adapters/jabber/connection.py b/src/adapters/jabber/connection.py new file mode 100644 index 0000000..536f538 --- /dev/null +++ b/src/adapters/jabber/connection.py @@ -0,0 +1,100 @@ +import asyncio +import logging +from typing import Callable, Awaitable, List, Optional + +from slixmpp import ClientXMPP +from slixmpp.exceptions import IqError, IqTimeout + +from src.domain.admin.ports import JabberRoomJoiner, JabberRoomLeaver + +logger = logging.getLogger(__name__) + +MessageCallback = Callable[["slixmpp.stanza.Message"], Awaitable[None]] + + +class JabberConnection(ClientXMPP, JabberRoomJoiner, JabberRoomLeaver): + """ + Единственный владелец XMPP-соединения. + Реализует JabberRoomJoiner и JabberRoomLeaver для домена. + """ + + def __init__(self, jid: str, password: str, nick: str) -> None: + super().__init__(jid, password) + self._nick = nick + self._message_callback: Optional[MessageCallback] = None + self._rooms_on_start: List[str] = [] + + self.register_plugin("xep_0045") # MUC + self.register_plugin("xep_0199") # XMPP Ping + + self.add_event_handler("session_start", self._on_session_start) + self.add_event_handler("message", self._on_message) + self.add_event_handler("groupchat_message", self._on_message) + self.add_event_handler("failed_auth", self._on_failed_auth) + + def set_message_callback(self, callback: MessageCallback) -> None: + self._message_callback = callback + + def set_rooms_on_start(self, room_jids: List[str]) -> None: + """Список комнат для автозахода при старте (is_enabled=True).""" + self._rooms_on_start = room_jids + + async def _on_session_start(self, event) -> None: + self.send_presence() + try: + await self.get_roster() + except (IqError, IqTimeout): + logger.warning("Не удалось получить roster") + + for room_jid in self._rooms_on_start: + try: + await self.join_room(room_jid) + except Exception: + logger.exception("Не удалось зайти в комнату %s при старте", room_jid) + + async def _on_message(self, msg) -> None: + # Игнорируем собственные сообщения + if msg["from"].bare == self.boundjid.bare: + return + if self._message_callback is not None: + try: + await self._message_callback(msg) + except Exception: + logger.exception("Ошибка в обработчике сообщений") + + async def _on_failed_auth(self, event) -> None: + logger.error("Ошибка аутентификации XMPP") + self.disconnect() + + # --- JabberRoomJoiner / JabberRoomLeaver --- + + async def join_room(self, room_jid: str) -> None: + muc = self.plugin["xep_0045"] + await muc.join_muc_wait(room_jid, self._nick) + logger.info("Зашёл в комнату %s", room_jid) + + async def leave_room(self, room_jid: str) -> None: + muc = self.plugin["xep_0045"] + await muc.leave_muc(room_jid, self._nick) + logger.info("Вышел из комнаты %s", room_jid) + + # --- Вспомогательные методы для адаптеров --- + + async def send_private(self, to_jid: str, text: str) -> None: + self.send_message(mto=to_jid, mbody=text, mtype="chat") + + async def send_to_room(self, room_jid: str, text: str) -> bool: + """ + Отправляет сообщение в конференцию. + Возвращает False если возникла ошибка (бот не в комнате). + """ + try: + self.send_message(mto=room_jid, mbody=text, mtype="groupchat") + return True + except Exception: + logger.warning("Не удалось отправить сообщение в %s", room_jid) + return False + + @property + def nick(self) -> str: + return self._nick diff --git a/src/adapters/jabber/news_publisher.py b/src/adapters/jabber/news_publisher.py new file mode 100644 index 0000000..d8fb5cf --- /dev/null +++ b/src/adapters/jabber/news_publisher.py @@ -0,0 +1,17 @@ +from src.domain.news.entities import NewsItem +from src.domain.news.ports import NewsPublisher +from .connection import JabberConnection + + +class JabberNewsPublisher(NewsPublisher): + + def __init__(self, connection: JabberConnection) -> None: + self._connection = connection + + async def publish(self, room_jid: str, item: NewsItem) -> bool: + """ + Отправляет новость в комнату. + Возвращает False если бот не в комнате. + """ + text = f"{item.title}\n{item.link}" + return await self._connection.send_to_room(room_jid, text) diff --git a/src/adapters/scheduled/__init__.py b/src/adapters/scheduled/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapters/scheduled/news_checker.py b/src/adapters/scheduled/news_checker.py new file mode 100644 index 0000000..f25e750 --- /dev/null +++ b/src/adapters/scheduled/news_checker.py @@ -0,0 +1,34 @@ +import asyncio +import logging +from datetime import datetime, timezone + +from src.domain.news.usecases import FetchAndPublishNews + +logger = logging.getLogger(__name__) + + +class NewsChecker: + """ + Входящий адаптер: запускает FetchAndPublishNews каждые 5 минут. + Реальный интервал опроса каждой RSS контролируется subscription.interval_minutes. + """ + + CHECK_INTERVAL_SECONDS: int = 5 * 60 + + def __init__(self, usecase: FetchAndPublishNews) -> None: + self._usecase = usecase + + async def start(self) -> None: + """Запускается как asyncio-задача из main.py.""" + logger.info("NewsChecker запущен, интервал %d сек", self.CHECK_INTERVAL_SECONDS) + while True: + await asyncio.sleep(self.CHECK_INTERVAL_SECONDS) + await self._run_once() + + async def _run_once(self) -> None: + now = datetime.now(tz=timezone.utc) + logger.debug("NewsChecker: запуск проверки %s", now.isoformat()) + try: + await self._usecase.execute(now) + except Exception: + logger.exception("Ошибка в NewsChecker") diff --git a/src/adapters/sources/__init__.py b/src/adapters/sources/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapters/sources/rss/__init__.py b/src/adapters/sources/rss/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/adapters/sources/rss/fetcher.py b/src/adapters/sources/rss/fetcher.py new file mode 100644 index 0000000..047d442 --- /dev/null +++ b/src/adapters/sources/rss/fetcher.py @@ -0,0 +1,37 @@ +import asyncio +import logging +from typing import List + +import feedparser + +from src.domain.news.entities import NewsItem +from src.domain.news.ports import NewsFetcher +from src.domain.subscriptions.entities import Subscription + +logger = logging.getLogger(__name__) + + +class RssFetcher(NewsFetcher): + + async def fetch(self, subscription: Subscription) -> List[NewsItem]: + """ + Опрашивает RSS-ленту. feedparser.parse() — блокирующий вызов, + поэтому запускается в executor чтобы не блокировать event loop. + """ + loop = asyncio.get_event_loop() + try: + feed = await loop.run_in_executor(None, feedparser.parse, subscription.source) + except Exception: + logger.exception("Ошибка при чтении RSS %s", subscription.source) + return [] + + items = [] + for entry in feed.entries: + news_id = entry.get("id") or entry.get("link") or "" + title = entry.get("title", "(без заголовка)") + link = entry.get("link", "") + summary = entry.get("summary") + if news_id: + items.append(NewsItem(id=news_id, title=title, link=link, summary=summary)) + + return items diff --git a/src/domain/__init__.py b/src/domain/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/domain/admin/__init__.py b/src/domain/admin/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/domain/admin/entities.py b/src/domain/admin/entities.py new file mode 100644 index 0000000..82676fb --- /dev/null +++ b/src/domain/admin/entities.py @@ -0,0 +1,14 @@ +from dataclasses import dataclass + + +@dataclass +class Room: + jid: str + admin_jid: str + is_enabled: bool + + +@dataclass +class Admin: + jid: str + is_owner: bool diff --git a/src/domain/admin/ports.py b/src/domain/admin/ports.py new file mode 100644 index 0000000..7bbd156 --- /dev/null +++ b/src/domain/admin/ports.py @@ -0,0 +1,81 @@ +from abc import ABC, abstractmethod +from typing import List, Optional + +from .entities import Room + + +class RoomRepository(ABC): + + @abstractmethod + async def add(self, room: Room) -> None: + ... + + @abstractmethod + async def get(self, room_jid: str) -> Optional[Room]: + ... + + @abstractmethod + async def disable(self, room_jid: str) -> None: + """Устанавливает is_enabled=False.""" + ... + + @abstractmethod + async def enable(self, room_jid: str) -> None: + """Устанавливает is_enabled=True.""" + ... + + @abstractmethod + async def list_by_admin(self, admin_jid: str) -> List[Room]: + """Для команды list — только комнаты данного admin_jid.""" + ... + + @abstractmethod + async def list_all(self) -> List[Room]: + """Для команды list-all — все комнаты.""" + ... + + +class CommandSymbolsRepository(ABC): + """ + Хранит COMMAND_SYMBOLS per-room. + Реализация обязана кешировать значения в памяти. + """ + + DEFAULT_SYMBOLS = "!" + + @abstractmethod + async def get(self, room_jid: str) -> str: + """Возвращает символы для комнаты. Default = '!'.""" + ... + + @abstractmethod + async def set(self, room_jid: str, symbols: str) -> None: + ... + + +class CommandResponder(ABC): + """Порт для отправки ответа на команду.""" + + @abstractmethod + async def respond_private(self, to_jid: str, text: str) -> None: + ... + + @abstractmethod + async def respond_room(self, room_jid: str, text: str) -> None: + ... + + +class JabberRoomJoiner(ABC): + """Порт для физического входа в MUC-конференцию.""" + + @abstractmethod + async def join_room(self, room_jid: str) -> None: + ... + + +class JabberRoomLeaver(ABC): + """Порт для физического выхода из MUC-конференции.""" + + @abstractmethod + async def leave_room(self, room_jid: str) -> None: + ... diff --git a/src/domain/admin/usecases.py b/src/domain/admin/usecases.py new file mode 100644 index 0000000..e6184a3 --- /dev/null +++ b/src/domain/admin/usecases.py @@ -0,0 +1,277 @@ +from typing import List + +from .entities import Admin, Room +from .ports import ( + CommandResponder, + CommandSymbolsRepository, + JabberRoomJoiner, + JabberRoomLeaver, + RoomRepository, +) +from ..subscriptions.entities import Subscription, SubscriptionType +from ..subscriptions.ports import SubscriptionRepository + + +# --------------------------------------------------------------------------- +# Личные сообщения +# --------------------------------------------------------------------------- + +class HandleJoin: + """ + join + - Если комната уже есть с другим админом: ответить "уже присутствую, админ: JID". + - Если комната есть и is_enabled=False (тот же админ): включить, зайти снова. + - Иначе: создать Room, физически зайти в конференцию. + """ + + def __init__( + self, + room_repo: RoomRepository, + responder: CommandResponder, + joiner: JabberRoomJoiner, + ) -> None: + self._room_repo = room_repo + self._responder = responder + self._joiner = joiner + + async def execute(self, admin: Admin, room_jid: str) -> None: + existing = await self._room_repo.get(room_jid) + if existing and existing.admin_jid != admin.jid: + await self._responder.respond_private( + admin.jid, + f"Я уже присутствую в комнате {room_jid}, админ: {existing.admin_jid}", + ) + return + + if existing and not existing.is_enabled: + await self._room_repo.enable(room_jid) + elif not existing: + await self._room_repo.add(Room(jid=room_jid, admin_jid=admin.jid, is_enabled=True)) + + await self._joiner.join_room(room_jid) + await self._responder.respond_private(admin.jid, f"Зашёл в комнату {room_jid}") + + +class HandleExit: + """ + exit + - Проверить что caller == admin этой комнаты. + - Выключить комнату, физически выйти. + """ + + def __init__( + self, + room_repo: RoomRepository, + responder: CommandResponder, + leaver: JabberRoomLeaver, + ) -> None: + self._room_repo = room_repo + self._responder = responder + self._leaver = leaver + + async def execute(self, caller_jid: str, room_jid: str) -> None: + room = await self._room_repo.get(room_jid) + if not room: + await self._responder.respond_private(caller_jid, f"Я не в комнате {room_jid}") + return + if room.admin_jid != caller_jid: + await self._responder.respond_private( + caller_jid, + f"Вы не являетесь администратором комнаты {room_jid}", + ) + return + + await self._room_repo.disable(room_jid) + await self._leaver.leave_room(room_jid) + await self._responder.respond_private(caller_jid, f"Вышел из комнаты {room_jid}") + + +class HandleList: + """list — показать комнаты, которые завёл данный JID.""" + + def __init__(self, room_repo: RoomRepository, responder: CommandResponder) -> None: + self._room_repo = room_repo + self._responder = responder + + async def execute(self, admin: Admin) -> None: + rooms = await self._room_repo.list_by_admin(admin.jid) + if not rooms: + await self._responder.respond_private(admin.jid, "Вы не добавили бота ни в одну комнату") + return + lines = [f"{'✓' if r.is_enabled else '✗'} {r.jid}" for r in rooms] + await self._responder.respond_private(admin.jid, "Ваши комнаты:\n" + "\n".join(lines)) + + +class HandleListAll: + """list-all — только для owner. Все комнаты со статусом.""" + + def __init__( + self, + room_repo: RoomRepository, + responder: CommandResponder, + owner_jid: str, + ) -> None: + self._room_repo = room_repo + self._responder = responder + self._owner_jid = owner_jid + + async def execute(self, caller_jid: str) -> None: + if caller_jid != self._owner_jid: + return + rooms = await self._room_repo.list_all() + if not rooms: + await self._responder.respond_private(caller_jid, "Нет ни одной комнаты") + return + lines = [f"{'✓' if r.is_enabled else '✗'} {r.jid} (админ: {r.admin_jid})" for r in rooms] + await self._responder.respond_private(caller_jid, "Все комнаты:\n" + "\n".join(lines)) + + +class HandleHelp: + """help — личное сообщение. Обычному пользователю — без list-all.""" + + def __init__(self, responder: CommandResponder, owner_jid: str) -> None: + self._responder = responder + self._owner_jid = owner_jid + + async def execute(self, caller_jid: str) -> None: + lines = [ + "Команды личных сообщений:", + " join — зайти в конференцию", + " exit — выйти из конференции", + " list — список моих комнат", + " help — эта справка", + ] + if caller_jid == self._owner_jid: + lines.append(" list-all — все комнаты бота (только для владельца)") + await self._responder.respond_private(caller_jid, "\n".join(lines)) + + +# --------------------------------------------------------------------------- +# Команды из конференции +# --------------------------------------------------------------------------- + +class HandleSubscribe: + """ + ! subscribe [interval_minutes] + - Валидировать interval >= 5, default = 15. + - При дубле — обновить interval. + """ + + def __init__( + self, + subscription_repo: SubscriptionRepository, + responder: CommandResponder, + ) -> None: + self._subscription_repo = subscription_repo + self._responder = responder + + async def execute( + self, + room_jid: str, + source: str, + interval_minutes: int = Subscription.DEFAULT_INTERVAL_MINUTES, + ) -> None: + interval_minutes = max(interval_minutes, Subscription.MIN_INTERVAL_MINUTES) + sub = Subscription.create( + room_jid=room_jid, + source=source, + subscription_type=SubscriptionType.RSS, + interval_minutes=interval_minutes, + ) + saved = await self._subscription_repo.add(sub) + await self._responder.respond_room( + room_jid, + f"Подписка на {source} оформлена (интервал: {saved.interval_minutes} мин)", + ) + + +class HandleUnsubscribe: + """! unsubscribe """ + + def __init__( + self, + subscription_repo: SubscriptionRepository, + responder: CommandResponder, + ) -> None: + self._subscription_repo = subscription_repo + self._responder = responder + + async def execute(self, room_jid: str, source: str) -> None: + removed = await self._subscription_repo.remove(room_jid, source) + if removed: + await self._responder.respond_room(room_jid, f"Подписка на {source} удалена") + else: + await self._responder.respond_room(room_jid, f"Подписка на {source} не найдена") + + +class HandleListSubscriptions: + """! list — список подписок комнаты.""" + + def __init__( + self, + subscription_repo: SubscriptionRepository, + responder: CommandResponder, + ) -> None: + self._subscription_repo = subscription_repo + self._responder = responder + + async def execute(self, room_jid: str) -> None: + subs = await self._subscription_repo.list_by_room(room_jid) + if not subs: + await self._responder.respond_room(room_jid, "Нет активных подписок") + return + lines = [f" {s.source} (каждые {s.interval_minutes} мин)" for s in subs] + await self._responder.respond_room(room_jid, "Подписки:\n" + "\n".join(lines)) + + +class HandleCmd: + """! cmd — сменить командные символы для комнаты.""" + + def __init__( + self, + cmd_symbols_repo: CommandSymbolsRepository, + responder: CommandResponder, + ) -> None: + self._cmd_symbols_repo = cmd_symbols_repo + self._responder = responder + + async def execute(self, room_jid: str, new_symbols: str) -> None: + if not new_symbols.strip(): + await self._responder.respond_room(room_jid, "Командные символы не могут быть пустыми") + return + await self._cmd_symbols_repo.set(room_jid, new_symbols) + await self._responder.respond_room( + room_jid, f"Командные символы изменены на: {new_symbols}" + ) + + +class HandleRoomHelp: + """ + ! help — справка по командам комнаты. + Доступна всем. Если не-админ — дополнительно показать JID админа. + """ + + def __init__( + self, + room_repo: RoomRepository, + cmd_symbols_repo: CommandSymbolsRepository, + responder: CommandResponder, + ) -> None: + self._room_repo = room_repo + self._cmd_symbols_repo = cmd_symbols_repo + self._responder = responder + + async def execute(self, room_jid: str, caller_jid: str) -> None: + symbols = await self._cmd_symbols_repo.get(room_jid) + lines = [ + f"Команды конференции (символы: {symbols}):", + f" {symbols} subscribe [интервал_мин] — подписаться на RSS", + f" {symbols} list — список подписок", + f" {symbols} unsubscribe — отписаться", + f" {symbols} cmd <символы> — сменить командные символы", + f" {symbols} help — эта справка", + ] + room = await self._room_repo.get(room_jid) + if room and room.admin_jid != caller_jid: + lines.append(f"Администратор комнаты: {room.admin_jid}") + await self._responder.respond_room(room_jid, "\n".join(lines)) diff --git a/src/domain/news/__init__.py b/src/domain/news/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/domain/news/entities.py b/src/domain/news/entities.py new file mode 100644 index 0000000..279fc0c --- /dev/null +++ b/src/domain/news/entities.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class NewsItem: + id: str # guid или link из RSS — используется для дедупликации + title: str + link: str + summary: Optional[str] = None diff --git a/src/domain/news/ports.py b/src/domain/news/ports.py new file mode 100644 index 0000000..971f344 --- /dev/null +++ b/src/domain/news/ports.py @@ -0,0 +1,38 @@ +from abc import ABC, abstractmethod +from typing import List + +from .entities import NewsItem +from ..subscriptions.entities import Subscription + + +class NewsFetcher(ABC): + """Порт для получения новостей из источника.""" + + @abstractmethod + async def fetch(self, subscription: Subscription) -> List[NewsItem]: + """Возвращает все доступные записи из источника.""" + ... + + +class NewsPublisher(ABC): + """Порт для публикации новости в комнату.""" + + @abstractmethod + async def publish(self, room_jid: str, item: NewsItem) -> bool: + """ + Отправляет новость в комнату. + Возвращает False если бот уже не в комнате — сигнал выключить комнату. + """ + ... + + +class SentNewsRepository(ABC): + """Порт для отслеживания уже отправленных новостей.""" + + @abstractmethod + async def is_sent(self, room_jid: str, subscription_id: int, news_id: str) -> bool: + ... + + @abstractmethod + async def mark_sent(self, room_jid: str, subscription_id: int, news_id: str) -> None: + ... diff --git a/src/domain/news/usecases.py b/src/domain/news/usecases.py new file mode 100644 index 0000000..eb1f80b --- /dev/null +++ b/src/domain/news/usecases.py @@ -0,0 +1,76 @@ +import logging +from datetime import datetime + +from .entities import NewsItem +from .ports import NewsFetcher, NewsPublisher, SentNewsRepository +from ..admin.ports import RoomRepository +from ..subscriptions.ports import SubscriptionRepository + +logger = logging.getLogger(__name__) + + +class FetchAndPublishNews: + """ + Usecase, вызываемый scheduled-адаптером каждые 5 минут. + Реализует алгоритм из п.7.1 ТЗ. + """ + + def __init__( + self, + subscription_repo: SubscriptionRepository, + room_repo: RoomRepository, + fetcher: NewsFetcher, + publisher: NewsPublisher, + sent_news_repo: SentNewsRepository, + ) -> None: + self._subscription_repo = subscription_repo + self._room_repo = room_repo + self._fetcher = fetcher + self._publisher = publisher + self._sent_news_repo = sent_news_repo + + async def execute(self, now: datetime) -> None: + # 1. Собрать подписки, которые нужно опросить + due_subs = await self._subscription_repo.get_due(now) + if not due_subs: + return + + # 2. Дедуплицировать источники — опрашиваем каждый URL один раз + unique_sources = {sub.source for sub in due_subs} + fetched: dict[str, list[NewsItem]] = {} + + for source in unique_sources: + # Берём любую подписку с этим source для передачи в fetcher + sample_sub = next(s for s in due_subs if s.source == source) + try: + items = await self._fetcher.fetch(sample_sub) + fetched[source] = items + except Exception: + logger.exception("Ошибка при опросе источника %s", source) + fetched[source] = [] + + # 3. Для каждой подписки отправить новые сообщения в комнату + for sub in due_subs: + items = fetched.get(sub.source, []) + if not items: + await self._subscription_repo.update_last_seen(sub.id, now) + continue + + room_disabled = False + for item in items: + if await self._sent_news_repo.is_sent(sub.room_jid, sub.id, item.id): + continue + + success = await self._publisher.publish(sub.room_jid, item) + + if not success: + # Бот выброшен из комнаты — выключить её + logger.warning("Бот не в комнате %s, выключаю", sub.room_jid) + await self._room_repo.disable(sub.room_jid) + room_disabled = True + break + + await self._sent_news_repo.mark_sent(sub.room_jid, sub.id, item.id) + + if not room_disabled: + await self._subscription_repo.update_last_seen(sub.id, now) diff --git a/src/domain/subscriptions/__init__.py b/src/domain/subscriptions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/domain/subscriptions/entities.py b/src/domain/subscriptions/entities.py new file mode 100644 index 0000000..0c648fc --- /dev/null +++ b/src/domain/subscriptions/entities.py @@ -0,0 +1,46 @@ +from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum +from typing import Optional + + +class SubscriptionType(str, Enum): + RSS = "rss" + + +@dataclass +class Subscription: + room_jid: str + source: str + subscription_type: SubscriptionType + interval_minutes: int + last_seen: datetime + id: Optional[int] = None # None до сохранения в БД + + MIN_INTERVAL_MINUTES: int = 5 + DEFAULT_INTERVAL_MINUTES: int = 15 + + def __post_init__(self) -> None: + if self.interval_minutes < self.MIN_INTERVAL_MINUTES: + self.interval_minutes = self.MIN_INTERVAL_MINUTES + + def is_due(self, now: datetime) -> bool: + """Возвращает True если пора делать опрос.""" + from datetime import timedelta + return now >= self.last_seen + timedelta(minutes=self.interval_minutes) + + @classmethod + def create( + cls, + room_jid: str, + source: str, + subscription_type: SubscriptionType = SubscriptionType.RSS, + interval_minutes: int = DEFAULT_INTERVAL_MINUTES, + ) -> "Subscription": + return cls( + room_jid=room_jid, + source=source, + subscription_type=subscription_type, + interval_minutes=max(interval_minutes, cls.MIN_INTERVAL_MINUTES), + last_seen=datetime.now(tz=timezone.utc), + ) diff --git a/src/domain/subscriptions/ports.py b/src/domain/subscriptions/ports.py new file mode 100644 index 0000000..9cbadde --- /dev/null +++ b/src/domain/subscriptions/ports.py @@ -0,0 +1,41 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from typing import List, Optional + +from .entities import Subscription + + +class SubscriptionRepository(ABC): + + @abstractmethod + async def add(self, subscription: Subscription) -> Subscription: + """ + Создаёт подписку. Если уже существует (room_jid + source) — обновляет + interval_minutes. Возвращает сохранённый объект с заполненным id. + """ + ... + + @abstractmethod + async def remove(self, room_jid: str, source: str) -> bool: + """Удаляет подписку. Возвращает False если не найдена.""" + ... + + @abstractmethod + async def list_by_room(self, room_jid: str) -> List[Subscription]: + ... + + @abstractmethod + async def get_due(self, now: datetime) -> List[Subscription]: + """ + Возвращает подписки, которые нужно опросить прямо сейчас. + Условие: last_seen + interval_minutes <= now AND room.is_enabled = 1 + """ + ... + + @abstractmethod + async def update_last_seen(self, subscription_id: int, now: datetime) -> None: + ... + + @abstractmethod + async def get(self, room_jid: str, source: str) -> Optional[Subscription]: + ... diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..c611e93 --- /dev/null +++ b/src/main.py @@ -0,0 +1,116 @@ +import asyncio +import logging +import os + +from adapters.db.command_symbols import SqliteCommandSymbolsRepository +from adapters.db.rooms import SqliteRoomRepository +from adapters.db.schema import apply_schema +from adapters.db.sent_news import SqliteSentNewsRepository +from adapters.db.subscriptions import SqliteSubscriptionRepository +from adapters.jabber.command_handler import CommandHandler +from adapters.jabber.command_responder import JabberCommandResponder +from adapters.jabber.connection import JabberConnection +from adapters.jabber.news_publisher import JabberNewsPublisher +from adapters.scheduled.news_checker import NewsChecker +from adapters.sources.rss.fetcher import RssFetcher +from domain.admin.usecases import ( + HandleCmd, + HandleExit, + HandleHelp, + HandleJoin, + HandleList, + HandleListAll, + HandleListSubscriptions, + HandleRoomHelp, + HandleSubscribe, + HandleUnsubscribe, +) +from domain.news.usecases import FetchAndPublishNews + +logging.basicConfig( + level=os.environ.get("LOG_LEVEL", "INFO"), + format="%(asctime)s %(name)s %(levelname)s %(message)s", +) +logger = logging.getLogger(__name__) + + +async def main() -> None: + # 1. Конфиг из окружения + bot_jid = os.environ["BOT_JID"] + bot_password = os.environ["BOT_PASSWORD"] + owner_jid = os.environ["JID_OWNER"] + bot_nick = os.environ.get("BOT_NICK", "jabnews") + db_path = os.environ.get("DB_PATH", "/data/jabnews.db") + + # 2. Применить схему БД + await apply_schema(db_path) + + # 3. Репозитории + room_repo = SqliteRoomRepository(db_path) + sub_repo = SqliteSubscriptionRepository(db_path) + sent_repo = SqliteSentNewsRepository(db_path) + cmd_sym_repo = SqliteCommandSymbolsRepository(db_path) + await cmd_sym_repo.load_cache() + + # 4. Jabber-соединение + connection = JabberConnection(bot_jid, bot_password, bot_nick) + + # 5. Загрузить список активных комнат для автозахода при старте + enabled_rooms = await room_repo.list_enabled() + connection.set_rooms_on_start([r.jid for r in enabled_rooms]) + + # 6. Исходящие адаптеры + publisher = JabberNewsPublisher(connection) + responder = JabberCommandResponder(connection) + fetcher = RssFetcher() + + # 7. Usecases — доменный слой + fetch_and_publish = FetchAndPublishNews( + subscription_repo=sub_repo, + room_repo=room_repo, + fetcher=fetcher, + publisher=publisher, + sent_news_repo=sent_repo, + ) + + handle_join = HandleJoin(room_repo, responder, connection) + handle_exit = HandleExit(room_repo, responder, connection) + handle_list = HandleList(room_repo, responder) + handle_list_all = HandleListAll(room_repo, responder, owner_jid) + handle_help = HandleHelp(responder, owner_jid) + handle_subscribe = HandleSubscribe(sub_repo, responder) + handle_unsubscribe = HandleUnsubscribe(sub_repo, responder) + handle_list_subs = HandleListSubscriptions(sub_repo, responder) + handle_cmd = HandleCmd(cmd_sym_repo, responder) + handle_room_help = HandleRoomHelp(room_repo, cmd_sym_repo, responder) + + # 8. Входящий адаптер — команды + cmd_handler = CommandHandler( + owner_jid=owner_jid, + bot_nick=bot_nick, + cmd_symbols_repo=cmd_sym_repo, + room_repo=room_repo, + handle_join=handle_join, + handle_exit=handle_exit, + handle_list=handle_list, + handle_list_all=handle_list_all, + handle_help=handle_help, + handle_subscribe=handle_subscribe, + handle_unsubscribe=handle_unsubscribe, + handle_list_subs=handle_list_subs, + handle_cmd=handle_cmd, + handle_room_help=handle_room_help, + ) + connection.set_message_callback(cmd_handler.handle_message) + + # 9. Scheduled адаптер + checker = NewsChecker(fetch_and_publish) + + # 10. Запуск + connection.connect() + asyncio.ensure_future(checker.start()) + connection.process(forever=True) + + +if __name__ == "__main__": + asyncio.run(main())