121 lines
4.0 KiB
Python
121 lines
4.0 KiB
Python
import logging
|
|
import os
|
|
import asyncio
|
|
import psycopg2
|
|
from datetime import datetime
|
|
from telethon.sync import TelegramClient
|
|
from telethon.tl.functions.messages import GetHistoryRequest
|
|
from dotenv import load_dotenv
|
|
from celery import Celery
|
|
from utils.logg import LoggerSingleton
|
|
from celery_app import app
|
|
|
|
load_dotenv()
|
|
|
|
logger = LoggerSingleton.get_logger()
|
|
|
|
class TelegramMonitor:
|
|
def __init__(self):
|
|
self.db_config = {
|
|
'host': os.getenv('DB_HOST'),
|
|
'port': os.getenv('DB_PORT'),
|
|
'database': os.getenv('DB_NAME'),
|
|
'user': os.getenv('DB_USER'),
|
|
'password': os.getenv('DB_PASSWORD')
|
|
}
|
|
self.conn = None
|
|
self._connect()
|
|
|
|
def _connect(self):
|
|
try:
|
|
self.conn = psycopg2.connect(**self.db_config)
|
|
logger.info("Database connection established")
|
|
except Exception as e:
|
|
logger.error(f"Database connection error: {e}")
|
|
raise
|
|
|
|
def _close(self):
|
|
if self.conn and not self.conn.closed:
|
|
try:
|
|
self.conn.close()
|
|
logger.info("Database connection closed")
|
|
except Exception as e:
|
|
logger.error(f"Error closing database connection: {e}")
|
|
|
|
def _fetch_post(self, channel_username, source_name):
|
|
try:
|
|
with TelegramClient('session', os.getenv('API_ID'), os.getenv('API_HASH')) as client:
|
|
entity = client.get_entity(channel_username)
|
|
history = client(GetHistoryRequest(
|
|
peer=entity,
|
|
limit=1,
|
|
offset_date=None,
|
|
offset_id=0,
|
|
max_id=0,
|
|
min_id=0,
|
|
add_offset=0,
|
|
hash=0
|
|
))
|
|
if history.messages:
|
|
self._save_to_db(source_name, history.messages[0].message)
|
|
except Exception as e:
|
|
logger.error(f"Error fetching post from {source_name}: {e}")
|
|
raise
|
|
|
|
def _save_to_db(self, source, message):
|
|
if not self.conn or self.conn.closed:
|
|
self._connect()
|
|
|
|
try:
|
|
with self.conn.cursor() as cur:
|
|
cur.execute(
|
|
"INSERT INTO leaks (resource_name, message) VALUES (%s, %s)",
|
|
(source, message)
|
|
)
|
|
self.conn.commit()
|
|
logger.info(f"Data from {source} saved successfully")
|
|
except Exception as e:
|
|
logger.error(f"Database save error for {source}: {e}")
|
|
self.conn.rollback()
|
|
raise
|
|
|
|
@app.task(bind=True, name='tasks.tg_crawler.monitor_channels')
|
|
def monitor_channels(self):
|
|
channels = [
|
|
('trueosint', 'trueosint'),
|
|
('dataleak', 'dataleak'),
|
|
('Vaultofdataleaksss', 'Vaultofdataleaksss')
|
|
]
|
|
|
|
logger.info("Starting Telegram channels monitoring")
|
|
|
|
for channel, source in channels:
|
|
try:
|
|
monitor_channel.apply_async(
|
|
args=(channel, source),
|
|
queue='telegram',
|
|
retry=True,
|
|
retry_policy={
|
|
'max_retries': 3,
|
|
'interval_start': 2,
|
|
'interval_step': 5,
|
|
'interval_max': 20
|
|
}
|
|
)
|
|
logger.info(f"Sent task for channel: {channel}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to send task for {channel}: {e}")
|
|
raise self.retry(exc=e)
|
|
|
|
@app.task(bind=True, name='tasks.tg_crawler.monitor_channel')
|
|
def monitor_channel(self, channel, source):
|
|
logger.info(f"Starting monitoring channel: {channel}")
|
|
monitor = TelegramMonitor()
|
|
try:
|
|
monitor._fetch_post(channel, source)
|
|
logger.info(f"Successfully monitored channel: {channel}")
|
|
except Exception as e:
|
|
logger.error(f"Channel monitoring failed for {channel}: {e}")
|
|
raise self.retry(exc=e)
|
|
finally:
|
|
monitor._close() |