Coverage for ibllib/pipes/dynamic_pipeline.py: 92%

269 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-17 15:25 +0000

1"""Task pipeline creation from an acquisition description. 

2 

3The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml` 

4file and determines the set of tasks required to preprocess the session. 

5 

6In the experiment description file there is a 'tasks' key that defines each task protocol and the 

7location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors' 

8field that should contain a list of dynamic pipeline task class names for extracting the task data. 

9These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the 

10extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name. 

11 

NB: The standard behaviour extraction task classes (e.g.
:class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsBpod` and :class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsNidq`)

14handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod 

15trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod 

16trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor` 

17map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while 

18non-IBL ones should be in projects.base.task_extractor_map.json file located in the personal 

19projects repo. The Bpod trials extractor class must be a subclass of the 

20:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the 

21personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module. 

22""" 

23import logging 

24import re 

25from fnmatch import fnmatch 

26from collections import OrderedDict 

27from pathlib import Path 

28from itertools import chain 

29import yaml 

30 

31import spikeglx 

32 

33import ibllib.io.raw_data_loaders as rawio 

34import ibllib.io.session_params as sess_params 

35import ibllib.pipes.tasks as mtasks 

36import ibllib.pipes.base_tasks as bstasks 

37import ibllib.pipes.widefield_tasks as wtasks 

38import ibllib.pipes.mesoscope_tasks as mscope_tasks 

39import ibllib.pipes.sync_tasks as stasks 

40import ibllib.pipes.behavior_tasks as btasks 

41import ibllib.pipes.video_tasks as vtasks 

42import ibllib.pipes.ephys_tasks as etasks 

43import ibllib.pipes.audio_tasks as atasks 

44import ibllib.pipes.neurophotometrics as ptasks 

45 

46_logger = logging.getLogger(__name__) 

47 

48 

def acquisition_description_legacy_session(session_path, save=False):
    """
    Build an acquisition description dictionary for a legacy (pre-description-file) session.

    The task protocol is read from the Bpod settings file and mapped to one of the canonical
    IBL acquisition descriptions.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A path to a session to describe.
    save : bool
        If true, saves the acquisition description file to _ibl_experiment.description.yaml.

    Returns
    -------
    dict
        The legacy acquisition description.
    """
    task_settings = rawio.load_settings(session_path)
    task_protocol = task_settings.get('PYBPOD_PROTOCOL', 'UNKNOWN')
    description = get_acquisition_description(task_protocol)
    if save:
        sess_params.write_params(session_path=session_path, data=description)
    return description

71 

72 

def get_acquisition_description(protocol):
    """
    Return a canonical acquisition description for a given legacy task protocol.

    This is a set of example acquisition descriptions for experiments
    - choice_world_recording
    - choice_world_biased
    - choice_world_training
    - choice_world_habituation
    - choice_world_passive
    That are part of the IBL pipeline.

    Parameters
    ----------
    protocol : str
        The task protocol name, e.g. '_iblrig_tasks_trainingChoiceWorld6.4.2'.

    Returns
    -------
    dict
        An acquisition description with 'devices', 'tasks', 'sync', 'procedures', 'projects'
        and 'version' keys.

    Raises
    ------
    ValueError
        The protocol matches none of the known legacy pipelines.
    """
    if 'ephys' in protocol:  # canonical ephys
        devices = {
            'cameras': {
                'right': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'body': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'neuropixel': {
                'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'},
                'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current ephys pipeline description
            'devices': devices,
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        devices = {
            'cameras': {
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the legacy behaviour-only (training rig) description
            'devices': devices,
            'sync': {'bpod': {'collection': 'raw_behavior_data'}},
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
        # Map protocol substring to the canonical task key; order matters as some protocol
        # names could contain more than one keyword
        if 'biased' in protocol:
            key = 'biasedChoiceWorld'
        elif 'training' in protocol:
            key = 'trainingChoiceWorld'
        elif 'habituation' in protocol:
            key = 'habituationChoiceWorld'
        else:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description['tasks'] = [{key: {
            'collection': 'raw_behavior_data',
            'sync_label': 'bpod'
        }}]
    acquisition_description['version'] = '1.0.0'
    return acquisition_description

139 

140 

141def _sync_label(sync, acquisition_software=None, **_): 

142 """ 

143 Returns the sync label based on the sync type and acquisition software. 

144 

145 The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'. 

146 The 'acquisition_software' refers to the software used to acquire the data, e.g. 

147 for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect 

148 how the data are loaded and extracted, and therefore which tasks to use. 

149 

150 The naming convention here is not ideal, and may be changed in the future. 

151 

152 Parameters 

153 ---------- 

154 sync : str 

155 The sync type, e.g. 'nidq', 'tdms', 'bpod'. 

156 acquisition_software : str 

157 The acquisition software used to acquire the sync data. 

158 

159 Returns 

160 ------- 

161 str 

162 The sync label for determining the extractor tasks. 

163 """ 

164 

165 return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync 1aomtnlcdefbkhijg

166 

167 

def _load_acquisition_description(session_path):
    """
    Load a session's acquisition description.

    Attempts to load from the session path and upon failure, attempts to generate one based on the
    task protocol (this only works for legacy pipeline sessions).

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.

    Returns
    -------
    dict
        The acquisition description file.

    Raises
    ------
    ValueError
        No description file was found and one could not be generated from the task settings.
    """
    # Coerce to Path so the str input documented above works (the legacy branch
    # below calls Path methods on session_path)
    session_path = Path(session_path)
    acquisition_description = sess_params.read_params(session_path)
    if not acquisition_description:
        try:
            # v7 sessions used a different folder name for task data;
            # v8 sessions should always have a description file
            assert session_path.joinpath('raw_behavior_data').exists()
            acquisition_description = acquisition_description_legacy_session(session_path)
            assert acquisition_description
        except (AssertionError, ValueError):
            raise ValueError('Experiment description file not found or is empty')
    return acquisition_description

196 

197 

def _get_trials_tasks(session_path, acquisition_description=None, sync_tasks=None, one=None):
    """
    Generate behaviour tasks from acquisition description.

    This returns all behaviour related tasks including TrialsRegisterRaw and TrainingStatus objects.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A session path.
    acquisition_description : dict
        An acquisition description.
    sync_tasks : list
        A list of sync tasks to use as behaviour task parents.
    one : One
        An instance of ONE to pass to each task.

    Returns
    -------
    dict[str, ibllib.pipes.tasks.Task]
        A map of Alyx task name to behaviour task object.

    Raises
    ------
    ValueError
        An extractor class name implies a different sync than the session's (e.g. a 'Nidq'
        extractor with a 'bpod' sync).
    NotImplementedError
        No extractor or trials task class could be found for a protocol and sync label.
    """
    if not acquisition_description:
        acquisition_description = _load_acquisition_description(session_path)
    tasks = OrderedDict()
    sync_tasks = sync_tasks or []
    kwargs = {'session_path': session_path, 'one': one}

    # Syncing tasks
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args = sync_args.copy()  # ensure acquisition_description unchanged (consistent with make_pipeline)
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}

    # Behavior tasks
    task_protocols = acquisition_description.get('tasks', [])
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # For now the order of protocols in the list will take precedence. If collections are numbered,
        # check that the numbers match the order. This may change in the future.
        if re.match(r'^raw_task_data_\d{2}$', collection):
            task_kwargs['protocol_number'] = i
            if int(collection.split('_')[-1]) != i:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, extractor in enumerate(extractors):
                # Assume previous task in the list is a parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ('nidq', 'bpod', 'timeline'):
                    if sync_option in extractor.lower() and not sync_label == sync_option:
                        raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match')

                # Look for the extractor in the behavior extractors module
                if hasattr(btasks, extractor):
                    task = getattr(btasks, extractor)
                # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for example
                elif hasattr(btasks, extractor + sync_label.capitalize()):
                    task = getattr(btasks, extractor + sync_label.capitalize())
                else:
                    # lookup in the project extraction repo if we find an extractor class
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    elif hasattr(projects.extraction_tasks, extractor + sync_label.capitalize()):
                        # BUGFIX: this previously fetched from btasks despite having tested
                        # projects.extraction_tasks, raising AttributeError for project-specific
                        # sync-suffixed extractor classes
                        task = getattr(projects.extraction_tasks, extractor + sync_label.capitalize())
                    else:
                        raise NotImplementedError(
                            f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
                _logger.debug('%s (protocol #%i, task #%i) = %s.%s',
                              protocol, i, j, task.__module__, task.__name__)
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                if not (task.__name__.startswith('TrainingStatus') or task.__name__.endswith('RegisterRaw')):
                    task_name = f'Trials_{task_name}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            if 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"')
                compute_status = False
            elif 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize())
                compute_status = False
            else:
                registration_class = btasks.TrialRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"')
                compute_status = True
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])
    return tasks

319 

320 

def get_trials_tasks(session_path, one=None, bpod_only=False):
    """
    Return a list of pipeline trials extractor task objects for a given session.

    This function supports both legacy and dynamic pipeline sessions. Dynamic tasks are returned
    for both recent and legacy sessions. Only Trials tasks are returned, not the training status
    or raw registration tasks.

    Parameters
    ----------
    session_path : str, pathlib.Path
        An absolute path to a session.
    one : one.api.One
        An ONE instance.
    bpod_only : bool
        If true, extract trials from Bpod clock instead of the main DAQ's.

    Returns
    -------
    list of pipes.tasks.Task
        A list of task objects for the provided session.

    Examples
    --------
    Return the tasks for active choice world extraction

    >>> tasks = list(filter(is_active_trials_task, get_trials_tasks(session_path)))
    """
    # Check for an experiment.description file; ensure downloaded if possible
    # (to_eid returns None if session not registered)
    if one and one.to_eid(session_path):
        one.load_datasets(session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False)
    description = _load_acquisition_description(session_path)
    if bpod_only and description:
        # Force extraction in the Bpod clock by swapping out the sync device
        description['sync'] = {'bpod': {'collection': 'raw_task_data_*'}}
    try:
        all_tasks = _get_trials_tasks(session_path, description, one=one)
    except NotImplementedError as ex:
        _logger.warning('Failed to get trials tasks: %s', ex)
        return []
    # Keep only the trials extractor tasks; drop registration and training status ones
    return [task for name, task in all_tasks.items() if name.startswith('Trials_')]

361 

362 

def is_active_trials_task(task) -> bool:
    """
    Check if task is for active choice world extraction.

    Parameters
    ----------
    task : ibllib.pipes.tasks.Task
        A task instance to test.

    Returns
    -------
    bool
        True if the task name starts with 'Trials_' and outputs a trials.table dataset.
    """
    if not task.name.lower().startswith('trials_'):
        return False
    # Output file signatures are (filename_pattern, collection, required) tuples;
    # check whether any pattern would match the trials table dataset
    patterns = (signature[0] for signature in task.signature.get('output_files', []))
    return any(fnmatch('_ibl_trials.table.pqt', pattern) for pattern in patterns)

380 

381 

def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    Tasks are dynamically created subclasses keyed by name in an ordered dict; parent tasks
    are wired up per device section (sync, behaviour, ephys, video, audio, widefield,
    mesoscope, photometry).

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.

    Raises
    ------
    ValueError
        The session path is falsy or does not exist on disk.
    """
    # NB: this pattern is a pattern for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = _load_acquisition_description(session_path)
    devices = acquisition_description.get('devices', {})
    kwargs = {'session_path': session_path, 'one': pkwargs.get('one')}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks: exactly one sync device is expected in the description
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_args = sync_args.copy()  # ensure acquisition_description unchanged
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
    sync_tasks = []  # sync tasks become parents of downstream extraction tasks
    if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        # NI DAQ acquired with SpikeGLX alongside ephys data
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'timeline':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'nidq':
        # NI DAQ in a non-ephys collection (e.g. mesoscope/widefield rigs)
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'bpod':
        pass  # ATM we don't have anything for this; it may not be needed in the future

    # Behavior tasks
    tasks.update(
        _get_trials_tasks(session_path, acquisition_description, sync_tasks=sync_tasks, one=pkwargs.get('one'))
    )

    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            # Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4
            # extractions, however.
            probe_collection = next(session_path.glob(probe_info['collection'] + '*'))
            meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta')
            meta_file = meta_file[0].get('ap')
            # Probe generation and shank count determine which compression task to use
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
                # NP2.4 multi-shank probes are split into one pseudo-probe per shank: probe00a, probe00b, ...
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                # NOTE(review): dict key 'EphysCompressNP1_' differs from the dynamic class name
                # 'EphyCompressNP1_' — looks unintentional but both names may be registered
                # externally, so left as-is; confirm before renaming
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)

        # NOTE(review): nptype here is the value from the LAST loop iteration — this assumes all
        # probes in a session are the same generation; verify for mixed-probe sessions
        if nptype == '3A':
            # 3A probes have no separate sync channel; a single pulses task covers all probes
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            # pname[:7] strips the shank suffix (e.g. 'probe00a' -> 'probe00') to find the
            # matching compression/registration task
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)

    # Video tasks
    # NB: the walrus in `tasks[tn] = type((tn := 'Name'), ...)` binds the task name once and
    # reuses it as both the dict key and the dynamic class name
    if 'cameras' in devices:
        cams = list(devices['cameras'].keys())
        subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
        video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        if video_compressed:
            # This is for widefield case where the video is already compressed
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right or body)
                video_kwargs['cameras'] = subset_cams
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if sync_kwargs['sync'] != 'bpod':
            # Here we restrict to videos that we support (left, right or body)
            # Currently there is no plan to run DLC on the belly cam
            subset_cams = [c for c in cams if c in ('left', 'right', 'body')]
            video_kwargs['cameras'] = subset_cams
            tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
                **kwargs, **video_kwargs, parents=[dlc_parent_task])

            # The PostDLC plots require a trials object for QC
            # Find the first task that outputs a trials.table dataset
            trials_task = (
                t for t in tasks.values() if any('trials.table' in f[0] for f in t.signature.get('output_files', []))
            )
            if trials_task := next(trials_task, None):
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task]
                trials_collection = getattr(trials_task, 'output_collection', 'alf')
            else:
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]
                trials_collection = 'alf'
            tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})(
                **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents)

    # Audio tasks
    if 'microphone' in devices:
        (microphone, micro_kwargs), = devices['microphone'].items()
        # NB: pop mutates the device entry in acquisition_description in place
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Mesoscope tasks
    if 'mesoscope' in devices:
        (_, mscope_kwargs), = devices['mesoscope'].items()
        mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
        tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
        tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
            **kwargs, **mscope_kwargs, **sync_kwargs)
        tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])

    # Photometry tasks
    if 'neurophotometrics' in devices:
        # Example device entry:
        # {'collection': 'raw_photometry_data', 'datetime': '2024-09-18T16:43:55.207000',
        #  'fibers': {'G0': {'location': 'NBM'}, 'G1': {'location': 'SI'}}, 'sync_channel': 1}
        photometry_kwargs = devices['neurophotometrics']
        tasks['FibrePhotometrySync'] = type('FibrePhotometrySync', (
            ptasks.FibrePhotometrySync,), {})(**kwargs, **photometry_kwargs)

    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks
    return p

594 

595 

def make_pipeline_dict(pipeline, save=True):
    """
    Convert a pipeline object into a serializable list of task dictionaries.

    Parameters
    ----------
    pipeline : ibllib.pipes.tasks.Pipeline
        The pipeline to serialize.
    save : bool
        If true, writes the task list to pipeline_tasks.yaml in the session path.

    Returns
    -------
    list
        The list of task dictionaries.
    """
    task_dicts = pipeline.create_tasks_list_from_pipeline()
    # TODO better name
    if save:
        out_path = Path(pipeline.session_path).joinpath('pipeline_tasks.yaml')
        with open(out_path, 'w') as fp:
            yaml.dump(task_dicts, fp)
    return task_dicts

603 

604 

def load_pipeline_dict(path):
    """
    Load the list of task dictionaries from a session's pipeline_tasks.yaml file.

    Parameters
    ----------
    path : str, pathlib.Path
        The directory containing the pipeline_tasks.yaml file.

    Returns
    -------
    list
        The list of task dictionaries previously saved by make_pipeline_dict.
    """
    task_file = Path(path).joinpath('pipeline_tasks.yaml')
    with open(task_file, 'r') as fp:
        return yaml.full_load(fp)