Coverage for ibllib/io/raw_daq_loaders.py: 83%
167 statements
1"""Loader functions for various DAQ data formats."""
2from pathlib import Path
3import logging
4from collections import OrderedDict, defaultdict
5import json
7import nptdms
8import numpy as np
9import ibldsp.utils
10import one.alf.io as alfio
11import one.alf.exceptions as alferr
12from one.alf.spec import to_alf
14from ibllib.io.extractors.default_channel_maps import all_default_labels
16logger = logging.getLogger(__name__)
def load_raw_daq_tdms(path) -> 'nptdms.tdms.TdmsFile':
    """
    Load a raw DAQ TDMS file.

    Parameters
    ----------
    path : str, pathlib.Path
        The location of the .tdms file to load.

    Returns
    -------
    nptdms.tdms.TdmsFile
        The loaded TDMS object.
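
    Examples
    --------
    Illustrative usage; the session path below is hypothetical:

    >>> tdms_file = load_raw_daq_tdms('subject/2023-01-01/001/raw_daq_data')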
32 """
33 from nptdms import TdmsFile 1he
34 # If path is a directory, glob for a tdms file
35 if (path := Path(path)).is_dir(): # cast to pathlib.Path 1he
36 file_path = next(path.glob('*.tdms'), None)
37 else:
38 file_path = path 1he
39 if not file_path or not file_path.exists(): 1he
40 raise FileNotFoundError
42 return TdmsFile.read(file_path) 1he
def load_channels_tdms(path, chmap=None):
    """
    Load the channels of a raw DAQ TDMS file into a dictionary of data arrays.

    Note: This currently cannot deal with arbitrary groups.

    Parameters
    ----------
    path : str, pathlib.Path
        The file or folder path of the raw TDMS data file.
    chmap : dict
        A map of device names to channel codes, e.g. {'photometry': 'AI0', 'bpod': 'AI1'}.
        If None, all available channels are read from the DAQ.

    Returns
    -------
    dict
        A map of channel names to their raw data arrays.
    float
        The sampling rate in Hz.
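
    Examples
    --------
    Illustrative usage with a hypothetical session path and channel map:

    >>> daq_path = 'subject/2023-01-01/001/raw_daq_data'
    >>> data, fs = load_channels_tdms(daq_path, chmap={'photometry': 'AI0', 'bpod': 'AI1'})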
60 """
62 def _load_digital_channels(data_file, group='Digital', ch='AuxPort'): 1he
63 # the digital channels are encoded on a single uint8 channel where each bit corresponds to an input channel
64 ddata = data_file[group][ch].data.astype(np.uint8) 1e
65 nc = int(2 ** np.floor(np.log2(np.max(ddata)))) 1e
66 ddata = np.unpackbits(ddata[:, np.newaxis], axis=1, count=nc, bitorder='little') 1e
67 data = {} 1e
68 for i in range(ddata.shape[1]): 1e
69 data[f'DI{i}'] = ddata[:, i] 1e
70 return data 1e
72 data_file = load_raw_daq_tdms(path) 1he
73 data = {} 1he
74 digital_channels = None 1he
75 fs = np.nan 1he
76 if chmap: 1he
77 for name, ch in chmap.items(): 1he
78 if ch.lower()[0] == 'a': 1he
79 data[name] = data_file['Analog'][ch.upper()].data 1h
80 fs = data_file['Analog'].properties['ScanRate'] 1h
81 elif ch.lower()[0] == 'd': 1e
82 # do not attempt to load digital channels several times
83 digital_channels = digital_channels or _load_digital_channels(data_file) 1e
84 data[name] = digital_channels[ch.upper()] 1e
85 fs = data_file['Digital'].properties['ScanRate'] 1e
86 else:
87 raise NotImplementedError(f'Extraction of channel "{ch}" not implemented')
88 else:
89 for group in (x.name for x in data_file.groups()): 1he
90 for ch in (x.name for x in data_file[group].channels()): 1he
91 if group == 'Digital' and ch == 'AuxPort': 1he
92 data = {**data, **_load_digital_channels(data_file, group, ch)} 1e
93 else:
94 data[ch] = data_file[group][ch.upper()].data 1h
95 fs = data_file[group].properties['ScanRate'] # from daqami it's unclear that fs could be set per channel 1he
96 return data, fs 1he
def load_sync_tdms(path, sync_map, fs=None, threshold=2.5, floor_percentile=10):
    """
    Load sync channels from a raw DAQ TDMS file.

    Parameters
    ----------
    path : str, pathlib.Path
        The file or folder path of the raw TDMS data file.
    sync_map : dict
        A map of channel names and channel IDs.
    fs : float
        Sampling rate in Hz.
    threshold : float
        The threshold to apply to the analogue channels when detecting fronts.
    floor_percentile : float
        The percentile of the analogue trace to subtract before thresholding (10% by default).
        This is to avoid DC offset drift.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
        the corresponding channel numbers.
    dict
        A map of channel names and their corresponding indices.
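
    Examples
    --------
    Illustrative usage (hypothetical path and channel IDs; note that channel IDs are matched
    against lower-case TDMS channel names):

    >>> sync_map = {'bpod': 'ai0', 'frame2ttl': 'ai1'}
    >>> sync, chmap = load_sync_tdms('subject/2023-01-01/001/raw_daq_data', sync_map, fs=1000)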
124 """
125 data_file = load_raw_daq_tdms(path)
126 sync = {}
127 if any(x.lower()[0] != 'a' for x in sync_map.values()):
128 raise NotImplementedError('Non-analogue or arbitrary group channel extraction not supported')
130 raw_channels = [ch for ch in data_file['Analog'].channels() if ch.name.lower() in sync_map.values()]
131 analogue = np.vstack([ch.data for ch in raw_channels])
132 channel_ids = OrderedDict([(ch.name.lower(), ch.properties['ChannelKey']) for ch in raw_channels])
133 offset = np.percentile(analogue, floor_percentile, axis=0)
134 logger.info(f'estimated analogue channel DC Offset approx. {np.mean(offset):.2f}')
135 analogue -= offset
136 ttl = analogue > threshold
137 ind, sign = ibldsp.utils.fronts(ttl.astype(int))
138 try: # attempt to get the times from the meta data
139 times = np.vstack([ch.time_track() for ch in raw_channels])
140 times = times[tuple(ind)]
141 except KeyError:
142 assert fs
143 times = ind[1].astype(float) * 1/fs # noqa
145 # Sort by times
146 ind_sorted = np.argsort(times)
147 sync['times'] = times[ind_sorted]
148 # Map index to channel key then reindex by sorted times
149 sync['channels'] = np.fromiter(channel_ids.values(), dtype=int)[ind[0]][ind_sorted]
150 sync['polarities'] = sign[ind_sorted]
152 # Map sync name to channel key
153 sync_map = {v.lower(): k for k, v in sync_map.items()} # turn inside-out
154 chmap = {sync_map[k]: v for k, v in channel_ids.items()}
155 return sync, chmap
def correct_counter_discontinuities(raw, overflow=2**32):
    """
    Correct over- and underflow wrap around values for DAQ counter channel.

    Parameters
    ----------
    raw : numpy.array
        An array of counts.
    overflow : int
        The maximum representable value of the data before it was cast to float64.

    Returns
    -------
    numpy.array
        An array of counts with the over- and underflow discontinuities removed.
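
    Examples
    --------
    Illustrative usage on a counter trace that wraps around the 32-bit limit (synthetic data):

    >>> raw = np.array([4294967294., 4294967295., 0., 1.])
    >>> corrected = correct_counter_discontinuities(raw)  # the large negative jump at the wrap-around is removed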
173 """
174 flowmax = overflow - 1 1dbclan
175 d = np.diff(raw) 1dbclan
176 # correct for counter flow discontinuities
177 d[d >= flowmax] = d[d >= flowmax] - flowmax 1dbclan
178 d[d <= -flowmax] = d[d <= -flowmax] + flowmax 1dbclan
179 return np.cumsum(np.r_[0, d]) + raw[0] # back to position 1dbclan
def load_timeline_sync_and_chmap(alf_path, chmap=None, timeline=None, save=True):
    """Load the sync and channel map from disk.

    If the sync files do not exist, they are extracted from the raw DAQ data and saved.

    Parameters
    ----------
    alf_path : str, pathlib.Path
        The folder containing the sync file and raw DAQ data.
    chmap : dict
        An optional channel map, otherwise extracted based on the union of timeline meta data and
        default extractor channel map names.
    timeline : dict
        An optional timeline object, otherwise loaded from alf_path.
    save : bool
        If true, save the sync files if they don't already exist.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
        the corresponding channel numbers.
    dict
        A map of channel names and their corresponding indices for sync channels.
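
    Examples
    --------
    Illustrative usage; the sync collection path below is hypothetical:

    >>> sync, chmap = load_timeline_sync_and_chmap('subject/2023-01-01/001/raw_sync_data')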
206 """
207 if not chmap: 1bciaj
208 if not timeline: 1bciaj
209 meta = alfio.load_object(alf_path, 'DAQdata', namespace='timeline', attribute='meta')['meta'] 1bcij
210 else:
211 meta = timeline['meta'] 1a
212 chmap = timeline_meta2chmap(meta, include_channels=all_default_labels()) 1bciaj
213 try: 1bciaj
214 sync = alfio.load_object(alf_path, 'sync') 1bciaj
215 except alferr.ALFObjectNotFound: 1bca
216 if not timeline: 1bca
217 timeline = alfio.load_object(alf_path, 'DAQdata') 1bc
218 sync = extract_sync_timeline(timeline, chmap=chmap) 1bca
219 if save: 1bca
220 alfio.save_object_npy(alf_path, sync, object='sync', namespace='timeline') 1bca
221 return sync, chmap 1bciaj
def extract_sync_timeline(timeline, chmap=None, floor_percentile=10, threshold=None):
    """
    Load sync channels from a timeline object.

    Note: Because the scan frequency is typically faster than the sample rate, the position and
    edge count channels may detect more than one front between samples. Therefore for these, the
    raw data is more accurate than the extracted polarities.

    Parameters
    ----------
    timeline : dict, str, pathlib.Path
        A timeline object or the file or folder path of the _timeline_DAQdata files.
    chmap : dict
        A map of channel names and channel IDs.
    floor_percentile : float
        The percentile of the analogue trace to subtract before thresholding (10% by default).
        This is to avoid DC offset drift.
    threshold : float, dict of str: float
        The threshold to apply to the analogue channels, either a single value or a map of
        channel name to threshold. If None, half the maximum of the trace after subtracting the
        floor percentile offset is used.

    Returns
    -------
    one.alf.io.AlfBunch
        A dictionary with keys ('times', 'polarities', 'channels'), containing the sync pulses and
        the corresponding channel numbers.
    dict, optional
        A map of channel names and their corresponding indices for sync channels, if chmap is None.
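
    Examples
    --------
    Illustrative usage; the path is hypothetical and the channel map, if omitted, is taken from
    the wiring file or the timeline meta data:

    >>> timeline = alfio.load_object('subject/2023-01-01/001/raw_sync_data', 'DAQdata', namespace='timeline')
    >>> sync, chmap = extract_sync_timeline(timeline)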
252 """
253 if isinstance(timeline, (str, Path)): 1dbcfa
254 timeline = alfio.load_object(timeline, 'DAQdata', namespace='timeline') 1d
255 assert timeline.keys() >= {'timestamps', 'raw', 'meta'}, 'Timeline object missing attributes' 1dbcfa
257 # If no channel map was passed, load it from 'wiring' file, or extract from meta file
258 return_chmap = chmap is None 1dbcfa
259 chmap = chmap or timeline.get('wiring') or timeline_meta2chmap(timeline['meta']) 1dbcfa
261 # Initialize sync object
262 sync = alfio.AlfBunch((k, np.array([], dtype=d)) for k, d in 1dbcfa
263 (('times', 'f'), ('channels', 'u1'), ('polarities', 'i1')))
264 for label, i in chmap.items(): 1dbcfa
265 try: 1dbcfa
266 info = next(x for x in timeline['meta']['inputs'] if x['name'].lower() == label.lower()) 1dbcfa
267 except StopIteration: 1d
268 logger.warning('sync channel "%s" not found', label) 1d
269 continue 1d
270 raw = timeline['raw'][:, info['arrayColumn'] - 1] # -1 because MATLAB indexes from 1 1dbcfa
271 if info['measurement'] == 'Voltage': 1dbcfa
272 # Get TLLs by applying a threshold to the diff of voltage samples
273 offset = np.percentile(raw, floor_percentile, axis=0) 1dbcfa
274 daqID = info['daqChannelID'] 1dbcfa
275 logger.debug(f'{label} ({daqID}): estimated analogue channel DC Offset approx. {np.mean(offset):.2f}') 1dbcfa
276 step = threshold.get(label) if isinstance(threshold, dict) else threshold 1dbcfa
277 if step is None: 1dbcfa
278 step = np.max(raw - offset) / 2 1dbcfa
279 iup = ibldsp.utils.rises(raw - offset, step=step, analog=True) 1dbcfa
280 idown = ibldsp.utils.falls(raw - offset, step=step, analog=True) 1dbcfa
281 pol = np.r_[np.ones_like(iup), -np.ones_like(idown)].astype('i1') 1dbcfa
282 ind = np.r_[iup, idown] 1dbcfa
284 sync.polarities = np.concatenate((sync.polarities, pol)) 1dbcfa
285 elif info['measurement'] == 'EdgeCount': 1dbca
286 # Monotonically increasing values; extract indices where delta == 1
287 raw = correct_counter_discontinuities(raw) 1dbca
288 ind, = np.where(np.diff(raw)) 1dbca
289 ind += 1 1dbca
290 sync.polarities = np.concatenate((sync.polarities, np.ones_like(ind, dtype='i1'))) 1dbca
291 elif info['measurement'] == 'Position': 1dbca
292 # Bidirectional; extract indices where delta != 0
293 raw = correct_counter_discontinuities(raw) 1dbca
294 d = np.diff(raw) 1dbca
295 ind, = np.where(~np.isclose(d, 0)) 1dbca
296 sync.polarities = np.concatenate((sync.polarities, np.sign(d[ind]).astype('i1'))) 1dbca
297 ind += 1 1dbca
298 else:
299 raise NotImplementedError(f'{info["measurement"]} sync extraction') 1d
300 # Append timestamps of indices and channel index to sync arrays
301 sync.times = np.concatenate((sync.times, timeline['timestamps'][ind])) 1dbcfa
302 sync.channels = np.concatenate((sync.channels, np.full(ind.shape, i, dtype='u1'))) 1dbcfa
304 # Sort arrays by time
305 assert sync.check_dimensions == 0 1dbcfa
306 t_ind = np.argsort(sync.times) 1dbcfa
307 for k in sync: 1dbcfa
308 sync[k] = sync[k][t_ind] 1dbcfa
309 if return_chmap: 1dbcfa
310 return sync, chmap
311 else:
312 return sync 1dbcfa
def timeline_meta2wiring(path, save=False):
    """
    Given a timeline meta data object, return a dictionary of wiring info.

    Parameters
    ----------
    path : str, pathlib.Path
        The path of the timeline meta file, _timeline_DAQdata.meta.
    save : bool
        If true, save the timeline wiring file in the same location as the meta file,
        _timeline_DAQdata.wiring.json.

    Returns
    -------
    dict
        A dictionary with base keys {'SYSTEM', 'SYNC_WIRING_DIGITAL', 'SYNC_WIRING_ANALOG'}, the
        latter two of which contain maps of DAQ channel IDs to channel names.
    pathlib.Path
        If save=True, the path of the saved wiring file.
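
    Examples
    --------
    Illustrative usage; the sync collection path below is hypothetical:

    >>> wiring, wiring_path = timeline_meta2wiring('subject/2023-01-01/001/raw_sync_data', save=True)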
334 """
335 meta = alfio.load_object(path, 'DAQdata', namespace='timeline', attribute='meta').get('meta') 1k
336 assert meta, 'No meta data in timeline object' 1k
337 wiring = defaultdict(dict, SYSTEM='timeline') 1k
338 for input in meta['inputs']: 1k
339 key = 'SYNC_WIRING_' + ('ANALOG' if input['measurement'] == 'Voltage' else 'DIGITAL') 1k
340 wiring[key][input['daqChannelID']] = input['name'] 1k
341 if save: 1k
342 out_path = Path(path) / to_alf('DAQ data', 'wiring', 'json', namespace='timeline') 1k
343 with open(out_path, 'w') as fp: 1k
344 json.dump(wiring, fp) 1k
345 return dict(wiring), out_path 1k
346 return dict(wiring) 1k
def timeline_meta2chmap(meta, exclude_channels=None, include_channels=None):
    """
    Convert a timeline meta object to a sync channel map.

    Parameters
    ----------
    meta : dict
        A loaded timeline metadata file, i.e. _timeline_DAQdata.meta.
    exclude_channels : list
        An optional list of channels to exclude from the channel map.
    include_channels : list
        An optional list of channels to include in the channel map; takes priority over the
        exclude list.

    Returns
    -------
    dict
        A map of channel names and their corresponding indices for sync channels.
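
    Examples
    --------
    Illustrative usage with a minimal, synthetic meta dictionary (real meta files contain more
    fields per input):

    >>> meta = {'inputs': [{'name': 'bpod', 'arrayColumn': 2}, {'name': 'frame2ttl', 'arrayColumn': 3}]}
    >>> timeline_meta2chmap(meta, exclude_channels=['frame2ttl'])
    {'bpod': 2}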
367 """
368 chmap = {} 1dmbciaj
369 for input in meta.get('inputs', []): 1dmbciaj
370 if (include_channels is not None and input['name'] not in include_channels) or \ 1dmbciaj
371 (exclude_channels and input['name'] in exclude_channels):
372 continue 1mbciaj
373 chmap[input['name']] = input['arrayColumn'] 1dmbciaj
374 return chmap 1dmbciaj
def timeline_get_channel(timeline, channel_name):
    """
    Given a timeline object, returns the vector of values recorded from a given channel name.

    Parameters
    ----------
    timeline : one.alf.io.AlfBunch
        A loaded timeline object.
    channel_name : str
        The name of a channel to extract.

    Returns
    -------
    numpy.array
        The channel data.
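
    Examples
    --------
    Illustrative usage; the path and channel name below are hypothetical:

    >>> timeline = alfio.load_object('subject/2023-01-01/001/raw_sync_data', 'DAQdata', namespace='timeline')
    >>> bpod_trace = timeline_get_channel(timeline, 'bpod')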
392 """
393 idx = next(ch['arrayColumn'] for ch in timeline['meta']['inputs'] if ch['name'] == channel_name) 1ola
394 return timeline['raw'][:, idx - 1] # -1 because MATLAB indices start from 1, not 0 1ola