commit 670ae29bbe73b4f86c45fa60cfd2e6720118c524 Author: Егор Батманов Date: Fri Apr 3 14:00:07 2026 +0000 Загрузить файлы в «/» diff --git a/Practik.py b/Practik.py new file mode 100644 index 0000000..a95020a --- /dev/null +++ b/Practik.py @@ -0,0 +1,206 @@ +from typing import List, Dict +from collections import Counter + +#Принимает путь к текстовому файлу, читает его построчно, возвращает список строк. Если файл не найден, возвращает пустой список. Удаляет символы перевода строки. +def load_logs(file_path: str) -> list[str]: + result = [] + try: + file_open = open(file_path, "r", encoding='utf-8') + except: + return [] + for i in file_open.readlines(): + result.append(i.strip('\n')) + return result + +#.append() - добавляет в конец списка +#.strip(\n) - убирает в конце сроки \n +#.lstrip(\n) - убирает в начале сроки \n + + +#Принимает строку лога. Возвращает True, если строка соответствует минимальному формату +#(не пустая и содержит хотя бы 6 элементов, разделенных пробелом — например, IP, -, дата, метод, статус, размер). +#Возвращает False для мусорных строк. +def validate_log_line(line: str) -> bool: + if len(line) < 1 and len(line.split()) < 6: + return False + return True + +#.split() - разбивает на элементы +#len() - количество элементов в списке + + +#Принимает валидную строку лога, извлекает компоненты и возвращает словарь с ключами: +# ip, method, url, status, size. Статус и размер преобразует в int. Если размер равен "-", устанавливает 0. +def parse_log_line(line: str) -> dict: + if validate_log_line(line): + parts = line.split() + ip = parts[0] + method = parts[3] + url = parts[4] + status = int(parts[5]) + size = int(parts[6]) if parts[6] == "-" else 0 + return { + 'ip': ip, + 'method': method, + 'url': url, + 'status': status, + 'size': size + } + return {} + + +#Принимает список словарей-записей и код статуса (например, 404). +#Возвращает новый список, содержащий только записи с указанным статусом. +def filter_status(logs: List[Dict], status_code: int) -> List[Dict]: + result = [] + for log in logs: + if log['status'] == status_code: + result.append(log) + + return result + +# .append() - добавляет элемент в конец списка + +#Принимает список записей и число n. Возвращает список из n кортежей (IP, количество запросов), +#отсортированных по убыванию количества запросов. Использует Counter. +def get_top_clients(logs: List[Dict], n: int) -> List[tuple[str, int]]: + # Собираем все IP-адреса в список + ip_list = [] + for log in logs: + ip_list.append(log['ip']) + + # Используем Counter для подсчета количества запросов от каждого IP + ip_counter = Counter(ip_list) + + # Получаем n самых частых IP + result = ip_counter.most_common(n) + + return result + +# .append() - добавляет элемент в конец списка +# .most_common() - возвращает список самых частых элементов + +#Принимает список записей. Возвращает суммарный размер трафика (поле size) для переданного списка. +def calculate_total_traffic(logs: List[Dict]) -> int: + total = 0 + for log in logs: + total += log['size'] + return total + +#Принимает список записей и HTTP-метод (например, "POST"). Возвращает отфильтрованный список. +def filter_by_method(logs: List[Dict], method: str) -> List[Dict]: + result = [] + for log in logs: + if log['method'] == method: + result.append(log) + return result + +#Принимает список записей. Возвращает множество уникальных URL, к которым были обращения. +def extract_unique_urls(logs: List[Dict]) -> set[str]: + unique_urls = set() + for log in logs: + unique_urls.add(log['url']) + return unique_urls + +#Принимает список записей и порог срабатывания. Возвращает множество IP-адресов, +#у которых количество запросов с кодом 4xx или 5xx превышает threshold. +def detect_suspicious_ips(logs: List[Dict], threshold: int) -> set[str]: + # Словарь для подсчета ошибок по IP + error_count = {} + + for log in logs: + status = log['status'] + # Проверяем, является ли статус ошибкой (4xx или 5xx) + if 400 <= status <= 599: + ip = log['ip'] + if ip in error_count: + error_count[ip] += 1 + else: + error_count[ip] = 1 + + # Формируем множество IP, у которых количество ошибок превышает порог + suspicious = set() + for ip, count in error_count.items(): + if count > threshold: + suspicious.add(ip) + + return suspicious + +#.items() - возвращает пары (ключ, значение) из словаря +# .add() - добавляет элемент в множество (set) + +#Принимает исходный список записей и параметры. Возвращает итоговый словарь отчета. +def generate_report(logs: List[Dict], top_n: int, error_threshold: int) -> dict: + report = { + 'total_requests': len(logs), + 'top_clients': get_top_clients(logs, top_n), + 'total_traffic': calculate_total_traffic(logs), + 'unique_endpoints': extract_unique_urls(logs), + 'suspicious_ips': detect_suspicious_ips(logs, error_threshold) + } + return report + +#Основная функция +def main(): + # Чтение данных + file_path = "server_logs.txt" + raw_logs = load_logs(file_path) + + if not raw_logs: + print(f"Файл {file_path} не найден или пуст") + return + + print(f"Загружено строк: {len(raw_logs)}") + + # Валидация и парсинг + parsed_logs = [] + for line in raw_logs: + if validate_log_line(line): + parsed = parse_log_line(line) + if parsed: # Проверяем, что парсинг успешен + parsed_logs.append(parsed) + else: + # Выводим предупреждение в stderr + import sys + print(f"Предупреждение: невалидная строка '{line[:50]}...' пропущена", file=sys.stderr) + + if not parsed_logs: + print("Нет валидных записей для обработки") + return + + print(f"Валидных записей: {len(parsed_logs)}") + print("-" * 50) + + # Генерация отчета + report = generate_report(parsed_logs, top_n=5, error_threshold=10) + + # Вывод отчета + print("=== ОТЧЕТ ПО ЛОГАМ ВЕБ-СЕРВЕРА ===\n") + print(f"Общее количество запросов: {report['total_requests']}") + print(f"Суммарный трафик: {report['total_traffic']} байт") + print(f"Уникальных эндпоинтов: {len(report['unique_endpoints'])}") + + print("\nТоп-5 клиентов по количеству запросов:") + for ip, count in report['top_clients']: + print(f" {ip}: {count} запросов") + + print(f"\nПодозрительные IP (ошибок > 10):") + if report['suspicious_ips']: + for ip in report['suspicious_ips']: + print(f" {ip}") + else: + print(" Не обнаружено") + + print("-" * 50) + + # Дополнительный анализ: POST-запросы + post_requests = filter_by_method(parsed_logs, "POST") + post_traffic = calculate_total_traffic(post_requests) + + print("\n=== АНАЛИЗ POST-ЗАПРОСОВ ===\n") + print(f"Количество POST-запросов: {len(post_requests)}") + print(f"Суммарный трафик POST-запросов: {post_traffic} байт") + +# Запуск программы +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/server_logs.txt b/server_logs.txt new file mode 100644 index 0000000..f812dca --- /dev/null +++ b/server_logs.txt @@ -0,0 +1,10 @@ +192.168.1.10 - - GET /index.html 200 1234 +192.168.1.15 - - POST /api/login 200 567 +10.0.0.5 - - GET /images/logo.png 200 45020 +192.168.1.10 - - GET /missing_page 404 120 +192.168.1.20 - - DELETE /admin 403 89 +10.0.0.5 - - GET /error 500 0 +192.168.1.15 - - POST /submit 201 340 +192.168.1.10 - - GET /style.css 200 890 +192.168.1.10 - - GET /script.js 200 1500 +192.168.1.10 - - GET /favicon.ico 404 0 \ No newline at end of file