from typing import List, Dict, Optional
import statistics


def load_sensor_data(filepath: str) -> List[str]:
    """Read a text file and return its non-blank lines with whitespace stripped.

    Args:
        filepath: Path of the file to read (decoded as UTF-8).

    Returns:
        The stripped, non-empty lines in file order. If the file does not
        exist, an error message is printed and an empty list is returned.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            stripped_lines = (line.strip() for line in f)
            return [text for text in stripped_lines if text]
    except FileNotFoundError:
        print(f"Ошибка: файл {filepath} не найден")
        return []


def parse_sensor_line(line: str) -> Optional[Dict]:
    """Parse one comma-separated line into a sensor record.

    The line must contain exactly five comma-separated fields, in order:
    timestamp, sensor_id, value, unit, status. Every field is stripped of
    surrounding whitespace and the value is converted to ``float``.

    Args:
        line: A single line of raw text.

    Returns:
        A dict with keys ``timestamp``, ``sensor_id``, ``value``, ``unit``
        and ``status``, or ``None`` when the field count is wrong or the
        value is not a valid float literal.
    """
    fields = [field.strip() for field in line.split(',')]
    if len(fields) != 5:
        return None

    timestamp, sensor_id, raw_value, unit, status = fields
    # Only the float conversion can raise, so keep the try body minimal.
    try:
        value = float(raw_value)
    except ValueError:
        return None

    return {
        "timestamp": timestamp,
        "sensor_id": sensor_id,
        "value": value,
        "unit": unit,
        "status": status,
    }


def is_outlier(value: float, threshold: float = 100.0) -> bool:
    """Return True when *value* lies outside the closed range [0, threshold].

    NaN is also reported as an outlier: ``float("nan")`` is accepted by
    ``parse_sensor_line`` and would otherwise slip through, because every
    ordering comparison involving NaN is False.

    Args:
        value: The reading to check.
        threshold: Upper bound of the accepted range (inclusive).
    """
    # Negated range check instead of `value < 0 or value > threshold`:
    # identical for ordinary numbers, but True for NaN.
    return not (0 <= value <= threshold)


def filter_valid_records(records: List[Dict]) -> List[Dict]:
    """Keep only records whose status is ``"ok"`` and whose value is not an outlier.

    Args:
        records: Records carrying at least ``status`` and ``value`` keys.

    Returns:
        A new list holding the accepted records (the same dict objects).
    """
    return [
        record
        for record in records
        if record["status"] == "ok" and not is_outlier(record["value"])
    ]


def normalize_unit(records: List[Dict], target_unit: str) -> List[Dict]:
    """Return copies of *records* converted to *target_unit* where possible.

    Only the conversions ``"C"`` -> ``"F"`` and ``"F"`` -> ``"C"`` are
    implemented. A record in any other unit/target combination is copied
    unchanged: its unit label is deliberately NOT overwritten, since
    relabelling a value that was never converted would silently corrupt it.

    Args:
        records: Records carrying at least ``value`` and ``unit`` keys.
        target_unit: The desired unit label (``"C"`` or ``"F"``).

    Returns:
        A new list of shallow copies; the input records are not modified.
    """
    converted = []
    for record in records:
        result = record.copy()
        source_unit = record["unit"]
        if source_unit == "C" and target_unit == "F":
            result["value"] = record["value"] * 9 / 5 + 32
            result["unit"] = "F"
        elif source_unit == "F" and target_unit == "C":
            result["value"] = (record["value"] - 32) * 5 / 9
            result["unit"] = "C"
        # Otherwise: already in the target unit, or no known conversion.
        converted.append(result)
    return converted


def remove_duplicates_by_timestamp(records: List[Dict]) -> List[Dict]:
    """Drop records that repeat an earlier (timestamp, sensor_id) pair.

    Args:
        records: Records carrying at least ``timestamp`` and ``sensor_id``.

    Returns:
        A new list with the first occurrence of each pair, in input order.
    """
    seen = set()
    unique = []
    for record in records:
        key = (record["timestamp"], record["sensor_id"])
        if key in seen:
            continue
        seen.add(key)
        unique.append(record)
    return unique


def group_by_sensor(records: List[Dict]) -> Dict[str, List[float]]:
    """Collect the values of *records* into one list per sensor id.

    Args:
        records: Records carrying at least ``sensor_id`` and ``value``.

    Returns:
        A dict mapping each sensor id to its values in input order.
    """
    groups: Dict[str, List[float]] = {}
    for record in records:
        groups.setdefault(record["sensor_id"], []).append(record["value"])
    return groups


def calculate_sensor_stats(grouped: Dict[str, List[float]]) -> Dict:
    """Compute min, max, mean and sample standard deviation per sensor.

    Args:
        grouped: Mapping of sensor id to its list of values.

    Returns:
        A dict mapping each sensor id to a dict with keys ``min``, ``max``,
        ``mean`` and ``std``. Sensors with no values are omitted. ``std`` is
        0.0 for a single value, because ``statistics.stdev`` needs at least
        two data points.
    """
    stats = {}
    for sensor_id, values in grouped.items():
        if not values:
            continue
        stats[sensor_id] = {
            "min": min(values),
            "max": max(values),
            "mean": statistics.mean(values),
            "std": statistics.stdev(values) if len(values) > 1 else 0.0,
        }
    return stats


def detect_anomalies(records: List[Dict], stats: Dict, std_mult: float = 2.0) -> List[Dict]:
    """Return the records that deviate too far from their sensor's mean.

    A record is anomalous when ``abs(value - mean)`` is strictly greater
    than ``std_mult * std`` for its sensor. With a ``std`` of 0 this flags
    every value that differs from the mean at all.

    Args:
        records: Records carrying at least ``sensor_id`` and ``value``.
        stats: Per-sensor statistics with ``mean`` and ``std`` entries, as
            produced by ``calculate_sensor_stats``.
        std_mult: Number of standard deviations tolerated around the mean.

    Returns:
        The anomalous records in input order. Records whose sensor id is
        absent from *stats* are skipped.
    """
    anomalies = []
    for record in records:
        sensor_stats = stats.get(record["sensor_id"])
        if sensor_stats is None:
            continue
        deviation = abs(record["value"] - sensor_stats["mean"])
        if deviation > std_mult * sensor_stats["std"]:
            anomalies.append(record)
    return anomalies


def generate_summary_report(original_count: int, filtered_count: int, stats: Dict, anomalies: List[Dict]) -> str:
    """Build the plain-text summary report of an analysis run.

    Args:
        original_count: Number of records before filtering.
        filtered_count: Number of records left after filtering.
        stats: Per-sensor statistics with ``min``, ``max``, ``mean``, ``std``.
        anomalies: Anomalous records; at most the first three are listed.

    Returns:
        The report as a single newline-joined string.
    """
    separator = "="*50
    lines = [
        separator,
        "ОТЧЁТ ПО АНАЛИЗУ ДАННЫХ ДАТЧИКОВ IoT",
        separator,
        f"Исходное количество записей: {original_count}",
        f"После фильтрации: {filtered_count}",
        f"Удалено: {original_count - filtered_count}",
        f"Количество аномалий: {len(anomalies)}",
        "",
        "СТАТИСТИКА ПО ДАТЧИКАМ:",
    ]
    lines.extend(
        f" {sid}: min={s['min']:.1f}, max={s['max']:.1f}, mean={s['mean']:.1f}, std={s['std']:.2f}"
        for sid, s in stats.items()
    )
    if anomalies:
        lines.append("")
        lines.append("ПРИМЕРЫ АНОМАЛИЙ (первые 3):")
        lines.extend(
            f" {a['timestamp']} {a['sensor_id']}: {a['value']} {a['unit']}"
            for a in anomalies[:3]
        )
    lines.append(separator)
    return "\n".join(lines)