SoundLocal/Gradient Boosting.py

470 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import matplotlib.patches as patches
import librosa
import random
import pandas as pd
import threading
import time
import os
from dataclasses import dataclass
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from scipy.interpolate import interp1d
from scipy.signal import windows
import joblib
# Константы
SOUND_SPEED = 343.2 # скорость звука (м/с)
MIC_DISTANCE = 0.06 # расстояние между микрофонами (м)
ROOM_WIDTH = 3.0 # ширина комнаты (м)
ROOM_HEIGHT = 2.0 # высота комнаты (м)
SAMPLE_RATE = 48000 # частота дискретизации (Гц)
CHUNK = 32768 # размер буфера
RMS_THRESHOLD = 0.01 # порог RMS для определения звука
SILENCE_TIMEOUT = 0.5 # время в секундах для сохранения последнего угла
MOVE_INTERVAL = 0.5 # интервал перемещения источника звука (с)
CORR_WINDOW_SIZE = 15 # Размер окна корреляции (±15 значений, всего 31)
MODEL_PATH = "gradient_boosting_model2.pkl" # Путь для сохранения/загрузки модели
@dataclass
class Microphone:
"""Класс для хранения информации о микрофоне"""
x: float
y: float
@dataclass
class SoundSource:
"""Класс для хранения информации об источнике звука"""
x: float
y: float
class TreeRegressionDirectionFinder:
def __init__(self, mic_distance: float, audio_file: str, retrain_model: bool = False):
"""Инициализация определителя направления с использованием регрессии градиентным бустингом"""
self.mic_distance = mic_distance
self.mic1 = Microphone(x=-mic_distance / 2, y=0.0)
self.mic2 = Microphone(x=mic_distance / 2, y=0.0)
self.angles = np.arange(-90, 91, 10)
self.current_angle_idx = 0
self.current_repetition = 0
self.sound_source = self._generate_sequential_sound_source()
self.running = True
self.current_angle = 0.0
self.sound_detected = False
self.last_sound_time = 0
self.last_detected_angle = None
self.show_arrow = False
self.rms_left = 0.0
self.rms_right = 0.0
self.audio_data, self.sample_rate = self.load_audio(audio_file)
self.audio_index = 0
self.noise_level = 0.001
self.results = []
self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)]
self.last_move_time = time.time()
self.max_physical_delay = self.mic_distance / SOUND_SPEED
self.model = self.load_or_train_model(retrain_model)
print("TreeRegressionDirectionFinder инициализирован")
def _generate_random_sound_source_for_training(self) -> SoundSource:
"""Генерация случайного положения источника звука для обучения"""
x = random.uniform(-ROOM_WIDTH / 2, ROOM_WIDTH / 2)
y = random.uniform(0, ROOM_HEIGHT)
return SoundSource(x=x, y=y)
def _generate_sequential_sound_source(self) -> SoundSource:
"""Генерация положения источника звука с последовательным проходом углов от -90 до 90 градусов с шагом 20 градусов"""
if not hasattr(self, 'angle_ranges'):
self.angle_ranges = list(range(-90, 91, 20))
self.current_range_idx = 0
self.angle_count = 0
start_angle = self.angle_ranges[self.current_range_idx]
end_angle = start_angle + 20 if self.current_range_idx < len(self.angle_ranges) - 1 else 90
angle_deg = random.uniform(start_angle, end_angle)
angle_rad = np.radians(angle_deg)
distance = 1.5 # расстояние
source_x = distance * np.sin(angle_rad)
source_y = distance * np.cos(angle_rad)
if abs(source_x) > ROOM_WIDTH / 2:
scale = (ROOM_WIDTH / 2) / abs(source_x)
source_x *= scale
source_y *= scale
if source_y > ROOM_HEIGHT:
scale = ROOM_HEIGHT / source_y
source_x *= scale
source_y *= scale
if source_y < 0:
source_y = 0.0
source_x = 0.0
print(
f"Тестирование: Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}, расстояние={distance:.2f}, повторение {self.angle_count + 1}/10")
self.angle_count += 1
if self.angle_count >= 10: # кол-во в зоне
self.angle_count = 0
self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges)
return SoundSource(x=source_x, y=source_y)
def load_audio(self, filename: str) -> tuple:
"""Загрузка аудиофайла с нормализацией"""
try:
audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True)
rms = np.sqrt(np.mean(audio_data ** 2))
if rms > 0:
audio_data = audio_data / rms * 0.1
print(f"RMS аудиозаписи: {rms:.4f}, после нормализации: {np.sqrt(np.mean(audio_data ** 2)):.4f}")
return audio_data, sample_rate
except Exception as e:
raise ValueError(f"Ошибка загрузки аудиофайла: {e}")
def get_audio_chunk(self) -> np.ndarray:
"""Получение случайного фрагмента аудио размером CHUNK с аугментацией"""
max_index = max(0, len(self.audio_data) - CHUNK)
start_idx = random.randint(0, max_index)
chunk = self.audio_data[start_idx:start_idx + CHUNK]
if len(chunk) < CHUNK:
chunk = np.pad(chunk, (0, CHUNK - len(chunk)), mode='constant')
scale = np.random.uniform(0.8, 1.2)
chunk = chunk * scale
chunk += np.random.normal(0, 0.0005, chunk.shape)
return chunk
def calculate_distances(self, source: SoundSource) -> tuple:
"""Расчет расстояний от источника звука до микрофонов"""
l1 = np.sqrt((source.x - self.mic1.x) ** 2 + (source.y - self.mic1.y) ** 2)
l2 = np.sqrt((source.x - self.mic2.x) ** 2 + (source.y - self.mic2.y) ** 2)
return l1, l2
def process_signals_with_delay(self, signal: np.ndarray, source: SoundSource) -> tuple:
"""Обработка сигналов с учетом временного сдвига и шума"""
l1, l2 = self.calculate_distances(source)
t1 = l1 / SOUND_SPEED
t2 = l2 / SOUND_SPEED
time_points = np.arange(len(signal)) / self.sample_rate
interp_func = interp1d(time_points, signal, kind='linear', fill_value="extrapolate")
S1 = interp_func(time_points - t1)
S2 = interp_func(time_points - t2)
min_length = min(len(S1), len(S2))
S1, S2 = S1[:min_length], S2[:min_length]
noise1 = np.random.normal(0, self.noise_level, S1.shape)
noise2 = np.random.normal(0, self.noise_level, S2.shape)
S1 += noise1
S2 += noise2
return S1, S2, t1, t2
def capture_audio(self):
"""Эмуляция захвата аудиоданных из файла"""
if self.audio_index + CHUNK >= len(self.audio_data):
self.audio_index = 0
chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK]
self.audio_index += CHUNK
signal1, signal2, t1, t2 = self.process_signals_with_delay(chunk, self.sound_source)
return signal1, signal2, t1, t2
def calculate_rms(self, signal: np.ndarray) -> float:
"""Вычисление RMS сигнала"""
return np.sqrt(np.mean(signal ** 2))
def calculate_time_delay_fft(self, signal1: np.ndarray, signal2: np.ndarray) -> tuple:
"""Расчет временной задержки и отрезка корреляции через GCC-PHAT"""
window = windows.hann(len(signal1))
signal1 = (signal1 - np.mean(signal1)) / (np.std(signal1) + 1e-10) * window
signal2 = (signal2 - np.mean(signal2)) / (np.std(signal2) + 1e-10) * window
fft_signal1 = np.fft.rfft(signal1)
fft_signal2 = np.fft.rfft(signal2)
cross_spectrum = fft_signal1 * np.conj(fft_signal2)
cross_spectrum = cross_spectrum / (np.abs(cross_spectrum) + 1e-10)
correlation = np.fft.irfft(cross_spectrum, n=len(signal1) * 2)
correlation = np.roll(correlation, len(correlation) // 2)
max_delay_samples = int(self.max_physical_delay * self.sample_rate * 1.5)
middle_point = len(correlation) // 2
start_idx = middle_point - max_delay_samples
end_idx = middle_point + max_delay_samples
max_correlation_idx = start_idx + np.argmax(correlation[start_idx:end_idx])
corr_start = max(start_idx, max_correlation_idx - CORR_WINDOW_SIZE)
corr_end = min(end_idx, max_correlation_idx + CORR_WINDOW_SIZE + 1)
correlation_segment = correlation[corr_start:corr_end]
target_length = 2 * CORR_WINDOW_SIZE + 1
if len(correlation_segment) < target_length:
correlation_segment = np.pad(correlation_segment, (0, target_length - len(correlation_segment)), mode='constant')
elif len(correlation_segment) > target_length:
correlation_segment = correlation_segment[:target_length]
correlation_segment = correlation_segment / (np.max(np.abs(correlation)) + 1e-10)
if correlation[max_correlation_idx] < 0.05 * np.max(np.abs(correlation)):
time_delay = 0.0
else:
if max_correlation_idx > start_idx + 1 and max_correlation_idx < end_idx - 1:
y0 = correlation[max_correlation_idx - 1]
y1 = correlation[max_correlation_idx]
y2 = correlation[max_correlation_idx + 1]
denom = 2 * (y0 - 2 * y1 + y2)
if denom != 0:
delta = (y0 - y2) / denom
max_correlation_idx += delta
delay_samples = max_correlation_idx - middle_point
time_delay = delay_samples / self.sample_rate
return time_delay, correlation_segment
def train_regression_model(self):
"""Обучение модели регрессии на отрезке корреляции"""
n_samples = 5000
n_additional_samples = 20000
X = []
y = []
print("Обучение: Генерация случайных тренировочных выборок...")
for _ in range(n_samples):
source = self._generate_random_sound_source_for_training()
chunk = self.get_audio_chunk()
signal1, signal2, _, _ = self.process_signals_with_delay(chunk, source)
_, corr_segment = self.calculate_time_delay_fft(signal1, signal2)
corr_segment += np.random.normal(0, 0.005, corr_segment.shape)
true_angle = np.arctan2(source.x, source.y) * 180 / np.pi
X.append(corr_segment.tolist())
y.append(true_angle)
print("Обучение: Генерация дополнительных выборок с шагом 1°...")
n_samples_per_angle = n_additional_samples // 82
for angle_deg in np.concatenate([np.arange(-90, -49, 1), np.arange(50, 91, 1)]):
angle_rad = np.radians(angle_deg)
distance = np.random.uniform(0.5, 2.0)
source_x = distance * np.sin(angle_rad)
source_y = distance * np.cos(angle_rad)
source = SoundSource(x=source_x, y=source_y)
for _ in range(n_samples_per_angle):
chunk = self.get_audio_chunk()
signal1, signal2, _, _ = self.process_signals_with_delay(chunk, source)
_, corr_segment = self.calculate_time_delay_fft(signal1, signal2)
corr_segment += np.random.normal(0, 0.005, corr_segment.shape)
X.append(corr_segment.tolist())
y.append(angle_deg)
print("Обучение: Генерация дополнительных выборок с шагом 10°...")
n_samples_per_angle_10 = 2000
for angle_deg in np.arange(-90, 91, 10):
angle_rad = np.radians(angle_deg)
distance = np.random.uniform(0.5, 2.0)
source_x = distance * np.sin(angle_rad)
source_y = distance * np.cos(angle_rad)
source = SoundSource(x=source_x, y=source_y)
for _ in range(n_samples_per_angle_10):
chunk = self.get_audio_chunk()
signal1, signal2, _, _ = self.process_signals_with_delay(chunk, source)
_, corr_segment = self.calculate_time_delay_fft(signal1, signal2)
corr_segment += np.random.normal(0, 0.005, corr_segment.shape)
X.append(corr_segment.tolist())
y.append(angle_deg)
print("Обучение: Подготовка данных завершена, обучение модели...")
X = np.array(X)
y = np.array(y)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
model = GradientBoostingRegressor(n_estimators=600, max_depth=6, learning_rate=0.03, random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
print(f"Обучение: Среднеквадратичная ошибка модели на тестовых данных: {mse:.4f}")
joblib.dump(model, MODEL_PATH)
print(f"Модель сохранена в {MODEL_PATH}")
return model
def load_or_train_model(self, retrain: bool):
"""Загрузка сохраненной модели или обучение новой"""
if not retrain and os.path.exists(MODEL_PATH):
try:
model = joblib.load(MODEL_PATH)
print(f"Модель загружена из {MODEL_PATH}")
return model
except Exception as e:
print(f"Ошибка загрузки модели: {e}. Обучение новой модели...")
return self.train_regression_model()
def calculate_direction(self, correlation_segment: np.ndarray) -> float:
"""Расчет угла направления с использованием модели регрессии"""
angle = self.model.predict([correlation_segment])[0]
angle = np.clip(angle, -90, 90)
return angle
def run(self):
"""Обработка аудио в реальном времени"""
while self.running:
try:
current_time = time.time()
if current_time - self.last_move_time >= MOVE_INTERVAL:
self.sound_source = self._generate_sequential_sound_source()
self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time))
print(f"Тестирование: Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}")
self.last_move_time = current_time
left, right, t1, t2 = self.capture_audio()
self.rms_left = self.calculate_rms(left)
self.rms_right = self.calculate_rms(right)
new_sound_detected = (self.rms_left > RMS_THRESHOLD) and (self.rms_right > RMS_THRESHOLD)
if new_sound_detected:
print(f"Тестирование: RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}")
time_delay, correlation_segment = self.calculate_time_delay_fft(left, right)
print(f"Тестирование: Time delay: {time_delay * 1000:.2f} ms, Correlation segment length: {len(correlation_segment)}")
# Сохранение корреляционного отрезка в файл (без использования Matplotlib)
np.savetxt(f"corr_segment_{current_time}.txt", correlation_segment)
angle = self.calculate_direction(correlation_segment)
print(f"Тестирование: Calculated angle: {angle:.1f}°")
self.current_angle = angle
self.last_detected_angle = angle
self.last_sound_time = current_time
self.sound_detected = True
self.show_arrow = True
true_dx = self.sound_source.x
true_dy = self.sound_source.y
true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi
self.results.append({
'Time Delay (ms)': time_delay * 1000,
'Detected Angle (°)': self.current_angle,
'True Angle (°)': true_angle,
'Source X': self.sound_source.x,
'Source Y': self.sound_source.y
})
else:
if self.last_detected_angle is not None and current_time - self.last_sound_time < SILENCE_TIMEOUT:
self.sound_detected = False
self.show_arrow = True
else:
self.sound_detected = False
self.show_arrow = False
time.sleep(CHUNK / self.sample_rate)
except Exception as e:
print(f"Ошибка в run: {e}")
continue
def get_coordinates_dataframe(self):
"""Создание датафрейма с координатами микрофонов и всех позиций источника"""
data = {
'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))],
'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions],
'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions],
'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions]
}
return pd.DataFrame(data)
def get_results_dataframe(self):
"""Создание датафрейма с результатами"""
return pd.DataFrame(self.results)
def main():
try:
# retrain_model=True, чтобы переобучить модель, или False, чтобы загрузить сохраненную
finder = TreeRegressionDirectionFinder(MIC_DISTANCE, "my_recording1.wav", retrain_model=False)
print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate)
thread = threading.Thread(target=finder.run)
thread.start()
plt.switch_backend('TkAgg')
fig, ax = plt.subplots(figsize=(9, 6))
ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2)
ax.set_ylim(-0.5, ROOM_HEIGHT)
ax.set_aspect('equal')
ax.set_title("Определение направления на источник звука (Gradient Boosting)", fontsize=12)
ax.set_xlabel("X (м)", fontsize=10)
ax.set_ylabel("Y (м)", fontsize=10)
ax.grid(True)
ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=12, label='Микрофон 1')
ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=12, label='Микрофон 2')
source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=12,
label='Источник звука')
arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 4
arrow = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3,
fc='r', ec='r', label='Расчетное направление')
arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.2, head_length=0.3,
fc='g', ec='g', linestyle=':', label='Истинное направление')
sound_bar = ax.axhline(y=ROOM_HEIGHT - 1, color='green', linewidth=15, visible=False)
angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=10)
left_indicator = patches.Rectangle((finder.mic1.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray')
right_indicator = patches.Rectangle((finder.mic2.x - 0.03, -0.15), 0.06, 0.08, facecolor='gray')
ax.add_patch(left_indicator)
ax.add_patch(right_indicator)
def update(frame):
source_plot.set_data([finder.sound_source.x], [finder.sound_source.y])
dx = finder.sound_source.x
dy = finder.sound_source.y
true_angle_rad = np.arctan2(dx, dy)
true_end_x = arrow_length * np.sin(true_angle_rad)
true_end_y = arrow_length * np.cos(true_angle_rad)
arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y)
if finder.show_arrow:
angle = finder.current_angle
calc_angle_rad = np.radians(angle)
calc_end_x = arrow_length * np.sin(calc_angle_rad)
calc_end_y = arrow_length * np.cos(calc_angle_rad)
arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y)
angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°")
if finder.sound_detected:
sound_bar.set_visible(True)
ax.set_title("Активное обнаружение звука (Gradient Boosting)", fontsize=12)
else:
sound_bar.set_visible(False)
ax.set_title("Последнее зафиксированное направление (Gradient Boosting)", fontsize=12)
else:
arrow.set_data(x=0, y=0, dx=0, dy=0)
angle_text.set_text("")
sound_bar.set_visible(False)
ax.set_title("Звук не обнаружен", fontsize=12)
left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray')
right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray')
return [source_plot, arrow, arrow_true, sound_bar,
left_indicator, right_indicator, angle_text]
ani = FuncAnimation(fig, update, frames=None, interval=10, blit=True, cache_frame_data=False)
plt.legend(loc='upper left', fontsize=8)
plt.tight_layout()
plt.show()
finder.running = False
thread.join()
coords_df = finder.get_coordinates_dataframe()
results_df = finder.get_results_dataframe()
print("\nКоординаты микрофонов и всех позиций источника звука:")
print(coords_df.to_string(index=False))
print("\nРезультаты вычислений:")
print(results_df.to_string(index=False))
coords_df.to_csv('coordinates_tree.csv', index=False)
results_df.to_csv('results_tree.csv', index=False)
print("\nДанные сохранены в 'coordinates_tree.csv' и 'results_tree.csv'")
except Exception as e:
print(f"Ошибка в main: {e}")
finder.running = False
thread.join()
if __name__ == "__main__":
main()