"""Simulated two-microphone sound direction finder using gradient boosting.

An audio recording is played back through a simulated stereo microphone pair:
each channel is the recording delayed by the source-to-microphone travel time
plus Gaussian noise. A GCC-PHAT cross-correlation of the two channels is
computed, a short segment of it around the peak is fed to a
``GradientBoostingRegressor`` that predicts the direction angle, and the result
is shown live in a Matplotlib animation together with the true direction.
"""

import os
import random
import threading
import time
from dataclasses import dataclass

import joblib
import librosa
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.animation import FuncAnimation
from scipy.interpolate import interp1d
from scipy.signal import windows
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# Constants
SOUND_SPEED = 343.2  # speed of sound (m/s)
MIC_DISTANCE = 0.06  # distance between the microphones (m)
ROOM_WIDTH = 3.0  # room width (m)
ROOM_HEIGHT = 2.0  # room height (m)
SAMPLE_RATE = 48000  # sampling rate (Hz)
CHUNK = 32768  # buffer size (samples)
RMS_THRESHOLD = 0.01  # RMS threshold above which a channel counts as "sound"
SILENCE_TIMEOUT = 0.5  # seconds during which the last angle is still shown
MOVE_INTERVAL = 0.5  # interval between sound source moves (s)
CORR_WINDOW_SIZE = 15  # correlation window half-size (+-15 values, 31 total)
MODEL_PATH = "gradient_boosting_model2.pkl"  # where the model is saved/loaded

# The correlation is synthesised with twice as many points as the input
# signals, so one correlation index step equals 1 / _CORR_UPSAMPLE samples.
_CORR_UPSAMPLE = 2
# Number of consecutive test positions generated inside each angular sector.
_POSITIONS_PER_SECTOR = 10


@dataclass
class Microphone:
    """Position of a microphone in room coordinates (metres)."""

    x: float
    y: float


@dataclass
class SoundSource:
    """Position of the sound source in room coordinates (metres)."""

    x: float
    y: float


class TreeRegressionDirectionFinder:
    """Direction finder that regresses the angle from a GCC-PHAT segment.

    Angles are measured in degrees from the +Y axis (straight ahead of the
    microphone pair), positive towards +X, i.e. ``atan2(x, y)``.
    """

    def __init__(self, mic_distance: float, audio_file: str, retrain_model: bool = False):
        """Load the audio, place the microphones and load or train the model.

        Args:
            mic_distance: Distance between the two microphones in metres.
            audio_file: Path of the recording used as the emitted signal.
            retrain_model: Train a new model even if ``MODEL_PATH`` exists.

        Raises:
            ValueError: If the audio file cannot be loaded.
        """
        self.mic_distance = mic_distance
        self.mic1 = Microphone(x=-mic_distance / 2, y=0.0)
        self.mic2 = Microphone(x=mic_distance / 2, y=0.0)
        self.angles = np.arange(-90, 91, 10)
        self.current_angle_idx = 0
        self.current_repetition = 0
        self.sound_source = self._generate_sequential_sound_source()
        self.running = True
        self.current_angle = 0.0
        self.sound_detected = False
        self.last_sound_time = 0
        self.last_detected_angle = None
        self.show_arrow = False
        self.rms_left = 0.0
        self.rms_right = 0.0
        self.audio_data, self.sample_rate = self.load_audio(audio_file)
        self.audio_index = 0
        self.noise_level = 0.001
        self.results = []
        self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)]
        self.last_move_time = time.time()
        # Largest possible inter-microphone delay (source on the mic axis).
        self.max_physical_delay = self.mic_distance / SOUND_SPEED
        self.model = self.load_or_train_model(retrain_model)
        print("TreeRegressionDirectionFinder инициализирован")

    def _generate_random_sound_source_for_training(self) -> SoundSource:
        """Return a source placed uniformly at random in the room (training)."""
        x = random.uniform(-ROOM_WIDTH / 2, ROOM_WIDTH / 2)
        y = random.uniform(0, ROOM_HEIGHT)
        return SoundSource(x=x, y=y)

    def _generate_sequential_sound_source(self) -> SoundSource:
        """Return the next test position, sweeping sectors from -90 to 90 deg.

        Sector start angles are -90, -70, ..., 70, 90. Every sector is 20 deg
        wide except the last one, which starts at 90 and is capped at 90, so it
        always yields exactly 90 deg. ``_POSITIONS_PER_SECTOR`` random angles
        are drawn from a sector before moving on; after the last sector the
        sweep wraps around to the first.
        """
        # Lazily created because this method is first called from __init__
        # before the rest of the state exists.
        if not hasattr(self, 'angle_ranges'):
            self.angle_ranges = list(range(-90, 91, 20))
            self.current_range_idx = 0
            self.angle_count = 0

        start_angle = self.angle_ranges[self.current_range_idx]
        is_last_sector = self.current_range_idx >= len(self.angle_ranges) - 1
        end_angle = 90 if is_last_sector else start_angle + 20

        angle_deg = random.uniform(start_angle, end_angle)
        angle_rad = np.radians(angle_deg)

        distance = 1.5  # nominal source distance (m)
        source_x = distance * np.sin(angle_rad)
        source_y = distance * np.cos(angle_rad)

        # Pull the point back along its ray so it stays inside the room; the
        # direction is preserved, only the distance shrinks.
        if abs(source_x) > ROOM_WIDTH / 2:
            scale = (ROOM_WIDTH / 2) / abs(source_x)
            source_x *= scale
            source_y *= scale
        if source_y > ROOM_HEIGHT:
            scale = ROOM_HEIGHT / source_y
            source_x *= scale
            source_y *= scale
        if source_y < 0:
            source_y = 0.0
            source_x = 0.0

        # NOTE: the printed distance is the nominal one, before room clipping.
        print(
            f"Тестирование: Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}, расстояние={distance:.2f}, повторение {self.angle_count + 1}/{_POSITIONS_PER_SECTOR}")

        self.angle_count += 1
        if self.angle_count >= _POSITIONS_PER_SECTOR:
            self.angle_count = 0
            self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges)

        return SoundSource(x=source_x, y=source_y)

    def load_audio(self, filename: str) -> tuple:
        """Load a mono recording at ``SAMPLE_RATE`` and normalise it to RMS 0.1.

        Args:
            filename: Path of the audio file.

        Returns:
            ``(audio_data, sample_rate)``.

        Raises:
            ValueError: If loading or normalising fails; the original
                exception is attached as ``__cause__``.
        """
        try:
            audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True)
            rms = np.sqrt(np.mean(audio_data ** 2))
            if rms > 0:  # an all-zero file cannot be normalised
                audio_data = audio_data / rms * 0.1
            print(f"RMS аудиозаписи: {rms:.4f}, после нормализации: {np.sqrt(np.mean(audio_data ** 2)):.4f}")
            return audio_data, sample_rate
        except Exception as e:
            raise ValueError(f"Ошибка загрузки аудиофайла: {e}") from e

    def get_audio_chunk(self) -> np.ndarray:
        """Return a random ``CHUNK``-sample excerpt with light augmentation.

        The excerpt is zero-padded if the recording is shorter than ``CHUNK``,
        scaled by a random gain in [0.8, 1.2] and mixed with Gaussian noise.
        """
        max_index = max(0, len(self.audio_data) - CHUNK)
        start_idx = random.randint(0, max_index)
        chunk = self.audio_data[start_idx:start_idx + CHUNK]
        if len(chunk) < CHUNK:
            chunk = np.pad(chunk, (0, CHUNK - len(chunk)), mode='constant')
        scale = np.random.uniform(0.8, 1.2)
        chunk = chunk * scale  # new array: the recording itself is untouched
        chunk += np.random.normal(0, 0.0005, chunk.shape)
        return chunk

    def calculate_distances(self, source: SoundSource) -> tuple:
        """Return the distances ``(l1, l2)`` from the source to mic1 and mic2."""
        l1 = np.sqrt((source.x - self.mic1.x) ** 2 + (source.y - self.mic1.y) ** 2)
        l2 = np.sqrt((source.x - self.mic2.x) ** 2 + (source.y - self.mic2.y) ** 2)
        return l1, l2

    def process_signals_with_delay(self, signal: np.ndarray, source: SoundSource) -> tuple:
        """Simulate what both microphones record for ``signal`` from ``source``.

        Each channel is ``signal`` shifted by the propagation time to the
        corresponding microphone (linear interpolation) plus independent
        Gaussian noise of standard deviation ``self.noise_level``.

        Returns:
            ``(S1, S2, t1, t2)``: the two channels and the propagation times
            in seconds.
        """
        l1, l2 = self.calculate_distances(source)
        t1 = l1 / SOUND_SPEED
        t2 = l2 / SOUND_SPEED

        time_points = np.arange(len(signal)) / self.sample_rate
        # NOTE(review): the samples requested before t=0 are obtained by
        # linear extrapolation of the first segment, not by silence.
        interp_func = interp1d(time_points, signal, kind='linear', fill_value="extrapolate")
        S1 = interp_func(time_points - t1)
        S2 = interp_func(time_points - t2)

        S1 += np.random.normal(0, self.noise_level, S1.shape)
        S2 += np.random.normal(0, self.noise_level, S2.shape)

        return S1, S2, t1, t2

    def capture_audio(self):
        """Emulate audio capture by reading the next chunk of the recording.

        Playback wraps to the start when fewer than ``CHUNK`` samples remain.

        Returns:
            ``(signal1, signal2, t1, t2)`` as produced by
            ``process_signals_with_delay`` for the current source position.
        """
        if self.audio_index + CHUNK >= len(self.audio_data):
            self.audio_index = 0
        chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK]
        self.audio_index += CHUNK
        return self.process_signals_with_delay(chunk, self.sound_source)

    def calculate_rms(self, signal: np.ndarray) -> float:
        """Return the root-mean-square value of ``signal``."""
        return np.sqrt(np.mean(signal ** 2))

    def calculate_time_delay_fft(self, signal1: np.ndarray, signal2: np.ndarray) -> tuple:
        """Estimate the inter-channel delay and extract a correlation segment.

        Both signals are standardised, Hann-windowed and cross-correlated with
        PHAT weighting (GCC-PHAT). The peak is searched in a window around
        zero lag and refined by parabolic interpolation.

        Args:
            signal1: First channel.
            signal2: Second channel, same length as ``signal1``.

        Returns:
            ``(time_delay, correlation_segment)`` where ``time_delay`` is in
            seconds (0.0 if the peak is below 5 % of the global maximum) and
            ``correlation_segment`` has exactly ``2 * CORR_WINDOW_SIZE + 1``
            values, normalised by the global correlation maximum and
            zero-padded at the end when clipped by the search window.
        """
        n_input = len(signal1)
        window = windows.hann(n_input)
        signal1 = (signal1 - np.mean(signal1)) / (np.std(signal1) + 1e-10) * window
        signal2 = (signal2 - np.mean(signal2)) / (np.std(signal2) + 1e-10) * window
        fft_signal1 = np.fft.rfft(signal1)
        fft_signal2 = np.fft.rfft(signal2)
        cross_spectrum = fft_signal1 * np.conj(fft_signal2)
        # PHAT weighting: keep only the phase of the cross spectrum.
        cross_spectrum = cross_spectrum / (np.abs(cross_spectrum) + 1e-10)
        # Asking for more output points than the spectrum describes zero-pads
        # the spectrum, i.e. the correlation is band-limited-interpolated by
        # _CORR_UPSAMPLE: one index step is 1 / _CORR_UPSAMPLE samples.
        correlation = np.fft.irfft(cross_spectrum, n=n_input * _CORR_UPSAMPLE)
        correlation = np.roll(correlation, len(correlation) // 2)  # zero lag -> centre

        # NOTE(review): this half-width is applied in correlation bins, which
        # are half-samples, so the window spans less than the physical maximum
        # lag. It is left unchanged because it defines the feature layout that
        # already-saved models were trained on; widening it requires retraining.
        max_delay_bins = int(self.max_physical_delay * self.sample_rate * 1.5)
        middle_point = len(correlation) // 2
        start_idx = middle_point - max_delay_bins
        end_idx = middle_point + max_delay_bins
        max_correlation_idx = start_idx + np.argmax(correlation[start_idx:end_idx])

        corr_start = max(start_idx, max_correlation_idx - CORR_WINDOW_SIZE)
        corr_end = min(end_idx, max_correlation_idx + CORR_WINDOW_SIZE + 1)
        correlation_segment = correlation[corr_start:corr_end]
        target_length = 2 * CORR_WINDOW_SIZE + 1
        if len(correlation_segment) < target_length:
            correlation_segment = np.pad(correlation_segment, (0, target_length - len(correlation_segment)), mode='constant')
        elif len(correlation_segment) > target_length:
            correlation_segment = correlation_segment[:target_length]
        global_peak = np.max(np.abs(correlation))
        correlation_segment = correlation_segment / (global_peak + 1e-10)

        if correlation[max_correlation_idx] < 0.05 * global_peak:
            # Peak too weak to be trusted.
            time_delay = 0.0
        else:
            if max_correlation_idx > start_idx + 1 and max_correlation_idx < end_idx - 1:
                # Parabolic interpolation through the peak and its neighbours.
                y0 = correlation[max_correlation_idx - 1]
                y1 = correlation[max_correlation_idx]
                y2 = correlation[max_correlation_idx + 1]
                denom = 2 * (y0 - 2 * y1 + y2)
                if denom != 0:
                    delta = (y0 - y2) / denom
                    max_correlation_idx += delta
            delay_bins = max_correlation_idx - middle_point
            # Convert bins to seconds; bins are 1 / _CORR_UPSAMPLE samples.
            time_delay = delay_bins / (_CORR_UPSAMPLE * self.sample_rate)
        return time_delay, correlation_segment

    @staticmethod
    def _source_at_angle(angle_deg: float, distance: float) -> SoundSource:
        """Return a source at ``distance`` metres in direction ``angle_deg``."""
        angle_rad = np.radians(angle_deg)
        return SoundSource(x=distance * np.sin(angle_rad), y=distance * np.cos(angle_rad))

    def _make_training_features(self, source: SoundSource) -> list:
        """Simulate one capture from ``source`` and return its noisy features."""
        chunk = self.get_audio_chunk()
        signal1, signal2, _, _ = self.process_signals_with_delay(chunk, source)
        _, corr_segment = self.calculate_time_delay_fft(signal1, signal2)
        corr_segment += np.random.normal(0, 0.005, corr_segment.shape)
        return corr_segment.tolist()

    def train_regression_model(self):
        """Train the regressor on simulated correlation segments and save it.

        The training set is built from three parts: random positions in the
        room, a 1-degree grid over the outer sectors (|angle| >= 50 deg) and a
        10-degree grid over the full range. For the two grids one random
        distance in [0.5, 2.0] m is drawn per angle. 20 % of the data is held
        out to report the test MSE. The model is written to ``MODEL_PATH``.

        Returns:
            The fitted ``GradientBoostingRegressor``.
        """
        n_samples = 5000
        n_additional_samples = 20000
        X = []
        y = []

        print("Обучение: Генерация случайных тренировочных выборок...")
        for _ in range(n_samples):
            source = self._generate_random_sound_source_for_training()
            features = self._make_training_features(source)
            true_angle = np.arctan2(source.x, source.y) * 180 / np.pi
            X.append(features)
            y.append(true_angle)

        print("Обучение: Генерация дополнительных выборок с шагом 1°...")
        edge_angles = np.concatenate([np.arange(-90, -49, 1), np.arange(50, 91, 1)])
        n_samples_per_angle = n_additional_samples // len(edge_angles)
        for angle_deg in edge_angles:
            source = self._source_at_angle(angle_deg, np.random.uniform(0.5, 2.0))
            for _ in range(n_samples_per_angle):
                X.append(self._make_training_features(source))
                y.append(angle_deg)

        print("Обучение: Генерация дополнительных выборок с шагом 10°...")
        n_samples_per_angle_10 = 2000
        for angle_deg in np.arange(-90, 91, 10):
            source = self._source_at_angle(angle_deg, np.random.uniform(0.5, 2.0))
            for _ in range(n_samples_per_angle_10):
                X.append(self._make_training_features(source))
                y.append(angle_deg)

        print("Обучение: Подготовка данных завершена, обучение модели...")
        X = np.array(X)
        y = np.array(y)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        model = GradientBoostingRegressor(n_estimators=600, max_depth=6, learning_rate=0.03, random_state=42)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        print(f"Обучение: Среднеквадратичная ошибка модели на тестовых данных: {mse:.4f}")

        joblib.dump(model, MODEL_PATH)
        print(f"Модель сохранена в {MODEL_PATH}")
        return model

    def load_or_train_model(self, retrain: bool):
        """Return the model saved at ``MODEL_PATH`` or train a new one.

        A new model is trained when ``retrain`` is true, when no saved model
        exists, or when loading the saved model fails.
        """
        if not retrain and os.path.exists(MODEL_PATH):
            try:
                # SECURITY: joblib.load unpickles the file and can execute
                # arbitrary code; MODEL_PATH must only point to trusted files.
                model = joblib.load(MODEL_PATH)
                print(f"Модель загружена из {MODEL_PATH}")
                return model
            except Exception as e:
                print(f"Ошибка загрузки модели: {e}. Обучение новой модели...")
        return self.train_regression_model()

    def calculate_direction(self, correlation_segment: np.ndarray) -> float:
        """Predict the direction angle in degrees, clipped to [-90, 90]."""
        angle = self.model.predict([correlation_segment])[0]
        angle = np.clip(angle, -90, 90)
        return angle

    def run(self):
        """Process audio chunks until ``self.running`` becomes false.

        Intended to run in a worker thread. Each iteration moves the simulated
        source when ``MOVE_INTERVAL`` has elapsed, captures a chunk, and, if
        both channels exceed ``RMS_THRESHOLD``, estimates the direction,
        stores a result row and updates the state read by the UI. The loop
        sleeps for one chunk duration per iteration to emulate real time.
        """
        chunk_period = CHUNK / self.sample_rate
        while self.running:
            try:
                current_time = time.time()
                if current_time - self.last_move_time >= MOVE_INTERVAL:
                    self.sound_source = self._generate_sequential_sound_source()
                    self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time))
                    print(f"Тестирование: Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}")
                    self.last_move_time = current_time

                left, right, t1, t2 = self.capture_audio()
                self.rms_left = self.calculate_rms(left)
                self.rms_right = self.calculate_rms(right)

                new_sound_detected = (self.rms_left > RMS_THRESHOLD) and (self.rms_right > RMS_THRESHOLD)

                if new_sound_detected:
                    print(f"Тестирование: RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}")
                    time_delay, correlation_segment = self.calculate_time_delay_fft(left, right)
                    print(f"Тестирование: Time delay: {time_delay * 1000:.2f} ms, Correlation segment length: {len(correlation_segment)}")
                    # Dump the correlation segment to a text file (no Matplotlib involved).
                    np.savetxt(f"corr_segment_{current_time}.txt", correlation_segment)
                    angle = self.calculate_direction(correlation_segment)
                    print(f"Тестирование: Calculated angle: {angle:.1f}°")
                    self.current_angle = angle
                    self.last_detected_angle = angle
                    self.last_sound_time = current_time
                    self.sound_detected = True
                    self.show_arrow = True

                    true_dx = self.sound_source.x
                    true_dy = self.sound_source.y
                    true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi
                    self.results.append({
                        'Time Delay (ms)': time_delay * 1000,
                        'Detected Angle (°)': self.current_angle,
                        'True Angle (°)': true_angle,
                        'Source X': self.sound_source.x,
                        'Source Y': self.sound_source.y
                    })
                else:
                    self.sound_detected = False
                    # Keep showing the last direction for a short while after
                    # the sound stops.
                    self.show_arrow = (
                        self.last_detected_angle is not None
                        and current_time - self.last_sound_time < SILENCE_TIMEOUT
                    )
                time.sleep(chunk_period)
            except Exception as e:
                print(f"Ошибка в run: {e}")
                # Without this pause a persistent error would spin the loop
                # at full speed and flood the console.
                time.sleep(chunk_period)

    def get_coordinates_dataframe(self):
        """Return a DataFrame of microphone and all source positions.

        Columns: ``Object``, ``X``, ``Y``, ``Time`` (0.0 for the microphones
        and the initial source position, ``time.time()`` for later moves).
        """
        data = {
            'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))],
            'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions],
            'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions],
            'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions]
        }
        return pd.DataFrame(data)

    def get_results_dataframe(self):
        """Return a DataFrame with one row per successful detection."""
        return pd.DataFrame(self.results)


def main():
    """Run the simulation with a live plot, then print and save the results.

    The finder runs in a worker thread while the Matplotlib window is open.
    After the window is closed the coordinates and results are printed and
    written to ``coordinates_tree.csv`` and ``results_tree.csv``.
    """
    # Pre-bound so the cleanup below is safe even when construction fails.
    finder = None
    thread = None
    try:
        # retrain_model=True retrains the model, False loads the saved one.
        finder = TreeRegressionDirectionFinder(MIC_DISTANCE, "my_recording1.wav", retrain_model=False)
        print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate)

        thread = threading.Thread(target=finder.run)
        thread.start()

        plt.switch_backend('TkAgg')
        fig, ax = plt.subplots(figsize=(9, 6))
        ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2)
        ax.set_ylim(-0.5, ROOM_HEIGHT)
        ax.set_aspect('equal')
        ax.set_title("Определение направления на источник звука (Gradient Boosting)", fontsize=12)
        ax.set_xlabel("X (м)", fontsize=10)
        ax.set_ylabel("Y (м)", fontsize=10)
        ax.grid(True)

        ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=12, label='Микрофон 1')
        ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=12, label='Микрофон 2')
        source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=12,
                               label='Источник звука')

        arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 4
        arrow = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3,
                         fc='r', ec='r', label='Расчетное направление')
        arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3,
                              fc='g', ec='g', linestyle=':', label='Истинное направление')

        sound_bar = ax.axhline(y=ROOM_HEIGHT - 1, color='green', linewidth=15, visible=False)
        angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=10)

        left_indicator = patches.Rectangle((finder.mic1.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray')
        right_indicator = patches.Rectangle((finder.mic2.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray')
        ax.add_patch(left_indicator)
        ax.add_patch(right_indicator)

        def update(frame):
            """Redraw all artists from the finder's current state."""
            source_plot.set_data([finder.sound_source.x], [finder.sound_source.y])

            dx = finder.sound_source.x
            dy = finder.sound_source.y
            true_angle_rad = np.arctan2(dx, dy)
            true_end_x = arrow_length * np.sin(true_angle_rad)
            true_end_y = arrow_length * np.cos(true_angle_rad)
            arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y)

            if finder.show_arrow:
                angle = finder.current_angle
                calc_angle_rad = np.radians(angle)
                calc_end_x = arrow_length * np.sin(calc_angle_rad)
                calc_end_y = arrow_length * np.cos(calc_angle_rad)
                arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y)
                angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°")

                if finder.sound_detected:
                    sound_bar.set_visible(True)
                    ax.set_title("Активное обнаружение звука (Gradient Boosting)", fontsize=12)
                else:
                    sound_bar.set_visible(False)
                    ax.set_title("Последнее зафиксированное направление (Gradient Boosting)", fontsize=12)
            else:
                arrow.set_data(x=0, y=0, dx=0, dy=0)
                angle_text.set_text("")
                sound_bar.set_visible(False)
                ax.set_title("Звук не обнаружен", fontsize=12)

            left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray')
            right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray')

            return [source_plot, arrow, arrow_true, sound_bar,
                    left_indicator, right_indicator, angle_text]

        # The animation object must stay referenced while the window is open,
        # otherwise it is garbage-collected and the plot stops updating.
        ani = FuncAnimation(fig, update, frames=None, interval=10, blit=True, cache_frame_data=False)
        plt.legend(loc='upper left', fontsize=8)
        plt.tight_layout()
        plt.show()

        # Stop the worker before reading its result lists.
        finder.running = False
        thread.join()

        coords_df = finder.get_coordinates_dataframe()
        results_df = finder.get_results_dataframe()
        print("\nКоординаты микрофонов и всех позиций источника звука:")
        print(coords_df.to_string(index=False))
        print("\nРезультаты вычислений:")
        print(results_df.to_string(index=False))
        coords_df.to_csv('coordinates_tree.csv', index=False)
        results_df.to_csv('results_tree.csv', index=False)
        print("\nДанные сохранены в 'coordinates_tree.csv' и 'results_tree.csv'")

    except Exception as e:
        print(f"Ошибка в main: {e}")
    finally:
        # Always shut the worker down, but only touch what was created.
        if finder is not None:
            finder.running = False
        if thread is not None and thread.is_alive():
            thread.join()


if __name__ == "__main__":
    main()