SOCAssistant/import os.py

246 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
def load_logs(filepath):
"""Загружает логи из файла"""
try:
with open(filepath, 'r', encoding='utf-8') as f:
lines = []
for line in f:
if line.strip():
lines.append(line.strip())
return lines
except:
print("Ошибка при загрузке файла")
return []
def parse_line(line):
"""Разбирает одну строку лога"""
try:
# Ищем закрывающую скобку
bracket_pos = line.find(']')
if bracket_pos == -1:
return None
timestamp = line[1:bracket_pos] # Убираем первую скобку
rest = line[bracket_pos + 2:] # Пропускаем '] '
# Делим на тип и остальное
parts = rest.split(' ', 1)
event_type = parts[0]
details = {}
if len(parts) > 1:
params = parts[1].split()
for param in params:
if '=' in param:
key, value = param.split('=', 1)
details[key] = value
return {"timestamp": timestamp, "event_type": event_type, "details": details}
except:
return None
def get_ips(logs):
"""Собирает все IP из логов"""
ips = []
for log in logs:
if 'IP' in log.get('details', {}):
ip = log['details']['IP']
if ip not in ips:
ips.append(ip)
return ips
def count_events(logs):
"""Считает сколько каких событий"""
counts = {}
for log in logs:
event = log['event_type']
if event in counts:
counts[event] += 1
else:
counts[event] = 1
return counts
def find_bad_ips(logs, threshold):
"""Ищет IP с большим числом неудачных входов"""
failed = []
for log in logs:
if log['event_type'] == 'FAILED_LOGIN' and 'IP' in log['details']:
failed.append(log['details']['IP'])
ip_counts = {}
for ip in failed:
if ip in ip_counts:
ip_counts[ip] += 1
else:
ip_counts[ip] = 1
bad_ips = []
for ip, count in ip_counts.items():
if count > threshold:
bad_ips.append(ip)
return bad_ips
def add_severity(event):
"""Добавляет уровень важности"""
new_event = event.copy()
if event['event_type'] == 'FAILED_LOGIN':
new_event['severity'] = 'HIGH'
elif event['event_type'] == 'FILE_DELETE':
new_event['severity'] = 'CRITICAL'
else:
new_event['severity'] = 'LOW'
return new_event
def make_report(stats, bad_ips, total):
"""Создает текстовый отчет"""
report = []
report.append("=" * 50)
report.append("ОТЧЕТ ПО ЛОГАМ")
report.append("=" * 50)
report.append("")
report.append("Статистика по событиям:")
report.append("-" * 30)
# Сортируем события по количеству
sorted_events = sorted(stats.items(), key=lambda x: x[1], reverse=True)
for i, (event, count) in enumerate(sorted_events[:3], 1):
report.append(f"{i}. {event}: {count}")
report.append("")
report.append("Подозрительные IP:")
report.append("-" * 30)
if bad_ips:
for ip in bad_ips:
report.append(f"{ip}")
else:
report.append("Нет подозрительных IP")
report.append("")
report.append(f"Всего событий: {total}")
report.append("")
report.append("=" * 50)
return "\n".join(report)
def save_report(text, path):
"""Сохраняет отчет в файл"""
try:
# Создаем папку если нужно
folder = path[:path.rfind('/')]
if folder:
os.makedirs(folder, exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
f.write(text)
return True
except:
print("Не удалось сохранить отчет")
return False
def main():
print("Начинаем анализ логов...")
print("-" * 40)
# Загружаем логи
lines = load_logs("data/security.log")
if not lines:
print("Нет данных для анализа")
return
print(f"Загружено строк: {len(lines)}")
# Парсим логи
logs = []
bad_lines = 0
for line in lines:
parsed = parse_line(line)
if parsed:
logs.append(parsed)
else:
bad_lines += 1
if bad_lines:
print(f"Не удалось разобрать {bad_lines} строк")
else:
print(f"Разобрано событий: {len(logs)}")
if not logs:
print("Нет событий для анализа")
return
# Считаем неудачные входы
failed = 0
for log in logs:
if log['event_type'] == 'FAILED_LOGIN':
failed += 1
print(f"Неудачных попыток входа: {failed}")
# Собираем статистику
stats = count_events(logs)
print("\nТипы событий:")
for event, count in stats.items():
print(f" {event}: {count}")
# Ищем подозрительные IP
bad_ips = find_bad_ips(logs, 3)
print(f"\nНайдено IP с брутфорсом: {len(bad_ips)}")
# Показываем пример обогащения
print("\nПример добавления уровня критичности:")
for i in range(min(3, len(logs))):
enriched = add_severity(logs[i])
print(f" {logs[i]['event_type']} -> {enriched['severity']}")
# Создаем отчет
report = make_report(stats, bad_ips, len(logs))
# Сохраняем
if save_report(report, "reports/security_summary.txt"):
print("\nОтчет сохранен в reports/security_summary.txt")
else:
print("\nОшибка при сохранении отчета")
print("\nГотово!")
# Создаем тестовые данные
def create_test_file():
os.makedirs("data", exist_ok=True)
test_data = [
"[2023-10-01 12:05:45] FAILED_LOGIN User=admin IP=192.168.1.10",
"[2023-10-01 12:06:12] FAILED_LOGIN User=admin IP=192.168.1.10",
"[2023-10-01 12:06:45] FAILED_LOGIN User=admin IP=192.168.1.10",
"[2023-10-01 12:07:23] FAILED_LOGIN User=admin IP=192.168.1.10",
"[2023-10-01 12:08:01] SUCCESS User=admin IP=192.168.1.10",
"[2023-10-01 12:10:15] FAILED_LOGIN User=root IP=10.0.0.5",
"[2023-10-01 12:11:30] FAILED_LOGIN User=root IP=10.0.0.5",
"[2023-10-01 12:12:45] FAILED_LOGIN User=root IP=10.0.0.5",
"[2023-10-01 12:14:00] FAILED_LOGIN User=root IP=10.0.0.5",
"[2023-10-01 12:15:20] FAILED_LOGIN User=root IP=10.0.0.5",
"[2023-10-01 12:16:45] FILE_DELETE User=backup File=/etc/passwd IP=192.168.1.20",
"[2023-10-01 12:20:10] INFO Service=web Status=running",
]
with open("data/security.log", 'w', encoding='utf-8') as f:
for line in test_data:
f.write(line + "\n")
print("Создан тестовый файл data/security.log")
if __name__ == "__main__":
create_test_file()
main()