Coverage for ibllib/pipes/training_preprocessing.py: 59%

108 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1"""(Deprecated) Training data preprocessing tasks. 

2 

3These tasks are part of the old pipeline. This module has been replaced by the dynamic pipeline 

4and the `behavior_tasks` module. 

5""" 

6 

7import logging 

8from collections import OrderedDict 

9from one.alf.files import session_path_parts 

10import warnings 

11 

12from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw 

13from ibllib.pipes import tasks, training_status 

14from ibllib.io import ffmpeg 

15from ibllib.io.extractors.base import get_session_extractor_type 

16from ibllib.io.extractors import training_audio, bpod_trials, camera 

17from ibllib.qc.camera import CameraQC 

18from ibllib.qc.task_metrics import TaskQC, HabituationQC 

19from ibllib.qc.task_extractors import TaskQCExtractor 

20 

# Module-level logger for the tasks defined below.
_logger = logging.getLogger(__name__)
# Emitted once, at import time, to flag this legacy module as slated for removal.
warnings.warn(
    '`pipes.training_preprocessing` to be removed in favour of dynamic pipeline',
    category=FutureWarning,
)

23 

24 

25# level 0 

class TrainingRegisterRaw(tasks.Task):
    """Placeholder task in the legacy training pipeline (name suggests raw-data registration).

    ``_run`` does no work and returns an empty list of output files.
    """
    priority = 100

    def _run(self):
        # Nothing is produced by this task.
        produced = []
        return produced

31 

32 

class TrainingTrials(tasks.Task):
    """Extract trials and wheel data from an iblrig training session, then run task QC."""
    priority = 90
    level = 0
    force = False
    signature = {
        'input_files': [
            ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
            ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
            ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
            ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True),
        ],
        'output_files': [
            ('*trials.goCueTrigger_times.npy', 'alf', True),
            ('*trials.table.pqt', 'alf', True),
            ('*wheel.position.npy', 'alf', True),
            ('*wheel.timestamps.npy', 'alf', True),
            ('*wheelMoves.intervals.npy', 'alf', True),
            ('*wheelMoves.peakAmplitude.npy', 'alf', True),
        ],
    }

    def extract_behaviour(self, save=True):
        """Extract an iblrig training session via ``bpod_trials.extract_all``.

        Parameters
        ----------
        save : bool
            Forwarded to ``bpod_trials.extract_all``.

        Returns
        -------
        trials : dict-like or None
            Extracted trials, with the wheel data merged in when any was extracted;
            None when the extractor produced no trials.
        output_files : object or None
            The output files reported by the extractor; None when there are no trials.
        """
        trials, wheel, files_out = bpod_trials.extract_all(self.session_path, save=save)
        if trials is None:
            return None, None
        if wheel is None:
            return trials, files_out
        # Merge the wheel datasets into the trials mapping (in place).
        trials.update(wheel)
        return trials, files_out

    def run_qc(self, trials_data=None, update=True):
        """Run task QC (or habituation QC) on extracted trials data.

        Parameters
        ----------
        trials_data : dict-like, optional
            Previously extracted trials; when None the session is re-extracted
            without saving.
        update : bool
            Forwarded to ``qc.run``; presumably controls whether the Alyx QC
            fields are updated.

        Returns
        -------
        HabituationQC or TaskQC
            The QC object after it has been run.

        Raises
        ------
        ValueError
            If no trials data are available.
        """
        if trials_data is None:
            trials_data = self.extract_behaviour(save=False)[0]
        if not trials_data:
            raise ValueError('No trials data found')

        # Pick the QC class according to the session's extractor type.
        extractor_type = get_session_extractor_type(self.session_path)
        qc_cls = HabituationQC if extractor_type == 'habituation' else TaskQC
        qc = qc_cls(self.session_path, one=self.one)

        # Attach a lazy extractor and feed it the already-extracted trials.
        qc.extractor = TaskQCExtractor(self.session_path, one=self.one, lazy=True)
        ext = qc.extractor
        ext.type = extractor_type
        ext.data = ext.rename_data(trials_data)
        ext.load_raw_data()  # reloads raw data and populates extractor properties

        # Aggregate outcomes and (optionally) update Alyx QC fields.
        qc.run(update=update)
        return qc

    def _run(self, **_):
        """Extract the session and, when an online ONE instance is present, run task QC."""
        trials_data, output_files = self.extract_behaviour()
        if self.one and not self.one.offline:
            if trials_data:
                self.run_qc(trials_data)
        return output_files

87 

88 

class TrainingVideoCompress(tasks.Task):
    """Compress training-rig videos, extract camera timestamps and run left-camera QC."""

    priority = 90
    io_charge = 100
    job_size = 'large'

    def _run(self):
        """Compress videos with ffmpeg, then extract camera data and run CameraQC.

        Returns None (which labels the task as empty) when no compressed video
        is found; otherwise returns the compressed video files followed by the
        files returned by ``camera.extract_all``.
        """
        # ffmpeg template: avi -> mp4 (libx264, slow preset, CRF 29, audio copied).
        ffmpeg_template = (
            'ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 '
            '-nostats -codec:a copy {file_out}'
        )
        videos = ffmpeg.iblrig_video_compression(self.session_path, ffmpeg_template)

        if len(videos) == 0:
            _logger.info('No compressed videos found; skipping timestamp extraction')
            return  # a None result labels the task as empty

        # Camera extraction on the first compressed video.
        _, camera_files = camera.extract_all(self.session_path, save=True, video_path=videos[0])
        videos.extend(camera_files)

        CameraQC(self.session_path, 'left', one=self.one, stream=False).run(update=True)
        return videos

112 

113 

class TrainingAudio(tasks.Task):
    """Extract sound data from a training session via ``training_audio.extract_sound``.

    Extraction is called with ``save=True`` and ``delete=True``; the latter
    presumably removes the raw audio after extraction (TODO confirm in
    ``ibllib.io.extractors.training_audio``).
    """
    cpu = 2
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything

    def _run(self, overwrite=False):
        """Run the sound extraction and return its result.

        Parameters
        ----------
        overwrite : bool
            Accepted for interface compatibility; currently unused.
        """
        return training_audio.extract_sound(self.session_path, save=True, delete=True)

124 

125 

126# level 1 

class TrainingDLC(tasks.Task):
    """DLC placeholder task; present only so the job is created in the pipeline."""

    def _run(self):
        """Empty placeholder for job creation only; does no work and returns None."""

132 

133 

class TrainingStatus(tasks.Task):
    """Compute a subject's training status, make plots and push the status map to Alyx."""
    priority = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('*trials.table.pqt', 'alf', True)],
        'output_files': []
    }

    def _run(self, upload=True):
        """
        Extracts training status for subject

        Parameters
        ----------
        upload : bool
            Forwarded to ``training_status.make_plots``.

        Returns
        -------
        list
            Always empty; this task registers no output files.
        """
        df = training_status.get_latest_training_information(self.session_path, self.one)
        if df is not None:
            training_status.make_plots(self.session_path, self.one, df=df, save=True, upload=upload)
            # Update status map in JSON field of subjects endpoint
            # TODO This requires exposing the json field of the subjects endpoint
            if self.one and not self.one.offline:
                _logger.debug('Updating JSON field of subjects endpoint')
                try:
                    # Keep the first row (in df order) for each distinct training status,
                    # indexed by date; to_dict() yields {column: {date: value}}.
                    status = (df.set_index('date')[['training_status', 'session_path']]
                              .drop_duplicates(subset='training_status', keep='first')
                              .to_dict())
                    # Look columns up by name rather than unpacking status.items() by position.
                    status_by_date = status['training_status']
                    path_by_date = status['session_path']
                    data = {'trained_criteria': {
                        status_name.replace(' ', '_'): (date, self.one.path2eid(path_by_date[date]))
                        for date, status_name in status_by_date.items()
                    }}
                    _, subject, *_ = session_path_parts(self.session_path)
                    self.one.alyx.json_field_update('subjects', subject, data=data)
                except KeyError:
                    _logger.error('Failed to update subject training status on Alyx: json field not available')

        output_files = []
        return output_files

169 

170 

class TrainingExtractionPipeline(tasks.Pipeline):
    """Legacy training extraction pipeline: level-0 tasks plus their level-1 dependants."""
    label = __name__

    def __init__(self, session_path, **kwargs):
        super().__init__(session_path, **kwargs)
        # Named to avoid shadowing the imported ``tasks`` module inside this method.
        pipeline_tasks = OrderedDict()
        self.session_path = session_path
        # level 0
        pipeline_tasks['ExperimentDescriptionRegisterRaw'] = ExperimentDescriptionRegisterRaw(self.session_path)
        pipeline_tasks['TrainingRegisterRaw'] = TrainingRegisterRaw(self.session_path)
        pipeline_tasks['TrainingTrials'] = TrainingTrials(self.session_path)
        pipeline_tasks['TrainingVideoCompress'] = TrainingVideoCompress(self.session_path)
        pipeline_tasks['TrainingAudio'] = TrainingAudio(self.session_path)
        # level 1
        pipeline_tasks['TrainingStatus'] = TrainingStatus(
            self.session_path, parents=[pipeline_tasks['TrainingTrials']])
        pipeline_tasks['TrainingDLC'] = TrainingDLC(
            self.session_path, parents=[pipeline_tasks['TrainingVideoCompress']])
        self.tasks = pipeline_tasks