1#!/usr/bin/env python 

2# -*- coding:utf-8 -*- 

3from pathlib import Path 

4import logging 


6import numpy as np 

7import scipy.signal 

8import scipy.ndimage 

9from import wavfile 



12from ibldsp.utils import WindowGenerator 

13from ibldsp import fourier 

14import as ioraw 

15from import GoCueTimes 



18logger_ = logging.getLogger(__name__) 


20NS_WIN = 2 ** 18 # 2 ** np.ceil(np.log2(1 * fs)) 


22NS_WELCH = 512 

23FTONE = 5000 

24UNIT = 'dBFS' # dBFS or dbSPL 




28def detect_ready_tone(w, fs, ftone=FTONE, threshold=0.8): 

29 """ 

30 Detects a transient sinusoid signal in a time-serie 

31 :param w: audio time seried 

32 :param fs: sampling frequency (Hz) 

33 :param ftone: frequency of the tone to detect 

34 :param threshold: ratio of the Hilbert to the signal, between 0 and 1 (set to 0.8) 

35 :return: 

36 """ 

37 # get envelope of DC free signal and envelope of BP signal around freq of interest 

38 h = np.abs(scipy.signal.hilbert(w - np.median(w))) 1abf

39 fh = np.abs(scipy.signal.hilbert(fourier.bp(w, si=1 / fs, b=ftone * np.array([0.9, 0.95, 1.15, 1.1])))) 1abf

40 dtect = scipy.ndimage.uniform_filter1d(fh / (h + 1e-3), int(fs * 0.1)) > threshold 1abf

41 return np.where(np.diff(dtect.astype(int)) == 1)[0] 1abf

42 # tone = np.sin(2 * np.pi * FTONE * np.arange(0, fs * 0.1) / fs) 

43 # tone = tone / np.sum(tone ** 2) 

44 # xc = np.abs(signal.hilbert(signal.correlate(w - np.mean(w), tone))) 



47def _get_conversion_factor(unit=UNIT, ready_tone_spl=READY_TONE_SPL): 

48 # 3 approaches here (not exclusive): 

49 # a- get the mic sensitivity, the preamp gain and DAC parameters and do the math 

50 # b- treat the whole thing as a black box and do a calibration run (cf. people at Renard's lab) 

51 # c- use calibrated ready tone 

52 # The reference of acoustic pressure is 0dBSPL @ 1kHz which is threshold of hearing (20 μPa). 

53 # Usual calibration is 1 Pa (94 dBSPL) at 1 kHz 

54 # c) here we know that the ready tone is 55dB SPL at 5kHz, assuming a flat spectrum between 

55 # 1 and 5 kHz, and observing the peak value on the 5k at the microphone. 

56 if unit == 'dBFS': 1ab

57 return 1.0 1ab

58 distance_to_the_mic = .155 

59 peak_value_observed = 60 

60 rms_value_observed = np.sqrt(2) / 2 * peak_value_observed 

61 fac = 10 ** ((ready_tone_spl - 20 * np.log10(rms_value_observed)) / 20) * distance_to_the_mic 

62 return fac 



65def welchogram(fs, wav, nswin=NS_WIN, overlap=OVERLAP, nperseg=NS_WELCH, detect_kwargs=None): 

66 """ 

67 Computes a spectrogram on a very large audio file. 


69 :param fs: sampling frequency (Hz) 

70 :param wav: wav signal (vector or memmap) 

71 :param nswin: n samples of the sliding window 

72 :param overlap: n samples of the overlap between windows 

73 :param nperseg: n samples for the computation of the spectrogram 

74 :param detect_kwargs: specified paramaters for detection 

75 :return: tscale, fscale, downsampled_spectrogram 

76 """ 

77 ns = wav.shape[0] 1ab

78 window_generator = WindowGenerator(ns=ns, nswin=nswin, overlap=overlap) 1ab

79 nwin = window_generator.nwin 1ab

80 fscale = fourier.fscale(nperseg, 1 / fs, one_sided=True) 1ab

81 W = np.zeros((nwin, len(fscale))) 1ab

82 tscale = window_generator.tscale(fs=fs) 1ab

83 detect = [] 1ab

84 for first, last in window_generator.firstlast: 1ab

85 # load the current window into memory 

86 w = np.float64(wav[first:last]) * _get_conversion_factor() 1ab

87 # detection of ready tones 

88 detect_kwargs = detect_kwargs or {} 1ab

89 a = [d + first for d in detect_ready_tone(w, fs, **detect_kwargs)] 1ab

90 if len(a): 1ab

91 detect += a 1ab

92 # the last window may not allow a pwelch 

93 if (last - first) < nperseg: 1ab

94 continue 

95 # compute PSD estimate for the current window 

96 iw = window_generator.iw 1ab

97 _, W[iw, :] = scipy.signal.welch(w, fs=fs, window='hann', nperseg=nperseg, axis=-1, 1ab

98 detrend='constant', return_onesided=True, scaling='density') 

99 # the onset detection may have duplicates with sliding window, average them and remove 

100 detect = np.sort(np.array(detect)) / fs 1ab

101 ind = np.where(np.diff(detect) < 0.1)[0] 1ab

102 detect[ind] = (detect[ind] + detect[ind + 1]) / 2 1ab

103 detect = np.delete(detect, ind + 1) 1ab

104 return tscale, fscale, W, detect 1ab



107def extract_sound(ses_path, task_collection='raw_behavior_data', device_collection='raw_behavior_data', save=True, force=False, 

108 delete=False): 

109 """ 

110 Simple audio features extraction for ambient sound characterization. 

111 From a wav file, generates several ALF files to be registered on Alyx 


113 :param ses_path: ALF full session path: (/mysubject001/YYYY-MM-DD/001) 

114 :param delete: if True, removes the wav file after processing 

115 :return: list of output files 

116 """ 

117 ses_path = Path(ses_path) 1adbe

118 wav_file = ses_path.joinpath(device_collection, '_iblrig_micData.raw.wav') 1adbe

119 out_folder = ses_path.joinpath(device_collection) 1adbe

120 files_out = {'power': out_folder.joinpath('_iblmic_audioSpectrogram.power.npy'), 1adbe

121 'frequencies': out_folder.joinpath('_iblmic_audioSpectrogram.frequencies.npy'), 

122 'onset_times': out_folder.joinpath('_iblmic_audioOnsetGoCue.times_mic.npy'), 

123 'times_microphone': out_folder.joinpath('_iblmic_audioSpectrogram.times_mic.npy'), 

124 } 

125 if not wav_file.exists(): 1adbe

126 logger_.warning(f"Wav file doesn't exist: {wav_file}") 1de

127 return [files_out[k] for k in files_out if files_out[k].exists()] 1de

128 # crunch the wav file 

129 fs, wav =, mmap=False) 1ab

130 if len(wav) == 0: 1ab

131 status = _fix_wav_file(wav_file) 

132 if status != 0: 

133 logger_.error(f"WAV Header empty. Sox couldn't fix it, Abort. {wav_file}") 

134 return 

135 else: 

136 fs, wav =, mmap=False) 

137 tscale, fscale, W, detect = welchogram(fs, wav) 1ab

138 # save files 

139 if save: 1ab

140 out_folder.mkdir(exist_ok=True) 1ab

141['power'], arr=W.astype(np.single)) 1ab

142['frequencies'], arr=fscale[None, :].astype(np.single)) 1ab

143['onset_times'], arr=detect) 1ab

144['times_microphone'], arr=tscale[:, None].astype(np.single)) 1ab

145 # for the time scale, attempt to synchronize using onset sound detection and task data 

146 data = ioraw.load_data(ses_path, task_collection=task_collection) 1ab

147 if data is None: # if no session data, we're done 1ab

148 if delete: 

149 wav_file.unlink() 

150 return 

151 tgocue, _ = GoCueTimes(ses_path).extract(task_collection=task_collection, save=False, bpod_trials=data) 1ab

152 ilast = min(len(tgocue), len(detect)) 1ab

153 dt = tgocue[:ilast] - detect[: ilast] 1ab

154 # only save if dt is consistent for the whole session 

155 if np.std(dt) < 0.2 and save: 1ab

156 files_out['times'] = out_folder / '_iblmic_audioSpectrogram.times.npy' 

157 tscale += np.median(dt) 

158['times'], arr=tscale[:, None].astype(np.single)) 

159 if delete: 1ab

160 wav_file.unlink() 1a

161 return [files_out[k] for k in files_out] 1ab



164def _fix_wav_file(wav_file): 

165 import platform 

166 import subprocess 

167 status = -1 

168 if platform.system() != 'Linux': 

169 return status 

170 wav_file_tmp = wav_file.with_suffix('.wav_') 

171 wav_file.rename(wav_file_tmp) 

172 command2run = f'sox --ignore-length {wav_file_tmp} {wav_file}' 

173 process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, 

174 stderr=subprocess.PIPE) 

175 process.communicate() 

176 if process.returncode == 0: 

177 wav_file_tmp.unlink() 

178 else: 

179 wav_file_tmp.rename(wav_file) 

180 return process.returncode