from typing import List, Dict, Optional

import statistics
def load_sensor_data(filepath: str) -> List[str]:
    """Read a sensor-data file and return its non-blank lines, stripped.

    Returns an empty list (after printing an error) when the file is missing.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            lines = []
            for raw in f:
                stripped = raw.strip()
                if stripped:
                    lines.append(stripped)
            return lines
    except FileNotFoundError:
        print(f"Ошибка: файл {filepath} не найден")
        return []
def parse_sensor_line(line: str) -> Optional[Dict]:
    """Parse one CSV sensor line into a record dict.

    Expected shape: timestamp,sensor_id,value,unit,status.
    Returns None when the field count is wrong or value is not numeric.
    """
    fields = [part.strip() for part in line.split(',')]
    if len(fields) != 5:
        return None
    timestamp, sensor_id, raw_value, unit, status = fields
    try:
        value = float(raw_value)
    except ValueError:
        return None
    return {
        "timestamp": timestamp,
        "sensor_id": sensor_id,
        "value": value,
        "unit": unit,
        "status": status,
    }
def is_outlier(value: float, threshold: float = 100.0) -> bool:
    """Return True when *value* lies outside the plausible range [0, threshold]."""
    if value < 0:
        return True
    return value > threshold
def filter_valid_records(records: List[Dict]) -> List[Dict]:
    """Keep only records whose status is "ok" and whose value is not an outlier."""
    valid = []
    for record in records:
        if record["status"] != "ok":
            continue
        if is_outlier(record["value"]):
            continue
        valid.append(record)
    return valid
def normalize_unit(records: List[Dict], target_unit: str) -> List[Dict]:
    """Return copies of *records* with temperatures converted to *target_unit*.

    Supported conversions are Celsius <-> Fahrenheit ("C" <-> "F"). Records
    already in the target unit pass through unchanged. Records in any other
    unit are returned unchanged with their ORIGINAL unit kept.

    Bug fix: the previous implementation relabeled unknown units (e.g. "K")
    to *target_unit* without converting the value, silently corrupting data.

    Args:
        records: sensor records with "value" and "unit" keys.
        target_unit: desired unit string, e.g. "C" or "F".

    Returns:
        A new list of shallow-copied records; the input is not mutated.
    """
    converted = []
    for rec in records:
        out = rec.copy()
        unit = rec["unit"]
        if unit == "C" and target_unit == "F":
            out["value"] = rec["value"] * 9 / 5 + 32
            out["unit"] = "F"
        elif unit == "F" and target_unit == "C":
            out["value"] = (rec["value"] - 32) * 5 / 9
            out["unit"] = "C"
        # Any other unit (including unit == target_unit) is left untouched:
        # we never relabel a unit we cannot actually convert.
        converted.append(out)
    return converted
def remove_duplicates_by_timestamp(records: List[Dict]) -> List[Dict]:
    """Drop duplicate records, keeping the first one seen per
    (timestamp, sensor_id) pair, in original order."""
    first_seen: Dict = {}
    for record in records:
        key = (record["timestamp"], record["sensor_id"])
        first_seen.setdefault(key, record)
    return list(first_seen.values())
def group_by_sensor(records: List[Dict]) -> Dict[str, List[float]]:
    """Collect each sensor's values into a sensor_id -> list-of-values map."""
    grouped: Dict[str, List[float]] = {}
    for record in records:
        grouped.setdefault(record["sensor_id"], []).append(record["value"])
    return grouped
def calculate_sensor_stats(grouped: Dict[str, List[float]]) -> Dict:
    """Compute min/max/mean/std per sensor; sensors with no values are skipped.

    std is the sample standard deviation, defined as 0.0 for a single reading.
    """
    summary = {}
    for sensor_id, readings in grouped.items():
        if not readings:
            continue
        spread = statistics.stdev(readings) if len(readings) > 1 else 0.0
        summary[sensor_id] = {
            "min": min(readings),
            "max": max(readings),
            "mean": statistics.mean(readings),
            "std": spread,
        }
    return summary
def detect_anomalies(records: List[Dict], stats: Dict, std_mult: float = 2.0) -> List[Dict]:
    """Flag records whose value deviates from their sensor's mean by more
    than std_mult standard deviations; sensors absent from *stats* are skipped."""
    flagged = []
    for record in records:
        sensor_stats = stats.get(record["sensor_id"])
        if sensor_stats is None:
            continue
        deviation = abs(record["value"] - sensor_stats["mean"])
        if deviation > std_mult * sensor_stats["std"]:
            flagged.append(record)
    return flagged
def generate_summary_report(original_count: int, filtered_count: int,
                            stats: Dict, anomalies: List[Dict]) -> str:
    """Render a human-readable (Russian) summary of the sensor analysis run."""
    rule = "=" * 50
    out = [rule, "ОТЧЁТ ПО АНАЛИЗУ ДАННЫХ ДАТЧИКОВ IoT", rule]
    out.append(f"Исходное количество записей: {original_count}")
    out.append(f"После фильтрации: {filtered_count}")
    out.append(f"Удалено: {original_count - filtered_count}")
    out.append(f"Количество аномалий: {len(anomalies)}")
    out.extend(["", "СТАТИСТИКА ПО ДАТЧИКАМ:"])
    for sid, s in stats.items():
        out.append(
            f"  {sid}: min={s['min']:.1f}, max={s['max']:.1f}, "
            f"mean={s['mean']:.1f}, std={s['std']:.2f}"
        )
    if anomalies:
        out.extend(["", "ПРИМЕРЫ АНОМАЛИЙ (первые 3):"])
        # Show at most three examples to keep the report compact.
        for a in anomalies[:3]:
            out.append(f"  {a['timestamp']} {a['sensor_id']}: {a['value']} {a['unit']}")
    out.append(rule)
    return "\n".join(out)