Coverage for ibllib/pipes/dynamic_pipeline.py: 93%
267 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1"""Task pipeline creation from an acquisition description.
3The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml`
4file and determines the set of tasks required to preprocess the session.
6In the experiment description file there is a 'tasks' key that defines each task protocol and the
7location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors'
8field that should contain a list of dynamic pipeline task class names for extracting the task data.
9These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the
10extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name.
NB: The standard behaviour extraction task classes (e.g.
13:class:`ibllib.pipes.behaviour_tasks.ChoiceWorldTrialsBpod` and :class:`ibllib.pipes.behaviour_tasks.ChoiceWorldTrialsNidq`)
14handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod
15trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod
16trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor`
17map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while
18non-IBL ones should be in projects.base.task_extractor_map.json file located in the personal
19projects repo. The Bpod trials extractor class must be a subclass of the
20:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the
21personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module.
22"""
23import logging
24import re
25from collections import OrderedDict
26from pathlib import Path
27from itertools import chain
28import yaml
30import spikeglx
32import ibllib.io.session_params as sess_params
33from ibllib.io.extractors.base import get_pipeline, get_session_extractor_type
34import ibllib.pipes.tasks as mtasks
35import ibllib.pipes.base_tasks as bstasks
36import ibllib.pipes.widefield_tasks as wtasks
37import ibllib.pipes.mesoscope_tasks as mscope_tasks
38import ibllib.pipes.sync_tasks as stasks
39import ibllib.pipes.behavior_tasks as btasks
40import ibllib.pipes.video_tasks as vtasks
41import ibllib.pipes.ephys_tasks as etasks
42import ibllib.pipes.audio_tasks as atasks
43import ibllib.pipes.photometry_tasks as ptasks
44# from ibllib.pipes.photometry_tasks import FibrePhotometryPreprocess, FibrePhotometryRegisterRaw
46_logger = logging.getLogger(__name__)
def acquisition_description_legacy_session(session_path, save=False):
    """
    From a legacy session create a dictionary corresponding to the acquisition description.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A path to a session to describe.
    save : bool
        If true, saves the acquisition description file to _ibl_experiment.description.yaml.

    Returns
    -------
    dict
        The legacy acquisition description.
    """
    # Map the legacy extractor type onto the canonical protocol name
    etype2protocol = {
        'biased': 'choice_world_biased',
        'habituation': 'choice_world_habituation',
        'training': 'choice_world_training',
        'ephys': 'choice_world_recording',
    }
    extractor_type = get_session_extractor_type(session_path)
    dict_ad = get_acquisition_description(etype2protocol[extractor_type])
    if save:
        sess_params.write_params(session_path=session_path, data=dict_ad)
    return dict_ad
def get_acquisition_description(protocol):
    """
    Return an example acquisition description for an IBL pipeline experiment.

    Supported protocols:
    - choice_world_recording
    - choice_world_biased
    - choice_world_training
    - choice_world_habituation

    Parameters
    ----------
    protocol : str
        One of the protocol names listed above.

    Returns
    -------
    dict
        The acquisition description, including a 'version' key.

    Raises
    ------
    ValueError
        If the protocol is not one of the supported names.
    """
    if protocol == 'choice_world_recording':  # canonical ephys
        devices = {
            'cameras': {
                side: {'collection': 'raw_video_data', 'sync_label': 'audio'}
                for side in ('right', 'body', 'left')
            },
            'neuropixel': {
                probe: {'collection': f'raw_ephys_data/{probe}', 'sync_label': 'imec_sync'}
                for probe in ('probe00', 'probe01')
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current ephys pipeline description
            'devices': devices,
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        # Bpod-only sessions: single left camera plus microphone
        protocol2key = {
            'choice_world_biased': 'biasedChoiceWorld',
            'choice_world_training': 'trainingChoiceWorld',
            'choice_world_habituation': 'habituationChoiceWorld',
        }
        if protocol not in protocol2key:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description = {  # this is the current training pipeline description
            'devices': {
                'cameras': {
                    'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                },
                'microphone': {
                    'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
                },
            },
            'sync': {'bpod': {'collection': 'raw_behavior_data'}},
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01'],
            'tasks': [{protocol2key[protocol]: {
                'collection': 'raw_behavior_data',
                'sync_label': 'bpod', 'main': True  # FIXME: What is purpose of main key?
            }}],
        }
    acquisition_description['version'] = '1.0.0'
    return acquisition_description
142def _sync_label(sync, acquisition_software=None, **_):
143 """
144 Returns the sync label based on the sync type and acquisition software.
146 The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'.
147 The 'acquisition_software' refers to the software used to acquire the data, e.g.
148 for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect
149 how the data are loaded and extracted, and therefore which tasks to use.
151 The naming convention here is not ideal, and may be changed in the future.
153 Parameters
154 ----------
155 sync : str
156 The sync type, e.g. 'nidq', 'tdms', 'bpod'.
157 acquisition_software : str
158 The acquisition software used to acquire the sync data.
160 Returns
161 -------
162 str
163 The sync label for determining the extractor tasks.
164 """
166 return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync 1agtncdefbmiklhj
def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    Tasks are built dynamically from the experiment description: for each device
    (sync DAQ, behaviour task, neuropixel, cameras, microphone, widefield, mesoscope,
    photometry) a subclass of the corresponding base task is created via `type()` and
    instantiated with the session, sync and device arguments.

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.

    Raises
    ------
    ValueError
        If the session path does not exist, or the experiment description file is
        missing/empty, or an explicit extractor name conflicts with the sync label.
    """
    # NB: this pattern is a pattern for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = sess_params.read_params(session_path)
    if not acquisition_description:
        raise ValueError('Experiment description file not found or is empty')
    devices = acquisition_description.get('devices', {})
    # kwargs common to every task constructed below
    kwargs = {'session_path': session_path, 'one': pkwargs.get('one')}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks
    # NB: exactly one sync device is expected; more than one entry raises here
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
    sync_tasks = []  # tasks that downstream extractors must depend on (pulse extraction)
    if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        # SpikeGLX NI DAQ acquired alongside ephys
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'timeline':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'nidq':
        # NI DAQ in a non-ephys collection: compress then extract pulses
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'bpod':
        pass  # ATM we don't have anything for this; it may not be needed in the future

    # Behavior tasks
    task_protocols = acquisition_description.get('tasks', [])
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # For now the order of protocols in the list will take precedence. If collections are numbered,
        # check that the numbers match the order. This may change in the future.
        if re.match(r'^raw_task_data_\d{2}$', collection):
            task_kwargs['protocol_number'] = i
            if int(collection.split('_')[-1]) != i:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            # Explicit extractor class names provided in the description file
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, extractor in enumerate(extractors):
                # Assume previous task in the list is parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ('nidq', 'bpod', 'timeline'):
                    if sync_option in extractor.lower() and not sync_label == sync_option:
                        raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match')
                # TODO Assert sync_label correct here (currently unused)
                # Look for the extractor in the behavior extractors module
                if hasattr(btasks, extractor):
                    task = getattr(btasks, extractor)
                # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for example
                elif hasattr(btasks, extractor + sync_label.capitalize()):
                    task = getattr(btasks, extractor + sync_label.capitalize())
                else:
                    # lookup in the project extraction repo if we find an extractor class
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    else:
                        raise NotImplementedError(
                            f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
                _logger.debug('%s (protocol #%i, task #%i) = %s.%s',
                              protocol, i, j, task.__module__, task.__name__)
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            if 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"')
                compute_status = False  # no training status for passive sessions
            elif 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize())
                compute_status = False
            else:
                registration_class = btasks.TrialRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"')
                compute_status = True
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])

    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            # Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4
            # extractions, however.
            probe_collection = next(session_path.glob(probe_info['collection'] + '*'))
            meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta')
            meta_file = meta_file[0].get('ap')
            # The probe type determines the compression task and how shanks are split
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
                # NP2.4 multishank: one logical probe per shank, suffixed a, b, c, ...
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)

        # NOTE(review): nptype here is whatever the *last* probe in the loop was — this
        # assumes all probes in a session share the same Neuropixel version; confirm.
        if nptype == '3A':
            # 3A probes have no separate sync channel; a single pulses task covers all probes
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            # pname[:7] matches the base probe name (e.g. 'probe00') of shank-suffixed names
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)
            tasks[f'EphysCellQC_{pname}'] = type(f'EphysCellQC_{pname}', (etasks.EphysCellsQc,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'Spikesorting_{pname}']])

    # Video tasks
    if 'cameras' in devices:
        cams = list(devices['cameras'].keys())
        subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
        video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        if video_compressed:
            # This is for widefield case where the video is already compressed
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right or body)
                video_kwargs['cameras'] = subset_cams
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if sync_kwargs['sync'] != 'bpod':
            # Here we restrict to videos that we support (left, right or body)
            # Currently there is no plan to run DLC on the belly cam
            subset_cams = [c for c in cams if c in ('left', 'right', 'body')]
            video_kwargs['cameras'] = subset_cams
            tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
                **kwargs, **video_kwargs, parents=[dlc_parent_task])

            # The PostDLC plots require a trials object for QC
            # Find the first task that outputs a trials.table dataset
            trials_task = (
                t for t in tasks.values() if any('trials.table' in f for f in t.signature.get('output_files', []))
            )
            if trials_task := next(trials_task, None):
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task]
                trials_collection = getattr(trials_task, 'output_collection', 'alf')
            else:
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]
                trials_collection = 'alf'
            tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})(
                **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents)

    # Audio tasks
    if 'microphone' in devices:
        # Exactly one microphone entry expected
        (microphone, micro_kwargs), = devices['microphone'].items()
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Mesoscope tasks
    if 'mesoscope' in devices:
        (_, mscope_kwargs), = devices['mesoscope'].items()
        mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
        tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
        tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
            **kwargs, **mscope_kwargs, **sync_kwargs)
        tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])

    # Photometry tasks
    if 'photometry' in devices:
        # {'collection': 'raw_photometry_data', 'sync_label': 'frame_trigger', 'regions': ['Region1G', 'Region3G']}
        photometry_kwargs = devices['photometry']
        tasks['FibrePhotometryRegisterRaw'] = type('FibrePhotometryRegisterRaw', (
            ptasks.FibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs)
        tasks['FibrePhotometryPreprocess'] = type('FibrePhotometryPreprocess', (
            ptasks.FibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs,
                                                    parents=[tasks['FibrePhotometryRegisterRaw']] + sync_tasks)

    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks

    return p
def make_pipeline_dict(pipeline, save=True):
    """
    Convert a pipeline object to a serializable list of task dictionaries.

    Parameters
    ----------
    pipeline : ibllib.pipes.tasks.Pipeline
        The pipeline to serialize.
    save : bool
        If true, write the task list to 'pipeline_tasks.yaml' in the session path.

    Returns
    -------
    list of dict
        The pipeline's tasks as dictionaries.
    """
    task_dicts = pipeline.create_tasks_list_from_pipeline()
    # TODO better name
    if save:
        out_file = Path(pipeline.session_path).joinpath('pipeline_tasks.yaml')
        with open(out_file, 'w') as fp:
            yaml.dump(task_dicts, fp)
    return task_dicts
def load_pipeline_dict(path):
    """
    Load the task list previously saved by `make_pipeline_dict`.

    Parameters
    ----------
    path : str, pathlib.Path
        A directory containing a 'pipeline_tasks.yaml' file.

    Returns
    -------
    list of dict
        The pipeline's tasks as dictionaries.
    """
    with open(Path(path).joinpath('pipeline_tasks.yaml'), 'r') as fp:
        return yaml.full_load(fp)
def get_trials_tasks(session_path, one=None):
    """
    Return a list of pipeline trials extractor task objects for a given session.

    This function supports both legacy and dynamic pipeline sessions.

    Parameters
    ----------
    session_path : str, pathlib.Path
        An absolute path to a session.
    one : one.api.One
        An ONE instance.

    Returns
    -------
    list of pipes.tasks.Task
        A list of task objects for the provided session.

    """
    # Check for an experiment.description file; ensure downloaded if possible
    if one and one.to_eid(session_path):  # to_eid returns None if session not registered
        one.load_datasets(session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False)
        # NB: meta files only required to build neuropixel tasks in make_pipeline
        if meta_files := one.list_datasets(session_path, '*.ap.meta', collection='raw_ephys_data*'):
            one.load_datasets(session_path, meta_files, download_only=True, assert_present=False)
    experiment_description = sess_params.read_params(session_path)

    # If experiment description file then use this to make the pipeline
    if experiment_description is not None:
        tasks = []
        try:
            pipeline = make_pipeline(session_path, one=one)
            # Any task whose key contains 'Trials' is considered a trials extractor
            trials_tasks = [t for t in pipeline.tasks if 'Trials' in t]
            for task in trials_tasks:
                t = pipeline.tasks.get(task)
                # Re-initialize with the original kwargs (presumably to refresh
                # the task state outside the pipeline context — TODO confirm)
                t.__init__(session_path, **t.kwargs)
                tasks.append(t)
        except NotImplementedError as ex:
            _logger.warning('Failed to get trials tasks: %s', ex)
    else:
        # Otherwise default to old way of doing things
        if one and one.to_eid(session_path):
            one.load_dataset(session_path, '_iblrig_taskSettings.raw', collection='raw_behavior_data', download_only=True)
        pipeline = get_pipeline(session_path)
        if pipeline == 'training':
            from ibllib.pipes.training_preprocessing import TrainingTrials
            tasks = [TrainingTrials(session_path, one=one)]
        elif pipeline == 'ephys':
            from ibllib.pipes.ephys_preprocessing import EphysTrials
            tasks = [EphysTrials(session_path, one=one)]
        else:
            try:
                # try to find a custom extractor in the personal projects extraction class
                import projects.base
                task_type = get_session_extractor_type(session_path)
                assert (PipelineClass := projects.base.get_pipeline(task_type))
                pipeline = PipelineClass(session_path, one=one)
                trials_task_name = next((task for task in pipeline.tasks if 'Trials' in task), None)
                assert trials_task_name, (f'No "Trials" tasks for custom pipeline '
                                          f'"{pipeline.name}" with extractor type "{task_type}"')
                task = pipeline.tasks.get(trials_task_name)
                task(session_path)
                tasks = [task]
            except (ModuleNotFoundError, AssertionError) as ex:
                # No personal projects repo installed, or no matching pipeline/task found
                _logger.warning('Failed to get trials tasks: %s', ex)
                tasks = []

    return tasks