Загрузить файлы в «/»
This commit is contained in:
parent
29fac5e7dd
commit
0944c0d061
412
MUSIC.py
Normal file
412
MUSIC.py
Normal file
@ -0,0 +1,412 @@
|
||||
import numpy as np
|
||||
import matplotlib
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.animation import FuncAnimation
|
||||
import matplotlib.patches as patches
|
||||
import librosa
|
||||
import random
|
||||
import pandas as pd
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from scipy.signal import stft, butter, sosfiltfilt
|
||||
from scipy.fft import rfft, rfftfreq
|
||||
|
||||
matplotlib.use('TkAgg')
|
||||
|
||||
# Константы
|
||||
SOUND_SPEED = 343.2 # скорость звука (м/с)
|
||||
MIC_DISTANCE = 0.06 # расстояние между микрофонами (м)
|
||||
ROOM_WIDTH = 3.0 # ширина комнаты (м)
|
||||
ROOM_HEIGHT = 2.0 # высота комнаты (м)
|
||||
SAMPLE_RATE = 48000 # частота дискретизации (Гц)
|
||||
CHUNK = 16384 # размер буфера
|
||||
RMS_THRESHOLD = 0.05 # порог RMS для определения звука
|
||||
FREQUENCY_RANGE = (500, 2000) # диапазон частот для поиска доминирующей частоты (Гц)
|
||||
ANGLE_RESOLUTION = 1 # шаг угла для поиска (градусы)
|
||||
SILENCE_TIMEOUT = 2.0 # время в секундах для сохранения последнего угла
|
||||
PEAK_THRESHOLD = 0.5 # порог для проверки достоверности пика в MUSIC-спектре
|
||||
MOVE_INTERVAL = 0.5 # интервал перемещения источника звука (с)
|
||||
|
||||
@dataclass
|
||||
class Microphone:
|
||||
"""Класс для хранения информации о микрофоне"""
|
||||
x: float
|
||||
y: float
|
||||
|
||||
@dataclass
|
||||
class SoundSource:
|
||||
"""Класс для хранения информации об источнике звука"""
|
||||
x: float
|
||||
y: float
|
||||
|
||||
class MusicDirectionFinder:
|
||||
def __init__(self, mic_distance: float, audio_file: str):
|
||||
"""Инициализация определителя направления с использованием MUSIC"""
|
||||
self.mic_distance = mic_distance
|
||||
self.mic1 = Microphone(x=-mic_distance / 2, y=0.0)
|
||||
self.mic2 = Microphone(x=mic_distance / 2, y=0.0)
|
||||
self.sound_source = self._generate_random_sound_source()
|
||||
self.running = True
|
||||
self.current_angle = 0.0
|
||||
self.sound_detected = False
|
||||
self.last_sound_time = 0
|
||||
self.last_detected_angle = None
|
||||
self.show_arrow = False
|
||||
self.rms_left = 0.0
|
||||
self.rms_right = 0.0
|
||||
self.audio_data, self.sample_rate = self.load_audio(audio_file)
|
||||
self.audio_index = 0
|
||||
self.noise_level = 0.001 # шум
|
||||
self.results = []
|
||||
self.source_positions = [(self.sound_source.x, self.sound_source.y, 0.0)]
|
||||
self.last_move_time = time.time()
|
||||
print("MusicDirectionFinder инициализирован")
|
||||
|
||||
def _generate_random_sound_source(self) -> SoundSource:
|
||||
"""Генерация положения источника звука с последовательным проходом углов от -90 до 90 градусов с шагом 20 градусов"""
|
||||
if not hasattr(self, 'angle_ranges'):
|
||||
self.angle_ranges = list(range(-90, 91, 20))
|
||||
self.current_range_idx = 0
|
||||
self.angle_count = 0
|
||||
|
||||
start_angle = self.angle_ranges[self.current_range_idx]
|
||||
end_angle = start_angle + 20 if self.current_range_idx < len(self.angle_ranges) - 1 else 90
|
||||
|
||||
angle_deg = random.uniform(start_angle, end_angle)
|
||||
angle_rad = np.radians(angle_deg)
|
||||
|
||||
distance = 1.5 # расстояние
|
||||
source_x = distance * np.sin(angle_rad)
|
||||
source_y = distance * np.cos(angle_rad)
|
||||
|
||||
if abs(source_x) > ROOM_WIDTH / 2:
|
||||
scale = (ROOM_WIDTH / 2) / abs(source_x)
|
||||
source_x *= scale
|
||||
source_y *= scale
|
||||
if source_y > ROOM_HEIGHT:
|
||||
scale = ROOM_HEIGHT / source_y
|
||||
source_x *= scale
|
||||
source_y *= scale
|
||||
if source_y < 0:
|
||||
source_y = 0.0
|
||||
source_x = 0.0
|
||||
|
||||
print(
|
||||
f"Генерация новой позиции: угол={angle_deg:.2f}°, x={source_x:.2f}, y={source_y:.2f}, расстояние={distance:.2f}, повторение {self.angle_count + 1}/10")
|
||||
|
||||
self.angle_count += 1
|
||||
if self.angle_count >= 10:
|
||||
self.angle_count = 0
|
||||
self.current_range_idx = (self.current_range_idx + 1) % len(self.angle_ranges)
|
||||
|
||||
return SoundSource(x=source_x, y=source_y)
|
||||
|
||||
def load_audio(self, filename: str) -> tuple:
|
||||
"""Загрузка аудиофайла"""
|
||||
try:
|
||||
audio_data, sample_rate = librosa.load(filename, sr=SAMPLE_RATE, mono=True)
|
||||
print(f"Аудиофайл загружен: {filename}, длина: {len(audio_data)}")
|
||||
return audio_data, sample_rate
|
||||
except Exception as e:
|
||||
raise ValueError(f"Ошибка загрузки аудиофайла: {e}")
|
||||
|
||||
def calculate_distances(self) -> tuple:
|
||||
"""Расчет расстояний от источника звука до микрофонов"""
|
||||
l1 = np.sqrt((self.sound_source.x - self.mic1.x) ** 2 +
|
||||
(self.sound_source.y - self.mic1.y) ** 2)
|
||||
l2 = np.sqrt((self.sound_source.x - self.mic2.x) ** 2 +
|
||||
(self.sound_source.y - self.mic2.y) ** 2)
|
||||
return l1, l2
|
||||
|
||||
def process_signals_with_delay(self, signal: np.ndarray) -> tuple:
|
||||
"""Обработка сигналов с учетом временного сдвига и шума"""
|
||||
l1, l2 = self.calculate_distances()
|
||||
t1 = l1 / SOUND_SPEED
|
||||
t2 = l2 / SOUND_SPEED
|
||||
log = int(self.sample_rate * (t2 - t1))
|
||||
|
||||
if len(signal) < CHUNK:
|
||||
signal = np.pad(signal, (0, CHUNK - len(signal)), mode='constant')
|
||||
elif len(signal) > CHUNK:
|
||||
signal = signal[:CHUNK]
|
||||
|
||||
if log >= 0:
|
||||
S1 = signal[abs(log):] if log != 0 else signal.copy()
|
||||
S2 = signal[:-abs(log)] if log != 0 else signal.copy()
|
||||
else:
|
||||
S1 = signal[:-abs(log)] if log != 0 else signal.copy()
|
||||
S2 = signal[abs(log):]
|
||||
|
||||
min_length = min(len(S1), len(S2))
|
||||
S1 = S1[:min_length]
|
||||
S2 = S2[:min_length]
|
||||
|
||||
if len(S1) < CHUNK:
|
||||
S1 = np.pad(S1, (0, CHUNK - len(S1)), mode='constant')
|
||||
S2 = np.pad(S2, (0, CHUNK - len(S2)), mode='constant')
|
||||
|
||||
noise1 = np.random.normal(0, self.noise_level, size=S1.shape)
|
||||
noise2 = np.random.normal(0, self.noise_level, size=S2.shape)
|
||||
S1 = S1 + noise1
|
||||
S2 = S2 + noise2
|
||||
|
||||
return S1, S2, t1, t2
|
||||
|
||||
def capture_audio(self):
|
||||
"""Эмуляция захвата аудиоданных из файла"""
|
||||
if self.audio_index + CHUNK >= len(self.audio_data):
|
||||
self.audio_index = 0
|
||||
chunk = self.audio_data[self.audio_index:self.audio_index + CHUNK]
|
||||
self.audio_index += CHUNK
|
||||
|
||||
signal1, signal2, t1, t2 = self.process_signals_with_delay(chunk)
|
||||
print(f"capture_audio: длина signal1={len(signal1)}, signal2={len(signal2)}")
|
||||
return signal1, signal2, t1, t2
|
||||
|
||||
def calculate_rms(self, signal: np.ndarray) -> float:
|
||||
"""Вычисление RMS сигнала"""
|
||||
return np.sqrt(np.mean(signal ** 2))
|
||||
|
||||
def find_dominant_frequency(self, signal: np.ndarray) -> float:
|
||||
"""Определение доминирующей частоты сигнала через FFT"""
|
||||
if len(signal) != CHUNK:
|
||||
if len(signal) < CHUNK:
|
||||
signal = np.pad(signal, (0, CHUNK - len(signal)), mode='constant')
|
||||
else:
|
||||
signal = signal[:CHUNK]
|
||||
|
||||
fft_signal = np.abs(rfft(signal))
|
||||
freqs = rfftfreq(CHUNK, 1 / SAMPLE_RATE)
|
||||
|
||||
if len(fft_signal) != len(freqs):
|
||||
raise ValueError(f"Несоответствие размеров fft_signal ({len(fft_signal)}) и freqs ({len(freqs)})")
|
||||
|
||||
mask = (freqs >= FREQUENCY_RANGE[0]) & (freqs <= FREQUENCY_RANGE[1])
|
||||
fft_signal = fft_signal[mask]
|
||||
freqs = freqs[mask]
|
||||
|
||||
if len(fft_signal) == 0:
|
||||
print("find_dominant_frequency: нет частот в диапазоне, возвращаем FREQUENCY_RANGE[0]")
|
||||
return FREQUENCY_RANGE[0]
|
||||
dominant_freq = freqs[np.argmax(fft_signal)]
|
||||
print(f"find_dominant_frequency: доминирующая частота = {dominant_freq:.1f} Hz")
|
||||
return dominant_freq
|
||||
|
||||
def bandpass_filter(self, signal: np.ndarray, freq: float) -> np.ndarray:
|
||||
"""Применение полосового фильтра к сигналу"""
|
||||
bandwidth = 100
|
||||
lowcut = max(FREQUENCY_RANGE[0], freq - bandwidth / 2)
|
||||
highcut = min(FREQUENCY_RANGE[1], freq + bandwidth / 2)
|
||||
sos = butter(4, [lowcut, highcut], btype='band', fs=SAMPLE_RATE, output='sos')
|
||||
filtered_signal = sosfiltfilt(sos, signal)
|
||||
return filtered_signal
|
||||
|
||||
def steering_vector(self, angle: float, freq: float) -> np.ndarray:
|
||||
"""Вычисление steering vector для заданного угла и частоты"""
|
||||
tau = (self.mic_distance * np.sin(np.radians(angle))) / SOUND_SPEED
|
||||
phase_shift = 2 * np.pi * freq * tau
|
||||
return np.array([1, np.exp(1j * phase_shift)]) # Исправлено: -1j на 1j
|
||||
|
||||
def music_spectrum(self, signal1: np.ndarray, signal2: np.ndarray, freq: float) -> tuple:
|
||||
"""Вычисление спектра MUSIC для оценки DoA"""
|
||||
f, t, Zxx1 = stft(signal1, fs=SAMPLE_RATE, nperseg=CHUNK, noverlap=int(CHUNK * 3 / 4))
|
||||
_, _, Zxx2 = stft(signal2, fs=SAMPLE_RATE, nperseg=CHUNK, noverlap=int(CHUNK * 3 / 4))
|
||||
freq_idx = np.argmin(np.abs(f - freq))
|
||||
X = np.vstack((Zxx1[freq_idx, :], Zxx2[freq_idx, :]))
|
||||
R = np.dot(X, X.conj().T) / X.shape[1]
|
||||
R += np.eye(R.shape[0]) * 1e-6
|
||||
eigenvalues, eigenvectors = np.linalg.eigh(R)
|
||||
idx = np.argsort(eigenvalues)[::-1]
|
||||
eigenvectors = eigenvectors[:, idx]
|
||||
En = eigenvectors[:, -1:]
|
||||
angles = np.arange(-90, 91, ANGLE_RESOLUTION)
|
||||
music_spectrum = np.zeros(len(angles))
|
||||
for i, angle in enumerate(angles):
|
||||
a = self.steering_vector(angle, freq)
|
||||
music_spectrum[i] = 1 / np.abs(np.dot(a.conj().T, np.dot(En @ En.conj().T, a)))
|
||||
music_spectrum = music_spectrum / np.max(music_spectrum)
|
||||
max_idx = np.argmax(music_spectrum)
|
||||
print(f"music_spectrum: максимум спектра при угле {angles[max_idx]:.1f}°, значение {music_spectrum[max_idx]:.4f}")
|
||||
if music_spectrum[max_idx] < PEAK_THRESHOLD:
|
||||
print("music_spectrum: пик ниже порога, возвращаем 0.0")
|
||||
return 0.0, angles, music_spectrum
|
||||
estimated_angle = angles[max_idx]
|
||||
print(f"music_spectrum: расчётный угол = {estimated_angle:.1f}°")
|
||||
return estimated_angle, angles, music_spectrum
|
||||
|
||||
def run(self):
|
||||
"""Обработка аудио в реальном времени"""
|
||||
print("Запуск потока run")
|
||||
while self.running:
|
||||
try:
|
||||
current_time = time.time()
|
||||
if current_time - self.last_move_time >= MOVE_INTERVAL:
|
||||
self.sound_source = self._generate_random_sound_source()
|
||||
self.source_positions.append((self.sound_source.x, self.sound_source.y, current_time))
|
||||
print(f"Источник звука перемещен в: x={self.sound_source.x:.2f}, y={self.sound_source.y:.2f}")
|
||||
self.last_move_time = current_time
|
||||
|
||||
left, right, t1, t2 = self.capture_audio()
|
||||
self.rms_left = self.calculate_rms(left)
|
||||
self.rms_right = self.calculate_rms(right)
|
||||
signal_diff = np.mean(np.abs(left - right))
|
||||
new_sound_detected = (self.rms_left > RMS_THRESHOLD) or (self.rms_right > RMS_THRESHOLD)
|
||||
|
||||
if new_sound_detected:
|
||||
print(f"RMS left: {self.rms_left:.4f}, RMS right: {self.rms_right:.4f}")
|
||||
print(f"Signal difference: {signal_diff:.4f}")
|
||||
freq = self.find_dominant_frequency(left)
|
||||
left_filtered = self.bandpass_filter(left, freq)
|
||||
right_filtered = self.bandpass_filter(right, freq)
|
||||
angle, _, _ = self.music_spectrum(left_filtered, right_filtered, freq)
|
||||
self.current_angle = angle
|
||||
self.last_detected_angle = angle
|
||||
self.last_sound_time = current_time
|
||||
self.sound_detected = True
|
||||
self.show_arrow = True
|
||||
|
||||
true_dx = self.sound_source.x
|
||||
true_dy = self.sound_source.y
|
||||
true_angle = np.arctan2(true_dx, true_dy) * 180 / np.pi
|
||||
self.results.append({
|
||||
't1 (ms)': t1 * 1000,
|
||||
't2 (ms)': t2 * 1000,
|
||||
'Time Delay (ms)': (t2 - t1) * 1000,
|
||||
'Detected Angle (°)': angle,
|
||||
'True Angle (°)': true_angle,
|
||||
'Source X': self.sound_source.x,
|
||||
'Source Y': self.sound_source.y
|
||||
})
|
||||
else:
|
||||
if self.last_detected_angle is not None and current_time - self.last_sound_time < SILENCE_TIMEOUT:
|
||||
self.current_angle = self.last_detected_angle
|
||||
self.sound_detected = False
|
||||
self.show_arrow = True
|
||||
else:
|
||||
self.sound_detected = False
|
||||
self.show_arrow = False
|
||||
time.sleep(CHUNK / self.sample_rate)
|
||||
except Exception as e:
|
||||
print(f"Ошибка в run: {e}")
|
||||
continue
|
||||
print("Поток run завершён")
|
||||
|
||||
def get_coordinates_dataframe(self):
|
||||
"""Создание датафрейма с координатами микрофонов и всех позиций источника"""
|
||||
data = {
|
||||
'Object': ['Mic1', 'Mic2'] + [f'SoundSource_{i}' for i in range(len(self.source_positions))],
|
||||
'X': [self.mic1.x, self.mic2.x] + [pos[0] for pos in self.source_positions],
|
||||
'Y': [self.mic1.y, self.mic2.y] + [pos[1] for pos in self.source_positions],
|
||||
'Time': [0.0, 0.0] + [pos[2] for pos in self.source_positions]
|
||||
}
|
||||
return pd.DataFrame(data)
|
||||
|
||||
def get_results_dataframe(self):
|
||||
"""Создание датафрейма с результатами"""
|
||||
return pd.DataFrame(self.results)
|
||||
|
||||
def main():
|
||||
try:
|
||||
finder = MusicDirectionFinder(MIC_DISTANCE, "my_recording1.wav")
|
||||
print("Аудиофайл загружен, частота дискретизации:", finder.sample_rate)
|
||||
|
||||
thread = threading.Thread(target=finder.run)
|
||||
thread.start()
|
||||
print("Поток run запущен")
|
||||
|
||||
fig, ax = plt.subplots(figsize=(10, 8))
|
||||
ax.set_xlim(-ROOM_WIDTH / 2, ROOM_WIDTH / 2)
|
||||
ax.set_ylim(-0.5, ROOM_HEIGHT)
|
||||
ax.set_title("Определение направления на источник звука (MUSIC)")
|
||||
ax.set_xlabel("X (м)")
|
||||
ax.set_ylabel("Y (м)")
|
||||
ax.grid(True)
|
||||
print("График инициализирован")
|
||||
|
||||
ax.plot(finder.mic1.x, finder.mic1.y, 'bs', markersize=10, label='Микрофон 1')
|
||||
ax.plot(finder.mic2.x, finder.mic2.y, 'bs', markersize=10, label='Микрофон 2')
|
||||
source_plot, = ax.plot(finder.sound_source.x, finder.sound_source.y, 'ro', markersize=10,
|
||||
label='Источник звука')
|
||||
|
||||
arrow_length = min(ROOM_WIDTH, ROOM_HEIGHT) / 3
|
||||
arrow = ax.arrow(0, 0, 0, 0, head_width=0.05, head_length=0.1, fc='r', ec='r',
|
||||
label='Расчетное направление')
|
||||
arrow_true = ax.arrow(0, 0, 0, 0, head_width=0.05, head_length=0.1, fc='g', ec='g',
|
||||
linestyle=':', label='Истинное направление')
|
||||
|
||||
sound_bar = ax.axhline(y=ROOM_HEIGHT - 0.2, color='green', linewidth=20, visible=False)
|
||||
angle_text = ax.text(0, ROOM_HEIGHT - 0.3, "", ha='center', va='center', fontsize=12)
|
||||
|
||||
left_indicator = patches.Rectangle((finder.mic1.x - 0.02, -0.1), 0.04, 0.05, facecolor='gray')
|
||||
right_indicator = patches.Rectangle((finder.mic2.x - 0.02, -0.1), 0.04, 0.05, facecolor='gray')
|
||||
ax.add_patch(left_indicator)
|
||||
ax.add_patch(right_indicator)
|
||||
|
||||
def update(frame):
|
||||
try:
|
||||
source_plot.set_data([finder.sound_source.x], [finder.sound_source.y])
|
||||
|
||||
dx = finder.sound_source.x
|
||||
dy = finder.sound_source.y
|
||||
true_angle_rad = np.arctan2(dx, dy)
|
||||
true_end_x = arrow_length * np.sin(true_angle_rad)
|
||||
true_end_y = arrow_length * np.cos(true_angle_rad)
|
||||
arrow_true.set_data(x=0, y=0, dx=true_end_x, dy=true_end_y)
|
||||
|
||||
if finder.show_arrow:
|
||||
angle = finder.current_angle
|
||||
calc_angle_rad = np.radians(angle)
|
||||
calc_end_x = arrow_length * np.sin(calc_angle_rad)
|
||||
calc_end_y = arrow_length * np.cos(calc_angle_rad)
|
||||
arrow.set_data(x=0, y=0, dx=calc_end_x, dy=calc_end_y)
|
||||
angle_text.set_text(f"Расчетный угол: {angle:.1f}°\nИстинный угол: {np.degrees(true_angle_rad):.1f}°")
|
||||
|
||||
if finder.sound_detected:
|
||||
sound_bar.set_visible(True)
|
||||
ax.set_title("Активное обнаружение звука (MUSIC)")
|
||||
else:
|
||||
sound_bar.set_visible(False)
|
||||
ax.set_title("Последнее зафиксированное направление (MUSIC)")
|
||||
else:
|
||||
arrow.set_data(x=0, y=0, dx=0, dy=0)
|
||||
angle_text.set_text("")
|
||||
sound_bar.set_visible(False)
|
||||
ax.set_title("Звук не обнаружен")
|
||||
|
||||
left_indicator.set_facecolor('green' if finder.rms_left > RMS_THRESHOLD else 'gray')
|
||||
right_indicator.set_facecolor('green' if finder.rms_right > RMS_THRESHOLD else 'gray')
|
||||
|
||||
return [source_plot, arrow, arrow_true, sound_bar, left_indicator, right_indicator, angle_text]
|
||||
except Exception as e:
|
||||
print(f"Ошибка в update: {e}")
|
||||
return []
|
||||
|
||||
ani = FuncAnimation(fig, update, frames=None, interval=100, blit=True, cache_frame_data=False)
|
||||
print("FuncAnimation инициализирован")
|
||||
plt.legend(loc='upper right')
|
||||
print("Вызов plt.show()")
|
||||
plt.show()
|
||||
print("plt.show() выполнен")
|
||||
|
||||
finder.running = False
|
||||
thread.join()
|
||||
print("Поток run остановлен")
|
||||
|
||||
coords_df = finder.get_coordinates_dataframe()
|
||||
results_df = finder.get_results_dataframe()
|
||||
print("\nКоординаты микрофонов и всех позиций источника звука:")
|
||||
print(coords_df.to_string(index=False))
|
||||
print("\nРезультаты вычислений:")
|
||||
print(results_df.to_string(index=False))
|
||||
coords_df.to_csv('coordinates_music.csv', index=False)
|
||||
results_df.to_csv('results_music.csv', index=False)
|
||||
print("\nДанные сохранены в 'coordinates_music.csv' и 'results_music.csv'")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Ошибка в main: {e}")
|
||||
finder.running = False
|
||||
thread.join()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user