Coverage for ibllib/io/extractors/base.py: 83%
185 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1"""Base Extractor classes.
2A module for the base Extractor classes. The Extractor, given a session path, will extract the
3processed data from raw hardware files and optionally save them.
4"""
6import abc
7from collections import OrderedDict
8import json
9from pathlib import Path
11import numpy as np
12import pandas as pd
13from one.alf.files import get_session_path
14from ibllib.io import raw_data_loaders as raw
15from ibllib.io.raw_data_loaders import load_settings, _logger
class BaseExtractor(abc.ABC):
    """
    Base extractor class.

    Writing an extractor checklist:

    - on the child class, overload the _extract method
    - this method should output one or several numpy.arrays or dataframe with a consistent shape
    - save_names is a list or a string of filenames, there should be one per dataset
    - set save_names to None for a dataset that doesn't need saving (could be set dynamically
      in the _extract method)

    :param session_path: Absolute path of session folder
    :type session_path: str/Path
    """

    session_path = None
    save_names = None
    var_names = None
    default_path = Path('alf')  # relative to session

    def __init__(self, session_path=None):
        # NB: Path(None) raises a TypeError, so a session path is required despite the default
        self.session_path = Path(session_path)

    def extract(self, save=False, path_out=None, **kwargs):
        """
        Run the extraction and optionally write the result to disk.

        :param save: when True the extracted data are saved using `save_names`
        :param path_out: output folder (defaults to `session_path / default_path`)
        :param kwargs: passed through to the `_extract` method
        :return: the output of `_extract`, and the saved file path(s) (None when `save` is False)
        """
        out = self._extract(**kwargs)
        files = self._save(out, path_out=path_out) if save else None
        return out, files

    def _save(self, data, path_out=None):
        """
        Write the extracted data to disk, one file per non-empty entry of `save_names`.

        :param data: a single dataset (when `save_names` is a str), a dict keyed by the entries
         of `var_names`, or a list/tuple of datasets with one element per entry of `save_names`
        :param path_out: output folder, str or Path (defaults to `session_path / default_path`)
        :return: an empty list if `save_names` is None, a single Path if `save_names` is a str,
         otherwise a list of Paths
        :raises ValueError: if a dataset to be saved is empty
        :raises TypeError: if a parquet file is requested for data that are not a DataFrame
        """
        # Coerce to Path so that a str path_out works with joinpath below
        path_out = Path(path_out) if path_out else self.session_path.joinpath(self.default_path)
        csv_separators = {
            ".csv": ",",
            ".ssv": " ",
            ".tsv": "\t"
        }

        def _write_to_disk(file_path, data):
            """Implements different save calls depending on file extension.

            Parameters
            ----------
            file_path : pathlib.Path
                The location to save the data.
            data : pandas.DataFrame, numpy.ndarray
                The data to save

            """
            file_path = Path(file_path)
            # Ensure empty files are not created; we expect all datasets to have a non-zero size.
            # len() is only used as a fallback: objects such as 0-d arrays have a size but no len
            n_items = data.size if hasattr(data, 'size') else len(data)
            if n_items == 0:
                try:
                    filename = file_path.relative_to(self.session_path).as_posix()
                except ValueError:
                    # The output folder is not within the session folder; report the full path
                    filename = file_path.as_posix()
                raise ValueError(f'Data for {filename} appears to be empty')
            file_path.parent.mkdir(exist_ok=True, parents=True)
            suffix = file_path.suffix
            if suffix == ".npy":
                np.save(file_path, data)
            elif suffix in [".parquet", ".pqt"]:
                if not isinstance(data, pd.DataFrame):
                    _logger.error("Data is not a panda's DataFrame object")
                    raise TypeError("Data is not a panda's DataFrame object")
                data.to_parquet(file_path)
            elif suffix in csv_separators:
                data.to_csv(file_path, sep=csv_separators[suffix])
            else:
                # NB: only logged; nothing is written, but the caller still returns this path
                _logger.error(f"Don't know how to save {file_path.suffix} files yet")

        if self.save_names is None:
            file_paths = []
        elif isinstance(self.save_names, str):
            # A single dataset: a single path is returned instead of a list
            file_paths = path_out.joinpath(self.save_names)
            _write_to_disk(file_paths, data)
        elif isinstance(data, dict):
            file_paths = []
            for var, value in data.items():
                # The file name is looked up by the position of the variable in var_names
                if fn := self.save_names[self.var_names.index(var)]:
                    fpath = path_out.joinpath(fn)
                    _write_to_disk(fpath, value)
                    file_paths.append(fpath)
        else:  # Should be list or tuple...
            assert len(data) == len(self.save_names)
            file_paths = []
            for value, fn in zip(data, self.save_names):
                if fn:  # a falsy save name means this dataset is not saved
                    fpath = path_out.joinpath(fn)
                    _write_to_disk(fpath, value)
                    file_paths.append(fpath)
        return file_paths

    @abc.abstractmethod
    def _extract(self):
        pass
class BaseBpodTrialsExtractor(BaseExtractor):
    """
    Base (abstract) extractor class for Bpod jsonable datasets.

    Wraps the private _extract method: the Bpod trials and settings are stored on the
    instance before the parent extract method is called.

    :param session_path: Absolute path of session folder
    :type session_path: str
    :param bpod_trials
    :param settings
    """

    bpod_trials = None
    settings = None
    task_collection = None

    def extract(self, bpod_trials=None, settings=None, **kwargs):
        """
        :param: bpod_trials (optional) bpod trials from jsonable in a dictionary
        :param: settings (optional) bpod iblrig settings json file in a dictionary
        :param: task_collection (optional) session subfolder holding the raw task data,
         defaults to 'raw_behavior_data'; removed from kwargs before they are passed on
        :param: save (bool) write output ALF files, defaults to False
        :param: path_out (pathlib.Path) output path (defaults to `{session_path}/alf`)
        :return: numpy.ndarray or list of ndarrays, list of filenames
        :rtype: dtype('float64')
        """
        collection = kwargs.pop('task_collection', 'raw_behavior_data')
        self.bpod_trials = bpod_trials
        self.settings = settings
        self.task_collection = collection
        # Anything not provided by the caller is loaded from the session folder
        if self.bpod_trials is None:
            self.bpod_trials = raw.load_data(self.session_path, task_collection=collection)
        if not self.settings:
            self.settings = raw.load_settings(self.session_path, task_collection=collection)
        # Guarantee a settings dict with a non-empty version tag
        if self.settings is None:
            self.settings = {}
        if self.settings.get("IBLRIG_VERSION_TAG", "") == "":
            self.settings["IBLRIG_VERSION_TAG"] = "100.0.0"
        return super().extract(**kwargs)
def run_extractor_classes(classes, session_path=None, **kwargs):
    """
    Run a set of extractors with the same inputs and merge their outputs.

    :param classes: an Extractor class, or an iterable of Extractor classes
    :param session_path: the session path each extractor is instantiated with (required)
    :param save: True/False
    :param path_out: (defaults to alf path)
    :param kwargs: arguments passed to the extract method of each extractor
    :return: ordered dictionary of outputs keyed by variable name, list of files
    """
    assert session_path
    # if a single class is passed, convert as a list
    try:
        iter(classes)
    except TypeError:
        classes = [classes]
    outputs = OrderedDict()
    files = []
    for extractor_class in classes:
        extractor = extractor_class(session_path=session_path)
        out, saved = extractor.extract(**kwargs)
        # Extractors return a list of files, a single file, or None when nothing was saved
        if isinstance(saved, list):
            files.extend(saved)
        elif saved is not None:
            files.append(saved)
        # Dict outputs are merged as they are; others are keyed by the extractor's var_names
        if isinstance(out, dict):
            outputs.update(out)
            continue
        names = extractor.var_names
        if isinstance(names, str):
            outputs[names] = out
        else:
            for index, name in enumerate(names):
                outputs[name] = out[index]
    return outputs, files
def _get_task_types_json_config():
    """
    Load the task types from extractor_types.json.

    The file located next to this module is read first; if the `projects.base` module can be
    imported and an extractor_types.json file sits next to it, its entries are merged in and
    take precedence.

    :return: dictionary loaded from the JSON file(s)
    """
    default_file = Path(__file__).parent / 'extractor_types.json'
    with open(default_file) as fp:
        task_types = json.load(fp)
    try:
        # look if there are custom extractor types in the personal projects repo
        import projects.base
        custom_file = Path(projects.base.__file__).parent / 'extractor_types.json'
        with open(custom_file) as fp:
            task_types.update(json.load(fp))
    except (ModuleNotFoundError, FileNotFoundError):
        # The projects repo or its custom file is absent: keep the defaults only
        pass
    return task_types
def get_task_protocol(session_path, task_collection='raw_behavior_data'):
    """
    Return the 'PYBPOD_PROTOCOL' field of the session's task settings.

    :param session_path: a path passed to get_session_path to resolve the session folder
    :param task_collection: the session subfolder containing the settings file
    :return: the value of 'PYBPOD_PROTOCOL', or None if the settings are missing, empty or
     cannot be decoded as JSON
    """
    try:
        settings = load_settings(get_session_path(session_path), task_collection=task_collection)
    except json.decoder.JSONDecodeError:
        _logger.error(f'Can\'t read settings for {session_path}')
        return None
    return settings.get('PYBPOD_PROTOCOL', None) if settings else None
def get_task_extractor_type(task_name):
    """
    Returns the task type string from the full pybpod task name.

    _iblrig_tasks_biasedChoiceWorld3.7.0 returns "biased"
    _iblrig_tasks_trainingChoiceWorld3.6.0 returns "training"

    :param task_name: the task protocol name; if a pathlib.Path is passed, the protocol is
     first read from the settings found under that path
    :return: one of ['biased', 'habituation', 'training', 'ephys', 'mock_ephys', 'sync_ephys'],
     or None if no protocol or no matching type was found
    """
    if isinstance(task_name, Path):
        task_name = get_task_protocol(task_name)
        if task_name is None:
            return None
    task_types = _get_task_types_json_config()

    task_type = task_types.get(task_name, None)
    if task_type is None:  # Try lazy matching of name
        # Use the first key that is contained within the task name
        for known_name, known_type in task_types.items():
            if known_name in task_name:
                task_type = known_type
                break
    if task_type is None:
        _logger.warning(f'No extractor type found for {task_name}')
    return task_type
def get_session_extractor_type(session_path, task_collection='raw_behavior_data'):
    """
    From a session path, loads the settings file, finds the task and checks if extractors exist.

    :param session_path: the session folder
    :param task_collection: the session subfolder containing the settings file
    :return: the extractor type of the session's task protocol, or False if the settings
     could not be loaded or no extractor type was found
    """
    settings = load_settings(session_path, task_collection=task_collection)
    if settings is None:
        _logger.error(f'ABORT: No data found in "{task_collection}" folder {session_path}')
        return False
    # Any falsy result (e.g. None when nothing matches) is reported as False
    return get_task_extractor_type(settings['PYBPOD_PROTOCOL']) or False
def get_pipeline(session_path, task_collection='raw_behavior_data'):
    """
    Get the pre-processing pipeline name from a session path.

    :param session_path: the session folder
    :param task_collection: the session subfolder containing the settings file
    :return: the output of _get_pipeline_from_task_type for the session's extractor type
    """
    return _get_pipeline_from_task_type(
        get_session_extractor_type(session_path, task_collection=task_collection)
    )
265def _get_pipeline_from_task_type(stype):
266 """
267 Returns the pipeline from the task type. Some tasks types directly define the pipeline
268 :param stype: session_type or task extractor type
269 :return:
270 """
271 if stype in ['ephys_biased_opto', 'ephys', 'ephys_training', 'mock_ephys', 'sync_ephys']: 1Q5ea
272 return 'ephys' 1Q5e
273 elif stype in ['habituation', 'training', 'biased', 'biased_opto']: 1Q5a
274 return 'training' 1Q5a
275 elif 'widefield' in stype: 1Q5
276 return 'widefield'
277 else:
278 return stype 1Q5
def _get_task_extractor_map():
    """
    Load the task protocol extractor map.

    The task_extractor_map.json file located next to this module is read first; if the
    `projects.base` module can be imported and a file of the same name sits next to it, its
    entries are merged in and take precedence.

    Returns
    -------
    dict(str, str)
        A map of task protocol to Bpod trials extractor class.
    """
    map_name = 'task_extractor_map.json'
    with open(Path(__file__).parent / map_name) as fp:
        task_extractors = json.load(fp)
    try:
        # look if there are custom extractor types in the personal projects repo
        import projects.base
        custom_map = Path(projects.base.__file__).parent / map_name
        with open(custom_map) as fp:
            task_extractors.update(json.load(fp))
    except (ModuleNotFoundError, FileNotFoundError):
        # The projects repo or its custom map is absent: keep the defaults only
        pass
    return task_extractors
def get_bpod_extractor_class(session_path, task_collection='raw_behavior_data'):
    """
    Get the Bpod trials extractor class associated with a given Bpod session.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path containing Bpod behaviour data.
    task_collection : str
        The session_path subfolder containing the Bpod settings file.

    Returns
    -------
    str
        The extractor class name.

    Raises
    ------
    ValueError
        If no settings could be loaded, or the settings hold no task protocol.
    """
    # Attempt to load settings files
    settings = load_settings(session_path, task_collection=task_collection)
    if settings is None:
        raise ValueError(f'No data found in "{task_collection}" folder {session_path}')
    # Attempt to get task protocol
    protocol = settings.get('PYBPOD_PROTOCOL')
    if not protocol:
        # Coerce to Path: with a str session path the `/` operator would raise a TypeError
        settings_folder = Path(session_path) / task_collection
        raise ValueError(f'No task protocol found in {settings_folder}')
    return protocol2extractor(protocol)
def protocol2extractor(protocol):
    """
    Get the Bpod trials extractor class associated with a given Bpod task protocol.

    The Bpod task protocol can be found in the 'PYBPOD_PROTOCOL' field of
    _iblrig_taskSettings.raw.json.

    Parameters
    ----------
    protocol : str
        A Bpod task protocol name.

    Returns
    -------
    str
        The extractor class name.

    Raises
    ------
    ValueError
        If no entry of the extractor map matches the protocol.
    """
    # Attempt to get extractor class from protocol
    extractor_map = _get_task_extractor_map()
    extractor = extractor_map.get(protocol, None)
    if extractor is None:  # Try lazy matching of name
        # Use the first map key that is contained within the protocol name
        for known_protocol in extractor_map:
            if known_protocol in protocol:
                extractor = extractor_map[known_protocol]
                break
    if extractor is None:
        raise ValueError(f'No extractor associated with "{protocol}"')
    return extractor