1"""The widefield data extraction pipeline.
3The widefield pipeline requires task data extraction using the FPGA (ephys_preprocessing),
4optogenetics, camera extraction and widefield image data compression, SVD and correction.
6Pipeline:
7 1. Data renamed to be ALF-compliant and symlinks created with old names for use by wfield
8 2. Raw image data is compressed
9 3. Renamed and compressed files are registered to Alyx, imaging snapshots attached as Alyx notes
10 4. Preprocessing run to produce
11"""

import logging

from ibllib.io.extractors.widefield import Widefield as WidefieldExtractor
from ibllib.pipes import base_tasks
from ibllib.io.video import get_video_meta
from ibllib.plots.snapshot import ReportSnapshot

_logger = logging.getLogger(__name__)

try:
    import labcams.io
except ImportError:
    _logger.warning('labcams not installed')


class WidefieldRegisterRaw(base_tasks.WidefieldTask, base_tasks.RegisterRawDataTask):

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('dorsal_cortex_landmarks.json', self.device_collection, False),
                            ('*.camlog', self.device_collection, True),
                            ('widefield_wiring.htsv', self.device_collection, False)],
            'output_files': [('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', False),
                             ('widefieldEvents.raw.camlog', self.device_collection, True),
                             ('widefieldChannels.wiring.htsv', self.device_collection, False)]
        }
        return signature

    def _run(self, symlink_old=True):
        out_files = super()._run(symlink_old=symlink_old)
        self.register_snapshots()
        return out_files
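
# The signature above maps raw acquisition names onto ALF-compliant ones, e.g.
# ``*.camlog`` -> ``widefieldEvents.raw.camlog``. A minimal sketch of the
# rename-and-symlink step described in the module docstring (illustration only;
# the actual work is done by ``RegisterRawDataTask._run``), using hypothetical paths:
#
#     from pathlib import Path
#     old = Path('raw_widefield_data/widefield.camlog')   # hypothetical raw file name
#     new = old.with_name('widefieldEvents.raw.camlog')   # ALF-compliant name, same collection
#     old.rename(new)            # rename to the ALF name
#     old.symlink_to(new.name)   # keep the old name as a symlink for wfield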


class WidefieldCompress(base_tasks.WidefieldTask):

    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.dat', self.device_collection, True)],
            'output_files': [('imaging.frames.mov', self.device_collection, True)]
        }
        return signature

    def _run(self, remove_uncompressed=False, verify_output=True, **kwargs):
        # Find the raw data .dat file
        filepath = next(self.session_path.rglob(self.input_files[0].glob_pattern))
        # Construct the filename for the compressed video
        output_file = self.session_path.joinpath(self.output_files[0].glob_pattern)
        # Compress to lossless mj2 in a .mov container
        stack = labcams.io.mmap_dat(str(filepath))
        labcams.io.stack_to_mj2_lossless(stack, str(output_file), rate=30)

        assert output_file.exists(), 'Failed to compress data: no output file found'

        if verify_output:
            meta = get_video_meta(output_file)
            assert meta.length > 0 and meta.size > 0, f'Video file empty: {output_file}'

        if remove_uncompressed:
            # Delete the original uncompressed .dat file
            filepath.unlink()

        return [output_file]
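
# Sanity-check sketch (illustration only, assuming labcams is installed and using
# hypothetical paths): the compressed video should contain the same number of frames
# as the memory-mapped raw stack, e.g.:
#
#     stack = labcams.io.mmap_dat('raw_widefield_data/widefield.dat')
#     meta = get_video_meta('raw_widefield_data/imaging.frames.mov')
#     assert meta.length == stack.shape[0], 'frame count changed during compression'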


# level 1
class WidefieldPreprocess(base_tasks.WidefieldTask):

    priority = 80
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('imaging.frames.*', self.device_collection, True),
                            ('widefieldEvents.raw.*', self.device_collection, True)],
            'output_files': [('widefieldChannels.frameAverage.npy', 'alf/widefield', True),
                             ('widefieldU.images.npy', 'alf/widefield', True),
                             ('widefieldSVT.uncorrected.npy', 'alf/widefield', True),
                             ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)]
        }
        return signature

    def _run(self, upload_plots=True, **kwargs):
        self.wf = WidefieldExtractor(self.session_path)
        _, out_files = self.wf.extract(save=True, extract_timestamps=False)

        if upload_plots:
            # Attach the correction figures produced by wfield to the session as Alyx notes
            output_plots = []
            if self.wf.data_path.joinpath('hemodynamic_correction.png').exists():
                output_plots.append(self.wf.data_path.joinpath('hemodynamic_correction.png'))
            if self.wf.data_path.joinpath('motion_correction.png').exists():
                output_plots.append(self.wf.data_path.joinpath('motion_correction.png'))

            if len(output_plots) > 0:
                eid = self.one.path2eid(self.session_path)
                snp = ReportSnapshot(self.session_path, eid, one=self.one)
                snp.outputs = output_plots
                snp.register_images(widths=['orig'], function='wfield')

        return out_files

    def tearDown(self):
        super(WidefieldPreprocess, self).tearDown()
        self.wf.remove_files()
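
# The preprocessing outputs form an SVD decomposition of the imaging stack. A sketch of
# recombining them into (approximate) frames, assuming U is saved as (ny, nx, ncomponents)
# and SVT as (ncomponents, nframes):
#
#     import numpy as np
#     U = np.load('alf/widefield/widefieldU.images.npy')
#     SVT = np.load('alf/widefield/widefieldSVT.haemoCorrected.npy')
#     frames = np.tensordot(U, SVT[:, :100], axes=(2, 0))  # first 100 corrected frames, (ny, nx, 100)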


class WidefieldSync(base_tasks.WidefieldTask):

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('imaging.frames.mov', self.device_collection, True),
                            ('widefieldEvents.raw.camlog', self.device_collection, True),
                            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
                            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
                            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True)],
            'output_files': [('imaging.times.npy', 'alf/widefield', True),
                             ('imaging.imagingLightSource.npy', 'alf/widefield', True),
                             ('imagingLightSource.properties.htsv', 'alf/widefield', True)]
        }
        return signature

    def _run(self):
        self.wf = WidefieldExtractor(self.session_path)
        save_paths = [self.session_path.joinpath(sig[1], sig[0]) for sig in self.signature['output_files']]
        out_files = self.wf.sync_timestamps(bin_exists=False, save=True, save_paths=save_paths,
                                            sync_collection=self.sync_collection)

        # TODO QC
        return out_files
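
# Usage sketch (illustration only, with hypothetical paths and channel id): the sync
# outputs give one timestamp and one light-source label per frame, so frames from a
# single illumination channel can be selected as below; the label-to-wavelength mapping
# lives in imagingLightSource.properties.htsv:
#
#     import numpy as np
#     times = np.load('alf/widefield/imaging.times.npy')
#     light_source = np.load('alf/widefield/imaging.imagingLightSource.npy')
#     functional_times = times[light_source == 1]  # 1 is a hypothetical channel id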


class WidefieldFOV(base_tasks.WidefieldTask):

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', True),
                            ('widefieldU.images.npy', 'alf/widefield', True),
                            ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)],
            'output_files': [('widefieldU.images_atlasTransformed.npy', 'alf/widefield', True),
                             ('widefieldU.brainLocationIds_ccf_2017.npy', 'alf/widefield', True)]
        }
        return signature

    def _run(self):

        outfiles = []

        # The atlas registration below is currently disabled; it is kept for reference.
        # from wfield import load_allen_landmarks, SVDStack, atlas_from_landmarks_file
        # from iblatlas.regions import BrainRegions
        # from iblutil.numerical import ismember
        # import numpy as np
        # U = np.load(self.session_path.joinpath('alf/widefield', 'widefieldU.images.npy'))
        # SVT = np.load(self.session_path.joinpath('alf/widefield', 'widefieldSVT.haemoCorrected.npy'))
        # lmark_file = self.session_path.joinpath('alf/widefield', 'widefieldLandmarks.dorsalCortex.json')
        # landmarks = load_allen_landmarks(lmark_file)
        #
        # br = BrainRegions()
        #
        # stack = SVDStack(U, SVT)
        # stack.set_warped(1, M=landmarks['transform'])
        #
        # atlas, area_names, mask = atlas_from_landmarks_file(lmark_file)
        # atlas = atlas.astype(np.int32)
        # wf_ids = np.array([n[0] for n in area_names])
        # allen_ids = np.array([br.acronym2id(n[1].split('_')[0], mapping='Allen-lr', hemisphere=n[1].split('_')[1])[0]
        #                       for n in area_names])
        #
        # atlas_allen = np.zeros_like(atlas)
        # a, b = ismember(atlas, wf_ids)
        # atlas_allen[a] = allen_ids[b]
        #
        # file_U = self.session_path.joinpath('alf/widefield', 'widefieldU.images_atlasTransformed.npy')
        # np.save(file_U, stack.U_warped)
        # outfiles.append(file_U)
        #
        # # Do we save the mask??
        # file_atlas = self.session_path.joinpath('alf/widefield', 'widefieldU.brainLocationIds_ccf_2017.npy')
        # np.save(file_atlas, atlas_allen)
        # outfiles.append(file_atlas)

        return outfiles
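
# Usage sketch (illustration only): were the atlas registration above enabled, the
# resulting widefieldU.brainLocationIds_ccf_2017.npy map could be translated back into
# region acronyms with iblatlas, e.g.:
#
#     import numpy as np
#     from iblatlas.regions import BrainRegions
#     ids = np.load('alf/widefield/widefieldU.brainLocationIds_ccf_2017.npy')
#     acronyms = BrainRegions().id2acronym(ids.flatten()).reshape(ids.shape)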