Coverage for ibllib/pipes/training_preprocessing.py: 59%
108 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1"""(Deprecated) Training data preprocessing tasks.
3These tasks are part of the old pipeline. This module has been replaced by the dynamic pipeline
4and the `behavior_tasks` module.
5"""
7import logging
8from collections import OrderedDict
9from one.alf.files import session_path_parts
10import warnings
12from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw
13from ibllib.pipes import tasks, training_status
14from ibllib.io import ffmpeg
15from ibllib.io.extractors.base import get_session_extractor_type
16from ibllib.io.extractors import training_audio, bpod_trials, camera
17from ibllib.qc.camera import CameraQC
18from ibllib.qc.task_metrics import TaskQC, HabituationQC
19from ibllib.qc.task_extractors import TaskQCExtractor
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Raised when the module is imported: this module is part of the deprecated pipeline.
warnings.warn(
    message='`pipes.training_preprocessing` to be removed in favour of dynamic pipeline',
    category=FutureWarning,
)
25# level 0
class TrainingRegisterRaw(tasks.Task):
    """Placeholder task: `_run` does no work and returns an empty list of files."""

    priority = 100

    def _run(self):
        """Return an empty list of output files."""
        return []
class TrainingTrials(tasks.Task):
    """Extract the trials and wheel data of a training session, then run the task QC."""

    priority = 90
    level = 0
    force = False
    signature = {
        'input_files': [
            ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
            ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
            ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
            ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True),
        ],
        'output_files': [
            ('*trials.goCueTrigger_times.npy', 'alf', True),
            ('*trials.table.pqt', 'alf', True),
            ('*wheel.position.npy', 'alf', True),
            ('*wheel.timestamps.npy', 'alf', True),
            ('*wheelMoves.intervals.npy', 'alf', True),
            ('*wheelMoves.peakAmplitude.npy', 'alf', True),
        ],
    }

    def extract_behaviour(self, save=True):
        """Extract an iblrig training session.

        Parameters
        ----------
        save : bool
            Forwarded to `bpod_trials.extract_all`.

        Returns
        -------
        tuple
            `(trials, output_files)`, where `trials` has been updated in place with the
            wheel data when wheel data were returned; `(None, None)` when the extractor
            returned no trials.
        """
        extracted = bpod_trials.extract_all(self.session_path, save=save)
        trials, wheel, files = extracted
        if trials is None:
            return None, None
        if wheel is not None:
            # Merge the wheel data into the trials data
            trials.update(wheel)
        return trials, files

    def run_qc(self, trials_data=None, update=True):
        """Build the QC object for this session and run it.

        Parameters
        ----------
        trials_data : optional
            Previously extracted trials data. When None, the data are extracted without
            saving.
        update : bool
            Forwarded to the QC object's `run` method.

        Returns
        -------
        The QC object that was run: a `HabituationQC` for the 'habituation' extractor
        type, otherwise a `TaskQC`.

        Raises
        ------
        ValueError
            If the trials data are empty or None.
        """
        if trials_data is None:
            trials_data, _ = self.extract_behaviour(save=False)
        if not trials_data:
            raise ValueError('No trials data found')

        # Compile task data for QC
        extractor_type = get_session_extractor_type(self.session_path)
        qc_class = HabituationQC if extractor_type == 'habituation' else TaskQC
        qc = qc_class(self.session_path, one=self.one)
        qc.extractor = TaskQCExtractor(self.session_path, one=self.one, lazy=True)
        qc.extractor.type = extractor_type
        qc.extractor.data = qc.extractor.rename_data(trials_data)
        qc.extractor.load_raw_data()  # re-loads raw data and populates various properties
        # Aggregate and update Alyx QC fields
        qc.run(update=update)

        return qc

    def _run(self, **_):
        """Extract an iblrig training session and run the QC.

        The QC is only run when a ONE instance is set, it is not offline, and trials
        data were extracted. Returns the output files reported by the extraction.
        """
        trials_data, output_files = self.extract_behaviour()
        online = self.one and not self.one.offline
        if online and trials_data:
            # Run the task QC
            self.run_qc(trials_data)
        return output_files
class TrainingVideoCompress(tasks.Task):
    """Compress the session video, extract the camera timestamps and run the camera QC."""

    priority = 90
    io_charge = 100
    job_size = 'large'

    def _run(self):
        """Run compression, timestamp extraction and the 'left' camera QC.

        Returns the compressed video files followed by the files returned by the
        timestamp extraction, or None when the compression step returned no files.
        """
        # avi to mp4 compression
        command = (
            'ffmpeg -i {file_in} -y -nostdin '
            '-codec:v libx264 -preset slow -crf 29 '
            '-nostats -codec:a copy {file_out}'
        )
        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)

        if len(compressed) == 0:
            _logger.info('No compressed videos found; skipping timestamp extraction')
            return None  # labels the task as empty if no output

        # Video timestamps extraction; only the first compressed file is passed on
        _, timestamp_files = camera.extract_all(
            self.session_path, save=True, video_path=compressed[0])
        compressed.extend(timestamp_files)

        # Video QC
        camera_qc = CameraQC(self.session_path, 'left', one=self.one, stream=False)
        camera_qc.run(update=True)
        return compressed
class TrainingAudio(tasks.Task):
    """Extract the sound data of a training session.

    The work is delegated to `training_audio.extract_sound`, called with `save=True`
    and `delete=True`.
    """

    cpu = 2
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything

    def _run(self, overwrite=False):
        """Run the sound extraction for this session.

        Parameters
        ----------
        overwrite : bool
            Accepted as part of the signature but currently unused: it is not
            forwarded to `training_audio.extract_sound`.

        Returns
        -------
        Whatever `training_audio.extract_sound` returns (presumably the output files;
        TODO confirm against the extractor).
        """
        return training_audio.extract_sound(self.session_path, save=True, delete=True)
126# level 1
class TrainingDLC(tasks.Task):
    """Placeholder task that performs no work; it exists for job creation only."""

    def _run(self):
        """empty placeholder for job creation only"""
        return None
class TrainingStatus(tasks.Task):
    """Compute the subject's training status, make the plots and update the Alyx subject."""

    priority = 90
    level = 1
    force = False
    signature = {
        'input_files': [
            ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
            ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
            ('*trials.table.pqt', 'alf', True),
        ],
        'output_files': [],
    }

    def _run(self, upload=True):
        """
        Extracts training status for subject

        Parameters
        ----------
        upload : bool
            Forwarded to `training_status.make_plots`.

        Returns
        -------
        list
            Always an empty list; this task registers no output files.
        """
        df = training_status.get_latest_training_information(self.session_path, self.one)
        if df is not None:
            training_status.make_plots(self.session_path, self.one, df=df, save=True, upload=upload)
            # Update status map in JSON field of subjects endpoint
            # TODO This requires exposing the json field of the subjects endpoint
            if self.one and not self.one.offline:
                _logger.debug('Updating JSON field of subjects endpoint')
                try:
                    by_date = df.set_index('date')[['training_status', 'session_path']]
                    # Keep the first row at which each training status occurs
                    status = by_date.drop_duplicates(subset='training_status', keep='first').to_dict()
                    # Two columns were selected, so exactly two (column, {date: value}) items
                    (_, status_by_date), (_, path_by_date) = status.items()
                    criteria = {}
                    for day, state in status_by_date.items():
                        eid = self.one.path2eid(path_by_date[day])
                        criteria[state.replace(' ', '_')] = (day, eid)
                    data = {'trained_criteria': criteria}
                    _, subject, *_ = session_path_parts(self.session_path)
                    self.one.alyx.json_field_update('subjects', subject, data=data)
                except KeyError:
                    _logger.error('Failed to update subject training status on Alyx: json field not available')

        return []
class TrainingExtractionPipeline(tasks.Pipeline):
    """Pipeline assembling the training preprocessing tasks in dependency order."""

    label = __name__

    def __init__(self, session_path, **kwargs):
        """Instantiate every task of the pipeline for `session_path`.

        Extra keyword arguments are forwarded to the `tasks.Pipeline` constructor.
        """
        super().__init__(session_path, **kwargs)
        # Named `jobs` so the local does not shadow the imported `tasks` module
        jobs = OrderedDict()
        self.session_path = session_path
        root = self.session_path
        # level 0
        jobs['ExperimentDescriptionRegisterRaw'] = ExperimentDescriptionRegisterRaw(root)
        jobs['TrainingRegisterRaw'] = TrainingRegisterRaw(root)
        jobs['TrainingTrials'] = TrainingTrials(root)
        jobs['TrainingVideoCompress'] = TrainingVideoCompress(root)
        jobs['TrainingAudio'] = TrainingAudio(root)
        # level 1: each task is given the level 0 task it depends on as parent
        jobs['TrainingStatus'] = TrainingStatus(root, parents=[jobs['TrainingTrials']])
        jobs['TrainingDLC'] = TrainingDLC(root, parents=[jobs['TrainingVideoCompress']])
        self.tasks = jobs