jabnews/src/adapters/db/rooms.py
2026-03-22 18:07:16 +03:00

72 lines
2.9 KiB
Python

from typing import List, Optional
import aiosqlite
from src.domain.admin.entities import Room
from src.domain.admin.ports import RoomRepository
class SqliteRoomRepository(RoomRepository):
def __init__(self, db_path: str) -> None:
self._db_path = db_path
async def add(self, room: Room) -> None:
async with aiosqlite.connect(self._db_path) as db:
await db.execute(
"INSERT OR REPLACE INTO rooms (jid, admin_jid, is_enabled) VALUES (?, ?, ?)",
(room.jid, room.admin_jid, 1 if room.is_enabled else 0),
)
await db.commit()
async def get(self, room_jid: str) -> Optional[Room]:
async with aiosqlite.connect(self._db_path) as db:
async with db.execute(
"SELECT jid, admin_jid, is_enabled FROM rooms WHERE jid = ?",
(room_jid,),
) as cursor:
row = await cursor.fetchone()
if row is None:
return None
return Room(jid=row[0], admin_jid=row[1], is_enabled=bool(row[2]))
async def disable(self, room_jid: str) -> None:
async with aiosqlite.connect(self._db_path) as db:
await db.execute(
"UPDATE rooms SET is_enabled = 0 WHERE jid = ?", (room_jid,)
)
await db.commit()
async def enable(self, room_jid: str) -> None:
async with aiosqlite.connect(self._db_path) as db:
await db.execute(
"UPDATE rooms SET is_enabled = 1 WHERE jid = ?", (room_jid,)
)
await db.commit()
async def list_by_admin(self, admin_jid: str) -> List[Room]:
async with aiosqlite.connect(self._db_path) as db:
async with db.execute(
"SELECT jid, admin_jid, is_enabled FROM rooms WHERE admin_jid = ?",
(admin_jid,),
) as cursor:
rows = await cursor.fetchall()
return [Room(jid=r[0], admin_jid=r[1], is_enabled=bool(r[2])) for r in rows]
async def list_all(self) -> List[Room]:
async with aiosqlite.connect(self._db_path) as db:
async with db.execute(
"SELECT jid, admin_jid, is_enabled FROM rooms ORDER BY jid"
) as cursor:
rows = await cursor.fetchall()
return [Room(jid=r[0], admin_jid=r[1], is_enabled=bool(r[2])) for r in rows]
async def list_enabled(self) -> List[Room]:
"""Используется при старте для восстановления MUC-соединений."""
async with aiosqlite.connect(self._db_path) as db:
async with db.execute(
"SELECT jid, admin_jid, is_enabled FROM rooms WHERE is_enabled = 1"
) as cursor:
rows = await cursor.fetchall()
return [Room(jid=r[0], admin_jid=r[1], is_enabled=True) for r in rows]