Coverage for ibllib/pipes/dynamic_pipeline.py: 93%
216 statements
coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
import logging
import re
from collections import OrderedDict
from pathlib import Path
from itertools import chain
import yaml

import spikeglx

import ibllib.io.session_params as sess_params
import ibllib.io.extractors.base
import ibllib.pipes.ephys_preprocessing as epp
import ibllib.pipes.tasks as mtasks
import ibllib.pipes.base_tasks as bstasks
import ibllib.pipes.widefield_tasks as wtasks
import ibllib.pipes.mesoscope_tasks as mscope_tasks
import ibllib.pipes.sync_tasks as stasks
import ibllib.pipes.behavior_tasks as btasks
import ibllib.pipes.video_tasks as vtasks
import ibllib.pipes.ephys_tasks as etasks
import ibllib.pipes.audio_tasks as atasks
import ibllib.pipes.photometry_tasks as ptasks
# from ibllib.pipes.photometry_tasks import FibrePhotometryPreprocess, FibrePhotometryRegisterRaw

_logger = logging.getLogger(__name__)

def acquisition_description_legacy_session(session_path, save=False):
    """
    From a legacy session, create a dictionary corresponding to the acquisition description.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A path to a session to describe.
    save : bool
        If true, saves the acquisition description file to _ibl_experiment.description.yaml.

    Returns
    -------
    dict
        The legacy acquisition description.
    """
    extractor_type = ibllib.io.extractors.base.get_session_extractor_type(session_path=session_path)
    etype2protocol = dict(biased='choice_world_biased', habituation='choice_world_habituation',
                          training='choice_world_training', ephys='choice_world_recording')
    dict_ad = get_acquisition_description(etype2protocol[extractor_type])
    if save:
        sess_params.write_params(session_path=session_path, data=dict_ad)
    return dict_ad
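
# A minimal usage sketch (not exercised by the module itself; the session path below is
# hypothetical and must point to a legacy session with raw Bpod data):
#
#     dict_ad = acquisition_description_legacy_session('/data/subject/2023-01-01/001', save=False)
#     dict_ad['tasks']  # e.g. [{'trainingChoiceWorld': {'collection': 'raw_behavior_data', ...}}]
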
def get_acquisition_description(protocol):
    """
    Return an example acquisition description for one of the experiments that are part of the
    IBL pipeline:
      - choice_world_recording
      - choice_world_biased
      - choice_world_training
      - choice_world_habituation
      - choice_world_passive
    """
    if protocol == 'choice_world_recording':  # canonical ephys
        devices = {
            'cameras': {
                'right': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'body': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'neuropixel': {
                'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'},
                'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current ephys pipeline description
            'devices': devices,
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        devices = {
            'cameras': {
                'left': {'collection': 'raw_video_data', 'sync_label': 'frame2ttl'},
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current behaviour training pipeline description
            'devices': devices,
            'sync': {
                'bpod': {'collection': 'raw_behavior_data', 'extension': 'bin'}
            },
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
        if protocol == 'choice_world_biased':
            key = 'biasedChoiceWorld'
        elif protocol == 'choice_world_training':
            key = 'trainingChoiceWorld'
        elif protocol == 'choice_world_habituation':
            key = 'habituationChoiceWorld'
        else:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description['tasks'] = [{key: {
            'collection': 'raw_behavior_data',
            'sync_label': 'bpod', 'main': True  # FIXME: What is the purpose of the 'main' key?
        }}]
    acquisition_description['version'] = '1.0.0'
    return acquisition_description
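
# A minimal usage sketch (assumed call pattern; the write-out path is hypothetical):
#
#     ad = get_acquisition_description('choice_world_training')
#     ad['sync']  # {'bpod': {'collection': 'raw_behavior_data', 'extension': 'bin'}}
#     sess_params.write_params(session_path='/data/subject/2023-01-01/001', data=ad)
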
def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    **pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.
    """
    # NB: this is a pattern for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
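    # For reference, the three-argument form type(name, bases, namespace) used throughout this
    # function builds a subclass on the fly; the line above is equivalent to writing:
    #
    #     class SyncPulses(epp.EphysPulses):
    #         pass
    #
    #     tasks['SyncPulses'] = SyncPulses(session_path=session_path)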
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = sess_params.read_params(session_path)
    if not acquisition_description:
        raise ValueError('Experiment description file not found or is empty')
    devices = acquisition_description.get('devices', {})
    kwargs = {'session_path': session_path}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension')
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
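    # e.g. for the canonical ephys description above this yields
    #     {'sync': 'nidq', 'sync_collection': 'raw_ephys_data', 'sync_ext': 'bin', 'sync_namespace': 'spikeglx'}
    # and for a bpod-only training session
    #     {'sync': 'bpod', 'sync_collection': 'raw_behavior_data', 'sync_ext': 'bin', 'sync_namespace': None}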
    sync_tasks = []
    if sync == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_args['sync_namespace'] == 'timeline':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync == 'nidq':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync == 'bpod':
        pass  # ATM we don't have anything for this; not sure it will be needed in the future

    # Behavior tasks
    task_protocols = acquisition_description.get('tasks', [])
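    # Each entry of 'tasks' is a single-key dict mapping a protocol name to its parameters, e.g.
    #     {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}}
    # so chaining the items below flattens the list into (protocol, task_info) pairs in acquisition order.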
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # For now the order of protocols in the list will take precedence. If collections are numbered,
        # check that the numbers match the order. This may change in the future.
        if re.match(r'^raw_task_data_\d{2}$', collection):
            task_kwargs['protocol_number'] = i
            if int(collection.split('_')[-1]) != i:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, extractor in enumerate(extractors):
                # Assume previous task in the list is parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ('nidq', 'bpod'):
                    if sync_option in extractor.lower() and not sync == sync_option:
                        raise ValueError(f'Extractor "{extractor}" and sync "{sync}" do not match')
                # Look for the extractor in the behavior extractors module
                if hasattr(btasks, extractor):
                    task = getattr(btasks, extractor)
                # The extractor may be tied to a specific sync task: look for TrialsChoiceWorldBpod, for example
                elif hasattr(btasks, extractor + sync.capitalize()):
                    task = getattr(btasks, extractor + sync.capitalize())
                else:
                    # look up the extractor class in the project extraction repo
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    else:
                        raise NotImplementedError(
                            f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            if 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = btasks.HabituationTrialsBpod
                compute_status = False
            elif 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                behaviour_class = btasks.PassiveTask
                compute_status = False
            elif sync_kwargs['sync'] == 'bpod':
                registration_class = btasks.TrialRegisterRaw
                behaviour_class = btasks.ChoiceWorldTrialsBpod
                compute_status = True
            elif sync_kwargs['sync'] == 'nidq':
                registration_class = btasks.TrialRegisterRaw
                behaviour_class = btasks.ChoiceWorldTrialsNidq
                compute_status = True
            else:
                raise NotImplementedError
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f"TrainingStatus_{protocol}_{i:02}"] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])

    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            meta_file = spikeglx.glob_ephys_files(Path(session_path).joinpath(probe_info['collection']), ext='meta')
            meta_file = meta_file[0].get('ap')
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
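                # split shanks are suffixed alphabetically, e.g. probe00 with 4 shanks becomes
                # probe00a, probe00b, probe00c, probe00d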
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)

        if nptype == '3A':
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)
            tasks[f'EphysCellQC_{pname}'] = type(f'EphysCellQC_{pname}', (etasks.EphysCellsQc,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'Spikesorting_{pname}']])

    # Video tasks
    if 'cameras' in devices:
        cams = list(devices['cameras'].keys())
        subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
        video_kwargs = {'device_collection': 'raw_video_data',
                        'cameras': cams}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        if video_compressed:
            # This is for the widefield case where the video is already compressed
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(
                **kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right or body)
                video_kwargs['cameras'] = subset_cams
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if sync_kwargs['sync'] != 'bpod':
            # Here we restrict to videos that we support (left, right or body)
            video_kwargs['cameras'] = subset_cams
            tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
                **kwargs, **video_kwargs, parents=[dlc_parent_task])
            tasks['PostDLC'] = type('PostDLC', (epp.EphysPostDLC,), {})(
                **kwargs, parents=[tasks['DLC'], tasks[f'VideoSyncQC_{sync}']])

    # Audio tasks
    if 'microphone' in devices:
        (microphone, micro_kwargs), = devices['microphone'].items()
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Mesoscope tasks
    if 'mesoscope' in devices:
        (_, mscope_kwargs), = devices['mesoscope'].items()
        mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
        tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
        tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
            **kwargs, **mscope_kwargs, **sync_kwargs)
        tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])

    if 'photometry' in devices:
        # {'collection': 'raw_photometry_data', 'sync_label': 'frame_trigger', 'regions': ['Region1G', 'Region3G']}
        photometry_kwargs = devices['photometry']
        tasks['FibrePhotometryRegisterRaw'] = type('FibrePhotometryRegisterRaw', (
            ptasks.FibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs)
        tasks['FibrePhotometryPreprocess'] = type('FibrePhotometryPreprocess', (
            ptasks.FibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs,
                                                    parents=[tasks['FibrePhotometryRegisterRaw']] + sync_tasks)

    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks
    return p
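
# A minimal usage sketch (the session path is hypothetical; the session folder is expected to
# contain an _ibl_experiment.description.yaml file):
#
#     pipe = make_pipeline('/data/subject/2023-01-01/001')
#     list(pipe.tasks)  # ordered task names, starting with 'ExperimentDescriptionRegisterRaw'
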
def make_pipeline_dict(pipeline, save=True):
    task_dicts = pipeline.create_tasks_list_from_pipeline()
    # TODO better name
    if save:
        with open(Path(pipeline.session_path).joinpath('pipeline_tasks.yaml'), 'w') as file:
            _ = yaml.dump(task_dicts, file)
    return task_dicts

def load_pipeline_dict(path):
    with open(Path(path).joinpath('pipeline_tasks.yaml'), 'r') as file:
        task_list = yaml.full_load(file)

    return task_list
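
# Round-trip sketch (assumed usage; `pipe` comes from make_pipeline above and its session path is hypothetical):
#
#     make_pipeline_dict(pipe, save=True)  # writes <session_path>/pipeline_tasks.yaml
#     task_list = load_pipeline_dict(pipe.session_path)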