Coverage for ibllib/pipes/behavior_tasks.py: 75%
216 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1"""Standard task protocol extractor dynamic pipeline tasks."""
2import logging
3import traceback
5from pkg_resources import parse_version
6import one.alf.io as alfio
7from one.alf.files import session_path_parts
8from one.api import ONE
10from ibllib.oneibl.registration import get_lab
11from ibllib.pipes import base_tasks
12from ibllib.io.raw_data_loaders import load_settings
13from ibllib.qc.task_extractors import TaskQCExtractor
14from ibllib.qc.task_metrics import HabituationQC, TaskQC
15from ibllib.io.extractors.ephys_passive import PassiveChoiceWorld
16from ibllib.io.extractors import bpod_trials
17from ibllib.io.extractors.base import get_session_extractor_type
18from ibllib.io.extractors.bpod_trials import get_bpod_extractor
19from ibllib.io.extractors.ephys_fpga import extract_all
20from ibllib.io.extractors.mesoscope import TimelineTrials
21from ibllib.pipes import training_status
22from ibllib.plots.figures import BehaviourPlots
24_logger = logging.getLogger('ibllib')
27class HabituationRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
28 priority = 100
29 job_size = 'small'
31 @property
32 def signature(self):
33 signature = { 1l
34 'input_files': [],
35 'output_files': [
36 ('_iblrig_taskData.raw.*', self.collection, True),
37 ('_iblrig_taskSettings.raw.*', self.collection, True),
38 ('_iblrig_encoderEvents.raw*', self.collection, False),
39 ('_iblrig_encoderPositions.raw*', self.collection, False),
40 ('_iblrig_encoderTrialInfo.raw*', self.collection, False),
41 ('_iblrig_stimPositionScreen.raw*', self.collection, False),
42 ('_iblrig_syncSquareUpdate.raw*', self.collection, False),
43 ('_iblrig_ambientSensorData.raw*', self.collection, False)
44 ]
45 }
46 return signature 1l
49class HabituationTrialsBpod(base_tasks.BehaviourTask):
50 priority = 90
51 job_size = 'small'
53 @property
54 def signature(self):
55 signature = { 1d
56 'input_files': [
57 ('_iblrig_taskData.raw.*', self.collection, True),
58 ('_iblrig_taskSettings.raw.*', self.collection, True),
59 ],
60 'output_files': [
61 ('*trials.contrastLeft.npy', self.output_collection, True),
62 ('*trials.contrastRight.npy', self.output_collection, True),
63 ('*trials.feedback_times.npy', self.output_collection, True),
64 ('*trials.feedbackType.npy', self.output_collection, True),
65 ('*trials.goCue_times.npy', self.output_collection, True),
66 ('*trials.goCueTrigger_times.npy', self.output_collection, True),
67 ('*trials.intervals.npy', self.output_collection, True),
68 ('*trials.rewardVolume.npy', self.output_collection, True),
69 ('*trials.stimOff_times.npy', self.output_collection, True),
70 ('*trials.stimOn_times.npy', self.output_collection, True),
71 ('*trials.stimOnTrigger_times.npy', self.output_collection, True),
72 ]
73 }
74 return signature 1d
76 def _run(self, update=True):
77 """
78 Extracts an iblrig training session
79 """
80 extractor = bpod_trials.get_bpod_extractor(self.session_path, task_collection=self.collection) 1d
81 trials, output_files = extractor.extract(task_collection=self.collection, save=True) 1d
83 if trials is None: 1d
84 return None
85 if self.one is None or self.one.offline: 1d
86 return output_files
87 # Run the task QC
88 # Compile task data for QC
89 qc = HabituationQC(self.session_path, one=self.one) 1d
90 qc.extractor = TaskQCExtractor(self.session_path, sync_collection=self.sync_collection, 1d
91 one=self.one, sync_type=self.sync, task_collection=self.collection)
92 namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}' 1d
93 qc.run(update=update, namespace=namespace) 1d
94 return output_files 1d
97class TrialRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
98 priority = 100
99 job_size = 'small'
101 @property
102 def signature(self):
103 signature = { 1mnob
104 'input_files': [],
105 'output_files': [
106 ('_iblrig_taskData.raw.*', self.collection, True),
107 ('_iblrig_taskSettings.raw.*', self.collection, True),
108 ('_iblrig_encoderEvents.raw*', self.collection, False),
109 ('_iblrig_encoderPositions.raw*', self.collection, False),
110 ('_iblrig_encoderTrialInfo.raw*', self.collection, False),
111 ('_iblrig_stimPositionScreen.raw*', self.collection, False),
112 ('_iblrig_syncSquareUpdate.raw*', self.collection, False),
113 ('_iblrig_ambientSensorData.raw*', self.collection, False)
114 ]
115 }
116 return signature 1mnob
119class PassiveRegisterRaw(base_tasks.RegisterRawDataTask, base_tasks.BehaviourTask):
120 priority = 100
121 job_size = 'small'
123 @property
124 def signature(self):
125 signature = { 1p
126 'input_files': [],
127 'output_files': [('_iblrig_taskSettings.raw.*', self.collection, True),
128 ('_iblrig_encoderEvents.raw*', self.collection, True),
129 ('_iblrig_encoderPositions.raw*', self.collection, True),
130 ('_iblrig_encoderTrialInfo.raw*', self.collection, True),
131 ('_iblrig_stimPositionScreen.raw*', self.collection, True),
132 ('_iblrig_syncSquareUpdate.raw*', self.collection, True),
133 ('_iblrig_RFMapStim.raw*', self.collection, True)]
134 }
135 return signature 1p
138class PassiveTask(base_tasks.BehaviourTask):
139 priority = 90
140 job_size = 'small'
142 @property
143 def signature(self):
144 signature = { 1ij
145 'input_files': [('_iblrig_taskSettings.raw*', self.collection, True),
146 ('_iblrig_RFMapStim.raw*', self.collection, True),
147 (f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, True),
148 (f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, True),
149 (f'_{self.sync_namespace}_sync.times.*', self.sync_collection, True),
150 ('*.wiring.json', self.sync_collection, False),
151 ('*.meta', self.sync_collection, False)],
152 'output_files': [('_ibl_passiveGabor.table.csv', self.output_collection, True),
153 ('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True),
154 ('_ibl_passiveRFM.times.npy', self.output_collection, True),
155 ('_ibl_passiveStims.table.csv', self.output_collection, True)]
156 }
157 return signature 1ij
159 def _run(self, **kwargs):
160 """returns a list of pathlib.Paths. """
161 data, paths = PassiveChoiceWorld(self.session_path).extract( 1ij
162 sync_collection=self.sync_collection, task_collection=self.collection, save=True,
163 path_out=self.session_path.joinpath(self.output_collection), protocol_number=self.protocol_number)
165 if any(x is None for x in paths): 1ij
166 self.status = -1
168 return paths 1ij
171class PassiveTaskTimeline(base_tasks.BehaviourTask, base_tasks.MesoscopeTask):
172 """TODO should be mesoscope invariant, using wiring file"""
173 priority = 90
174 job_size = 'small'
176 @property
177 def signature(self):
178 signature = {
179 'input_files': [('_iblrig_taskSettings.raw*', self.collection, True),
180 ('_iblrig_RFMapStim.raw*', self.collection, True),
181 (f'_{self.sync_namespace}_sync.channels.*', self.sync_collection, False),
182 (f'_{self.sync_namespace}_sync.polarities.*', self.sync_collection, False),
183 (f'_{self.sync_namespace}_sync.times.*', self.sync_collection, False)],
184 'output_files': [('_ibl_passiveGabor.table.csv', self.output_collection, True),
185 ('_ibl_passivePeriods.intervalsTable.csv', self.output_collection, True),
186 ('_ibl_passiveRFM.times.npy', self.output_collection, True),
187 ('_ibl_passiveStims.table.csv', self.output_collection, True)]
188 }
189 return signature
191 def _run(self, **kwargs):
192 """returns a list of pathlib.Paths.
193 This class exists to load the sync file and set the protocol_number to None
194 """
195 settings = load_settings(self.session_path, self.collection)
196 version = settings.get('IBLRIG_VERSION_TAG', '100.0.0')
197 if version == '100.0.0' or parse_version(version) <= parse_version('7.1.0'):
198 _logger.warning('Protocol spacers not supported; setting protocol_number to None')
199 self.protocol_number = None
201 sync, chmap = self.load_sync()
202 data, paths = PassiveChoiceWorld(self.session_path).extract(
203 sync_collection=self.sync_collection, task_collection=self.collection, save=True,
204 path_out=self.session_path.joinpath(self.output_collection),
205 protocol_number=self.protocol_number, sync=sync, sync_map=chmap)
207 if any(x is None for x in paths):
208 self.status = -1
210 return paths
213class ChoiceWorldTrialsBpod(base_tasks.BehaviourTask):
214 priority = 90
215 job_size = 'small'
217 @property
218 def signature(self):
219 signature = { 1efgbk
220 'input_files': [
221 ('_iblrig_taskData.raw.*', self.collection, True),
222 ('_iblrig_taskSettings.raw.*', self.collection, True),
223 ('_iblrig_encoderEvents.raw*', self.collection, True),
224 ('_iblrig_encoderPositions.raw*', self.collection, True)],
225 'output_files': [
226 ('*trials.goCueTrigger_times.npy', self.output_collection, True),
227 ('*trials.stimOnTrigger_times.npy', self.output_collection, False),
228 ('*trials.table.pqt', self.output_collection, True),
229 ('*wheel.position.npy', self.output_collection, True),
230 ('*wheel.timestamps.npy', self.output_collection, True),
231 ('*wheelMoves.intervals.npy', self.output_collection, True),
232 ('*wheelMoves.peakAmplitude.npy', self.output_collection, True)
233 ]
234 }
235 return signature 1efgbk
237 def _run(self, update=True):
238 """
239 Extracts an iblrig training session
240 """
241 extractor = bpod_trials.get_bpod_extractor(self.session_path, task_collection=self.collection) 1efgbk
242 extractor.default_path = self.output_collection 1efgbk
243 trials, output_files = extractor.extract(task_collection=self.collection, save=True) 1efgbk
244 if trials is None: 1efgb
245 return None
246 if self.one is None or self.one.offline: 1efgb
247 return output_files 1efg
248 # Run the task QC
249 # Compile task data for QC
250 type = get_session_extractor_type(self.session_path, task_collection=self.collection) 1b
251 # FIXME Task data should not need re-extracting
252 if type == 'habituation': 1b
253 qc = HabituationQC(self.session_path, one=self.one)
254 qc.extractor = TaskQCExtractor(self.session_path, one=self.one, sync_collection=self.sync_collection,
255 sync_type=self.sync, task_collection=self.collection)
256 else: # Update wheel data
257 qc = TaskQC(self.session_path, one=self.one) 1b
258 qc.extractor = TaskQCExtractor(self.session_path, one=self.one, sync_collection=self.sync_collection, 1b
259 sync_type=self.sync, task_collection=self.collection)
260 qc.extractor.wheel_encoding = 'X1' 1b
261 # Aggregate and update Alyx QC fields
262 namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}' 1b
263 qc.run(update=update, namespace=namespace) 1b
265 return output_files 1b
268class ChoiceWorldTrialsNidq(base_tasks.BehaviourTask):
269 priority = 90
270 job_size = 'small'
272 @property
273 def signature(self):
274 signature = { 1hc
275 'input_files': [
276 ('_iblrig_taskData.raw.*', self.collection, True),
277 ('_iblrig_taskSettings.raw.*', self.collection, True),
278 ('_iblrig_encoderEvents.raw*', self.collection, True),
279 ('_iblrig_encoderPositions.raw*', self.collection, True),
280 (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
281 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
282 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
283 ('*wiring.json', self.sync_collection, False),
284 ('*.meta', self.sync_collection, True)],
285 'output_files': [
286 ('*trials.goCueTrigger_times.npy', self.output_collection, True),
287 ('*trials.intervals_bpod.npy', self.output_collection, False),
288 ('*trials.stimOff_times.npy', self.output_collection, False),
289 ('*trials.table.pqt', self.output_collection, True),
290 ('*wheel.position.npy', self.output_collection, True),
291 ('*wheel.timestamps.npy', self.output_collection, True),
292 ('*wheelMoves.intervals.npy', self.output_collection, True),
293 ('*wheelMoves.peakAmplitude.npy', self.output_collection, True)
294 ]
295 }
296 return signature 1hc
298 def _behaviour_criterion(self, update=True):
299 """
300 Computes and update the behaviour criterion on Alyx
301 """
302 from brainbox.behavior import training
304 trials = alfio.load_object(self.session_path.joinpath(self.output_collection), 'trials')
305 good_enough = training.criterion_delay(
306 n_trials=trials["intervals"].shape[0],
307 perf_easy=training.compute_performance_easy(trials),
308 )
309 if update:
310 eid = self.one.path2eid(self.session_path, query_type='remote')
311 self.one.alyx.json_field_update(
312 "sessions", eid, "extended_qc", {"behavior": int(good_enough)}
313 )
315 def _extract_behaviour(self):
316 dsets, out_files = extract_all(self.session_path, self.sync_collection, task_collection=self.collection, 1h
317 save_path=self.session_path.joinpath(self.output_collection),
318 protocol_number=self.protocol_number, save=True)
320 return dsets, out_files 1h
322 def _run_qc(self, trials_data, update=True, plot_qc=True):
323 # Run the task QC
324 qc = TaskQC(self.session_path, one=self.one, log=_logger)
325 qc.extractor = TaskQCExtractor(self.session_path, lazy=True, one=qc.one, sync_collection=self.sync_collection,
326 sync_type=self.sync, task_collection=self.collection)
327 # Extract extra datasets required for QC
328 qc.extractor.data = trials_data # FIXME This line is pointless
329 qc.extractor.extract_data()
331 # Aggregate and update Alyx QC fields
332 namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}'
333 qc.run(update=update, namespace=namespace)
335 if plot_qc:
336 _logger.info('Creating Trials QC plots')
337 try:
338 # TODO needs to be adapted for chained protocols
339 session_id = self.one.path2eid(self.session_path)
340 plot_task = BehaviourPlots(session_id, self.session_path, one=self.one)
341 _ = plot_task.run()
342 self.plot_tasks.append(plot_task)
344 except Exception:
345 _logger.error('Could not create Trials QC Plot')
346 _logger.error(traceback.format_exc())
347 self.status = -1
349 def _run(self, update=True, plot_qc=True):
350 dsets, out_files = self._extract_behaviour() 1hc
352 if not self.one or self.one.offline: 1hc
353 return out_files 1hc
355 self._behaviour_criterion(update=update)
356 self._run_qc(dsets, update=update, plot_qc=plot_qc)
357 return out_files
360class ChoiceWorldTrialsTimeline(ChoiceWorldTrialsNidq):
361 """Behaviour task extractor with DAQdata.raw NPY datasets."""
362 @property
363 def signature(self):
364 signature = super().signature 1c
365 signature['input_files'] = [ 1c
366 ('_iblrig_taskData.raw.*', self.collection, True),
367 ('_iblrig_taskSettings.raw.*', self.collection, True),
368 ('_iblrig_encoderEvents.raw*', self.collection, True),
369 ('_iblrig_encoderPositions.raw*', self.collection, True),
370 (f'_{self.sync_namespace}_DAQdata.raw.npy', self.sync_collection, True),
371 (f'_{self.sync_namespace}_DAQdata.timestamps.npy', self.sync_collection, True),
372 (f'_{self.sync_namespace}_DAQdata.meta.json', self.sync_collection, True),
373 ]
374 if self.protocol: 1c
375 extractor = get_bpod_extractor(self.session_path, protocol=self.protocol) 1c
376 if extractor.save_names: 1c
377 signature['output_files'] = [(fn, self.output_collection, True) 1c
378 for fn in filter(None, extractor.save_names)]
379 return signature 1c
381 def _extract_behaviour(self):
382 """Extract the Bpod trials data and Timeline acquired signals."""
383 # First determine the extractor from the task protocol
384 extractor = get_bpod_extractor(self.session_path, self.protocol, self.collection) 1c
385 ret, _ = extractor.extract(save=False, task_collection=self.collection) 1c
386 bpod_trials = {k: v for k, v in zip(extractor.var_names, ret)} 1c
388 trials = TimelineTrials(self.session_path, bpod_trials=bpod_trials) 1c
389 save_path = self.session_path / self.output_collection 1c
390 if not self._spacer_support(extractor.settings): 1c
391 _logger.warning('Protocol spacers not supported; setting protocol_number to None') 1c
392 self.protocol_number = None 1c
393 dsets, out_files = trials.extract( 1c
394 save=True, path_out=save_path, sync_collection=self.sync_collection,
395 task_collection=self.collection, protocol_number=self.protocol_number)
397 if not isinstance(dsets, dict): 1c
398 dsets = {k: v for k, v in zip(trials.var_names, dsets)} 1c
400 self.timeline = trials.timeline # Store for QC later 1c
401 self.frame2ttl = trials.frame2ttl 1c
402 self.audio = trials.audio 1c
404 return dsets, out_files 1c
406 def _run_qc(self, trials_data, update=True, **kwargs):
407 """
408 Run the task QC and update Alyx with results.
410 Parameters
411 ----------
412 trials_data : dict
413 The extracted trials data.
414 update : bool
415 If true, update Alyx with the result.
417 Notes
418 -----
419 - Unlike the super class, currently the QC plots are not generated.
420 - Expects the frame2ttl and audio attributes to be set from running _extract_behaviour.
421 """
422 # TODO Task QC extractor for Timeline
423 qc = TaskQC(self.session_path, one=self.one, log=_logger)
424 qc.extractor = TaskQCExtractor(self.session_path, lazy=True, one=qc.one, sync_collection=self.sync_collection,
425 sync_type=self.sync, task_collection=self.collection)
426 # Extract extra datasets required for QC
427 qc.extractor.data = TaskQCExtractor.rename_data(trials_data.copy())
428 qc.extractor.load_raw_data()
430 qc.extractor.frame_ttls = self.frame2ttl
431 qc.extractor.audio_ttls = self.audio
432 # qc.extractor.bpod_ttls = channel_events('bpod')
434 # Aggregate and update Alyx QC fields
435 namespace = 'task' if self.protocol_number is None else f'task_{self.protocol_number:02}'
436 qc.run(update=update, namespace=namespace)
439class TrainingStatus(base_tasks.BehaviourTask):
440 priority = 90
441 job_size = 'small'
443 @property
444 def signature(self):
445 signature = { 1b
446 'input_files': [
447 ('_iblrig_taskData.raw.*', self.collection, True),
448 ('_iblrig_taskSettings.raw.*', self.collection, True),
449 ('*trials.table.pqt', self.output_collection, True)],
450 'output_files': []
451 }
452 return signature 1b
454 def _run(self, upload=True):
455 """
456 Extracts training status for subject
457 """
459 lab = get_lab(self.session_path, self.one.alyx) 1b
460 if lab == 'cortexlab': 1b
461 one = ONE(base_url='https://alyx.internationalbrainlab.org') 1b
462 else:
463 one = self.one
465 df = training_status.get_latest_training_information(self.session_path, one) 1b
466 if df is not None: 1b
467 training_status.make_plots( 1b
468 self.session_path, self.one, df=df, save=True, upload=upload, task_collection=self.collection)
469 # Update status map in JSON field of subjects endpoint
470 if self.one and not self.one.offline: 1b
471 _logger.debug('Updating JSON field of subjects endpoint') 1b
472 status = (df.set_index('date')[['training_status', 'session_path']].drop_duplicates( 1b
473 subset='training_status', keep='first').to_dict())
474 date, sess = status.items() 1b
475 data = {'trained_criteria': {v.replace(' ', '_'): (k, self.one.path2eid(sess[1][k])) 1b
476 for k, v in date[1].items()}}
477 _, subject, *_ = session_path_parts(self.session_path) 1b
478 self.one.alyx.json_field_update('subjects', subject, data=data) 1b
479 output_files = [] 1b
480 return output_files 1b