import os from datetime import datetime from collections import defaultdict # === 10 функций === def load_logs(file_path: str) -> list[dict]: """Читает файл с логами, преобразует каждую строку в словарь""" logs = [] try: with open(file_path, 'r', encoding='utf-8') as file: for line in file: line = line.strip() if not line: continue # Парсинг строки лога (формат Nginx/Common Log Format) parts = line.split(' ') if len(parts) < 10: continue ip = parts[0] timestamp = parts[3][1:] # убираем '[' method = parts[5][1:] # убираем '"' url = parts[6] status = int(parts[8]) size = int(parts[9]) log_entry = { 'ip': ip, 'timestamp': timestamp, 'method': method, 'url': url, 'status': status, 'size': size } logs.append(log_entry) except FileNotFoundError: print(f"Ошибка: Файл {file_path} не найден") return [] return logs def filter_by_status(logs: list[dict], status_code: int) -> list[dict]: """Возвращает записи с заданным статус-кодом""" return [log for log in logs if log['status'] == status_code] def filter_by_ip_prefix(logs: list[dict], prefix: str) -> list[dict]: """Фильтрует записи по началу IP-адреса""" return [log for log in logs if log['ip'].startswith(prefix)] def get_unique_ips(logs: list[dict]) -> list[str]: """Возвращает отсортированный список уникальных IP-адресов""" unique_ips = set(log['ip'] for log in logs) return sorted(unique_ips) def count_requests_by_method(logs: list[dict]) -> dict[str, int]: """Подсчитывает количество запросов по каждому HTTP-методу""" method_count = defaultdict(int) for log in logs: method_count[log['method']] += 1 return dict(method_count) def find_top_urls(logs: list[dict], n: int) -> list[tuple[str, int]]: """Возвращает топ-N самых часто запрашиваемых URL""" url_count = defaultdict(int) for log in logs: url_count[log['url']] += 1 sorted_urls = sorted(url_count.items(), key=lambda x: x[1], reverse=True) return sorted_urls[:n] def get_avg_response_size(logs: list[dict]) -> float: """Вычисляет средний размер ответа""" if not logs: return 0.0 total_size = sum(log['size'] for log in logs) return total_size / len(logs) def filter_by_date_range(logs: list[dict], start: str, end: str) -> list[dict]: """Фильтрует логи по временному диапазону""" def parse_date(date_str: str) -> datetime: # Формат: 02/Jan/2025:13:45:12 return datetime.strptime(date_str, "%d/%b/%Y:%H:%M:%S") start_date = parse_date(start) end_date = parse_date(end) filtered = [] for log in logs: try: log_date = parse_date(log['timestamp']) if start_date <= log_date <= end_date: filtered.append(log) except: continue return filtered def group_by_hour(logs: list[dict]) -> dict[int, int]: """Группирует количество запросов по часам суток""" hour_count = defaultdict(int) for log in logs: try: # Извлекаем час из временной метки timestamp = log['timestamp'] # Формат: 02/Jan/2025:13:45:12 hour = int(timestamp.split(':')[0].split('/')[-1].split(':')[0]) hour_count[hour] += 1 except: continue return dict(hour_count) def generate_report(logs: list[dict]) -> str: """Формирует текстовый отчёт""" if not logs: return "Нет данных для отчёта" # Используем другие функции method_stats = count_requests_by_method(logs) top_urls = find_top_urls(logs, 3) avg_size = get_avg_response_size(logs) hourly_stats = group_by_hour(logs) # Формируем отчёт report = "=" * 50 + "\n" report += "ОТЧЁТ ПО ЛОГАМ ВЕБ-СЕРВЕРА\n" report += "=" * 50 + "\n\n" report += "1. СТАТИСТИКА ПО HTTP-МЕТОДАМ:\n" for method, count in sorted(method_stats.items()): report += f" {method}: {count} запросов\n" report += "\n2. ТОП-3 САМЫХ ЧАСТЫХ URL:\n" for i, (url, count) in enumerate(top_urls, 1): report += f" {i}. {url} - {count} раз(а)\n" report += f"\n3. СРЕДНИЙ РАЗМЕР ОТВЕТА: {avg_size:.2f} байт\n" report += "\n4. РАСПРЕДЕЛЕНИЕ ПО ЧАСАМ:\n" for hour in sorted(hourly_stats.keys()): report += f" {hour:02d}:00 - {hourly_stats[hour]} запросов\n" report += "\n" + "=" * 50 + "\n" return report # === Функция main для демонстрации === def main(): # 1. Загружаем логи logs = load_logs("data/server_logs.txt") if not logs: print("Не удалось загрузить логи. Проверьте наличие файла data/server_logs.txt") return print("=" * 60) print("АНАЛИЗ ЛОГОВ ВЕБ-СЕРВЕРА") print("=" * 60) # 2. Общее количество записей print(f"\n📊 Общее количество записей: {len(logs)}") # 3. Уникальные IP unique_ips = get_unique_ips(logs) print(f"🌐 Количество уникальных IP: {len(unique_ips)}") print(f" Примеры: {unique_ips[:3]}") # 4. Фильтрация по статусу 404 errors_404 = filter_by_status(logs, 404) print(f"\n❌ Количество ошибок 404: {len(errors_404)}") # 5. Топ-2 URL с ошибками 500 errors_500 = filter_by_status(logs, 500) top_500_urls = find_top_urls(errors_500, 2) print(f"⚠️ Топ-2 URL с ошибками 500:") for url, count in top_500_urls: print(f" - {url}: {count} раз(а)") # 6. Фильтрация по префиксу IP (пример) local_ips = filter_by_ip_prefix(logs, "192.168.") print(f"\n🏠 Запросы с локальных IP (192.168.*): {len(local_ips)}") # 7. Средний размер ответа avg_size = get_avg_response_size(logs) print(f"📏 Средний размер ответа: {avg_size:.2f} байт") # 8. Фильтрация по дате (пример) date_filtered = filter_by_date_range(logs, "01/Jan/2025:00:00:00", "31/Jan/2025:23:59:59") print(f"📅 Запросы за январь 2025: {len(date_filtered)}") # 9. Группировка по часам hourly = group_by_hour(logs) peak_hour = max(hourly, key=hourly.get) print(f"⏰ Пиковый час: {peak_hour}:00 ({hourly[peak_hour]} запросов)") # 10. Генерация отчёта report = generate_report(logs) # Создаём директорию для отчётов os.makedirs("reports", exist_ok=True) # Сохраняем отчёт в файл with open("reports/summary.txt", "w", encoding="utf-8") as f: f.write(report) print(f"\n✅ Полный отчёт сохранён в файл: reports/summary.txt") print("\n" + "=" * 60) # === Запуск === if __name__ == "__main__": # Запускаем анализ main()