commit 3203b4a20c033d70a51c70095df5762301db46b1 Author: Кирилл Гонин Date: Mon Mar 30 18:26:15 2026 +0000 Загрузить файлы в «/» diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..05de0db --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.venv/ +.idea/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..c31d612 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +# SOC Assistant - Система анализа журнала событий информационной безопасности + +SOC Assistant (Security Operations Center Assistant) - это набор утилит для обработки и анализа журналов событий (логов) системы обнаружения вторжений. Система позволяет загружать сырые данные из текстового файла, очищать их, обогащать, агрегировать и формировать аналитическую сводку для выявления потенциальных угроз безопасности. + diff --git a/import os.py b/import os.py new file mode 100644 index 0000000..464dc87 --- /dev/null +++ b/import os.py @@ -0,0 +1,246 @@ +import os + + +def load_logs(filepath): + """Загружает логи из файла""" + try: + with open(filepath, 'r', encoding='utf-8') as f: + lines = [] + for line in f: + if line.strip(): + lines.append(line.strip()) + return lines + except: + print("Ошибка при загрузке файла") + return [] + + +def parse_line(line): + """Разбирает одну строку лога""" + try: + # Ищем закрывающую скобку + bracket_pos = line.find(']') + if bracket_pos == -1: + return None + + timestamp = line[1:bracket_pos] # Убираем первую скобку + rest = line[bracket_pos + 2:] # Пропускаем '] ' + + # Делим на тип и остальное + parts = rest.split(' ', 1) + event_type = parts[0] + + details = {} + if len(parts) > 1: + params = parts[1].split() + for param in params: + if '=' in param: + key, value = param.split('=', 1) + details[key] = value + + return {"timestamp": timestamp, "event_type": event_type, "details": details} + except: + return None + + +def get_ips(logs): + """Собирает все IP из логов""" + ips = [] + for log in logs: + if 'IP' in log.get('details', {}): + ip = log['details']['IP'] + if ip not in ips: + ips.append(ip) + return ips + + +def count_events(logs): + """Считает сколько каких событий""" + counts = {} + for log in logs: + event = log['event_type'] + if event in counts: + counts[event] += 1 + else: + counts[event] = 1 + return counts + + +def find_bad_ips(logs, threshold): + """Ищет IP с большим числом неудачных входов""" + failed = [] + for log in logs: + if log['event_type'] == 'FAILED_LOGIN' and 'IP' in log['details']: + failed.append(log['details']['IP']) + + ip_counts = {} + for ip in failed: + if ip in ip_counts: + ip_counts[ip] += 1 + else: + ip_counts[ip] = 1 + + bad_ips = [] + for ip, count in ip_counts.items(): + if count > threshold: + bad_ips.append(ip) + + return bad_ips + + +def add_severity(event): + """Добавляет уровень важности""" + new_event = event.copy() + if event['event_type'] == 'FAILED_LOGIN': + new_event['severity'] = 'HIGH' + elif event['event_type'] == 'FILE_DELETE': + new_event['severity'] = 'CRITICAL' + else: + new_event['severity'] = 'LOW' + return new_event + + +def make_report(stats, bad_ips, total): + """Создает текстовый отчет""" + report = [] + report.append("=" * 50) + report.append("ОТЧЕТ ПО ЛОГАМ") + report.append("=" * 50) + report.append("") + + report.append("Статистика по событиям:") + report.append("-" * 30) + # Сортируем события по количеству + sorted_events = sorted(stats.items(), key=lambda x: x[1], reverse=True) + for i, (event, count) in enumerate(sorted_events[:3], 1): + report.append(f"{i}. {event}: {count}") + + report.append("") + report.append("Подозрительные IP:") + report.append("-" * 30) + if bad_ips: + for ip in bad_ips: + report.append(f"• {ip}") + else: + report.append("Нет подозрительных IP") + + report.append("") + report.append(f"Всего событий: {total}") + report.append("") + report.append("=" * 50) + + return "\n".join(report) + + +def save_report(text, path): + """Сохраняет отчет в файл""" + try: + # Создаем папку если нужно + folder = path[:path.rfind('/')] + if folder: + os.makedirs(folder, exist_ok=True) + + with open(path, 'w', encoding='utf-8') as f: + f.write(text) + return True + except: + print("Не удалось сохранить отчет") + return False + + +def main(): + print("Начинаем анализ логов...") + print("-" * 40) + + # Загружаем логи + lines = load_logs("data/security.log") + if not lines: + print("Нет данных для анализа") + return + + print(f"Загружено строк: {len(lines)}") + + # Парсим логи + logs = [] + bad_lines = 0 + + for line in lines: + parsed = parse_line(line) + if parsed: + logs.append(parsed) + else: + bad_lines += 1 + + if bad_lines: + print(f"Не удалось разобрать {bad_lines} строк") + else: + print(f"Разобрано событий: {len(logs)}") + + if not logs: + print("Нет событий для анализа") + return + + # Считаем неудачные входы + failed = 0 + for log in logs: + if log['event_type'] == 'FAILED_LOGIN': + failed += 1 + print(f"Неудачных попыток входа: {failed}") + + # Собираем статистику + stats = count_events(logs) + print("\nТипы событий:") + for event, count in stats.items(): + print(f" {event}: {count}") + + # Ищем подозрительные IP + bad_ips = find_bad_ips(logs, 3) + print(f"\nНайдено IP с брутфорсом: {len(bad_ips)}") + + # Показываем пример обогащения + print("\nПример добавления уровня критичности:") + for i in range(min(3, len(logs))): + enriched = add_severity(logs[i]) + print(f" {logs[i]['event_type']} -> {enriched['severity']}") + + # Создаем отчет + report = make_report(stats, bad_ips, len(logs)) + + # Сохраняем + if save_report(report, "reports/security_summary.txt"): + print("\nОтчет сохранен в reports/security_summary.txt") + else: + print("\nОшибка при сохранении отчета") + + print("\nГотово!") + + +# Создаем тестовые данные +def create_test_file(): + os.makedirs("data", exist_ok=True) + + test_data = [ + "[2023-10-01 12:05:45] FAILED_LOGIN User=admin IP=192.168.1.10", + "[2023-10-01 12:06:12] FAILED_LOGIN User=admin IP=192.168.1.10", + "[2023-10-01 12:06:45] FAILED_LOGIN User=admin IP=192.168.1.10", + "[2023-10-01 12:07:23] FAILED_LOGIN User=admin IP=192.168.1.10", + "[2023-10-01 12:08:01] SUCCESS User=admin IP=192.168.1.10", + "[2023-10-01 12:10:15] FAILED_LOGIN User=root IP=10.0.0.5", + "[2023-10-01 12:11:30] FAILED_LOGIN User=root IP=10.0.0.5", + "[2023-10-01 12:12:45] FAILED_LOGIN User=root IP=10.0.0.5", + "[2023-10-01 12:14:00] FAILED_LOGIN User=root IP=10.0.0.5", + "[2023-10-01 12:15:20] FAILED_LOGIN User=root IP=10.0.0.5", + "[2023-10-01 12:16:45] FILE_DELETE User=backup File=/etc/passwd IP=192.168.1.20", + "[2023-10-01 12:20:10] INFO Service=web Status=running", + ] + + with open("data/security.log", 'w', encoding='utf-8') as f: + for line in test_data: + f.write(line + "\n") + + print("Создан тестовый файл data/security.log") + + +if __name__ == "__main__": + create_test_file() + main() \ No newline at end of file