Coverage for ibllib/io/raw_daq_loaders.py: 83%

167 statements  

coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1"""Loader functions for various DAQ data formats.""" 

2from pathlib import Path 

3import logging 

4from collections import OrderedDict, defaultdict 

5import json 

6 

7import nptdms 

8import numpy as np 

9import ibldsp.utils 

10import one.alf.io as alfio 

11import one.alf.exceptions as alferr 

12from one.alf.spec import to_alf 

13 

14from ibllib.io.extractors.default_channel_maps import all_default_labels 

15 

16logger = logging.getLogger(__name__) 

17 

18 

def load_raw_daq_tdms(path) -> 'nptdms.tdms.TdmsFile':
    """
    Load a raw DAQ TDMS file.

    Parameters
    ----------
    path : str, pathlib.Path
        The location of the .tdms file to load.

    Returns
    -------
    nptdms.tdms.TdmsFile
        The loaded TDMS object.
    """
    from nptdms import TdmsFile
    # If path is a directory, glob for a tdms file
    if (path := Path(path)).is_dir():  # cast to pathlib.Path
        file_path = next(path.glob('*.tdms'), None)
    else:
        file_path = path
    if not file_path or not file_path.exists():
        raise FileNotFoundError

    return TdmsFile.read(file_path)

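# Illustrative usage sketch (not part of the module; the path below is hypothetical).
# load_raw_daq_tdms accepts either a .tdms file or a folder containing one:
#
#     >>> tdms = load_raw_daq_tdms('/data/subject/2024-01-01/001/raw_daq_data')
#     >>> [g.name for g in tdms.groups()]  # e.g. ['Analog', 'Digital']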

def load_channels_tdms(path, chmap=None):
    """
    Load raw DAQ channel data from a TDMS file.

    Note: This currently cannot deal with arbitrary groups.

    Parameters
    ----------
    path : str, pathlib.Path
        The file or folder path of the raw TDMS data file.
    chmap : dict
        A map of device names to channel codes, e.g. {'photometry': 'AI0', 'bpod': 'AI1'}.
        If None, all available channels are read from the DAQ.

    Returns
    -------
    dict
        A map of channel names to their raw sample arrays.
    float
        The DAQ sampling rate ('ScanRate') in Hz.
    """

    def _load_digital_channels(data_file, group='Digital', ch='AuxPort'):
        # the digital channels are encoded on a single uint8 channel where each bit corresponds to an input channel
        ddata = data_file[group][ch].data.astype(np.uint8)
        nc = int(2 ** np.floor(np.log2(np.max(ddata))))
        ddata = np.unpackbits(ddata[:, np.newaxis], axis=1, count=nc, bitorder='little')
        data = {}
        for i in range(ddata.shape[1]):
            data[f'DI{i}'] = ddata[:, i]
        return data

    data_file = load_raw_daq_tdms(path)
    data = {}
    digital_channels = None
    fs = np.nan
    if chmap:
        for name, ch in chmap.items():
            if ch.lower()[0] == 'a':
                data[name] = data_file['Analog'][ch.upper()].data
                fs = data_file['Analog'].properties['ScanRate']
            elif ch.lower()[0] == 'd':
                # do not attempt to load digital channels several times
                digital_channels = digital_channels or _load_digital_channels(data_file)
                data[name] = digital_channels[ch.upper()]
                fs = data_file['Digital'].properties['ScanRate']
            else:
                raise NotImplementedError(f'Extraction of channel "{ch}" not implemented')
    else:
        for group in (x.name for x in data_file.groups()):
            for ch in (x.name for x in data_file[group].channels()):
                if group == 'Digital' and ch == 'AuxPort':
                    data = {**data, **_load_digital_channels(data_file, group, ch)}
                else:
                    data[ch] = data_file[group][ch.upper()].data
                fs = data_file[group].properties['ScanRate']  # from daqami it's unclear that fs could be set per channel
    return data, fs

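# Illustrative usage sketch (hypothetical path and channel map). With a chmap the
# returned dict is keyed by device name; without one, all DAQ channels are read:
#
#     >>> chmap = {'photometry': 'AI0', 'bpod': 'AI1'}
#     >>> data, fs = load_channels_tdms('/data/raw_daq_data', chmap=chmap)
#     >>> data['bpod'].size, fs  # samples for the 'bpod' channel and the scan rate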

def load_sync_tdms(path, sync_map, fs=None, threshold=2.5, floor_percentile=10):
    """
    Load sync channels from a raw DAQ TDMS file.

    Parameters
    ----------
    path : str, pathlib.Path
        The file or folder path of the raw TDMS data file.
    sync_map : dict
        A map of channel names and channel IDs.
    fs : float
        Sampling rate in Hz.
    threshold : float
        The voltage threshold to apply to the analogue channels.
    floor_percentile : float
        The percentile of the analogue trace to subtract before thresholding (default 10%),
        in order to correct for DC offset drift.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
        the corresponding channel numbers.
    dict
        A map of channel names and their corresponding indices.
    """
    data_file = load_raw_daq_tdms(path)
    sync = {}
    if any(x.lower()[0] != 'a' for x in sync_map.values()):
        raise NotImplementedError('Non-analogue or arbitrary group channel extraction not supported')

    raw_channels = [ch for ch in data_file['Analog'].channels() if ch.name.lower() in sync_map.values()]
    analogue = np.vstack([ch.data for ch in raw_channels])
    channel_ids = OrderedDict([(ch.name.lower(), ch.properties['ChannelKey']) for ch in raw_channels])
    offset = np.percentile(analogue, floor_percentile, axis=0)
    logger.info(f'estimated analogue channel DC Offset approx. {np.mean(offset):.2f}')
    analogue -= offset
    ttl = analogue > threshold
    ind, sign = ibldsp.utils.fronts(ttl.astype(int))
    try:  # attempt to get the times from the meta data
        times = np.vstack([ch.time_track() for ch in raw_channels])
        times = times[tuple(ind)]
    except KeyError:
        assert fs
        times = ind[1].astype(float) * 1/fs  # noqa

    # Sort by times
    ind_sorted = np.argsort(times)
    sync['times'] = times[ind_sorted]
    # Map index to channel key then reindex by sorted times
    sync['channels'] = np.fromiter(channel_ids.values(), dtype=int)[ind[0]][ind_sorted]
    sync['polarities'] = sign[ind_sorted]

    # Map sync name to channel key
    sync_map = {v.lower(): k for k, v in sync_map.items()}  # turn inside-out
    chmap = {sync_map[k]: v for k, v in channel_ids.items()}
    return sync, chmap

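# Illustrative usage sketch (hypothetical path and sync map). Only analogue channels
# are supported; the channel IDs in the map are compared against the lower-cased TDMS
# channel names, and fs is only needed if the file has no time track metadata:
#
#     >>> sync, chmap = load_sync_tdms('/data/raw_daq_data', {'bpod': 'ai0'}, fs=1000.)
#     >>> sync['times'][sync['polarities'] == 1]  # rising edge times across all channels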

def correct_counter_discontinuities(raw, overflow=2**32):
    """
    Correct over- and underflow wrap-around values for a DAQ counter channel.

    Parameters
    ----------
    raw : numpy.array
        An array of counts.
    overflow : int
        The maximum representable value of the data before it was cast to float64.

    Returns
    -------
    numpy.array
        An array of counts with the over- and underflow discontinuities removed.
    """
    flowmax = overflow - 1
    d = np.diff(raw)
    # correct for counter flow discontinuities
    d[d >= flowmax] = d[d >= flowmax] - flowmax
    d[d <= -flowmax] = d[d <= -flowmax] + flowmax
    return np.cumsum(np.r_[0, d]) + raw[0]  # back to position

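# Worked sketch (values chosen for illustration only): a float64 copy of a uint32
# counter that wraps around the top of its range is unwrapped so the ~2**32 jump in
# the sample-to-sample difference is removed:
#
#     >>> raw = np.array([4294967294., 4294967295., 0., 1.])
#     >>> correct_counter_discontinuities(raw)  # no ~2**32 discontinuity remains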

def load_timeline_sync_and_chmap(alf_path, chmap=None, timeline=None, save=True):
    """Load the sync and channel map from disk.

    If the sync files do not exist, they are extracted from the raw DAQ data and saved.

    Parameters
    ----------
    alf_path : str, pathlib.Path
        The folder containing the sync file and raw DAQ data.
    chmap : dict
        An optional channel map; if None, one is extracted based on the union of the timeline
        meta data and the default extractor channel map names.
    timeline : dict
        An optional timeline object; if None, it is loaded from alf_path.
    save : bool
        If true, save the sync files if they don't already exist.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
        the corresponding channel numbers.
    dict
        A map of channel names and their corresponding indices for sync channels.
    """
    if not chmap:
        if not timeline:
            meta = alfio.load_object(alf_path, 'DAQdata', namespace='timeline', attribute='meta')['meta']
        else:
            meta = timeline['meta']
        chmap = timeline_meta2chmap(meta, include_channels=all_default_labels())
    try:
        sync = alfio.load_object(alf_path, 'sync')
    except alferr.ALFObjectNotFound:
        if not timeline:
            timeline = alfio.load_object(alf_path, 'DAQdata')
        sync = extract_sync_timeline(timeline, chmap=chmap)
        if save:
            alfio.save_object_npy(alf_path, sync, object='sync', namespace='timeline')
    return sync, chmap

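# Illustrative usage sketch (hypothetical session path): load the timeline sync pulses
# and channel map, extracting and saving them from the raw DAQ data if necessary:
#
#     >>> alf_path = '/data/subject/2024-01-01/001/raw_sync_data'
#     >>> sync, chmap = load_timeline_sync_and_chmap(alf_path, save=True)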

def extract_sync_timeline(timeline, chmap=None, floor_percentile=10, threshold=None):
    """
    Load sync channels from a timeline object.

    Note: Because the scan frequency is typically faster than the sample rate, the position and
    edge count channels may detect more than one front between samples. Therefore for these, the
    raw data is more accurate than the extracted polarities.

    Parameters
    ----------
    timeline : dict, str, pathlib.Path
        A timeline object or the file or folder path of the _timeline_DAQdata files.
    chmap : dict
        A map of channel names and channel IDs.
    floor_percentile : float
        The percentile of the analogue trace to subtract before thresholding (default 10%),
        in order to correct for DC offset drift.
    threshold : float, dict of str: float
        The threshold to apply to the analogue channels. If None, half the maximum of the
        offset-subtracted trace is used.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
        the corresponding channel numbers.
    dict, optional
        A map of channel names and their corresponding indices for sync channels, if chmap is None.
    """
    if isinstance(timeline, (str, Path)):
        timeline = alfio.load_object(timeline, 'DAQdata', namespace='timeline')
    assert timeline.keys() >= {'timestamps', 'raw', 'meta'}, 'Timeline object missing attributes'

    # If no channel map was passed, load it from 'wiring' file, or extract from meta file
    return_chmap = chmap is None
    chmap = chmap or timeline.get('wiring') or timeline_meta2chmap(timeline['meta'])

    # Initialize sync object
    sync = alfio.AlfBunch((k, np.array([], dtype=d)) for k, d in
                          (('times', 'f'), ('channels', 'u1'), ('polarities', 'i1')))
    for label, i in chmap.items():
        try:
            info = next(x for x in timeline['meta']['inputs'] if x['name'].lower() == label.lower())
        except StopIteration:
            logger.warning('sync channel "%s" not found', label)
            continue
        raw = timeline['raw'][:, info['arrayColumn'] - 1]  # -1 because MATLAB indexes from 1
        if info['measurement'] == 'Voltage':
            # Get TTLs by applying a threshold to the diff of voltage samples
            offset = np.percentile(raw, floor_percentile, axis=0)
            daqID = info['daqChannelID']
            logger.debug(f'{label} ({daqID}): estimated analogue channel DC Offset approx. {np.mean(offset):.2f}')
            step = threshold.get(label) if isinstance(threshold, dict) else threshold
            if step is None:
                step = np.max(raw - offset) / 2
            iup = ibldsp.utils.rises(raw - offset, step=step, analog=True)
            idown = ibldsp.utils.falls(raw - offset, step=step, analog=True)
            pol = np.r_[np.ones_like(iup), -np.ones_like(idown)].astype('i1')
            ind = np.r_[iup, idown]

            sync.polarities = np.concatenate((sync.polarities, pol))
        elif info['measurement'] == 'EdgeCount':
            # Monotonically increasing values; extract indices where delta == 1
            raw = correct_counter_discontinuities(raw)
            ind, = np.where(np.diff(raw))
            ind += 1
            sync.polarities = np.concatenate((sync.polarities, np.ones_like(ind, dtype='i1')))
        elif info['measurement'] == 'Position':
            # Bidirectional; extract indices where delta != 0
            raw = correct_counter_discontinuities(raw)
            d = np.diff(raw)
            ind, = np.where(~np.isclose(d, 0))
            sync.polarities = np.concatenate((sync.polarities, np.sign(d[ind]).astype('i1')))
            ind += 1
        else:
            raise NotImplementedError(f'{info["measurement"]} sync extraction')
        # Append timestamps of indices and channel index to sync arrays
        sync.times = np.concatenate((sync.times, timeline['timestamps'][ind]))
        sync.channels = np.concatenate((sync.channels, np.full(ind.shape, i, dtype='u1')))

    # Sort arrays by time
    assert sync.check_dimensions == 0
    t_ind = np.argsort(sync.times)
    for k in sync:
        sync[k] = sync[k][t_ind]
    if return_chmap:
        return sync, chmap
    else:
        return sync

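# Illustrative usage sketch (hypothetical path and channel label): extract sync pulses
# from an already-loaded timeline object, overriding the threshold for one analogue
# channel while the others fall back to half their offset-subtracted maximum:
#
#     >>> timeline = alfio.load_object('/data/raw_sync_data', 'DAQdata', namespace='timeline')
#     >>> chmap = timeline_meta2chmap(timeline['meta'])
#     >>> sync = extract_sync_timeline(timeline, chmap=chmap, threshold={'bpod': 2.5})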

def timeline_meta2wiring(path, save=False):
    """
    Given the path to a timeline meta data file, return a dictionary of wiring info.

    Parameters
    ----------
    path : str, pathlib.Path
        The path of the timeline meta file, _timeline_DAQdata.meta.
    save : bool
        If true, save the timeline wiring file in the same location as the meta file,
        _timeline_DAQdata.wiring.json.

    Returns
    -------
    dict
        A dictionary with base keys {'SYSTEM', 'SYNC_WIRING_DIGITAL', 'SYNC_WIRING_ANALOG'}, the
        latter of which contain maps of channel names and their IDs.
    pathlib.Path
        If save=True, returns the path of the wiring file.
    """
    meta = alfio.load_object(path, 'DAQdata', namespace='timeline', attribute='meta').get('meta')
    assert meta, 'No meta data in timeline object'
    wiring = defaultdict(dict, SYSTEM='timeline')
    for input in meta['inputs']:
        key = 'SYNC_WIRING_' + ('ANALOG' if input['measurement'] == 'Voltage' else 'DIGITAL')
        wiring[key][input['daqChannelID']] = input['name']
    if save:
        out_path = Path(path) / to_alf('DAQ data', 'wiring', 'json', namespace='timeline')
        with open(out_path, 'w') as fp:
            json.dump(wiring, fp)
        return dict(wiring), out_path
    return dict(wiring)

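# Illustrative usage sketch (hypothetical path; the wiring keys and values depend on
# the DAQ configuration): write the wiring JSON next to the meta file and inspect it:
#
#     >>> wiring, wiring_path = timeline_meta2wiring('/data/raw_sync_data', save=True)
#     >>> wiring['SYNC_WIRING_ANALOG']  # maps each daqChannelID to its input name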

def timeline_meta2chmap(meta, exclude_channels=None, include_channels=None):
    """
    Convert a timeline meta object to a sync channel map.

    Parameters
    ----------
    meta : dict
        A loaded timeline metadata file, i.e. _timeline_DAQdata.meta.
    exclude_channels : list
        An optional list of channels to exclude from the channel map.
    include_channels : list
        An optional list of channels to include in the channel map; takes priority over the
        exclude list.

    Returns
    -------
    dict
        A map of channel names and their corresponding indices for sync channels.
    """
    chmap = {}
    for input in meta.get('inputs', []):
        if (include_channels is not None and input['name'] not in include_channels) or \
                (exclude_channels and input['name'] in exclude_channels):
            continue
        chmap[input['name']] = input['arrayColumn']
    return chmap

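# Illustrative usage sketch: build a channel map restricted to the default extractor
# labels, as load_timeline_sync_and_chmap does above (the loading path is hypothetical):
#
#     >>> meta = alfio.load_object('/data/raw_sync_data', 'DAQdata', namespace='timeline', attribute='meta')['meta']
#     >>> chmap = timeline_meta2chmap(meta, include_channels=all_default_labels())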

def timeline_get_channel(timeline, channel_name):
    """
    Given a timeline object, return the vector of values recorded from a given channel name.

    Parameters
    ----------
    timeline : one.alf.io.AlfBunch
        A loaded timeline object.
    channel_name : str
        The name of a channel to extract.

    Returns
    -------
    numpy.array
        The channel data.
    """
    idx = next(ch['arrayColumn'] for ch in timeline['meta']['inputs'] if ch['name'] == channel_name)
    return timeline['raw'][:, idx - 1]  # -1 because MATLAB indices start from 1, not 0
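# Illustrative usage sketch (hypothetical path and channel name): pull the raw samples
# recorded on a single named timeline input:
#
#     >>> timeline = alfio.load_object('/data/raw_sync_data', 'DAQdata', namespace='timeline')
#     >>> wheel_pos = timeline_get_channel(timeline, 'rotary_encoder')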