Coverage for ibllib/io/extractors/base.py: 83%

185 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1"""Base Extractor classes. 

2A module for the base Extractor classes. The Extractor, given a session path, will extract the 

3processed data from raw hardware files and optionally save them. 

4""" 

5 

6import abc 

7from collections import OrderedDict 

8import json 

9from pathlib import Path 

10 

11import numpy as np 

12import pandas as pd 

13from one.alf.files import get_session_path 

14from ibllib.io import raw_data_loaders as raw 

15from ibllib.io.raw_data_loaders import load_settings, _logger 

16 

17 

class BaseExtractor(abc.ABC):
    """
    Base extractor class.

    Writing an extractor checklist:

    - on the child class, overload the _extract method
    - this method should output one or several numpy.arrays or dataframe with a consistent shape
    - save_names is a list or a string of filenames, there should be one per dataset
    - set save_names to None for a dataset that doesn't need saving (could be set dynamically
      in the _extract method)

    :param session_path: Absolute path of session folder
    :type session_path: str/Path
    """

    session_path = None
    save_names = None
    var_names = None
    default_path = Path('alf')  # relative to session

    def __init__(self, session_path=None):
        # NB: Path(None) raises a TypeError, so a session path must be provided
        self.session_path = Path(session_path)

    def extract(self, save=False, path_out=None, **kwargs):
        """
        Run the extraction and optionally save the output to disk.

        :param save: if True, the extracted data are written to disk with `_save`
        :param path_out: output folder, passed on to `_save`
        :param kwargs: passed on to the `_extract` method
        :return: the output of `_extract`, and the saved file path(s) (None if save is False)
        """
        out = self._extract(**kwargs)
        files = self._save(out, path_out=path_out) if save else None
        return out, files

    def _save(self, data, path_out=None):
        """
        Write the extracted data to disk, using the file names in `save_names`.

        :param data: a single dataset (when save_names is a str), a dict of datasets keyed by
         the names in `var_names`, or a list/tuple of datasets matching `save_names`
        :param path_out: str or pathlib.Path output folder; defaults to
         `session_path / default_path`
        :return: an empty list when save_names is None, a single pathlib.Path when save_names
         is a str, otherwise the list of paths passed to the writer
        :raises ValueError: a dataset to save is empty
        :raises TypeError: a parquet file name is paired with data that is not a DataFrame
        """
        if not path_out:
            path_out = self.session_path.joinpath(self.default_path)
        # Accept str as well as pathlib.Path output folders
        path_out = Path(path_out)

        def _write_to_disk(file_path, data):
            """Implements different save calls depending on file extension.

            Parameters
            ----------
            file_path : pathlib.Path
                The location to save the data.
            data : pandas.DataFrame, numpy.ndarray
                The data to save

            """
            csv_separators = {
                ".csv": ",",
                ".ssv": " ",
                ".tsv": "\t"
            }
            file_path = Path(file_path)
            # Ensure empty files are not created; we expect all datasets to have a non-zero size.
            # len() is only called when there is no size attribute: 0-d arrays have a size but
            # calling len() on them raises a TypeError.
            n_elements = data.size if hasattr(data, 'size') else len(data)
            if n_elements == 0:
                try:
                    filename = file_path.relative_to(self.session_path).as_posix()
                except ValueError:
                    # The output folder is outside of the session folder: report the full path
                    filename = file_path.as_posix()
                raise ValueError(f'Data for {filename} appears to be empty')
            file_path.parent.mkdir(exist_ok=True, parents=True)
            if file_path.suffix == ".npy":
                np.save(file_path, data)
            elif file_path.suffix in [".parquet", ".pqt"]:
                if not isinstance(data, pd.DataFrame):
                    _logger.error("Data is not a panda's DataFrame object")
                    raise TypeError("Data is not a panda's DataFrame object")
                data.to_parquet(file_path)
            elif file_path.suffix in csv_separators:
                sep = csv_separators[file_path.suffix]
                data.to_csv(file_path, sep=sep)
            else:
                # Unsupported extensions are logged only; nothing is written to disk
                _logger.error(f"Don't know how to save {file_path.suffix} files yet")

        if self.save_names is None:
            file_paths = []
        elif isinstance(self.save_names, str):
            # Single dataset: a single path is returned rather than a list
            file_paths = path_out.joinpath(self.save_names)
            _write_to_disk(file_paths, data)
        elif isinstance(data, dict):
            file_paths = []
            for var, value in data.items():
                # The file name is looked up by the position of the variable in var_names;
                # falsy names (e.g. None) mark datasets that are not saved
                if fn := self.save_names[self.var_names.index(var)]:
                    fpath = path_out.joinpath(fn)
                    _write_to_disk(fpath, value)
                    file_paths.append(fpath)
        else:  # Should be list or tuple...
            assert len(data) == len(self.save_names)
            file_paths = []
            for dataset, fn in zip(data, self.save_names):
                if fn:
                    fpath = path_out.joinpath(fn)
                    _write_to_disk(fpath, dataset)
                    file_paths.append(fpath)
        return file_paths

    @abc.abstractmethod
    def _extract(self):
        pass

114 

115 

class BaseBpodTrialsExtractor(BaseExtractor):
    """
    Base (abstract) extractor class for bpod jsonable data set.

    Wraps the _extract private method: the Bpod trials and settings are made available as
    instance attributes before the extraction is run.

    :param session_path: Absolute path of session folder
    :type session_path: str
    :param bpod_trials
    :param settings
    """

    bpod_trials = None
    settings = None
    task_collection = None

    def extract(self, bpod_trials=None, settings=None, **kwargs):
        """
        Load the Bpod trials and settings where not provided, then run the extraction.

        :param: bpod_trials (optional) bpod trials from jsonable in a dictionary
        :param: settings (optional) bpod iblrig settings json file in a dictionary
        :param: task_collection (optional) session subfolder the raw data are loaded from,
         defaults to 'raw_behavior_data'
        :param: save (bool) write output ALF files, defaults to False
        :param: path_out (pathlib.Path) output path (defaults to `{session_path}/alf`)
        :return: numpy.ndarray or list of ndarrays, list of filenames
        :rtype: dtype('float64')
        """
        fallback_version = "100.0.0"
        # task_collection is consumed here and not forwarded to the parent extract method
        collection = kwargs.pop('task_collection', 'raw_behavior_data')
        self.task_collection = collection
        self.bpod_trials = bpod_trials
        self.settings = settings
        if self.bpod_trials is None:
            self.bpod_trials = raw.load_data(self.session_path, task_collection=collection)
        if not self.settings:
            self.settings = raw.load_settings(self.session_path, task_collection=collection)
        # Ensure a version tag is always present in the settings
        if self.settings is None:
            self.settings = {"IBLRIG_VERSION_TAG": fallback_version}
        elif self.settings.get("IBLRIG_VERSION_TAG", "") == "":
            self.settings["IBLRIG_VERSION_TAG"] = fallback_version
        return super().extract(**kwargs)

152 

153 

def run_extractor_classes(classes, session_path=None, **kwargs):
    """
    Run a set of extractors with the same inputs.

    Each class is instantiated with the session path and its extract method is called with
    the keyword arguments. The outputs are gathered in a single dictionary, keyed by the
    extractor's var_names unless the extractor itself returns a dictionary.

    :param classes: list of Extractor class (a single class is also accepted)
    :param session_path: session path passed to each extractor constructor (required)
    :param save: True/False
    :param path_out: (defaults to alf path)
    :param kwargs: extractor arguments passed to each extract method
    :return: dictionary of arrays, list of files
    """
    assert session_path
    outputs = OrderedDict()
    files = []
    # if a single class is passed, convert as a list
    try:
        iter(classes)
    except TypeError:
        classes = [classes]
    for extractor_class in classes:
        extractor = extractor_class(session_path=session_path)
        out, saved = extractor.extract(**kwargs)
        # extractors return a list of files, a single file, or None when nothing was saved
        if isinstance(saved, list):
            files.extend(saved)
        elif saved is not None:
            files.append(saved)
        if isinstance(out, dict):
            outputs.update(out)
            continue
        names = extractor.var_names
        if isinstance(names, str):
            outputs[names] = out
            continue
        for index, key in enumerate(names):
            outputs[key] = out[index]
    return outputs, files

186 

187 

def _get_task_types_json_config():
    """
    Load the extractor types map from JSON.

    The defaults are read from the 'extractor_types.json' file located next to this module.
    If the `projects.base` module can be imported and a file of the same name exists next to
    it, its content is merged on top of the defaults.

    :return: the content of the JSON file(s)
    """
    json_name = 'extractor_types.json'
    with open(Path(__file__).parent / json_name) as default_file:
        task_types = json.load(default_file)
    try:
        # look if there are custom extractor types in the personal projects repo
        import projects.base
        custom_path = Path(projects.base.__file__).parent / json_name
        with open(custom_path) as custom_file:
            custom_task_types = json.load(custom_file)
        task_types.update(custom_task_types)
    except (ModuleNotFoundError, FileNotFoundError):
        # No projects repository or no custom file: keep the defaults only
        pass
    return task_types

201 

202 

def get_task_protocol(session_path, task_collection='raw_behavior_data'):
    """
    Return the 'PYBPOD_PROTOCOL' field of the session settings.

    :param session_path: path passed to `get_session_path` to resolve the session folder
    :param task_collection: the session subfolder the settings are loaded from
    :return: the value of the 'PYBPOD_PROTOCOL' settings field; None if the settings could
     not be decoded, are empty, or do not contain the field
    """
    try:
        session_root = get_session_path(session_path)
        settings = load_settings(session_root, task_collection=task_collection)
    except json.decoder.JSONDecodeError:
        _logger.error(f"Can't read settings for {session_path}")
        return None
    if not settings:
        return None
    return settings.get('PYBPOD_PROTOCOL', None)

213 

214 

def get_task_extractor_type(task_name):
    """
    Returns the task type string from the full pybpod task name:
    _iblrig_tasks_biasedChoiceWorld3.7.0 returns "biased"
    _iblrig_tasks_trainingChoiceWorld3.6.0 returns "training"

    :param task_name: the task name (str); if a pathlib.Path is passed, the task name is read
     with `get_task_protocol`
    :return: the value mapped to the task name in the extractor types map, e.g. one of
     ['biased', 'habituation', 'training', 'ephys', 'mock_ephys', 'sync_ephys'];
     None if the task name is None or if no extractor type is found
    """
    if isinstance(task_name, Path):
        task_name = get_task_protocol(task_name)
    # Guard applied to every input, not only to paths: a None task name cannot be matched and
    # would otherwise raise a TypeError in the lazy matching below
    if task_name is None:
        return None
    task_types = _get_task_types_json_config()

    task_type = task_types.get(task_name, None)
    if task_type is None:  # Try lazy matching of name
        # First map key that is a substring of the task name
        task_type = next((task_types[tt] for tt in task_types if tt in task_name), None)
    if task_type is None:
        _logger.warning(f'No extractor type found for {task_name}')
    return task_type

235 

236 

def get_session_extractor_type(session_path, task_collection='raw_behavior_data'):
    """
    From a session path, loads the settings file, finds the task and returns its extractor type.

    :param session_path: the session path passed to `load_settings`
    :param task_collection: the session subfolder the settings are loaded from
    :return: the extractor type returned by `get_task_extractor_type` for the settings
     'PYBPOD_PROTOCOL' field; False if the settings are None or if that type is falsy
    """
    settings = load_settings(session_path, task_collection=task_collection)
    if settings is None:
        _logger.error(f'ABORT: No data found in "{task_collection}" folder {session_path}')
        return False
    # Falsy extractor types (e.g. None when nothing matches) are all returned as False
    return get_task_extractor_type(settings['PYBPOD_PROTOCOL']) or False

253 

254 

def get_pipeline(session_path, task_collection='raw_behavior_data'):
    """
    Get the pre-processing pipeline name from a session path.

    :param session_path: the session path passed to `get_session_extractor_type`
    :param task_collection: the session subfolder containing the task settings
    :return: the output of `_get_pipeline_from_task_type` for the session extractor type
    """
    return _get_pipeline_from_task_type(
        get_session_extractor_type(session_path, task_collection=task_collection)
    )

263 

264 

def _get_pipeline_from_task_type(stype):
    """
    Returns the pipeline from the task type. Some tasks types directly define the pipeline.

    :param stype: session_type or task extractor type
    :return: 'ephys', 'training' or 'widefield' for the task types associated with those
     pipelines; otherwise the input is returned unchanged, including non-string inputs such
     as False or None
    """
    if stype in ['ephys_biased_opto', 'ephys', 'ephys_training', 'mock_ephys', 'sync_ephys']:
        return 'ephys'
    elif stype in ['habituation', 'training', 'biased', 'biased_opto']:
        return 'training'
    # The substring test is only valid for strings: a non-string input such as False or None
    # would raise a TypeError, so it is passed through unchanged instead
    elif isinstance(stype, str) and 'widefield' in stype:
        return 'widefield'
    else:
        return stype

279 

280 

def _get_task_extractor_map():
    """
    Load the task protocol extractor map.

    The defaults are read from the 'task_extractor_map.json' file located next to this module.
    If the `projects.base` module can be imported and a file of the same name exists next to
    it, its content is merged on top of the defaults.

    Returns
    -------
    dict(str, str)
        A map of task protocol to Bpod trials extractor class.
    """
    map_name = 'task_extractor_map.json'
    with open(Path(__file__).parent / map_name) as default_file:
        task_extractors = json.load(default_file)
    try:
        # look if there are custom extractor types in the personal projects repo
        import projects.base
        custom_path = Path(projects.base.__file__).parent / map_name
        with open(custom_path) as custom_file:
            custom_task_types = json.load(custom_file)
        task_extractors.update(custom_task_types)
    except (ModuleNotFoundError, FileNotFoundError):
        # No projects repository or no custom file: keep the defaults only
        pass
    return task_extractors

303 

304 

def get_bpod_extractor_class(session_path, task_collection='raw_behavior_data'):
    """
    Get the Bpod trials extractor class associated with a given Bpod session.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path containing Bpod behaviour data.
    task_collection : str
        The session_path subfolder containing the Bpod settings file.

    Returns
    -------
    str
        The extractor class name.

    Raises
    ------
    ValueError
        The settings could not be loaded, or they contain no 'PYBPOD_PROTOCOL' value.
    """
    # Attempt to load settings files
    settings = load_settings(session_path, task_collection=task_collection)
    if settings is None:
        raise ValueError(f'No data found in "{task_collection}" folder {session_path}')
    # Attempt to get task protocol
    protocol = settings.get('PYBPOD_PROTOCOL')
    if not protocol:
        # session_path may be a str: cast before joining, otherwise the `/` operator raises a
        # TypeError that masks the intended ValueError
        settings_folder = Path(session_path) / task_collection
        raise ValueError(f'No task protocol found in {settings_folder}')
    return protocol2extractor(protocol)

330 

331 

def protocol2extractor(protocol):
    """
    Get the Bpod trials extractor class associated with a given Bpod task protocol.

    The Bpod task protocol can be found in the 'PYBPOD_PROTOCOL' field of _iblrig_taskSettings.raw.json.

    An exact match of the protocol name is attempted first; failing that, the first map key
    contained in the protocol name is used.

    Parameters
    ----------
    protocol : str
        A Bpod task protocol name.

    Returns
    -------
    str
        The extractor class name.

    Raises
    ------
    ValueError
        No extractor is associated with the protocol.
    """
    # Attempt to get extractor class from protocol
    extractor_map = _get_task_extractor_map()
    extractor = extractor_map.get(protocol)
    if extractor is None:  # Try lazy matching of name
        for name, candidate in extractor_map.items():
            if name in protocol:
                extractor = candidate
                break
    if extractor is None:
        raise ValueError(f'No extractor associated with "{protocol}"')
    return extractor