Coverage for ibllib/pipes/sync_tasks.py: 95%
82 statements
coverage.py v7.7.0, created at 2025-03-17 09:55 +0000
import logging

from ibllib.pipes import base_tasks
from ibllib.io.extractors.ephys_fpga import extract_sync
from iblutil.util import Bunch

import spikeglx

_logger = logging.getLogger('ibllib')


class SyncRegisterRaw(base_tasks.RegisterRawDataTask):
    """Task to register raw DAQ data.

    Registers DAQ software output for a given device. The object should be _*_DAQdata, where the
    namespace identifies the DAQ model or acquisition software, e.g. 'mcc', 'ni' or 'ni-usb-6211'.
    At minimum there should be a raw data dataset of the form _*_DAQdata.raw*, e.g.
    '_mc_DAQdata.raw.pqt'. The following are optional attribute datasets:
    - _*_DAQdata.timestamps.npy: for timeline, the timestamps array is separate from the samples.
    - _*_DAQdata.meta.json: for timeline, all acquisition metadata (e.g. sample rate, channel
      names) are stored in a separate file.
    - _*_DAQdata.wiring.json: for SpikeGLX, the channel map is stored in this file.
    - _timeline_softwareEvents.log.htsv: UDP messages and other software events in DAQ time.
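
    Examples
    --------
    A minimal sketch of running the task directly; the collection name and extension below are
    assumptions for illustration, as in practice they are taken from the experiment description:

    >>> task = SyncRegisterRaw(session_path, sync_collection='raw_sync_data', sync_ext='pqt')
    >>> task.run()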
24 """
    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [],
            'output_files': [(f'*DAQdata.raw.{self.sync_ext}', self.sync_collection, True),
                             ('*DAQdata.timestamps.npy', self.sync_collection, False),
                             ('*DAQdata.meta.json', self.sync_collection, False),
                             ('*DAQdata.wiring.json', self.sync_collection, False),
                             ('*softwareEvents.log.htsv', self.sync_collection, False)]
        }
        return signature


class SyncMtscomp(base_tasks.DynamicTask):
    """
    Task to rename, compress and register raw DAQ data in .bin format collected with a NIDAQ.
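
    Examples
    --------
    A minimal sketch; the collection and namespace values are assumptions for illustration and
    normally come from the experiment description:

    >>> task = SyncMtscomp(session_path, sync_collection='raw_sync_data', sync_namespace='spikeglx')
    >>> task.run()
    >>> task.outputs  # the compressed and renamed DAQdata.raw.* and wiring files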
44 """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw AP files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.*bin', self.sync_collection, True),
                            ('*.meta', self.sync_collection, True),
                            ('*.wiring.json', self.sync_collection, True)],
            'output_files': [(f'_{self.sync_namespace}_DAQdata.raw.cbin', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.raw.ch', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.raw.meta', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.wiring.json', self.sync_collection, True)]
        }
        return signature

    def _run(self):

        out_files = []

        # Detect the wiring file and rename it (if it hasn't already been renamed)
        wiring_file = next(self.session_path.joinpath(self.sync_collection).glob('*.wiring.json'), None)
        if wiring_file is not None:
            if 'DAQdata.wiring' not in wiring_file.stem:
                new_wiring_file = wiring_file.parent.joinpath(f'_{self.sync_namespace}_DAQdata.wiring.json')
                wiring_file.replace(new_wiring_file)
            else:
                new_wiring_file = wiring_file

            out_files.append(new_wiring_file)

        # Search for .bin files in the sync_collection folder
        bin_file = next(self.session_path.joinpath(self.sync_collection).glob('*.*bin'), None)

        # If we no longer have a .bin / .cbin file, see if we can still find the .ch and .meta files
        if bin_file is None:
            for ext in ['ch', 'meta']:
                ext_file = next(self.session_path.joinpath(self.sync_collection).glob(f'*.{ext}'), None)
                if ext_file is not None:
                    if 'DAQdata.raw' not in ext_file.stem:
                        new_ext_file = ext_file.parent.joinpath(f'_{self.sync_namespace}_DAQdata.raw{ext_file.suffix}')
                        ext_file.replace(new_ext_file)
                    else:
                        new_ext_file = ext_file
                    out_files.append(new_ext_file)

            return out_files if len(out_files) > 0 else None

        # If we do find the .bin file, compress it (only if it hasn't already been compressed)
        sr = spikeglx.Reader(bin_file)
        if sr.is_mtscomp:
            sr.close()
            cbin_file = bin_file
            assert cbin_file.suffix == '.cbin'
        else:
            cbin_file = sr.compress_file()
            sr.close()
            bin_file.unlink()

        # Rename files (only if they haven't already been renamed)
        if 'DAQdata.raw' not in cbin_file.stem:
            new_bin_file = cbin_file.parent.joinpath(f'_{self.sync_namespace}_DAQdata.raw{cbin_file.suffix}')
            cbin_file.replace(new_bin_file)

            meta_file = cbin_file.with_suffix('.meta')
            new_meta_file = new_bin_file.with_suffix('.meta')
            meta_file.replace(new_meta_file)

            ch_file = cbin_file.with_suffix('.ch')
            new_ch_file = new_bin_file.with_suffix('.ch')
            ch_file.replace(new_ch_file)
        else:
            new_bin_file = cbin_file
            new_meta_file = cbin_file.with_suffix('.meta')
            new_ch_file = cbin_file.with_suffix('.ch')

        out_files.append(new_bin_file)
        out_files.append(new_ch_file)
        out_files.append(new_meta_file)

        return out_files


class SyncPulses(base_tasks.DynamicTask):
    """
    Extract sync pulses from a NIDAQ .bin / .cbin file.

    N.B. Only extracts sync from the sync collection (i.e. not equivalent to EphysPulses, which
    extracts sync pulses for each probe).

    # TODO generalise to other DAQ and file formats, generalise to 3A probes
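
    Examples
    --------
    A minimal sketch; the collection and namespace values are assumptions for illustration, and
    `session_path` is assumed to be a pathlib.Path with numpy imported as np:

    >>> task = SyncPulses(session_path, sync_collection='raw_sync_data', sync_namespace='spikeglx')
    >>> task.run()
    >>> times = np.load(session_path / 'raw_sync_data' / '_spikeglx_sync.times.npy')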
137 """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw AP files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [(f'_{self.sync_namespace}_DAQdata.raw.*bin', self.sync_collection, True),
                            (f'_{self.sync_namespace}_DAQdata.raw.ch', self.sync_collection, True),
                            (f'_{self.sync_namespace}_DAQdata.raw.meta', self.sync_collection, True),
                            (f'_{self.sync_namespace}_DAQdata.wiring.json', self.sync_collection, True)],
            'output_files': [(f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
                             (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
                             (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True)]
        }
        return signature

    def _run(self, overwrite=False):
        bin_file = next(self.session_path.joinpath(self.sync_collection).glob('*.*bin'), None)
        if not bin_file:
            return []

        # TODO this is a hack; once we refactor the sync tasks we should make a generic extract_sync
        #  that doesn't rely on the output of glob_ephys_files
        files = [Bunch({'nidq': bin_file, 'label': ''})]

        _, outputs = extract_sync(self.session_path, ephys_files=files, overwrite=overwrite, namespace=self.sync_namespace)

        return outputs