Coverage for ibllib/pipes/widefield_tasks.py: 81%
90 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
"""The widefield data extraction pipeline.

The widefield pipeline requires task data extraction using the FPGA (ephys_preprocessing),
optogenetics, camera extraction and widefield image data compression, SVD and correction.

Pipeline:
    1. Data renamed to be ALF-compliant and symlinks created with old names for use by wfield
    2. Raw image data is compressed
    3. Renamed and compressed files are registered to Alyx, imaging snapshots attached as Alyx notes
    4. Preprocessing run to produce the SVD components and the haemodynamically corrected signal
"""
12import logging
13from pathlib import Path
15from ibllib.io.extractors.widefield import Widefield as WidefieldExtractor
16from ibllib.pipes import base_tasks
17from ibllib.io.video import get_video_meta
18from ibllib.plots.snapshot import ReportSnapshot
21_logger = logging.getLogger(__name__)
23try:
24 import labcams.io
25except ImportError:
26 _logger.warning('labcams not installed')
class WidefieldRegisterRaw(base_tasks.WidefieldTask, base_tasks.RegisterRawDataTask):
    """Rename the raw widefield files to ALF convention and register them to Alyx."""

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """dict: expected input and output datasets as (pattern, collection, required) tuples."""
        device = self.device_collection
        return {
            'input_files': [
                ('dorsal_cortex_landmarks.json', device, False),
                ('*.camlog', device, True),
                ('widefield_wiring.htsv', device, False)],
            'output_files': [
                ('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', False),
                ('widefieldEvents.raw.camlog', device, True),
                ('widefieldChannels.wiring.htsv', device, False)]
        }

    def _run(self, symlink_old=True):
        """Rename and register the raw data, then attach imaging snapshots to the session.

        Parameters
        ----------
        symlink_old : bool
            If True, create symlinks with the original names pointing at the renamed files.

        Returns
        -------
        list of pathlib.Path
            The renamed output files.
        """
        renamed_files = super()._run(symlink_old=symlink_old)
        self.register_snapshots()
        return renamed_files
class WidefieldCompress(base_tasks.WidefieldTask):
    """Losslessly compress the raw widefield .dat image stack into an MJ2 movie."""

    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """dict: expected input and output datasets as (pattern, collection, required) tuples."""
        return {
            'input_files': [('*.dat', self.device_collection, True)],
            'output_files': [('imaging.frames.mov', self.device_collection, True)]
        }

    def _run(self, remove_uncompressed=False, verify_output=True, **kwargs):
        """Compress the raw image stack and optionally verify and clean up.

        Parameters
        ----------
        remove_uncompressed : bool
            If True, delete the raw .dat file after compression.
        verify_output : bool
            If True, assert that the compressed video has non-zero length and size.

        Returns
        -------
        list of pathlib.Path
            A single-element list containing the compressed video file.
        """
        # Locate the raw .dat stack anywhere under the session path
        in_name, in_collection, _ = self.input_files[0]
        raw_file = next(self.session_path.rglob(str(Path(in_collection).joinpath(in_name))))

        # Destination path for the compressed movie
        out_name, out_collection, _ = self.output_files[0]
        output_file = self.session_path.joinpath(out_collection, out_name)

        # Memory-map the raw frames and write them out losslessly with rate=30
        stack = labcams.io.mmap_dat(str(raw_file))
        labcams.io.stack_to_mj2_lossless(stack, str(output_file), rate=30)

        assert output_file.exists(), 'Failed to compress data: no output file found'

        if verify_output:
            meta = get_video_meta(output_file)
            assert meta.length > 0 and meta.size > 0, f'Video file empty: {output_file}'

        if remove_uncompressed:
            # Only reached after a successful compression; frees the raw data space
            raw_file.unlink()

        return [output_file]
# level 1
class WidefieldPreprocess(base_tasks.WidefieldTask):
    """Run wfield preprocessing: frame averaging, SVD and haemodynamic correction."""

    priority = 80
    job_size = 'large'

    @property
    def signature(self):
        """dict: expected input and output datasets as (pattern, collection, required) tuples."""
        return {
            'input_files': [
                ('imaging.frames.*', self.device_collection, True),
                ('widefieldEvents.raw.*', self.device_collection, True)],
            'output_files': [
                ('widefieldChannels.frameAverage.npy', 'alf/widefield', True),
                ('widefieldU.images.npy', 'alf/widefield', True),
                ('widefieldSVT.uncorrected.npy', 'alf/widefield', True),
                ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)]
        }

    def _run(self, upload_plots=True, **kwargs):
        """Extract the SVD components and haemodynamically corrected signal.

        Parameters
        ----------
        upload_plots : bool
            If True, register any QC figures produced by the extractor as Alyx
            report snapshots.

        Returns
        -------
        list of pathlib.Path
            The extracted output files.
        """
        self.wf = WidefieldExtractor(self.session_path)
        _, out_files = self.wf.extract(save=True, extract_timestamps=False)

        if upload_plots:
            # Gather whichever QC figures the extractor happened to produce
            candidates = ('hemodynamic_correction.png', 'motion_correction.png')
            output_plots = [self.wf.data_path.joinpath(name) for name in candidates
                            if self.wf.data_path.joinpath(name).exists()]

            if output_plots:
                eid = self.one.path2eid(self.session_path)
                report = ReportSnapshot(self.session_path, eid, one=self.one)
                report.outputs = output_plots
                report.register_images(widths=['orig'], function='wfield')

        return out_files

    def tearDown(self):
        """Clean up intermediate wfield files once the task has finished."""
        super().tearDown()
        self.wf.remove_files()
class WidefieldSync(base_tasks.WidefieldTask):
    """Synchronize the widefield frame times to the main DAQ clock."""

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """dict: expected input and output datasets as (pattern, collection, required) tuples."""
        return {
            'input_files': [
                ('imaging.frames.mov', self.device_collection, True),
                ('widefieldEvents.raw.camlog', self.device_collection, True),
                (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
                (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
                (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True)],
            'output_files': [
                ('imaging.times.npy', 'alf/widefield', True),
                ('imaging.imagingLightSource.npy', 'alf/widefield', True),
                ('imagingLightSource.properties.htsv', 'alf/widefield', True)]
        }

    def _run(self):
        """Extract and save the synchronized imaging timestamps.

        Returns
        -------
        list of pathlib.Path
            The saved timestamp, light-source and properties files.
        """
        self.wf = WidefieldExtractor(self.session_path)
        # Save each output dataset to the collection declared in the signature
        save_paths = [self.session_path.joinpath(collection, name)
                      for name, collection, _ in self.signature['output_files']]
        out_files = self.wf.sync_timestamps(
            bin_exists=False, save=True, save_paths=save_paths,
            sync_collection=self.sync_collection)

        # TODO QC

        return out_files
class WidefieldFOV(base_tasks.WidefieldTask):
    """Transform the widefield SVD components into Allen atlas space.

    NOTE(review): the transformation itself is not yet enabled — `_run` currently
    returns no files; the pending implementation is kept below as commented-out code.
    """

    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """dict: expected input and output datasets as (pattern, collection, required) tuples."""
        return {
            'input_files': [
                ('widefieldLandmarks.dorsalCortex.json', 'alf/widefield', True),
                ('widefieldU.images.npy', 'alf/widefield', True),
                ('widefieldSVT.haemoCorrected.npy', 'alf/widefield', True)],
            'output_files': [
                ('widefieldU.images_atlasTransformed.npy', 'alf/widefield', True),
                ('widefieldU.brainLocationIds_ccf_2017.npy', 'alf/widefield', True)]
        }

    def _run(self):
        """Currently a no-op; returns an empty list of output files.

        The commented-out code below is the pending atlas-transformation
        implementation, retained for reference until it is enabled.
        """
        outfiles = []

        # from wfield import load_allen_landmarks, SVDStack, atlas_from_landmarks_file
        # from iblatlas.regions import BrainRegions
        # from iblutil.numerical import ismember
        # import numpy as np
        # U = np.load(self.session_path.joinpath('alf/widefield', 'widefieldU.images.npy'))
        # SVT = np.load(self.session_path.joinpath('alf/widefield', 'widefieldSVT.haemoCorrected.npy'))
        # lmark_file = self.session_path.joinpath('alf/widefield', 'widefieldLandmarks.dorsalCortex.json')
        # landmarks = load_allen_landmarks(lmark_file)
        #
        # br = BrainRegions()
        #
        # stack = SVDStack(U, SVT)
        # stack.set_warped(1, M=landmarks['transform'])
        #
        # atlas, area_names, mask = atlas_from_landmarks_file(lmark_file)
        # atlas = atlas.astype(np.int32)
        # wf_ids = np.array([n[0] for n in area_names])
        # allen_ids = np.array([br.acronym2id(n[1].split('_')[0], mapping='Allen-lr', hemisphere=n[1].split('_')[1])[0]
        #                       for n in area_names])
        #
        # atlas_allen = np.zeros_like(atlas)
        # a, b = ismember(atlas, wf_ids)
        # atlas_allen[a] = allen_ids[b]
        #
        # file_U = self.session_path.joinpath('alf/widefield', 'widefieldU.images_atlasTransformed.npy')
        # np.save(file_U, stack.U_warped)
        # outfiles.append(file_U)
        #
        # # Do we save the mask??
        # file_atlas = self.session_path.joinpath('alf/widefield', 'widefieldU.brainLocationIds_ccf_2017.npy')
        # np.save(file_atlas, atlas_allen)
        # outfiles.append(file_atlas)

        return outfiles