from typing import List, Dict from collections import Counter #Принимает путь к текстовому файлу, читает его построчно, возвращает список строк. Если файл не найден, возвращает пустой список. Удаляет символы перевода строки. def load_logs(file_path: str) -> list[str]: result = [] try: file_open = open(file_path, "r", encoding='utf-8') except: return [] for i in file_open.readlines(): result.append(i.strip('\n')) return result #.append() - добавляет в конец списка #.strip(\n) - убирает в конце сроки \n #.lstrip(\n) - убирает в начале сроки \n #Принимает строку лога. Возвращает True, если строка соответствует минимальному формату #(не пустая и содержит хотя бы 6 элементов, разделенных пробелом — например, IP, -, дата, метод, статус, размер). #Возвращает False для мусорных строк. def validate_log_line(line: str) -> bool: if len(line) < 1 and len(line.split()) < 6: return False return True #.split() - разбивает на элементы #len() - количество элементов в списке #Принимает валидную строку лога, извлекает компоненты и возвращает словарь с ключами: # ip, method, url, status, size. Статус и размер преобразует в int. Если размер равен "-", устанавливает 0. def parse_log_line(line: str) -> dict: if validate_log_line(line): parts = line.split() ip = parts[0] method = parts[3] url = parts[4] status = int(parts[5]) size = int(parts[6]) if parts[6] == "-" else 0 return { 'ip': ip, 'method': method, 'url': url, 'status': status, 'size': size } return {} #Принимает список словарей-записей и код статуса (например, 404). #Возвращает новый список, содержащий только записи с указанным статусом. def filter_status(logs: List[Dict], status_code: int) -> List[Dict]: result = [] for log in logs: if log['status'] == status_code: result.append(log) return result # .append() - добавляет элемент в конец списка #Принимает список записей и число n. Возвращает список из n кортежей (IP, количество запросов), #отсортированных по убыванию количества запросов. Использует Counter. def get_top_clients(logs: List[Dict], n: int) -> List[tuple[str, int]]: # Собираем все IP-адреса в список ip_list = [] for log in logs: ip_list.append(log['ip']) # Используем Counter для подсчета количества запросов от каждого IP ip_counter = Counter(ip_list) # Получаем n самых частых IP result = ip_counter.most_common(n) return result # .append() - добавляет элемент в конец списка # .most_common() - возвращает список самых частых элементов #Принимает список записей. Возвращает суммарный размер трафика (поле size) для переданного списка. def calculate_total_traffic(logs: List[Dict]) -> int: total = 0 for log in logs: total += log['size'] return total #Принимает список записей и HTTP-метод (например, "POST"). Возвращает отфильтрованный список. def filter_by_method(logs: List[Dict], method: str) -> List[Dict]: result = [] for log in logs: if log['method'] == method: result.append(log) return result #Принимает список записей. Возвращает множество уникальных URL, к которым были обращения. def extract_unique_urls(logs: List[Dict]) -> set[str]: unique_urls = set() for log in logs: unique_urls.add(log['url']) return unique_urls #Принимает список записей и порог срабатывания. Возвращает множество IP-адресов, #у которых количество запросов с кодом 4xx или 5xx превышает threshold. def detect_suspicious_ips(logs: List[Dict], threshold: int) -> set[str]: # Словарь для подсчета ошибок по IP error_count = {} for log in logs: status = log['status'] # Проверяем, является ли статус ошибкой (4xx или 5xx) if 400 <= status <= 599: ip = log['ip'] if ip in error_count: error_count[ip] += 1 else: error_count[ip] = 1 # Формируем множество IP, у которых количество ошибок превышает порог suspicious = set() for ip, count in error_count.items(): if count > threshold: suspicious.add(ip) return suspicious #.items() - возвращает пары (ключ, значение) из словаря # .add() - добавляет элемент в множество (set) #Принимает исходный список записей и параметры. Возвращает итоговый словарь отчета. def generate_report(logs: List[Dict], top_n: int, error_threshold: int) -> dict: report = { 'total_requests': len(logs), 'top_clients': get_top_clients(logs, top_n), 'total_traffic': calculate_total_traffic(logs), 'unique_endpoints': extract_unique_urls(logs), 'suspicious_ips': detect_suspicious_ips(logs, error_threshold) } return report #Основная функция def main(): # Чтение данных file_path = "server_logs.txt" raw_logs = load_logs(file_path) if not raw_logs: print(f"Файл {file_path} не найден или пуст") return print(f"Загружено строк: {len(raw_logs)}") # Валидация и парсинг parsed_logs = [] for line in raw_logs: if validate_log_line(line): parsed = parse_log_line(line) if parsed: # Проверяем, что парсинг успешен parsed_logs.append(parsed) else: # Выводим предупреждение в stderr import sys print(f"Предупреждение: невалидная строка '{line[:50]}...' пропущена", file=sys.stderr) if not parsed_logs: print("Нет валидных записей для обработки") return print(f"Валидных записей: {len(parsed_logs)}") print("-" * 50) # Генерация отчета report = generate_report(parsed_logs, top_n=5, error_threshold=10) # Вывод отчета print("=== ОТЧЕТ ПО ЛОГАМ ВЕБ-СЕРВЕРА ===\n") print(f"Общее количество запросов: {report['total_requests']}") print(f"Суммарный трафик: {report['total_traffic']} байт") print(f"Уникальных эндпоинтов: {len(report['unique_endpoints'])}") print("\nТоп-5 клиентов по количеству запросов:") for ip, count in report['top_clients']: print(f" {ip}: {count} запросов") print(f"\nПодозрительные IP (ошибок > 10):") if report['suspicious_ips']: for ip in report['suspicious_ips']: print(f" {ip}") else: print(" Не обнаружено") print("-" * 50) # Дополнительный анализ: POST-запросы post_requests = filter_by_method(parsed_logs, "POST") post_traffic = calculate_total_traffic(post_requests) print("\n=== АНАЛИЗ POST-ЗАПРОСОВ ===\n") print(f"Количество POST-запросов: {len(post_requests)}") print(f"Суммарный трафик POST-запросов: {post_traffic} байт") # Запуск программы if __name__ == "__main__": main()