rep_egorbat90/Practik.py

206 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import List, Dict
from collections import Counter
#Принимает путь к текстовому файлу, читает его построчно, возвращает список строк. Если файл не найден, возвращает пустой список. Удаляет символы перевода строки.
def load_logs(file_path: str) -> list[str]:
result = []
try:
file_open = open(file_path, "r", encoding='utf-8')
except:
return []
for i in file_open.readlines():
result.append(i.strip('\n'))
return result
#.append() - добавляет в конец списка
#.strip(\n) - убирает в конце сроки \n
#.lstrip(\n) - убирает в начале сроки \n
#Принимает строку лога. Возвращает True, если строка соответствует минимальному формату
#(не пустая и содержит хотя бы 6 элементов, разделенных пробелом — например, IP, -, дата, метод, статус, размер).
#Возвращает False для мусорных строк.
def validate_log_line(line: str) -> bool:
if len(line) < 1 and len(line.split()) < 6:
return False
return True
#.split() - разбивает на элементы
#len() - количество элементов в списке
#Принимает валидную строку лога, извлекает компоненты и возвращает словарь с ключами:
# ip, method, url, status, size. Статус и размер преобразует в int. Если размер равен "-", устанавливает 0.
def parse_log_line(line: str) -> dict:
if validate_log_line(line):
parts = line.split()
ip = parts[0]
method = parts[3]
url = parts[4]
status = int(parts[5])
size = int(parts[6]) if parts[6] == "-" else 0
return {
'ip': ip,
'method': method,
'url': url,
'status': status,
'size': size
}
return {}
#Принимает список словарей-записей и код статуса (например, 404).
#Возвращает новый список, содержащий только записи с указанным статусом.
def filter_status(logs: List[Dict], status_code: int) -> List[Dict]:
result = []
for log in logs:
if log['status'] == status_code:
result.append(log)
return result
# .append() - добавляет элемент в конец списка
#Принимает список записей и число n. Возвращает список из n кортежей (IP, количество запросов),
#отсортированных по убыванию количества запросов. Использует Counter.
def get_top_clients(logs: List[Dict], n: int) -> List[tuple[str, int]]:
# Собираем все IP-адреса в список
ip_list = []
for log in logs:
ip_list.append(log['ip'])
# Используем Counter для подсчета количества запросов от каждого IP
ip_counter = Counter(ip_list)
# Получаем n самых частых IP
result = ip_counter.most_common(n)
return result
# .append() - добавляет элемент в конец списка
# .most_common() - возвращает список самых частых элементов
#Принимает список записей. Возвращает суммарный размер трафика (поле size) для переданного списка.
def calculate_total_traffic(logs: List[Dict]) -> int:
total = 0
for log in logs:
total += log['size']
return total
#Принимает список записей и HTTP-метод (например, "POST"). Возвращает отфильтрованный список.
def filter_by_method(logs: List[Dict], method: str) -> List[Dict]:
result = []
for log in logs:
if log['method'] == method:
result.append(log)
return result
#Принимает список записей. Возвращает множество уникальных URL, к которым были обращения.
def extract_unique_urls(logs: List[Dict]) -> set[str]:
unique_urls = set()
for log in logs:
unique_urls.add(log['url'])
return unique_urls
#Принимает список записей и порог срабатывания. Возвращает множество IP-адресов,
#у которых количество запросов с кодом 4xx или 5xx превышает threshold.
def detect_suspicious_ips(logs: List[Dict], threshold: int) -> set[str]:
# Словарь для подсчета ошибок по IP
error_count = {}
for log in logs:
status = log['status']
# Проверяем, является ли статус ошибкой (4xx или 5xx)
if 400 <= status <= 599:
ip = log['ip']
if ip in error_count:
error_count[ip] += 1
else:
error_count[ip] = 1
# Формируем множество IP, у которых количество ошибок превышает порог
suspicious = set()
for ip, count in error_count.items():
if count > threshold:
suspicious.add(ip)
return suspicious
#.items() - возвращает пары (ключ, значение) из словаря
# .add() - добавляет элемент в множество (set)
#Принимает исходный список записей и параметры. Возвращает итоговый словарь отчета.
def generate_report(logs: List[Dict], top_n: int, error_threshold: int) -> dict:
report = {
'total_requests': len(logs),
'top_clients': get_top_clients(logs, top_n),
'total_traffic': calculate_total_traffic(logs),
'unique_endpoints': extract_unique_urls(logs),
'suspicious_ips': detect_suspicious_ips(logs, error_threshold)
}
return report
#Основная функция
def main():
# Чтение данных
file_path = "server_logs.txt"
raw_logs = load_logs(file_path)
if not raw_logs:
print(f"Файл {file_path} не найден или пуст")
return
print(f"Загружено строк: {len(raw_logs)}")
# Валидация и парсинг
parsed_logs = []
for line in raw_logs:
if validate_log_line(line):
parsed = parse_log_line(line)
if parsed: # Проверяем, что парсинг успешен
parsed_logs.append(parsed)
else:
# Выводим предупреждение в stderr
import sys
print(f"Предупреждение: невалидная строка '{line[:50]}...' пропущена", file=sys.stderr)
if not parsed_logs:
print("Нет валидных записей для обработки")
return
print(f"Валидных записей: {len(parsed_logs)}")
print("-" * 50)
# Генерация отчета
report = generate_report(parsed_logs, top_n=5, error_threshold=10)
# Вывод отчета
print("=== ОТЧЕТ ПО ЛОГАМ ВЕБ-СЕРВЕРА ===\n")
print(f"Общее количество запросов: {report['total_requests']}")
print(f"Суммарный трафик: {report['total_traffic']} байт")
print(f"Уникальных эндпоинтов: {len(report['unique_endpoints'])}")
print("\nТоп-5 клиентов по количеству запросов:")
for ip, count in report['top_clients']:
print(f" {ip}: {count} запросов")
print(f"\nПодозрительные IP (ошибок > 10):")
if report['suspicious_ips']:
for ip in report['suspicious_ips']:
print(f" {ip}")
else:
print(" Не обнаружено")
print("-" * 50)
# Дополнительный анализ: POST-запросы
post_requests = filter_by_method(parsed_logs, "POST")
post_traffic = calculate_total_traffic(post_requests)
print("\n=== АНАЛИЗ POST-ЗАПРОСОВ ===\n")
print(f"Количество POST-запросов: {len(post_requests)}")
print(f"Суммарный трафик POST-запросов: {post_traffic} байт")
# Запуск программы
if __name__ == "__main__":
main()