Coverage for ibllib/io/extractors/training_audio.py: 70%

105 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1#!/usr/bin/env python 

2# -*- coding:utf-8 -*- 

3from pathlib import Path 

4import logging 

5 

6import numpy as np 

7import scipy.signal 

8import scipy.ndimage 

9from scipy.io import wavfile 

10 

11 

12from ibldsp.utils import WindowGenerator 

13from ibldsp import fourier 

14import ibllib.io.raw_data_loaders as ioraw 

15from ibllib.io.extractors.training_trials import GoCueTimes 

16 

17 

18logger_ = logging.getLogger(__name__) 

19 

20NS_WIN = 2 ** 18 # 2 ** np.ceil(np.log2(1 * fs)) 

21OVERLAP = NS_WIN / 2 

22NS_WELCH = 512 

23FTONE = 5000 

24UNIT = 'dBFS' # dBFS or dbSPL 

25READY_TONE_SPL = 85 

26 

27 

28def detect_ready_tone(w, fs, ftone=FTONE, threshold=0.8): 

29 """ 

30 Detects a transient sinusoid signal in a time-serie 

31 :param w: audio time seried 

32 :param fs: sampling frequency (Hz) 

33 :param ftone: frequency of the tone to detect 

34 :param threshold: ratio of the Hilbert to the signal, between 0 and 1 (set to 0.8) 

35 :return: 

36 """ 

37 # get envelope of DC free signal and envelope of BP signal around freq of interest 

38 h = np.abs(scipy.signal.hilbert(w - np.median(w))) 1abf

39 fh = np.abs(scipy.signal.hilbert(fourier.bp(w, si=1 / fs, b=ftone * np.array([0.9, 0.95, 1.15, 1.1])))) 1abf

40 dtect = scipy.ndimage.uniform_filter1d(fh / (h + 1e-3), int(fs * 0.1)) > threshold 1abf

41 return np.where(np.diff(dtect.astype(int)) == 1)[0] 1abf

42 # tone = np.sin(2 * np.pi * FTONE * np.arange(0, fs * 0.1) / fs) 

43 # tone = tone / np.sum(tone ** 2) 

44 # xc = np.abs(signal.hilbert(signal.correlate(w - np.mean(w), tone))) 

45 

46 

47def _get_conversion_factor(unit=UNIT, ready_tone_spl=READY_TONE_SPL): 

48 # 3 approaches here (not exclusive): 

49 # a- get the mic sensitivity, the preamp gain and DAC parameters and do the math 

50 # b- treat the whole thing as a black box and do a calibration run (cf. people at Renard's lab) 

51 # c- use calibrated ready tone 

52 # The reference of acoustic pressure is 0dBSPL @ 1kHz which is threshold of hearing (20 μPa). 

53 # Usual calibration is 1 Pa (94 dBSPL) at 1 kHz 

54 # c) here we know that the ready tone is 55dB SPL at 5kHz, assuming a flat spectrum between 

55 # 1 and 5 kHz, and observing the peak value on the 5k at the microphone. 

56 if unit == 'dBFS': 1ab

57 return 1.0 1ab

58 distance_to_the_mic = .155 

59 peak_value_observed = 60 

60 rms_value_observed = np.sqrt(2) / 2 * peak_value_observed 

61 fac = 10 ** ((ready_tone_spl - 20 * np.log10(rms_value_observed)) / 20) * distance_to_the_mic 

62 return fac 

63 

64 

65def welchogram(fs, wav, nswin=NS_WIN, overlap=OVERLAP, nperseg=NS_WELCH, detect_kwargs=None): 

66 """ 

67 Computes a spectrogram on a very large audio file. 

68 

69 :param fs: sampling frequency (Hz) 

70 :param wav: wav signal (vector or memmap) 

71 :param nswin: n samples of the sliding window 

72 :param overlap: n samples of the overlap between windows 

73 :param nperseg: n samples for the computation of the spectrogram 

74 :param detect_kwargs: specified paramaters for detection 

75 :return: tscale, fscale, downsampled_spectrogram 

76 """ 

77 ns = wav.shape[0] 1ab

78 window_generator = WindowGenerator(ns=ns, nswin=nswin, overlap=overlap) 1ab

79 nwin = window_generator.nwin 1ab

80 fscale = fourier.fscale(nperseg, 1 / fs, one_sided=True) 1ab

81 W = np.zeros((nwin, len(fscale))) 1ab

82 tscale = window_generator.tscale(fs=fs) 1ab

83 detect = [] 1ab

84 for first, last in window_generator.firstlast: 1ab

85 # load the current window into memory 

86 w = np.float64(wav[first:last]) * _get_conversion_factor() 1ab

87 # detection of ready tones 

88 detect_kwargs = detect_kwargs or {} 1ab

89 a = [d + first for d in detect_ready_tone(w, fs, **detect_kwargs)] 1ab

90 if len(a): 1ab

91 detect += a 1ab

92 # the last window may not allow a pwelch 

93 if (last - first) < nperseg: 1ab

94 continue 

95 # compute PSD estimate for the current window 

96 iw = window_generator.iw 1ab

97 _, W[iw, :] = scipy.signal.welch(w, fs=fs, window='hann', nperseg=nperseg, axis=-1, 1ab

98 detrend='constant', return_onesided=True, scaling='density') 

99 # the onset detection may have duplicates with sliding window, average them and remove 

100 detect = np.sort(np.array(detect)) / fs 1ab

101 ind = np.where(np.diff(detect) < 0.1)[0] 1ab

102 detect[ind] = (detect[ind] + detect[ind + 1]) / 2 1ab

103 detect = np.delete(detect, ind + 1) 1ab

104 return tscale, fscale, W, detect 1ab

105 

106 

107def extract_sound(ses_path, task_collection='raw_behavior_data', device_collection='raw_behavior_data', save=True, force=False, 

108 delete=False): 

109 """ 

110 Simple audio features extraction for ambient sound characterization. 

111 From a wav file, generates several ALF files to be registered on Alyx 

112 

113 :param ses_path: ALF full session path: (/mysubject001/YYYY-MM-DD/001) 

114 :param delete: if True, removes the wav file after processing 

115 :return: list of output files 

116 """ 

117 ses_path = Path(ses_path) 1adbe

118 wav_file = ses_path.joinpath(device_collection, '_iblrig_micData.raw.wav') 1adbe

119 out_folder = ses_path.joinpath(device_collection) 1adbe

120 files_out = {'power': out_folder.joinpath('_iblmic_audioSpectrogram.power.npy'), 1adbe

121 'frequencies': out_folder.joinpath('_iblmic_audioSpectrogram.frequencies.npy'), 

122 'onset_times': out_folder.joinpath('_iblmic_audioOnsetGoCue.times_mic.npy'), 

123 'times_microphone': out_folder.joinpath('_iblmic_audioSpectrogram.times_mic.npy'), 

124 } 

125 if not wav_file.exists(): 1adbe

126 logger_.warning(f"Wav file doesn't exist: {wav_file}") 1de

127 return [files_out[k] for k in files_out if files_out[k].exists()] 1de

128 # crunch the wav file 

129 fs, wav = wavfile.read(wav_file, mmap=False) 1ab

130 if len(wav) == 0: 1ab

131 status = _fix_wav_file(wav_file) 

132 if status != 0: 

133 logger_.error(f"WAV Header empty. Sox couldn't fix it, Abort. {wav_file}") 

134 return 

135 else: 

136 fs, wav = wavfile.read(wav_file, mmap=False) 

137 tscale, fscale, W, detect = welchogram(fs, wav) 1ab

138 # save files 

139 if save: 1ab

140 out_folder.mkdir(exist_ok=True) 1ab

141 np.save(file=files_out['power'], arr=W.astype(np.single)) 1ab

142 np.save(file=files_out['frequencies'], arr=fscale[None, :].astype(np.single)) 1ab

143 np.save(file=files_out['onset_times'], arr=detect) 1ab

144 np.save(file=files_out['times_microphone'], arr=tscale[:, None].astype(np.single)) 1ab

145 # for the time scale, attempt to synchronize using onset sound detection and task data 

146 data = ioraw.load_data(ses_path, task_collection=task_collection) 1ab

147 if data is None: # if no session data, we're done 1ab

148 if delete: 

149 wav_file.unlink() 

150 return 

151 tgocue, _ = GoCueTimes(ses_path).extract(task_collection=task_collection, save=False, bpod_trials=data) 1ab

152 ilast = min(len(tgocue), len(detect)) 1ab

153 dt = tgocue[:ilast] - detect[: ilast] 1ab

154 # only save if dt is consistent for the whole session 

155 if np.std(dt) < 0.2 and save: 1ab

156 files_out['times'] = out_folder / '_iblmic_audioSpectrogram.times.npy' 

157 tscale += np.median(dt) 

158 np.save(file=files_out['times'], arr=tscale[:, None].astype(np.single)) 

159 if delete: 1ab

160 wav_file.unlink() 1a

161 return [files_out[k] for k in files_out] 1ab

162 

163 

164def _fix_wav_file(wav_file): 

165 import platform 

166 import subprocess 

167 status = -1 

168 if platform.system() != 'Linux': 

169 return status 

170 wav_file_tmp = wav_file.with_suffix('.wav_') 

171 wav_file.rename(wav_file_tmp) 

172 command2run = f'sox --ignore-length {wav_file_tmp} {wav_file}' 

173 process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, 

174 stderr=subprocess.PIPE) 

175 process.communicate() 

176 if process.returncode == 0: 

177 wav_file_tmp.unlink() 

178 else: 

179 wav_file_tmp.rename(wav_file) 

180 return process.returncode