Coverage for ibllib/pipes/neurophotometrics.py: 96%
89 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 09:55 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 09:55 +0000
1"""Extraction tasks for fibrephotometry"""
3import logging
4import numpy as np
5import pandas as pd
7import ibldsp.utils
8import ibllib.io.session_params
9from ibllib.pipes import base_tasks
10from iblutil.io import jsonable
12_logger = logging.getLogger('ibllib')
15"""
16Neurophotometrics FP3002 specific information.
17The light source map refers to the available LEDs on the system.
18The flags refers to the byte encoding of led states in the system.
19"""
20LIGHT_SOURCE_MAP = {
21 'color': ['None', 'Violet', 'Blue', 'Green'],
22 'wavelength': [0, 415, 470, 560],
23 'name': ['None', 'Isosbestic', 'GCaMP', 'RCaMP'],
24}
26LED_STATES = {
27 'Condition': {
28 0: 'No additional signal',
29 1: 'Output 1 signal HIGH',
30 2: 'Output 0 signal HIGH',
31 3: 'Stimulation ON',
32 4: 'GPIO Line 2 HIGH',
33 5: 'GPIO Line 3 HIGH',
34 6: 'Input 1 HIGH',
35 7: 'Input 0 HIGH',
36 8: 'Output 0 signal HIGH + Stimulation',
37 9: 'Output 0 signal HIGH + Input 0 signal HIGH',
38 10: 'Input 0 signal HIGH + Stimulation',
39 11: 'Output 0 HIGH + Input 0 HIGH + Stimulation',
40 },
41 'No LED ON': {0: 0, 1: 8, 2: 16, 3: 32, 4: 64, 5: 128, 6: 256, 7: 512, 8: 48, 9: 528, 10: 544, 11: 560},
42 'L415': {0: 1, 1: 9, 2: 17, 3: 33, 4: 65, 5: 129, 6: 257, 7: 513, 8: 49, 9: 529, 10: 545, 11: 561},
43 'L470': {0: 2, 1: 10, 2: 18, 3: 34, 4: 66, 5: 130, 6: 258, 7: 514, 8: 50, 9: 530, 10: 546, 11: 562},
44 'L560': {0: 4, 1: 12, 2: 20, 3: 36, 4: 68, 5: 132, 6: 260, 7: 516, 8: 52, 9: 532, 10: 548, 11: 564}
45}
48def _channel_meta(light_source_map=None):
49 """
50 Return table of light source wavelengths and corresponding colour labels.
52 Parameters
53 ----------
54 light_source_map : dict
55 An optional map of light source wavelengths (nm) used and their corresponding colour name.
57 Returns
58 -------
59 pandas.DataFrame
60 A sorted table of wavelength and colour name.
61 """
62 light_source_map = light_source_map or LIGHT_SOURCE_MAP 1a
63 meta = pd.DataFrame.from_dict(light_source_map) 1a
64 meta.index.rename('channel_id', inplace=True) 1a
65 return meta 1a
68class FibrePhotometrySync(base_tasks.DynamicTask):
69 priority = 90
70 job_size = 'small'
72 def __init__(self, *args, **kwargs):
73 super().__init__(*args, **kwargs) 1ca
74 self.device_collection = self.get_device_collection( 1ca
75 'neurophotometrics', device_collection='raw_photometry_data')
76 # we will work with the first protocol here
77 for task in self.session_params['tasks']: 1ca
78 self.task_protocol = next(k for k in task) 1ca
79 self.task_collection = ibllib.io.session_params.get_task_collection(self.session_params, self.task_protocol) 1ca
80 break 1ca
82 @property
83 def signature(self):
84 signature = { 1a
85 'input_files': [('_neurophotometrics_fpData.raw.pqt', self.device_collection, True, True),
86 ('_iblrig_taskData.raw.jsonable', self.task_collection, True, True),
87 ('_neurophotometrics_fpData.channels.csv', self.device_collection, True, True),
88 ('_neurophotometrics_fpData.digitalIntputs.pqt', self.device_collection, True)],
89 'output_files': [('photometry.signal.pqt', 'alf/photometry', True),
90 ('photometryROI.locations.pqt', 'alf/photometry', True)]
91 }
92 return signature 1a
94 def _sync_bpod_neurophotometrics(self):
95 """
96 Perform the linear clock correction between bpod and neurophotometrics timestamps.
97 :return: interpolation function that outputs bpod timestamsp from neurophotometrics timestamps
98 """
99 folder_raw_photometry = self.session_path.joinpath(self.device_collection) 1a
100 df_digital_inputs = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.digitalIntputs.pqt')) 1a
101 # normally we should disregard the states and use the sync label. But bpod doesn't log TTL outs,
102 # only the states. This will change in the future but for now we are stuck with this.
103 if 'habituation' in self.task_protocol: 1a
104 sync_states_names = ['iti', 'reward']
105 else:
106 sync_states_names = ['trial_start', 'reward', 'exit_state'] 1a
107 # read in the raw behaviour data for syncing
108 file_jsonable = self.session_path.joinpath(self.task_collection, '_iblrig_taskData.raw.jsonable') 1a
109 trials_table, bpod_data = jsonable.load_task_jsonable(file_jsonable) 1a
110 # we get the timestamps of the states from the bpod data
111 tbpod = [] 1a
112 for sname in sync_states_names: 1a
113 tbpod.append(np.array( 1a
114 [bd['States timestamps'][sname][0][0] + bd['Trial start timestamp'] for bd in bpod_data if
115 sname in bd['States timestamps']]))
116 tbpod = np.sort(np.concatenate(tbpod)) 1a
117 tbpod = tbpod[~np.isnan(tbpod)] 1a
118 # we get the timestamps for the photometry data
119 tph = df_digital_inputs['SystemTimestamp'].values[df_digital_inputs['Channel'] == self.kwargs['sync_channel']] 1a
120 tph = tph[15:] # TODO: we may want to detect the spacers before removing it, especially for successive sessions 1a
121 # sync the behaviour events to the photometry timestamps
122 fcn_nph_to_bpod_times, drift_ppm, iph, ibpod = ibldsp.utils.sync_timestamps( 1a
123 tph, tbpod, return_indices=True, linear=True)
124 # then we check the alignment, should be less than the screen refresh rate
125 tcheck = fcn_nph_to_bpod_times(tph[iph]) - tbpod[ibpod] 1a
126 _logger.info( 1a
127 f'sync: n trials {len(bpod_data)}, n bpod sync {len(tbpod)}, n photometry {len(tph)}, n match {len(iph)}')
128 assert np.all(np.abs(tcheck) < 1 / 60), 'Sync issue detected, residual above 1/60s' 1a
129 assert len(iph) / len(tbpod) > 0.95, 'Sync issue detected, less than 95% of the bpod events matched' 1a
130 valid_bounds = [bpod_data[0]['Trial start timestamp'] - 2, bpod_data[-1]['Trial end timestamp'] + 2] 1a
131 return fcn_nph_to_bpod_times, valid_bounds 1a
133 def _run(self, **kwargs):
134 """
135 Extract photometry data from the raw neurophotometrics data in parquet
136 The extraction has 3 main steps:
137 1. Synchronise the bpod and neurophotometrics timestamps.
138 2. Extract the photometry data from the raw neurophotometrics data.
139 3. Label the fibers correspondance with brain regions in a small table
140 :param kwargs:
141 :return:
142 """
143 # 1) sync: we check the synchronisation, right now we only have bpod but soon the daq will be used
144 match list(self.session_params['sync'].keys())[0]: 1a
145 case 'bpod': 1a
146 fcn_nph_to_bpod_times, valid_bounds = self._sync_bpod_neurophotometrics() 1a
147 case _:
148 raise NotImplementedError('Syncing with daq is not supported yet.')
149 # 2) reformat the raw data with wavelengths and meta-data
150 folder_raw_photometry = self.session_path.joinpath(self.device_collection) 1a
151 fp_data = pd.read_parquet(folder_raw_photometry.joinpath('_neurophotometrics_fpData.raw.pqt')) 1a
152 # Load channels and wavelength information
153 channel_meta_map = _channel_meta() 1a
154 if (fn := folder_raw_photometry.joinpath('_neurophotometrics_fpData.channels.csv')).exists(): 1a
155 led_states = pd.read_csv(fn) 1a
156 else:
157 led_states = pd.DataFrame(LED_STATES)
158 led_states = led_states.set_index('Condition') 1a
159 # Extract signal columns into 2D array
160 rois = list(self.kwargs['fibers'].keys()) 1a
161 out_df = fp_data.filter(items=rois, axis=1).sort_index(axis=1) 1a
162 out_df['times'] = fcn_nph_to_bpod_times(fp_data['SystemTimestamp']) 1a
163 out_df['valid'] = np.logical_and(out_df['times'] >= valid_bounds[0], out_df['times'] <= valid_bounds[1]) 1a
164 out_df['wavelength'] = np.nan 1a
165 out_df['name'] = '' 1a
166 out_df['color'] = '' 1a
167 # Extract channel index
168 states = fp_data.get('LedState', fp_data.get('Flags', None)) 1a
169 for state in states.unique(): 1a
170 ir, ic = np.where(led_states == state) 1a
171 if ic.size == 0: 1a
172 continue 1a
173 for cn in ['name', 'color', 'wavelength']: 1a
174 out_df.loc[states == state, cn] = channel_meta_map.iloc[ic[0]][cn] 1a
175 # 3) label the brain regions
176 rois = [] 1a
177 c = 0 1a
178 for k, v in self.kwargs['fibers'].items(): 1a
179 rois.append({'ROI': k, 'fiber': f'fiber{c:02d}', 'brain_region': v['location']}) 1a
180 df_rois = pd.DataFrame(rois).set_index('ROI') 1a
181 # to finish we write the dataframes to disk
182 out_path = self.session_path.joinpath('alf', 'photometry') 1a
183 out_path.mkdir(parents=True, exist_ok=True) 1a
184 out_df.to_parquet(file_signal := out_path.joinpath('photometry.signal.pqt')) 1a
185 df_rois.to_parquet(file_locations := out_path.joinpath('photometryROI.locations.pqt')) 1a
186 return file_signal, file_locations 1a