Coverage for ibllib/pipes/dynamic_pipeline.py: 93%

216 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1import logging 

2import re 

3from collections import OrderedDict 

4from pathlib import Path 

5from itertools import chain 

6import yaml 

7 

8import spikeglx 

9 

10import ibllib.io.session_params as sess_params 

11import ibllib.io.extractors.base 

12import ibllib.pipes.ephys_preprocessing as epp 

13import ibllib.pipes.tasks as mtasks 

14import ibllib.pipes.base_tasks as bstasks 

15import ibllib.pipes.widefield_tasks as wtasks 

16import ibllib.pipes.mesoscope_tasks as mscope_tasks 

17import ibllib.pipes.sync_tasks as stasks 

18import ibllib.pipes.behavior_tasks as btasks 

19import ibllib.pipes.video_tasks as vtasks 

20import ibllib.pipes.ephys_tasks as etasks 

21import ibllib.pipes.audio_tasks as atasks 

22import ibllib.pipes.photometry_tasks as ptasks 

23# from ibllib.pipes.photometry_tasks import FibrePhotometryPreprocess, FibrePhotometryRegisterRaw 

24 

25_logger = logging.getLogger(__name__) 

26 

27 

28def acquisition_description_legacy_session(session_path, save=False): 

29 """ 

30 From a legacy session create a dictionary corresponding to the acquisition description. 

31 

32 Parameters 

33 ---------- 

34 session_path : str, pathlib.Path 

35 A path to a session to describe. 

36 save : bool 

37 If true, saves the acquisition description file to _ibl_experiment.description.yaml. 

38 

39 Returns 

40 ------- 

41 dict 

42 The legacy acquisition description. 

43 """ 

44 extractor_type = ibllib.io.extractors.base.get_session_extractor_type(session_path=session_path) 1qrpsci

45 etype2protocol = dict(biased='choice_world_biased', habituation='choice_world_habituation', 1qrpsci

46 training='choice_world_training', ephys='choice_world_recording') 

47 dict_ad = get_acquisition_description(etype2protocol[extractor_type]) 1qrpsci

48 if save: 1qrpsci

49 sess_params.write_params(session_path=session_path, data=dict_ad) 1ci

50 return dict_ad 1qrpsci

51 

52 

53def get_acquisition_description(protocol): 

54 """" 

55 This is a set of example acquisition descriptions for experiments 

56 - choice_world_recording 

57 - choice_world_biased 

58 - choice_world_training 

59 - choice_world_habituation 

60 - choice_world_passive 

61 That are part of the IBL pipeline 

62 """ 

63 if protocol == 'choice_world_recording': # canonical ephys 1qrpsci

64 devices = { 1rsc

65 'cameras': { 

66 'right': {'collection': 'raw_video_data', 'sync_label': 'audio'}, 

67 'body': {'collection': 'raw_video_data', 'sync_label': 'audio'}, 

68 'left': {'collection': 'raw_video_data', 'sync_label': 'audio'}, 

69 }, 

70 'neuropixel': { 

71 'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'}, 

72 'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'} 

73 }, 

74 'microphone': { 

75 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} 

76 }, 

77 } 

78 acquisition_description = { # this is the current ephys pipeline description 1rsc

79 'devices': devices, 

80 'tasks': [ 

81 {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}}, 

82 {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}} 

83 ], 

84 'sync': { 

85 'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'} 

86 }, 

87 'procedures': ['Ephys recording with acute probe(s)'], 

88 'projects': ['ibl_neuropixel_brainwide_01'] 

89 } 

90 else: 

91 devices = { 1qpi

92 'cameras': { 

93 'left': {'collection': 'raw_video_data', 'sync_label': 'frame2ttl'}, 

94 }, 

95 'microphone': { 

96 'microphone': {'collection': 'raw_behavior_data', 'sync_label': None} 

97 }, 

98 } 

99 acquisition_description = { # this is the current ephys pipeline description 1qpi

100 'devices': devices, 

101 'sync': { 

102 'bpod': {'collection': 'raw_behavior_data', 'extension': 'bin'} 

103 }, 

104 'procedures': ['Behavior training/tasks'], 

105 'projects': ['ibl_neuropixel_brainwide_01'] 

106 } 

107 if protocol == 'choice_world_biased': 1qpi

108 key = 'biasedChoiceWorld' 1qi

109 elif protocol == 'choice_world_training': 1pi

110 key = 'trainingChoiceWorld' 1pi

111 elif protocol == 'choice_world_habituation': 

112 key = 'habituationChoiceWorld' 

113 else: 

114 raise ValueError(f'Unknown protocol "{protocol}"') 

115 acquisition_description['tasks'] = [{key: { 1qpi

116 'collection': 'raw_behavior_data', 

117 'sync_label': 'bpod', 'main': True # FIXME: What is purpose of main key? 

118 }}] 

119 acquisition_description['version'] = '1.0.0' 1qrpsci

120 return acquisition_description 1qrpsci

121 

122 

123def make_pipeline(session_path, **pkwargs): 

124 """ 

125 Creates a pipeline of extractor tasks from a session's experiment description file. 

126 

127 Parameters 

128 ---------- 

129 session_path : str, Path 

130 The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'. 

131 **pkwargs 

132 Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor. 

133 

134 Returns 

135 ------- 

136 ibllib.pipes.tasks.Pipeline 

137 A task pipeline object. 

138 """ 

139 # NB: this pattern is a pattern for dynamic class creation 

140 # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path) 

141 if not session_path or not (session_path := Path(session_path)).exists(): 1aodefgbnjlmhcik

142 raise ValueError('Session path does not exist') 

143 tasks = OrderedDict() 1aodefgbnjlmhcik

144 acquisition_description = sess_params.read_params(session_path) 1aodefgbnjlmhcik

145 if not acquisition_description: 1aodefgbnjlmhcik

146 raise ValueError('Experiment description file not found or is empty') 

147 devices = acquisition_description.get('devices', {}) 1aodefgbnjlmhcik

148 kwargs = {'session_path': session_path} 1aodefgbnjlmhcik

149 

150 # Registers the experiment description file 

151 tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw', 1aodefgbnjlmhcik

152 (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs) 

153 

154 # Syncing tasks 

155 (sync, sync_args), = acquisition_description['sync'].items() 1aodefgbnjlmhcik

156 sync_args['sync_collection'] = sync_args.pop('collection') # rename the key so it matches task run arguments 1aodefgbnjlmhcik

157 sync_args['sync_ext'] = sync_args.pop('extension') 1aodefgbnjlmhcik

158 sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None) 1aodefgbnjlmhcik

159 sync_kwargs = {'sync': sync, **sync_args} 1aodefgbnjlmhcik

160 sync_tasks = [] 1aodefgbnjlmhcik

161 if sync == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data': 1aodefgbnjlmhcik

162 tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs) 1adefgbc

163 tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})( 1adefgbc

164 **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) 

165 sync_tasks = [tasks[f'SyncPulses_{sync}']] 1adefgbc

166 elif sync_args['sync_namespace'] == 'timeline': 1obnjlmhik

167 tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs) 1jk

168 elif sync == 'nidq': 1obnlmhi

169 tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs) 1h

170 tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})( 1h

171 **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']]) 

172 sync_tasks = [tasks[f'SyncPulses_{sync}']] 1h

173 elif sync == 'tdms': 1obnlmi

174 tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs) 

175 elif sync == 'bpod': 1obnlmi

176 pass # ATM we don't have anything for this not sure it will be needed in the future 1obnlmi

177 

178 # Behavior tasks 

179 task_protocols = acquisition_description.get('tasks', []) 1aodefgbnjlmhcik

180 for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))): 1aodefgbnjlmhcik

181 collection = task_info.get('collection', f'raw_task_data_{i:02}') 1aodefgbnjlmhcik

182 task_kwargs = {'protocol': protocol, 'collection': collection} 1aodefgbnjlmhcik

183 # For now the order of protocols in the list will take precedence. If collections are numbered, 

184 # check that the numbers match the order. This may change in the future. 

185 if re.match(r'^raw_task_data_\d{2}$', collection): 1aodefgbnjlmhcik

186 task_kwargs['protocol_number'] = i 1bjik

187 if int(collection.split('_')[-1]) != i: 1bjik

188 _logger.warning('Number in collection name does not match task order') 

189 if extractors := task_info.get('extractors', False): 1aodefgbnjlmhcik

190 extractors = (extractors,) if isinstance(extractors, str) else extractors 1bjk

191 task_name = None # to avoid unbound variable issue in the first round 1bjk

192 for j, extractor in enumerate(extractors): 1bjk

193 # Assume previous task in the list is parent 

194 parents = [] if j == 0 else [tasks[task_name]] 1bjk

195 # Make sure extractor and sync task don't collide 

196 for sync_option in ('nidq', 'bpod'): 1bjk

197 if sync_option in extractor.lower() and not sync == sync_option: 1bjk

198 raise ValueError(f'Extractor "{extractor}" and sync "{sync}" do not match') 1b

199 # Look for the extractor in the behavior extractors module 

200 if hasattr(btasks, extractor): 1bjk

201 task = getattr(btasks, extractor) 1bjk

202 # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for # example 

203 elif hasattr(btasks, extractor + sync.capitalize()): 1b

204 task = getattr(btasks, extractor + sync.capitalize()) 

205 else: 

206 # lookup in the project extraction repo if we find an extractor class 

207 import projects.extraction_tasks 1b

208 if hasattr(projects.extraction_tasks, extractor): 1b

209 task = getattr(projects.extraction_tasks, extractor) 

210 else: 

211 raise NotImplementedError( 1b

212 f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects') 

213 # Rename the class to something more informative 

214 task_name = f'{task.__name__}_{i:02}' 1bjk

215 # For now we assume that the second task in the list is always the trials extractor, which is dependent 

216 # on the sync task and sync arguments 

217 if j == 1: 1bjk

218 tasks[task_name] = type(task_name, (task,), {})( 1bjk

219 **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks 

220 ) 

221 else: 

222 tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents) 1bjk

223 # For the next task, we assume that the previous task is the parent 

224 else: # Legacy block to handle sessions without defined extractors 

225 # - choice_world_recording 

226 # - choice_world_biased 

227 # - choice_world_training 

228 # - choice_world_habituation 

229 if 'habituation' in protocol: 1aodefgnlmhci

230 registration_class = btasks.HabituationRegisterRaw 1n

231 behaviour_class = btasks.HabituationTrialsBpod 1n

232 compute_status = False 1n

233 elif 'passiveChoiceWorld' in protocol: 1aodefglmhci

234 registration_class = btasks.PassiveRegisterRaw 1adefghc

235 behaviour_class = btasks.PassiveTask 1adefghc

236 compute_status = False 1adefghc

237 elif sync_kwargs['sync'] == 'bpod': 1aodefglmhci

238 registration_class = btasks.TrialRegisterRaw 1olmi

239 behaviour_class = btasks.ChoiceWorldTrialsBpod 1olmi

240 compute_status = True 1olmi

241 elif sync_kwargs['sync'] == 'nidq': 1adefghc

242 registration_class = btasks.TrialRegisterRaw 1adefghc

243 behaviour_class = btasks.ChoiceWorldTrialsNidq 1adefghc

244 compute_status = True 1adefghc

245 else: 

246 raise NotImplementedError 

247 tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})( 1aodefgnlmhci

248 **kwargs, **task_kwargs) 

249 parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks 1aodefgnlmhci

250 tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})( 1aodefgnlmhci

251 **kwargs, **sync_kwargs, **task_kwargs, parents=parents) 

252 if compute_status: 1aodefgnlmhci

253 tasks[f"TrainingStatus_{protocol}_{i:02}"] = type(f'TrainingStatus_{protocol}_{i:02}', ( 1aodefglmhci

254 btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']]) 

255 

256 # Ephys tasks 

257 if 'neuropixel' in devices: 1aodefgbnjlmhcik

258 ephys_kwargs = {'device_collection': 'raw_ephys_data'} 1adefgbc

259 tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs) 1adefgbc

260 

261 all_probes = [] 1adefgbc

262 register_tasks = [] 1adefgbc

263 for pname, probe_info in devices['neuropixel'].items(): 1adefgbc

264 meta_file = spikeglx.glob_ephys_files(Path(session_path).joinpath(probe_info['collection']), ext='meta') 1adefgbc

265 meta_file = meta_file[0].get('ap') 1adefgbc

266 nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file)) 1adefgbc

267 nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file)) 1adefgbc

268 

269 if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1): 1adefgbc

270 tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})( 

271 **kwargs, **ephys_kwargs, pname=pname) 

272 all_probes.append(pname) 

273 register_tasks.append(tasks[f'EphyCompressNP21_{pname}']) 

274 elif nptype == 'NP2.4' and nshanks > 1: 1adefgbc

275 tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})( 1gb

276 **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks) 

277 register_tasks.append(tasks[f'EphyCompressNP24_{pname}']) 1gb

278 all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)] 1gb

279 else: 

280 tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})( 1adefc

281 **kwargs, **ephys_kwargs, pname=pname) 

282 register_tasks.append(tasks[f'EphysCompressNP1_{pname}']) 1adefc

283 all_probes.append(pname) 1adefc

284 

285 if nptype == '3A': 1adefgbc

286 tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})( 1e

287 **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks) 

288 

289 for pname in all_probes: 1adefgbc

290 register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name] 1adefgbc

291 

292 if nptype != '3A': 1adefgbc

293 tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})( 1adfgbc

294 **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks) 

295 tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( 1adfgbc

296 **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']]) 

297 else: 

298 tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})( 1e

299 **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']]) 

300 

301 tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})( 1adefgbc

302 **kwargs, **ephys_kwargs, pname=pname, parents=register_task) 

303 tasks[f'EphysCellQC_{pname}'] = type(f'EphysCellQC_{pname}', (etasks.EphysCellsQc,), {})( 1adefgbc

304 **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'Spikesorting_{pname}']]) 

305 

306 # Video tasks 

307 if 'cameras' in devices: 1aodefgbnjlmhcik

308 cams = list(devices['cameras'].keys()) 1aodefgbnjlmhcik

309 subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')] 1aodefgbnjlmhcik

310 video_kwargs = {'device_collection': 'raw_video_data', 1aodefgbnjlmhcik

311 'cameras': cams} 

312 video_compressed = sess_params.get_video_compressed(acquisition_description) 1aodefgbnjlmhcik

313 

314 if video_compressed: 1aodefgbnjlmhcik

315 # This is for widefield case where the video is already compressed 

316 tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})( 1lh

317 **kwargs, **video_kwargs) 

318 dlc_parent_task = tasks['VideoConvert'] 1lh

319 tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})( 1lh

320 **kwargs, **video_kwargs, **sync_kwargs) 

321 else: 

322 tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})( 1aodefgbnjmcik

323 **kwargs, **video_kwargs) 

324 tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})( 1aodefgbnjmcik

325 **kwargs, **video_kwargs, **sync_kwargs) 

326 dlc_parent_task = tasks['VideoCompress'] 1aodefgbnjmcik

327 if sync == 'bpod': 1aodefgbnjmcik

328 tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})( 1obnmi

329 **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']]) 

330 elif sync == 'nidq': 1adefgbjck

331 # Here we restrict to videos that we support (left, right or body) 

332 video_kwargs['cameras'] = subset_cams 1adefgbjck

333 tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})( 1adefgbjck

334 **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks) 

335 

336 if sync_kwargs['sync'] != 'bpod': 1aodefgbnjlmhcik

337 # Here we restrict to videos that we support (left, right or body) 

338 video_kwargs['cameras'] = subset_cams 1adefgbjhck

339 tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})( 1adefgbjhck

340 **kwargs, **video_kwargs, parents=[dlc_parent_task]) 

341 tasks['PostDLC'] = type('PostDLC', (epp.EphysPostDLC,), {})( 1adefgbjhck

342 **kwargs, parents=[tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]) 

343 

344 # Audio tasks 

345 if 'microphone' in devices: 1aodefgbnjlmhcik

346 (microphone, micro_kwargs), = devices['microphone'].items() 1aodefgbnlmhci

347 micro_kwargs['device_collection'] = micro_kwargs.pop('collection') 1aodefgbnlmhci

348 if sync_kwargs['sync'] == 'bpod': 1aodefgbnlmhci

349 tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})( 1obnlmi

350 **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection']) 

351 elif sync_kwargs['sync'] == 'nidq': 1adefgbhc

352 tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs) 1adefgbhc

353 

354 # Widefield tasks 

355 if 'widefield' in devices: 1aodefgbnjlmhcik

356 (_, wfield_kwargs), = devices['widefield'].items() 1h

357 wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection') 1h

358 tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})( 1h

359 **kwargs, **wfield_kwargs) 

360 tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})( 1h

361 **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']]) 

362 tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})( 1h

363 **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']]) 

364 tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})( 1h

365 **kwargs, **wfield_kwargs, **sync_kwargs, 

366 parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks) 

367 tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})( 1h

368 **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']]) 

369 

370 # Mesoscope tasks 

371 if 'mesoscope' in devices: 1aodefgbnjlmhcik

372 (_, mscope_kwargs), = devices['mesoscope'].items() 1jk

373 mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection') 1jk

374 tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})( 1jk

375 **kwargs, **mscope_kwargs) 

376 tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})( 1jk

377 **kwargs, **mscope_kwargs) 

378 tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})( 1jk

379 **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']]) 

380 tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})( 1jk

381 **kwargs, **mscope_kwargs, **sync_kwargs) 

382 tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})( 1jk

383 **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']]) 

384 

385 if 'photometry' in devices: 1aodefgbnjlmhcik

386 # {'collection': 'raw_photometry_data', 'sync_label': 'frame_trigger', 'regions': ['Region1G', 'Region3G']} 

387 photometry_kwargs = devices['photometry'] 1l

388 tasks['FibrePhotometryRegisterRaw'] = type('FibrePhotometryRegisterRaw', ( 1l

389 ptasks.FibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs) 

390 tasks['FibrePhotometryPreprocess'] = type('FibrePhotometryPreprocess', ( 1l

391 ptasks.FibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs, 

392 parents=[tasks['FibrePhotometryRegisterRaw']] + sync_tasks) 

393 

394 p = mtasks.Pipeline(session_path=session_path, **pkwargs) 1aodefgbnjlmhcik

395 p.tasks = tasks 1aodefgbnjlmhcik

396 return p 1aodefgbnjlmhcik

397 

398 

399def make_pipeline_dict(pipeline, save=True): 

400 task_dicts = pipeline.create_tasks_list_from_pipeline() 1defgbnjlmh

401 # TODO better name 

402 if save: 1defgbnjlmh

403 with open(Path(pipeline.session_path).joinpath('pipeline_tasks.yaml'), 'w') as file: 

404 _ = yaml.dump(task_dicts, file) 

405 return task_dicts 1defgbnjlmh

406 

407 

408def load_pipeline_dict(path): 

409 with open(Path(path).joinpath('pipeline_tasks.yaml'), 'r') as file: 1adefgbnjlmh

410 task_list = yaml.full_load(file) 1adefgbnjlmh

411 

412 return task_list 1adefgbnjlmh