From 44ce626a1b4f00baf30b5bc55d4ae86e95c6e324 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D0=BB=D0=B5=D0=B1?= Date: Sat, 3 May 2025 08:25:15 +0300 Subject: [PATCH] feat: celery --- .gitignore | 5 +- celery_app.py | 37 ++++++++++++ celery_beat.py | 19 ++++++ dld.py | 97 +++++++++++++++++++++++++++++++ driver/driver_creator.py | 24 ++++---- initiator.py | 28 +++++++++ requirements.txt | 6 +- tasks/__init__.py | 0 tasks/forum_crawler.py | 97 +++++++++++++++++++++++++++++++ tasks/tg_crawler.py | 121 +++++++++++++++++++++++++++++++++++++++ tg/tg_crawler.py | 77 ------------------------- tg/tg_node_0.py | 47 --------------- tg/tg_node_1.py | 48 ---------------- utils/logg.py | 27 +++++++++ web/app.py | 2 + web/templates/logs.html | 21 +++---- 16 files changed, 456 insertions(+), 200 deletions(-) create mode 100644 celery_app.py create mode 100644 celery_beat.py create mode 100644 dld.py create mode 100644 initiator.py create mode 100644 tasks/__init__.py create mode 100644 tasks/forum_crawler.py create mode 100644 tasks/tg_crawler.py delete mode 100644 tg/tg_crawler.py delete mode 100644 tg/tg_node_0.py delete mode 100644 tg/tg_node_1.py create mode 100644 utils/logg.py diff --git a/.gitignore b/.gitignore index ef2a1da..b6eeef6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ .env *.log **/__pycache__/ -/web/static/*.mp4 \ No newline at end of file +/web/static/*.mp4 +/celerybeat-schedule.bak +/celerybeat-schedule.dat +/celerybeat-schedule.dir \ No newline at end of file diff --git a/celery_app.py b/celery_app.py new file mode 100644 index 0000000..0d0d5a5 --- /dev/null +++ b/celery_app.py @@ -0,0 +1,37 @@ +from celery import Celery +from kombu import Queue + +app = Celery( + 'leak_monitor', + broker='memory://localhost', + # backend='rpc://', + include=['tasks.tg_crawler', 'tasks.forum_crawler'] +) + +app.conf.update( + worker_pool='solo', + worker_max_tasks_per_child=100, + task_serializer='json', + result_serializer='json', + accept_content=['json'], + timezone='Europe/Moscow', + enable_utc=True, +) + +app.conf.task_queues = ( + Queue('telegram', routing_key='telegram.#'), + Queue('forum', routing_key='forum.#'), +) + +app.conf.beat_schedule = { + 'monitor-telegram-channels': { + 'task': 'tasks.tg_crawler.monitor_channel', + 'schedule': 10.0, + 'options': {'queue': 'telegram'} + }, + 'crawl-forum': { + 'task': 'forum_crawler.crawl_forum_task', + 'schedule': 3600.0 * 24, # час + 'args': ('https://thehackernews.com/search/label/data%20breach', []) + }, +} \ No newline at end of file diff --git a/celery_beat.py b/celery_beat.py new file mode 100644 index 0000000..2cc335d --- /dev/null +++ b/celery_beat.py @@ -0,0 +1,19 @@ +from celery_app import app +from utils.logg import LoggerSingleton + +logger = LoggerSingleton.get_logger() + +if __name__ == '__main__': + logger.info("Starting Celery beat scheduler...") + try: + app.loader.import_default_modules() + + beat = app.Beat( + logfile=None, + loglevel='info', + socket_timeout=30 + ) + beat.run() + except Exception as e: + logger.error(f"Beat failed: {e}") + raise \ No newline at end of file diff --git a/dld.py b/dld.py new file mode 100644 index 0000000..cfae475 --- /dev/null +++ b/dld.py @@ -0,0 +1,97 @@ +from bs4 import BeautifulSoup +import os +import psycopg2 +from dotenv import load_dotenv +from celery import Celery +from utils.logg import LoggerSingleton +from driver.driver_creator import DriverCreator +from selenium.webdriver.common.by import By +from time import sleep +from celery_app import app + +load_dotenv() + +logger = LoggerSingleton.get_logger() + +class ForumCrawler: + def __init__(self, forum_url): + self.forum_url = forum_url + # self.proxy_list = proxy_list + # self.db_config = { + # 'host': os.getenv('DB_HOST'), + # 'port': os.getenv('DB_PORT'), + # 'database': os.getenv('DB_NAME'), + # 'user': os.getenv('DB_USER'), + # 'password': os.getenv('DB_PASSWORD') + # } + self.conn = None + # self._connect() + + def _connect(self): + try: + self.conn = psycopg2.connect(**self.db_config) + logger.info("Database connection established") + except Exception as e: + logger.error(f"Database connection error: {e}") + raise + + def _close(self): + if self.conn and not self.conn.closed: + try: + self.conn.close() + logger.info("Database connection closed") + except Exception as e: + logger.error(f"Error closing database connection: {e}") + + def _crawl_leaks(self): + driver_creator = DriverCreator([]) + driver = driver_creator.get_driver() + + try: + logger.info(f"Starting forum crawl: {self.forum_url}") + driver.get(self.forum_url) + sleep(5) + + posts = driver.find_elements(By.CSS_SELECTOR, 'a.story-link h2.home-title') + if not posts: + logger.info("No posts found on the page") + return + last_post = posts[0] + title = last_post.text.strip() + link = last_post.get_attribute('href') + post_content = f"{title} - {link}" + + if 'data breach' in title.lower() or 'leak' in title.lower(): + logger.info(post_content) + # self._save_to_db('Hacker News', post_content) + logger.info(f"New leak found: {title} - {link}") + else: + logger.info("Last post is not about leaks") + + except Exception as e: + logger.error(f"Error during forum crawling: {e}") + raise + finally: + driver.quit() + logger.info("WebDriver session closed") + + def _save_to_db(self, source, message): + if not self.conn or self.conn.closed: + self._connect() + try: + with self.conn.cursor() as cur: + cur.execute( + "INSERT INTO leaks (resource_name, message) VALUES (%s, %s)", + (source, message) + ) + self.conn.commit() + logger.info(f"Leak from {source} saved in the database") + except Exception as e: + logger.error(f"Error writing to the database: {e}") + +if __name__ == '__main__': + crawler = ForumCrawler('https://thehackernews.com/search/label/data%20breach') + try: + crawler._crawl_leaks() + finally: + crawler._close() \ No newline at end of file diff --git a/driver/driver_creator.py b/driver/driver_creator.py index 5d8e123..22213e6 100644 --- a/driver/driver_creator.py +++ b/driver/driver_creator.py @@ -27,34 +27,30 @@ class DriverCreator: ).create_extension() def get_driver(self): - ''' - Отключает JS - Каждый запрос получает `сырой` html - ''' # extension_path = self._switch_proxy() options = uc.ChromeOptions() # options.add_argument(f"--load-extension={extension_path}") # временно - options.add_argument("--headless=new") + # options.add_argument("--headless=new") options.add_argument("--disable-gpu") options.add_argument("--disable-dev-shm-usage") options.add_argument("--no-sandbox") - options.add_argument("--disable-webgl") - options.add_argument("--disable-software-rasterizer") + # options.add_argument("--disable-webgl") + # options.add_argument("--disable-software-rasterizer") # options.add_argument("--disable-extensions") - prefs = {"profile.managed_default_content_settings.javascript": 2} - options.experimental_options["prefs"] = prefs + # prefs = {"profile.managed_default_content_settings.javascript": 2} + # options.experimental_options["prefs"] = prefs driver = uc.Chrome( options=options, - version_main=132, + version_main=135, # user_multi_procs=True ) - driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { - "source": "Object.defineProperty(navigator, 'javaEnabled', {get: () => false});" - }) - driver.execute_cdp_cmd("Emulation.setScriptExecutionDisabled", {"value": True}) + # driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { + # "source": "Object.defineProperty(navigator, 'javaEnabled', {get: () => false});" + # }) + # driver.execute_cdp_cmd("Emulation.setScriptExecutionDisabled", {"value": True}) return driver \ No newline at end of file diff --git a/initiator.py b/initiator.py new file mode 100644 index 0000000..31919f2 --- /dev/null +++ b/initiator.py @@ -0,0 +1,28 @@ +from celery_app import app +from utils.logg import LoggerSingleton + +logger = LoggerSingleton.get_logger() + +# Брокер центральный узел, который: +# Принимает задачи от beat (планировщика) +# Распределяет их по очередям (telegram, forum). +# Передаёт задачи воркерам, которые подписаны на эти очереди. + +def main(): + try: + logger.info("Starting Celery worker...") + app.worker_main( + argv=[ + 'worker', + '--loglevel=debug', + '--pool=solo', + '-Q', 'telegram', + '--without-heartbeat', + '--without-gossip' + ] + ) + except Exception as e: + logger.error(f"Failed to start worker: {e}") + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 82c694b..bc505fe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,8 @@ grpc_interceptor_headers telethon schedule psycopg2-binary -docker \ No newline at end of file +docker +asyncio +pytz +flask +apscheduler \ No newline at end of file diff --git a/tasks/__init__.py b/tasks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tasks/forum_crawler.py b/tasks/forum_crawler.py new file mode 100644 index 0000000..a7518d3 --- /dev/null +++ b/tasks/forum_crawler.py @@ -0,0 +1,97 @@ +from bs4 import BeautifulSoup +import os +import psycopg2 +from dotenv import load_dotenv +from celery import Celery +from utils.logg import LoggerSingleton +from driver.driver_creator import DriverCreator +from selenium.webdriver.common.by import By +from time import sleep +from celery_app import app + +load_dotenv() + +logger = LoggerSingleton.get_logger() + +class ForumCrawler: + def __init__(self, forum_url, proxy_list): + self.forum_url = forum_url + self.proxy_list = proxy_list + self.db_config = { + 'host': os.getenv('DB_HOST'), + 'port': os.getenv('DB_PORT'), + 'database': os.getenv('DB_NAME'), + 'user': os.getenv('DB_USER'), + 'password': os.getenv('DB_PASSWORD') + } + self.conn = None + self._connect() + + def _connect(self): + try: + self.conn = psycopg2.connect(**self.db_config) + logger.info("Database connection established") + except Exception as e: + logger.error(f"Database connection error: {e}") + raise + + def _close(self): + if self.conn and not self.conn.closed: + try: + self.conn.close() + logger.info("Database connection closed") + except Exception as e: + logger.error(f"Error closing database connection: {e}") + + def _crawl_leaks(self): + driver_creator = DriverCreator(self.proxy_list) + driver = driver_creator.get_driver() + + try: + logger.info(f"Starting forum crawl: {self.forum_url}") + driver.get(self.forum_url) + sleep(2) + + posts = driver.find_elements(By.CSS_SELECTOR, 'a.story-link h2.home-title') + if not posts: + logger.info("No posts found on the page") + return + last_post = posts[0] + title = last_post.text.strip() + link = last_post.get_attribute('href') + post_content = f"{title} - {link}" + + if 'data breach' in title.lower() or 'leak' in title.lower(): + self._save_to_db('Hacker News', post_content) + logger.info(f"New leak found: {title} - {link}") + else: + logger.info("Last post is not about leaks") + + except Exception as e: + logger.error(f"Error during forum crawling: {e}") + raise + finally: + driver.quit() + logger.info("WebDriver session closed") + + def _save_to_db(self, source, message): + if not self.conn or self.conn.closed: + self._connect() + try: + with self.conn.cursor() as cur: + cur.execute( + "INSERT INTO leaks (resource_name, message) VALUES (%s, %s)", + (source, message) + ) + self.conn.commit() + logger.info(f"Leak from {source} saved in the database") + except Exception as e: + logger.error(f"Error writing to the database: {e}") + +@app.task(name='forum_crawler.crawl_forum_task') +def crawl_forum_task(forum_url, proxy_list): + crawler = ForumCrawler(forum_url, proxy_list) + try: + crawler._crawl_leaks() + finally: + crawler._close() \ No newline at end of file diff --git a/tasks/tg_crawler.py b/tasks/tg_crawler.py new file mode 100644 index 0000000..eb45629 --- /dev/null +++ b/tasks/tg_crawler.py @@ -0,0 +1,121 @@ +import logging +import os +import asyncio +import psycopg2 +from datetime import datetime +from telethon.sync import TelegramClient +from telethon.tl.functions.messages import GetHistoryRequest +from dotenv import load_dotenv +from celery import Celery +from utils.logg import LoggerSingleton +from celery_app import app + +load_dotenv() + +logger = LoggerSingleton.get_logger() + +class TelegramMonitor: + def __init__(self): + self.db_config = { + 'host': os.getenv('DB_HOST'), + 'port': os.getenv('DB_PORT'), + 'database': os.getenv('DB_NAME'), + 'user': os.getenv('DB_USER'), + 'password': os.getenv('DB_PASSWORD') + } + self.conn = None + self._connect() + + def _connect(self): + try: + self.conn = psycopg2.connect(**self.db_config) + logger.info("Database connection established") + except Exception as e: + logger.error(f"Database connection error: {e}") + raise + + def _close(self): + if self.conn and not self.conn.closed: + try: + self.conn.close() + logger.info("Database connection closed") + except Exception as e: + logger.error(f"Error closing database connection: {e}") + + def _fetch_post(self, channel_username, source_name): + try: + with TelegramClient('session', os.getenv('API_ID'), os.getenv('API_HASH')) as client: + entity = client.get_entity(channel_username) + history = client(GetHistoryRequest( + peer=entity, + limit=1, + offset_date=None, + offset_id=0, + max_id=0, + min_id=0, + add_offset=0, + hash=0 + )) + if history.messages: + self._save_to_db(source_name, history.messages[0].message) + except Exception as e: + logger.error(f"Error fetching post from {source_name}: {e}") + raise + + def _save_to_db(self, source, message): + if not self.conn or self.conn.closed: + self._connect() + + try: + with self.conn.cursor() as cur: + cur.execute( + "INSERT INTO leaks (resource_name, message) VALUES (%s, %s)", + (source, message) + ) + self.conn.commit() + logger.info(f"Data from {source} saved successfully") + except Exception as e: + logger.error(f"Database save error for {source}: {e}") + self.conn.rollback() + raise + +@app.task(bind=True, name='tasks.tg_crawler.monitor_channels') +def monitor_channels(self): + channels = [ + ('trueosint', 'trueosint'), + ('dataleak', 'dataleak'), + ('Vaultofdataleaksss', 'Vaultofdataleaksss') + ] + + logger.info("Starting Telegram channels monitoring") + + for channel, source in channels: + try: + monitor_channel.apply_async( + args=(channel, source), + queue='telegram', + retry=True, + retry_policy={ + 'max_retries': 3, + 'interval_start': 2, + 'interval_step': 5, + 'interval_max': 20 + } + ) + logger.info(f"Sent task for channel: {channel}") + except Exception as e: + logger.error(f"Failed to send task for {channel}: {e}") + raise self.retry(exc=e) + +@app.task(bind=True, name='tasks.tg_crawler.monitor_channel') +def monitor_channel(self, channel, source): + logger.info(f"Starting monitoring channel: {channel}") + monitor = TelegramMonitor() + try: + monitor._fetch_post(channel, source) + logger.info(f"Successfully monitored channel: {channel}") + except Exception as e: + logger.error(f"Channel monitoring failed for {channel}: {e}") + raise self.retry(exc=e) + finally: + monitor._close() \ No newline at end of file diff --git a/tg/tg_crawler.py b/tg/tg_crawler.py deleted file mode 100644 index 9c568da..0000000 --- a/tg/tg_crawler.py +++ /dev/null @@ -1,77 +0,0 @@ -import logging -import os -import psycopg2 -from datetime import datetime -from telethon.sync import TelegramClient -from telethon.tl.functions.messages import GetHistoryRequest - -LOG_FILE = os.getenv('LOG_FILE', '/app/tg_nodes.log') - -if os.path.exists(LOG_FILE) and os.path.isdir(LOG_FILE): - raise RuntimeError(f"Path {LOG_FILE} is a directory! Expected file.") - -logging.basicConfig( - handlers=[ - logging.FileHandler(LOG_FILE), - logging.StreamHandler() - ] -) - -class TelegramChannelMonitor: - db_config = None - - def __init__(self, session_name, api_id, api_hash, channel_username, source_name): - self.session_name = session_name - self.api_id = api_id - self.api_hash = api_hash - self.channel_username = channel_username - self.source_name = source_name - - @classmethod - def set_db_config(cls, config): - cls.db_config = config - - def fetch_last_post(self): - logging.info(f"[{self.source_name}] checking a new post...") - - try: - with TelegramClient(self.session_name, self.api_id, self.api_hash) as client: - entity = client.get_entity(self.channel_username) - history = client(GetHistoryRequest( - peer=entity, - limit=1, - offset_date=None, - offset_id=0, - max_id=0, - min_id=0, - add_offset=0, - hash=0 - )) - - if history.messages: - msg = history.messages[0] - logging.info(f"[{self.source_name}] received a post: {msg.message[:60]}...") - self.save_to_db(self.source_name, msg.message) - else: - logging.info(f"[{self.source_name}] there is no new messages") - except Exception as e: - logging.error(f"[{self.source_name}] error when receiving a post: {e}") - - def save_to_db(self, source, message): - if not self.db_config: - logging.error("DB config is not set") - return - - try: - conn = psycopg2.connect(**self.db_config) - cur = conn.cursor() - cur.execute( - "INSERT INTO leaks (source, message) VALUES (%s, %s)", - (source, message) - ) - conn.commit() - cur.close() - conn.close() - logging.info(f"[{self.source_name}] message is recorded in the database") - except Exception as e: - logging.error(f"[{self.source_name}] error when writing to the database: {e}") diff --git a/tg/tg_node_0.py b/tg/tg_node_0.py deleted file mode 100644 index afa58ad..0000000 --- a/tg/tg_node_0.py +++ /dev/null @@ -1,47 +0,0 @@ -import asyncio -import os -from apscheduler.schedulers.background import BackgroundScheduler -from pytz import timezone -from tg_crawler import TelegramChannelMonitor -from dotenv import load_dotenv -import logging - -load_dotenv() - -def main(): - TelegramChannelMonitor.set_db_config({ - 'host': os.getenv("HOST"), - 'port': os.getenv("PORT"), - 'database': os.getenv("DBNAME"), - 'user': os.getenv("USER"), - 'password': os.getenv("PASSWORD") - }) - - monitor = TelegramChannelMonitor( - session_name='session_trueosint', - api_id=os.getenv("TELETHON_API_ID"), - api_hash=os.getenv("TELETHON_API_HASH"), - channel_username='trueosint', - source_name='trueosint' - ) - - scheduler = BackgroundScheduler() - scheduler.add_job( - monitor.fetch_last_post, - 'cron', - hour=9, - minute=0, - timezone=timezone("Europe/Moscow") - ) - - try: - scheduler.start() - logging.info("Scheduler started successfully") - while True: - pass - except (KeyboardInterrupt, SystemExit): - scheduler.shutdown() - logging.info("Scheduler shut down successfully") - -if __name__ == '__main__': - main() diff --git a/tg/tg_node_1.py b/tg/tg_node_1.py deleted file mode 100644 index 1d813d1..0000000 --- a/tg/tg_node_1.py +++ /dev/null @@ -1,48 +0,0 @@ -import asyncio -import os -from apscheduler.schedulers.background import BackgroundScheduler -from pytz import timezone -from tg_crawler import TelegramChannelMonitor -from dotenv import load_dotenv -import logging - - -load_dotenv() - -def main(): - TelegramChannelMonitor.set_db_config({ - 'host': os.getenv("HOST"), - 'port': os.getenv("PORT"), - 'database': os.getenv("DBNAME"), - 'user': os.getenv("USER"), - 'password': os.getenv("PASSWORD") - }) - - monitor = TelegramChannelMonitor( - session_name='session_trueosint', - api_id=os.getenv("TELETHON_API_ID"), - api_hash=os.getenv("TELETHON_API_HASH"), - channel_username='dataleak', - source_name='dataleak' - ) - - scheduler = BackgroundScheduler() - scheduler.add_job( - monitor.fetch_last_post, - 'cron', - hour=9, - minute=0, - timezone=timezone("Europe/Moscow") - ) - - try: - scheduler.start() - logging.info("Scheduler started successfully") - while True: - pass - except (KeyboardInterrupt, SystemExit): - scheduler.shutdown() - logging.info("Scheduler shut down successfully") - -if __name__ == '__main__': - main() diff --git a/utils/logg.py b/utils/logg.py new file mode 100644 index 0000000..d89177f --- /dev/null +++ b/utils/logg.py @@ -0,0 +1,27 @@ +import logging + +class LoggerSingleton: + _logger = None + + @staticmethod + def get_logger(): + if LoggerSingleton._logger is None: + LoggerSingleton._logger = logging.getLogger(__name__) + + LoggerSingleton._logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + + file_handler = logging.FileHandler('tempora.log', encoding='utf-8') + file_handler.setFormatter(formatter) + + LoggerSingleton._logger.addHandler(console_handler) + LoggerSingleton._logger.addHandler(file_handler) + + return LoggerSingleton._logger \ No newline at end of file diff --git a/web/app.py b/web/app.py index 8d64821..6e6ea9b 100644 --- a/web/app.py +++ b/web/app.py @@ -102,6 +102,7 @@ def get_leaks_stats(): conn.close() @app.route("/") +@app.route("/index.html") def index(): parser_status = get_parser_status() leaks_stats = get_leaks_stats() @@ -120,6 +121,7 @@ def index(): leaks_stats=leaks_stats ) +@app.route("/logs.html") @app.route("/logs") def logs(): log_path = '/app/tg_nodes.log' diff --git a/web/templates/logs.html b/web/templates/logs.html index 57c6e1e..7331740 100644 --- a/web/templates/logs.html +++ b/web/templates/logs.html @@ -99,8 +99,10 @@ padding: 20px; margin-left: 250px; transition: margin-left 0.3s ease; - position: relative; - z-index: 1; + min-height: 100vh; + display: flex; + justify-content: center; + align-items: center; } .container.collapsed { @@ -116,7 +118,7 @@ height: 100%; object-fit: cover; z-index: -1; - opacity: 0.8; /* Полупрозрачность видео */ + opacity: 0.8; } /* Затемнение поверх видео */ @@ -126,23 +128,18 @@ left: 0; width: 100%; height: 100%; - background: rgba(0, 0, 0, 0.7); /* Черный полупрозрачный слой */ + background: rgba(0, 0, 0, 0.7); z-index: -1; } /* Контейнер для логов */ .logs-container { - position: absolute; - top: 50%; - left: 20px; - transform: translateY(-50%); - width: 40%; - max-width: 600px; - background-color: rgba(46, 46, 46, 0.9); /* Полупрозрачный фон */ + width: 90%; + max-width: 800px; + background-color: rgba(46, 46, 46, 0.9); padding: 20px; border-radius: 10px; box-shadow: 0 0 20px rgba(51, 153, 255, 0.5); - z-index: 1; } .logs-container h2 {