Coverage for ibllib/pipes/dynamic_pipeline.py: 93%
274 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 14:26 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 14:26 +0100
1"""Task pipeline creation from an acquisition description.
3The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml`
4file and determines the set of tasks required to preprocess the session.
6In the experiment description file there is a 'tasks' key that defines each task protocol and the
7location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors'
8field that should contain a list of dynamic pipeline task class names for extracting the task data.
9These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the
10extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name.
NB: The standard behaviour extraction task classes (e.g.
:class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsBpod` and :class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsNidq`)
14handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod
15trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod
16trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor`
17map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while
18non-IBL ones should be in projects.base.task_extractor_map.json file located in the personal
19projects repo. The Bpod trials extractor class must be a subclass of the
20:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the
21personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module.
22"""
23import logging
24import re
25from fnmatch import fnmatch
26from collections import OrderedDict
27from pathlib import Path
28from itertools import chain
29import yaml
31import spikeglx
33import ibllib.io.raw_data_loaders as rawio
34import ibllib.io.session_params as sess_params
35import ibllib.pipes.tasks as mtasks
36import ibllib.pipes.base_tasks as bstasks
37import ibllib.pipes.widefield_tasks as wtasks
38import ibllib.pipes.mesoscope_tasks as mscope_tasks
39import ibllib.pipes.sync_tasks as stasks
40import ibllib.pipes.behavior_tasks as btasks
41import ibllib.pipes.video_tasks as vtasks
42import ibllib.pipes.ephys_tasks as etasks
43import ibllib.pipes.audio_tasks as atasks
44import ibllib.pipes.neurophotometrics as ptasks
46_logger = logging.getLogger(__name__)
def acquisition_description_legacy_session(session_path, save=False):
    """
    From a legacy session create a dictionary corresponding to the acquisition description.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A path to a session to describe.
    save : bool
        If true, saves the acquisition description file to _ibl_experiment.description.yaml.

    Returns
    -------
    dict
        The legacy acquisition description.
    """
    # The task protocol name recorded by pyBpod determines which template description to use
    task_settings = rawio.load_settings(session_path)
    protocol_name = task_settings.get('PYBPOD_PROTOCOL', 'UNKNOWN')
    description = get_acquisition_description(protocol_name)
    if save:
        sess_params.write_params(session_path=session_path, data=description)
    return description
def get_acquisition_description(protocol):
    """
    Return a template acquisition description for a given legacy task protocol.

    This is a set of example acquisition descriptions for experiments
    - choice_world_recording
    - choice_world_biased
    - choice_world_training
    - choice_world_habituation
    - choice_world_passive
    That are part of the IBL pipeline.

    Parameters
    ----------
    protocol : str
        A task protocol name, e.g. '_iblrig_tasks_ephysChoiceWorld6.4.2'.

    Returns
    -------
    dict
        An acquisition description containing 'devices', 'tasks', 'sync', 'procedures',
        'projects' and 'version' keys.

    Raises
    ------
    ValueError
        The protocol name contains none of 'ephys', 'biased', 'training', 'habituation'.
    """
    # NB: fixed '""""' typo — the original docstring opened with four quotes, leaving a
    # stray quote character as the first docstring character.
    if 'ephys' in protocol:  # canonical ephys
        devices = {
            'cameras': {
                'right': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'body': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'neuropixel': {
                'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'},
                'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current ephys pipeline description
            'devices': devices,
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        devices = {
            'cameras': {
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current training pipeline description
            'devices': devices,
            'sync': {'bpod': {'collection': 'raw_behavior_data'}},
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
        if 'biased' in protocol:
            key = 'biasedChoiceWorld'
        elif 'training' in protocol:
            key = 'trainingChoiceWorld'
        elif 'habituation' in protocol:
            key = 'habituationChoiceWorld'
        else:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description['tasks'] = [{key: {
            'collection': 'raw_behavior_data',
            'sync_label': 'bpod'
        }}]
    acquisition_description['version'] = '1.0.0'
    return acquisition_description
141def _sync_label(sync, acquisition_software=None, **_):
142 """
143 Returns the sync label based on the sync type and acquisition software.
145 The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'.
146 The 'acquisition_software' refers to the software used to acquire the data, e.g.
147 for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect
148 how the data are loaded and extracted, and therefore which tasks to use.
150 The naming convention here is not ideal, and may be changed in the future.
152 Parameters
153 ----------
154 sync : str
155 The sync type, e.g. 'nidq', 'tdms', 'bpod'.
156 acquisition_software : str
157 The acquisition software used to acquire the sync data.
159 Returns
160 -------
161 str
162 The sync label for determining the extractor tasks.
163 """
165 return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync 1aoltnmcdefbkgijh
def _load_acquisition_description(session_path):
    """
    Load a session's acquisition description.

    Attempts to load from the session path and upon failure, attempts to generate one based on the
    task protocol (this only works for legacy pipeline sessions).

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.

    Returns
    -------
    dict
        The acquisition description file.

    Raises
    ------
    ValueError
        No description file was found and one could not be generated from the task settings.
    """
    # Coerce to Path: the docstring allows str but the legacy fallback below calls
    # session_path.joinpath, which would raise AttributeError on a plain string
    session_path = Path(session_path)
    acquisition_description = sess_params.read_params(session_path)
    if not acquisition_description:
        try:
            # v7 sessions used a different folder name for task data;
            # v8 sessions should always have a description file
            assert session_path.joinpath('raw_behavior_data').exists()
            acquisition_description = acquisition_description_legacy_session(session_path)
            assert acquisition_description
        except (AssertionError, ValueError):
            raise ValueError('Experiment description file not found or is empty')
    return acquisition_description
def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=None, one=None):
    """
    Generate behaviour tasks from acquisition description.

    This returns all behaviour related tasks including TrialsRegisterRaw and TrainingStatus objects.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.
    acquisition_description : dict
        An acquisition description.
    sync_tasks : list
        A list of sync tasks to use as behaviour task parents.
    one : One
        An instance of ONE to pass to each task.

    Returns
    -------
    dict[str, ibllib.pipes.tasks.Task]
        A map of Alyx task name to behaviour task object.
    """
    if not acquisition_description:
        acquisition_description = _load_acquisition_description(session_path)
    tasks = OrderedDict()
    sync_tasks = sync_tasks or []
    kwargs = {'session_path': session_path, 'one': one}

    # Syncing tasks
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}

    # Behavior tasks
    protocol_numbers = set()
    task_protocols = acquisition_description.get('tasks', [])
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # The order of protocols in the list will take precedence unless protocol_number is present.
        # If collections are numbered, check that the numbers match the order.
        n = i
        if re.match(r'^raw_task_data_\d{2}$', collection):
            # Protocol number may be overridden by the protocol_number key
            if task_info.get('protocol_number') is not None:
                n = task_info['protocol_number']
            task_kwargs['protocol_number'] = n
            # set.add returns None, so the right operand both records n and keeps the test falsy
            assert not (n in protocol_numbers or protocol_numbers.add(n)), 'protocol numbers must be unique'
            if int(collection.split('_')[-1]) != n:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, extractor in enumerate(extractors):
                # Assume previous task in the list is a parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ('nidq', 'bpod', 'timeline'):
                    if sync_option in extractor.lower() and not sync_label == sync_option:
                        raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match')

                # Look for the extractor in the behavior extractors module
                if hasattr(btasks, extractor):
                    task = getattr(btasks, extractor)
                # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for example
                elif hasattr(btasks, extractor + sync_label.capitalize()):
                    task = getattr(btasks, extractor + sync_label.capitalize())
                else:
                    # lookup in the project extraction repo if we find an extractor class
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    elif hasattr(projects.extraction_tasks, extractor + sync_label.capitalize()):
                        # FIX: fetch from projects.extraction_tasks; previously this fetched from
                        # btasks despite having checked projects.extraction_tasks, raising
                        # AttributeError for classes defined only in the personal projects repo
                        task = getattr(projects.extraction_tasks, extractor + sync_label.capitalize())
                    else:
                        raise NotImplementedError(
                            f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
                _logger.debug('%s (protocol #%i, task #%i) = %s.%s',
                              protocol, i, j, task.__module__, task.__name__)
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                if not (task.__name__.startswith('TrainingStatus') or task.__name__.endswith('RegisterRaw')):
                    task_name = f'Trials_{task_name}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            if 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"')
                compute_status = False
            elif 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize())
                compute_status = False
            else:
                registration_class = btasks.TrialRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"')
                compute_status = True
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])
    return tasks
def get_trials_tasks(session_path, one=None, bpod_only=False):
    """
    Return a list of pipeline trials extractor task objects for a given session.

    This function supports both legacy and dynamic pipeline sessions. Dynamic tasks are returned
    for both recent and legacy sessions. Only Trials tasks are returned, not the training status
    or raw registration tasks.

    Parameters
    ----------
    session_path : str, pathlib.Path
        An absolute path to a session.
    one : one.api.One
        An ONE instance.
    bpod_only : bool
        If true, extract trials from Bpod clock instead of the main DAQ's.

    Returns
    -------
    list of pipes.tasks.Task
        A list of task objects for the provided session.

    Examples
    --------
    Return the tasks for active choice world extraction

    >>> tasks = list(filter(is_active_trials_task, get_trials_tasks(session_path)))
    """
    # Check for an experiment.description file; ensure downloaded if possible
    if one and one.to_eid(session_path):  # to_eid returns None if session not registered
        one.load_datasets(session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False)
    description = _load_acquisition_description(session_path)
    if bpod_only and description:
        # Override the DAQ so that trials are extracted in Bpod time
        description['sync'] = {'bpod': {'collection': 'raw_task_data_*'}}
    try:
        all_tasks = _get_trials_tasks(session_path, description, one=one)
    except NotImplementedError as ex:
        _logger.warning('Failed to get trials tasks: %s', ex)
        return []
    # Keep only the trials extractor tasks (drop registration and training status tasks)
    return [task for name, task in all_tasks.items() if name.startswith('Trials_')]
def is_active_trials_task(task) -> bool:
    """
    Check if task is for active choice world extraction.

    Parameters
    ----------
    task : ibllib.pipes.tasks.Task
        A task instance to test.

    Returns
    -------
    bool
        True if the task name starts with 'Trials_' and outputs a trials.table dataset.
    """
    if not task.name.lower().startswith('trials_'):
        return False
    # The first element of each output signature entry is a (glob-style) dataset name pattern
    patterns = (entry[0] for entry in task.signature.get('output_files', []))
    return any(fnmatch('_ibl_trials.table.pqt', pattern) for pattern in patterns)
def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.

    Raises
    ------
    ValueError
        The session path is falsy or does not exist on disk.
    """
    # NB: this pattern is a pattern for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = _load_acquisition_description(session_path)
    devices = acquisition_description.get('devices', {})
    # Common constructor kwargs shared by every dynamically created task below
    kwargs = {'session_path': session_path, 'one': pkwargs.get('one')}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks
    # The description file has exactly one sync device; unpack its (name, args) pair
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args = sync_args.copy()  # ensure acquisition_description unchanged
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
    # sync_tasks are the tasks that extract the DAQ sync pulses; they become parents of
    # any downstream task that needs clock alignment
    sync_tasks = []
    if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        # Canonical ephys rig: nidq acquired with spikeglx alongside the probes
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'timeline':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'nidq':
        # nidq DAQ in a non-ephys collection: compress then extract the pulses
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'bpod':
        pass  # ATM we don't have anything for this; it may not be needed in the future

    # Behavior tasks
    tasks.update(
        _get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one'))
    )

    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            # Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4
            # extractions, however.
            probe_collection = next(session_path.glob(probe_info['collection'] + '*'))
            meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta')
            meta_file = meta_file[0].get('ap')
            # Probe type and shank count determine which compression task class to use
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
                # NP2.4 multishank probes are split into one pseudo-probe per shank,
                # suffixed 'a', 'b', ... (chr(97) == 'a')
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                # NOTE(review): the dict key says 'EphysCompressNP1' but the generated class
                # name says 'EphyCompressNP1' (missing 's') — looks like a longstanding
                # inconsistency with the NP21/NP24 branches; confirm before renaming as the
                # task name is what gets registered on Alyx
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)

        if nptype == '3A':
            # 3A probes have no nidq; a single EphysPulses task covers all probes at once
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            # pname[:7] strips any shank suffix (e.g. 'probe00a' -> 'probe00') so split
            # NP2.4 shanks map back to their parent probe's register task
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)

    # Video tasks
    if 'cameras' in devices:
        cams = list(devices['cameras'].keys())
        subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
        video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        # NB: in `tasks[tn] = type((tn := '...'), ...)` the right-hand side is evaluated
        # first, so the walrus assigns tn before it is used as the dict key
        if video_compressed:
            # This is for widefield case where the video is already compressed
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right or body)
                video_kwargs['cameras'] = subset_cams
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if sync_kwargs['sync'] != 'bpod':
            # Here we restrict to videos that we support (left, right or body)
            # Currently there is no plan to run DLC on the belly cam
            subset_cams = [c for c in cams if c in ('left', 'right', 'body')]
            video_kwargs['cameras'] = subset_cams
            tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
                **kwargs, **video_kwargs, parents=[dlc_parent_task])

            # The PostDLC plots require a trials object for QC
            # Find the first task that outputs a trials.table dataset
            trials_task = (
                t for t in tasks.values() if any('trials.table' in f[0] for f in t.signature.get('output_files', []))
            )
            if trials_task := next(trials_task, None):
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task]
                trials_collection = getattr(trials_task, 'output_collection', 'alf')
            else:
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]
                trials_collection = 'alf'
            tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})(
                **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents)

    # Audio tasks
    if 'microphone' in devices:
        (microphone, micro_kwargs), = devices['microphone'].items()
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        # NOTE(review): the dict key is 'WideFieldRegisterRaw' but the class name is
        # 'WidefieldRegisterRaw' (different capitalization) — deliberate or not, downstream
        # lookups below use the 'WideFieldRegisterRaw' key consistently
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Mesoscope tasks
    if 'mesoscope' in devices:
        (_, mscope_kwargs), = devices['mesoscope'].items()
        mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
        tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
        tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
            **kwargs, **mscope_kwargs, **sync_kwargs)
        tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])

    # Photometry tasks
    if 'neurophotometrics' in devices:
        # Example device entry:
        # {'collection': 'raw_photometry_data', 'datetime': '2024-09-18T16:43:55.207000',
        #  'fibers': {'G0': {'location': 'NBM'}, 'G1': {'location': 'SI'}}, 'sync_channel': 1}
        photometry_kwargs = devices['neurophotometrics']
        tasks['FibrePhotometrySync'] = type('FibrePhotometrySync', (
            ptasks.FibrePhotometrySync,), {})(**kwargs, **photometry_kwargs)

    # Assemble the pipeline object and attach the task map
    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks

    return p
def make_pipeline_dict(pipeline, save=True):
    """
    Convert a pipeline object to a list of task dictionaries.

    Parameters
    ----------
    pipeline : ibllib.pipes.tasks.Pipeline
        A pipeline object whose tasks to serialize.
    save : bool
        If true, write the task list to 'pipeline_tasks.yaml' in the pipeline's session path.

    Returns
    -------
    list of dict
        The serialized task list.
    """
    task_dicts = pipeline.create_tasks_list_from_pipeline()
    # TODO better name
    if save:
        out_file = Path(pipeline.session_path).joinpath('pipeline_tasks.yaml')
        with open(out_file, 'w') as fp:
            yaml.dump(task_dicts, fp)
    return task_dicts
def load_pipeline_dict(path):
    """
    Load a pipeline task list from a 'pipeline_tasks.yaml' file.

    Parameters
    ----------
    path : str, pathlib.Path
        The directory containing the 'pipeline_tasks.yaml' file (typically a session path).

    Returns
    -------
    list of dict
        The task list previously saved by `make_pipeline_dict`.
    """
    task_file = Path(path).joinpath('pipeline_tasks.yaml')
    with open(task_file, 'r') as fp:
        return yaml.full_load(fp)