229 lines
8.3 KiB
Python
229 lines
8.3 KiB
Python
import os
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
|
||
# === 10 функций ===
|
||
|
||
def load_logs(file_path: str) -> list[dict]:
|
||
"""Читает файл с логами, преобразует каждую строку в словарь"""
|
||
logs = []
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as file:
|
||
for line in file:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# Парсинг строки лога (формат Nginx/Common Log Format)
|
||
parts = line.split(' ')
|
||
if len(parts) < 10:
|
||
continue
|
||
|
||
ip = parts[0]
|
||
timestamp = parts[3][1:] # убираем '['
|
||
method = parts[5][1:] # убираем '"'
|
||
url = parts[6]
|
||
status = int(parts[8])
|
||
size = int(parts[9])
|
||
|
||
log_entry = {
|
||
'ip': ip,
|
||
'timestamp': timestamp,
|
||
'method': method,
|
||
'url': url,
|
||
'status': status,
|
||
'size': size
|
||
}
|
||
logs.append(log_entry)
|
||
except FileNotFoundError:
|
||
print(f"Ошибка: Файл {file_path} не найден")
|
||
return []
|
||
|
||
return logs
|
||
|
||
|
||
def filter_by_status(logs: list[dict], status_code: int) -> list[dict]:
|
||
"""Возвращает записи с заданным статус-кодом"""
|
||
return [log for log in logs if log['status'] == status_code]
|
||
|
||
|
||
def filter_by_ip_prefix(logs: list[dict], prefix: str) -> list[dict]:
|
||
"""Фильтрует записи по началу IP-адреса"""
|
||
return [log for log in logs if log['ip'].startswith(prefix)]
|
||
|
||
|
||
def get_unique_ips(logs: list[dict]) -> list[str]:
|
||
"""Возвращает отсортированный список уникальных IP-адресов"""
|
||
unique_ips = set(log['ip'] for log in logs)
|
||
return sorted(unique_ips)
|
||
|
||
|
||
def count_requests_by_method(logs: list[dict]) -> dict[str, int]:
|
||
"""Подсчитывает количество запросов по каждому HTTP-методу"""
|
||
method_count = defaultdict(int)
|
||
for log in logs:
|
||
method_count[log['method']] += 1
|
||
return dict(method_count)
|
||
|
||
|
||
def find_top_urls(logs: list[dict], n: int) -> list[tuple[str, int]]:
|
||
"""Возвращает топ-N самых часто запрашиваемых URL"""
|
||
url_count = defaultdict(int)
|
||
for log in logs:
|
||
url_count[log['url']] += 1
|
||
|
||
sorted_urls = sorted(url_count.items(), key=lambda x: x[1], reverse=True)
|
||
return sorted_urls[:n]
|
||
|
||
|
||
def get_avg_response_size(logs: list[dict]) -> float:
|
||
"""Вычисляет средний размер ответа"""
|
||
if not logs:
|
||
return 0.0
|
||
|
||
total_size = sum(log['size'] for log in logs)
|
||
return total_size / len(logs)
|
||
|
||
|
||
def filter_by_date_range(logs: list[dict], start: str, end: str) -> list[dict]:
|
||
"""Фильтрует логи по временному диапазону"""
|
||
def parse_date(date_str: str) -> datetime:
|
||
# Формат: 02/Jan/2025:13:45:12
|
||
return datetime.strptime(date_str, "%d/%b/%Y:%H:%M:%S")
|
||
|
||
start_date = parse_date(start)
|
||
end_date = parse_date(end)
|
||
|
||
filtered = []
|
||
for log in logs:
|
||
try:
|
||
log_date = parse_date(log['timestamp'])
|
||
if start_date <= log_date <= end_date:
|
||
filtered.append(log)
|
||
except:
|
||
continue
|
||
|
||
return filtered
|
||
|
||
|
||
def group_by_hour(logs: list[dict]) -> dict[int, int]:
|
||
"""Группирует количество запросов по часам суток"""
|
||
hour_count = defaultdict(int)
|
||
|
||
for log in logs:
|
||
try:
|
||
# Извлекаем час из временной метки
|
||
timestamp = log['timestamp']
|
||
# Формат: 02/Jan/2025:13:45:12
|
||
hour = int(timestamp.split(':')[0].split('/')[-1].split(':')[0])
|
||
hour_count[hour] += 1
|
||
except:
|
||
continue
|
||
|
||
return dict(hour_count)
|
||
|
||
|
||
def generate_report(logs: list[dict]) -> str:
|
||
"""Формирует текстовый отчёт"""
|
||
if not logs:
|
||
return "Нет данных для отчёта"
|
||
|
||
# Используем другие функции
|
||
method_stats = count_requests_by_method(logs)
|
||
top_urls = find_top_urls(logs, 3)
|
||
avg_size = get_avg_response_size(logs)
|
||
hourly_stats = group_by_hour(logs)
|
||
|
||
# Формируем отчёт
|
||
report = "=" * 50 + "\n"
|
||
report += "ОТЧЁТ ПО ЛОГАМ ВЕБ-СЕРВЕРА\n"
|
||
report += "=" * 50 + "\n\n"
|
||
|
||
report += "1. СТАТИСТИКА ПО HTTP-МЕТОДАМ:\n"
|
||
for method, count in sorted(method_stats.items()):
|
||
report += f" {method}: {count} запросов\n"
|
||
|
||
report += "\n2. ТОП-3 САМЫХ ЧАСТЫХ URL:\n"
|
||
for i, (url, count) in enumerate(top_urls, 1):
|
||
report += f" {i}. {url} - {count} раз(а)\n"
|
||
|
||
report += f"\n3. СРЕДНИЙ РАЗМЕР ОТВЕТА: {avg_size:.2f} байт\n"
|
||
|
||
report += "\n4. РАСПРЕДЕЛЕНИЕ ПО ЧАСАМ:\n"
|
||
for hour in sorted(hourly_stats.keys()):
|
||
report += f" {hour:02d}:00 - {hourly_stats[hour]} запросов\n"
|
||
|
||
report += "\n" + "=" * 50 + "\n"
|
||
|
||
return report
|
||
|
||
|
||
# === Функция main для демонстрации ===
|
||
|
||
def main():
|
||
# 1. Загружаем логи
|
||
logs = load_logs("data/server_logs.txt")
|
||
if not logs:
|
||
print("Не удалось загрузить логи. Проверьте наличие файла data/server_logs.txt")
|
||
return
|
||
|
||
print("=" * 60)
|
||
print("АНАЛИЗ ЛОГОВ ВЕБ-СЕРВЕРА")
|
||
print("=" * 60)
|
||
|
||
# 2. Общее количество записей
|
||
print(f"\n📊 Общее количество записей: {len(logs)}")
|
||
|
||
# 3. Уникальные IP
|
||
unique_ips = get_unique_ips(logs)
|
||
print(f"🌐 Количество уникальных IP: {len(unique_ips)}")
|
||
print(f" Примеры: {unique_ips[:3]}")
|
||
|
||
# 4. Фильтрация по статусу 404
|
||
errors_404 = filter_by_status(logs, 404)
|
||
print(f"\n❌ Количество ошибок 404: {len(errors_404)}")
|
||
|
||
# 5. Топ-2 URL с ошибками 500
|
||
errors_500 = filter_by_status(logs, 500)
|
||
top_500_urls = find_top_urls(errors_500, 2)
|
||
print(f"⚠️ Топ-2 URL с ошибками 500:")
|
||
for url, count in top_500_urls:
|
||
print(f" - {url}: {count} раз(а)")
|
||
|
||
# 6. Фильтрация по префиксу IP (пример)
|
||
local_ips = filter_by_ip_prefix(logs, "192.168.")
|
||
print(f"\n🏠 Запросы с локальных IP (192.168.*): {len(local_ips)}")
|
||
|
||
# 7. Средний размер ответа
|
||
avg_size = get_avg_response_size(logs)
|
||
print(f"📏 Средний размер ответа: {avg_size:.2f} байт")
|
||
|
||
# 8. Фильтрация по дате (пример)
|
||
date_filtered = filter_by_date_range(logs, "01/Jan/2025:00:00:00", "31/Jan/2025:23:59:59")
|
||
print(f"📅 Запросы за январь 2025: {len(date_filtered)}")
|
||
|
||
# 9. Группировка по часам
|
||
hourly = group_by_hour(logs)
|
||
peak_hour = max(hourly, key=hourly.get)
|
||
print(f"⏰ Пиковый час: {peak_hour}:00 ({hourly[peak_hour]} запросов)")
|
||
|
||
# 10. Генерация отчёта
|
||
report = generate_report(logs)
|
||
|
||
# Создаём директорию для отчётов
|
||
os.makedirs("reports", exist_ok=True)
|
||
|
||
# Сохраняем отчёт в файл
|
||
with open("reports/summary.txt", "w", encoding="utf-8") as f:
|
||
f.write(report)
|
||
|
||
print(f"\n✅ Полный отчёт сохранён в файл: reports/summary.txt")
|
||
print("\n" + "=" * 60)
|
||
|
||
|
||
# === Запуск ===
|
||
|
||
if __name__ == "__main__":
|
||
# Запускаем анализ
|
||
main()
|