import numpy as np import matplotlib.pyplot as plt from matplotlib.animation import FuncAnimation import matplotlib.patches as patches import librosa import random import pandas as pd import threading import time from dataclasses import dataclass from sklearn.tree import DecisionTreeRegressor from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error from scipy.interpolate import interp1d from scipy.signal import windows # Константы SOUND_SPEED = 343.2 # скорость звука (м/с) MIC_DISTANCE = 0.06 # расстояние между микрофонами (м) ROOM_WIDTH = 3.0 # ширина комнаты (м) ROOM_HEIGHT = 2.0 # высота комнаты (м) SAMPLE_RATE = 48000 # частота дискретизации (Гц) CHUNK = 32768 # размер буфера RMS_THRESHOLD = 0.1 # порог RMS для определения звука SILENCE_TIMEOUT = 0.5 # время в секундах для сохранения последнего угла MOVE_INTERVAL = 0.5 # интервал перемещения источника звука (с) @dataclass class Microphone: """Класс для хранения информации о микрофоне""" x: float y: float @dataclass class SoundSource: """Класс для хранения информации об источнике звука""" x: float y: float class TreeRegressionDirectionFinder: def __init__(self, mic_distance: float, audio_file: str): """Инициализация определителя направления с использованием регрессии деревом решений""" self.mic_distance = mic_distance self.mic1 = Microphone(x=-mic_distance / 2, y=0.0) self.mic2 = Microphone(x=mic_distance / 2, y=0.0) self.sound_source = self._generate_random_sound_source() self.running = True self.current_angle = 0.0 self.sound_detected = False self.last_sound_time = 0 self.last_detected_angle = None self.show_arrow = False self.rms_left = 0.0 self.rms_right = 0.0 self.audio_data, self.sample_rate = self.load_audio(audio_file) self.audio_index = 0 self.noise_level = 0.001 # уровень шума self.results = [] self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)] self.last_move_time = time.time() self.max_physical_delay = self.mic_distance / SOUND_SPEED self.model = self.train_regression_model() print("TreeRegressionDirectionFinder инициализирован") def _generate_random_sound_source(self) -> SoundSource: """Генерация положения источника звука с последовательным проходом углов от -90 до 90 градусов""" if not hasattr(self, 'angle_ranges'): self.angle_ranges = list(range(-90, 91, 20)) self.current_range_idx = 0 self.angle_count = 0 self.current_angle = self.angle_ranges[0] start_angle = self.angle_ranges[self.current_range_idx] end_angle = start_angle + 20 if self.current_range_idx < len(self.angle_ranges) - 1 else 90 angle_deg = random.uniform(start_angle, end_angle) angle_rad = np.radians(angle_deg) distance = 1.5 # расстояние source_x = distance * np.sin(angle_rad) source_y = distance * np.cos(angle_rad) self.angle_count += 1 if self.angle_count >= 10: self.angle_count = 0 self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges) print(f"Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}") return SoundSource(x=source_x, y=source_y) def load_audio(self, filename: str) -> tuple: """Загрузка аудиофайла""" try: audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True) return audio_data, sample_rate except Exception as e: raise ValueError(f"Ошибка загрузки аудиофайла: {e}") def calculate_distances(self) -> tuple: """Расчет расстояний от источника звука до микрофонов""" l1 = np.sqrt((self.sound_source.x - self.mic1.x) ** 2 + (self.sound_source.y - self.mic1.y) ** 2) l2 = np.sqrt((self.sound_source.x - self.mic2.x) ** 2 + (self.sound_source.y - self.mic2.y) ** 2) return l1, l2 def process_signals_with_delay(self, signal: np.ndarray) -> tuple: """Обработка сигналов с учетом временного сдвига и шума с интерполяцией""" l1, l2 = self.calculate_distances() t1 = l1 / SOUND_SPEED t2 = l2 / SOUND_SPEED time_points = np.arange(len(signal)) / self.sample_rate interp_func = interp1d(time_points, signal, kind='linear', fill_value="extrapolate") S1 = interp_func(time_points - t1) S2 = interp_func(time_points - t2) min_length = min(len(S1), len(S2)) S1, S2 = S1[:min_length], S2[:min_length] noise1 = np.random.normal(0, self.noise_level, S1.shape) noise2 = np.random.normal(0, self.noise_level, S2.shape) S1 += noise1 S2 += noise2 return S1, S2, t1, t2 def capture_audio(self): """Эмуляция захвата аудиоданных из файла""" if self.audio_index + CHUNK >= len(self.audio_data): self.audio_index = 0 chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK] self.audio_index += CHUNK signal1, signal2, t1, t2 = self.process_signals_with_delay(chunk) return signal1, signal2, t1, t2 def calculate_rms(self, signal: np.ndarray) -> float: """Вычисление RMS сигнала""" return np.sqrt(np.mean(signal ** 2)) def calculate_time_delay_fft(self, signal1: np.ndarray, signal2: np.ndarray) -> tuple: """Расчет временной задержки и пика кросс-корреляции через FFT с оконной функцией Ханна""" window = windows.hann(len(signal1)) signal1 = (signal1 - np.mean(signal1)) / (np.std(signal1) + 1e-10) * window signal2 = (signal2 - np.mean(signal2)) / (np.std(signal2) + 1e-10) * window fft_signal1 = np.fft.rfft(signal1) fft_signal2 = np.fft.rfft(signal2) cross_spectrum = fft_signal1 * np.conj(fft_signal2) cross_spectrum = cross_spectrum / (np.abs(cross_spectrum) + 1e-10) correlation = np.fft.irfft(cross_spectrum) correlation = np.roll(correlation, len(correlation) // 2) max_delay_samples = int(self.max_physical_delay * self.sample_rate) middle_point = len(correlation) // 2 start_idx = middle_point - max_delay_samples end_idx = middle_point + max_delay_samples max_correlation_idx = start_idx + np.argmax(correlation[start_idx:end_idx]) peak_correlation = correlation[max_correlation_idx] if max_correlation_idx > start_idx and max_correlation_idx < end_idx - 1: y0 = correlation[max_correlation_idx - 1] y1 = correlation[max_correlation_idx] y2 = correlation[max_correlation_idx + 1] denom = 2 * (y0 - 2 * y1 + y2) if denom != 0: delta = (y0 - y2) / denom max_correlation_idx += delta delay_samples = max_correlation_idx - middle_point time_delay = delay_samples / self.sample_rate time_delay = np.clip(time_delay, -self.max_physical_delay, self.max_physical_delay) return time_delay, peak_correlation def train_regression_model(self): """Обучение модели регрессии с использованием только пика кросс-корреляции""" n_samples = 50000 n_additional_samples = 20000 X = [] y = [] # Генерация стандартных тренировочных данных for _ in range(n_samples): source = self._generate_random_sound_source() l1 = np.sqrt((source.x - self.mic1.x) ** 2 + (source.y - self.mic1.y) ** 2) l2 = np.sqrt((source.x - self.mic2.x) ** 2 + (source.y - self.mic2.y) ** 2) t1 = l1 / SOUND_SPEED t2 = l2 / SOUND_SPEED # Генерация тестового сигнала для вычисления пика кросс-корреляции signal = np.random.normal(0, 1, CHUNK) time_points = np.arange(len(signal)) / self.sample_rate interp_func = interp1d(time_points, signal, kind='linear', fill_value="extrapolate") S1 = interp_func(time_points - t1) S2 = interp_func(time_points - t2) min_length = min(len(S1), len(S2)) S1, S2 = S1[:min_length], S2[:min_length] noise1 = np.random.normal(0, self.noise_level, S1.shape) noise2 = np.random.normal(0, self.noise_level, S2.shape) S1 += noise1 S2 += noise2 _, peak_correlation = self.calculate_time_delay_fft(S1, S2) true_angle = np.arctan2(source.x, source.y) * 180 / np.pi X.append([peak_correlation]) y.append(true_angle) # Генерация дополнительных данных для углов -90°...-50° и 50°...90° for _ in range(n_additional_samples): if random.choice([True, False]): angle_deg = random.uniform(-90, -50) else: angle_deg = random.uniform(50, 90) angle_rad = np.radians(angle_deg) distance = 1.5 source_x = distance * np.sin(angle_rad) source_y = distance * np.cos(angle_rad) source = SoundSource(x=source_x, y=source_y) l1 = np.sqrt((source.x - self.mic1.x) ** 2 + (source.y - self.mic1.y) ** 2) l2 = np.sqrt((source.x - self.mic2.x) ** 2 + (source.y - self.mic2.y) ** 2) t1 = l1 / SOUND_SPEED t2 = l2 / SOUND_SPEED signal = np.random.normal(0, 1, CHUNK) time_points = np.arange(len(signal)) / self.sample_rate interp_func = interp1d(time_points, signal, kind='linear', fill_value="extrapolate") S1 = interp_func(time_points - t1) S2 = interp_func(time_points - t2) min_length = min(len(S1), len(S2)) S1, S2 = S1[:min_length], S2[:min_length] noise1 = np.random.normal(0, self.noise_level, S1.shape) noise2 = np.random.normal(0, self.noise_level, S2.shape) S1 += noise1 S2 += noise2 _, peak_correlation = self.calculate_time_delay_fft(S1, S2) X.append([peak_correlation]) y.append(angle_deg) X = np.array(X) y = np.array(y) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) model = DecisionTreeRegressor(max_depth=10, random_state=42) model.fit(X_train, y_train) y_pred = model.predict(X_test) mse = mean_squared_error(y_test, y_pred) print(f"Среднеквадратичная ошибка модели на тестовых данных: {mse:.4f}") return model def calculate_direction(self, peak_correlation: float) -> float: """Расчет угла направления с использованием модели регрессии""" features = np.array([[peak_correlation]]) angle = self.model.predict(features)[0] angle = np.clip(angle, -90, 90) return angle def run(self): """Обработка аудио в реальном времени""" while self.running: try: current_time = time.time() if current_time - self.last_move_time >= MOVE_INTERVAL: self.sound_source = self._generate_random_sound_source() self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time)) print(f"Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}") self.last_move_time = current_time left, right, t1, t2 = self.capture_audio() self.rms_left = self.calculate_rms(left) self.rms_right = self.calculate_rms(right) peak_amp_left = np.max(np.abs(left)) peak_amp_right = np.max(np.abs(right)) peak_diff = peak_amp_left - peak_amp_right new_sound_detected = (self.rms_left > RMS_THRESHOLD) and (self.rms_right > RMS_THRESHOLD) if new_sound_detected: print(f"RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}") # Исправлено print(f"Peak difference: {peak_diff:.4f}") time_delay, peak_correlation = self.calculate_time_delay_fft(left, right) print(f"Time delay: {time_delay * 1000:.2f} ms") print(f"Peak correlation: {peak_correlation:.4f}") angle = self.calculate_direction(peak_correlation) print(f"Calculated angle: {angle:.1f}°") self.current_angle = angle self.last_detected_angle = angle self.last_sound_time = current_time self.sound_detected = True self.show_arrow = True true_dx = self.sound_source.x true_dy = self.sound_source.y true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi self.results.append({ 'Time Delay (ms)': time_delay * 1000, 'Peak Correlation': peak_correlation, 'Detected Angle (°)': self.current_angle, 'True Angle (°)': true_angle, 'Source X': self.sound_source.x, 'Source Y': self.sound_source.y }) else: if self.last_detected_angle is not None and current_time - self.last_sound_time < SILENCE_TIMEOUT: self.sound_detected = False self.show_arrow = True else: self.sound_detected = False self.show_arrow = False time.sleep(CHUNK / self.sample_rate) except Exception as e: print(f"Ошибка в run: {e}") continue def get_coordinates_dataframe(self): """Создание датафрейма с координатами микрофонов и всех позиций источника""" data = { 'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))], 'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions], 'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions], 'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions] } return pd.DataFrame(data) def get_results_dataframe(self): """Создание датафрейма с результатами""" return pd.DataFrame(self.results) def main(): try: finder = TreeRegressionDirectionFinder(MIC_DISTANCE, "my_recording1.wav") print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate) thread = threading.Thread(target=finder.run) thread.start() fig, ax = plt.subplots(figsize=(9, 6)) ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2) ax.set_ylim(-0.5, ROOM_HEIGHT) ax.set_aspect('equal') ax.set_title("Определение направления на источник звука (Tree Regression)", fontsize=12) ax.set_xlabel("X (м)", fontsize=10) ax.set_ylabel("Y (м)", fontsize=10) ax.grid(True) ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=12, label='Микрофон 1') ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=12, label='Микрофон 2') source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=12, label='Источник звука') arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 4 arrow = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3, fc='r', ec='r', label='Расчетное направление') arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3, fc='g', ec='g', linestyle=':', label='Истинное направление') sound_bar = ax.axhline(y=ROOM_HEIGHT - 1, color='green', linewidth=15, visible=False) angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=10) left_indicator = patches.Rectangle((finder.mic1.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray') right_indicator = patches.Rectangle((finder.mic2.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray') ax.add_patch(left_indicator) ax.add_patch(right_indicator) def update(frame): source_plot.set_data([finder.sound_source.x], [finder.sound_source.y]) dx = finder.sound_source.x dy = finder.sound_source.y true_angle_rad = np.arctan2(dx, dy) true_end_x = arrow_length * np.sin(true_angle_rad) true_end_y = arrow_length * np.cos(true_angle_rad) arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y) if finder.show_arrow: angle = finder.current_angle calc_angle_rad = np.radians(angle) calc_end_x = arrow_length * np.sin(calc_angle_rad) calc_end_y = arrow_length * np.cos(calc_angle_rad) arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y) angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°") if finder.sound_detected: sound_bar.set_visible(True) ax.set_title("Активное обнаружение звука (Tree Regression)", fontsize=12) else: sound_bar.set_visible(False) ax.set_title("Последнее зафиксированное направление (Tree Regression)", fontsize=12) else: arrow.set_data(x=0, y=0, dx=0, dy=0) angle_text.set_text("") sound_bar.set_visible(False) ax.set_title("Звук не обнаружен", fontsize=12) left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray') right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray') return [source_plot, arrow, arrow_true, sound_bar, left_indicator, right_indicator, angle_text] ani = FuncAnimation(fig, update, frames=None, interval=10, blit=True, cache_frame_data=False) plt.legend(loc='upper left', fontsize=8) plt.tight_layout() plt.show() finder.running = False thread.join() coords_df = finder.get_coordinates_dataframe() results_df = finder.get_results_dataframe() print("\nКоординаты микрофонов и всех позиций источника звука:") print(coords_df.to_string(index=False)) print("\nРезультаты вычислений:") print(results_df.to_string(index=False)) coords_df.to_csv('coordinates_tree.csv', index=False) results_df.to_csv('results_tree.csv', index=False) print("\nДанные сохранены в 'coordinates_tree.csv' и 'results_tree.csv'") except Exception as e: print(f"Ошибка в main: {e}") if __name__ == "__main__": main()