Coverage for ibllib/pipes/training_preprocessing.py: 96%
96 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1import logging
2from collections import OrderedDict
3from one.alf.files import session_path_parts
4import warnings
6from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw
7from ibllib.pipes import tasks, training_status
8from ibllib.io import ffmpeg
9from ibllib.io.extractors.base import get_session_extractor_type
10from ibllib.io.extractors import training_audio, bpod_trials, camera
11from ibllib.qc.camera import CameraQC
12from ibllib.qc.task_metrics import TaskQC, HabituationQC
13from ibllib.qc.task_extractors import TaskQCExtractor
_logger = logging.getLogger(__name__)
# Emitted when this module is imported: the whole module is slated for removal.
warnings.warn('`pipes.training_preprocessing` to be removed in favour of dynamic pipeline', UserWarning)
19# level 0
class TrainingRegisterRaw(tasks.Task):
    """Task with no work of its own: running it yields no output files."""
    priority = 100

    def _run(self):
        """Return an empty list of output files."""
        no_outputs = list()
        return no_outputs
class TrainingTrials(tasks.Task):
    """Extract trials and wheel data from an iblrig training session, then run the task QC."""
    priority = 90
    level = 0
    force = False
    signature = {
        'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
                        ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True)],
        'output_files': [('*trials.goCueTrigger_times.npy', 'alf', True),
                         ('*trials.table.pqt', 'alf', True),
                         ('*wheel.position.npy', 'alf', True),
                         ('*wheel.timestamps.npy', 'alf', True),
                         ('*wheelMoves.intervals.npy', 'alf', True),
                         ('*wheelMoves.peakAmplitude.npy', 'alf', True)]
    }

    def _run(self):
        """
        Extracts an iblrig training session.

        Trials and wheel data are extracted and saved with ``bpod_trials.extract_all``.
        When an online ONE instance is available, the task QC is also computed and the
        Alyx QC fields are updated.

        Returns
        -------
        list or None
            The extracted output files, or None when no trials were extracted.
        """
        trials, _, output_files = bpod_trials.extract_all(self.session_path, save=True)
        if trials is None:
            return None
        # QC results are written to Alyx, so they can only be computed when online
        if self.one is None or self.one.offline:
            return output_files

        # Run the task QC; habituation sessions use a dedicated QC class
        is_habituation = get_session_extractor_type(self.session_path) == 'habituation'
        qc_class = HabituationQC if is_habituation else TaskQC
        qc = qc_class(self.session_path, one=self.one)
        # Compile task data for QC
        qc.extractor = TaskQCExtractor(self.session_path, one=self.one)
        if not is_habituation:
            # Update wheel data: non-habituation sessions decode the wheel with X1 encoding
            qc.extractor.wheel_encoding = 'X1'
        # Aggregate and update Alyx QC fields
        qc.run(update=True)
        return output_files
class TrainingVideoCompress(tasks.Task):
    """Compress raw training videos to mp4, extract camera timestamps and run the camera QC."""
    priority = 90
    io_charge = 100
    job_size = 'large'

    def _run(self):
        """
        Compress the session videos with ffmpeg, then extract timestamps and run QC.

        Compression uses libx264 (preset slow, crf 29) and copies the audio stream.
        Timestamps are extracted for the first compressed video. The camera QC is run for
        the 'left' camera and its result is written to Alyx (``update=True``).

        Returns
        -------
        list or None
            The compressed video files plus the extracted timestamp files, or None when
            no compressed video was produced (which labels the task as empty).
        """
        # avi to mp4 compression
        command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 '
                   '-nostats -codec:a copy {file_out}')
        output_files = ffmpeg.iblrig_video_compression(self.session_path, command)

        if not output_files:
            _logger.info('No compressed videos found; skipping timestamp extraction')
            return None  # labels the task as empty if no output

        # Video timestamps extraction; only the saved file paths are needed here
        _, files = camera.extract_all(self.session_path, save=True, video_path=output_files[0])
        output_files.extend(files)

        # Video QC
        CameraQC(self.session_path, 'left', one=self.one, stream=False).run(update=True)
        return output_files
class TrainingAudio(tasks.Task):
    """
    Extracts the sound recorded during a training session.

    Delegates to ``training_audio.extract_sound`` with ``save=True`` and ``delete=True``.
    Presumably the raw audio is removed once it has been extracted; confirm in
    ``training_audio``.
    """
    cpu = 2
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything

    def _run(self, overwrite=False):
        """
        Extract the session sound.

        Parameters
        ----------
        overwrite : bool
            Accepted for interface compatibility but currently ignored.

        Returns
        -------
        The return value of ``training_audio.extract_sound``.
        """
        return training_audio.extract_sound(self.session_path, save=True, delete=True)
105# level 1
class TrainingDLC(tasks.Task):
    """Placeholder task that exists only so that a job gets created; it does no work."""

    def _run(self):
        """Do nothing: empty placeholder used for job creation only."""
        return None
class TrainingStatus(tasks.Task):
    """Compute the subject's training status, plot it and record it on Alyx."""
    priority = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('*trials.table.pqt', 'alf', True)],
        'output_files': []
    }

    def _run(self, upload=True):
        """
        Extracts training status for subject.

        Parameters
        ----------
        upload : bool
            Forwarded to ``training_status.make_plots`` as its ``upload`` argument.

        Returns
        -------
        list
            Always an empty list: this task registers no output files.
        """
        df = training_status.get_latest_training_information(self.session_path, self.one)
        if df is not None:
            training_status.make_plots(self.session_path, self.one, df=df, save=True, upload=upload)
            # Update status map in JSON field of subjects endpoint
            # TODO This requires exposing the json field of the subjects endpoint
            if self.one and not self.one.offline:
                self._update_subject_training_status(df)
        output_files = []
        return output_files

    def _update_subject_training_status(self, df):
        """
        Write the date and session at which each training status first appears in ``df``
        to the subject's JSON field on Alyx.

        Only the first row of each distinct ``training_status`` (in ``df`` row order) is kept.
        The resulting mapping is sent under the ``trained_criteria`` key. Status names have
        spaces replaced by underscores, and each maps to ``(date, eid)``.

        Parameters
        ----------
        df : pandas.DataFrame
            Training information with at least 'date', 'training_status' and
            'session_path' columns.
        """
        _logger.debug('Updating JSON field of subjects endpoint')
        try:
            first_rows = (df.set_index('date')[['training_status', 'session_path']]
                          .drop_duplicates(subset='training_status', keep='first')
                          .to_dict())
            # to_dict() maps each column name to an {index (date) -> value} dict; look the
            # columns up by name rather than relying on their order
            status_by_date = first_rows['training_status']
            session_by_date = first_rows['session_path']
            data = {'trained_criteria': {
                status.replace(' ', '_'): (date, self.one.path2eid(session_by_date[date]))
                for date, status in status_by_date.items()}}
            _, subject, *_ = session_path_parts(self.session_path)
            self.one.alyx.json_field_update('subjects', subject, data=data)
        except KeyError:
            _logger.error('Failed to update subject training status on Alyx: json field not available')
class TrainingExtractionPipeline(tasks.Pipeline):
    """Pipeline of the tasks that preprocess a training session (not the dynamic pipeline)."""
    label = __name__

    def __init__(self, session_path, **kwargs):
        """
        Build the ordered task graph for ``session_path``.

        Parameters
        ----------
        session_path : str or pathlib.Path
            The session to process.
        **kwargs
            Forwarded to ``tasks.Pipeline.__init__``.
        """
        super().__init__(session_path, **kwargs)
        self.session_path = session_path
        # Named so as not to shadow the module-level `tasks` import
        pipeline_tasks = OrderedDict()
        # level 0
        pipeline_tasks['ExperimentDescriptionRegisterRaw'] = ExperimentDescriptionRegisterRaw(self.session_path)
        pipeline_tasks['TrainingRegisterRaw'] = TrainingRegisterRaw(self.session_path)
        pipeline_tasks['TrainingTrials'] = TrainingTrials(self.session_path)
        pipeline_tasks['TrainingVideoCompress'] = TrainingVideoCompress(self.session_path)
        pipeline_tasks['TrainingAudio'] = TrainingAudio(self.session_path)
        # level 1
        pipeline_tasks['TrainingStatus'] = TrainingStatus(
            self.session_path, parents=[pipeline_tasks['TrainingTrials']])
        pipeline_tasks['TrainingDLC'] = TrainingDLC(
            self.session_path, parents=[pipeline_tasks['TrainingVideoCompress']])
        self.tasks = pipeline_tasks