Coverage for ibllib/pipes/behavior_tasks.py: 75%

216 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1"""Standard task protocol extractor dynamic pipeline tasks.""" 

2import logging 

3import traceback 

4 

5from pkg_resources import parse_version 

6import one.alf.io as alfio 

7from one.alf.files import session_path_parts 

8from one.api import ONE 

9 

10from ibllib.oneibl.registration import get_lab 

11from ibllib.pipes import base_tasks 

12from ibllib.io.raw_data_loaders import load_settings 

13from ibllib.qc.task_extractors import TaskQCExtractor 

14from ibllib.qc.task_metrics import HabituationQC, TaskQC 

15from ibllib.io.extractors.ephys_passive import PassiveChoiceWorld 

16from ibllib.io.extractors import bpod_trials 

17from ibllib.io.extractors.base import get_session_extractor_type 

18from ibllib.io.extractors.bpod_trials import get_bpod_extractor 

19from ibllib.io.extractors.ephys_fpga import extract_all 

20from ibllib.io.extractors.mesoscope import TimelineTrials 

21from ibllib.pipes import training_status 

22from ibllib.plots.figures import BehaviourPlots 

23 

24_logger = logging.getLogger('ibllib') 

25 

26 

class HabituationRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
    """Declares the raw iblrig task datasets expected from a habituation session.

    Only the task signature is defined here; all other behaviour is inherited from the base classes.
    """
    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """dict: the (empty) input file list and the raw dataset patterns this task outputs.

        Each output is a (filename pattern, collection, flag) tuple located in `self.collection`.
        """
        # Patterns emitted with a True flag (presumably meaning 'required' - confirm against base_tasks)
        mandatory = (
            '_iblrig_taskData.raw.*',
            '_iblrig_taskSettings.raw.*',
        )
        # Patterns emitted with a False flag
        optional = (
            '_iblrig_encoderEvents.raw*',
            '_iblrig_encoderPositions.raw*',
            '_iblrig_encoderTrialInfo.raw*',
            '_iblrig_stimPositionScreen.raw*',
            '_iblrig_syncSquareUpdate.raw*',
            '_iblrig_ambientSensorData.raw*',
        )
        collection = self.collection
        outputs = [(pattern, collection, True) for pattern in mandatory]
        outputs.extend((pattern, collection, False) for pattern in optional)
        return {'input_files': [], 'output_files': outputs}

47 

48 

class HabituationTrialsBpod(base_tasks.BehaviourTask):
    """Extracts the trials of a Bpod-only habituation session and runs the habituation QC."""
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """dict: raw task data/settings as inputs, trials datasets in `self.output_collection` as outputs."""
        signature = {
            'input_files': [
                ('_iblrig_taskData.raw.*', self.collection, True),
                ('_iblrig_taskSettings.raw.*', self.collection, True),
            ],
            'output_files': [
                ('*trials.contrastLeft.npy', self.output_collection, True),
                ('*trials.contrastRight.npy', self.output_collection, True),
                ('*trials.feedback_times.npy', self.output_collection, True),
                ('*trials.feedbackType.npy', self.output_collection, True),
                ('*trials.goCue_times.npy', self.output_collection, True),
                ('*trials.goCueTrigger_times.npy', self.output_collection, True),
                ('*trials.intervals.npy', self.output_collection, True),
                ('*trials.rewardVolume.npy', self.output_collection, True),
                ('*trials.stimOff_times.npy', self.output_collection, True),
                ('*trials.stimOn_times.npy', self.output_collection, True),
                ('*trials.stimOnTrigger_times.npy', self.output_collection, True),
            ]
        }
        return signature

    def _run(self, update=True):
        """
        Extracts an iblrig habituation session and runs the habituation task QC.

        Parameters
        ----------
        update : bool
            Forwarded to the QC `run` call.

        Returns
        -------
        list or None
            The output files returned by the extractor, or None when no trials were extracted.
        """
        extractor = bpod_trials.get_bpod_extractor(self.session_path, task_collection=self.collection)
        # Save into this task's output collection so the files land where the signature declares
        # them, consistent with ChoiceWorldTrialsBpod._run.
        extractor.default_path = self.output_collection
        trials, output_files = extractor.extract(task_collection=self.collection, save=True)

        if trials is None:
            return None
        # The QC is skipped without an online ONE instance
        if self.one is None or self.one.offline:
            return output_files
        # Run the task QC
        # Compile task data for QC
        qc = HabituationQC(self.session_path, one=self.one)
        qc.extractor = TaskQCExtractor(self.session_path, sync_collection=self.sync_collection,
                                       one=self.one, sync_type=self.sync, task_collection=self.collection)
        # One QC namespace per protocol when the task has a protocol number
        namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}'
        qc.run(update=update, namespace=namespace)
        return output_files

95 

96 

class TrialRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
    """Declares the raw iblrig task datasets expected from a trials (choice world) session.

    Only the task signature is defined here; all other behaviour is inherited from the base classes.
    """
    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """dict: the (empty) input file list and the raw dataset patterns this task outputs.

        Each output is a (filename pattern, collection, flag) tuple located in `self.collection`.
        """
        # Pattern -> flag; the flag is presumably 'required' (confirm against base_tasks)
        pattern_flags = {
            '_iblrig_taskData.raw.*': True,
            '_iblrig_taskSettings.raw.*': True,
            '_iblrig_encoderEvents.raw*': False,
            '_iblrig_encoderPositions.raw*': False,
            '_iblrig_encoderTrialInfo.raw*': False,
            '_iblrig_stimPositionScreen.raw*': False,
            '_iblrig_syncSquareUpdate.raw*': False,
            '_iblrig_ambientSensorData.raw*': False,
        }
        collection = self.collection
        outputs = [(pattern, collection, flag) for pattern, flag in pattern_flags.items()]
        return {'input_files': [], 'output_files': outputs}

117 

118 

class PassiveRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
    """Declares the raw iblrig task datasets expected from a passive session.

    Only the task signature is defined here; all other behaviour is inherited from the base classes.
    """
    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """dict: the (empty) input file list and the raw dataset patterns this task outputs.

        Every output pattern is located in `self.collection` and carries a True flag.
        """
        patterns = (
            '_iblrig_taskSettings.raw.*',
            '_iblrig_encoderEvents.raw*',
            '_iblrig_encoderPositions.raw*',
            '_iblrig_encoderTrialInfo.raw*',
            '_iblrig_stimPositionScreen.raw*',
            '_iblrig_syncSquareUpdate.raw*',
            '_iblrig_RFMapStim.raw*',
        )
        collection = self.collection
        return {
            'input_files': [],
            'output_files': [(pattern, collection, True) for pattern in patterns],
        }

136 

137 

class PassiveTask(base_tasks.BehaviourTask):
    """Extracts the passive protocol datasets using `PassiveChoiceWorld`."""
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """dict: raw task and sync files as inputs, the four passive datasets as outputs."""
        # Raw task files, found in the task collection
        task_inputs = [
            ('_iblrig_taskSettings.raw*', self.collection, True),
            ('_iblrig_RFMapStim.raw*', self.collection, True),
        ]
        # Sync files, found in the sync collection
        sync_inputs = [
            (f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.*', self.sync_collection, True),
            ('*.wiring.json', self.sync_collection, False),
            ('*.meta', self.sync_collection, False),
        ]
        outputs = [
            ('_ibl_passiveGabor.table.csv', self.output_collection, True),
            ('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True),
            ('_ibl_passiveRFM.times.npy', self.output_collection, True),
            ('_ibl_passiveStims.table.csv', self.output_collection, True),
        ]
        return {'input_files': task_inputs + sync_inputs, 'output_files': outputs}

    def _run(self, **kwargs):
        """
        Run the passive extraction, saving into the output collection.

        Returns
        -------
        list
            The paths returned by the extractor. The task status is set to -1 if any of them is None.
        """
        passive_extractor = PassiveChoiceWorld(self.session_path)
        _, paths = passive_extractor.extract(
            sync_collection=self.sync_collection,
            task_collection=self.collection,
            save=True,
            path_out=self.session_path.joinpath(self.output_collection),
            protocol_number=self.protocol_number,
        )

        # A None entry among the returned paths marks the task with status -1
        for path in paths:
            if path is None:
                self.status = -1
                break

        return paths

169 

170 

class PassiveTaskTimeline(base_tasks.BehaviourTask, base_tasks.MesoscopeTask):
    """TODO should be mesoscope invariant, using wiring file"""
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """dict: raw task files and (False-flagged) sync files as inputs, the passive datasets as outputs."""
        task_inputs = [
            ('_iblrig_taskSettings.raw*', self.collection, True),
            ('_iblrig_RFMapStim.raw*', self.collection, True),
        ]
        sync_inputs = [
            (f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, False),
            (f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, False),
            (f'_{self.sync_namespace}_sync.times.*', self.sync_collection, False),
        ]
        outputs = [
            ('_ibl_passiveGabor.table.csv', self.output_collection, True),
            ('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True),
            ('_ibl_passiveRFM.times.npy', self.output_collection, True),
            ('_ibl_passiveStims.table.csv', self.output_collection, True),
        ]
        return {'input_files': task_inputs + sync_inputs, 'output_files': outputs}

    def _run(self, **kwargs):
        """
        Load the sync data and run the passive extraction with it.

        The protocol number is reset to None when the settings carry no iblrig version tag
        (treated as '100.0.0') or a version at or below 7.1.0.

        Returns
        -------
        list
            The paths returned by the extractor. The task status is set to -1 if any of them is None.
        """
        settings = load_settings(self.session_path, self.collection)
        version = settings.get('IBLRIG_VERSION_TAG', '100.0.0')
        # The string comparison comes first so the placeholder version short-circuits the parse
        no_spacers = version == '100.0.0' or parse_version(version) <= parse_version('7.1.0')
        if no_spacers:
            _logger.warning('Protocol spacers not supported; setting protocol_number to None')
            self.protocol_number = None

        sync, chmap = self.load_sync()
        passive_extractor = PassiveChoiceWorld(self.session_path)
        _, paths = passive_extractor.extract(
            sync_collection=self.sync_collection,
            task_collection=self.collection,
            save=True,
            path_out=self.session_path.joinpath(self.output_collection),
            protocol_number=self.protocol_number,
            sync=sync,
            sync_map=chmap,
        )

        # A None entry among the returned paths marks the task with status -1
        for path in paths:
            if path is None:
                self.status = -1
                break

        return paths

211 

212 

class ChoiceWorldTrialsBpod(base_tasks.BehaviourTask):
    """Extracts trials and wheel data from a Bpod-only session and runs the task QC."""
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """dict: raw task and encoder files as inputs, trials and wheel datasets as outputs."""
        inputs = [
            ('_iblrig_taskData.raw.*', self.collection, True),
            ('_iblrig_taskSettings.raw.*', self.collection, True),
            ('_iblrig_encoderEvents.raw*', self.collection, True),
            ('_iblrig_encoderPositions.raw*', self.collection, True),
        ]
        outputs = [
            ('*trials.goCueTrigger_times.npy', self.output_collection, True),
            ('*trials.stimOnTrigger_times.npy', self.output_collection, False),
            ('*trials.table.pqt', self.output_collection, True),
            ('*wheel.position.npy', self.output_collection, True),
            ('*wheel.timestamps.npy', self.output_collection, True),
            ('*wheelMoves.intervals.npy', self.output_collection, True),
            ('*wheelMoves.peakAmplitude.npy', self.output_collection, True),
        ]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, update=True):
        """
        Extract the Bpod trials of the session, then run the matching task QC.

        Parameters
        ----------
        update : bool
            Forwarded to the QC `run` call.

        Returns
        -------
        list or None
            The output files returned by the extractor, or None when no trials were extracted.
        """
        extractor = bpod_trials.get_bpod_extractor(self.session_path, task_collection=self.collection)
        extractor.default_path = self.output_collection
        trials, output_files = extractor.extract(task_collection=self.collection, save=True)
        if trials is None:
            return None
        # The QC is skipped without an online ONE instance
        if self.one is None or self.one.offline:
            return output_files

        # Run the task QC
        # Compile task data for QC
        session_type = get_session_extractor_type(self.session_path, task_collection=self.collection)
        is_habituation = session_type == 'habituation'
        # FIXME Task data should not need re-extracting
        qc_class = HabituationQC if is_habituation else TaskQC
        qc = qc_class(self.session_path, one=self.one)
        qc.extractor = TaskQCExtractor(self.session_path, one=self.one, sync_collection=self.sync_collection,
                                       sync_type=self.sync, task_collection=self.collection)
        if not is_habituation:
            # Update wheel data
            qc.extractor.wheel_encoding = 'X1'

        # Aggregate and update Alyx QC fields
        if self.protocol_number is None:
            namespace = 'task'
        else:
            namespace = f'task_{self.protocol_number:02}'
        qc.run(update=update, namespace=namespace)

        return output_files

266 

267 

class ChoiceWorldTrialsNidq(base_tasks.BehaviourTask):
    """Extracts trials and wheel data using the sync files, then runs the criterion update and task QC."""
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """dict: raw task, encoder and sync files as inputs, trials and wheel datasets as outputs."""
        task_inputs = [
            ('_iblrig_taskData.raw.*', self.collection, True),
            ('_iblrig_taskSettings.raw.*', self.collection, True),
            ('_iblrig_encoderEvents.raw*', self.collection, True),
            ('_iblrig_encoderPositions.raw*', self.collection, True),
        ]
        sync_inputs = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            ('*wiring.json', self.sync_collection, False),
            ('*.meta', self.sync_collection, True),
        ]
        outputs = [
            ('*trials.goCueTrigger_times.npy', self.output_collection, True),
            ('*trials.intervals_bpod.npy', self.output_collection, False),
            ('*trials.stimOff_times.npy', self.output_collection, False),
            ('*trials.table.pqt', self.output_collection, True),
            ('*wheel.position.npy', self.output_collection, True),
            ('*wheel.timestamps.npy', self.output_collection, True),
            ('*wheelMoves.intervals.npy', self.output_collection, True),
            ('*wheelMoves.peakAmplitude.npy', self.output_collection, True),
        ]
        return {'input_files': task_inputs + sync_inputs, 'output_files': outputs}

    def _behaviour_criterion(self, update=True):
        """
        Compute the behaviour criterion from the saved trials and optionally store it on Alyx.

        Parameters
        ----------
        update : bool
            If true, write the result to the 'behavior' key of the session's extended_qc JSON field.
        """
        from brainbox.behavior import training

        trials_dir = self.session_path.joinpath(self.output_collection)
        trials = alfio.load_object(trials_dir, 'trials')
        n_trials = trials["intervals"].shape[0]
        perf_easy = training.compute_performance_easy(trials)
        good_enough = training.criterion_delay(n_trials=n_trials, perf_easy=perf_easy)
        if not update:
            return
        eid = self.one.path2eid(self.session_path, query_type='remote')
        self.one.alyx.json_field_update(
            "sessions", eid, "extended_qc", {"behavior": int(good_enough)}
        )

    def _extract_behaviour(self):
        """Run `extract_all`, saving into the output collection; returns (datasets, output files)."""
        destination = self.session_path.joinpath(self.output_collection)
        dsets, out_files = extract_all(
            self.session_path, self.sync_collection, task_collection=self.collection,
            save_path=destination, protocol_number=self.protocol_number, save=True)
        return dsets, out_files

    def _run_qc(self, trials_data, update=True, plot_qc=True):
        """
        Run the task QC and optionally create the trials QC plots.

        Parameters
        ----------
        trials_data : dict
            The extracted trials data, assigned to the QC extractor before it extracts its own data.
        update : bool
            Forwarded to the QC `run` call.
        plot_qc : bool
            If true, run `BehaviourPlots`; a failure there is logged and sets the task status to -1.
        """
        # Run the task QC
        qc = TaskQC(self.session_path, one=self.one, log=_logger)
        qc.extractor = TaskQCExtractor(
            self.session_path, lazy=True, one=qc.one, sync_collection=self.sync_collection,
            sync_type=self.sync, task_collection=self.collection)
        # Extract extra datasets required for QC
        qc.extractor.data = trials_data  # FIXME This line is pointless
        qc.extractor.extract_data()

        # Aggregate and update Alyx QC fields
        if self.protocol_number is None:
            namespace = 'task'
        else:
            namespace = f'task_{self.protocol_number:02}'
        qc.run(update=update, namespace=namespace)

        if not plot_qc:
            return

        _logger.info('Creating Trials QC plots')
        try:
            # TODO needs to be adapted for chained protocols
            session_id = self.one.path2eid(self.session_path)
            plot_task = BehaviourPlots(session_id, self.session_path, one=self.one)
            plot_task.run()
            self.plot_tasks.append(plot_task)
        except Exception:
            # Plotting is best effort: log the traceback and flag the task instead of raising
            _logger.error('Could not create Trials QC Plot')
            _logger.error(traceback.format_exc())
            self.status = -1

    def _run(self, update=True, plot_qc=True):
        """
        Extract the behaviour data, then update the behaviour criterion and run the QC.

        The criterion and QC steps are skipped when there is no ONE instance or it is offline.

        Returns
        -------
        list
            The output files returned by `_extract_behaviour`.
        """
        dsets, out_files = self._extract_behaviour()

        online = self.one and not self.one.offline
        if online:
            self._behaviour_criterion(update=update)
            self._run_qc(dsets, update=update, plot_qc=plot_qc)
        return out_files

358 

359 

class ChoiceWorldTrialsTimeline(ChoiceWorldTrialsNidq):
    """Behaviour task extractor with DAQdata.raw NPY datasets."""
    @property
    def signature(self):
        """dict: the parent signature with DAQdata inputs and, when available, extractor-defined outputs."""
        signature = super().signature
        signature['input_files'] = [
            ('_iblrig_taskData.raw.*', self.collection, True),
            ('_iblrig_taskSettings.raw.*', self.collection, True),
            ('_iblrig_encoderEvents.raw*', self.collection, True),
            ('_iblrig_encoderPositions.raw*', self.collection, True),
            (f'_{self.sync_namespace}_DAQdata.raw.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_DAQdata.timestamps.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_DAQdata.meta.json', self.sync_collection, True),
        ]
        if not self.protocol:
            return signature
        # With a known protocol, the outputs are taken from the extractor's save names (falsy names skipped)
        extractor = get_bpod_extractor(self.session_path, protocol=self.protocol)
        if extractor.save_names:
            signature['output_files'] = [
                (name, self.output_collection, True) for name in extractor.save_names if name
            ]
        return signature

    def _extract_behaviour(self):
        """Extract the Bpod trials data and Timeline acquired signals."""
        # First determine the extractor from the task protocol
        extractor = get_bpod_extractor(self.session_path, self.protocol, self.collection)
        bpod_values, _ = extractor.extract(save=False, task_collection=self.collection)
        bpod_data = dict(zip(extractor.var_names, bpod_values))

        trials = TimelineTrials(self.session_path, bpod_trials=bpod_data)
        save_path = self.session_path / self.output_collection
        if not self._spacer_support(extractor.settings):
            _logger.warning('Protocol spacers not supported; setting protocol_number to None')
            self.protocol_number = None
        dsets, out_files = trials.extract(
            save=True, path_out=save_path, sync_collection=self.sync_collection,
            task_collection=self.collection, protocol_number=self.protocol_number)

        # Anything other than a dict is paired up with the extractor's variable names
        if not isinstance(dsets, dict):
            dsets = dict(zip(trials.var_names, dsets))

        # Store for QC later
        self.timeline = trials.timeline
        self.frame2ttl = trials.frame2ttl
        self.audio = trials.audio

        return dsets, out_files

    def _run_qc(self, trials_data, update=True, **kwargs):
        """
        Run the task QC and update Alyx with results.

        Parameters
        ----------
        trials_data : dict
            The extracted trials data.
        update : bool
            If true, update Alyx with the result.

        Notes
        -----
        - Unlike the super class, currently the QC plots are not generated.
        - Expects the frame2ttl and audio attributes to be set from running _extract_behaviour.
        """
        # TODO Task QC extractor for Timeline
        qc = TaskQC(self.session_path, one=self.one, log=_logger)
        qc.extractor = TaskQCExtractor(
            self.session_path, lazy=True, one=qc.one, sync_collection=self.sync_collection,
            sync_type=self.sync, task_collection=self.collection)
        # Extract extra datasets required for QC
        qc.extractor.data = TaskQCExtractor.rename_data(trials_data.copy())
        qc.extractor.load_raw_data()

        # TTLs stored on the task by _extract_behaviour
        qc.extractor.frame_ttls = self.frame2ttl
        qc.extractor.audio_ttls = self.audio
        # qc.extractor.bpod_ttls = channel_events('bpod')

        # Aggregate and update Alyx QC fields
        if self.protocol_number is None:
            namespace = 'task'
        else:
            namespace = f'task_{self.protocol_number:02}'
        qc.run(update=update, namespace=namespace)

437 

438 

class TrainingStatus(base_tasks.BehaviourTask):
    """Computes the subject's training status, makes the plots and updates the subject's JSON field on Alyx."""
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """dict: raw task files and the trials table as inputs; no output files."""
        inputs = [
            ('_iblrig_taskData.raw.*', self.collection, True),
            ('_iblrig_taskSettings.raw.*', self.collection, True),
            ('*trials.table.pqt', self.output_collection, True),
        ]
        return {'input_files': inputs, 'output_files': []}

    def _run(self, upload=True):
        """
        Extracts training status for subject

        Parameters
        ----------
        upload : bool
            Forwarded to `training_status.make_plots`.

        Returns
        -------
        list
            Always an empty list; this task registers no output files.
        """
        lab = get_lab(self.session_path, self.one.alyx)
        # Sessions from the 'cortexlab' lab query the main Alyx database for the training information
        one = ONE(base_url='https://alyx.internationalbrainlab.org') if lab == 'cortexlab' else self.one

        df = training_status.get_latest_training_information(self.session_path, one)
        if df is None:
            return []

        training_status.make_plots(
            self.session_path, self.one, df=df, save=True, upload=upload, task_collection=self.collection)

        # Update status map in JSON field of subjects endpoint
        if self.one and not self.one.offline:
            _logger.debug('Updating JSON field of subjects endpoint')
            # Keep the first row of each distinct training status, indexed by date
            criteria = df.set_index('date')[['training_status', 'session_path']]
            criteria = criteria.drop_duplicates(subset='training_status', keep='first').to_dict()
            # Two columns in, two items out: date -> status, then date -> session path
            (_, status_by_date), (_, path_by_date) = criteria.items()
            trained_criteria = {
                status.replace(' ', '_'): (date, self.one.path2eid(path_by_date[date]))
                for date, status in status_by_date.items()
            }
            data = {'trained_criteria': trained_criteria}
            _, subject, *_ = session_path_parts(self.session_path)
            self.one.alyx.json_field_update('subjects', subject, data=data)
        return []