Coverage for ibllib/pipes/dynamic_pipeline.py: 93%

274 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-07 14:26 +0100

1"""Task pipeline creation from an acquisition description. 

2 

3The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml` 

4file and determines the set of tasks required to preprocess the session. 

5 

6In the experiment description file there is a 'tasks' key that defines each task protocol and the 

7location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors' 

8field that should contain a list of dynamic pipeline task class names for extracting the task data. 

9These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the 

10extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name. 

11 

12 NB: The standard behaviour extraction task classes (e.g. 

13 :class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsBpod` and :class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsNidq`) 

14handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod 

15trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod 

16trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor` 

17map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while 

18non-IBL ones should be in projects.base.task_extractor_map.json file located in the personal 

19projects repo. The Bpod trials extractor class must be a subclass of the 

20:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the 

21personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module. 

22""" 

23import logging 

24import re 

25from fnmatch import fnmatch 

26from collections import OrderedDict 

27from pathlib import Path 

28from itertools import chain 

29import yaml 

30 

31import spikeglx 

32 

33import ibllib.io.raw_data_loaders as rawio 

34import ibllib.io.session_params as sess_params 

35import ibllib.pipes.tasks as mtasks 

36import ibllib.pipes.base_tasks as bstasks 

37import ibllib.pipes.widefield_tasks as wtasks 

38import ibllib.pipes.mesoscope_tasks as mscope_tasks 

39import ibllib.pipes.sync_tasks as stasks 

40import ibllib.pipes.behavior_tasks as btasks 

41import ibllib.pipes.video_tasks as vtasks 

42import ibllib.pipes.ephys_tasks as etasks 

43import ibllib.pipes.audio_tasks as atasks 

44import ibllib.pipes.neurophotometrics as ptasks 

45 

46_logger = logging.getLogger(__name__) 

47 

48 

def acquisition_description_legacy_session(session_path, save=False):
    """
    From a legacy session create a dictionary corresponding to the acquisition description.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A path to a session to describe.
    save : bool
        If true, saves the acquisition description file to _ibl_experiment.description.yaml.

    Returns
    -------
    dict
        The legacy acquisition description.
    """
    # Read the Bpod settings to recover the task protocol, then map it to a canonical description
    task_settings = rawio.load_settings(session_path)
    protocol_name = task_settings.get('PYBPOD_PROTOCOL', 'UNKNOWN')
    description = get_acquisition_description(protocol_name)
    if save:
        sess_params.write_params(session_path=session_path, data=description)
    return description

71 

72 

def get_acquisition_description(protocol):
    """
    Return an example acquisition description for a given task protocol.

    Covers the experiments that are part of the IBL pipeline:
    - choice_world_recording (ephys)
    - choice_world_biased
    - choice_world_training
    - choice_world_habituation
    - choice_world_passive

    Parameters
    ----------
    protocol : str
        A task protocol name; matched by substring ('ephys', 'biased', 'training', 'habituation').

    Returns
    -------
    dict
        An acquisition description with 'devices', 'tasks', 'sync', 'procedures', 'projects'
        and 'version' keys.

    Raises
    ------
    ValueError
        The protocol name matched none of the known substrings.
    """
    if 'ephys' in protocol:  # canonical ephys recording session
        acquisition_description = {
            'devices': {
                'cameras': {
                    label: {'collection': 'raw_video_data', 'sync_label': 'audio'}
                    for label in ('right', 'body', 'left')
                },
                'neuropixel': {
                    probe: {'collection': f'raw_ephys_data/{probe}', 'sync_label': 'imec_sync'}
                    for probe in ('probe00', 'probe01')
                },
                'microphone': {
                    'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
                },
            },
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        # Bpod-only training/habituation session: single left camera and microphone
        acquisition_description = {
            'devices': {
                'cameras': {
                    'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                },
                'microphone': {
                    'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
                },
            },
            'sync': {'bpod': {'collection': 'raw_behavior_data'}},
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
        # Match the protocol substring to the canonical task key, in order of precedence
        for pattern, key in (('biased', 'biasedChoiceWorld'),
                             ('training', 'trainingChoiceWorld'),
                             ('habituation', 'habituationChoiceWorld')):
            if pattern in protocol:
                break
        else:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description['tasks'] = [{key: {
            'collection': 'raw_behavior_data',
            'sync_label': 'bpod'
        }}]
    acquisition_description['version'] = '1.0.0'
    return acquisition_description

139 

140 

141def _sync_label(sync, acquisition_software=None, **_): 

142 """ 

143 Returns the sync label based on the sync type and acquisition software. 

144 

145 The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'. 

146 The 'acquisition_software' refers to the software used to acquire the data, e.g. 

147 for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect 

148 how the data are loaded and extracted, and therefore which tasks to use. 

149 

150 The naming convention here is not ideal, and may be changed in the future. 

151 

152 Parameters 

153 ---------- 

154 sync : str 

155 The sync type, e.g. 'nidq', 'tdms', 'bpod'. 

156 acquisition_software : str 

157 The acquisition software used to acquire the sync data. 

158 

159 Returns 

160 ------- 

161 str 

162 The sync label for determining the extractor tasks. 

163 """ 

164 

165 return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync 1aoltnmcdefbkgijh

166 

167 

def _load_acquisition_description(session_path):
    """
    Load a session's acquisition description.

    Attempts to load from the session path and upon failure, attempts to generate one based on the
    task protocol (this only works for legacy pipeline sessions).

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.

    Returns
    -------
    dict
        The acquisition description file.

    Raises
    ------
    ValueError
        The experiment description file was not found or is empty, and one could not be
        generated from the legacy session settings.
    """
    # BUGFIX: the docstring permits str input but the joinpath call below requires a Path;
    # previously a str session_path raised an uncaught AttributeError in the legacy branch
    session_path = Path(session_path)
    acquisition_description = sess_params.read_params(session_path)
    if not acquisition_description:
        try:
            # v7 sessions used a different folder name for task data;
            # v8 sessions should always have a description file
            assert session_path.joinpath('raw_behavior_data').exists()
            acquisition_description = acquisition_description_legacy_session(session_path)
            assert acquisition_description
        except (AssertionError, ValueError):
            raise ValueError('Experiment description file not found or is empty')
    return acquisition_description

196 

197 

def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=None, one=None):
    """
    Generate behaviour tasks from acquisition description.

    This returns all behaviour related tasks including TrialsRegisterRaw and TrainingStatus objects.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.
    acquisition_description : dict
        An acquisition description.
    sync_tasks : list
        A list of sync tasks to use as behaviour task parents.
    one : One
        An instance of ONE to pass to each task.

    Returns
    -------
    dict[str, ibllib.pipes.tasks.Task]
        A map of Alyx task name to behaviour task object.

    Raises
    ------
    ValueError
        An extractor class name implies a sync (e.g. contains 'nidq') that conflicts with the
        session's sync label.
    NotImplementedError
        An extractor class was not found in the IBL pipeline nor in the personal projects repo,
        or no standard task class exists for the session's sync namespace.
    """
    if not acquisition_description:
        acquisition_description = _load_acquisition_description(session_path)
    tasks = OrderedDict()
    sync_tasks = sync_tasks or []
    kwargs = {'session_path': session_path, 'one': one}

    # Syncing tasks
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}

    # Behavior tasks
    protocol_numbers = set()
    task_protocols = acquisition_description.get('tasks', [])
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # The order of protocols in the list will take precedence unless protocol_number is present.
        # If collections are numbered, check that the numbers match the order.
        n = i
        if re.match(r'^raw_task_data_\d{2}$', collection):
            # Protocol number may be overridden by the protocol_number key
            if task_info.get('protocol_number') is not None:
                n = task_info['protocol_number']
            task_kwargs['protocol_number'] = n
            # set.add returns None (falsy), so this asserts n was not already present
            # while also recording it in one expression
            assert not (n in protocol_numbers or protocol_numbers.add(n)), 'protocol numbers must be unique'
            if int(collection.split('_')[-1]) != n:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, extractor in enumerate(extractors):
                # Assume previous task in the list is a parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ('nidq', 'bpod', 'timeline'):
                    if sync_option in extractor.lower() and not sync_label == sync_option:
                        raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match')

                # Look for the extractor in the behavior extractors module
                if hasattr(btasks, extractor):
                    task = getattr(btasks, extractor)
                # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for example
                elif hasattr(btasks, extractor + sync_label.capitalize()):
                    task = getattr(btasks, extractor + sync_label.capitalize())
                else:
                    # lookup in the project extraction repo if we find an extractor class
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    elif hasattr(projects.extraction_tasks, extractor + sync_label.capitalize()):
                        # BUGFIX: previously fetched from btasks despite having tested
                        # projects.extraction_tasks, raising AttributeError for sync-suffixed
                        # project extractors
                        task = getattr(projects.extraction_tasks, extractor + sync_label.capitalize())
                    else:
                        raise NotImplementedError(
                            f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
                _logger.debug('%s (protocol #%i, task #%i) = %s.%s',
                              protocol, i, j, task.__module__, task.__name__)
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                if not (task.__name__.startswith('TrainingStatus') or task.__name__.endswith('RegisterRaw')):
                    task_name = f'Trials_{task_name}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            if 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"')
                compute_status = False
            elif 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize())
                compute_status = False
            else:
                registration_class = btasks.TrialRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"')
                compute_status = True
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])
    return tasks

325 

326 

def get_trials_tasks(session_path, one=None, bpod_only=False):
    """
    Return a list of pipeline trials extractor task objects for a given session.

    This function supports both legacy and dynamic pipeline sessions. Dynamic tasks are returned
    for both recent and legacy sessions. Only Trials tasks are returned, not the training status
    or raw registration tasks.

    Parameters
    ----------
    session_path : str, pathlib.Path
        An absolute path to a session.
    one : one.api.One
        An ONE instance.
    bpod_only : bool
        If true, extract trials from Bpod clock instead of the main DAQ's.

    Returns
    -------
    list of pipes.tasks.Task
        A list of task objects for the provided session.

    Examples
    --------
    Return the tasks for active choice world extraction

    >>> tasks = list(filter(is_active_trials_task, get_trials_tasks(session_path)))
    """
    # Check for an experiment.description file; ensure downloaded if possible
    if one and one.to_eid(session_path):  # to_eid returns None if session not registered
        one.load_datasets(
            session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False)
    description = _load_acquisition_description(session_path)
    if bpod_only and description:
        # Force a Bpod sync so trials are extracted in Bpod time
        description['sync'] = {'bpod': {'collection': 'raw_task_data_*'}}
    try:
        all_tasks = _get_trials_tasks(session_path, description, one=one)
    except NotImplementedError as ex:
        _logger.warning('Failed to get trials tasks: %s', ex)
        return []
    # Keep only the trials extractor tasks, dropping registration and training status ones
    return [task for name, task in all_tasks.items() if name.startswith('Trials_')]

367 

368 

def is_active_trials_task(task) -> bool:
    """
    Check if task is for active choice world extraction.

    Parameters
    ----------
    task : ibllib.pipes.tasks.Task
        A task instance to test.

    Returns
    -------
    bool
        True if the task name starts with 'Trials_' and outputs a trials.table dataset.
    """
    # Output signatures may contain wildcards, hence the dataset name is matched
    # against each signature entry as a pattern
    patterns = [spec[0] for spec in task.signature.get('output_files', [])]
    named_trials = task.name.lower().startswith('trials_')
    return named_trials and any(fnmatch('_ibl_trials.table.pqt', p) for p in patterns)

386 

387 

def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.

    Raises
    ------
    ValueError
        The session path is falsy or does not exist on disk.
    """
    # NB: this pattern is a pattern for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = _load_acquisition_description(session_path)
    devices = acquisition_description.get('devices', {})
    # kwargs common to every task constructed below
    kwargs = {'session_path': session_path, 'one': pkwargs.get('one')}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks
    # Exactly one sync device is expected; the unpacking raises if there are zero or several
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args = sync_args.copy()  # ensure acquisition_description unchanged
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
    # sync_tasks holds the pulse-extraction task(s) that downstream tasks depend on
    sync_tasks = []
    if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'timeline':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'nidq':
        # NI DAQ sync outside of raw_ephys_data: compress then extract pulses
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'bpod':
        pass  # ATM we don't have anything for this; it may not be needed in the future

    # Behavior tasks
    tasks.update(
        _get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one'))
    )

    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            # Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4
            # extractions, however.
            # NB: next() without a default raises StopIteration if no matching collection exists
            probe_collection = next(session_path.glob(probe_info['collection'] + '*'))
            meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta')
            meta_file = meta_file[0].get('ap')
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                # Multi-shank NP2.4: one compress task per probe, one logical probe per shank
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
                # Shanks are suffixed with letters, e.g. probe00a, probe00b, ...
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                # NB: the dict key uses 'Ephys...' while the class name uses 'Ephy...';
                # kept as-is since the key is what the rest of this function references
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)

        # NB: nptype holds the value from the LAST probe iterated above
        if nptype == '3A':
            # 3A probes share a single pulses task across all probes
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            # pname[:7] matches the base probe name (e.g. 'probe00') ignoring any shank suffix
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)

    # Video tasks
    if 'cameras' in devices:
        cams = list(devices['cameras'].keys())
        subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
        video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        # NB: the `tasks[tn] = type((tn := '...'), ...)` idiom below works because Python
        # evaluates the right-hand side (binding tn via the walrus) before the subscript target
        if video_compressed:
            # This is for widefield case where the video is already compressed
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right or body)
                video_kwargs['cameras'] = subset_cams
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if sync_kwargs['sync'] != 'bpod':
            # Here we restrict to videos that we support (left, right or body)
            # Currently there is no plan to run DLC on the belly cam
            subset_cams = [c for c in cams if c in ('left', 'right', 'body')]
            video_kwargs['cameras'] = subset_cams
            tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
                **kwargs, **video_kwargs, parents=[dlc_parent_task])

            # The PostDLC plots require a trials object for QC
            # Find the first task that outputs a trials.table dataset
            trials_task = (
                t for t in tasks.values() if any('trials.table' in f[0] for f in t.signature.get('output_files', []))
            )
            if trials_task := next(trials_task, None):
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task]
                trials_collection = getattr(trials_task, 'output_collection', 'alf')
            else:
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]
                trials_collection = 'alf'
            tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})(
                **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents)

    # Audio tasks
    if 'microphone' in devices:
        # A single microphone device is expected
        (microphone, micro_kwargs), = devices['microphone'].items()
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        # NB: the dict key ('WideField...') and class name ('Widefield...') differ in case;
        # the key spelling is what the parents below reference
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Mesoscope tasks
    if 'mesoscope' in devices:
        (_, mscope_kwargs), = devices['mesoscope'].items()
        mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
        tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
        tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
            **kwargs, **mscope_kwargs, **sync_kwargs)
        tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])

    if 'neurophotometrics' in devices:
        # Example device entry:
        # {'collection': 'raw_photometry_data', 'datetime': '2024-09-18T16:43:55.207000',
        # 'fibers': {'G0': {'location': 'NBM'}, 'G1': {'location': 'SI'}}, 'sync_channel': 1}
        photometry_kwargs = devices['neurophotometrics']
        tasks['FibrePhotometrySync'] = type('FibrePhotometrySync', (
            ptasks.FibrePhotometrySync,), {})(**kwargs, **photometry_kwargs)

    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks
    return p

600 

601 

def make_pipeline_dict(pipeline, save=True):
    """Convert a pipeline object to a serializable list of task dicts, optionally saving it to YAML."""
    # TODO better name
    task_dicts = pipeline.create_tasks_list_from_pipeline()
    if save:
        out_file = Path(pipeline.session_path).joinpath('pipeline_tasks.yaml')
        with open(out_file, 'w') as fp:
            yaml.dump(task_dicts, fp)
    return task_dicts

609 

610 

def load_pipeline_dict(path):
    """Load the list of task dictionaries from a session's pipeline_tasks.yaml file."""
    task_file = Path(path).joinpath('pipeline_tasks.yaml')
    with open(task_file, 'r') as fp:
        return yaml.full_load(fp)