Coverage for ibllib/pipes/widefield_tasks.py: 81%

90 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1"""The widefield data extraction pipeline. 

2 

3The widefield pipeline requires task data extraction using the FPGA (ephys_preprocessing), 

4optogenetics, camera extraction and widefield image data compression, SVD and correction. 

5 

6Pipeline: 

7 1. Data renamed to be ALF-compliant and symlinks created with old names for use by wfield 

8 2. Raw image data is compressed 

9 3. Renamed and compressed files are registered to Alyx, imaging snapshots attached as Alyx notes 

10 4. Preprocessing run to produce 

11""" 

12import logging 

13from pathlib import Path 

14 

15from ibllib.io.extractors.widefield import Widefield as WidefieldExtractor 

16from ibllib.pipes import base_tasks 

17from ibllib.io.video import get_video_meta 

18from ibllib.plots.snapshot import ReportSnapshot 

19 

20 

21_logger = logging.getLogger(__name__) 

22 

23try: 

24 import labcams.io 

25except ImportError: 

26 _logger.warning('labcams not installed') 

27 

28 

29class WidefieldRegisterRaw(base_tasks.WidefieldTask, base_tasks.RegisterRawDataTask): 

30 

31 priority = 100 

32 job_size = 'small' 

33 

34 @property 

35 def signature(self): 

36 signature = { 1ahe

37 'input_files': [('dorsal_cortex_landmarks.json', self.device_collection, False), 

38 ('*.camlog', self.device_collection, True), 

39 ('widefield_wiring.htsv', self.device_collection, False)], 

40 'output_files': [('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', False), 

41 ('widefieldEvents.raw.camlog', self.device_collection, True), 

42 ('widefieldChannels.wiring.htsv', self.device_collection, False)] 

43 } 

44 return signature 1ahe

45 

46 def _run(self, symlink_old=True): 

47 out_files = super()._run(symlink_old=symlink_old) 1e

48 self.register_snapshots() 1e

49 return out_files 1e

50 

51 

52class WidefieldCompress(base_tasks.WidefieldTask): 

53 

54 priority = 90 

55 job_size = 'large' 

56 

57 @property 

58 def signature(self): 

59 signature = { 1b

60 'input_files': [('*.dat', self.device_collection, True)], 

61 'output_files': [('imaging.frames.mov', self.device_collection, True)] 

62 } 

63 return signature 1b

64 

65 def _run(self, remove_uncompressed=False, verify_output=True, **kwargs): 

66 # Find raw data dat file 

67 filename, collection, _ = self.input_files[0] 1b

68 filepath = next(self.session_path.rglob(str(Path(collection).joinpath(filename)))) 1b

69 

70 # Construct filename for compressed video 

71 out_name, out_collection, _ = self.output_files[0] 1b

72 output_file = self.session_path.joinpath(out_collection, out_name) 1b

73 # Compress to mov 

74 stack = labcams.io.mmap_dat(str(filepath)) 1b

75 labcams.io.stack_to_mj2_lossless(stack, str(output_file), rate=30) 1b

76 

77 assert output_file.exists(), 'Failed to compress data: no output file found' 1b

78 

79 if verify_output: 1b

80 meta = get_video_meta(output_file) 1b

81 assert meta.length > 0 and meta.size > 0, f'Video file empty: {output_file}' 1b

82 

83 if remove_uncompressed: 1b

84 filepath.unlink() 

85 

86 return [output_file] 1b

87 

88 

89# level 1 

90class WidefieldPreprocess(base_tasks.WidefieldTask): 

91 

92 priority = 80 

93 job_size = 'large' 

94 

95 @property 

96 def signature(self): 

97 signature = { 1c

98 'input_files': [('imaging.frames.*', self.device_collection, True), 

99 ('widefieldEvents.raw.*', self.device_collection, True)], 

100 'output_files': [('widefieldChannels.frameAverage.npy', 'alf/widefield', True), 

101 ('widefieldU.images.npy', 'alf/widefield', True), 

102 ('widefieldSVT.uncorrected.npy', 'alf/widefield', True), 

103 ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)] 

104 } 

105 return signature 1c

106 

107 def _run(self, upload_plots=True, **kwargs): 

108 self.wf = WidefieldExtractor(self.session_path) 1c

109 _, out_files = self.wf.extract(save=True, extract_timestamps=False) 1c

110 

111 if upload_plots: 1c

112 output_plots = [] 

113 if self.wf.data_path.joinpath('hemodynamic_correction.png').exists(): 

114 output_plots.append(self.wf.data_path.joinpath('hemodynamic_correction.png')) 

115 if self.wf.data_path.joinpath('motion_correction.png').exists(): 

116 output_plots.append(self.wf.data_path.joinpath('motion_correction.png')) 

117 

118 if len(output_plots) > 0: 

119 eid = self.one.path2eid(self.session_path) 

120 snp = ReportSnapshot(self.session_path, eid, one=self.one) 

121 snp.outputs = output_plots 

122 snp.register_images(widths=['orig'], function='wfield') 

123 

124 return out_files 1c

125 

126 def tearDown(self): 

127 super(WidefieldPreprocess, self).tearDown() 1c

128 self.wf.remove_files() 1c

129 

130 

131class WidefieldSync(base_tasks.WidefieldTask): 

132 

133 priority = 40 

134 job_size = 'small' 

135 

136 @property 

137 def signature(self): 

138 signature = { 1dfg

139 'input_files': [('imaging.frames.mov', self.device_collection, True), 

140 ('widefieldEvents.raw.camlog', self.device_collection, True), 

141 (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), 

142 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), 

143 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True)], 

144 'output_files': [('imaging.times.npy', 'alf/widefield', True), 

145 ('imaging.imagingLightSource.npy', 'alf/widefield', True), 

146 ('imagingLightSource.properties.htsv', 'alf/widefield', True)] 

147 } 

148 return signature 1dfg

149 

150 def _run(self): 

151 

152 self.wf = WidefieldExtractor(self.session_path) 1dfg

153 save_paths = [self.session_path.joinpath(sig[1], sig[0]) for sig in self.signature['output_files']] 1dfg

154 out_files = self.wf.sync_timestamps(bin_exists=False, save=True, save_paths=save_paths, 1dfg

155 sync_collection=self.sync_collection) 

156 

157 # TODO QC 

158 

159 return out_files 1d

160 

161 

162class WidefieldFOV(base_tasks.WidefieldTask): 

163 

164 priority = 40 

165 job_size = 'small' 

166 

167 @property 

168 def signature(self): 

169 signature = { 

170 'input_files': [('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', True), 

171 ('widefieldU.images.npy', 'alf/widefield', True), 

172 ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)], 

173 'output_files': [('widefieldU.images_atlasTransformed.npy', 'alf/widefield', True), 

174 ('widefieldU.brainLocationIds_ccf_2017.npy', 'alf/widefield', True)] 

175 } 

176 

177 return signature 

178 

179 def _run(self): 

180 

181 outfiles = [] 

182 

183 # from wfield import load_allen_landmarks, SVDStack, atlas_from_landmarks_file 

184 # from iblatlas.regions import BrainRegions 

185 # from iblutil.numerical import ismember 

186 # import numpy as np 

187 # U = np.load(self.session_path.joinpath('alf/widefield', 'widefieldU.images.npy')) 

188 # SVT = np.load(self.session_path.joinpath('alf/widefield', 'widefieldSVT.haemoCorrected.npy')) 

189 # lmark_file = self.session_path.joinpath('alf/widefield', 'widefieldLandmarks.dorsalCortex.json') 

190 # landmarks = load_allen_landmarks(lmark_file) 

191 # 

192 # br = BrainRegions() 

193 # 

194 # stack = SVDStack(U, SVT) 

195 # stack.set_warped(1, M=landmarks['transform']) 

196 # 

197 # atlas, area_names, mask = atlas_from_landmarks_file(lmark_file) 

198 # atlas = atlas.astype(np.int32) 

199 # wf_ids = np.array([n[0] for n in area_names]) 

200 # allen_ids = np.array([br.acronym2id(n[1].split('_')[0], mapping='Allen-lr', hemisphere=n[1].split('_')[1])[0] 

201 # for n in area_names]) 

202 # 

203 # atlas_allen = np.zeros_like(atlas) 

204 # a, b = ismember(atlas, wf_ids) 

205 # atlas_allen[a] = allen_ids[b] 

206 # 

207 # file_U = self.session_path.joinpath('alf/widefield', 'widefieldU.images_atlasTransformed.npy') 

208 # np.save(file_U, stack.U_warped) 

209 # outfiles.append(file_U) 

210 # 

211 # # Do we save the mask?? 

212 # file_atlas = self.session_path.joinpath('alf/widefield', 'widefieldU.brainLocationIds_ccf_2017.npy') 

213 # np.save(file_atlas, atlas_allen) 

214 # outfiles.append(file_atlas) 

215 

216 return outfiles