REPO1/main.py

141 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

def parse_log_line(line: str) -> dict:
    """Parse one line of an Apache/Nginx combined-format access log.

    Expected whitespace-separated layout:
        ip ident user [dd/Mon/yyyy:HH:MM:SS tz] "METHOD path PROTOCOL" status bytes "referer" "user-agent"

    Returns a dict with keys: ip, datetime, method, path, protocol,
    status, bytes, referer, user_agent.  Non-numeric status/bytes
    fields (such as "-") are reported as 0.

    Raises IndexError on lines with fewer fields than the format requires.
    """
    parts = line.split()
    # parts[3] is "[dd/Mon/yyyy:HH:MM:SS" and parts[4] is "tz]" — strip
    # the brackets and rejoin.  (Renamed from `datetime` to avoid
    # shadowing the stdlib module name.)
    timestamp = parts[3][1:] + " " + parts[4][:-1]
    return {
        "ip": parts[0],
        "datetime": timestamp,
        "method": parts[5][1:],        # strip the leading quote of "METHOD
        "path": parts[6],
        "protocol": parts[7][:-1],     # strip the trailing quote of PROTOCOL"
        "status": int(parts[8]) if parts[8].isdigit() else 0,
        "bytes": int(parts[9]) if parts[9].isdigit() else 0,
        "referer": parts[10][1:-1],    # strip the surrounding quotes
        # The user agent may contain spaces: rejoin the tail, then
        # drop the opening and closing quotes.
        "user_agent": " ".join(parts[11:])[1:-1],
    }
def filter_by_status(logs: list[dict], status_codes: list[int]) -> list[dict]:
    """Return the log entries whose "status" is one of *status_codes*."""
    return [entry for entry in logs if entry["status"] in status_codes]
def count_requests_per_ip(logs: list[dict]) -> dict:
    """Map each client IP address to the number of requests it made."""
    counts: dict = {}
    for entry in logs:
        addr = entry["ip"]
        if addr in counts:
            counts[addr] += 1
        else:
            counts[addr] = 1
    return counts
def top_requested_paths(logs: list[dict], top_n: int) -> list[tuple]:
    """Return the *top_n* most-requested paths as (path, count) pairs,
    most frequent first; ties keep first-seen order (stable sort)."""
    tally: dict = {}
    for entry in logs:
        tally[entry["path"]] = tally.get(entry["path"], 0) + 1
    ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]
def extract_unique_user_agents(logs: list[dict]) -> set:
    """Collect the distinct User-Agent strings present in *logs*."""
    return {entry["user_agent"] for entry in logs}
def total_bytes_transferred(logs: list[dict]) -> int:
    """Sum the "bytes" field across every log entry."""
    return sum(entry["bytes"] for entry in logs)
def _parse_date(date_str: str) -> tuple:
date_time_part = date_str.split()[0]
day_month_year, time_part = date_time_part.split(":")
day, month, year = day_month_year.split("/")
hour, minute, second = map(int, time_part.split(":"))
months = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6,
"Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}
month_clear = months[month]
return int(year), month_clear, int(day), hour, minute, second
def filter_by_date_range(logs: list[dict], start_date: str, end_date: str) -> list[dict]:
    """Keep the entries whose timestamp falls within
    [*start_date*, *end_date*], bounds inclusive.

    Dates are compared as the tuples produced by _parse_date.
    """
    lo = _parse_date(start_date)
    hi = _parse_date(end_date)
    return [entry for entry in logs
            if lo <= _parse_date(entry["datetime"]) <= hi]
def group_by_hour(logs: list[dict]) -> dict:
    """Count requests per hour of day, keyed by the int hour.

    Only hours that actually occur in *logs* appear in the result.
    """
    by_hour: dict = {}
    for entry in logs:
        # "dd/Mon/yyyy:HH:MM:SS tz" -> the second ":"-field is the hour.
        stamp = entry["datetime"].split()[0]
        hour = int(stamp.split(":")[1])
        if hour not in by_hour:
            by_hour[hour] = 0
        by_hour[hour] += 1
    return by_hour
def find_suspicious_ips(logs: list[dict], threshold: int) -> set:
    """Return the IPs that issued strictly more than *threshold* requests."""
    return {ip for ip, hits in count_requests_per_ip(logs).items()
            if hits > threshold}
def generate_report(logs: list[dict]) -> str:
    """Build the full human-readable analysis report for *logs*.

    Sections: total request count, top-5 IPs, top-5 paths, total bytes,
    per-hour distribution, suspicious IPs (> 5 requests), error count
    (404/500/503), and unique User-Agent count.  Labels are in Russian,
    matching the rest of the tool's output.
    """
    total = len(logs)
    ip_counts = count_requests_per_ip(logs)
    top_ips = sorted(ip_counts.items(), key=lambda x: x[1], reverse=True)[:5]
    top_paths = top_requested_paths(logs, 5)
    total_bytes = total_bytes_transferred(logs)
    hour = group_by_hour(logs)
    suspicious = find_suspicious_ips(logs, 5)
    error_logs = filter_by_status(logs, [404, 500, 503])
    unique_agents = extract_unique_user_agents(logs)
    lines = []
    lines.append("=== ОТЧЁТ АНАЛИЗА ЛОГОВ СЕРВЕРА ===")
    lines.append("Всего запросов: " + str(total))
    lines.append("\nТоп-5 IP-адресов по числу запросов:")
    for ip, c in top_ips:
        lines.append(" " + ip + ": " + str(c))
    lines.append("\nТоп-5 запрашиваемых путей:")
    for p, c in top_paths:
        lines.append(" " + p + ": " + str(c))
    lines.append("\nВсего передано байт: " + str(total_bytes))
    # FIX: the range label had lost its dash ("(023)") to an encoding
    # mangle; restore the intended "(0-23)".
    lines.append("\nРаспределение по часам (0-23):")
    for h in range(24):
        lines.append(" " + str(h) + ":" + str(hour.get(h, 0)))
    lines.append("\nПодозрительные IP (более 5 запросов):")
    if suspicious:
        for ip in suspicious:
            lines.append(" " + ip)
    else:
        lines.append(" Нет")
    lines.append("\nКоличество записей с ошибками (404/500/503): " + str(len(error_logs)))
    lines.append("\nКоличество уникальных User-Agent: " + str(len(unique_agents)))
    return "\n".join(lines)
def main():
    """Read data/server_logs.txt, parse every non-blank line, print the report."""
    with open("data/server_logs.txt", "r", encoding="utf-8") as f:
        all_logs = [parse_log_line(stripped)
                    for stripped in (raw.strip() for raw in f)
                    if stripped]
    print(generate_report(all_logs))


if __name__ == "__main__":
    main()