"""Parse web-server access-log lines and build a plain-text summary report."""

from collections import Counter

# Three-letter month abbreviations as they appear in log timestamps.
_MONTHS = {
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
    "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}


def parse_log_line(line: str) -> dict:
    """Split one access-log line into a dict of named fields.

    The line is split on whitespace and fields are taken by position; the
    layout looks like the Apache "combined" log format (assumption based on
    the field positions used here).

    Returns:
        A dict with keys ``ip``, ``datetime``, ``method``, ``path``,
        ``protocol``, ``status``, ``bytes``, ``referer`` and ``user_agent``.
        ``status`` and ``bytes`` are ints and fall back to 0 when the field
        is not purely digits (e.g. ``-``).

    Raises:
        IndexError: if the line has fewer than 11 whitespace-separated fields.
    """
    parts = line.split()
    # parts[3] carries a leading "[" and parts[4] a trailing "]"; strip both.
    timestamp = parts[3][1:] + " " + parts[4][:-1]
    status_field = parts[8]
    size_field = parts[9]
    return {
        "ip": parts[0],
        "datetime": timestamp,
        "method": parts[5][1:],      # drop the opening quote of the request
        "path": parts[6],
        "protocol": parts[7][:-1],   # drop the closing quote of the request
        "status": int(status_field) if status_field.isdigit() else 0,
        "bytes": int(size_field) if size_field.isdigit() else 0,
        "referer": parts[10][1:-1],  # strip surrounding quotes
        # The user agent may contain spaces, so re-join the remaining fields
        # before stripping its surrounding quotes.
        "user_agent": " ".join(parts[11:])[1:-1],
    }


def filter_by_status(logs: list[dict], status_codes: list[int]) -> list[dict]:
    """Return the records whose ``status`` is one of *status_codes*."""
    wanted = set(status_codes)  # build once for O(1) membership tests
    return [log for log in logs if log["status"] in wanted]


def count_requests_per_ip(logs: list[dict]) -> dict:
    """Return a mapping of IP address to number of records from that IP."""
    return dict(Counter(log["ip"] for log in logs))


def top_requested_paths(logs: list[dict], top_n: int) -> list[tuple]:
    """Return up to *top_n* ``(path, count)`` pairs, most requested first.

    Paths with equal counts keep the order in which they first appear in
    *logs* (the sort is stable).
    """
    counts = Counter(log["path"] for log in logs)
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    return ranked[:top_n]


def extract_unique_user_agents(logs: list[dict]) -> set:
    """Return the set of distinct ``user_agent`` values in *logs*."""
    return {log["user_agent"] for log in logs}


def total_bytes_transferred(logs: list[dict]) -> int:
    """Return the sum of the ``bytes`` field over all records."""
    return sum(log["bytes"] for log in logs)


def _parse_date(date_str: str) -> tuple:
    """Convert ``dd/Mon/yyyy:HH:MM:SS [zone]`` into a sortable tuple.

    Only the first whitespace-separated token is used, so any trailing
    timezone offset is ignored.

    Returns:
        ``(year, month, day, hour, minute, second)`` as ints.

    Raises:
        KeyError: if the month abbreviation is not a known English one.
        ValueError: if the timestamp does not have the expected shape.
    """
    stamp = date_str.split()[0]
    # Split only on the FIRST colon: the time part itself contains colons,
    # so an unbounded split cannot be unpacked into (date, time).
    date_part, time_part = stamp.split(":", 1)
    day, month, year = date_part.split("/")
    hour, minute, second = map(int, time_part.split(":"))
    return int(year), _MONTHS[month], int(day), hour, minute, second


def filter_by_date_range(
    logs: list[dict], start_date: str, end_date: str
) -> list[dict]:
    """Return records whose timestamp lies in ``[start_date, end_date]``.

    Both bounds are inclusive and use the same ``dd/Mon/yyyy:HH:MM:SS``
    format as the records' ``datetime`` field. Timezone offsets are ignored
    in the comparison.
    """
    start = _parse_date(start_date)
    end = _parse_date(end_date)
    return [
        log for log in logs
        if start <= _parse_date(log["datetime"]) <= end
    ]


def group_by_hour(logs: list[dict]) -> dict:
    """Return a mapping of hour of day (int) to number of records."""
    # In "dd/Mon/yyyy:HH:MM:SS", the hour is the second colon-separated piece.
    return dict(
        Counter(
            int(log["datetime"].split()[0].split(":")[1]) for log in logs
        )
    )


def find_suspicious_ips(logs: list[dict], threshold: int) -> set:
    """Return the IPs that made strictly more than *threshold* requests."""
    return {
        ip
        for ip, hits in count_requests_per_ip(logs).items()
        if hits > threshold
    }


def generate_report(logs: list[dict]) -> str:
    """Build the multi-line text report summarising *logs*.

    The report lists the total request count, the top five IPs and paths,
    total bytes, a per-hour histogram for hours 0-23, IPs with more than
    five requests (sorted, so the output is deterministic), the number of
    404/500/503 records and the number of distinct user agents.
    """
    ip_counts = count_requests_per_ip(logs)
    top_ips = sorted(
        ip_counts.items(), key=lambda item: item[1], reverse=True
    )[:5]
    top_paths = top_requested_paths(logs, 5)
    per_hour = group_by_hour(logs)
    suspicious = find_suspicious_ips(logs, 5)
    error_logs = filter_by_status(logs, [404, 500, 503])
    unique_agents = extract_unique_user_agents(logs)

    lines = [
        "=== ОТЧЁТ АНАЛИЗА ЛОГОВ СЕРВЕРА ===",
        "Всего запросов: " + str(len(logs)),
        "\nТоп-5 IP-адресов по числу запросов:",
    ]
    lines.extend(" " + ip + ": " + str(hits) for ip, hits in top_ips)

    lines.append("\nТоп-5 запрашиваемых путей:")
    lines.extend(" " + path + ": " + str(hits) for path, hits in top_paths)

    lines.append("\nВсего передано байт: " + str(total_bytes_transferred(logs)))

    lines.append("\nРаспределение по часам (0–23):")
    lines.extend(
        " " + str(hour) + ":" + str(per_hour.get(hour, 0))
        for hour in range(24)
    )

    lines.append("\nПодозрительные IP (более 5 запросов):")
    if suspicious:
        # Sets have no stable order; sort so repeated runs print the same report.
        lines.extend(" " + ip for ip in sorted(suspicious))
    else:
        lines.append(" Нет")

    lines.append(
        "\nКоличество записей с ошибками (404/500/503): "
        + str(len(error_logs))
    )
    lines.append(
        "\nКоличество уникальных User-Agent: " + str(len(unique_agents))
    )
    return "\n".join(lines)


def main(path: str = "data/server_logs.txt") -> None:
    """Read the log file at *path*, skipping blank lines, and print the report."""
    with open(path, "r", encoding="utf-8") as f:
        all_logs = [
            parse_log_line(stripped)
            for stripped in (raw.strip() for raw in f)
            if stripped
        ]
    print(generate_report(all_logs))


if __name__ == "__main__":
    main()