Coverage for ibllib/pipes/dynamic_pipeline.py: 93%

267 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1"""Task pipeline creation from an acquisition description. 

2 

3The principal function here is `make_pipeline` which reads an `_ibl_experiment.description.yaml` 

4file and determines the set of tasks required to preprocess the session. 

5 

6In the experiment description file there is a 'tasks' key that defines each task protocol and the 

7location of the raw data (i.e. task collection). The protocol subkey may contain an 'extractors' 

8field that should contain a list of dynamic pipeline task class names for extracting the task data. 

9These must be subclasses of the :class:`ibllib.pipes.base_tasks.DynamicTask` class. If the 

10extractors key is absent or empty, the tasks are chosen based on the sync label and protocol name. 

11 

12NB: The standard behaviour extraction task classes (e.g.

13:class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsBpod` and :class:`ibllib.pipes.behavior_tasks.ChoiceWorldTrialsNidq`)

14handle the clock synchronization, behaviour plots and QC. This is typically independent of the Bpod 

15trials extraction (i.e. extraction of trials data from the Bpod raw data, in Bpod time). The Bpod 

16trials extractor class is determined by the :func:`ibllib.io.extractors.base.protocol2extractor` 

17map. IBL protocols may be added to the ibllib.io.extractors.task_extractor_map.json file, while 

18non-IBL ones should be in projects.base.task_extractor_map.json file located in the personal 

19projects repo. The Bpod trials extractor class must be a subclass of the 

20:class:`ibllib.io.extractors.base.BaseBpodTrialsExtractor` class, and located in either the 

21personal projects repo or in :py:mod:`ibllib.io.extractors.bpod_trials` module. 

22""" 

23import logging 

24import re 

25from collections import OrderedDict 

26from pathlib import Path 

27from itertools import chain 

28import yaml 

29 

30import spikeglx 

31 

32import ibllib.io.session_params as sess_params 

33from ibllib.io.extractors.base import get_pipeline, get_session_extractor_type 

34import ibllib.pipes.tasks as mtasks 

35import ibllib.pipes.base_tasks as bstasks 

36import ibllib.pipes.widefield_tasks as wtasks 

37import ibllib.pipes.mesoscope_tasks as mscope_tasks 

38import ibllib.pipes.sync_tasks as stasks 

39import ibllib.pipes.behavior_tasks as btasks 

40import ibllib.pipes.video_tasks as vtasks 

41import ibllib.pipes.ephys_tasks as etasks 

42import ibllib.pipes.audio_tasks as atasks 

43import ibllib.pipes.photometry_tasks as ptasks 

44# from ibllib.pipes.photometry_tasks import FibrePhotometryPreprocess, FibrePhotometryRegisterRaw 

45 

46_logger = logging.getLogger(__name__) 

47 

48 

def acquisition_description_legacy_session(session_path, save=False):
    """
    From a legacy session create a dictionary corresponding to the acquisition description.

    Parameters
    ----------
    session_path : str, pathlib.Path
        A path to a session to describe.
    save : bool
        If true, saves the acquisition description file to _ibl_experiment.description.yaml.

    Returns
    -------
    dict
        The legacy acquisition description.
    """
    # Map the legacy extractor type onto the corresponding IBL protocol name
    protocol_map = {
        'biased': 'choice_world_biased',
        'habituation': 'choice_world_habituation',
        'training': 'choice_world_training',
        'ephys': 'choice_world_recording',
    }
    extractor_type = get_session_extractor_type(session_path)
    description = get_acquisition_description(protocol_map[extractor_type])
    if save:
        sess_params.write_params(session_path=session_path, data=description)
    return description

72 

73 

def get_acquisition_description(protocol):
    """
    Return a template acquisition description dictionary for a given IBL protocol.

    This is a set of example acquisition descriptions for experiments
    - choice_world_recording
    - choice_world_biased
    - choice_world_training
    - choice_world_habituation
    that are part of the IBL pipeline.

    Parameters
    ----------
    protocol : str
        One of the supported IBL protocol names listed above.

    Returns
    -------
    dict
        An acquisition description with 'devices', 'tasks', 'sync', 'procedures',
        'projects' and 'version' keys.

    Raises
    ------
    ValueError
        If the protocol is not one of the supported protocol names.
    """
    if protocol == 'choice_world_recording':   # canonical ephys
        devices = {
            'cameras': {
                'right': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'body': {'collection': 'raw_video_data', 'sync_label': 'audio'},
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'neuropixel': {
                'probe00': {'collection': 'raw_ephys_data/probe00', 'sync_label': 'imec_sync'},
                'probe01': {'collection': 'raw_ephys_data/probe01', 'sync_label': 'imec_sync'}
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        acquisition_description = {  # this is the current ephys pipeline description
            'devices': devices,
            'tasks': [
                {'ephysChoiceWorld': {'collection': 'raw_behavior_data', 'sync_label': 'bpod'}},
                {'passiveChoiceWorld': {'collection': 'raw_passive_data', 'sync_label': 'bpod'}}
            ],
            'sync': {
                'nidq': {'collection': 'raw_ephys_data', 'extension': 'bin', 'acquisition_software': 'spikeglx'}
            },
            'procedures': ['Ephys recording with acute probe(s)'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
    else:
        devices = {
            'cameras': {
                'left': {'collection': 'raw_video_data', 'sync_label': 'audio'},
            },
            'microphone': {
                'microphone': {'collection': 'raw_behavior_data', 'sync_label': None}
            },
        }
        # this is the current behaviour training/task pipeline description
        acquisition_description = {
            'devices': devices,
            'sync': {'bpod': {'collection': 'raw_behavior_data'}},
            'procedures': ['Behavior training/tasks'],
            'projects': ['ibl_neuropixel_brainwide_01']
        }
        if protocol == 'choice_world_biased':
            key = 'biasedChoiceWorld'
        elif protocol == 'choice_world_training':
            key = 'trainingChoiceWorld'
        elif protocol == 'choice_world_habituation':
            key = 'habituationChoiceWorld'
        else:
            raise ValueError(f'Unknown protocol "{protocol}"')
        acquisition_description['tasks'] = [{key: {
            'collection': 'raw_behavior_data',
            'sync_label': 'bpod', 'main': True  # FIXME: What is purpose of main key?
        }}]
    acquisition_description['version'] = '1.0.0'
    return acquisition_description

140 

141 

142def _sync_label(sync, acquisition_software=None, **_): 

143 """ 

144 Returns the sync label based on the sync type and acquisition software. 

145 

146 The 'sync' usually refers to the DAQ type, e.g. 'nidq', 'tdms', 'bpod'. 

147 The 'acquisition_software' refers to the software used to acquire the data, e.g. 

148 for an NI DAQ, options include 'spikeglx' and 'timeline'. Both of these affect 

149 how the data are loaded and extracted, and therefore which tasks to use. 

150 

151 The naming convention here is not ideal, and may be changed in the future. 

152 

153 Parameters 

154 ---------- 

155 sync : str 

156 The sync type, e.g. 'nidq', 'tdms', 'bpod'. 

157 acquisition_software : str 

158 The acquisition software used to acquire the sync data. 

159 

160 Returns 

161 ------- 

162 str 

163 The sync label for determining the extractor tasks. 

164 """ 

165 

166 return acquisition_software if (sync == 'nidq' and acquisition_software not in ('spikeglx', None)) else sync 1agtncdefbmiklhj

167 

168 

def make_pipeline(session_path, **pkwargs):
    """
    Creates a pipeline of extractor tasks from a session's experiment description file.

    Parameters
    ----------
    session_path : str, Path
        The absolute session path, i.e. '/path/to/subject/yyyy-mm-dd/nnn'.
    pkwargs
        Optional arguments passed to the ibllib.pipes.tasks.Pipeline constructor.

    Returns
    -------
    ibllib.pipes.tasks.Pipeline
        A task pipeline object.
    """
    # NB: this pattern is a pattern for dynamic class creation
    # tasks['SyncPulses'] = type('SyncPulses', (epp.EphysPulses,), {})(session_path=session_path)
    if not session_path or not (session_path := Path(session_path)).exists():
        raise ValueError('Session path does not exist')
    tasks = OrderedDict()
    acquisition_description = sess_params.read_params(session_path)
    if not acquisition_description:
        raise ValueError('Experiment description file not found or is empty')
    devices = acquisition_description.get('devices', {})
    # kwargs common to every task constructed below
    kwargs = {'session_path': session_path, 'one': pkwargs.get('one')}

    # Registers the experiment description file
    tasks['ExperimentDescriptionRegisterRaw'] = type('ExperimentDescriptionRegisterRaw',
                                                     (bstasks.ExperimentDescriptionRegisterRaw,), {})(**kwargs)

    # Syncing tasks — exactly one sync device is expected in the description file
    (sync, sync_args), = acquisition_description['sync'].items()
    sync_label = _sync_label(sync, **sync_args)  # get the format of the DAQ data. This informs the extractor task
    sync_args['sync_collection'] = sync_args.pop('collection')  # rename the key so it matches task run arguments
    sync_args['sync_ext'] = sync_args.pop('extension', None)
    sync_args['sync_namespace'] = sync_args.pop('acquisition_software', None)
    sync_kwargs = {'sync': sync, **sync_args}
    # sync_tasks holds the task(s) other tasks must depend on to have sync pulses available
    sync_tasks = []
    if sync_label == 'nidq' and sync_args['sync_collection'] == 'raw_ephys_data':
        # spikeglx-acquired NIDQ in the canonical ephys collection
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (etasks.EphysSyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (etasks.EphysSyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'timeline':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'nidq':
        # NIDQ sync outside raw_ephys_data: compress then extract pulses with the generic sync tasks
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncMtscomp,), {})(**kwargs, **sync_kwargs)
        tasks[f'SyncPulses_{sync}'] = type(f'SyncPulses_{sync}', (stasks.SyncPulses,), {})(
            **kwargs, **sync_kwargs, parents=[tasks['SyncRegisterRaw']])
        sync_tasks = [tasks[f'SyncPulses_{sync}']]
    elif sync_label == 'tdms':
        tasks['SyncRegisterRaw'] = type('SyncRegisterRaw', (stasks.SyncRegisterRaw,), {})(**kwargs, **sync_kwargs)
    elif sync_label == 'bpod':
        pass  # ATM we don't have anything for this; it may not be needed in the future

    # Behavior tasks
    task_protocols = acquisition_description.get('tasks', [])
    for i, (protocol, task_info) in enumerate(chain(*map(dict.items, task_protocols))):
        collection = task_info.get('collection', f'raw_task_data_{i:02}')
        task_kwargs = {'protocol': protocol, 'collection': collection}
        # For now the order of protocols in the list will take precedence. If collections are numbered,
        # check that the numbers match the order. This may change in the future.
        if re.match(r'^raw_task_data_\d{2}$', collection):
            task_kwargs['protocol_number'] = i
            if int(collection.split('_')[-1]) != i:
                _logger.warning('Number in collection name does not match task order')
        if extractors := task_info.get('extractors', False):
            extractors = (extractors,) if isinstance(extractors, str) else extractors
            task_name = None  # to avoid unbound variable issue in the first round
            for j, extractor in enumerate(extractors):
                # Assume previous task in the list is parent
                parents = [] if j == 0 else [tasks[task_name]]
                # Make sure extractor and sync task don't collide
                for sync_option in ('nidq', 'bpod', 'timeline'):
                    if sync_option in extractor.lower() and not sync_label == sync_option:
                        raise ValueError(f'Extractor "{extractor}" and sync "{sync_label}" do not match')
                # TODO Assert sync_label correct here (currently unused)
                # Look for the extractor in the behavior extractors module
                if hasattr(btasks, extractor):
                    task = getattr(btasks, extractor)
                # This may happen that the extractor is tied to a specific sync task: look for TrialsChoiceWorldBpod for example
                elif hasattr(btasks, extractor + sync_label.capitalize()):
                    task = getattr(btasks, extractor + sync_label.capitalize())
                else:
                    # lookup in the project extraction repo if we find an extractor class
                    import projects.extraction_tasks
                    if hasattr(projects.extraction_tasks, extractor):
                        task = getattr(projects.extraction_tasks, extractor)
                    else:
                        raise NotImplementedError(
                            f'Extractor "{extractor}" not found in main IBL pipeline nor in personal projects')
                _logger.debug('%s (protocol #%i, task #%i) = %s.%s',
                              protocol, i, j, task.__module__, task.__name__)
                # Rename the class to something more informative
                task_name = f'{task.__name__}_{i:02}'
                # For now we assume that the second task in the list is always the trials extractor, which is dependent
                # on the sync task and sync arguments
                if j == 1:
                    tasks[task_name] = type(task_name, (task,), {})(
                        **kwargs, **sync_kwargs, **task_kwargs, parents=parents + sync_tasks
                    )
                else:
                    tasks[task_name] = type(task_name, (task,), {})(**kwargs, **task_kwargs, parents=parents)
                # For the next task, we assume that the previous task is the parent
        else:  # Legacy block to handle sessions without defined extractors
            # - choice_world_recording
            # - choice_world_biased
            # - choice_world_training
            # - choice_world_habituation
            if 'passiveChoiceWorld' in protocol:
                registration_class = btasks.PassiveRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'PassiveTask' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No passive task available for sync namespace "{sync_label}"')
                compute_status = False
            elif 'habituation' in protocol:
                registration_class = btasks.HabituationRegisterRaw
                behaviour_class = getattr(btasks, 'HabituationTrials' + sync_label.capitalize())
                compute_status = False
            else:
                registration_class = btasks.TrialRegisterRaw
                try:
                    behaviour_class = getattr(btasks, 'ChoiceWorldTrials' + sync_label.capitalize())
                except AttributeError:
                    raise NotImplementedError(f'No trials task available for sync namespace "{sync_label}"')
                compute_status = True
            tasks[f'RegisterRaw_{protocol}_{i:02}'] = type(f'RegisterRaw_{protocol}_{i:02}', (registration_class,), {})(
                **kwargs, **task_kwargs)
            parents = [tasks[f'RegisterRaw_{protocol}_{i:02}']] + sync_tasks
            tasks[f'Trials_{protocol}_{i:02}'] = type(f'Trials_{protocol}_{i:02}', (behaviour_class,), {})(
                **kwargs, **sync_kwargs, **task_kwargs, parents=parents)
            if compute_status:
                tasks[f'TrainingStatus_{protocol}_{i:02}'] = type(f'TrainingStatus_{protocol}_{i:02}', (
                    btasks.TrainingStatus,), {})(**kwargs, **task_kwargs, parents=[tasks[f'Trials_{protocol}_{i:02}']])

    # Ephys tasks
    if 'neuropixel' in devices:
        ephys_kwargs = {'device_collection': 'raw_ephys_data'}
        tasks['EphysRegisterRaw'] = type('EphysRegisterRaw', (etasks.EphysRegisterRaw,), {})(**kwargs, **ephys_kwargs)

        all_probes = []
        register_tasks = []
        for pname, probe_info in devices['neuropixel'].items():
            # Glob to support collections such as _00a, _00b. This doesn't fix the issue of NP2.4
            # extractions, however.
            probe_collection = next(session_path.glob(probe_info['collection'] + '*'))
            meta_file = spikeglx.glob_ephys_files(probe_collection, ext='meta')
            meta_file = meta_file[0].get('ap')
            # the probe type and shank count determine which compression task to use
            nptype = spikeglx._get_neuropixel_version_from_meta(spikeglx.read_meta_data(meta_file))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))

            if (nptype == 'NP2.1') or (nptype == 'NP2.4' and nshanks == 1):
                tasks[f'EphyCompressNP21_{pname}'] = type(f'EphyCompressNP21_{pname}', (etasks.EphysCompressNP21,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                all_probes.append(pname)
                register_tasks.append(tasks[f'EphyCompressNP21_{pname}'])
            elif nptype == 'NP2.4' and nshanks > 1:
                tasks[f'EphyCompressNP24_{pname}'] = type(f'EphyCompressNP24_{pname}', (etasks.EphysCompressNP24,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, nshanks=nshanks)
                register_tasks.append(tasks[f'EphyCompressNP24_{pname}'])
                # multi-shank NP2.4 probes are split into one pseudo-probe per shank (suffix a, b, c, ...)
                all_probes += [f'{pname}{chr(97 + int(shank))}' for shank in range(nshanks)]
            else:
                tasks[f'EphysCompressNP1_{pname}'] = type(f'EphyCompressNP1_{pname}', (etasks.EphysCompressNP1,), {})(
                    **kwargs, **ephys_kwargs, pname=pname)
                register_tasks.append(tasks[f'EphysCompressNP1_{pname}'])
                all_probes.append(pname)

        if nptype == '3A':
            # 3A probes have a single pulses task across all probes
            tasks['EphysPulses'] = type('EphysPulses', (etasks.EphysPulses,), {})(
                **kwargs, **ephys_kwargs, **sync_kwargs, pname=all_probes, parents=register_tasks + sync_tasks)

        for pname in all_probes:
            # pname[:7] matches the base probe name (e.g. 'probe00') for shank-suffixed pseudo-probes
            register_task = [reg_task for reg_task in register_tasks if pname[:7] in reg_task.name]

            if nptype != '3A':
                tasks[f'EphysPulses_{pname}'] = type(f'EphysPulses_{pname}', (etasks.EphysPulses,), {})(
                    **kwargs, **ephys_kwargs, **sync_kwargs, pname=[pname], parents=register_task + sync_tasks)
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'EphysPulses_{pname}']])
            else:
                tasks[f'Spikesorting_{pname}'] = type(f'Spikesorting_{pname}', (etasks.SpikeSorting,), {})(
                    **kwargs, **ephys_kwargs, pname=pname, parents=[tasks['EphysPulses']])

            tasks[f'RawEphysQC_{pname}'] = type(f'RawEphysQC_{pname}', (etasks.RawEphysQC,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=register_task)
            tasks[f'EphysCellQC_{pname}'] = type(f'EphysCellQC_{pname}', (etasks.EphysCellsQc,), {})(
                **kwargs, **ephys_kwargs, pname=pname, parents=[tasks[f'Spikesorting_{pname}']])

    # Video tasks
    if 'cameras' in devices:
        cams = list(devices['cameras'].keys())
        subset_cams = [c for c in cams if c in ('left', 'right', 'body', 'belly')]
        video_kwargs = {'device_collection': 'raw_video_data', 'cameras': cams}
        video_compressed = sess_params.get_video_compressed(acquisition_description)

        if video_compressed:
            # This is for widefield case where the video is already compressed
            tasks[tn] = type((tn := 'VideoConvert'), (vtasks.VideoConvert,), {})(**kwargs, **video_kwargs)
            dlc_parent_task = tasks['VideoConvert']
            tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcCamlog,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
        else:
            tasks[tn] = type((tn := 'VideoRegisterRaw'), (vtasks.VideoRegisterRaw,), {})(
                **kwargs, **video_kwargs)
            tasks[tn] = type((tn := 'VideoCompress'), (vtasks.VideoCompress,), {})(
                **kwargs, **video_kwargs, **sync_kwargs)
            dlc_parent_task = tasks['VideoCompress']
            if sync == 'bpod':
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcBpod,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']])
            elif sync == 'nidq':
                # Here we restrict to videos that we support (left, right or body)
                video_kwargs['cameras'] = subset_cams
                tasks[tn] = type((tn := f'VideoSyncQC_{sync}'), (vtasks.VideoSyncQcNidq,), {})(
                    **kwargs, **video_kwargs, **sync_kwargs, parents=[tasks['VideoCompress']] + sync_tasks)

        if sync_kwargs['sync'] != 'bpod':
            # Here we restrict to videos that we support (left, right or body)
            # Currently there is no plan to run DLC on the belly cam
            subset_cams = [c for c in cams if c in ('left', 'right', 'body')]
            video_kwargs['cameras'] = subset_cams
            tasks[tn] = type((tn := 'DLC'), (vtasks.DLC,), {})(
                **kwargs, **video_kwargs, parents=[dlc_parent_task])

            # The PostDLC plots require a trials object for QC
            # Find the first task that outputs a trials.table dataset
            trials_task = (
                t for t in tasks.values() if any('trials.table' in f for f in t.signature.get('output_files', []))
            )
            if trials_task := next(trials_task, None):
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}'], trials_task]
                trials_collection = getattr(trials_task, 'output_collection', 'alf')
            else:
                parents = [tasks['DLC'], tasks[f'VideoSyncQC_{sync}']]
                trials_collection = 'alf'
            tasks[tn] = type((tn := 'PostDLC'), (vtasks.EphysPostDLC,), {})(
                **kwargs, cameras=subset_cams, trials_collection=trials_collection, parents=parents)

    # Audio tasks
    if 'microphone' in devices:
        (microphone, micro_kwargs), = devices['microphone'].items()
        micro_kwargs['device_collection'] = micro_kwargs.pop('collection')
        if sync_kwargs['sync'] == 'bpod':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioSync,), {})(
                **kwargs, **sync_kwargs, **micro_kwargs, collection=micro_kwargs['device_collection'])
        elif sync_kwargs['sync'] == 'nidq':
            tasks['AudioRegisterRaw'] = type('AudioRegisterRaw', (atasks.AudioCompress,), {})(**kwargs, **micro_kwargs)

    # Widefield tasks
    if 'widefield' in devices:
        (_, wfield_kwargs), = devices['widefield'].items()
        wfield_kwargs['device_collection'] = wfield_kwargs.pop('collection')
        tasks['WideFieldRegisterRaw'] = type('WidefieldRegisterRaw', (wtasks.WidefieldRegisterRaw,), {})(
            **kwargs, **wfield_kwargs)
        tasks['WidefieldCompress'] = type('WidefieldCompress', (wtasks.WidefieldCompress,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WideFieldRegisterRaw']])
        tasks['WidefieldPreprocess'] = type('WidefieldPreprocess', (wtasks.WidefieldPreprocess,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldCompress']])
        tasks['WidefieldSync'] = type('WidefieldSync', (wtasks.WidefieldSync,), {})(
            **kwargs, **wfield_kwargs, **sync_kwargs,
            parents=[tasks['WideFieldRegisterRaw'], tasks['WidefieldCompress']] + sync_tasks)
        tasks['WidefieldFOV'] = type('WidefieldFOV', (wtasks.WidefieldFOV,), {})(
            **kwargs, **wfield_kwargs, parents=[tasks['WidefieldPreprocess']])

    # Mesoscope tasks
    if 'mesoscope' in devices:
        (_, mscope_kwargs), = devices['mesoscope'].items()
        mscope_kwargs['device_collection'] = mscope_kwargs.pop('collection')
        tasks['MesoscopeRegisterSnapshots'] = type('MesoscopeRegisterSnapshots', (mscope_tasks.MesoscopeRegisterSnapshots,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopePreprocess'] = type('MesoscopePreprocess', (mscope_tasks.MesoscopePreprocess,), {})(
            **kwargs, **mscope_kwargs)
        tasks['MesoscopeFOV'] = type('MesoscopeFOV', (mscope_tasks.MesoscopeFOV,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])
        tasks['MesoscopeSync'] = type('MesoscopeSync', (mscope_tasks.MesoscopeSync,), {})(
            **kwargs, **mscope_kwargs, **sync_kwargs)
        tasks['MesoscopeCompress'] = type('MesoscopeCompress', (mscope_tasks.MesoscopeCompress,), {})(
            **kwargs, **mscope_kwargs, parents=[tasks['MesoscopePreprocess']])

    # Photometry tasks
    if 'photometry' in devices:
        # {'collection': 'raw_photometry_data', 'sync_label': 'frame_trigger', 'regions': ['Region1G', 'Region3G']}
        photometry_kwargs = devices['photometry']
        tasks['FibrePhotometryRegisterRaw'] = type('FibrePhotometryRegisterRaw', (
            ptasks.FibrePhotometryRegisterRaw,), {})(**kwargs, **photometry_kwargs)
        tasks['FibrePhotometryPreprocess'] = type('FibrePhotometryPreprocess', (
            ptasks.FibrePhotometryPreprocess,), {})(**kwargs, **photometry_kwargs, **sync_kwargs,
                                                    parents=[tasks['FibrePhotometryRegisterRaw']] + sync_tasks)

    p = mtasks.Pipeline(session_path=session_path, **pkwargs)
    p.tasks = tasks
    return p

462 

463 

def make_pipeline_dict(pipeline, save=True):
    """
    Convert a pipeline object into a list of task dictionaries.

    Parameters
    ----------
    pipeline : ibllib.pipes.tasks.Pipeline
        The pipeline whose tasks should be serialized.
    save : bool
        If true, the task list is also written to 'pipeline_tasks.yaml' in the
        pipeline's session path.

    Returns
    -------
    list of dict
        The serialized task dictionaries.
    """
    # TODO better name
    task_dicts = pipeline.create_tasks_list_from_pipeline()
    if save:
        out_path = Path(pipeline.session_path).joinpath('pipeline_tasks.yaml')
        with open(out_path, 'w') as fp:
            yaml.dump(task_dicts, fp)
    return task_dicts

471 

472 

def load_pipeline_dict(path):
    """
    Load the list of task dictionaries from a session's 'pipeline_tasks.yaml' file.

    Parameters
    ----------
    path : str, pathlib.Path
        The directory containing the 'pipeline_tasks.yaml' file.

    Returns
    -------
    list of dict
        The deserialized task dictionaries.
    """
    yaml_file = Path(path).joinpath('pipeline_tasks.yaml')
    with open(yaml_file, 'r') as fp:
        return yaml.full_load(fp)

478 

479 

def get_trials_tasks(session_path, one=None):
    """
    Return a list of pipeline trials extractor task objects for a given session.

    This function supports both legacy and dynamic pipeline sessions.

    Parameters
    ----------
    session_path : str, pathlib.Path
        An absolute path to a session.
    one : one.api.One
        An ONE instance.

    Returns
    -------
    list of pipes.tasks.Task
        A list of task objects for the provided session.

    """
    # Check for an experiment.description file; ensure downloaded if possible
    if one and one.to_eid(session_path):  # to_eid returns None if session not registered
        one.load_datasets(session_path, ['_ibl_experiment.description'], download_only=True, assert_present=False)
        # NB: meta files only required to build neuropixel tasks in make_pipeline
        if meta_files := one.list_datasets(session_path, '*.ap.meta', collection='raw_ephys_data*'):
            one.load_datasets(session_path, meta_files, download_only=True, assert_present=False)
    experiment_description = sess_params.read_params(session_path)

    # If experiment description file then use this to make the pipeline
    if experiment_description is not None:
        tasks = []
        try:
            pipeline = make_pipeline(session_path, one=one)
            # tasks with 'Trials' in the key are the behaviour/trials extractor tasks
            trials_tasks = [t for t in pipeline.tasks if 'Trials' in t]
            for task in trials_tasks:
                t = pipeline.tasks.get(task)
                # re-initialize so the task instance is fully set up with its kwargs
                t.__init__(session_path, **t.kwargs)
                tasks.append(t)
        except NotImplementedError as ex:
            _logger.warning('Failed to get trials tasks: %s', ex)
    else:
        # Otherwise default to old way of doing things
        if one and one.to_eid(session_path):
            one.load_dataset(session_path, '_iblrig_taskSettings.raw', collection='raw_behavior_data', download_only=True)
        pipeline = get_pipeline(session_path)
        if pipeline == 'training':
            from ibllib.pipes.training_preprocessing import TrainingTrials
            tasks = [TrainingTrials(session_path, one=one)]
        elif pipeline == 'ephys':
            from ibllib.pipes.ephys_preprocessing import EphysTrials
            tasks = [EphysTrials(session_path, one=one)]
        else:
            try:
                # try to find a custom extractor in the personal projects extraction class
                import projects.base
                task_type = get_session_extractor_type(session_path)
                assert (PipelineClass := projects.base.get_pipeline(task_type))
                pipeline = PipelineClass(session_path, one=one)
                trials_task_name = next((task for task in pipeline.tasks if 'Trials' in task), None)
                assert trials_task_name, (f'No "Trials" tasks for custom pipeline '
                                          f'"{pipeline.name}" with extractor type "{task_type}"')
                task = pipeline.tasks.get(trials_task_name)
                task(session_path)
                tasks = [task]
            except (ModuleNotFoundError, AssertionError) as ex:
                # best-effort: log and return an empty list rather than raising
                _logger.warning('Failed to get trials tasks: %s', ex)
                tasks = []

    return tasks