# praktika1/src/sensors.py
# 2026-04-03 23:55:04 +03:00
# 112 lines, 3.9 KiB, Python
from typing import List, Dict, Optional
import statistics
def load_sensor_data(filepath: str) -> List[str]:
    """Read a sensor data file and return its non-empty lines, stripped.

    If the file does not exist, an error message is printed and an
    empty list is returned instead of raising.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            lines = []
            for raw in handle:
                cleaned = raw.strip()
                if cleaned:
                    lines.append(cleaned)
            return lines
    except FileNotFoundError:
        print(f"Ошибка: файл {filepath} не найден")
        return []
def parse_sensor_line(line: str) -> Optional[Dict]:
    """Parse one CSV sensor line into a record dict.

    Expects exactly five comma-separated fields:
    timestamp, sensor_id, value, unit, status.
    Returns None when the field count is wrong or the value is not numeric.
    """
    fields = [part.strip() for part in line.split(',')]
    if len(fields) != 5:
        return None
    timestamp, sensor_id, raw_value, unit, status = fields
    try:
        value = float(raw_value)
    except ValueError:
        return None
    return {
        "timestamp": timestamp,
        "sensor_id": sensor_id,
        "value": value,
        "unit": unit,
        "status": status,
    }
def is_outlier(value: float, threshold: float = 100.0) -> bool:
    """Return True when value falls outside the inclusive range [0, threshold]."""
    return not (0 <= value <= threshold)
def filter_valid_records(records: List[Dict]) -> List[Dict]:
    """Keep only records whose status is "ok" and whose value is not an outlier."""
    valid = []
    for record in records:
        if record["status"] != "ok":
            continue
        if is_outlier(record["value"]):
            continue
        valid.append(record)
    return valid
def normalize_unit(records: List[Dict], target_unit: str) -> List[Dict]:
    """Convert temperature records between Celsius and Fahrenheit.

    Returns a new list of shallow-copied records; the input list and its
    dicts are never mutated. Supported conversions are C->F and F->C.

    Bug fix: records whose unit is neither "C" nor "F" (or already equal to
    target_unit) are now returned unchanged. Previously their "unit" field
    was overwritten with target_unit WITHOUT converting the value, silently
    mislabeling the data.
    """
    converted = []
    for record in records:
        copy = record.copy()
        unit = record["unit"]
        if unit == "C" and target_unit == "F":
            copy["value"] = record["value"] * 9 / 5 + 32
            copy["unit"] = "F"
        elif unit == "F" and target_unit == "C":
            copy["value"] = (record["value"] - 32) * 5 / 9
            copy["unit"] = "C"
        # Any other unit: keep the record as-is so value and label stay consistent.
        converted.append(copy)
    return converted
def remove_duplicates_by_timestamp(records: List[Dict]) -> List[Dict]:
    """Drop records whose (timestamp, sensor_id) pair was already seen.

    The first occurrence wins; original relative order is preserved.
    """
    seen_keys = set()
    deduped = []
    for record in records:
        identity = (record["timestamp"], record["sensor_id"])
        if identity in seen_keys:
            continue
        seen_keys.add(identity)
        deduped.append(record)
    return deduped
def group_by_sensor(records: List[Dict]) -> Dict[str, List[float]]:
    """Group measurement values by sensor id.

    Returns a dict mapping sensor_id -> list of values, in input order.
    Idiom: dict.setdefault replaces the manual key-existence check.
    """
    groups: Dict[str, List[float]] = {}
    for record in records:
        groups.setdefault(record["sensor_id"], []).append(record["value"])
    return groups
def calculate_sensor_stats(grouped: Dict[str, List[float]]) -> Dict:
    """Compute min/max/mean/std for each sensor's value list.

    Sensors with an empty value list are skipped. Standard deviation
    (sample stdev) is 0.0 when there is only a single reading.
    """
    per_sensor = {}
    for sensor_id, readings in grouped.items():
        if not readings:
            continue
        spread = statistics.stdev(readings) if len(readings) > 1 else 0.0
        per_sensor[sensor_id] = {
            "min": min(readings),
            "max": max(readings),
            "mean": statistics.mean(readings),
            "std": spread,
        }
    return per_sensor
def detect_anomalies(records: List[Dict], stats: Dict, std_mult: float = 2.0) -> List[Dict]:
    """Return records deviating from their sensor's mean by more than std_mult * std.

    Records whose sensor_id has no entry in stats are ignored.
    """
    flagged = []
    for record in records:
        sensor_stats = stats.get(record["sensor_id"])
        if sensor_stats is None:
            continue
        deviation = abs(record["value"] - sensor_stats["mean"])
        if deviation > std_mult * sensor_stats["std"]:
            flagged.append(record)
    return flagged
def generate_summary_report(original_count: int, filtered_count: int,
                            stats: Dict, anomalies: List[Dict]) -> str:
    """Build a human-readable (Russian) text report of the analysis results.

    Includes record counts, per-sensor statistics, and up to three
    example anomalies. Returns the report as one newline-joined string.
    """
    divider = "=" * 50
    report = [
        divider,
        "ОТЧЁТ ПО АНАЛИЗУ ДАННЫХ ДАТЧИКОВ IoT",
        divider,
        f"Исходное количество записей: {original_count}",
        f"После фильтрации: {filtered_count}",
        f"Удалено: {original_count - filtered_count}",
        f"Количество аномалий: {len(anomalies)}",
        "",
        "СТАТИСТИКА ПО ДАТЧИКАМ:",
    ]
    for sid, s in stats.items():
        report.append(
            f" {sid}: min={s['min']:.1f}, max={s['max']:.1f}, "
            f"mean={s['mean']:.1f}, std={s['std']:.2f}"
        )
    if anomalies:
        report.append("")
        report.append("ПРИМЕРЫ АНОМАЛИЙ (первые 3):")
        for a in anomalies[:3]:
            report.append(f" {a['timestamp']} {a['sensor_id']}: {a['value']} {a['unit']}")
    report.append(divider)
    return "\n".join(report)