From fad9fdf7b854b1e02358319c8538777f58e8e037 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D1=80=D1=82=D0=B5=D0=BC=D0=B8=D0=B9=20=D0=9C=D1=83?= =?UTF-8?q?=D1=80=D0=B0=D0=B2=D1=8C=D0=B5=D0=B2?= Date: Sun, 29 Jun 2025 07:20:22 +0000 Subject: [PATCH] =?UTF-8?q?=D0=A3=D0=B4=D0=B0=D0=BB=D0=B8=D1=82=D1=8C=20TD?= =?UTF-8?q?OA.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TDOA.py | 354 -------------------------------------------------------- 1 file changed, 354 deletions(-) delete mode 100644 TDOA.py diff --git a/TDOA.py b/TDOA.py deleted file mode 100644 index 9114d50..0000000 --- a/TDOA.py +++ /dev/null @@ -1,354 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -from matplotlib.animation import FuncAnimation -import matplotlib.patches as patches -import librosa -import random -import pandas as pd -import threading -import time -from dataclasses import dataclass - -# Константы -SOUND_SPEED = 343.2 # скорость звука (м/с) -MIC_DISTANCE = 0.06 # расстояние между микрофонами (м) -ROOM_WIDTH = 3.0 # ширина комнаты (м) -ROOM_HEIGHT = 2.0 # высота комнаты (м) -SAMPLE_RATE = 48000 # частота дискретизации (Гц) -CHUNK = 16384 # размер буфера -RMS_THRESHOLD = 0.1 # порог RMS для определения звука -SILENCE_TIMEOUT = 0.5 # время в секундах для сохранения последнего угла -MOVE_INTERVAL = 0.5 # интервал перемещения источника звука (с) - -@dataclass -class Microphone: - """Класс для хранения информации о микрофоне""" - x: float - y: float - -@dataclass -class SoundSource: - """Класс для хранения информации об источнике звука""" - x: float - y: float - -class DirectionFinder: - def __init__(self, mic_distance: float, audio_file: str): - """Инициализация определителя направления""" - self.mic_distance = mic_distance - self.mic1 = Microphone(x=-mic_distance / 2, y=0.0) - self.mic2 = Microphone(x=mic_distance / 2, y=0.0) - self.sound_source = self._generate_sequential_sound_source() - self.running = True - self.current_angle = 0.0 - self.sound_detected = False - self.last_sound_time = 0 - self.last_detected_angle = None - self.show_arrow = False - self.rms_left = 0.0 - self.rms_right = 0.0 - self.audio_data, self.sample_rate = self.load_audio(audio_file) - self.audio_index = 0 - self.noise_level = 0.001 # Уровень шума - self.results = [] - self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)] - self.last_move_time = time.time() - self.max_physical_delay = self.mic_distance / SOUND_SPEED # Максимальная физическая задержка - - def _generate_sequential_sound_source(self) -> SoundSource: - """Генерация положения источника звука с последовательным проходом углов от -90 до 90 градусов с шагом 20 градусов""" - if not hasattr(self, 'angle_ranges'): - self.angle_ranges = list(range(-90, 91, 20)) - self.current_range_idx = 0 - self.angle_count = 0 - - start_angle = self.angle_ranges[self.current_range_idx] - end_angle = start_angle + 20 if self.current_range_idx < len(self.angle_ranges) - 1 else 90 - - angle_deg = random.uniform(start_angle, end_angle) - angle_rad = np.radians(angle_deg) - - distance = 1.5 # расстояние - source_x = distance * np.sin(angle_rad) - source_y = distance * np.cos(angle_rad) - - if abs(source_x) > ROOM_WIDTH / 2: - scale = (ROOM_WIDTH / 2) / abs(source_x) - source_x *= scale - source_y *= scale - if source_y > ROOM_HEIGHT: - scale = ROOM_HEIGHT / source_y - source_x *= scale - source_y *= scale - if source_y < 0: - source_y = 0.0 - source_x = 0.0 - - print( - f"Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}, расстояние={distance:.2f}, повторение {self.angle_count + 1}/10") - - self.angle_count += 1 - if self.angle_count >= 10: # кол-во проходов в диапозоне - self.angle_count = 0 - self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges) - - return SoundSource(x=source_x, y=source_y) - - def load_audio(self, filename: str) -> tuple: - """Загрузка аудиофайла""" - try: - audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True) - return audio_data, sample_rate - except Exception as e: - raise ValueError(f"Ошибка загрузки аудиофайла: {e}") - - def calculate_distances(self) -> tuple: - """Расчет расстояний от источника звука до микрофонов""" - l1 = np.sqrt((self.sound_source.x - self.mic1.x) ** 2 + - (self.sound_source.y - self.mic1.y) ** 2) - l2 = np.sqrt((self.sound_source.x - self.mic2.x) ** 2 + - (self.sound_source.y - self.mic2.y) ** 2) - return l1, l2 - - def process_signals_with_delay(self, signal: np.ndarray) -> tuple: - """Обработка сигналов с учетом временного сдвига и шума""" - l1, l2 = self.calculate_distances() - t1 = l1 / SOUND_SPEED - t2 = l2 / SOUND_SPEED - log = int(self.sample_rate * (t2 - t1)) - - if log >= 0: - S1 = signal[abs(log):] if log != 0 else signal.copy() - S2 = signal[:-abs(log)] if log != 0 else signal.copy() - else: - S1 = signal[:-abs(log)] if log != 0 else signal.copy() - S2 = signal[abs(log):] - - min_length = min(len(S1), len(S2)) - S1 = S1[:min_length] - S2 = S2[:min_length] - - noise1 = np.random.normal(0, self.noise_level, size=S1.shape) - noise2 = np.random.normal(0, self.noise_level, size=S2.shape) - S1 = S1 + noise1 - S2 = S2 + noise2 - - return S1, S2, t1, t2 - - def capture_audio(self): - """Эмуляция захвата аудиоданных из файла""" - if self.audio_index + CHUNK >= len(self.audio_data): - self.audio_index = 0 - chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK] - self.audio_index += CHUNK - - signal1, signal2, t1, t2 = self.process_signals_with_delay(chunk) - return signal1, signal2, t1, t2 - - def calculate_time_delay_fft(self, signal1: np.ndarray, signal2: np.ndarray) -> float: - """Расчет временной задержки через FFT с параболической интерполяцией""" - signal1 = (signal1 - np.mean(signal1)) / (np.std(signal1) + 1e-10) - signal2 = (signal2 - np.mean(signal2)) / (np.std(signal2) + 1e-10) - fft_signal1 = np.fft.rfft(signal1) - fft_signal2 = np.fft.rfft(signal2) - cross_spectrum = fft_signal1 * np.conj(fft_signal2) - cross_spectrum = cross_spectrum / (np.abs(cross_spectrum) + 1e-10) - correlation = np.fft.irfft(cross_spectrum) - correlation = np.roll(correlation, len(correlation) // 2) - - max_delay_samples = int(self.max_physical_delay * self.sample_rate) - middle_point = len(correlation) // 2 - start_idx = middle_point - max_delay_samples - end_idx = middle_point + max_delay_samples - max_correlation_idx = start_idx + np.argmax(correlation[start_idx:end_idx]) - - peak_value = correlation[max_correlation_idx] - if peak_value < 0.1: # Игнорирование RMS до... - return 0.0 - - # Параболическая интерполяция - if max_correlation_idx > start_idx and max_correlation_idx < end_idx - 1: - y0 = correlation[max_correlation_idx - 1] - y1 = correlation[max_correlation_idx] - y2 = correlation[max_correlation_idx + 1] - denom = 2 * (y0 - 2 * y1 + y2) - if denom != 0: - delta = (y0 - y2) / denom - max_correlation_idx += delta - - delay_samples = max_correlation_idx - middle_point - time_delay = delay_samples / self.sample_rate - time_delay = np.clip(time_delay, -self.max_physical_delay, self.max_physical_delay) - return time_delay - - def calculate_direction(self, time_delay: float) -> float: - """Расчет угла направления""" - sin_theta = (SOUND_SPEED * time_delay) / self.mic_distance - sin_theta = np.clip(sin_theta, -1, 1) - angle = np.arcsin(sin_theta) * 180 / np.pi - return angle - - def calculate_rms(self, signal: np.ndarray) -> float: - """Вычисление RMS сигнала""" - return np.sqrt(np.mean(signal ** 2)) - - def run(self): - """Обработка аудио в реальном времени""" - while self.running: - current_time = time.time() - if current_time - self.last_move_time >= MOVE_INTERVAL: - self.sound_source = self._generate_sequential_sound_source() - self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time)) - print(f"Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}") - self.last_move_time = current_time - - left, right, t1, t2 = self.capture_audio() - self.rms_left = self.calculate_rms(left) - self.rms_right = self.calculate_rms(right) - signal_diff = np.mean(np.abs(left - right)) - - new_sound_detected = (self.rms_left > RMS_THRESHOLD) and (self.rms_right > RMS_THRESHOLD) - - if new_sound_detected: - print(f"RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}") - print(f"Signal difference: {signal_diff:.4f}") - time_delay = self.calculate_time_delay_fft(left, right) - print(f"Time delay: {time_delay * 1000:.2f} ms") - angle = self.calculate_direction(time_delay) - print(f"Calculated angle: {angle:.1f}°") - self.current_angle = angle - self.last_detected_angle = self.current_angle - self.last_sound_time = current_time - self.sound_detected = True - self.show_arrow = True - - true_dx = self.sound_source.x - true_dy = self.sound_source.y - true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi - self.results.append({ - 't1 (ms)': t1 * 1000, - 't2 (ms)': t2 * 1000, - 'Time Delay (ms)': time_delay * 1000, - 'Detected Angle (°)': self.current_angle, - 'True Angle (°)': true_angle, - 'Source X': self.sound_source.x, - 'Source Y': self.sound_source.y - }) - else: - if self.last_detected_angle is not None and current_time - self.last_sound_time < SILENCE_TIMEOUT: - self.sound_detected = False - self.show_arrow = True - else: - self.sound_detected = False - self.show_arrow = False - time.sleep(CHUNK / self.sample_rate) - - def get_coordinates_dataframe(self): - """Создание датафрейма с координатами микрофонов и всех позиций источника""" - data = { - 'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))], - 'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions], - 'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions], - 'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions] - } - return pd.DataFrame(data) - - def get_results_dataframe(self): - """Создание датафрейма с результатами""" - return pd.DataFrame(self.results) - -def main(): - try: - finder = DirectionFinder(MIC_DISTANCE, "my_recording1.wav") - print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate) - - thread = threading.Thread(target=finder.run) - thread.start() - - fig, ax = plt.subplots(figsize=(9, 6)) - ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2) - ax.set_ylim(-0.5, ROOM_HEIGHT) - ax.set_aspect('equal') - ax.set_title("Определение направления на источник звука", fontsize=12) - ax.set_xlabel("X (м)", fontsize=10) - ax.set_ylabel("Y (м)", fontsize=10) - ax.grid(True) - - ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=12, label='Микрофон 1') - ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=12, label='Микрофон 2') - source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=12, - label='Источник звука') - - arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 4 - arrow = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3, - fc='r', ec='r', label='Расчетное направление') - arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3, - fc='g', ec='g', linestyle=':', label='Истинное направление') - - sound_bar = ax.axhline(y=ROOM_HEIGHT - 1, color='green', linewidth=15, visible=False) - angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=10) - - left_indicator = patches.Rectangle((finder.mic1.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray') - right_indicator = patches.Rectangle((finder.mic2.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray') - ax.add_patch(left_indicator) - ax.add_patch(right_indicator) - - def update(frame): - source_plot.set_data([finder.sound_source.x], [finder.sound_source.y]) - - dx = finder.sound_source.x - dy = finder.sound_source.y - true_angle_rad = np.arctan2(dx, dy) - true_end_x = arrow_length * np.sin(true_angle_rad) - true_end_y = arrow_length * np.cos(true_angle_rad) - arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y) - - if finder.show_arrow: - angle = finder.current_angle - calc_angle_rad = np.radians(angle) - calc_end_x = arrow_length * np.sin(calc_angle_rad) - calc_end_y = arrow_length * np.cos(calc_angle_rad) - arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y) - angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°") - - if finder.sound_detected: - sound_bar.set_visible(True) - ax.set_title("Активное обнаружение звука", fontsize=12) - else: - sound_bar.set_visible(False) - ax.set_title("Последнее зафиксированное направление", fontsize=12) - else: - arrow.set_data(x=0, y=0, dx=0, dy=0) - angle_text.set_text("") - sound_bar.set_visible(False) - ax.set_title("Звук не обнаружен", fontsize=12) - - left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray') - right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray') - - return [source_plot, arrow, arrow_true, sound_bar, - left_indicator, right_indicator, angle_text] - - ani = FuncAnimation(fig, update, frames=None, interval=10, blit=True, cache_frame_data=False) - plt.legend(loc='upper left', fontsize=8) - plt.tight_layout() - plt.show() - - finder.running = False - thread.join() - - coords_df = finder.get_coordinates_dataframe() - results_df = finder.get_results_dataframe() - print("\nКоординаты микрофонов и всех позиций источника звука:") - print(coords_df.to_string(index=False)) - print("\nРезультаты вычислений:") - print(results_df.to_string(index=False)) - coords_df.to_csv('coordinates.csv', index=False) - results_df.to_csv('results.csv', index=False) - print("\nДанные сохранены в 'coordinates.csv' и 'results.csv'") - - except Exception as e: - print(f"Ошибка: {e}") - -if __name__ == "__main__": - main() \ No newline at end of file