praktik1/PythonApplication1.py

229 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from datetime import datetime
from collections import defaultdict
# === 10 функций ===
def load_logs(file_path: str) -> list[dict]:
"""Читает файл с логами, преобразует каждую строку в словарь"""
logs = []
try:
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
line = line.strip()
if not line:
continue
# Парсинг строки лога (формат Nginx/Common Log Format)
parts = line.split(' ')
if len(parts) < 10:
continue
ip = parts[0]
timestamp = parts[3][1:] # убираем '['
method = parts[5][1:] # убираем '"'
url = parts[6]
status = int(parts[8])
size = int(parts[9])
log_entry = {
'ip': ip,
'timestamp': timestamp,
'method': method,
'url': url,
'status': status,
'size': size
}
logs.append(log_entry)
except FileNotFoundError:
print(f"Ошибка: Файл {file_path} не найден")
return []
return logs
def filter_by_status(logs: list[dict], status_code: int) -> list[dict]:
"""Возвращает записи с заданным статус-кодом"""
return [log for log in logs if log['status'] == status_code]
def filter_by_ip_prefix(logs: list[dict], prefix: str) -> list[dict]:
"""Фильтрует записи по началу IP-адреса"""
return [log for log in logs if log['ip'].startswith(prefix)]
def get_unique_ips(logs: list[dict]) -> list[str]:
"""Возвращает отсортированный список уникальных IP-адресов"""
unique_ips = set(log['ip'] for log in logs)
return sorted(unique_ips)
def count_requests_by_method(logs: list[dict]) -> dict[str, int]:
"""Подсчитывает количество запросов по каждому HTTP-методу"""
method_count = defaultdict(int)
for log in logs:
method_count[log['method']] += 1
return dict(method_count)
def find_top_urls(logs: list[dict], n: int) -> list[tuple[str, int]]:
"""Возвращает топ-N самых часто запрашиваемых URL"""
url_count = defaultdict(int)
for log in logs:
url_count[log['url']] += 1
sorted_urls = sorted(url_count.items(), key=lambda x: x[1], reverse=True)
return sorted_urls[:n]
def get_avg_response_size(logs: list[dict]) -> float:
"""Вычисляет средний размер ответа"""
if not logs:
return 0.0
total_size = sum(log['size'] for log in logs)
return total_size / len(logs)
def filter_by_date_range(logs: list[dict], start: str, end: str) -> list[dict]:
"""Фильтрует логи по временному диапазону"""
def parse_date(date_str: str) -> datetime:
# Формат: 02/Jan/2025:13:45:12
return datetime.strptime(date_str, "%d/%b/%Y:%H:%M:%S")
start_date = parse_date(start)
end_date = parse_date(end)
filtered = []
for log in logs:
try:
log_date = parse_date(log['timestamp'])
if start_date <= log_date <= end_date:
filtered.append(log)
except:
continue
return filtered
def group_by_hour(logs: list[dict]) -> dict[int, int]:
"""Группирует количество запросов по часам суток"""
hour_count = defaultdict(int)
for log in logs:
try:
# Извлекаем час из временной метки
timestamp = log['timestamp']
# Формат: 02/Jan/2025:13:45:12
hour = int(timestamp.split(':')[0].split('/')[-1].split(':')[0])
hour_count[hour] += 1
except:
continue
return dict(hour_count)
def generate_report(logs: list[dict]) -> str:
"""Формирует текстовый отчёт"""
if not logs:
return "Нет данных для отчёта"
# Используем другие функции
method_stats = count_requests_by_method(logs)
top_urls = find_top_urls(logs, 3)
avg_size = get_avg_response_size(logs)
hourly_stats = group_by_hour(logs)
# Формируем отчёт
report = "=" * 50 + "\n"
report += "ОТЧЁТ ПО ЛОГАМ ВЕБ-СЕРВЕРА\n"
report += "=" * 50 + "\n\n"
report += "1. СТАТИСТИКА ПО HTTP-МЕТОДАМ:\n"
for method, count in sorted(method_stats.items()):
report += f" {method}: {count} запросов\n"
report += "\n2. ТОП-3 САМЫХ ЧАСТЫХ URL:\n"
for i, (url, count) in enumerate(top_urls, 1):
report += f" {i}. {url} - {count} раз(а)\n"
report += f"\n3. СРЕДНИЙ РАЗМЕР ОТВЕТА: {avg_size:.2f} байт\n"
report += "\n4. РАСПРЕДЕЛЕНИЕ ПО ЧАСАМ:\n"
for hour in sorted(hourly_stats.keys()):
report += f" {hour:02d}:00 - {hourly_stats[hour]} запросов\n"
report += "\n" + "=" * 50 + "\n"
return report
# === Функция main для демонстрации ===
def main():
# 1. Загружаем логи
logs = load_logs("data/server_logs.txt")
if not logs:
print("Не удалось загрузить логи. Проверьте наличие файла data/server_logs.txt")
return
print("=" * 60)
print("АНАЛИЗ ЛОГОВ ВЕБ-СЕРВЕРА")
print("=" * 60)
# 2. Общее количество записей
print(f"\n📊 Общее количество записей: {len(logs)}")
# 3. Уникальные IP
unique_ips = get_unique_ips(logs)
print(f"🌐 Количество уникальных IP: {len(unique_ips)}")
print(f" Примеры: {unique_ips[:3]}")
# 4. Фильтрация по статусу 404
errors_404 = filter_by_status(logs, 404)
print(f"\n❌ Количество ошибок 404: {len(errors_404)}")
# 5. Топ-2 URL с ошибками 500
errors_500 = filter_by_status(logs, 500)
top_500_urls = find_top_urls(errors_500, 2)
print(f"⚠️ Топ-2 URL с ошибками 500:")
for url, count in top_500_urls:
print(f" - {url}: {count} раз(а)")
# 6. Фильтрация по префиксу IP (пример)
local_ips = filter_by_ip_prefix(logs, "192.168.")
print(f"\n🏠 Запросы с локальных IP (192.168.*): {len(local_ips)}")
# 7. Средний размер ответа
avg_size = get_avg_response_size(logs)
print(f"📏 Средний размер ответа: {avg_size:.2f} байт")
# 8. Фильтрация по дате (пример)
date_filtered = filter_by_date_range(logs, "01/Jan/2025:00:00:00", "31/Jan/2025:23:59:59")
print(f"📅 Запросы за январь 2025: {len(date_filtered)}")
# 9. Группировка по часам
hourly = group_by_hour(logs)
peak_hour = max(hourly, key=hourly.get)
print(f"⏰ Пиковый час: {peak_hour}:00 ({hourly[peak_hour]} запросов)")
# 10. Генерация отчёта
report = generate_report(logs)
# Создаём директорию для отчётов
os.makedirs("reports", exist_ok=True)
# Сохраняем отчёт в файл
with open("reports/summary.txt", "w", encoding="utf-8") as f:
f.write(report)
print(f"\n✅ Полный отчёт сохранён в файл: reports/summary.txt")
print("\n" + "=" * 60)
# === Запуск ===
if __name__ == "__main__":
# Запускаем анализ
main()