Coverage for ibllib/io/extractors/training_audio.py: 70%
105 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 09:55 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 09:55 +0000
1#!/usr/bin/env python
2# -*- coding:utf-8 -*-
3from pathlib import Path
4import logging
6import numpy as np
7import scipy.signal
8import scipy.ndimage
9from scipy.io import wavfile
12from ibldsp.utils import WindowGenerator
13from ibldsp import fourier
14import ibllib.io.raw_data_loaders as ioraw
15from ibllib.io.extractors.training_trials import GoCueTimes
18logger_ = logging.getLogger(__name__)
20NS_WIN = 2 ** 18 # 2 ** np.ceil(np.log2(1 * fs))
21OVERLAP = NS_WIN / 2
22NS_WELCH = 512
23FTONE = 5000
24UNIT = 'dBFS' # dBFS or dbSPL
25READY_TONE_SPL = 85
28def detect_ready_tone(w, fs, ftone=FTONE, threshold=0.8):
29 """
30 Detects a transient sinusoid signal in a time-series
31 :param w: audio time seried
32 :param fs: sampling frequency (Hz)
33 :param ftone: frequency of the tone to detect
34 :param threshold: ratio of the Hilbert to the signal, between 0 and 1 (set to 0.8)
35 :return:
36 """
37 # get envelope of DC free signal and envelope of BP signal around freq of interest
38 h = np.abs(scipy.signal.hilbert(w - np.median(w))) 1a
39 fh = np.abs(scipy.signal.hilbert(fourier.bp(w, si=1 / fs, b=ftone * np.array([0.9, 0.95, 1.15, 1.1])))) 1a
40 dtect = scipy.ndimage.uniform_filter1d(fh / (h + 1e-3), int(fs * 0.1)) > threshold 1a
41 return np.where(np.diff(dtect.astype(int)) == 1)[0] 1a
42 # tone = np.sin(2 * np.pi * FTONE * np.arange(0, fs * 0.1) / fs)
43 # tone = tone / np.sum(tone ** 2)
44 # xc = np.abs(signal.hilbert(signal.correlate(w - np.mean(w), tone)))
47def _get_conversion_factor(unit=UNIT, ready_tone_spl=READY_TONE_SPL):
48 # 3 approaches here (not exclusive):
49 # a- get the mic sensitivity, the preamp gain and DAC parameters and do the math
50 # b- treat the whole thing as a black box and do a calibration run (cf. people at Renard's lab)
51 # c- use calibrated ready tone
52 # The reference of acoustic pressure is 0dBSPL @ 1kHz which is threshold of hearing (20 μPa).
53 # Usual calibration is 1 Pa (94 dBSPL) at 1 kHz
54 # c) here we know that the ready tone is 55dB SPL at 5kHz, assuming a flat spectrum between
55 # 1 and 5 kHz, and observing the peak value on the 5k at the microphone.
56 if unit == 'dBFS': 1a
57 return 1.0 1a
58 distance_to_the_mic = .155
59 peak_value_observed = 60
60 rms_value_observed = np.sqrt(2) / 2 * peak_value_observed
61 fac = 10 ** ((ready_tone_spl - 20 * np.log10(rms_value_observed)) / 20) * distance_to_the_mic
62 return fac
65def welchogram(fs, wav, nswin=NS_WIN, overlap=OVERLAP, nperseg=NS_WELCH, detect_kwargs=None):
66 """
67 Computes a spectrogram on a very large audio file.
69 :param fs: sampling frequency (Hz)
70 :param wav: wav signal (vector or memmap)
71 :param nswin: n samples of the sliding window
72 :param overlap: n samples of the overlap between windows
73 :param nperseg: n samples for the computation of the spectrogram
74 :param detect_kwargs: specified paramaters for detection
75 :return: tscale, fscale, downsampled_spectrogram
76 """
77 ns = wav.shape[0] 1a
78 window_generator = WindowGenerator(ns=ns, nswin=nswin, overlap=overlap) 1a
79 nwin = window_generator.nwin 1a
80 fscale = fourier.fscale(nperseg, 1 / fs, one_sided=True) 1a
81 W = np.zeros((nwin, len(fscale))) 1a
82 tscale = window_generator.tscale(fs=fs) 1a
83 detect = [] 1a
84 for first, last in window_generator.firstlast: 1a
85 # load the current window into memory
86 w = np.float64(wav[first:last]) * _get_conversion_factor() 1a
87 # detection of ready tones
88 detect_kwargs = detect_kwargs or {} 1a
89 a = [d + first for d in detect_ready_tone(w, fs, **detect_kwargs)] 1a
90 if len(a): 1a
91 detect += a 1a
92 # the last window may not allow a pwelch
93 if (last - first) < nperseg: 1a
94 continue
95 # compute PSD estimate for the current window
96 iw = window_generator.iw 1a
97 _, W[iw, :] = scipy.signal.welch(w, fs=fs, window='hann', nperseg=nperseg, axis=-1, 1a
98 detrend='constant', return_onesided=True, scaling='density')
99 # the onset detection may have duplicates with sliding window, average them and remove
100 detect = np.sort(np.array(detect)) / fs 1a
101 ind = np.where(np.diff(detect) < 0.1)[0] 1a
102 detect[ind] = (detect[ind] + detect[ind + 1]) / 2 1a
103 detect = np.delete(detect, ind + 1) 1a
104 return tscale, fscale, W, detect 1a
107def extract_sound(ses_path, task_collection='raw_behavior_data', device_collection='raw_behavior_data', save=True, force=False,
108 delete=False):
109 """
110 Simple audio features extraction for ambient sound characterization.
111 From a wav file, generates several ALF files to be registered on Alyx
113 :param ses_path: ALF full session path: (/mysubject001/YYYY-MM-DD/001)
114 :param delete: if True, removes the wav file after processing
115 :return: list of output files
116 """
117 ses_path = Path(ses_path) 1ac
118 wav_file = ses_path.joinpath(device_collection, '_iblrig_micData.raw.wav') 1ac
119 out_folder = ses_path.joinpath(device_collection) 1ac
120 files_out = {'power': out_folder.joinpath('_iblmic_audioSpectrogram.power.npy'), 1ac
121 'frequencies': out_folder.joinpath('_iblmic_audioSpectrogram.frequencies.npy'),
122 'onset_times': out_folder.joinpath('_iblmic_audioOnsetGoCue.times_mic.npy'),
123 'times_microphone': out_folder.joinpath('_iblmic_audioSpectrogram.times_mic.npy'),
124 }
125 if not wav_file.exists(): 1ac
126 logger_.warning(f"Wav file doesn't exist: {wav_file}") 1c
127 return [files_out[k] for k in files_out if files_out[k].exists()] 1c
128 # crunch the wav file
129 fs, wav = wavfile.read(wav_file, mmap=False) 1a
130 if len(wav) == 0: 1a
131 status = _fix_wav_file(wav_file)
132 if status != 0:
133 logger_.error(f"WAV Header empty. Sox couldn't fix it, Abort. {wav_file}")
134 return
135 else:
136 fs, wav = wavfile.read(wav_file, mmap=False)
137 tscale, fscale, W, detect = welchogram(fs, wav) 1a
138 # save files
139 if save: 1a
140 out_folder.mkdir(exist_ok=True) 1a
141 np.save(file=files_out['power'], arr=W.astype(np.single)) 1a
142 np.save(file=files_out['frequencies'], arr=fscale[None, :].astype(np.single)) 1a
143 np.save(file=files_out['onset_times'], arr=detect) 1a
144 np.save(file=files_out['times_microphone'], arr=tscale[:, None].astype(np.single)) 1a
145 # for the time scale, attempt to synchronize using onset sound detection and task data
146 data = ioraw.load_data(ses_path, task_collection=task_collection) 1a
147 if data is None: # if no session data, we're done 1a
148 if delete:
149 wav_file.unlink()
150 return
151 tgocue, _ = GoCueTimes(ses_path).extract(task_collection=task_collection, save=False, bpod_trials=data) 1a
152 ilast = min(len(tgocue), len(detect)) 1a
153 dt = tgocue[:ilast] - detect[: ilast] 1a
154 # only save if dt is consistent for the whole session
155 if np.std(dt) < 0.2 and save: 1a
156 files_out['times'] = out_folder / '_iblmic_audioSpectrogram.times.npy'
157 tscale += np.median(dt)
158 np.save(file=files_out['times'], arr=tscale[:, None].astype(np.single))
159 if delete: 1a
160 wav_file.unlink() 1a
161 return [files_out[k] for k in files_out] 1a
164def _fix_wav_file(wav_file):
165 import platform
166 import subprocess
167 status = -1
168 if platform.system() != 'Linux':
169 return status
170 wav_file_tmp = wav_file.with_suffix('.wav_')
171 wav_file.rename(wav_file_tmp)
172 command2run = f'sox --ignore-length {wav_file_tmp} {wav_file}'
173 process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE,
174 stderr=subprocess.PIPE)
175 process.communicate()
176 if process.returncode == 0:
177 wav_file_tmp.unlink()
178 else:
179 wav_file_tmp.rename(wav_file)
180 return process.returncode