Coverage for ibllib/pipes/training_preprocessing.py: 96%

96 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1import logging 

2from collections import OrderedDict 

3from one.alf.files import session_path_parts 

4import warnings 

5 

6from ibllib.pipes.base_tasks import ExperimentDescriptionRegisterRaw 

7from ibllib.pipes import tasks, training_status 

8from ibllib.io import ffmpeg 

9from ibllib.io.extractors.base import get_session_extractor_type 

10from ibllib.io.extractors import training_audio, bpod_trials, camera 

11from ibllib.qc.camera import CameraQC 

12from ibllib.qc.task_metrics import TaskQC, HabituationQC 

13from ibllib.qc.task_extractors import TaskQCExtractor 

14 

# Module-level logger, named after this module
_logger = logging.getLogger(__name__)
# Runs when the module is imported; no explicit warning category is passed to `warnings.warn`
warnings.warn('`pipes.training_preprocessing` to be removed in favour of dynamic pipeline')

17 

18 

19# level 0 

class TrainingRegisterRaw(tasks.Task):
    """Task whose run step yields an empty list of output files."""

    priority = 100

    def _run(self):
        """
        Runs the task

        :return: an empty list, i.e. no output files
        """
        return list()

25 

26 

class TrainingTrials(tasks.Task):
    """Extracts the trials and wheel data of a session and, when ONE is online, runs the task QC."""

    priority = 90
    level = 0
    force = False
    signature = {
        'input_files': [
            ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
            ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
            ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
            ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True),
        ],
        'output_files': [
            ('*trials.goCueTrigger_times.npy', 'alf', True),
            ('*trials.table.pqt', 'alf', True),
            ('*wheel.position.npy', 'alf', True),
            ('*wheel.timestamps.npy', 'alf', True),
            ('*wheelMoves.intervals.npy', 'alf', True),
            ('*wheelMoves.peakAmplitude.npy', 'alf', True),
        ],
    }

    def _run(self):
        """
        Extracts an iblrig training session

        :return: None when the extraction yields no trials, otherwise the output files returned by
         `bpod_trials.extract_all`
        """
        trials, _, extracted_files = bpod_trials.extract_all(self.session_path, save=True)
        if trials is None:
            return None

        # The QC below is run with `update=True`, so it is skipped without an online ONE instance
        online = self.one is not None and not self.one.offline
        if not online:
            return extracted_files

        # Pick the QC class matching the session type, then attach the task data extractor
        is_habituation = get_session_extractor_type(self.session_path) == 'habituation'
        qc_class = HabituationQC if is_habituation else TaskQC
        qc = qc_class(self.session_path, one=self.one)
        qc.extractor = TaskQCExtractor(self.session_path, one=self.one)
        if not is_habituation:
            # Only non-habituation sessions get the wheel encoding set on the extractor
            qc.extractor.wheel_encoding = 'X1'

        # Aggregate and update Alyx QC fields
        qc.run(update=True)
        return extracted_files

66 

67 

class TrainingVideoCompress(tasks.Task):
    """Compresses the session videos, then extracts the camera timestamps and runs the camera QC."""

    priority = 90
    io_charge = 100
    job_size = 'large'

    def _run(self):
        """
        Compresses the raw videos to mp4 and processes the first compressed video

        :return: None when no compressed video is produced, otherwise the compressed video files
         followed by the files returned by `camera.extract_all`
        """
        # avi to mp4 compression
        ffmpeg_template = (
            'ffmpeg -i {file_in} -y -nostdin '
            '-codec:v libx264 -preset slow -crf 29 '
            '-nostats -codec:a copy {file_out}'
        )
        compressed = ffmpeg.iblrig_video_compression(self.session_path, ffmpeg_template)

        if len(compressed) == 0:
            _logger.info('No compressed videos found; skipping timestamp extraction')
            return None  # labels the task as empty if no output

        # Video timestamps extraction, based on the first compressed video
        first_video = compressed[0]
        _, timestamp_files = camera.extract_all(self.session_path, save=True, video_path=first_video)
        compressed.extend(timestamp_files)

        # Video QC
        camera_qc = CameraQC(self.session_path, 'left', one=self.one, stream=False)
        camera_qc.run(update=True)
        return compressed

91 

92 

class TrainingAudio(tasks.Task):
    """
    Extracts the sound data of a training session

    The work is delegated to `training_audio.extract_sound`.
    """
    cpu = 2
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything

    def _run(self, overwrite=False):
        """
        Runs the sound extraction on the session path, with `save=True` and `delete=True`

        :param overwrite: accepted by the signature but not used: it is not forwarded to
         `training_audio.extract_sound`
        :return: the value returned by `training_audio.extract_sound`
         (presumably the output files - TODO confirm against the extractor)
        """
        return training_audio.extract_sound(self.session_path, save=True, delete=True)

103 

104 

105# level 1 

class TrainingDLC(tasks.Task):
    """Placeholder task: its run step does nothing."""

    def _run(self):
        """empty placeholder for job creation only"""
        return None

111 

112 

class TrainingStatus(tasks.Task):
    """Computes the training status of the subject, plots it and updates the subject on Alyx."""

    priority = 90
    level = 1
    force = False
    signature = {
        'input_files': [
            ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
            ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
            ('*trials.table.pqt', 'alf', True),
        ],
        'output_files': [],
    }

    def _run(self, upload=True):
        """
        Extracts training status for subject

        :param upload: forwarded to `training_status.make_plots`
        :return: always an empty list, this task registers no output files
        """
        df = training_status.get_latest_training_information(self.session_path, self.one)
        if df is None:
            # No training information: nothing to plot and nothing to send to Alyx
            return []

        training_status.make_plots(self.session_path, self.one, df=df, save=True, upload=upload)

        # Update status map in JSON field of subjects endpoint
        # TODO This requires exposing the json field of the subjects endpoint
        if self.one and not self.one.offline:
            _logger.debug('Updating JSON field of subjects endpoint')
            try:
                # Keep the first row of each distinct training status, indexed by date
                first_rows = df.set_index('date')[['training_status', 'session_path']]
                first_rows = first_rows.drop_duplicates(subset='training_status', keep='first')
                # One {date: value} mapping per selected column
                columns = first_rows.to_dict()
                status_by_date = columns['training_status']
                path_by_date = columns['session_path']

                criteria = {}
                for date, status_name in status_by_date.items():
                    key = status_name.replace(' ', '_')
                    criteria[key] = (date, self.one.path2eid(path_by_date[date]))
                data = {'trained_criteria': criteria}

                _, subject, *_ = session_path_parts(self.session_path)
                self.one.alyx.json_field_update('subjects', subject, data=data)
            except KeyError:
                _logger.error('Failed to update subject training status on Alyx: json field not available')

        return []

148 

149 

class TrainingExtractionPipeline(tasks.Pipeline):
    """Pipeline assembling the training tasks of this module, keyed by task name in creation order."""

    label = __name__

    def __init__(self, session_path, **kwargs):
        """
        :param session_path: session path, handed to the parent constructor and to every task
        :param kwargs: forwarded untouched to the parent constructor
        """
        super().__init__(session_path, **kwargs)
        pipeline_tasks = OrderedDict()
        self.session_path = session_path

        # level 0: tasks created without parents
        pipeline_tasks['ExperimentDescriptionRegisterRaw'] = ExperimentDescriptionRegisterRaw(self.session_path)
        pipeline_tasks['TrainingRegisterRaw'] = TrainingRegisterRaw(self.session_path)
        pipeline_tasks['TrainingTrials'] = TrainingTrials(self.session_path)
        pipeline_tasks['TrainingVideoCompress'] = TrainingVideoCompress(self.session_path)
        pipeline_tasks['TrainingAudio'] = TrainingAudio(self.session_path)

        # level 1: each task is given the level 0 task it follows as parent
        trials_task = pipeline_tasks['TrainingTrials']
        pipeline_tasks['TrainingStatus'] = TrainingStatus(self.session_path, parents=[trials_task])
        video_task = pipeline_tasks['TrainingVideoCompress']
        pipeline_tasks['TrainingDLC'] = TrainingDLC(self.session_path, parents=[video_task])

        self.tasks = pipeline_tasks