"""IoT sensor-data processing pipeline (reconstructed).

NOTE(review): SOURCE was a whitespace-mangled ``git format-patch`` mail
(commit b4fab86, "feat: implement 10 sensor processing functions") that
created two files, ``src/main.py`` and ``src/sensors.py``.  The patch
framing was unrecoverable, so both files' code is reconstructed here as
one runnable module; ``main.py``'s ``from sensors import ...`` therefore
disappears.  All runtime strings are preserved byte-for-byte; exact
original indentation inside f-strings could not be recovered from the
mangled text — TODO confirm against the real repository.
"""

from typing import Dict, List, Optional
import statistics


def load_sensor_data(filepath: str) -> List[str]:
    """Return the non-empty, stripped lines of *filepath* ([] if missing)."""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        print(f"Ошибка: файл {filepath} не найден")
        return []


def parse_sensor_line(line: str) -> Optional[Dict]:
    """Parse one CSV line ``timestamp,sensor_id,value,unit,status``.

    Returns None for malformed lines: wrong field count or a value that
    does not parse as a float.
    """
    parts = line.split(',')
    if len(parts) != 5:
        return None
    try:
        return {
            "timestamp": parts[0].strip(),
            "sensor_id": parts[1].strip(),
            "value": float(parts[2].strip()),
            "unit": parts[3].strip(),
            "status": parts[4].strip(),
        }
    except ValueError:
        return None


def is_outlier(value: float, threshold: float = 100.0) -> bool:
    """True when *value* lies outside the accepted [0, threshold] band."""
    return value < 0 or value > threshold


def filter_valid_records(records: List[Dict]) -> List[Dict]:
    """Keep records whose status is "ok" and whose value passes the band check.

    NOTE(review): the band check runs on raw values, i.e. before unit
    normalisation in main(), so a Fahrenheit reading is tested against the
    same 0..100 band as a Celsius one — confirm this is intended.
    """
    return [r for r in records
            if r["status"] == "ok" and not is_outlier(r["value"])]


def normalize_unit(records: List[Dict], target_unit: str) -> List[Dict]:
    """Return copies of *records* with C/F values converted to *target_unit*.

    Bug fix: the original fell through to ``r2["unit"] = target_unit`` for
    ANY other unit, silently relabelling e.g. a Kelvin reading as Celsius
    without converting the value.  Records whose unit is neither part of a
    supported C<->F conversion nor already the target are now left untouched.
    """
    converted = []
    for rec in records:
        out = rec.copy()
        if rec["unit"] == "C" and target_unit == "F":
            out["value"] = rec["value"] * 9 / 5 + 32
            out["unit"] = "F"
        elif rec["unit"] == "F" and target_unit == "C":
            out["value"] = (rec["value"] - 32) * 5 / 9
            out["unit"] = "C"
        # already in the target unit, or an unknown unit: keep the copy as-is
        converted.append(out)
    return converted


def remove_duplicates_by_timestamp(records: List[Dict]) -> List[Dict]:
    """Drop later records that repeat a (timestamp, sensor_id) pair."""
    seen = set()
    unique = []
    for rec in records:
        key = (rec["timestamp"], rec["sensor_id"])
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique


def group_by_sensor(records: List[Dict]) -> Dict[str, List[float]]:
    """Map sensor_id -> list of that sensor's values, in input order."""
    groups: Dict[str, List[float]] = {}
    for rec in records:
        groups.setdefault(rec["sensor_id"], []).append(rec["value"])
    return groups


def calculate_sensor_stats(grouped: Dict[str, List[float]]) -> Dict:
    """Per-sensor min/max/mean/std; sensors with no values are skipped.

    std is reported as 0.0 for a single sample because statistics.stdev
    requires at least two data points.
    """
    stats = {}
    for sid, values in grouped.items():
        if values:
            stats[sid] = {
                "min": min(values),
                "max": max(values),
                "mean": statistics.mean(values),
                "std": statistics.stdev(values) if len(values) > 1 else 0.0,
            }
    return stats


def detect_anomalies(records: List[Dict], stats: Dict, std_mult: float = 2.0) -> List[Dict]:
    """Return records farther than std_mult * std from their sensor's mean.

    With std == 0.0 (single-sample sensor) any value differing from the
    mean is flagged; records whose sensor has no stats entry are skipped.
    """
    anomalies = []
    for rec in records:
        sid = rec["sensor_id"]
        if sid in stats:
            mean = stats[sid]["mean"]
            std = stats[sid]["std"]
            if abs(rec["value"] - mean) > std_mult * std:
                anomalies.append(rec)
    return anomalies


def generate_summary_report(original_count: int, filtered_count: int,
                            stats: Dict, anomalies: List[Dict]) -> str:
    """Build the human-readable (Russian) summary report as one string."""
    lines = []
    lines.append("=" * 50)
    lines.append("ОТЧЁТ ПО АНАЛИЗУ ДАННЫХ ДАТЧИКОВ IoT")
    lines.append("=" * 50)
    lines.append(f"Исходное количество записей: {original_count}")
    lines.append(f"После фильтрации: {filtered_count}")
    lines.append(f"Удалено: {original_count - filtered_count}")
    lines.append(f"Количество аномалий: {len(anomalies)}")
    lines.append("")
    lines.append("СТАТИСТИКА ПО ДАТЧИКАМ:")
    for sid, s in stats.items():
        lines.append(f"  {sid}: min={s['min']:.1f}, max={s['max']:.1f}, "
                     f"mean={s['mean']:.1f}, std={s['std']:.2f}")
    if anomalies:
        lines.append("")
        lines.append("ПРИМЕРЫ АНОМАЛИЙ (первые 3):")
        for a in anomalies[:3]:
            lines.append(f"  {a['timestamp']} {a['sensor_id']}: {a['value']} {a['unit']}")
    lines.append("=" * 50)
    return "\n".join(lines)


def main():
    """Run the pipeline: load -> parse -> dedupe -> filter -> normalise -> stats -> report."""
    # 1. Load
    raw_lines = load_sensor_data("data/sensors_raw.txt")

    # 2. Parse (malformed lines are dropped)
    records = []
    for line in raw_lines:
        rec = parse_sensor_line(line)
        if rec:
            records.append(rec)

    original_count = len(records)

    # 3. Remove duplicates
    records = remove_duplicates_by_timestamp(records)

    # 4. Keep only valid records
    records = filter_valid_records(records)

    # 5. Normalise units (everything to Celsius)
    records = normalize_unit(records, "C")

    # 6. Group and compute per-sensor statistics
    grouped = group_by_sensor(records)
    stats = calculate_sensor_stats(grouped)

    # 7. Anomalies (> 2 standard deviations from the sensor mean)
    anomalies = detect_anomalies(records, stats, 2.0)

    # 8. Report
    report = generate_summary_report(original_count, len(records), stats, anomalies)

    # 9. Print and save
    print(report)
    with open("data/output.txt", "w", encoding='utf-8') as f:
        f.write(report)


if __name__ == "__main__":
    main()