Coverage for ibllib/pipes/widefield_tasks.py: 83%

87 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1"""The widefield data extraction pipeline. 

2 

3The widefield pipeline requires task data extraction using the FPGA (ephys_preprocessing), 

4optogenetics, camera extraction and widefield image data compression, SVD and correction. 

5 

6Pipeline: 

7 1. Data renamed to be ALF-compliant and symlinks created with old names for use by wfield 

8 2. Raw image data is compressed 

9 3. Renamed and compressed files are registered to Alyx, imaging snapshots attached as Alyx notes 

10 4. Preprocessing run to produce the frame average, the SVD components (U and SVT) and the haemodynamic-corrected SVT 

11""" 

12import logging 

13from pathlib import Path 

14 

15from ibllib.io.extractors.widefield import Widefield as WidefieldExtractor 

16from ibllib.pipes import base_tasks 

17from ibllib.io.video import get_video_meta 

18from ibllib.plots.snapshot import ReportSnapshot 

19 

20import labcams.io 

21 

22_logger = logging.getLogger(__name__) 

23 

24 

class WidefieldRegisterRaw(base_tasks.WidefieldTask, base_tasks.RegisterRawDataTask):
    """Task that runs the inherited raw-data registration and then registers the snapshots."""

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """Return the input and output file specifications of this task.

        Each entry is a ``(filename, collection, flag)`` tuple; the third element is
        presumably the "required" flag used by the task framework (confirm in base_tasks).

        :return: dict with the keys 'input_files' and 'output_files'
        """
        collection = self.device_collection
        inputs = [
            ('dorsal_cortex_landmarks.json', collection, False),
            ('*.camlog', collection, True),
            ('widefield_wiring.htsv', collection, False),
        ]
        outputs = [
            ('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', False),
            ('widefieldEvents.raw.camlog', collection, True),
            ('widefieldChannels.wiring.htsv', collection, False),
        ]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, symlink_old=True):
        """Run the parent task, then register the snapshots.

        :param symlink_old: forwarded unchanged to the parent ``_run``
        :return: whatever the parent ``_run`` returned
        """
        registered = super()._run(symlink_old=symlink_old)
        self.register_snapshots()
        return registered

46 

47 

class WidefieldCompress(base_tasks.WidefieldTask):
    """Task that converts the raw widefield .dat stack to a .mov file using labcams."""

    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """Return the input and output file specifications of this task.

        :return: dict with the keys 'input_files' and 'output_files'
        """
        signature = {
            'input_files': [('*.dat', self.device_collection, True)],
            'output_files': [('imaging.frames.mov', self.device_collection, True)]
        }
        return signature

    def _run(self, remove_uncompressed=False, verify_output=True, **kwargs):
        """Compress the first matching raw .dat file to the output video file.

        :param remove_uncompressed: if True, delete the raw .dat file after compression
        :param verify_output: if True, read the video meta data and check it is not empty
        :param kwargs: accepted for interface compatibility; not used here
        :return: single-element list containing the path of the compressed video
        :raises FileNotFoundError: if no file in the session matches the input pattern
        :raises AssertionError: if the output file is missing or (when verified) empty
        """
        # Find raw data dat file
        filename, collection, _ = self.input_files[0]
        pattern = str(Path(collection).joinpath(filename))
        # A default is passed to next() so that a missing file gives a clear error
        # instead of a bare StopIteration
        filepath = next(self.session_path.rglob(pattern), None)
        if filepath is None:
            raise FileNotFoundError(f'No raw data file matching "{pattern}" found in {self.session_path}')

        # Construct filename for compressed video
        out_name, out_collection, _ = self.output_files[0]
        output_file = self.session_path.joinpath(out_collection, out_name)

        # Compress to mov
        stack = labcams.io.mmap_dat(str(filepath))
        labcams.io.stack_to_mj2_lossless(stack, str(output_file), rate=30)

        # Explicit raises rather than assert statements: these checks guard the optional
        # deletion of the raw data below, so they must still run under `python -O`.
        # AssertionError is kept so the exception type seen by callers is unchanged.
        if not output_file.exists():
            raise AssertionError('Failed to compress data: no output file found')

        if verify_output:
            meta = get_video_meta(output_file)
            if not (meta.length > 0 and meta.size > 0):
                raise AssertionError(f'Video file empty: {output_file}')

        if remove_uncompressed:
            filepath.unlink()

        return [output_file]

83 

84 

85# level 1 

class WidefieldPreprocess(base_tasks.WidefieldTask):
    """Task that runs the widefield extractor and optionally registers its diagnostic plots."""

    priority = 80
    job_size = 'large'

    @property
    def signature(self):
        """Return the input and output file specifications of this task.

        :return: dict with the keys 'input_files' and 'output_files'
        """
        signature = {
            'input_files': [('imaging.frames.*', self.device_collection, True),
                            ('widefieldEvents.raw.*', self.device_collection, True)],
            'output_files': [('widefieldChannels.frameAverage.npy', 'alf/widefield', True),
                             ('widefieldU.images.npy', 'alf/widefield', True),
                             ('widefieldSVT.uncorrected.npy', 'alf/widefield', True),
                             ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)]
        }
        return signature

    def _run(self, upload_plots=True, **kwargs):
        """Run the extraction (without timestamps) and register any plots that were produced.

        :param upload_plots: if True, register the correction plots found in the extractor's
         data path as snapshot images
        :param kwargs: accepted for interface compatibility; not used here
        :return: the output files returned by the extractor
        """
        self.wf = WidefieldExtractor(self.session_path)
        _, out_files = self.wf.extract(save=True, extract_timestamps=False)

        if upload_plots:
            # Only the plots that actually exist on disk are registered
            plot_names = ('hemodynamic_correction.png', 'motion_correction.png')
            candidates = (self.wf.data_path.joinpath(name) for name in plot_names)
            output_plots = [plot for plot in candidates if plot.exists()]

            if output_plots:
                eid = self.one.path2eid(self.session_path)
                snp = ReportSnapshot(self.session_path, eid, one=self.one)
                snp.outputs = output_plots
                snp.register_images(widths=['orig'], function='wfield')

        return out_files

    def tearDown(self):
        """Run the parent tear down, then remove the extractor's files if an extractor exists."""
        super().tearDown()
        # self.wf is only assigned inside _run; guard against tearDown being called when
        # _run never got that far, so the clean-up does not raise an unrelated error
        wf = getattr(self, 'wf', None)
        if wf is not None:
            wf.remove_files()

125 

126 

class WidefieldSync(base_tasks.WidefieldTask):
    """Task that calls the widefield extractor's ``sync_timestamps`` and saves the results."""

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Return the input and output file specifications of this task.

        :return: dict with the keys 'input_files' and 'output_files'
        """
        namespace = self.sync_namespace
        sync_collection = self.sync_collection
        device_collection = self.device_collection
        inputs = [
            ('imaging.frames.mov', device_collection, True),
            ('widefieldEvents.raw.camlog', device_collection, True),
            (f'_{namespace}_sync.channels.npy', sync_collection, True),
            (f'_{namespace}_sync.polarities.npy', sync_collection, True),
            (f'_{namespace}_sync.times.npy', sync_collection, True),
        ]
        outputs = [
            ('imaging.times.npy', 'alf/widefield', True),
            ('imaging.imagingLightSource.npy', 'alf/widefield', True),
            ('imagingLightSource.properties.htsv', 'alf/widefield', True),
        ]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self):
        """Synchronise the widefield timestamps and save them to the signature output paths.

        :return: the files returned by the extractor's ``sync_timestamps``
        """
        self.wf = WidefieldExtractor(self.session_path)

        # One save path per expected output file: <session>/<collection>/<filename>
        save_paths = [
            self.session_path.joinpath(collection, filename)
            for filename, collection, _ in self.signature['output_files']
        ]

        # TODO QC

        return self.wf.sync_timestamps(
            bin_exists=False, save=True, save_paths=save_paths, sync_collection=self.sync_collection)

156 

157 

class WidefieldFOV(base_tasks.WidefieldTask):
    """Placeholder task for the atlas transformation of the widefield data.

    The implementation in `_run` is commented out, so the task currently produces no files.
    """

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Return the input and output file specifications of this task.

        :return: dict with the keys 'input_files' and 'output_files'
        """
        inputs = [
            ('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', True),
            ('widefieldU.images.npy', 'alf/widefield', True),
            ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True),
        ]
        outputs = [
            ('widefieldU.images_atlasTransformed.npy', 'alf/widefield', True),
            ('widefieldU.brainLocationIds_ccf_2017.npy', 'alf/widefield', True),
        ]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self):
        """Return an empty list of output files; the processing below is disabled.

        :return: empty list
        """
        outfiles = []

        # Disabled reference implementation, kept for when the task is enabled:
        #
        # from wfield import load_allen_landmarks, SVDStack, atlas_from_landmarks_file
        # from iblatlas.regions import BrainRegions
        # from iblutil.numerical import ismember
        # import numpy as np
        # U = np.load(self.session_path.joinpath('alf/widefield', 'widefieldU.images.npy'))
        # SVT = np.load(self.session_path.joinpath('alf/widefield', 'widefieldSVT.haemoCorrected.npy'))
        # lmark_file = self.session_path.joinpath('alf/widefield', 'widefieldLandmarks.dorsalCortex.json')
        # landmarks = load_allen_landmarks(lmark_file)
        #
        # br = BrainRegions()
        #
        # stack = SVDStack(U, SVT)
        # stack.set_warped(1, M=landmarks['transform'])
        #
        # atlas, area_names, mask = atlas_from_landmarks_file(lmark_file)
        # atlas = atlas.astype(np.int32)
        # wf_ids = np.array([n[0] for n in area_names])
        # allen_ids = np.array([br.acronym2id(n[1].split('_')[0], mapping='Allen-lr', hemisphere=n[1].split('_')[1])[0]
        #                       for n in area_names])
        #
        # atlas_allen = np.zeros_like(atlas)
        # a, b = ismember(atlas, wf_ids)
        # atlas_allen[a] = allen_ids[b]
        #
        # file_U = self.session_path.joinpath('alf/widefield', 'widefieldU.images_atlasTransformed.npy')
        # np.save(file_U, stack.U_warped)
        # outfiles.append(file_U)
        #
        # # Do we save the mask??
        # file_atlas = self.session_path.joinpath('alf/widefield', 'widefieldU.brainLocationIds_ccf_2017.npy')
        # np.save(file_atlas, atlas_allen)
        # outfiles.append(file_atlas)

        return outfiles