bvn13 c53e58baf1 Add dynamic subscription management via Telegram commands
- Config: atomic JSON writes, stable rule ids + enabled flag with backfill,
  list/add/remove/set_enabled methods
- Bot: admin whitelist (TG_ADMIN_IDS), gate admin commands, new !sub
  list/add/del/on/off commands, skip disabled rules on forward
- main/compose: read TG_ADMIN_IDS from environment

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-23 14:30:18 +03:00

260 lines
12 KiB
Python

import asyncio
from settings import config
import logging
import re
from queue import Queue, Empty
from telethon import TelegramClient, events
from bot.message import WeatherTask
# Module-level logger named after this module. Its level is pinned to INFO,
# so the logger.debug() calls in this file are not emitted by this logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class Bot():
    """Telethon client wrapper that forwards messages according to config rules.

    The instance:

    * registers chat commands (``!whoami``, ``!config``, ``!reload``,
      ``!chats``, ``!sub``); all but ``!whoami`` are restricted to the owner
      and the configured admins;
    * notifies the owner when the logged-in account joins or leaves a chat;
    * runs every other incoming message through the per-chat rules returned
      by the config object and forwards the ones that pass their filter;
    * delivers weather messages queued through :meth:`send_weather`.
    """

    def __init__(self,
                 session_file: str,
                 config_file: str,
                 api_id: int,
                 api_hash: str,
                 owner_id: int,
                 admin_ids: set = None) -> None:
        """Load the config, create the Telegram client and register handlers.

        Args:
            session_file: passed to ``TelegramClient`` as ``session``.
            config_file: path handed to ``config.Config`` on every reload.
            api_id: Telegram API id for the client.
            api_hash: Telegram API hash for the client.
            owner_id: user id that is always treated as an admin and that
                receives chat lists and join/leave notifications.
            admin_ids: optional iterable of additional admin user ids.
        """
        self.__config_file = config_file
        self.__config = None
        self.__me = None
        self.__owner_id = owner_id
        self.__admin_ids = set(admin_ids) if admin_ids else set()
        # Handle of the background weather loop; set in start().
        self.__background_task = None
        self.__reload()
        self.__bot = TelegramClient(session=session_file, api_id=api_id, api_hash=api_hash)
        self.__weather_queue = Queue()

        # Every command handler ends with StopPropagation so that the
        # catch-all onNewMessage handler registered below does not also
        # process the command message.

        @self.__bot.on(events.NewMessage(incoming=True, pattern='!whoami'))
        async def onMessageWhoami(event):
            # Not admin-gated: replies with the id of the current chat.
            await event.reply("{0}".format(event.chat_id))
            await self.__bot.send_read_acknowledge(event.chat_id, event.message)
            raise events.StopPropagation

        @self.__bot.on(events.NewMessage(incoming=True, pattern="!config"))
        async def onMessageConfig(event):
            if not self.__is_admin(event.sender_id):
                raise events.StopPropagation
            # Re-read the file first so the reply shows the current config.
            self.__reload()
            await event.reply("```\n{0}\n```".format(self.__config.read_as_text()))
            await self.__bot.send_read_acknowledge(event.chat_id, event.message)
            raise events.StopPropagation

        @self.__bot.on(events.NewMessage(incoming=True, pattern="!reload"))
        async def onMessageReload(event):
            if not self.__is_admin(event.sender_id):
                raise events.StopPropagation
            self.__reload()
            await event.reply("Reloaded")
            await self.__bot.send_read_acknowledge(event.chat_id, event.message)
            raise events.StopPropagation

        @self.__bot.on(events.NewMessage(incoming=True, pattern="!chats"))
        async def onMessageChats(event):
            if not self.__is_admin(event.sender_id):
                raise events.StopPropagation
            await self.__send_all_chats_to_owner()
            await self.__bot.send_read_acknowledge(event.chat_id, event.message)
            raise events.StopPropagation

        @self.__bot.on(events.NewMessage(incoming=True, pattern="!sub"))
        async def onMessageSub(event):
            if not self.__is_admin(event.sender_id):
                raise events.StopPropagation
            reply = self.__handle_sub_command(event.message.text or "")
            await event.reply(reply)
            await self.__bot.send_read_acknowledge(event.chat_id, event.message)
            raise events.StopPropagation

        @self.__bot.on(events.ChatAction)
        async def onChatAction(event):
            # self.__me is filled in by __identity() during start().
            if event.user_left and event.user.id == self.__me:
                await self.__bot.send_message(self.__owner_id, "Я вышел из чата: {0}".format(event.chat_id))
            if event.user_joined and event.user.id == self.__me:
                await self.__bot.send_message(self.__owner_id, "Я вошел в чат: {0}".format(event.chat_id))

        @self.__bot.on(events.NewMessage(incoming=True))
        async def onNewMessage(event):
            if event.message is None:
                return
            chat_id = event.chat_id
            message = event.message
            # The fallback applies to a missing text, not a missing message
            # (the message itself was checked above).
            shown_text = message.text if message.text is not None else "<No Text>"
            logger.info("Received message in chat {0}, id {1}: {2}".format(chat_id, message.id, shown_text))
            await self.__process_message(event, chat_id, message)

    async def start(self) -> None:
        """Start the client, launch the weather loop and block until disconnect."""
        await self.__bot.start()
        await self.__identity()
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-run.
        self.__background_task = self.__bot.loop.create_task(self.__task_loop())
        await self.__bot.run_until_disconnected()

    def send_weather(self, task: "WeatherTask") -> None:
        """Queue a weather task; it is sent later by the background loop."""
        self.__weather_queue.put(task)

    async def __task_loop(self) -> None:
        """Poll the weather queue once per second and send at most one task."""
        while True:
            logger.debug("Looking for weather jobs")
            try:
                weather_task = self.__weather_queue.get_nowait()
            except Empty:
                weather_task = None
            if weather_task is not None:
                try:
                    await self.__send_weather(weather_task.get_key(), weather_task.get_message())
                except Exception:
                    # One failed delivery must not stop the loop for good.
                    logger.exception("Failed to send weather message")
            await asyncio.sleep(1)

    async def __send_weather(self, key: str, message: str) -> None:
        """Send ``message`` (HTML) to the chat configured for ``key``, if any."""
        dest_chat_id = self.__config.get_weather_destination_chat_id(key)
        if dest_chat_id is not None:
            await self.__bot.send_message(dest_chat_id, message, parse_mode='html')

    def __reload(self) -> None:
        """Replace the in-memory config with a fresh ``config.Config``."""
        self.__config = config.Config(filename=self.__config_file)

    def __is_admin(self, user_id) -> bool:
        """Return True for the owner and for any id in the admin whitelist."""
        return user_id == self.__owner_id or user_id in self.__admin_ids

    def __handle_sub_command(self, text: str) -> str:
        """Execute a ``!sub ...`` command and return the reply text.

        Supported sub-commands: ``list``, ``add``, ``del``/``rm``/``delete``,
        ``on``/``enable``, ``off``/``disable``. Anything unknown, incomplete
        or with a non-integer id yields the help text.
        """
        parts = text.strip().split()
        if len(parts) < 2:
            return self.__sub_help()
        cmd = parts[1].lower()
        try:
            if cmd == 'list':
                return self.__sub_list()
            if cmd == 'add':
                # !sub add <src> <dst> <contain|regexp> <value...>
                # maxsplit=5 keeps spaces inside the filter value intact.
                tokens = text.split(None, 5)
                if len(tokens) < 6:
                    return "Usage: !sub add <src_chat_id> <dst_chat_id> <contain|regexp> <value>"
                src = int(tokens[2])
                dst = int(tokens[3])
                ftype = tokens[4].lower()
                if ftype not in ('contain', 'regexp'):
                    return "filter type must be 'contain' or 'regexp'"
                value = tokens[5]
                if ftype == 'regexp':
                    # Reject broken patterns here; otherwise they would only
                    # fail later, on every message of the source chat.
                    try:
                        re.compile(value)
                    except re.error as err:
                        return "Invalid regexp '{0}': {1}".format(value, err)
                rule = self.__config.add_subscription(src, ftype, value, dst)
                return "Added #{0}: {1} [{2}] '{3}' -> {4}".format(
                    rule['id'], src, ftype, value, dst)
            if cmd in ('del', 'rm', 'delete'):
                sub_id = int(parts[2])
                ok = self.__config.remove_subscription(sub_id)
                return "Removed #{0}".format(sub_id) if ok else "Not found: #{0}".format(sub_id)
            if cmd in ('on', 'enable'):
                sub_id = int(parts[2])
                ok = self.__config.set_enabled(sub_id, True)
                return "Enabled #{0}".format(sub_id) if ok else "Not found: #{0}".format(sub_id)
            if cmd in ('off', 'disable'):
                sub_id = int(parts[2])
                ok = self.__config.set_enabled(sub_id, False)
                return "Disabled #{0}".format(sub_id) if ok else "Not found: #{0}".format(sub_id)
        except (ValueError, IndexError):
            # Missing or non-numeric argument.
            return self.__sub_help()
        return self.__sub_help()

    def __sub_list(self) -> str:
        """Render all subscriptions, one per line, or "No subscriptions"."""
        subs = self.__config.list_subscriptions()
        if not subs:
            return "No subscriptions"
        lines = []
        for s in subs:
            # "or {}" also covers a key that is present but null.
            flt = s.get('filter') or {}
            act = s.get('action') or {}
            status = 'on' if s.get('enabled', True) else 'off'
            name = s.get('name', '')
            lines.append("#{0} [{1}] {2} {3}:'{4}' -> {5}{6}".format(
                s.get('id'), status, s.get('srcChatId'),
                flt.get('type'), flt.get('value'), act.get('chatId'),
                " ({0})".format(name) if name else ""))
        return "\n".join(lines)

    def __sub_help(self) -> str:
        """Return the usage text for the ``!sub`` command family."""
        return ("Commands:\n"
                "!sub list\n"
                "!sub add <src_chat_id> <dst_chat_id> <contain|regexp> <value>\n"
                "!sub del <id>\n"
                "!sub on <id> | !sub off <id>")

    async def __send_all_chats_to_owner(self) -> None:
        """Send the owner one message listing ``<id> - <name>`` of every dialog."""
        lines = ["{0} - {1}\n".format(dialog.id, dialog.name)
                 async for dialog in self.__bot.iter_dialogs()]
        await self.__bot.send_message(self.__owner_id, "".join(lines))

    async def __process_message(self, event, chat_id, message):
        """Apply every enabled rule configured for ``chat_id`` to the message.

        A rule is skipped when it is disabled, has no usable action, or has a
        filter the message does not pass. The only action acted upon is
        ``forward``, which forwards the event to the rule's ``chatId``.
        """
        rules = self.__config.get_on_message_for_chat_id(chat_id)
        if rules is None:
            logger.info("(chat_id, msg_id)=({0}, {1}): Config is None".format(chat_id, message.id))
            return
        if not isinstance(rules, list):
            logger.info("(chat_id, msg_id)=({0}, {1}): Config is not list".format(chat_id, message.id))
            return
        # i is the 1-based position of the rule, used only in log lines.
        for i, rule in enumerate(rules, start=1):
            if rule.get('enabled') is False:
                logger.info("(chat_id, msg_id, cfg_id)=({0}, {1}, {2}): Disabled".format(chat_id, message.id, i))
                continue
            rule_filter = rule.get('filter')
            action = rule.get('action')
            # Guard before touching the action: a rule without one must be
            # reported as bad, not crash the whole handler.
            action_type = action.get('type') if isinstance(action, dict) else None
            if action_type is None:
                logger.info("(chat_id, msg_id, cfg_id)=({0}, {1}, {2}): Action is bad".format(chat_id, message.id, i))
                continue
            if rule_filter is not None:
                logger.info("(chat_id, msg_id, cfg_id)=({0}, {1}, {2}): Filter is set".format(chat_id, message.id, i))
                if await self.__needs_skip(rule_filter, message):
                    logger.info("(chat_id, msg_id, cfg_id)=({0}, {1}, {2}): Skipping".format(chat_id, message.id, i))
                    continue
            if action_type == 'forward':
                logger.info("(chat_id, msg_id, cfg_id)=({0}, {1}, {2}): Action type is 'forward'".format(chat_id, message.id, i))
                destination = action.get('chatId')
                if destination is None:
                    logger.info("(chat_id, msg_id, cfg_id)=({0}, {1}, {2}): Destination is None".format(chat_id, message.id, i))
                    continue
                await event.forward_to(destination)
                logger.info("(chat_id, msg_id, cfg_id)=({0}, {1}, {2}): Forwarded".format(chat_id, message.id, i))

    async def __needs_skip(self, filter: dict, message) -> bool:
        """Return True when ``message`` must NOT be processed under ``filter``.

        ``contain`` requires the filter value to occur in the text;
        ``regexp`` requires ``re.match`` (anchored at the start) to succeed.
        When the message itself fails, the message it replies to is checked
        with the same filter. Unknown filter types let the message through.
        """
        if message is None:
            # The replied-to message could not be fetched.
            logger.info("Message is unavailable")
            return True
        filter_type = filter.get('type')
        filter_value = filter.get('value')
        if filter_type is None or filter_value is None:
            logger.info("msg_id={0}: Filter is bad".format(message.id))
            return True
        # Do not wrap in str(): that would turn a missing text into "None".
        text = message.text
        if text is None:
            logger.info("msg_id={0}: Text is None".format(message.id))
            return await self.__skip_unless_reply_matches(filter, message)
        if filter_type == 'contain':
            logger.info("msg_id={0}: Filter type is 'contain'".format(message.id))
            if filter_value not in text:
                logger.info("msg_id={0}: Text '{1}' not found".format(message.id, filter_value))
                return await self.__skip_unless_reply_matches(filter, message)
        elif filter_type == 'regexp':
            logger.info("msg_id={0}: Filter type is 'regexp'".format(message.id))
            try:
                matched = re.match(filter_value, text)
            except re.error as err:
                # A broken pattern in the config can never match anything.
                logger.warning("msg_id={0}: Bad regexp '{1}': {2}".format(message.id, filter_value, err))
                return True
            if not matched:
                logger.info("msg_id={0}: Text '{1}' not found".format(message.id, filter_value))
                return await self.__skip_unless_reply_matches(filter, message)
        logger.info("msg_id={0}: Message to be processed".format(message.id))
        return False

    async def __skip_unless_reply_matches(self, filter: dict, message) -> bool:
        """Fall back to the replied-to message; skip when there is none."""
        if message.reply_to is None:
            return True
        logger.info("msg_id={0}: Reply is present".format(message.id))
        reply = await self.__get_reply(message.chat_id, message)
        return await self.__needs_skip(filter, reply)

    async def __get_reply(self, chat_id, message):
        """Fetch the message that ``message`` replies to, or None."""
        reply_id = getattr(message.reply_to, 'reply_to_msg_id', None)
        if reply_id is None:
            # NOTE(review): presumably a reply header without a message id
            # (not a reply to a regular message) - confirm against Telethon.
            return None
        return await self.__bot.get_messages(chat_id, ids=reply_id)

    async def __identity(self) -> None:
        """Remember the id of the logged-in account for join/leave checks."""
        me = await self.__bot.get_me()
        self.__me = me.id