Coverage for ibllib/pipes/widefield_tasks.py: 83% (87 statements)
1"""The widefield data extraction pipeline.
3The widefield pipeline requires task data extraction using the FPGA (ephys_preprocessing),
4optogenetics, camera extraction and widefield image data compression, SVD and correction.
6Pipeline:
7 1. Data renamed to be ALF-compliant and symlinks created with old names for use by wfield
8 2. Raw image data is compressed
9 3. Renamed and compressed files are registered to Alyx, imaging snapshots attached as Alyx notes
10 4. Preprocessing run to produce
11"""
import logging
from pathlib import Path

from ibllib.io.extractors.widefield import Widefield as WidefieldExtractor
from ibllib.pipes import base_tasks
from ibllib.io.video import get_video_meta
from ibllib.plots.snapshot import ReportSnapshot

import labcams.io

_logger = logging.getLogger(__name__)


class WidefieldRegisterRaw(base_tasks.WidefieldTask, base_tasks.RegisterRawDataTask):
    """Rename the raw widefield data to ALF-compliant datasets and register them to Alyx."""

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('dorsal_cortex_landmarks.json', self.device_collection, False),
                            ('*.camlog', self.device_collection, True),
                            ('widefield_wiring.htsv', self.device_collection, False)],
            'output_files': [('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', False),
                             ('widefieldEvents.raw.camlog', self.device_collection, True),
                             ('widefieldChannels.wiring.htsv', self.device_collection, False)]
        }
        return signature

    def _run(self, symlink_old=True):
        out_files = super()._run(symlink_old=symlink_old)
        self.register_snapshots()
        return out_files


class WidefieldCompress(base_tasks.WidefieldTask):
    """Compress the raw widefield .dat image stack into a lossless video file."""

    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.dat', self.device_collection, True)],
            'output_files': [('imaging.frames.mov', self.device_collection, True)]
        }
        return signature

    def _run(self, remove_uncompressed=False, verify_output=True, **kwargs):
        # Find raw data dat file
        filename, collection, _ = self.input_files[0]
        filepath = next(self.session_path.rglob(str(Path(collection).joinpath(filename))))

        # Construct filename for compressed video
        out_name, out_collection, _ = self.output_files[0]
        output_file = self.session_path.joinpath(out_collection, out_name)
        # Compress to mov
        stack = labcams.io.mmap_dat(str(filepath))
        labcams.io.stack_to_mj2_lossless(stack, str(output_file), rate=30)

        assert output_file.exists(), 'Failed to compress data: no output file found'

        if verify_output:
            meta = get_video_meta(output_file)
            assert meta.length > 0 and meta.size > 0, f'Video file empty: {output_file}'

        if remove_uncompressed:
            # Remove the original uncompressed dat file
            filepath.unlink()

        return [output_file]


# level 1
class WidefieldPreprocess(base_tasks.WidefieldTask):
    """Run wfield preprocessing on the imaging data: SVD decomposition and haemodynamic correction."""

    priority = 80
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('imaging.frames.*', self.device_collection, True),
                            ('widefieldEvents.raw.*', self.device_collection, True)],
            'output_files': [('widefieldChannels.frameAverage.npy', 'alf/widefield', True),
                             ('widefieldU.images.npy', 'alf/widefield', True),
                             ('widefieldSVT.uncorrected.npy', 'alf/widefield', True),
                             ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)]
        }
        return signature

    def _run(self, upload_plots=True, **kwargs):
        self.wf = WidefieldExtractor(self.session_path)
        _, out_files = self.wf.extract(save=True, extract_timestamps=False)

        if upload_plots:
            output_plots = []
            if self.wf.data_path.joinpath('hemodynamic_correction.png').exists():
                output_plots.append(self.wf.data_path.joinpath('hemodynamic_correction.png'))
            if self.wf.data_path.joinpath('motion_correction.png').exists():
                output_plots.append(self.wf.data_path.joinpath('motion_correction.png'))

            if len(output_plots) > 0:
                eid = self.one.path2eid(self.session_path)
                snp = ReportSnapshot(self.session_path, eid, one=self.one)
                snp.outputs = output_plots
                snp.register_images(widths=['orig'], function='wfield')

        return out_files

    def tearDown(self):
        super(WidefieldPreprocess, self).tearDown()
        self.wf.remove_files()


class WidefieldSync(base_tasks.WidefieldTask):
    """Extract widefield frame times by synchronising the camera log with the DAQ sync pulses."""

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('imaging.frames.mov', self.device_collection, True),
                            ('widefieldEvents.raw.camlog', self.device_collection, True),
                            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
                            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
                            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True)],
            'output_files': [('imaging.times.npy', 'alf/widefield', True),
                             ('imaging.imagingLightSource.npy', 'alf/widefield', True),
                             ('imagingLightSource.properties.htsv', 'alf/widefield', True)]
        }
        return signature

    def _run(self):
        self.wf = WidefieldExtractor(self.session_path)
        save_paths = [self.session_path.joinpath(sig[1], sig[0]) for sig in self.signature['output_files']]
        out_files = self.wf.sync_timestamps(bin_exists=False, save=True, save_paths=save_paths,
                                            sync_collection=self.sync_collection)

        # TODO QC
        return out_files


class WidefieldFOV(base_tasks.WidefieldTask):
    """Transform the SVD components into Allen atlas space using the dorsal cortex landmarks."""

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', True),
                            ('widefieldU.images.npy', 'alf/widefield', True),
                            ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)],
            'output_files': [('widefieldU.images_atlasTransformed.npy', 'alf/widefield', True),
                             ('widefieldU.brainLocationIds_ccf_2017.npy', 'alf/widefield', True)]
        }

        return signature

    def _run(self):
        # NB: the atlas transformation below is currently commented out, so this task returns no outputs
        outfiles = []

        # from wfield import load_allen_landmarks, SVDStack, atlas_from_landmarks_file
        # from iblatlas.regions import BrainRegions
        # from iblutil.numerical import ismember
        # import numpy as np
        # U = np.load(self.session_path.joinpath('alf/widefield', 'widefieldU.images.npy'))
        # SVT = np.load(self.session_path.joinpath('alf/widefield', 'widefieldSVT.haemoCorrected.npy'))
        # lmark_file = self.session_path.joinpath('alf/widefield', 'widefieldLandmarks.dorsalCortex.json')
        # landmarks = load_allen_landmarks(lmark_file)
        #
        # br = BrainRegions()
        #
        # stack = SVDStack(U, SVT)
        # stack.set_warped(1, M=landmarks['transform'])
        #
        # atlas, area_names, mask = atlas_from_landmarks_file(lmark_file)
        # atlas = atlas.astype(np.int32)
        # wf_ids = np.array([n[0] for n in area_names])
        # allen_ids = np.array([br.acronym2id(n[1].split('_')[0], mapping='Allen-lr', hemisphere=n[1].split('_')[1])[0]
        #                       for n in area_names])
        #
        # atlas_allen = np.zeros_like(atlas)
        # a, b = ismember(atlas, wf_ids)
        # atlas_allen[a] = allen_ids[b]
        #
        # file_U = self.session_path.joinpath('alf/widefield', 'widefieldU.images_atlasTransformed.npy')
        # np.save(file_U, stack.U_warped)
        # outfiles.append(file_U)
        #
        # # Do we save the mask??
        # file_atlas = self.session_path.joinpath('alf/widefield', 'widefieldU.brainLocationIds_ccf_2017.npy')
        # np.save(file_atlas, atlas_allen)
        # outfiles.append(file_atlas)

        return outfiles