Coverage for ibllib/pipes/widefield_tasks.py: 80%

87 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-17 13:06 +0000

1"""The widefield data extraction pipeline. 

2 

3The widefield pipeline requires task data extraction using the FPGA (ephys_preprocessing), 

4optogenetics, camera extraction and widefield image data compression, SVD and correction. 

5 

6Pipeline: 

7 1. Data renamed to be ALF-compliant and symlinks created with old names for use by wfield 

8 2. Raw image data is compressed 

9 3. Renamed and compressed files are registered to Alyx, imaging snapshots attached as Alyx notes 

10 4. Preprocessing run to produce frame averages, SVD components (U, SVT) and haemodynamic-corrected SVT 

11""" 

12import logging 

13 

14from ibllib.io.extractors.widefield import Widefield as WidefieldExtractor 

15from ibllib.pipes import base_tasks 

16from ibllib.io.video import get_video_meta 

17from ibllib.plots.snapshot import ReportSnapshot 

18 

19 

20_logger = logging.getLogger(__name__) 

21 

22try: 

23 import labcams.io 

24except ImportError: 

25 _logger.warning('labcams not installed') 

26 

27 

class WidefieldRegisterRaw(base_tasks.WidefieldTask, base_tasks.RegisterRawDataTask):
    """Register the raw widefield files and then the imaging snapshots.

    The heavy lifting is delegated to the parent classes; this class only
    declares the file signature and chains snapshot registration onto the
    parent's ``_run``.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """Return the input/output dataset declaration for this task.

        Each entry is a ``(filename or pattern, collection, flag)`` tuple.
        NOTE(review): the boolean flag presumably marks the dataset as
        required - confirm against the base task implementation.
        """
        collection = self.device_collection
        inputs = [
            ('dorsal_cortex_landmarks.json', collection, False),
            ('*.camlog', collection, True),
            ('widefield_wiring.htsv', collection, False),
        ]
        outputs = [
            ('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', False),
            ('widefieldEvents.raw.camlog', collection, True),
            ('widefieldChannels.wiring.htsv', collection, False),
        ]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, symlink_old=True):
        """Run the parent ``_run`` and then register snapshots.

        :param symlink_old: forwarded unchanged to the parent ``_run``.
        :return: whatever the parent ``_run`` returned.
        """
        registered = super()._run(symlink_old=symlink_old)
        self.register_snapshots()
        return registered

49 

50 

class WidefieldCompress(base_tasks.WidefieldTask):
    """Compress the raw widefield ``.dat`` stack into a lossless ``.mov`` file."""

    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """Return the input/output dataset declaration for this task.

        Each entry is a ``(filename or pattern, collection, flag)`` tuple.
        NOTE(review): the boolean flag presumably marks the dataset as
        required - confirm against the base task implementation.
        """
        signature = {
            'input_files': [('*.dat', self.device_collection, True)],
            'output_files': [('imaging.frames.mov', self.device_collection, True)]
        }
        return signature

    def _run(self, remove_uncompressed=False, verify_output=True, **kwargs):
        """Compress the first matching raw data file and return the output path.

        :param remove_uncompressed: if True, delete the raw ``.dat`` file once
            the compressed file has been written (and verified, if requested).
        :param verify_output: if True, read the video metadata of the output
            and check that it reports a non-zero length and size.
        :param kwargs: accepted for signature compatibility; unused here.
        :return: a single-element list containing the compressed file path.
        :raises FileNotFoundError: if no file under the session path matches
            the input pattern.
        :raises AssertionError: if the output file is missing, or is found to
            be empty when ``verify_output`` is True.
        """
        # Find raw data dat file. Supplying a default to next() avoids leaking
        # a bare StopIteration (which carries no message) when nothing matches.
        pattern = self.input_files[0].glob_pattern
        filepath = next(self.session_path.rglob(pattern), None)
        if filepath is None:
            raise FileNotFoundError(
                f'No raw widefield data matching "{pattern}" found in {self.session_path}')

        # Construct filename for compressed video
        output_file = self.session_path.joinpath(self.output_files[0].glob_pattern)
        # Compress to mov
        stack = labcams.io.mmap_dat(str(filepath))
        labcams.io.stack_to_mj2_lossless(stack, str(output_file), rate=30)

        # Explicit raises rather than assert statements: asserts are stripped
        # when Python runs with -O, which would skip these checks and could let
        # the raw file be deleted below without a valid compressed copy.
        if not output_file.exists():
            raise AssertionError('Failed to compress data: no output file found')

        if verify_output:
            meta = get_video_meta(output_file)
            if not (meta.length > 0 and meta.size > 0):
                raise AssertionError(f'Video file empty: {output_file}')

        if remove_uncompressed:
            filepath.unlink()

        return [output_file]

84 

85 

86# level 1 

class WidefieldPreprocess(base_tasks.WidefieldTask):
    """Run the widefield extractor and optionally upload its diagnostic plots."""

    priority = 80
    job_size = 'large'

    @property
    def signature(self):
        """Return the input/output dataset declaration for this task.

        Each entry is a ``(filename or pattern, collection, flag)`` tuple.
        NOTE(review): the boolean flag presumably marks the dataset as
        required - confirm against the base task implementation.
        """
        signature = {
            'input_files': [('imaging.frames.*', self.device_collection, True),
                            ('widefieldEvents.raw.*', self.device_collection, True)],
            'output_files': [('widefieldChannels.frameAverage.npy', 'alf/widefield', True),
                             ('widefieldU.images.npy', 'alf/widefield', True),
                             ('widefieldSVT.uncorrected.npy', 'alf/widefield', True),
                             ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)]
        }
        return signature

    def _run(self, upload_plots=True, **kwargs):
        """Extract the widefield data and register any plots that were produced.

        :param upload_plots: if True, register whichever of the known plot
            files exist in the extractor's data path as report snapshots.
        :param kwargs: accepted for signature compatibility; unused here.
        :return: the output files returned by the extractor.
        """
        self.wf = WidefieldExtractor(self.session_path)
        _, out_files = self.wf.extract(save=True, extract_timestamps=False)

        if upload_plots:
            # Only plots that actually exist on disk are uploaded; order is
            # kept as hemodynamic correction first, motion correction second.
            plot_names = ('hemodynamic_correction.png', 'motion_correction.png')
            candidates = (self.wf.data_path.joinpath(name) for name in plot_names)
            output_plots = [plot for plot in candidates if plot.exists()]

            if output_plots:
                eid = self.one.path2eid(self.session_path)
                snp = ReportSnapshot(self.session_path, eid, one=self.one)
                snp.outputs = output_plots
                snp.register_images(widths=['orig'], function='wfield')

        return out_files

    def tearDown(self):
        """Run the parent tear-down, then remove the extractor's files.

        ``self.wf`` is only assigned inside ``_run``. If tear-down happens
        without ``_run`` having reached that assignment, there is nothing to
        clean up, and raising AttributeError here would only mask the
        original failure - so the clean-up is skipped in that case.
        """
        super().tearDown()
        wf = getattr(self, 'wf', None)
        if wf is not None:
            wf.remove_files()

126 

127 

class WidefieldSync(base_tasks.WidefieldTask):
    """Synchronise widefield frame timestamps using the session sync data."""

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Return the input/output dataset declaration for this task.

        Each entry is a ``(filename or pattern, collection, flag)`` tuple.
        NOTE(review): the boolean flag presumably marks the dataset as
        required - confirm against the base task implementation.
        """
        inputs = [
            ('imaging.frames.mov', self.device_collection, True),
            ('widefieldEvents.raw.camlog', self.device_collection, True),
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
        ]
        outputs = [
            ('imaging.times.npy', 'alf/widefield', True),
            ('imaging.imagingLightSource.npy', 'alf/widefield', True),
            ('imagingLightSource.properties.htsv', 'alf/widefield', True),
        ]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self):
        """Sync the timestamps and save them to the declared output paths.

        :return: the files returned by the extractor's ``sync_timestamps``.
        """
        self.wf = WidefieldExtractor(self.session_path)

        # One save path per declared output: <session>/<collection>/<filename>
        save_paths = []
        for filename, collection, _ in self.signature['output_files']:
            save_paths.append(self.session_path.joinpath(collection, filename))

        synced_files = self.wf.sync_timestamps(
            bin_exists=False,
            save=True,
            save_paths=save_paths,
            sync_collection=self.sync_collection,
        )

        # TODO QC

        return synced_files

157 

158 

class WidefieldFOV(base_tasks.WidefieldTask):
    """Placeholder task for atlas-transformed widefield outputs.

    The signature declares the intended inputs and outputs, but the
    implementation in ``_run`` is currently commented out, so the task
    returns no files.
    """

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Return the input/output dataset declaration for this task.

        Each entry is a ``(filename or pattern, collection, flag)`` tuple.
        NOTE(review): the boolean flag presumably marks the dataset as
        required - confirm against the base task implementation.
        """
        inputs = [
            ('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', True),
            ('widefieldU.images.npy', 'alf/widefield', True),
            ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True),
        ]
        outputs = [
            ('widefieldU.images_atlasTransformed.npy', 'alf/widefield', True),
            ('widefieldU.brainLocationIds_ccf_2017.npy', 'alf/widefield', True),
        ]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self):
        """Return an empty list; the disabled implementation is kept below.

        :return: an empty list, since no files are produced at present.
        """
        outfiles = []

        # Disabled implementation, retained for reference:
        #
        # from wfield import load_allen_landmarks, SVDStack, atlas_from_landmarks_file
        # from iblatlas.regions import BrainRegions
        # from iblutil.numerical import ismember
        # import numpy as np
        # U = np.load(self.session_path.joinpath('alf/widefield', 'widefieldU.images.npy'))
        # SVT = np.load(self.session_path.joinpath('alf/widefield', 'widefieldSVT.haemoCorrected.npy'))
        # lmark_file = self.session_path.joinpath('alf/widefield', 'widefieldLandmarks.dorsalCortex.json')
        # landmarks = load_allen_landmarks(lmark_file)
        #
        # br = BrainRegions()
        #
        # stack = SVDStack(U, SVT)
        # stack.set_warped(1, M=landmarks['transform'])
        #
        # atlas, area_names, mask = atlas_from_landmarks_file(lmark_file)
        # atlas = atlas.astype(np.int32)
        # wf_ids = np.array([n[0] for n in area_names])
        # allen_ids = np.array([br.acronym2id(n[1].split('_')[0], mapping='Allen-lr', hemisphere=n[1].split('_')[1])[0]
        #                       for n in area_names])
        #
        # atlas_allen = np.zeros_like(atlas)
        # a, b = ismember(atlas, wf_ids)
        # atlas_allen[a] = allen_ids[b]
        #
        # file_U = self.session_path.joinpath('alf/widefield', 'widefieldU.images_atlasTransformed.npy')
        # np.save(file_U, stack.U_warped)
        # outfiles.append(file_U)
        #
        # # Do we save the mask??
        # file_atlas = self.session_path.joinpath('alf/widefield', 'widefieldU.brainLocationIds_ccf_2017.npy')
        # np.save(file_atlas, atlas_allen)
        # outfiles.append(file_atlas)

        return outfiles