Coverage for ibllib/pipes/dynamic_pipeline.py: 92%
269 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1"""Task pipeline creation from an acquisition description.
3The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml`
4file and determines the set of tasks required to preprocess the session.
6In the experiment description file there is a 'tasks' key that defines each task protocol and the
7location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors'
8field that should contain a list of dynamic pipeline task class names for extracting the task data.
9These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the
10extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name.
NB: The standard behaviour extraction task classes (e.g.
13:class:`ibllib.pipes.behaviour_tasks.ChoiceWorldTrialsBpod` and :class:`ibllib.pipes.behaviour_tasks.ChoiceWorldTrialsNidq`)
14handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod
15trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod
16trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor`
17map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while
18non-IBL ones should be in projects.base.task_extractor_map.json file located in the personal
19projects repo. The Bpod trials extractor class must be a subclass of the
20:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the
21personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module.
22"""
23import logging
24import re
25from fnmatch import fnmatch
26from collections import OrderedDict
27from pathlib import Path
28from itertools import chain
29import yaml
31import spikeglx
33import ibllib.io.raw_data_loaders as rawio
34import ibllib.io.session_params as sess_params
35import ibllib.pipes.tasks as mtasks
36import ibllib.pipes.base_tasks as bstasks
37import ibllib.pipes.widefield_tasks as wtasks
38import ibllib.pipes.mesoscope_tasks as mscope_tasks
39import ibllib.pipes.sync_tasks as stasks
40import ibllib.pipes.behavior_tasks as btasks
41import ibllib.pipes.video_tasks as vtasks
42import ibllib.pipes.ephys_tasks as etasks
43import ibllib.pipes.audio_tasks as atasks
44import ibllib.pipes.neurophotometrics as ptasks
46_logger = logging.getLogger(__name__)
def acquisition_description_legacy_session(session_path, save=False):
    """
    Build an acquisition description dict for a legacy (pre-description-file) session.

    The task protocol is read from the session's Bpod settings file and mapped to one of
    the canonical IBL acquisition description templates.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A path to a session to describe.
    save : bool
        If true, saves the acquisition description file to _ibl_experiment.description.yaml.

    Returns
    -------
    dict
        The legacy acquisition description.
    """
    settings = rawio.load_settings(session_path)
    protocol = settings.get('PYBPOD_PROTOCOL', 'UNKNOWN')
    description = get_acquisition_description(protocol)
    if save:
        sess_params.write_params(session_path=session_path, data=description)
    return description
def get_acquisition_description(protocol):
    """
    Return a canonical acquisition description for a given IBL task protocol.

    This is a set of example acquisition descriptions for experiments
    - choice_world_recording
    - choice_world_biased
    - choice_world_training
    - choice_world_habituation
    - choice_world_passive
    that are part of the IBL pipeline.

    Parameters
    ----------
    protocol : str
        A task protocol name, e.g. '_iblrig_tasks_trainingChoiceWorld6.4.2'.

    Returns
    -------
    dict
        An acquisition description with 'devices', 'tasks', 'sync', 'procedures',
        'projects' and 'version' keys.

    Raises
    ------
    ValueError
        The protocol matches none of the known choice world variants.
    """
    if 'ephys' in protocol:  # canonical ephys
        devices = {
            'cameras': {
                'right': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'body': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'neuropixel': {
                'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'},
                'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current ephys pipeline description
            'devices': devices,
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        devices = {
            'cameras': {
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        # Bpod-only training rig description (no DAQ; sync comes from the Bpod clock)
        acquisition_description = {
            'devices': devices,
            'sync': {'bpod': {'collection': 'raw_behavior_data'}},
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
        if 'biased' in protocol:
            key = 'biasedChoiceWorld'
        elif 'training' in protocol:
            key = 'trainingChoiceWorld'
        elif 'habituation' in protocol:
            key = 'habituationChoiceWorld'
        else:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description['tasks'] = [{key: {
            'collection': 'raw_behavior_data',
            'sync_label': 'bpod'
        }}]
    acquisition_description['version'] = '1.0.0'
    return acquisition_description
141def _sync_label(sync, acquisition_software=None, **_):
142 """
143 Returns the sync label based on the sync type and acquisition software.
145 The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'.
146 The 'acquisition_software' refers to the software used to acquire the data, e.g.
147 for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect
148 how the data are loaded and extracted, and therefore which tasks to use.
150 The naming convention here is not ideal, and may be changed in the future.
152 Parameters
153 ----------
154 sync : str
155 The sync type, e.g. 'nidq', 'tdms', 'bpod'.
156 acquisition_software : str
157 The acquisition software used to acquire the sync data.
159 Returns
160 -------
161 str
162 The sync label for determining the extractor tasks.
163 """
165 return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync 1aomtnlcdefbkhijg
def _load_acquisition_description(session_path):
    """
    Load a session's acquisition description.

    Attempts to load from the session path and upon failure, attempts to generate one based on the
    task protocol (this only works for legacy pipeline sessions).

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.

    Returns
    -------
    dict
        The acquisition description file.

    Raises
    ------
    ValueError
        No description file was found and one could not be generated from the legacy settings.
    """
    # Coerce to Path so str inputs work, as the docstring promises (joinpath below requires it)
    session_path = Path(session_path)
    acquisition_description = sess_params.read_params(session_path)
    if not acquisition_description:
        try:
            # v7 sessions used a different folder name for task data;
            # v8 sessions should always have a description file
            assert session_path.joinpath('raw_behavior_data').exists()
            acquisition_description = acquisition_description_legacy_session(session_path)
            assert acquisition_description
        except (AssertionError, ValueError) as ex:
            raise ValueError('Experiment description file not found or is empty') from ex
    return acquisition_description
def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=None, one=None):
    """
    Generate behaviour tasks from acquisition description.

    This returns all behaviour related tasks including TrialsRegisterRaw and TrainingStatus objects.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.
    acquisition_description : dict
        An acquisition description.
    sync_tasks : list
        A list of sync tasks to use as behaviour task parents.
    one : One
        An instance of ONE to pass to each task.

    Returns
    -------
    dict[str, ibllib.pipes.tasks.Task]
        A map of Alyx task name to behaviour task object.
    """
    if not acquisition_description:
        acquisition_description = _load_acquisition_description(session_path)
    tasks = OrderedDict()
    sync_tasks = sync_tasks or []
    kwargs = {'session_path': session_path, 'one': one}

    # Syncing tasks
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args = sync_args.copy()  # ensure acquisition_description unchanged (consistent with make_pipeline)
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}

    # Behavior tasks
    task_protocols = acquisition_description.get('tasks', [])
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # For now the order of protocols in the list will take precedence. If collections are numbered,
        # check that the numbers match the order. This may change in the future.
        if re.match(r'^raw_task_data_\d{2}$', collection):
            task_kwargs['protocol_number'] = i
            if int(collection.split('_')[-1]) != i:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, extractor in enumerate(extractors):
                # Assume previous task in the list is a parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ('nidq', 'bpod', 'timeline'):
                    if sync_option in extractor.lower() and not sync_label == sync_option:
                        raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match')
                # Look for the extractor in the behavior extractors module
                if hasattr(btasks, extractor):
                    task = getattr(btasks, extractor)
                # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for example
                elif hasattr(btasks, extractor + sync_label.capitalize()):
                    task = getattr(btasks, extractor + sync_label.capitalize())
                else:
                    # lookup in the project extraction repo if we find an extractor class
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    elif hasattr(projects.extraction_tasks, extractor + sync_label.capitalize()):
                        # BUGFIX: previously fetched from btasks here, which was already
                        # checked above and is known not to have this attribute
                        task = getattr(projects.extraction_tasks, extractor + sync_label.capitalize())
                    else:
                        raise NotImplementedError(
                            f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
                _logger.debug('%s (protocol #%i, task #%i) = %s.%s',
                              protocol, i, j, task.__module__, task.__name__)
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                if not (task.__name__.startswith('TrainingStatus') or task.__name__.endswith('RegisterRaw')):
                    task_name = f'Trials_{task_name}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            if 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"')
                compute_status = False
            elif 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize())
                compute_status = False
            else:
                registration_class = btasks.TrialRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"')
                compute_status = True
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])
    return tasks
def get_trials_tasks(session_path, one=None, bpod_only=False):
    """
    Return a list of pipeline trials extractor task objects for a given session.

    This function supports both legacy and dynamic pipeline sessions. Dynamic tasks are returned
    for both recent and legacy sessions. Only Trials tasks are returned, not the training status
    or raw registration tasks.

    Parameters
    ----------
    session_path : str, pathlib.Path
        An absolute path to a session.
    one : one.api.One
        An ONE instance.
    bpod_only : bool
        If true, extract trials from Bpod clock instead of the main DAQ's.

    Returns
    -------
    list of pipes.tasks.Task
        A list of task objects for the provided session.

    Examples
    --------
    Return the tasks for active choice world extraction

    >>> tasks = list(filter(is_active_trials_task, get_trials_tasks(session_path)))
    """
    # Check for an experiment.description file; ensure downloaded if possible
    if one and one.to_eid(session_path):  # to_eid returns None if session not registered
        one.load_datasets(session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False)
    acquisition_description = _load_acquisition_description(session_path)
    if bpod_only and acquisition_description:
        # Override the DAQ sync with the Bpod clock
        acquisition_description['sync'] = {'bpod': {'collection': 'raw_task_data_*'}}
    try:
        all_tasks = _get_trials_tasks(session_path, acquisition_description, one=one)
    except NotImplementedError as ex:
        _logger.warning('Failed to get trials tasks: %s', ex)
        return []
    return [task for name, task in all_tasks.items() if name.startswith('Trials_')]
def is_active_trials_task(task) -> bool:
    """
    Check if task is for active choice world extraction.

    Parameters
    ----------
    task : ibllib.pipes.tasks.Task
        A task instance to test.

    Returns
    -------
    bool
        True if the task name starts with 'Trials_' and outputs a trials.table dataset.
    """
    if not task.name.lower().startswith('trials_'):
        return False
    # Each output_files entry is a tuple whose first element is a filename pattern
    patterns = (entry[0] for entry in task.signature.get('output_files', []))
    return any(fnmatch('_ibl_trials.table.pqt', pattern) for pattern in patterns)
def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    The description file determines which devices were used ('devices' key) and how the data
    streams are synchronized ('sync' key); one dynamically-created task class is instantiated
    per device/processing step, with parent tasks wired up so the pipeline runs in order.

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.

    Raises
    ------
    ValueError
        The session path is falsy or does not exist on disk.
    """
    # NB: this pattern is a pattern for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = _load_acquisition_description(session_path)
    devices = acquisition_description.get('devices', {})
    kwargs = {'session_path': session_path, 'one': pkwargs.get('one')}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks
    # The 'sync' map is expected to have exactly one entry (hence the single-item unpack)
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args = sync_args.copy()  # ensure acquisition_description unchanged
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
    sync_tasks = []  # the tasks downstream extractors must wait on for clock alignment
    if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        # SpikeGLX NI DAQ acquired alongside ephys: dedicated ephys sync tasks
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'timeline':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'nidq':
        # NI DAQ in a non-ephys collection: compress then extract pulses
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'bpod':
        pass  # ATM we don't have anything for this; it may not be needed in the future

    # Behavior tasks
    tasks.update(
        _get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one'))
    )

    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            # Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4
            # extractions, however.
            probe_collection = next(session_path.glob(probe_info['collection'] + '*'))
            meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta')
            meta_file = meta_file[0].get('ap')
            # Probe generation determines compression task and whether shanks are split
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
                # Multi-shank NP2.4 probes are split into one pseudo-probe per shank,
                # suffixed a, b, c, ... (chr(97) == 'a')
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                # NOTE(review): dict key is 'EphysCompressNP1_...' but the class name is
                # 'EphyCompressNP1_...' — the key is what later lookups use
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)

        if nptype == '3A':
            # 3A probes are all synced together in a single pulses task
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            # pname[:7] matches the base probe name for shank-suffixed pseudo-probes (e.g. probe00a)
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)

    # Video tasks
    if 'cameras' in devices:
        cams = list(devices['cameras'].keys())
        subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
        video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        if video_compressed:
            # This is for widefield case where the video is already compressed
            # NB: in `tasks[tn] = type((tn := 'X'), ...)` the RHS (and walrus) is evaluated
            # before the subscript target, so tn is bound before use
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right or body)
                video_kwargs['cameras'] = subset_cams
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if sync_kwargs['sync'] != 'bpod':
            # Here we restrict to videos that we support (left, right or body)
            # Currently there is no plan to run DLC on the belly cam
            subset_cams = [c for c in cams if c in ('left', 'right', 'body')]
            video_kwargs['cameras'] = subset_cams
            tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
                **kwargs, **video_kwargs, parents=[dlc_parent_task])

            # The PostDLC plots require a trials object for QC
            # Find the first task that outputs a trials.table dataset
            trials_task = (
                t for t in tasks.values() if any('trials.table' in f[0] for f in t.signature.get('output_files', []))
            )
            if trials_task := next(trials_task, None):
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task]
                trials_collection = getattr(trials_task, 'output_collection', 'alf')
            else:
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]
                trials_collection = 'alf'
            tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})(
                **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents)

    # Audio tasks
    if 'microphone' in devices:
        # Single-microphone sessions only (single-item unpack)
        (microphone, micro_kwargs), = devices['microphone'].items()
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        # NOTE(review): key 'WideFieldRegisterRaw' and class name 'WidefieldRegisterRaw'
        # differ in capitalisation; the key is referenced by the tasks below
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Mesoscope tasks
    if 'mesoscope' in devices:
        (_, mscope_kwargs), = devices['mesoscope'].items()
        mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
        tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
        tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
            **kwargs, **mscope_kwargs, **sync_kwargs)
        tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])

    # Photometry tasks
    if 'neurophotometrics' in devices:
        # Example device entry:
        # {'collection': 'raw_photometry_data', 'datetime': '2024-09-18T16:43:55.207000',
        #  'fibers': {'G0': {'location': 'NBM'}, 'G1': {'location': 'SI'}}, 'sync_channel': 1}
        photometry_kwargs = devices['neurophotometrics']
        tasks['FibrePhotometrySync'] = type('FibrePhotometrySync', (
            ptasks.FibrePhotometrySync,), {})(**kwargs, **photometry_kwargs)

    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks
    return p
def make_pipeline_dict(pipeline, save=True):
    """
    Serialize a pipeline's tasks to a list of dicts, optionally writing them to YAML.

    Parameters
    ----------
    pipeline : ibllib.pipes.tasks.Pipeline
        The pipeline whose tasks to serialize.
    save : bool
        If true, write the task list to 'pipeline_tasks.yaml' in the session path.

    Returns
    -------
    list of dict
        The serialized task descriptions.
    """
    task_dicts = pipeline.create_tasks_list_from_pipeline()
    # TODO better name
    if save:
        yaml_path = Path(pipeline.session_path).joinpath('pipeline_tasks.yaml')
        with open(yaml_path, 'w') as fp:
            yaml.dump(task_dicts, fp)
    return task_dicts
def load_pipeline_dict(path):
    """
    Load a serialized task list from a session's 'pipeline_tasks.yaml' file.

    Parameters
    ----------
    path : str, pathlib.Path
        The session path containing the pipeline_tasks.yaml file.

    Returns
    -------
    list of dict
        The task descriptions previously saved by `make_pipeline_dict`.
    """
    yaml_file = Path(path).joinpath('pipeline_tasks.yaml')
    with open(yaml_file, 'r') as fp:
        return yaml.full_load(fp)