tempora/tg/tg_crawler.py

74 lines
2.5 KiB
Python

import logging
import psycopg2
from datetime import datetime
from telethon.sync import TelegramClient
from telethon.tl.functions.messages import GetHistoryRequest
# Log INFO and above both to 'tg_nodes.log' (relative to the current working
# directory) and to the console. The file is opened as UTF-8 because post
# previews from Telegram are logged and may contain non-ASCII text; without an
# explicit encoding FileHandler falls back to the locale's default encoding.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('tg_nodes.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
class TelegramChannelMonitor:
    """Poll a single Telegram channel for its newest post and store it in PostgreSQL.

    Database connection settings are shared by all instances through the
    class attribute ``db_config`` and must be set once with
    :meth:`set_db_config` before any post can be saved.
    """

    # Keyword arguments unpacked into ``psycopg2.connect``; shared by every
    # instance and assigned via ``set_db_config``.
    db_config = None

    def __init__(self, session_name, api_id, api_hash, channel_username, source_name):
        """Remember the Telegram credentials and the channel to watch.

        Args:
            session_name: Session name passed to ``TelegramClient``.
            api_id: Telegram API id passed to ``TelegramClient``.
            api_hash: Telegram API hash passed to ``TelegramClient``.
            channel_username: Channel identifier resolved with ``get_entity``.
            source_name: Label stored in the ``source`` column of ``leaks``
                and used as the prefix of every log line.
        """
        self.session_name = session_name
        self.api_id = api_id
        self.api_hash = api_hash
        self.channel_username = channel_username
        self.source_name = source_name
        # Id of the newest message this instance has already handled. Lets
        # repeated polls skip a post that an earlier call already stored
        # instead of inserting it again. Kept in memory only.
        self.last_message_id = None

    @classmethod
    def set_db_config(cls, config):
        """Set the ``psycopg2.connect`` keyword arguments used by all instances."""
        cls.db_config = config

    def _fetch_latest_message(self):
        """Return the newest message of the channel, or ``None`` if it has none.

        The Telegram client is opened and closed within this call, so the
        connection is not held while the post is written to the database.
        """
        with TelegramClient(self.session_name, self.api_id, self.api_hash) as client:
            entity = client.get_entity(self.channel_username)
            history = client(GetHistoryRequest(
                peer=entity,
                limit=1,
                offset_date=None,
                offset_id=0,
                max_id=0,
                min_id=0,
                add_offset=0,
                hash=0,
            ))
        return history.messages[0] if history.messages else None

    def fetch_last_post(self):
        """Fetch the channel's newest post and save it unless it was already handled.

        Errors while talking to Telegram are logged (with traceback) and
        swallowed, so a polling loop calling this method keeps running.
        A post counts as handled only after it has been saved successfully.
        A failed database write is therefore retried on the next call.
        """
        logging.info("[%s] checking a new post...", self.source_name)
        try:
            msg = self._fetch_latest_message()
        except Exception as e:
            logging.exception("[%s] error when receiving a post: %s", self.source_name, e)
            return

        msg_id = getattr(msg, "id", None)
        if msg is None or (msg_id is not None and msg_id == self.last_message_id):
            logging.info("[%s] there is no new messages", self.source_name)
            return

        text = getattr(msg, "message", None)
        if not text:
            # Posts without text (media without a caption; presumably also
            # service messages, which lack a ``message`` attribute) have
            # nothing to store. Mark them as handled so they are not
            # reported again on every poll.
            logging.info("[%s] latest post has no text, skipping", self.source_name)
            self.last_message_id = msg_id
            return

        logging.info("[%s] received a post: %s...", self.source_name, text[:60])
        if self.save_to_db(self.source_name, text):
            self.last_message_id = msg_id

    def save_to_db(self, source, message):
        """Insert one post into the ``leaks`` table.

        Args:
            source: Value for the ``source`` column.
            message: Value for the ``message`` column.

        Returns:
            True if the row was committed, False otherwise. Errors are
            logged, not raised.
        """
        if not self.db_config:
            logging.error("DB config is not set")
            return False
        conn = None
        try:
            conn = psycopg2.connect(**self.db_config)
            # The connection context commits on success and rolls back on
            # error (it does not close the connection); the cursor context
            # closes the cursor.
            with conn, conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO leaks (source, message) VALUES (%s, %s)",
                    (source, message),
                )
        except Exception as e:
            logging.exception("[%s] error when writing to the database: %s", self.source_name, e)
            return False
        finally:
            # Always release the connection, even when execute/commit fails.
            if conn is not None:
                conn.close()
        logging.info("[%s] message is recorded in the database", self.source_name)
        return True