commit fe513ea8b059db09bf1cdfa2fa16a41d008b2c86 Author: Глеб Date: Thu Mar 6 19:25:02 2025 +0300 Добавлен README.md с списком группы diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fa28e4c --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.env +*.log +**/__pycache__/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..b691ffd --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +Глеб \ No newline at end of file diff --git a/driver/__init__.py b/driver/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/driver/chrome_proxy.py b/driver/chrome_proxy.py new file mode 100644 index 0000000..375fc65 --- /dev/null +++ b/driver/chrome_proxy.py @@ -0,0 +1,97 @@ +import os +import tempfile + +class ChromeProxy: + def __init__(self, host: str, port: int, username: str = "", password: str = ""): + self.host = host + self.port = port + self.username = username + self.password = password + + def create_extension(self, name: str = "Chrome Proxy", version="1.0.0") -> str: + proxy_folder = tempfile.mkdtemp(prefix="chrome_proxy_") + + self._create_manifest( + name=name, + version=version, + proxy_folder=proxy_folder + ) + + self._create_background_js(proxy_folder) + + return proxy_folder + + def _create_manifest(self, name, version, proxy_folder): + manifest = ChromeProxy.manifest_json + manifest = manifest.replace("", name) + manifest = manifest.replace("", version) + + with open(f"{proxy_folder}/manifest.json", "w") as f: + f.write(manifest) + + def _create_background_js(self, proxy_folder): + js = ChromeProxy.background_js + js = js.replace("", self.host) + js = js.replace("", str(self.port)) + js = js.replace("", self.username) + js = js.replace("", self.password) + + with open(f"{proxy_folder}/background.js", "w") as f: + f.write(js) + + manifest_json = """ + { + "version": "", + "manifest_version": 3, + "name": "", + "permissions": [ + "proxy", + "tabs", + "storage", + "webRequest", + "webRequestAuthProvider" + ], + "host_permissions": [ + "" + ], + "background": { + "service_worker": "background.js" + }, + "minimum_chrome_version": "22.0.0" + } + """ + + background_js = """ + var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "", + port: parseInt("") + }, + bypassList: ["localhost"] + } + }; + + chrome.proxy.settings.set({ + value: config, + scope: "regular" + }, function() {}); + + function callbackFn(details) { + return { + authCredentials: { + username: "", + password: "" + } + }; + } + + chrome.webRequest.onAuthRequired.addListener( + callbackFn, { + urls: [""] + }, + ['blocking'] + ); + """ diff --git a/driver/driver_creator.py b/driver/driver_creator.py new file mode 100644 index 0000000..5d8e123 --- /dev/null +++ b/driver/driver_creator.py @@ -0,0 +1,60 @@ +import random +from driver.chrome_proxy import ChromeProxy +from urllib.parse import urlparse +import undetected_chromedriver as uc +import logging + + +# В patcher.py библиотеки undetected_chromedriver + # import secrets + # prefix = 'undetected' ----> prefix = secrets.token_hex(8) +# Использование случайного токена в качестве префикса гарантирует, +# что каждый процесс будет иметь свою уникальную директорию, +# даже если несколько процессов работают одновременн + +class DriverCreator: + def __init__(self, proxy_list): + self.proxy_list = proxy_list + + def _switch_proxy(self): + proxy = random.choice(self.proxy_list) + parsed_proxy = urlparse(proxy) + return ChromeProxy( + parsed_proxy.hostname, + parsed_proxy.port, + parsed_proxy.username, + parsed_proxy.password + ).create_extension() + + def get_driver(self): + ''' + Отключает JS + Каждый запрос получает `сырой` html + ''' + # extension_path = self._switch_proxy() + + options = uc.ChromeOptions() + # options.add_argument(f"--load-extension={extension_path}") # временно + options.add_argument("--headless=new") + options.add_argument("--disable-gpu") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--no-sandbox") + options.add_argument("--disable-webgl") + options.add_argument("--disable-software-rasterizer") + # options.add_argument("--disable-extensions") + + prefs = {"profile.managed_default_content_settings.javascript": 2} + options.experimental_options["prefs"] = prefs + + driver = uc.Chrome( + options=options, + version_main=132, + # user_multi_procs=True + ) + + driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { + "source": "Object.defineProperty(navigator, 'javaEnabled', {get: () => false});" + }) + driver.execute_cdp_cmd("Emulation.setScriptExecutionDisabled", {"value": True}) + + return driver \ No newline at end of file diff --git a/patcher.py b/patcher.py new file mode 100644 index 0000000..99800dc --- /dev/null +++ b/patcher.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +# this module is part of undetected_chromedriver + +from distutils.version import LooseVersion +import io +import json +import logging +import os +import pathlib +import platform +import random +import re +import shutil +import string +import sys +import time +from urllib.request import urlopen +from urllib.request import urlretrieve +import zipfile +from multiprocessing import Lock +import secrets + +logger = logging.getLogger(__name__) + +IS_POSIX = sys.platform.startswith(("darwin", "cygwin", "linux", "linux2")) + + +class Patcher(object): + lock = Lock() + exe_name = "chromedriver%s" + + platform = sys.platform + if platform.endswith("win32"): + d = "~/appdata/roaming/undetected_chromedriver" + elif "LAMBDA_TASK_ROOT" in os.environ: + d = "/tmp/undetected_chromedriver" + elif platform.startswith(("linux", "linux2")): + d = "~/.local/share/undetected_chromedriver" + elif platform.endswith("darwin"): + d = "~/Library/Application Support/undetected_chromedriver" + else: + d = "~/.undetected_chromedriver" + data_path = os.path.abspath(os.path.expanduser(d)) + + def __init__( + self, + executable_path=None, + force=False, + version_main: int = 0, + user_multi_procs=False, + ): + """ + Args: + executable_path: None = automatic + a full file path to the chromedriver executable + force: False + terminate processes which are holding lock + version_main: 0 = auto + specify main chrome version (rounded, ex: 82) + """ + self.force = force + self._custom_exe_path = False + prefix = secrets.token_hex(8) + self.user_multi_procs = user_multi_procs + + self.is_old_chromedriver = version_main and version_main <= 114 + # Needs to be called before self.exe_name is accessed + self._set_platform_name() + + if not os.path.exists(self.data_path): + os.makedirs(self.data_path, exist_ok=True) + + if not executable_path: + self.executable_path = os.path.join( + self.data_path, "_".join([prefix, self.exe_name]) + ) + + if not IS_POSIX: + if executable_path: + if not executable_path[-4:] == ".exe": + executable_path += ".exe" + + self.zip_path = os.path.join(self.data_path, prefix) + + if not executable_path: + if not self.user_multi_procs: + self.executable_path = os.path.abspath( + os.path.join(".", self.executable_path) + ) + + if executable_path: + self._custom_exe_path = True + self.executable_path = executable_path + + # Set the correct repository to download the Chromedriver from + if self.is_old_chromedriver: + self.url_repo = "https://chromedriver.storage.googleapis.com" + else: + self.url_repo = "https://googlechromelabs.github.io/chrome-for-testing" + + self.version_main = version_main + self.version_full = None + + def _set_platform_name(self): + """ + Set the platform and exe name based on the platform undetected_chromedriver is running on + in order to download the correct chromedriver. + """ + if self.platform.endswith("win32"): + self.platform_name = "win32" + self.exe_name %= ".exe" + if self.platform.endswith(("linux", "linux2")): + self.platform_name = "linux64" + self.exe_name %= "" + if self.platform.endswith("darwin"): + if self.is_old_chromedriver: + self.platform_name = "mac64" + else: + self.platform_name = "mac-x64" + self.exe_name %= "" + + def auto(self, executable_path=None, force=False, version_main=None, _=None): + """ + + Args: + executable_path: + force: + version_main: + + Returns: + + """ + p = pathlib.Path(self.data_path) + if self.user_multi_procs: + with Lock(): + files = list(p.rglob("*chromedriver*")) + most_recent = max(files, key=lambda f: f.stat().st_mtime) + files.remove(most_recent) + list(map(lambda f: f.unlink(), files)) + if self.is_binary_patched(most_recent): + self.executable_path = str(most_recent) + return True + + if executable_path: + self.executable_path = executable_path + self._custom_exe_path = True + + if self._custom_exe_path: + ispatched = self.is_binary_patched(self.executable_path) + if not ispatched: + return self.patch_exe() + else: + return + + if version_main: + self.version_main = version_main + if force is True: + self.force = force + + try: + os.unlink(self.executable_path) + except PermissionError: + if self.force: + self.force_kill_instances(self.executable_path) + return self.auto(force=not self.force) + try: + if self.is_binary_patched(): + # assumes already running AND patched + return True + except PermissionError: + pass + # return False + except FileNotFoundError: + pass + + release = self.fetch_release_number() + self.version_main = release.version[0] + self.version_full = release + self.unzip_package(self.fetch_package()) + return self.patch() + + def driver_binary_in_use(self, path: str = None) -> bool: + """ + naive test to check if a found chromedriver binary is + currently in use + + Args: + path: a string or PathLike object to the binary to check. + if not specified, we check use this object's executable_path + """ + if not path: + path = self.executable_path + p = pathlib.Path(path) + + if not p.exists(): + raise OSError("file does not exist: %s" % p) + try: + with open(p, mode="a+b") as fs: + exc = [] + try: + + fs.seek(0, 0) + except PermissionError as e: + exc.append(e) # since some systems apprently allow seeking + # we conduct another test + try: + fs.readline() + except PermissionError as e: + exc.append(e) + + if exc: + + return True + return False + # ok safe to assume this is in use + except Exception as e: + # logger.exception("whoops ", e) + pass + + def cleanup_unused_files(self): + p = pathlib.Path(self.data_path) + items = list(p.glob("*undetected*")) + for item in items: + try: + item.unlink() + except: + pass + + def patch(self): + self.patch_exe() + return self.is_binary_patched() + + def fetch_release_number(self): + """ + Gets the latest major version available, or the latest major version of self.target_version if set explicitly. + :return: version string + :rtype: LooseVersion + """ + # Endpoint for old versions of Chromedriver (114 and below) + if self.is_old_chromedriver: + path = f"/latest_release_{self.version_main}" + path = path.upper() + logger.debug("getting release number from %s" % path) + return LooseVersion(urlopen(self.url_repo + path).read().decode()) + + # Endpoint for new versions of Chromedriver (115+) + if not self.version_main: + # Fetch the latest version + path = "/last-known-good-versions-with-downloads.json" + logger.debug("getting release number from %s" % path) + with urlopen(self.url_repo + path) as conn: + response = conn.read().decode() + + last_versions = json.loads(response) + return LooseVersion(last_versions["channels"]["Stable"]["version"]) + + # Fetch the latest minor version of the major version provided + path = "/latest-versions-per-milestone-with-downloads.json" + logger.debug("getting release number from %s" % path) + with urlopen(self.url_repo + path) as conn: + response = conn.read().decode() + + major_versions = json.loads(response) + return LooseVersion(major_versions["milestones"][str(self.version_main)]["version"]) + + def parse_exe_version(self): + with io.open(self.executable_path, "rb") as f: + for line in iter(lambda: f.readline(), b""): + match = re.search(rb"platform_handle\x00content\x00([0-9.]*)", line) + if match: + return LooseVersion(match[1].decode()) + + def fetch_package(self): + """ + Downloads ChromeDriver from source + + :return: path to downloaded file + """ + zip_name = f"chromedriver_{self.platform_name}.zip" + if self.is_old_chromedriver: + download_url = "%s/%s/%s" % (self.url_repo, self.version_full.vstring, zip_name) + else: + zip_name = zip_name.replace("_", "-", 1) + download_url = "https://storage.googleapis.com/chrome-for-testing-public/%s/%s/%s" + download_url %= (self.version_full.vstring, self.platform_name, zip_name) + + logger.debug("downloading from %s" % download_url) + return urlretrieve(download_url)[0] + + def unzip_package(self, fp): + """ + Does what it says + + :return: path to unpacked executable + """ + exe_path = self.exe_name + if not self.is_old_chromedriver: + # The new chromedriver unzips into its own folder + zip_name = f"chromedriver-{self.platform_name}" + exe_path = os.path.join(zip_name, self.exe_name) + + logger.debug("unzipping %s" % fp) + try: + os.unlink(self.zip_path) + except (FileNotFoundError, OSError): + pass + + os.makedirs(self.zip_path, mode=0o755, exist_ok=True) + with zipfile.ZipFile(fp, mode="r") as zf: + zf.extractall(self.zip_path) + os.rename(os.path.join(self.zip_path, exe_path), self.executable_path) + os.remove(fp) + shutil.rmtree(self.zip_path) + os.chmod(self.executable_path, 0o755) + return self.executable_path + + @staticmethod + def force_kill_instances(exe_name): + """ + kills running instances. + :param: executable name to kill, may be a path as well + + :return: True on success else False + """ + exe_name = os.path.basename(exe_name) + if IS_POSIX: + r = os.system("kill -f -9 $(pidof %s)" % exe_name) + else: + r = os.system("taskkill /f /im %s" % exe_name) + return not r + + @staticmethod + def gen_random_cdc(): + cdc = random.choices(string.ascii_letters, k=27) + return "".join(cdc).encode() + + def is_binary_patched(self, executable_path=None): + executable_path = executable_path or self.executable_path + try: + with io.open(executable_path, "rb") as fh: + return fh.read().find(b"undetected chromedriver") != -1 + except FileNotFoundError: + return False + + def patch_exe(self): + start = time.perf_counter() + logger.info("patching driver executable %s" % self.executable_path) + with io.open(self.executable_path, "r+b") as fh: + content = fh.read() + # match_injected_codeblock = re.search(rb"{window.*;}", content) + match_injected_codeblock = re.search(rb"\{window\.cdc.*?;\}", content) + if match_injected_codeblock: + target_bytes = match_injected_codeblock[0] + new_target_bytes = ( + b'{console.log("undetected chromedriver 1337!")}'.ljust( + len(target_bytes), b" " + ) + ) + new_content = content.replace(target_bytes, new_target_bytes) + if new_content == content: + logger.warning( + "something went wrong patching the driver binary. could not find injection code block" + ) + else: + logger.debug( + "found block:\n%s\nreplacing with:\n%s" + % (target_bytes, new_target_bytes) + ) + fh.seek(0) + fh.write(new_content) + logger.debug( + "patching took us {:.2f} seconds".format(time.perf_counter() - start) + ) + + def __repr__(self): + return "{0:s}({1:s})".format( + self.__class__.__name__, + self.executable_path, + ) + + def __del__(self): + if self._custom_exe_path: + # if the driver binary is specified by user + # we assume it is important enough to not delete it + return + else: + timeout = 3 # stop trying after this many seconds + t = time.monotonic() + now = lambda: time.monotonic() + while now() - t > timeout: + # we don't want to wait until the end of time + try: + if self.user_multi_procs: + break + os.unlink(self.executable_path) + logger.debug("successfully unlinked %s" % self.executable_path) + break + except (OSError, RuntimeError, PermissionError): + time.sleep(0.01) + continue + except FileNotFoundError: + break diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0d8b373 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +requests +setuptools +python-dotenv +tls-client +undetected-chromedriver +selenium +grpc_interceptor_headers \ No newline at end of file diff --git a/runner.sh b/runner.sh new file mode 100644 index 0000000..185249e --- /dev/null +++ b/runner.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +TARGET_DIR=$(python -c "import undetected_chromedriver as uc; print(uc.__path__[0])" 2>/dev/null) + +if [ -d "$TARGET_DIR" ]; then + echo "patcher.py в $TARGET_DIR" + cp patcher.py "$TARGET_DIR" +else + echo "undetected_chromedriver не найден" +fi + +pip install -r requirements.txt + +python initiator.py \ No newline at end of file