Обновить TDOA.py

This commit is contained in:
Артемий Муравьев 2025-06-29 07:11:47 +00:00
parent 7f65cd02e4
commit 3d0aa16918

View File

@ -1,354 +1,354 @@
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation from matplotlib.animation import FuncAnimation
import matplotlib.patches as patches import matplotlib.patches as patches
import librosa import librosa
import random import random
import pandas as pd import pandas as pd
import threading import threading
import time import time
from dataclasses import dataclass from dataclasses import dataclass
# Константы # Константы
SOUND_SPEED = 343.2 # скорость звука (м/с) SOUND_SPEED = 343.2 # скорость звука (м/с)
MIC_DISTANCE = 0.06 # расстояние между микрофонами (м) MIC_DISTANCE = 0.06 # расстояние между микрофонами (м)
ROOM_WIDTH = 3.0 # ширина комнаты (м) ROOM_WIDTH = 3.0 # ширина комнаты (м)
ROOM_HEIGHT = 2.0 # высота комнаты (м) ROOM_HEIGHT = 2.0 # высота комнаты (м)
SAMPLE_RATE = 48000 # частота дискретизации (Гц) SAMPLE_RATE = 48000 # частота дискретизации (Гц)
CHUNK = 16384 # размер буфера CHUNK = 16384 # размер буфера
RMS_THRESHOLD = 0.1 # порог RMS для определения звука RMS_THRESHOLD = 0.1 # порог RMS для определения звука
SILENCE_TIMEOUT = 0.5 # время в секундах для сохранения последнего угла SILENCE_TIMEOUT = 0.5 # время в секундах для сохранения последнего угла
MOVE_INTERVAL = 0.5 # интервал перемещения источника звука (с) MOVE_INTERVAL = 0.5 # интервал перемещения источника звука (с)
@dataclass @dataclass
class Microphone: class Microphone:
"""Класс для хранения информации о микрофоне""" """Класс для хранения информации о микрофоне"""
x: float x: float
y: float y: float
@dataclass @dataclass
class SoundSource: class SoundSource:
"""Класс для хранения информации об источнике звука""" """Класс для хранения информации об источнике звука"""
x: float x: float
y: float y: float
class DirectionFinder: class DirectionFinder:
def __init__(self, mic_distance: float, audio_file: str): def __init__(self, mic_distance: float, audio_file: str):
"""Инициализация определителя направления""" """Инициализация определителя направления"""
self.mic_distance = mic_distance self.mic_distance = mic_distance
self.mic1 = Microphone(x=-mic_distance / 2, y=0.0) self.mic1 = Microphone(x=-mic_distance / 2, y=0.0)
self.mic2 = Microphone(x=mic_distance / 2, y=0.0) self.mic2 = Microphone(x=mic_distance / 2, y=0.0)
self.sound_source = self._generate_sequential_sound_source() self.sound_source = self._generate_sequential_sound_source()
self.running = True self.running = True
self.current_angle = 0.0 self.current_angle = 0.0
self.sound_detected = False self.sound_detected = False
self.last_sound_time = 0 self.last_sound_time = 0
self.last_detected_angle = None self.last_detected_angle = None
self.show_arrow = False self.show_arrow = False
self.rms_left = 0.0 self.rms_left = 0.0
self.rms_right = 0.0 self.rms_right = 0.0
self.audio_data, self.sample_rate = self.load_audio(audio_file) self.audio_data, self.sample_rate = self.load_audio(audio_file)
self.audio_index = 0 self.audio_index = 0
self.noise_level = 0.001 # Уровень шума self.noise_level = 0.001 # Уровень шума
self.results = [] self.results = []
self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)] self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)]
self.last_move_time = time.time() self.last_move_time = time.time()
self.max_physical_delay = self.mic_distance / SOUND_SPEED # Максимальная физическая задержка self.max_physical_delay = self.mic_distance / SOUND_SPEED # Максимальная физическая задержка
def _generate_sequential_sound_source(self) -> SoundSource: def _generate_sequential_sound_source(self) -> SoundSource:
"""Генерация положения источника звука с последовательным проходом углов от -90 до 90 градусов с шагом 20 градусов""" """Генерация положения источника звука с последовательным проходом углов от -90 до 90 градусов с шагом 20 градусов"""
if not hasattr(self, 'angle_ranges'): if not hasattr(self, 'angle_ranges'):
self.angle_ranges = list(range(-90, 91, 20)) self.angle_ranges = list(range(-90, 91, 20))
self.current_range_idx = 0 self.current_range_idx = 0
self.angle_count = 0 self.angle_count = 0
start_angle = self.angle_ranges[self.current_range_idx] start_angle = self.angle_ranges[self.current_range_idx]
end_angle = start_angle + 20 if self.current_range_idx < len(self.angle_ranges) - 1 else 90 end_angle = start_angle + 20 if self.current_range_idx < len(self.angle_ranges) - 1 else 90
angle_deg = random.uniform(start_angle, end_angle) angle_deg = random.uniform(start_angle, end_angle)
angle_rad = np.radians(angle_deg) angle_rad = np.radians(angle_deg)
distance = 1.5 # расстояние distance = 1.5 # расстояние
source_x = distance * np.sin(angle_rad) source_x = distance * np.sin(angle_rad)
source_y = distance * np.cos(angle_rad) source_y = distance * np.cos(angle_rad)
if abs(source_x) > ROOM_WIDTH / 2: if abs(source_x) > ROOM_WIDTH / 2:
scale = (ROOM_WIDTH / 2) / abs(source_x) scale = (ROOM_WIDTH / 2) / abs(source_x)
source_x *= scale source_x *= scale
source_y *= scale source_y *= scale
if source_y > ROOM_HEIGHT: if source_y > ROOM_HEIGHT:
scale = ROOM_HEIGHT / source_y scale = ROOM_HEIGHT / source_y
source_x *= scale source_x *= scale
source_y *= scale source_y *= scale
if source_y < 0: if source_y < 0:
source_y = 0.0 source_y = 0.0
source_x = 0.0 source_x = 0.0
print( print(
f"Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}, расстояние={distance:.2f}, повторение {self.angle_count + 1}/10") f"Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}, расстояние={distance:.2f}, повторение {self.angle_count + 1}/10")
self.angle_count += 1 self.angle_count += 1
if self.angle_count >= 10: # кол-во проходов в диапозоне if self.angle_count >= 10: # кол-во проходов в диапозоне
self.angle_count = 0 self.angle_count = 0
self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges) self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges)
return SoundSource(x=source_x, y=source_y) return SoundSource(x=source_x, y=source_y)
def load_audio(self, filename: str) -> tuple: def load_audio(self, filename: str) -> tuple:
"""Загрузка аудиофайла""" """Загрузка аудиофайла"""
try: try:
audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True) audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True)
return audio_data, sample_rate return audio_data, sample_rate
except Exception as e: except Exception as e:
raise ValueError(f"Ошибка загрузки аудиофайла: {e}") raise ValueError(f"Ошибка загрузки аудиофайла: {e}")
def calculate_distances(self) -> tuple: def calculate_distances(self) -> tuple:
"""Расчет расстояний от источника звука до микрофонов""" """Расчет расстояний от источника звука до микрофонов"""
l1 = np.sqrt((self.sound_source.x - self.mic1.x) ** 2 + l1 = np.sqrt((self.sound_source.x - self.mic1.x) ** 2 +
(self.sound_source.y - self.mic1.y) ** 2) (self.sound_source.y - self.mic1.y) ** 2)
l2 = np.sqrt((self.sound_source.x - self.mic2.x) ** 2 + l2 = np.sqrt((self.sound_source.x - self.mic2.x) ** 2 +
(self.sound_source.y - self.mic2.y) ** 2) (self.sound_source.y - self.mic2.y) ** 2)
return l1, l2 return l1, l2
def process_signals_with_delay(self, signal: np.ndarray) -> tuple: def process_signals_with_delay(self, signal: np.ndarray) -> tuple:
"""Обработка сигналов с учетом временного сдвига и шума""" """Обработка сигналов с учетом временного сдвига и шума"""
l1, l2 = self.calculate_distances() l1, l2 = self.calculate_distances()
t1 = l1 / SOUND_SPEED t1 = l1 / SOUND_SPEED
t2 = l2 / SOUND_SPEED t2 = l2 / SOUND_SPEED
log = int(self.sample_rate * (t2 - t1)) log = int(self.sample_rate * (t2 - t1))
if log >= 0: if log >= 0:
S1 = signal[abs(log):] if log != 0 else signal.copy() S1 = signal[abs(log):] if log != 0 else signal.copy()
S2 = signal[:-abs(log)] if log != 0 else signal.copy() S2 = signal[:-abs(log)] if log != 0 else signal.copy()
else: else:
S1 = signal[:-abs(log)] if log != 0 else signal.copy() S1 = signal[:-abs(log)] if log != 0 else signal.copy()
S2 = signal[abs(log):] S2 = signal[abs(log):]
min_length = min(len(S1), len(S2)) min_length = min(len(S1), len(S2))
S1 = S1[:min_length] S1 = S1[:min_length]
S2 = S2[:min_length] S2 = S2[:min_length]
noise1 = np.random.normal(0, self.noise_level, size=S1.shape) noise1 = np.random.normal(0, self.noise_level, size=S1.shape)
noise2 = np.random.normal(0, self.noise_level, size=S2.shape) noise2 = np.random.normal(0, self.noise_level, size=S2.shape)
S1 = S1 + noise1 S1 = S1 + noise1
S2 = S2 + noise2 S2 = S2 + noise2
return S1, S2, t1, t2 return S1, S2, t1, t2
def capture_audio(self): def capture_audio(self):
"""Эмуляция захвата аудиоданных из файла""" """Эмуляция захвата аудиоданных из файла"""
if self.audio_index + CHUNK >= len(self.audio_data): if self.audio_index + CHUNK >= len(self.audio_data):
self.audio_index = 0 self.audio_index = 0
chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK] chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK]
self.audio_index += CHUNK self.audio_index += CHUNK
signal1, signal2, t1, t2 = self.process_signals_with_delay(chunk) signal1, signal2, t1, t2 = self.process_signals_with_delay(chunk)
return signal1, signal2, t1, t2 return signal1, signal2, t1, t2
def calculate_time_delay_fft(self, signal1: np.ndarray, signal2: np.ndarray) -> float: def calculate_time_delay_fft(self, signal1: np.ndarray, signal2: np.ndarray) -> float:
"""Расчет временной задержки через FFT с параболической интерполяцией""" """Расчет временной задержки через FFT с параболической интерполяцией"""
signal1 = (signal1 - np.mean(signal1)) / (np.std(signal1) + 1e-10) signal1 = (signal1 - np.mean(signal1)) / (np.std(signal1) + 1e-10)
signal2 = (signal2 - np.mean(signal2)) / (np.std(signal2) + 1e-10) signal2 = (signal2 - np.mean(signal2)) / (np.std(signal2) + 1e-10)
fft_signal1 = np.fft.rfft(signal1) fft_signal1 = np.fft.rfft(signal1)
fft_signal2 = np.fft.rfft(signal2) fft_signal2 = np.fft.rfft(signal2)
cross_spectrum = fft_signal1 * np.conj(fft_signal2) cross_spectrum = fft_signal1 * np.conj(fft_signal2)
cross_spectrum = cross_spectrum / (np.abs(cross_spectrum) + 1e-10) cross_spectrum = cross_spectrum / (np.abs(cross_spectrum) + 1e-10)
correlation = np.fft.irfft(cross_spectrum) correlation = np.fft.irfft(cross_spectrum)
correlation = np.roll(correlation, len(correlation) // 2) correlation = np.roll(correlation, len(correlation) // 2)
max_delay_samples = int(self.max_physical_delay * self.sample_rate) max_delay_samples = int(self.max_physical_delay * self.sample_rate)
middle_point = len(correlation) // 2 middle_point = len(correlation) // 2
start_idx = middle_point - max_delay_samples start_idx = middle_point - max_delay_samples
end_idx = middle_point + max_delay_samples end_idx = middle_point + max_delay_samples
max_correlation_idx = start_idx + np.argmax(correlation[start_idx:end_idx]) max_correlation_idx = start_idx + np.argmax(correlation[start_idx:end_idx])
peak_value = correlation[max_correlation_idx] peak_value = correlation[max_correlation_idx]
if peak_value < 0.1: # Игнорирование RMS до... if peak_value < 0.1: # Игнорирование RMS до...
return 0.0 return 0.0
# Параболическая интерполяция # Параболическая интерполяция
if max_correlation_idx > start_idx and max_correlation_idx < end_idx - 1: if max_correlation_idx > start_idx and max_correlation_idx < end_idx - 1:
y0 = correlation[max_correlation_idx - 1] y0 = correlation[max_correlation_idx - 1]
y1 = correlation[max_correlation_idx] y1 = correlation[max_correlation_idx]
y2 = correlation[max_correlation_idx + 1] y2 = correlation[max_correlation_idx + 1]
denom = 2 * (y0 - 2 * y1 + y2) denom = 2 * (y0 - 2 * y1 + y2)
if denom != 0: if denom != 0:
delta = (y0 - y2) / denom delta = (y0 - y2) / denom
max_correlation_idx += delta max_correlation_idx += delta
delay_samples = max_correlation_idx - middle_point delay_samples = max_correlation_idx - middle_point
time_delay = delay_samples / self.sample_rate time_delay = delay_samples / self.sample_rate
time_delay = np.clip(time_delay, -self.max_physical_delay, self.max_physical_delay) time_delay = np.clip(time_delay, -self.max_physical_delay, self.max_physical_delay)
return time_delay return time_delay
def calculate_direction(self, time_delay: float) -> float: def calculate_direction(self, time_delay: float) -> float:
"""Расчет угла направления""" """Расчет угла направления"""
sin_theta = (SOUND_SPEED * time_delay) / self.mic_distance sin_theta = (SOUND_SPEED * time_delay) / self.mic_distance
sin_theta = np.clip(sin_theta, -1, 1) sin_theta = np.clip(sin_theta, -1, 1)
angle = np.arcsin(sin_theta) * 180 / np.pi angle = np.arcsin(sin_theta) * 180 / np.pi
return angle return angle
def calculate_rms(self, signal: np.ndarray) -> float: def calculate_rms(self, signal: np.ndarray) -> float:
"""Вычисление RMS сигнала""" """Вычисление RMS сигнала"""
return np.sqrt(np.mean(signal ** 2)) return np.sqrt(np.mean(signal ** 2))
def run(self): def run(self):
"""Обработка аудио в реальном времени""" """Обработка аудио в реальном времени"""
while self.running: while self.running:
current_time = time.time() current_time = time.time()
if current_time - self.last_move_time >= MOVE_INTERVAL: if current_time - self.last_move_time >= MOVE_INTERVAL:
self.sound_source = self._generate_sequential_sound_source() self.sound_source = self._generate_sequential_sound_source()
self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time)) self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time))
print(f"Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}") print(f"Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}")
self.last_move_time = current_time self.last_move_time = current_time
left, right, t1, t2 = self.capture_audio() left, right, t1, t2 = self.capture_audio()
self.rms_left = self.calculate_rms(left) self.rms_left = self.calculate_rms(left)
self.rms_right = self.calculate_rms(right) self.rms_right = self.calculate_rms(right)
signal_diff = np.mean(np.abs(left - right)) signal_diff = np.mean(np.abs(left - right))
new_sound_detected = (self.rms_left > RMS_THRESHOLD) and (self.rms_right > RMS_THRESHOLD) new_sound_detected = (self.rms_left > RMS_THRESHOLD) and (self.rms_right > RMS_THRESHOLD)
if new_sound_detected: if new_sound_detected:
print(f"RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}") print(f"RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}")
print(f"Signal difference: {signal_diff:.4f}") print(f"Signal difference: {signal_diff:.4f}")
time_delay = self.calculate_time_delay_fft(left, right) time_delay = self.calculate_time_delay_fft(left, right)
print(f"Time delay: {time_delay * 1000:.2f} ms") print(f"Time delay: {time_delay * 1000:.2f} ms")
angle = self.calculate_direction(time_delay) angle = self.calculate_direction(time_delay)
print(f"Calculated angle: {angle:.1f}°") print(f"Calculated angle: {angle:.1f}°")
self.current_angle = angle self.current_angle = angle
self.last_detected_angle = self.current_angle self.last_detected_angle = self.current_angle
self.last_sound_time = current_time self.last_sound_time = current_time
self.sound_detected = True self.sound_detected = True
self.show_arrow = True self.show_arrow = True
true_dx = self.sound_source.x true_dx = self.sound_source.x
true_dy = self.sound_source.y true_dy = self.sound_source.y
true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi
self.results.append({ self.results.append({
't1 (ms)': t1 * 1000, 't1 (ms)': t1 * 1000,
't2 (ms)': t2 * 1000, 't2 (ms)': t2 * 1000,
'Time Delay (ms)': time_delay * 1000, 'Time Delay (ms)': time_delay * 1000,
'Detected Angle (°)': self.current_angle, 'Detected Angle (°)': self.current_angle,
'True Angle (°)': true_angle, 'True Angle (°)': true_angle,
'Source X': self.sound_source.x, 'Source X': self.sound_source.x,
'Source Y': self.sound_source.y 'Source Y': self.sound_source.y
}) })
else: else:
if self.last_detected_angle is not None and current_time - self.last_sound_time < SILENCE_TIMEOUT: if self.last_detected_angle is not None and current_time - self.last_sound_time < SILENCE_TIMEOUT:
self.sound_detected = False self.sound_detected = False
self.show_arrow = True self.show_arrow = True
else: else:
self.sound_detected = False self.sound_detected = False
self.show_arrow = False self.show_arrow = False
time.sleep(CHUNK / self.sample_rate) time.sleep(CHUNK / self.sample_rate)
def get_coordinates_dataframe(self): def get_coordinates_dataframe(self):
"""Создание датафрейма с координатами микрофонов и всех позиций источника""" """Создание датафрейма с координатами микрофонов и всех позиций источника"""
data = { data = {
'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))], 'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))],
'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions], 'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions],
'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions], 'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions],
'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions] 'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions]
} }
return pd.DataFrame(data) return pd.DataFrame(data)
def get_results_dataframe(self): def get_results_dataframe(self):
"""Создание датафрейма с результатами""" """Создание датафрейма с результатами"""
return pd.DataFrame(self.results) return pd.DataFrame(self.results)
def main(): def main():
try: try:
finder = DirectionFinder(MIC_DISTANCE, "my_recording1.wav") finder = DirectionFinder(MIC_DISTANCE, "my_recording1.wav")
print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate) print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate)
thread = threading.Thread(target=finder.run) thread = threading.Thread(target=finder.run)
thread.start() thread.start()
fig, ax = plt.subplots(figsize=(9, 6)) fig, ax = plt.subplots(figsize=(9, 6))
ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2) ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2)
ax.set_ylim(-0.5, ROOM_HEIGHT) ax.set_ylim(-0.5, ROOM_HEIGHT)
ax.set_aspect('equal') ax.set_aspect('equal')
ax.set_title("Определение направления на источник звука", fontsize=12) ax.set_title("Определение направления на источник звука", fontsize=12)
ax.set_xlabel("X (м)", fontsize=10) ax.set_xlabel("X (м)", fontsize=10)
ax.set_ylabel("Y (м)", fontsize=10) ax.set_ylabel("Y (м)", fontsize=10)
ax.grid(True) ax.grid(True)
ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=12, label='Микрофон 1') ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=12, label='Микрофон 1')
ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=12, label='Микрофон 2') ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=12, label='Микрофон 2')
source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=12, source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=12,
label='Источник звука') label='Источник звука')
arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 4 arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 4
arrow = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3, arrow = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3,
fc='r', ec='r', label='Расчетное направление') fc='r', ec='r', label='Расчетное направление')
arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3, arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3,
fc='g', ec='g', linestyle=':', label='Истинное направление') fc='g', ec='g', linestyle=':', label='Истинное направление')
sound_bar = ax.axhline(y=ROOM_HEIGHT - 1, color='green', linewidth=15, visible=False) sound_bar = ax.axhline(y=ROOM_HEIGHT - 1, color='green', linewidth=15, visible=False)
angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=10) angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=10)
left_indicator = patches.Rectangle((finder.mic1.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray') left_indicator = patches.Rectangle((finder.mic1.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray')
right_indicator = patches.Rectangle((finder.mic2.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray') right_indicator = patches.Rectangle((finder.mic2.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray')
ax.add_patch(left_indicator) ax.add_patch(left_indicator)
ax.add_patch(right_indicator) ax.add_patch(right_indicator)
def update(frame): def update(frame):
source_plot.set_data([finder.sound_source.x], [finder.sound_source.y]) source_plot.set_data([finder.sound_source.x], [finder.sound_source.y])
dx = finder.sound_source.x dx = finder.sound_source.x
dy = finder.sound_source.y dy = finder.sound_source.y
true_angle_rad = np.arctan2(dx, dy) true_angle_rad = np.arctan2(dx, dy)
true_end_x = arrow_length * np.sin(true_angle_rad) true_end_x = arrow_length * np.sin(true_angle_rad)
true_end_y = arrow_length * np.cos(true_angle_rad) true_end_y = arrow_length * np.cos(true_angle_rad)
arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y) arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y)
if finder.show_arrow: if finder.show_arrow:
angle = finder.current_angle angle = finder.current_angle
calc_angle_rad = np.radians(angle) calc_angle_rad = np.radians(angle)
calc_end_x = arrow_length * np.sin(calc_angle_rad) calc_end_x = arrow_length * np.sin(calc_angle_rad)
calc_end_y = arrow_length * np.cos(calc_angle_rad) calc_end_y = arrow_length * np.cos(calc_angle_rad)
arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y) arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y)
angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°") angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°")
if finder.sound_detected: if finder.sound_detected:
sound_bar.set_visible(True) sound_bar.set_visible(True)
ax.set_title("Активное обнаружение звука", fontsize=12) ax.set_title("Активное обнаружение звука", fontsize=12)
else: else:
sound_bar.set_visible(False) sound_bar.set_visible(False)
ax.set_title("Последнее зафиксированное направление", fontsize=12) ax.set_title("Последнее зафиксированное направление", fontsize=12)
else: else:
arrow.set_data(x=0, y=0, dx=0, dy=0) arrow.set_data(x=0, y=0, dx=0, dy=0)
angle_text.set_text("") angle_text.set_text("")
sound_bar.set_visible(False) sound_bar.set_visible(False)
ax.set_title("Звук не обнаружен", fontsize=12) ax.set_title("Звук не обнаружен", fontsize=12)
left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray') left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray')
right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray') right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray')
return [source_plot, arrow, arrow_true, sound_bar, return [source_plot, arrow, arrow_true, sound_bar,
left_indicator, right_indicator, angle_text] left_indicator, right_indicator, angle_text]
ani = FuncAnimation(fig, update, frames=None, interval=10, blit=True, cache_frame_data=False) ani = FuncAnimation(fig, update, frames=None, interval=10, blit=True, cache_frame_data=False)
plt.legend(loc='upper left', fontsize=8) plt.legend(loc='upper left', fontsize=8)
plt.tight_layout() plt.tight_layout()
plt.show() plt.show()
finder.running = False finder.running = False
thread.join() thread.join()
coords_df = finder.get_coordinates_dataframe() coords_df = finder.get_coordinates_dataframe()
results_df = finder.get_results_dataframe() results_df = finder.get_results_dataframe()
print("\nКоординаты микрофонов и всех позиций источника звука:") print("\nКоординаты микрофонов и всех позиций источника звука:")
print(coords_df.to_string(index=False)) print(coords_df.to_string(index=False))
print("\nРезультаты вычислений:") print("\nРезультаты вычислений:")
print(results_df.to_string(index=False)) print(results_df.to_string(index=False))
coords_df.to_csv('coordinates.csv', index=False) coords_df.to_csv('coordinates.csv', index=False)
results_df.to_csv('results.csv', index=False) results_df.to_csv('results.csv', index=False)
print("\nДанные сохранены в 'coordinates.csv' и 'results.csv'") print("\nДанные сохранены в 'coordinates.csv' и 'results.csv'")
except Exception as e: except Exception as e:
print(f"Ошибка: {e}") print(f"Ошибка: {e}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()