Coverage for ibllib/oneibl/registration.py: 93%

214 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1from pathlib import Path 

2import json 

3import datetime 

4import logging 

5import itertools 

6 

7from pkg_resources import parse_version 

8from one.alf.files import get_session_path, folder_parts, get_alf_path 

9from one.registration import RegistrationClient, get_dataset_type 

10from one.remote.globus import get_local_endpoint_id, get_lab_from_endpoint_id 

11from one.webclient import AlyxClient 

12from one.converters import ConversionMixin 

13import one.alf.exceptions as alferr 

14from one.util import datasets2records, ensure_list 

15 

16import ibllib 

17import ibllib.io.extractors.base 

18from ibllib.time import isostr2date 

19import ibllib.io.raw_data_loaders as raw 

20from ibllib.io import session_params 

21 

_logger = logging.getLogger(__name__)
# Files with these extensions are never registered
EXCLUDED_EXTENSIONS = ['.flag', '.error', '.avi']
# Glob patterns (relative to the session path) of files eligible for registration.
# NB: every entry must end with a comma, otherwise Python silently concatenates adjacent literals.
REGISTRATION_GLOB_PATTERNS = [
    'alf/**/*.*',
    'raw_behavior_data/**/_iblrig_*.*',
    'raw_task_data_*/**/_iblrig_*.*',
    'raw_passive_data/**/_iblrig_*.*',
    'raw_behavior_data/**/_iblmic_*.*',
    'raw_video_data/**/_iblrig_*.*',
    'raw_video_data/**/_ibl_*.*',
    'raw_ephys_data/**/_iblrig_*.*',
    'raw_ephys_data/**/_spikeglx_*.*',
    'raw_ephys_data/**/_iblqc_*.*',
    'spikesorters/**/_kilosort_*.*',
    'raw_widefield_data/**/_ibl_*.*',
    'raw_photometry_data/**/_neurophotometrics_*.*',
]

39 

40 

def register_dataset(file_list, one=None, exists=False, versions=None, **kwargs):
    """
    Registers a set of files belonging to a session only on the server.

    Parameters
    ----------
    file_list : list, str, pathlib.Path
        A filepath (or list/iterable thereof) of ALF datasets to register to Alyx.
    one : one.api.OneAlyx
        An instance of ONE.
    exists : bool
        Whether files exist in the repository. May be set to False when registering files
        before copying to the repository.
    versions : str, list of str
        Optional version tags, defaults to the current ibllib version.
    kwargs
        Optional keyword arguments for one.registration.RegistrationClient.register_files.

    Returns
    -------
    list of dicts, dict, None
        A list of newly created Alyx dataset records or the registration data if dry.
        None is returned when `file_list` is empty.

    Raises
    ------
    AssertionError
        The files do not all belong to the same session, or do not all exist locally.

    Notes
    -----
    - If a repository is passed, server_only will be set to True.

    See Also
    --------
    one.registration.RegistrationClient.register_files
    """
    if not file_list:
        return
    if isinstance(file_list, (str, Path)):
        file_list = [file_list]
    else:
        # Materialise iterators/generators: the checks below iterate the files more than once,
        # which would otherwise exhaust them and make the later checks pass vacuously.
        file_list = list(file_list)
        if not file_list:
            return

    # All files must belong to a single, existing session
    assert len(set(get_session_path(f) for f in file_list)) == 1
    assert all(Path(f).exists() for f in file_list)

    client = IBLRegistrationClient(one)
    # If the repository is specified then for the registration client we want server_only=True to
    # make sure we don't make any other repositories for the lab
    if kwargs.get('repository') and not kwargs.get('server_only', False):
        kwargs['server_only'] = True

    return client.register_files(file_list, versions=versions or ibllib.__version__, exists=exists, **kwargs)

87 

88 

def register_session_raw_data(session_path, one=None, overwrite=False, **kwargs):
    """
    Registers all files corresponding to raw data files to Alyx. It will select files that
    match Alyx registration patterns.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The local session path.
    one : one.api.OneAlyx
        An instance of ONE. Required: it is used unconditionally despite the None default.
    overwrite : bool
        If set to True, will patch the datasets. It will take very long. If set to False (default)
        will skip all already registered data.
    **kwargs
        Optional keyword arguments for one.registration.RegistrationClient.register_files.
        Note that 'repository' (set to the local data repository) and 'server_only' (set to True)
        are always overridden.

    Returns
    -------
    list of pathlib.Path
        A list of raw dataset paths.
    list of dicts, dict
        A list of newly created Alyx dataset records or the registration data if dry.

    Raises
    ------
    one.alf.exceptions.ALFError
        The session does not exist on Alyx.
    """
    # Clear rest cache to make sure we have the latest entries
    one.alyx.clear_rest_cache()
    client = IBLRegistrationClient(one)
    session_path = Path(session_path)
    eid = one.path2eid(session_path, query_type='remote')  # needs to make sure we're up to date
    if not eid:
        raise alferr.ALFError(f'Session does not exist on Alyx: {get_alf_path(session_path)}')
    # find all files that are in a raw data collection
    file_list = [f for f in client.find_files(session_path)
                 if f.relative_to(session_path).as_posix().startswith('raw')]
    # unless overwrite is True, filter out the datasets that already exist
    if not overwrite:
        # query the database for existing datasets on the session and allowed dataset types
        dsets = datasets2records(one.alyx.rest('datasets', 'list', session=eid, no_cache=True))
        # A set gives constant-time membership tests (Path hashing is consistent with equality)
        already_registered = set(map(session_path.joinpath, dsets['rel_path']))
        file_list = [f for f in file_list if f not in already_registered]

    kwargs['repository'] = get_local_data_repository(one.alyx)
    kwargs['server_only'] = True

    response = client.register_files(file_list, versions=ibllib.__version__, exists=False, **kwargs)
    return file_list, response

135 

136 

class IBLRegistrationClient(RegistrationClient):
    """
    Object that keeps the ONE instance and provides method to create sessions and register data.
    """

    def register_session(self, ses_path, file_list=True, projects=None, procedures=None):
        """
        Register an IBL Bpod session in Alyx.

        Creates the session (and subject weighings) if it does not exist, otherwise updates the
        session JSON field. Water administrations are created when none are related to the
        session, then the session files are registered unless `file_list` is False.

        Parameters
        ----------
        ses_path : str, pathlib.Path
            The local session path.
        file_list : bool, list
            An optional list of file paths to register. If True, all valid files within the
            session folder are registered. If False, no files are registered.
        projects: str, list
            The project(s) to which the experiment belongs (optional).
        procedures : str, list
            An optional list of procedures, e.g. 'Behavior training/tasks'.

        Returns
        -------
        dict
            An Alyx session record.
        list of dicts, None
            The registered dataset records, or None if `file_list` is False or empty.

        Raises
        ------
        ValueError
            The task settings file was not found for one or more task collections.
        AssertionError
            The task settings disagree with each other or with the session path.

        Notes
        -----
        For a list of available projects:
        >>> sorted(proj['name'] for proj in one.alyx.rest('projects', 'list'))
        For a list of available procedures:
        >>> sorted(proc['name'] for proc in one.alyx.rest('procedures', 'list'))
        """
        if isinstance(ses_path, str):
            ses_path = Path(ses_path)

        # Read in the experiment description file if it exists and get projects and procedures from here
        experiment_description_file = session_params.read_params(ses_path)
        if experiment_description_file is None:
            collections = ['raw_behavior_data']
        else:
            projects = experiment_description_file.get('projects', projects)
            procedures = experiment_description_file.get('procedures', procedures)
            collections = ensure_list(session_params.get_task_collection(experiment_description_file))

        # read meta data from the rig for the session from the task settings file
        task_data = (raw.load_bpod(ses_path, collection) for collection in sorted(collections))
        # Filter collections where settings file was not found, then transpose to (settings, data)
        if not (task_data := list(zip(*filter(lambda x: x[0] is not None, task_data)))):
            raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')
        settings, task_data = task_data
        if len(settings) != len(collections):
            raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')

        # Do some validation
        _, subject, date, number, *_ = folder_parts(ses_path)
        assert len({x['SUBJECT_NAME'] for x in settings}) == 1 and settings[0]['SUBJECT_NAME'] == subject
        assert len({x['SESSION_DATE'] for x in settings}) == 1 and settings[0]['SESSION_DATE'] == date
        assert len({x['SESSION_NUMBER'] for x in settings}) == 1 and settings[0]['SESSION_NUMBER'] == number
        assert len({x['IS_MOCK'] for x in settings}) == 1
        assert len({md['PYBPOD_BOARD'] for md in settings}) == 1
        assert len({md.get('IBLRIG_VERSION') for md in settings}) == 1

        # query Alyx endpoints for subject, error if not found
        subject = self.assert_exists(subject, 'subjects')

        # look for a session from the same subject, same number on the same day
        session_id, session = self.one.search(subject=subject['nickname'],
                                              date_range=date,
                                              number=number,
                                              details=True, query_type='remote')
        users = []
        for user in filter(None, map(lambda x: x.get('PYBPOD_CREATOR'), settings)):
            user = self.assert_exists(user[0], 'users')  # user is list of [username, uuid]
            # The same creator may appear in the settings of several task collections
            if user['username'] not in users:
                users.append(user['username'])

        # extract information about session duration and performance
        start_time, end_time = _get_session_times(str(ses_path), settings, task_data)
        n_trials, n_correct_trials = _get_session_performance(settings, task_data)

        # TODO Add task_protocols to Alyx sessions endpoint
        task_protocols = [md['PYBPOD_PROTOCOL'] + md['IBLRIG_VERSION_TAG'] for md in settings]
        # unless specified label the session projects with subject projects
        projects = subject['projects'] if projects is None else projects
        # makes sure projects is a list
        projects = [projects] if isinstance(projects, str) else projects

        # unless specified label the session procedures with task protocol lookup
        # (sorted so the order does not depend on set iteration order)
        procedures = procedures or sorted(set(filter(None, map(self._alyx_procedure_from_task, task_protocols))))
        procedures = [procedures] if isinstance(procedures, str) else procedures
        json_fields_names = ['IS_MOCK', 'IBLRIG_VERSION']
        json_field = {k: settings[0].get(k) for k in json_fields_names}
        # The poo count field is only updated if the field is defined in at least one of the settings
        poo_counts = [md.get('POOP_COUNT') for md in settings if md.get('POOP_COUNT') is not None]
        if poo_counts:
            json_field['POOP_COUNT'] = int(sum(poo_counts))

        if not session:  # Create session and weighings
            ses_ = {'subject': subject['nickname'],
                    'users': users or [subject['responsible_user']],
                    'location': settings[0]['PYBPOD_BOARD'],
                    'procedures': procedures,
                    'lab': subject['lab'],
                    'projects': projects,
                    'type': 'Experiment',
                    'task_protocol': '/'.join(task_protocols),
                    'number': number,
                    'start_time': self.ensure_ISO8601(start_time),
                    'end_time': self.ensure_ISO8601(end_time) if end_time else None,
                    'n_correct_trials': n_correct_trials,
                    'n_trials': n_trials,
                    'json': json_field
                    }
            session = self.one.alyx.rest('sessions', 'create', data=ses_)
            # Submit weights
            for md in filter(lambda md: md.get('SUBJECT_WEIGHT') is not None, settings):
                user = self._creator_username(md, users)
                self.register_weight(subject['nickname'], md['SUBJECT_WEIGHT'],
                                     date_time=md['SESSION_DATETIME'], user=user)
        else:  # if session exists update the JSON field
            session = self.one.alyx.rest('sessions', 'read', id=session_id[0], no_cache=True)
            self.one.alyx.json_field_update('sessions', session['id'], data=json_field)

        _logger.info(session['url'] + ' ')
        # create associated water administration if not found
        if not session['wateradmin_session_related'] and any(task_data):
            # skip tasks without trial data
            for md, d in filter(all, zip(settings, task_data)):
                _, _end_time = _get_session_times(ses_path, md, d)
                user = self._creator_username(md, users)
                # Only sum the per-trial rewards when the total isn't stored: a dict.get default
                # would be evaluated eagerly and fail if 'reward_amount' is absent.
                if 'water_delivered' in d[-1]:
                    volume = d[-1]['water_delivered'] / 1000
                else:
                    volume = sum(x['reward_amount'] for x in d) / 1000
                if volume > 0:
                    self.register_water_administration(
                        subject['nickname'], volume, date_time=_end_time or end_time, user=user,
                        session=session['id'], water_type=md.get('REWARD_TYPE') or 'Water')
        # at this point the session has been created. If create only, exit
        if not file_list:
            return session, None

        # register all files that match the Alyx patterns and file_list
        rename_files_compatibility(ses_path, settings[0]['IBLRIG_VERSION_TAG'])
        F = filter(lambda x: self._register_bool(x.name, file_list), self.find_files(ses_path))
        recs = self.register_files(F, created_by=users[0] if users else None, versions=ibllib.__version__)
        return session, recs

    def _creator_username(self, md, users):
        """
        Return the task settings creator username if it is a session user, else the ONE user.

        Parameters
        ----------
        md : dict
            A task settings dictionary; its 'PYBPOD_CREATOR' value may be missing, a username,
            or a list of [username, uuid].
        users : list of str
            The validated session usernames.

        Returns
        -------
        str
            The creator username if found in `users`, otherwise the logged-in Alyx user.
        """
        user = md.get('PYBPOD_CREATOR')
        if isinstance(user, list):
            user = user[0] if user else None
        return user if user in users else self.one.alyx.user

    @staticmethod
    def _register_bool(fn, file_list):
        """
        Return whether the file name `fn` should be registered given `file_list`.

        A boolean `file_list` is returned as-is; otherwise True if any entry of `file_list`
        (a str or list thereof) is a substring of `fn`.
        """
        if isinstance(file_list, bool):
            return file_list
        if isinstance(file_list, str):
            file_list = [file_list]
        return any(str(fil) in fn for fil in file_list)

    @staticmethod
    def _alyx_procedure_from_task(task_protocol):
        """Return the Alyx procedure for a task protocol name, or an empty list if unknown."""
        task_type = ibllib.io.extractors.base.get_task_extractor_type(task_protocol)
        procedure = _alyx_procedure_from_task_type(task_type)
        return procedure or []

    def find_files(self, session_path):
        """Similar to base class method but further filters by name and extension.

        Only files matching the patterns in REGISTRATION_GLOB_PATTERNS are considered; of those,
        files whose extension is in EXCLUDED_EXTENSIONS are excluded, and files that do not match
        a known Alyx dataset type are skipped (the error is logged). Each file is yielded once.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The session path to search.

        Yields
        -------
        pathlib.Path
            File paths that match the dataset type patterns in Alyx and registration glob patterns.
        """
        session_path = Path(session_path)  # a str has no glob method
        files = itertools.chain.from_iterable(session_path.glob(x) for x in REGISTRATION_GLOB_PATTERNS)
        seen = set()
        for file in filter(lambda x: x.suffix not in EXCLUDED_EXTENSIONS, files):
            # overlapping glob patterns could match the same file more than once
            if file in seen:
                continue
            seen.add(file)
            try:
                get_dataset_type(file, self.dtypes)
            except ValueError as ex:
                _logger.error(ex)
            else:
                yield file

325 

326 

327def _alyx_procedure_from_task_type(task_type): 

328 lookup = {'biased': 'Behavior training/tasks', 1gemba

329 'biased_opto': 'Behavior training/tasks', 

330 'habituation': 'Behavior training/tasks', 

331 'training': 'Behavior training/tasks', 

332 'ephys': 'Ephys recording with acute probe(s)', 

333 'ephys_biased_opto': 'Ephys recording with acute probe(s)', 

334 'ephys_passive_opto': 'Ephys recording with acute probe(s)', 

335 'ephys_replay': 'Ephys recording with acute probe(s)', 

336 'ephys_training': 'Ephys recording with acute probe(s)', 

337 'mock_ephys': 'Ephys recording with acute probe(s)', 

338 'sync_ephys': 'Ephys recording with acute probe(s)'} 

339 try: 1gemba

340 # look if there are tasks in the personal projects repo with procedures 

341 import projects.base 1gemba

342 custom_tasks = Path(projects.base.__file__).parent.joinpath('task_type_procedures.json') 1gemba

343 with open(custom_tasks) as fp: 1gemba

344 lookup.update(json.load(fp)) 1gemba

345 except (ModuleNotFoundError, FileNotFoundError): 

346 pass 

347 if task_type in lookup: 1gemba

348 return lookup[task_type] 1gemba

349 

350 

def rename_files_compatibility(ses_path, version_tag):
    """
    Rename session files written under legacy names to their current names.

    - For iblrig versions <= 3.2.3, '_ibl_trials.iti_duration.npy' is renamed to
      '_ibl_trials.itiDuration.npy'.
    - For any version, '_iblrig_taskCodeFiles.raw.zip' is renamed to '_iblrig_codeFiles.raw.zip'.

    Nothing is done when `version_tag` is empty or None.

    Parameters
    ----------
    ses_path : str, pathlib.Path
        The session path, searched recursively.
    version_tag : str
        The iblrig version tag from the task settings.
    """
    if not version_tag:
        return
    ses_path = Path(ses_path)  # a str has no glob method
    if parse_version(version_tag) <= parse_version('3.2.3'):
        # materialise the matches before renaming files in the directories being scanned
        for fn in list(ses_path.glob('**/_ibl_trials.iti_duration.npy')):
            fn.replace(fn.parent.joinpath('_ibl_trials.itiDuration.npy'))
    for fn in list(ses_path.glob('**/_iblrig_taskCodeFiles.raw.zip')):
        fn.replace(fn.parent.joinpath('_iblrig_codeFiles.raw.zip'))

361 

362 

def _get_session_times(fn, md, ses_data):
    """
    Get session start and end time from the Bpod data.

    Parameters
    ----------
    fn : str, pathlib.Path
        Session/task identifier. Only used in warning logs.
    md : dict, list of dict
        A task settings dictionary, or a list thereof (one per task, in chronological order).
    ses_data : list of dict, list of list of dict
        The per-trial data of a single task when `md` is a dict, otherwise a list thereof with
        one entry per settings dictionary (only the last task's trials are used).

    Returns
    -------
    datetime.datetime
        The datetime of the start of the (first task of the) session.
    datetime.datetime, None
        The datetime of the end of the session, or None if there is no trial data.
    """
    if isinstance(md, dict):
        start_time = isostr2date(md['SESSION_DATETIME'])
        last_task_start = start_time
    else:
        start_time = isostr2date(md[0]['SESSION_DATETIME'])
        last_task_start = isostr2date(md[-1]['SESSION_DATETIME'])
        assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md)
        assert len(md) == 1 or start_time < last_task_start
        # The session ends with the last task
        ses_data = ses_data[-1]
    if not ses_data:
        return start_time, None

    # Walk back from the last trial, skipping trials whose end timestamp is implausibly late
    max_duration = 6 * 3600
    n_corrupt, duration = 0, 0
    for trial in reversed(ses_data):
        timestamps = trial['behavior_data']
        duration = timestamps['Trial end timestamp'] - timestamps['Bpod start timestamp']
        if duration < max_duration:
            break
        n_corrupt += 1
    if n_corrupt:
        _logger.warning(('Trial end timestamps of last %i trials above 6 hours '
                         '(most likely corrupt): %s'), n_corrupt, str(fn))
    return start_time, last_task_start + datetime.timedelta(seconds=duration)

405 

406 

407def _get_session_performance(md, ses_data): 

408 """ 

409 Get performance about the session from Bpod data. 

410 Note: This does not support custom protocols. 

411 

412 Parameters 

413 ---------- 

414 md : dict, list of dict 

415 A session parameters dictionary or list thereof. 

416 ses_data : dict, list of dict 

417 A session data dictionary or list thereof. 

418 

419 Returns 

420 ------- 

421 int 

422 The total number of trials across protocols. 

423 int 

424 The total number of correct trials across protocols. 

425 """ 

426 

427 if not any(filter(None, ses_data or None)): 1gfecbdka

428 return None, None 1gfe

429 

430 if isinstance(md, dict): 1cbdka

431 ses_data = [ses_data] 1k

432 md = [md] 1k

433 else: 

434 assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md) 1cbdka

435 

436 n_trials = [] 1cbdka

437 n_correct = [] 1cbdka

438 for data, settings in filter(all, zip(ses_data, md)): 1cbdka

439 # In some protocols trials start from 0, in others, from 1 

440 n = data[-1]['trial_num'] + int(data[0]['trial_num'] == 0) # +1 if starts from 0 1cbdka

441 n_trials.append(n) 1cbdka

442 # checks that the number of actual trials and labeled number of trials check out 

443 assert len(data) == n, f'{len(data)} trials in data, however last trial number was {n}' 1cbdka

444 # task specific logic 

445 if 'habituationChoiceWorld' in settings.get('PYBPOD_PROTOCOL', ''): 1cbdka

446 n_correct.append(0) 1k

447 else: 

448 n_correct.append(data[-1].get('ntrials_correct', sum(x['trial_correct'] for x in data))) 1cbdka

449 

450 return sum(n_trials), sum(n_correct) 1cbdka

451 

452 

def get_local_data_repository(ac):
    """
    Get local data repo name from Globus client.

    Parameters
    ----------
    ac : one.webclient.AlyxClient
        An AlyxClient instance for querying data repositories.

    Returns
    -------
    str, None
        The (first) data repository associated with the local Globus endpoint ID, or None if no
        client is given, the local endpoint ID cannot be determined, or no repository matches.
    """
    # Explicit check rather than `assert`, which is stripped when Python runs with -O
    if not ac:
        return None
    try:
        globus_id = get_local_endpoint_id()
    except AssertionError:
        # NOTE(review): presumably raised when no local Globus endpoint is configured - confirm
        return None

    data_repo = ac.rest('data-repository', 'list', globus_endpoint_id=globus_id)
    return next((da['name'] for da in data_repo), None)

475 

476 

def get_lab(session_path, alyx=None):
    """
    Get lab from a session path using the subject name.

    On local lab servers, the lab name is not in the ALF path and the globus endpoint ID may be
    associated with multiple labs, so lab name is fetched from the subjects endpoint.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path from which to determine the lab name.
    alyx : one.webclient.AlyxClient
        An AlyxClient instance for querying data repositories.

    Returns
    -------
    str
        The lab name associated with the session path subject.

    Raises
    ------
    ValueError
        The session path could not be parsed, or the subject nickname exists in several labs,
        none of which is associated with the local Globus endpoint.
    one.alf.exceptions.AlyxSubjectNotFound
        No subject with this nickname exists on Alyx.

    See Also
    --------
    one.remote.globus.get_lab_from_endpoint_id
    """
    alyx = alyx or AlyxClient()
    if not (ref := ConversionMixin.path2ref(session_path)):
        raise ValueError(f'Failed to parse session path: {session_path}')

    labs = [x['lab'] for x in alyx.rest('subjects', 'list', nickname=ref['subject'])]
    if not labs:
        raise alferr.AlyxSubjectNotFound(ref['subject'])
    elif len(labs) > 1:  # More than one subject with this nickname
        # use local endpoint ID to find the correct lab
        endpoint_labs = get_lab_from_endpoint_id(alyx=alyx)
        lab = next((x for x in labs if x in endpoint_labs), None)
        if lab is None:
            # Avoid leaking a bare StopIteration when the ambiguity cannot be resolved
            raise ValueError(
                f'Subject "{ref["subject"]}" exists in multiple labs ({", ".join(labs)}), '
                'none of which is associated with the local Globus endpoint')
    else:
        lab, = labs

    return lab