141 lines
4.8 KiB
Python
141 lines
4.8 KiB
Python
def parse_log_line(line: str) -> dict:
    """Split one access-log line into its named fields.

    The line is split on whitespace and fields are picked by position, so it
    is expected to look like this (presumably the web server's "combined"
    access-log layout -- confirm against the actual data file)::

        1.2.3.4 - - [10/Oct/2000:13:55:36 -0700] "GET /x HTTP/1.0" 200 512 "ref" "agent"

    Args:
        line: A single log line.

    Returns:
        A dict with the keys ``ip``, ``datetime``, ``method``, ``path``,
        ``protocol``, ``status``, ``bytes``, ``referer`` and ``user_agent``.
        ``status`` and ``bytes`` are ints; they fall back to 0 when the
        field is not a decimal number (for example ``-``).

    Raises:
        IndexError: If the line has fewer than 11 whitespace-separated fields.
    """
    parts = line.split()

    def to_int(token: str) -> int:
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscript digits, which int() rejects with ValueError.
        return int(token) if token.isdecimal() else 0

    return {
        "ip": parts[0],
        # "[date:time" and "zone]" are two tokens; strip the brackets and
        # rejoin them with a single space.
        "datetime": parts[3][1:] + " " + parts[4][:-1],
        # The request is quoted: drop the opening quote from the method and
        # the closing quote from the protocol.
        "method": parts[5][1:],
        "path": parts[6],
        "protocol": parts[7][:-1],
        "status": to_int(parts[8]),
        "bytes": to_int(parts[9]),
        "referer": parts[10][1:-1],
        # The user agent may contain spaces, so it is everything that is left,
        # minus the surrounding quotes.
        "user_agent": " ".join(parts[11:])[1:-1],
    }
|
||
|
||
def filter_by_status(logs: list[dict], status_codes: list[int]) -> list[dict]:
    """Return the log records whose ``"status"`` value is in *status_codes*.

    Input order is preserved; the returned list holds the same dict objects
    as *logs* (no copies are made).
    """
    return [entry for entry in logs if entry["status"] in status_codes]
|
||
|
||
def count_requests_per_ip(logs: list[dict]) -> dict:
    """Count how many log records carry each ``"ip"`` value.

    Returns:
        A dict mapping each IP to its number of records, with keys in the
        order the IPs first appear in *logs*.
    """
    per_ip = {}
    for entry in logs:
        address = entry["ip"]
        if address in per_ip:
            per_ip[address] += 1
        else:
            per_ip[address] = 1
    return per_ip
|
||
|
||
def top_requested_paths(logs: list[dict], top_n: int) -> list[tuple]:
    """Return the *top_n* most frequently requested paths.

    Returns:
        A list of ``(path, count)`` tuples sorted by count, highest first.
        Paths with equal counts keep the order in which they first appear
        in *logs* (the sort is stable).
    """
    hits = {}
    for entry in logs:
        url = entry["path"]
        if url not in hits:
            hits[url] = 0
        hits[url] += 1
    ranked = sorted(hits.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:top_n]
|
||
|
||
def extract_unique_user_agents(logs: list[dict]) -> set:
    """Return the set of distinct ``"user_agent"`` values found in *logs*."""
    return {entry["user_agent"] for entry in logs}
|
||
|
||
def total_bytes_transferred(logs: list[dict]) -> int:
    """Return the sum of the ``"bytes"`` values over all records (0 if empty)."""
    return sum(entry["bytes"] for entry in logs)
|
||
|
||
def _parse_date(date_str: str) -> tuple:
|
||
date_time_part = date_str.split()[0]
|
||
day_month_year, time_part = date_time_part.split(":")
|
||
day, month, year = day_month_year.split("/")
|
||
hour, minute, second = map(int, time_part.split(":"))
|
||
months = {"Jan":1, "Feb":2, "Mar":3, "Apr":4, "May":5, "Jun":6,
|
||
"Jul":7, "Aug":8, "Sep":9, "Oct":10, "Nov":11, "Dec":12}
|
||
month_clear = months[month]
|
||
return int(year), month_clear, int(day), hour, minute, second
|
||
|
||
def filter_by_date_range(logs: list[dict], start_date: str, end_date: str) -> list[dict]:
    """Return the records whose ``"datetime"`` lies within the given range.

    Both bounds are inclusive. *start_date*, *end_date* and each record's
    ``"datetime"`` are all converted with ``_parse_date`` and compared as
    tuples. Input order is preserved.
    """
    lower = _parse_date(start_date)
    upper = _parse_date(end_date)
    return [
        entry
        for entry in logs
        if lower <= _parse_date(entry["datetime"]) <= upper
    ]
|
||
|
||
def group_by_hour(logs: list[dict]) -> dict:
    """Count log records per hour of day.

    The hour is taken from each record's ``"datetime"`` string: the text
    before the first whitespace is split on ``":"`` and the second piece is
    read as an int.

    Returns:
        A dict mapping hour (int) to the number of records in that hour;
        hours with no records are absent.
    """
    per_hour = {}
    for entry in logs:
        stamp = entry["datetime"].split()[0]
        hour_of_day = int(stamp.split(":")[1])
        per_hour.setdefault(hour_of_day, 0)
        per_hour[hour_of_day] += 1
    return per_hour
|
||
|
||
def find_suspicious_ips(logs: list[dict], threshold: int) -> set:
    """Return the IPs that made strictly more than *threshold* requests.

    Request counts come from ``count_requests_per_ip``.
    """
    return {
        address
        for address, hits in count_requests_per_ip(logs).items()
        if hits > threshold
    }
|
||
|
||
def generate_report(logs: list[dict], suspicious_threshold: int = 5) -> str:
    """Build the plain-text summary report for a list of parsed log records.

    The report (with Russian labels) contains, in order: the total number of
    requests, the five IPs with the most requests, the five most requested
    paths, the total bytes transferred, a per-hour request count for hours
    0-23, the IPs with more than *suspicious_threshold* requests, the number
    of records with status 404/500/503, and the number of distinct user
    agents.

    Args:
        logs: Parsed log records, as produced by ``parse_log_line``.
        suspicious_threshold: An IP is listed as suspicious when it has
            strictly more requests than this. Defaults to 5.

    Returns:
        The report as a single newline-joined string.
    """
    ip_counts = count_requests_per_ip(logs)
    top_ips = sorted(ip_counts.items(), key=lambda pair: pair[1], reverse=True)[:5]
    top_paths = top_requested_paths(logs, 5)
    hour_counts = group_by_hour(logs)
    suspicious = find_suspicious_ips(logs, suspicious_threshold)
    error_logs = filter_by_status(logs, [404, 500, 503])
    unique_agents = extract_unique_user_agents(logs)

    lines = [
        "=== ОТЧЁТ АНАЛИЗА ЛОГОВ СЕРВЕРА ===",
        f"Всего запросов: {len(logs)}",
        "\nТоп-5 IP-адресов по числу запросов:",
    ]
    lines.extend(f" {ip}: {count}" for ip, count in top_ips)

    lines.append("\nТоп-5 запрашиваемых путей:")
    lines.extend(f" {path}: {count}" for path, count in top_paths)

    lines.append(f"\nВсего передано байт: {total_bytes_transferred(logs)}")

    lines.append("\nРаспределение по часам (0–23):")
    # Every hour is listed, including those with no requests.
    lines.extend(f" {hour}:{hour_counts.get(hour, 0)}" for hour in range(24))

    # The heading is derived from the threshold so the two cannot drift apart.
    lines.append(f"\nПодозрительные IP (более {suspicious_threshold} запросов):")
    if suspicious:
        # Sets of strings iterate in an order that changes between runs;
        # sorting keeps the report reproducible.
        lines.extend(f" {ip}" for ip in sorted(suspicious))
    else:
        lines.append(" Нет")

    lines.append(
        f"\nКоличество записей с ошибками (404/500/503): {len(error_logs)}"
    )
    lines.append(f"\nКоличество уникальных User-Agent: {len(unique_agents)}")
    return "\n".join(lines)
|
||
|
||
def main():
    """Read ``data/server_logs.txt``, parse every non-blank line, print the report."""
    with open("data/server_logs.txt", "r", encoding="utf-8") as log_file:
        stripped_lines = (raw.strip() for raw in log_file)
        # Blank lines are skipped; every other line is parsed.
        all_logs = [parse_log_line(text) for text in stripped_lines if text]

    print(generate_report(all_logs))
|
||
|
||
# Script entry point: run the analysis only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()