SoundLocal/MUSIC.py

412 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import matplotlib.patches as patches
import librosa
import random
import pandas as pd
import threading
import time
from dataclasses import dataclass
from scipy.signal import stft, butter, sosfiltfilt
from scipy.fft import rfft, rfftfreq
matplotlib.use('TkAgg')
# Константы
SOUND_SPEED = 343.2 # скорость звука (м/с)
MIC_DISTANCE = 0.06 # расстояние между микрофонами (м)
ROOM_WIDTH = 3.0 # ширина комнаты (м)
ROOM_HEIGHT = 2.0 # высота комнаты (м)
SAMPLE_RATE = 48000 # частота дискретизации (Гц)
CHUNK = 16384 # размер буфера
RMS_THRESHOLD = 0.05 # порог RMS для определения звука
FREQUENCY_RANGE = (500, 2000) # диапазон частот для поиска доминирующей частоты (Гц)
ANGLE_RESOLUTION = 1 # шаг угла для поиска (градусы)
SILENCE_TIMEOUT = 2.0 # время в секундах для сохранения последнего угла
PEAK_THRESHOLD = 0.5 # порог для проверки достоверности пика в MUSIC-спектре
MOVE_INTERVAL = 0.5 # интервал перемещения источника звука (с)
@dataclass
class Microphone:
"""Класс для хранения информации о микрофоне"""
x: float
y: float
@dataclass
class SoundSource:
"""Класс для хранения информации об источнике звука"""
x: float
y: float
class MusicDirectionFinder:
def __init__(self, mic_distance: float, audio_file: str):
"""Инициализация определителя направления с использованием MUSIC"""
self.mic_distance = mic_distance
self.mic1 = Microphone(x=-mic_distance / 2, y=0.0)
self.mic2 = Microphone(x=mic_distance / 2, y=0.0)
self.sound_source = self._generate_random_sound_source()
self.running = True
self.current_angle = 0.0
self.sound_detected = False
self.last_sound_time = 0
self.last_detected_angle = None
self.show_arrow = False
self.rms_left = 0.0
self.rms_right = 0.0
self.audio_data, self.sample_rate = self.load_audio(audio_file)
self.audio_index = 0
self.noise_level = 0.001 # шум
self.results = []
self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)]
self.last_move_time = time.time()
print("MusicDirectionFinder инициализирован")
def _generate_random_sound_source(self) -> SoundSource:
"""Генерация положения источника звука с последовательным проходом углов от -90 до 90 градусов с шагом 20 градусов"""
if not hasattr(self, 'angle_ranges'):
self.angle_ranges = list(range(-90, 91, 20))
self.current_range_idx = 0
self.angle_count = 0
start_angle = self.angle_ranges[self.current_range_idx]
end_angle = start_angle + 20 if self.current_range_idx < len(self.angle_ranges) - 1 else 90
angle_deg = random.uniform(start_angle, end_angle)
angle_rad = np.radians(angle_deg)
distance = 1.5 # расстояние
source_x = distance * np.sin(angle_rad)
source_y = distance * np.cos(angle_rad)
if abs(source_x) > ROOM_WIDTH / 2:
scale = (ROOM_WIDTH / 2) / abs(source_x)
source_x *= scale
source_y *= scale
if source_y > ROOM_HEIGHT:
scale = ROOM_HEIGHT / source_y
source_x *= scale
source_y *= scale
if source_y < 0:
source_y = 0.0
source_x = 0.0
print(
f"Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}, расстояние={distance:.2f}, повторение {self.angle_count + 1}/10")
self.angle_count += 1
if self.angle_count >= 10:
self.angle_count = 0
self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges)
return SoundSource(x=source_x, y=source_y)
def load_audio(self, filename: str) -> tuple:
"""Загрузка аудиофайла"""
try:
audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True)
print(f"Аудиофайл загружен: {filename}, длина: {len(audio_data)}")
return audio_data, sample_rate
except Exception as e:
raise ValueError(f"Ошибка загрузки аудиофайла: {e}")
def calculate_distances(self) -> tuple:
"""Расчет расстояний от источника звука до микрофонов"""
l1 = np.sqrt((self.sound_source.x - self.mic1.x) ** 2 +
(self.sound_source.y - self.mic1.y) ** 2)
l2 = np.sqrt((self.sound_source.x - self.mic2.x) ** 2 +
(self.sound_source.y - self.mic2.y) ** 2)
return l1, l2
def process_signals_with_delay(self, signal: np.ndarray) -> tuple:
"""Обработка сигналов с учетом временного сдвига и шума"""
l1, l2 = self.calculate_distances()
t1 = l1 / SOUND_SPEED
t2 = l2 / SOUND_SPEED
log = int(self.sample_rate * (t2 - t1))
if len(signal) < CHUNK:
signal = np.pad(signal, (0, CHUNK - len(signal)), mode='constant')
elif len(signal) > CHUNK:
signal = signal[:CHUNK]
if log >= 0:
S1 = signal[abs(log):] if log != 0 else signal.copy()
S2 = signal[:-abs(log)] if log != 0 else signal.copy()
else:
S1 = signal[:-abs(log)] if log != 0 else signal.copy()
S2 = signal[abs(log):]
min_length = min(len(S1), len(S2))
S1 = S1[:min_length]
S2 = S2[:min_length]
if len(S1) < CHUNK:
S1 = np.pad(S1, (0, CHUNK - len(S1)), mode='constant')
S2 = np.pad(S2, (0, CHUNK - len(S2)), mode='constant')
noise1 = np.random.normal(0, self.noise_level, size=S1.shape)
noise2 = np.random.normal(0, self.noise_level, size=S2.shape)
S1 = S1 + noise1
S2 = S2 + noise2
return S1, S2, t1, t2
def capture_audio(self):
"""Эмуляция захвата аудиоданных из файла"""
if self.audio_index + CHUNK >= len(self.audio_data):
self.audio_index = 0
chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK]
self.audio_index += CHUNK
signal1, signal2, t1, t2 = self.process_signals_with_delay(chunk)
print(f"capture_audio: длина signal1={len(signal1)}, signal2={len(signal2)}")
return signal1, signal2, t1, t2
def calculate_rms(self, signal: np.ndarray) -> float:
"""Вычисление RMS сигнала"""
return np.sqrt(np.mean(signal ** 2))
def find_dominant_frequency(self, signal: np.ndarray) -> float:
"""Определение доминирующей частоты сигнала через FFT"""
if len(signal) != CHUNK:
if len(signal) < CHUNK:
signal = np.pad(signal, (0, CHUNK - len(signal)), mode='constant')
else:
signal = signal[:CHUNK]
fft_signal = np.abs(rfft(signal))
freqs = rfftfreq(CHUNK, 1 / SAMPLE_RATE)
if len(fft_signal) != len(freqs):
raise ValueError(f"Несоответствие размеров fft_signal ({len(fft_signal)}) и freqs ({len(freqs)})")
mask = (freqs >= FREQUENCY_RANGE[0]) & (freqs <= FREQUENCY_RANGE[1])
fft_signal = fft_signal[mask]
freqs = freqs[mask]
if len(fft_signal) == 0:
print("find_dominant_frequency: нет частот в диапазоне, возвращаем FREQUENCY_RANGE[0]")
return FREQUENCY_RANGE[0]
dominant_freq = freqs[np.argmax(fft_signal)]
print(f"find_dominant_frequency: доминирующая частота = {dominant_freq:.1f} Hz")
return dominant_freq
def bandpass_filter(self, signal: np.ndarray, freq: float) -> np.ndarray:
"""Применение полосового фильтра к сигналу"""
bandwidth = 100
lowcut = max(FREQUENCY_RANGE[0], freq - bandwidth / 2)
highcut = min(FREQUENCY_RANGE[1], freq + bandwidth / 2)
sos = butter(4, [lowcut, highcut], btype='band', fs=SAMPLE_RATE, output='sos')
filtered_signal = sosfiltfilt(sos, signal)
return filtered_signal
def steering_vector(self, angle: float, freq: float) -> np.ndarray:
"""Вычисление steering vector для заданного угла и частоты"""
tau = (self.mic_distance * np.sin(np.radians(angle))) / SOUND_SPEED
phase_shift = 2 * np.pi * freq * tau
return np.array([1, np.exp(1j * phase_shift)]) # Исправлено: -1j на 1j
def music_spectrum(self, signal1: np.ndarray, signal2: np.ndarray, freq: float) -> tuple:
"""Вычисление спектра MUSIC для оценки DoA"""
f, t, Zxx1 = stft(signal1, fs=SAMPLE_RATE, nperseg=CHUNK, noverlap=int(CHUNK * 3 / 4))
_, _, Zxx2 = stft(signal2, fs=SAMPLE_RATE, nperseg=CHUNK, noverlap=int(CHUNK * 3 / 4))
freq_idx = np.argmin(np.abs(f - freq))
X = np.vstack((Zxx1[freq_idx, :], Zxx2[freq_idx, :]))
R = np.dot(X, X.conj().T) / X.shape[1]
R += np.eye(R.shape[0]) * 1e-6
eigenvalues, eigenvectors = np.linalg.eigh(R)
idx = np.argsort(eigenvalues)[::-1]
eigenvectors = eigenvectors[:, idx]
En = eigenvectors[:, -1:]
angles = np.arange(-90, 91, ANGLE_RESOLUTION)
music_spectrum = np.zeros(len(angles))
for i, angle in enumerate(angles):
a = self.steering_vector(angle, freq)
music_spectrum[i] = 1 / np.abs(np.dot(a.conj().T, np.dot(En @ En.conj().T, a)))
music_spectrum = music_spectrum / np.max(music_spectrum)
max_idx = np.argmax(music_spectrum)
print(f"music_spectrum: максимум спектра при угле {angles[max_idx]:.1f}°, значение {music_spectrum[max_idx]:.4f}")
if music_spectrum[max_idx] < PEAK_THRESHOLD:
print("music_spectrum: пик ниже порога, возвращаем 0.0")
return 0.0, angles, music_spectrum
estimated_angle = angles[max_idx]
print(f"music_spectrum: расчётный угол = {estimated_angle:.1f}°")
return estimated_angle, angles, music_spectrum
def run(self):
"""Обработка аудио в реальном времени"""
print("Запуск потока run")
while self.running:
try:
current_time = time.time()
if current_time - self.last_move_time >= MOVE_INTERVAL:
self.sound_source = self._generate_random_sound_source()
self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time))
print(f"Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}")
self.last_move_time = current_time
left, right, t1, t2 = self.capture_audio()
self.rms_left = self.calculate_rms(left)
self.rms_right = self.calculate_rms(right)
signal_diff = np.mean(np.abs(left - right))
new_sound_detected = (self.rms_left > RMS_THRESHOLD) or (self.rms_right > RMS_THRESHOLD)
if new_sound_detected:
print(f"RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}")
print(f"Signal difference: {signal_diff:.4f}")
freq = self.find_dominant_frequency(left)
left_filtered = self.bandpass_filter(left, freq)
right_filtered = self.bandpass_filter(right, freq)
angle, _, _ = self.music_spectrum(left_filtered, right_filtered, freq)
self.current_angle = angle
self.last_detected_angle = angle
self.last_sound_time = current_time
self.sound_detected = True
self.show_arrow = True
true_dx = self.sound_source.x
true_dy = self.sound_source.y
true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi
self.results.append({
't1 (ms)': t1 * 1000,
't2 (ms)': t2 * 1000,
'Time Delay (ms)': (t2 - t1) * 1000,
'Detected Angle (°)': angle,
'True Angle (°)': true_angle,
'Source X': self.sound_source.x,
'Source Y': self.sound_source.y
})
else:
if self.last_detected_angle is not None and current_time - self.last_sound_time < SILENCE_TIMEOUT:
self.current_angle = self.last_detected_angle
self.sound_detected = False
self.show_arrow = True
else:
self.sound_detected = False
self.show_arrow = False
time.sleep(CHUNK / self.sample_rate)
except Exception as e:
print(f"Ошибка в run: {e}")
continue
print("Поток run завершён")
def get_coordinates_dataframe(self):
"""Создание датафрейма с координатами микрофонов и всех позиций источника"""
data = {
'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))],
'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions],
'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions],
'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions]
}
return pd.DataFrame(data)
def get_results_dataframe(self):
"""Создание датафрейма с результатами"""
return pd.DataFrame(self.results)
def main():
try:
finder = MusicDirectionFinder(MIC_DISTANCE, "my_recording1.wav")
print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate)
thread = threading.Thread(target=finder.run)
thread.start()
print("Поток run запущен")
fig, ax = plt.subplots(figsize=(10, 8))
ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2)
ax.set_ylim(-0.5, ROOM_HEIGHT)
ax.set_title("Определение направления на источник звука (MUSIC)")
ax.set_xlabel("X (м)")
ax.set_ylabel("Y (м)")
ax.grid(True)
print("График инициализирован")
ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=10, label='Микрофон 1')
ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=10, label='Микрофон 2')
source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=10,
label='Источник звука')
arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 3
arrow = ax.arrow(0, 0, 0, 0, head_width=0.05, head_length=0.1, fc='r', ec='r',
label='Расчетное направление')
arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.05, head_length=0.1, fc='g', ec='g',
linestyle=':', label='Истинное направление')
sound_bar = ax.axhline(y=ROOM_HEIGHT - 0.2, color='green', linewidth=20, visible=False)
angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=12)
left_indicator = patches.Rectangle((finder.mic1.x - 0.02, -0.1), 0.04, 0.05, facecolor='gray')
right_indicator = patches.Rectangle((finder.mic2.x - 0.02, -0.1), 0.04, 0.05, facecolor='gray')
ax.add_patch(left_indicator)
ax.add_patch(right_indicator)
def update(frame):
try:
source_plot.set_data([finder.sound_source.x], [finder.sound_source.y])
dx = finder.sound_source.x
dy = finder.sound_source.y
true_angle_rad = np.arctan2(dx, dy)
true_end_x = arrow_length * np.sin(true_angle_rad)
true_end_y = arrow_length * np.cos(true_angle_rad)
arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y)
if finder.show_arrow:
angle = finder.current_angle
calc_angle_rad = np.radians(angle)
calc_end_x = arrow_length * np.sin(calc_angle_rad)
calc_end_y = arrow_length * np.cos(calc_angle_rad)
arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y)
angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°")
if finder.sound_detected:
sound_bar.set_visible(True)
ax.set_title("Активное обнаружение звука (MUSIC)")
else:
sound_bar.set_visible(False)
ax.set_title("Последнее зафиксированное направление (MUSIC)")
else:
arrow.set_data(x=0, y=0, dx=0, dy=0)
angle_text.set_text("")
sound_bar.set_visible(False)
ax.set_title("Звук не обнаружен")
left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray')
right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray')
return [source_plot, arrow, arrow_true, sound_bar, left_indicator, right_indicator, angle_text]
except Exception as e:
print(f"Ошибка в update: {e}")
return []
ani = FuncAnimation(fig, update, frames=None, interval=100, blit=True, cache_frame_data=False)
print("FuncAnimation инициализирован")
plt.legend(loc='upper right')
print("Вызов plt.show()")
plt.show()
print("plt.show() выполнен")
finder.running = False
thread.join()
print("Поток run остановлен")
coords_df = finder.get_coordinates_dataframe()
results_df = finder.get_results_dataframe()
print("\nКоординаты микрофонов и всех позиций источника звука:")
print(coords_df.to_string(index=False))
print("\nРезультаты вычислений:")
print(results_df.to_string(index=False))
coords_df.to_csv('coordinates_music.csv', index=False)
results_df.to_csv('results_music.csv', index=False)
print("\nДанные сохранены в 'coordinates_music.csv' и 'results_music.csv'")
except Exception as e:
print(f"Ошибка в main: {e}")
finder.running = False
thread.join()
if __name__ == "__main__":
main()