import logging import os import psycopg2 from datetime import datetime from telethon.sync import TelegramClient from telethon.tl.functions.messages import GetHistoryRequest LOG_FILE = os.getenv('LOG_FILE', '/app/tg_nodes.log') if os.path.exists(LOG_FILE) and os.path.isdir(LOG_FILE): raise RuntimeError(f"Path {LOG_FILE} is a directory! Expected file.") logging.basicConfig( handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler() ] ) class TelegramChannelMonitor: db_config = None def __init__(self, session_name, api_id, api_hash, channel_username, source_name): self.session_name = session_name self.api_id = api_id self.api_hash = api_hash self.channel_username = channel_username self.source_name = source_name @classmethod def set_db_config(cls, config): cls.db_config = config def fetch_last_post(self): logging.info(f"[{self.source_name}] checking a new post...") try: with TelegramClient(self.session_name, self.api_id, self.api_hash) as client: entity = client.get_entity(self.channel_username) history = client(GetHistoryRequest( peer=entity, limit=1, offset_date=None, offset_id=0, max_id=0, min_id=0, add_offset=0, hash=0 )) if history.messages: msg = history.messages[0] logging.info(f"[{self.source_name}] received a post: {msg.message[:60]}...") self.save_to_db(self.source_name, msg.message) else: logging.info(f"[{self.source_name}] there is no new messages") except Exception as e: logging.error(f"[{self.source_name}] error when receiving a post: {e}") def save_to_db(self, source, message): if not self.db_config: logging.error("DB config is not set") return try: conn = psycopg2.connect(**self.db_config) cur = conn.cursor() cur.execute( "INSERT INTO leaks (source, message) VALUES (%s, %s)", (source, message) ) conn.commit() cur.close() conn.close() logging.info(f"[{self.source_name}] message is recorded in the database") except Exception as e: logging.error(f"[{self.source_name}] error when writing to the database: {e}")