import os
from time import sleep

import psycopg2
from dotenv import load_dotenv
from selenium.webdriver.common.by import By

from celery_app import app
from driver.driver_creator import DriverCreator
from utils.logg import LoggerSingleton

load_dotenv()
logger = LoggerSingleton.get_logger()


class ForumCrawler:
    """Crawls a forum/news page and stores leak-related posts in PostgreSQL."""

    def __init__(self, forum_url, proxy_list):
        self.forum_url = forum_url
        self.proxy_list = proxy_list
        self.db_config = {
            'host': os.getenv('DB_HOST'),
            'port': os.getenv('DB_PORT'),
            'database': os.getenv('DB_NAME'),
            'user': os.getenv('DB_USER'),
            'password': os.getenv('DB_PASSWORD'),
        }
        self.conn = None
        self._connect()

    def _connect(self):
        try:
            self.conn = psycopg2.connect(**self.db_config)
            logger.info("Database connection established")
        except Exception as e:
            logger.error(f"Database connection error: {e}")
            raise

    def _close(self):
        if self.conn and not self.conn.closed:
            try:
                self.conn.close()
                logger.info("Database connection closed")
            except Exception as e:
                logger.error(f"Error closing database connection: {e}")

    def _crawl_leaks(self):
        driver_creator = DriverCreator(self.proxy_list)
        driver = driver_creator.get_driver()
        try:
            logger.info(f"Starting forum crawl: {self.forum_url}")
            driver.get(self.forum_url)
            sleep(2)  # give the page time to render

            # Select the story links themselves: the href lives on the <a>,
            # not on the nested <h2>, so selecting the <h2> would yield no link.
            posts = driver.find_elements(By.CSS_SELECTOR, 'a.story-link')
            if not posts:
                logger.info("No posts found on the page")
                return

            # The first element on the page is the most recent post.
            last_post = posts[0]
            title = last_post.find_element(
                By.CSS_SELECTOR, 'h2.home-title'
            ).text.strip()
            link = last_post.get_attribute('href')
            post_content = f"{title} - {link}"

            if 'data breach' in title.lower() or 'leak' in title.lower():
                self._save_to_db('Hacker News', post_content)
                logger.info(f"New leak found: {title} - {link}")
            else:
                logger.info("Last post is not about leaks")
        except Exception as e:
            logger.error(f"Error during forum crawling: {e}")
            raise
        finally:
            driver.quit()
            logger.info("WebDriver session closed")

    def _save_to_db(self, source, message):
        # Reconnect if the connection was never made or has been closed.
        if not self.conn or self.conn.closed:
            self._connect()
        try:
            with self.conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO leaks (resource_name, message) VALUES (%s, %s)",
                    (source, message)
                )
                self.conn.commit()
            logger.info(f"Leak from {source} saved in the database")
        except Exception as e:
            # Roll back so the connection is not left in an aborted transaction.
            self.conn.rollback()
            logger.error(f"Error writing to the database: {e}")


@app.task(name='forum_crawler.crawl_forum_task')
def crawl_forum_task(forum_url, proxy_list):
    crawler = ForumCrawler(forum_url, proxy_list)
    try:
        crawler._crawl_leaks()
    finally:
        crawler._close()
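# Minimal usage sketch (assumptions: a Celery worker is running against the
# broker configured in celery_app; the URL and proxy values below are
# illustrative placeholders, not real project config):
#
#     from forum_crawler import crawl_forum_task
#     crawl_forum_task.delay(
#         'https://thehackernews.com/',
#         ['socks5://127.0.0.1:9050'],
#     )
#
# Hypothetical schema for the target table, inferred from the INSERT above;
# the actual migration may differ:
#
#     CREATE TABLE leaks (
#         id            SERIAL PRIMARY KEY,
#         resource_name TEXT NOT NULL,
#         message       TEXT NOT NULL
#     );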