Coverage for ibllib/oneibl/registration.py: 88% (251 statements)
from pathlib import Path
import datetime
import logging
import itertools

from packaging import version
from requests import HTTPError

from one.alf.path import get_session_path, folder_parts, get_alf_path
from one.registration import RegistrationClient, get_dataset_type
from one.remote.globus import get_local_endpoint_id, get_lab_from_endpoint_id
from one.webclient import AlyxClient, no_cache
from one.converters import ConversionMixin, datasets2records
import one.alf.exceptions as alferr
from one.api import ONE
from iblutil.util import ensure_list

import ibllib
import ibllib.io.extractors.base
from ibllib.time import isostr2date
import ibllib.io.raw_data_loaders as raw
from ibllib.io import session_params

_logger = logging.getLogger(__name__)
EXCLUDED_EXTENSIONS = ['.flag', '.error', '.avi']
REGISTRATION_GLOB_PATTERNS = ['_ibl_experiment.description.yaml',
                              'alf/**/*.*.*',
                              'raw_behavior_data/**/_iblrig_*.*',
                              'raw_task_data_*/**/_iblrig_*.*',
                              'raw_passive_data/**/_iblrig_*.*',
                              'raw_behavior_data/**/_iblmic_*.*',
                              'raw_video_data/**/_iblrig_*.*',
                              'raw_video_data/**/_ibl_*.*',
                              'raw_ephys_data/**/_iblrig_*.*',
                              'raw_ephys_data/**/_spikeglx_*.*',
                              'raw_ephys_data/**/_iblqc_*.*',
                              'spikesorters/**/_kilosort_*.*',
                              'raw_widefield_data/**/_ibl_*.*',
                              'raw_photometry_data/**/_neurophotometrics_*.*',
                              ]


def register_dataset(file_list, one=None, exists=False, versions=None, **kwargs):
    """
    Registers a set of files belonging to a session only on the server.

    Parameters
    ----------
    file_list : list, str, pathlib.Path
        A filepath (or list thereof) of ALF datasets to register to Alyx.
    one : one.api.OneAlyx
        An instance of ONE.
    exists : bool
        Whether files exist in the repository. May be set to False when registering files
        before copying to the repository.
    versions : str, list of str
        Optional version tags, defaults to the current ibllib version.
    kwargs
        Optional keyword arguments for one.registration.RegistrationClient.register_files.

    Returns
    -------
    list of dicts, dict
        A list of newly created Alyx dataset records or the registration data if dry.

    Notes
    -----
    - If a repository is passed, server_only will be set to True.

    See Also
    --------
    one.registration.RegistrationClient.register_files
    """
    if not file_list:
        return
    elif isinstance(file_list, (str, Path)):
        file_list = [file_list]

    assert len(set(get_session_path(f) for f in file_list)) == 1
    assert all(Path(f).exists() for f in file_list)

    client = IBLRegistrationClient(one)

    # Check for protected datasets
    def _get_protected(pr_status):
        if isinstance(pr_status, list):
            pr = any(d['status_code'] == 403 for d in pr_status)
        else:
            pr = pr_status['status_code'] == 403

        return pr

    # Account for cases where we are connected to cortex lab database
    if one.alyx.base_url == 'https://alyx.cortexlab.net':
        try:
            _one = ONE(base_url='https://alyx.internationalbrainlab.org', mode='remote', cache_rest=one.alyx.cache_mode)
            protected_status = IBLRegistrationClient(_one).check_protected_files(file_list)
            protected = _get_protected(protected_status)
        except HTTPError as err:
            if "[Errno 500] /check-protected: 'A base session for" in str(err):
                # If we get an error due to the session not existing, we take this to mean no datasets are protected
                protected = False
            else:
                raise err
    else:
        protected_status = client.check_protected_files(file_list)
        protected = _get_protected(protected_status)

    # If we find a protected dataset, and we don't have a force=True flag, raise an error
    if protected and not kwargs.pop('force', False):
        raise FileExistsError('Protected datasets were found in the file list. To force the registration of datasets '
                              'add the force=True argument.')

    # If the repository is specified then for the registration client we want server_only=True to
    # make sure we don't make any other repositories for the lab
    if kwargs.get('repository') and not kwargs.get('server_only', False):
        kwargs['server_only'] = True

    return client.register_files(file_list, versions=versions or ibllib.__version__, exists=exists, **kwargs)
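

# Hedged usage sketch (not part of the original module): how register_dataset might be called
# once a session's files are in place. The file path below is a hypothetical placeholder and the
# ONE instance assumes Alyx credentials are already configured.
def _example_register_dataset():
    one = ONE()  # assumes a configured ONE/Alyx connection
    # All files must exist locally and belong to the same session path
    files = ['/data/Subjects/SW_001/2024-01-01/001/alf/_ibl_trials.table.pqt']  # hypothetical
    # exists=False because the files have not yet been copied to the data repository
    return register_dataset(files, one=one, exists=False)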


def register_session_raw_data(session_path, one=None, overwrite=False, **kwargs):
    """
    Registers all files corresponding to raw data files to Alyx. It will select files that
    match Alyx registration patterns.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The local session path.
    one : one.api.OneAlyx
        An instance of ONE.
    overwrite : bool
        If set to True, the datasets are re-registered (patched), which can take a very long
        time. If set to False (default), all already registered data are skipped.
    **kwargs
        Optional keyword arguments for one.registration.RegistrationClient.register_files.

    Returns
    -------
    list of pathlib.Path
        A list of raw dataset paths.
    list of dicts, dict
        A list of newly created Alyx dataset records or the registration data if dry.
    """
    # Clear rest cache to make sure we have the latest entries
    one.alyx.clear_rest_cache()
    client = IBLRegistrationClient(one)
    session_path = Path(session_path)
    eid = one.path2eid(session_path, query_type='remote')  # needs to make sure we're up to date
    if not eid:
        raise alferr.ALFError(f'Session does not exist on Alyx: {get_alf_path(session_path)}')
    # find all files that are in a raw data collection
    file_list = [f for f in client.find_files(session_path)
                 if f.relative_to(session_path).as_posix().startswith('raw')]
    # unless overwrite is True, filter out the datasets that already exist
    if not overwrite:
        # query the database for existing datasets on the session and allowed dataset types
        dsets = datasets2records(one.alyx.rest('datasets', 'list', session=eid, no_cache=True))
        already_registered = list(map(session_path.joinpath, dsets['rel_path']))
        file_list = list(filter(lambda f: f not in already_registered, file_list))

    kwargs['repository'] = get_local_data_repository(one.alyx)
    kwargs['server_only'] = True

    response = client.register_files(file_list, versions=ibllib.__version__, exists=False, **kwargs)
    return file_list, response
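

# Hedged usage sketch (not part of the original module): registering the raw data of a session
# that already exists on Alyx. The session path is a hypothetical placeholder.
def _example_register_session_raw_data():
    one = ONE()  # assumes a configured ONE/Alyx connection
    session_path = '/data/Subjects/SW_001/2024-01-01/001'  # hypothetical local session path
    # Returns the raw dataset paths found and the Alyx records created for them
    file_list, response = register_session_raw_data(session_path, one=one, overwrite=False)
    return file_list, response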


class IBLRegistrationClient(RegistrationClient):
    """
    Object that keeps the ONE instance and provides methods to create sessions and register data.
    """

    def register_session(self, ses_path, file_list=True, projects=None, procedures=None, register_reward=True):
        """
        Register an IBL Bpod session in Alyx.

        Parameters
        ----------
        ses_path : str, pathlib.Path
            The local session path.
        file_list : bool, list
            An optional list of file paths to register. If True, all valid files within the
            session folder are registered. If False, no files are registered.
        projects : str, list
            The project(s) to which the experiment belongs (optional).
        procedures : str, list
            An optional list of procedures, e.g. 'Behavior training/tasks'.
        register_reward : bool
            If true, register all water administrations in the settings files, provided no
            administrations are already present for this session.

        Returns
        -------
        dict
            An Alyx session record.
        list of dict, None
            Alyx file records (or None if file_list is False).

        Notes
        -----
        For a list of available projects:
        >>> sorted(proj['name'] for proj in one.alyx.rest('projects', 'list'))
        For a list of available procedures:
        >>> sorted(proc['name'] for proc in one.alyx.rest('procedures', 'list'))
        """
        if isinstance(ses_path, str):
            ses_path = Path(ses_path)

        # Read in the experiment description file if it exists and get projects and procedures from here
        experiment_description_file = session_params.read_params(ses_path)
        _, subject, date, number, *_ = folder_parts(ses_path)
        if experiment_description_file is None:
            collections = ['raw_behavior_data']
        else:
            # Combine input projects/procedures with those in experiment description
            projects = list({*experiment_description_file.get('projects', []), *(projects or [])})
            procedures = list({*experiment_description_file.get('procedures', []), *(procedures or [])})
            collections = session_params.get_task_collection(experiment_description_file)

        # Read narrative.txt
        if (narrative_file := ses_path.joinpath('narrative.txt')).exists():
            with narrative_file.open('r') as f:
                narrative = f.read()
        else:
            narrative = ''

        # query Alyx endpoints for subject, error if not found
        subject = self.assert_exists(subject, 'subjects')

        # look for a session from the same subject, same number on the same day
        with no_cache(self.one.alyx):
            session_id, session = self.one.search(subject=subject['nickname'],
                                                  date_range=date,
                                                  number=number,
                                                  details=True, query_type='remote')
        if collections is None:  # No task data
            assert len(session) != 0, 'no session on Alyx and no tasks in experiment description'
            # Fetch the full session JSON and assert that some basic information is present.
            # Basically refuse to extract the data if key information is missing
            session_details = self.one.alyx.rest('sessions', 'read', id=session_id[0], no_cache=True)
            required = ('location', 'start_time', 'lab', 'users')
            missing = [k for k in required if not session_details[k]]
            assert not any(missing), 'missing session information: ' + ', '.join(missing)
            task_protocols = task_data = settings = []
            json_field = start_time = end_time = None
            users = session_details['users']
            n_trials = n_correct_trials = 0
        else:  # Get session info from task data
            collections = sorted(ensure_list(collections))
            # read meta data from the rig for the session from the task settings file
            task_data = (raw.load_bpod(ses_path, collection) for collection in collections)
            # Filter collections where settings file was not found
            if not (task_data := list(zip(*filter(lambda x: x[0] is not None, task_data)))):
                raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')
            settings, task_data = task_data
            if len(settings) != len(collections):
                raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')

            # Do some validation
            assert len({x['SUBJECT_NAME'] for x in settings}) == 1 and settings[0]['SUBJECT_NAME'] == subject['nickname']
            assert len({x['SESSION_DATE'] for x in settings}) == 1 and settings[0]['SESSION_DATE'] == date
            assert len({x['SESSION_NUMBER'] for x in settings}) == 1 and settings[0]['SESSION_NUMBER'] == number
            assert len({x['IS_MOCK'] for x in settings}) == 1
            assert len({md['PYBPOD_BOARD'] for md in settings}) == 1
            assert len({md.get('IBLRIG_VERSION') for md in settings}) == 1
            # assert len({md['IBLRIG_VERSION_TAG'] for md in settings}) == 1

            users = []
            for user in filter(lambda x: x and x[1], map(lambda x: x.get('PYBPOD_CREATOR'), settings)):
                user = self.assert_exists(user[0], 'users')  # user is list of [username, uuid]
                users.append(user['username'])

            # extract information about session duration and performance
            start_time, end_time = _get_session_times(str(ses_path), settings, task_data)
            n_trials, n_correct_trials = _get_session_performance(settings, task_data)

            # TODO Add task_protocols to Alyx sessions endpoint
            task_protocols = [md['PYBPOD_PROTOCOL'] + md['IBLRIG_VERSION'] for md in settings]
            # unless specified label the session projects with subject projects
            projects = subject['projects'] if projects is None else projects
            # makes sure projects is a list
            projects = [projects] if isinstance(projects, str) else projects

            # unless specified label the session procedures with task protocol lookup
            procedures = [procedures] if isinstance(procedures, str) else (procedures or [])
            json_fields_names = ['IS_MOCK', 'IBLRIG_VERSION']
            json_field = {k: settings[0].get(k) for k in json_fields_names}
            for field in ('PROJECT_EXTRACTION_VERSION', 'TASK_VERSION'):
                if value := settings[0].get(field):
                    # Add these fields only if they exist and are not None
                    json_field[field] = value
            # The poo count field is only updated if the field is defined in at least one of the settings
            poo_counts = [md.get('POOP_COUNT') for md in settings if md.get('POOP_COUNT') is not None]
            if poo_counts:
                json_field['POOP_COUNT'] = int(sum(poo_counts))
            # Get the session start delay if available, needed for the training status
            session_delay = [md.get('SESSION_DELAY_START') for md in settings
                             if md.get('SESSION_DELAY_START') is not None]
            if session_delay:
                json_field['SESSION_DELAY_START'] = int(sum(session_delay))

        if not len(session):  # Create session and weighings
            ses_ = {'subject': subject['nickname'],
                    'users': users or [subject['responsible_user']],
                    'location': settings[0]['PYBPOD_BOARD'],
                    'procedures': procedures,
                    'lab': subject['lab'],
                    'projects': projects,
                    'type': 'Experiment',
                    'task_protocol': '/'.join(task_protocols),
                    'number': number,
                    'start_time': self.ensure_ISO8601(start_time),
                    'end_time': self.ensure_ISO8601(end_time) if end_time else None,
                    'n_correct_trials': n_correct_trials,
                    'n_trials': n_trials,
                    'narrative': narrative,
                    'json': json_field
                    }
            session = self.one.alyx.rest('sessions', 'create', data=ses_)
            # Submit weights
            for md in filter(lambda md: md.get('SUBJECT_WEIGHT') is not None, settings):
                user = md.get('PYBPOD_CREATOR')
                if isinstance(user, list):
                    user = user[0]
                if user not in users:
                    user = self.one.alyx.user
                self.register_weight(subject['nickname'], md['SUBJECT_WEIGHT'],
                                     date_time=md['SESSION_DATETIME'], user=user)
        else:  # if session exists update a few key fields
            data = {'procedures': procedures, 'projects': projects,
                    'n_correct_trials': n_correct_trials, 'n_trials': n_trials}
            if len(narrative) > 0:
                data['narrative'] = narrative
            if task_protocols:
                data['task_protocol'] = '/'.join(task_protocols)
            if end_time:
                data['end_time'] = self.ensure_ISO8601(end_time)
            if start_time:
                data['start_time'] = self.ensure_ISO8601(start_time)

            session = self.one.alyx.rest('sessions', 'partial_update', id=session_id[0], data=data)
            if json_field:
                session['json'] = self.one.alyx.json_field_update('sessions', session['id'], data=json_field)

        _logger.info(session['url'] + ' ')
        # create associated water administration if not found
        if register_reward and not session['wateradmin_session_related'] and any(task_data):
            for md, d in filter(all, zip(settings, task_data)):
                _, _end_time = _get_session_times(ses_path, md, d)
                user = md.get('PYBPOD_CREATOR')
                user = user[0] if user[0] in users else self.one.alyx.user
                volume = d[-1].get('water_delivered', sum(x['reward_amount'] for x in d)) / 1000
                if volume > 0:
                    self.register_water_administration(
                        subject['nickname'], volume, date_time=_end_time or end_time, user=user,
                        session=session['id'], water_type=md.get('REWARD_TYPE') or 'Water')
        # at this point the session has been created. If create only, exit
        if not file_list:
            return session, None

        # register all files that match the Alyx patterns and file_list
        if any(settings):
            rename_files_compatibility(ses_path, settings[0]['IBLRIG_VERSION'])
        F = filter(lambda x: self._register_bool(x.name, file_list), self.find_files(ses_path))
        recs = self.register_files(F, created_by=users[0] if users else None, versions=ibllib.__version__)
        return session, recs

    @staticmethod
    def _register_bool(fn, file_list):
        if isinstance(file_list, bool):
            return file_list
        if isinstance(file_list, str):
            file_list = [file_list]
        return any(str(fil) in fn for fil in file_list)

    def find_files(self, session_path):
        """Similar to the base class method but further filters by name and extension.

        In addition to finding files that match the dataset type patterns, this excludes files
        whose extension is in EXCLUDED_EXTENSIONS, or that don't match the patterns in
        REGISTRATION_GLOB_PATTERNS.

        Parameters
        ----------
        session_path : str, pathlib.Path
            The session path to search.

        Yields
        ------
        pathlib.Path
            File paths that match the dataset type patterns in Alyx and registration glob patterns.
        """
        files = itertools.chain.from_iterable(session_path.glob(x) for x in REGISTRATION_GLOB_PATTERNS)
        for file in filter(lambda x: x.suffix not in EXCLUDED_EXTENSIONS, files):
            try:
                get_dataset_type(file, self.dtypes)
                yield file
            except ValueError as ex:
                _logger.error(ex)
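

# Hedged usage sketch (not part of the original module): creating or updating an Alyx session
# from a local Bpod session folder with IBLRegistrationClient. The path, project and procedure
# strings below are hypothetical placeholders.
def _example_register_session():
    client = IBLRegistrationClient(ONE())  # assumes a configured ONE/Alyx connection
    session, records = client.register_session(
        '/data/Subjects/SW_001/2024-01-01/001',  # hypothetical session path
        file_list=True,                          # register every file matching the Alyx patterns
        projects='ibl_neuropixel_brainwide_01',  # hypothetical project name
        procedures='Behavior training/tasks')
    return session, records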


def rename_files_compatibility(ses_path, version_tag):
    if not version_tag:
        return
    if version.parse(version_tag) <= version.parse('3.2.3'):
        task_code = ses_path.glob('**/_ibl_trials.iti_duration.npy')
        for fn in task_code:
            fn.replace(fn.parent.joinpath('_ibl_trials.itiDuration.npy'))
    task_code = ses_path.glob('**/_iblrig_taskCodeFiles.raw.zip')
    for fn in task_code:
        fn.replace(fn.parent.joinpath('_iblrig_codeFiles.raw.zip'))


def _get_session_times(fn, md, ses_data):
    """
    Get session start and end time from the Bpod data.

    Parameters
    ----------
    fn : str, pathlib.Path
        Session/task identifier. Only used in warning logs.
    md : dict, list of dict
        A session parameters dictionary or list thereof.
    ses_data : dict, list of dict
        A session data dictionary or list thereof.

    Returns
    -------
    datetime.datetime
        The datetime of the start of the session.
    datetime.datetime
        The datetime of the end of the session, or None if ses_data is None.
    """
    if isinstance(md, dict):
        start_time = _start_time = isostr2date(md['SESSION_DATETIME'])
        end_time = isostr2date(md['SESSION_END_TIME']) if md.get('SESSION_END_TIME') else None
    else:
        start_time = isostr2date(md[0]['SESSION_DATETIME'])
        _start_time = isostr2date(md[-1]['SESSION_DATETIME'])
        end_time = isostr2date(md[-1]['SESSION_END_TIME']) if md[-1].get('SESSION_END_TIME') else None
        assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md)
        assert len(md) == 1 or start_time < _start_time
        ses_data = ses_data[-1]
    if not ses_data or end_time is not None:
        return start_time, end_time
    c = ses_duration_secs = 0
    for sd in reversed(ses_data):
        ses_duration_secs = (sd['behavior_data']['Trial end timestamp'] -
                             sd['behavior_data']['Bpod start timestamp'])
        if ses_duration_secs < (6 * 3600):
            break
        c += 1
    if c:
        _logger.warning(('Trial end timestamps of last %i trials above 6 hours '
                         '(most likely corrupt): %s'), c, str(fn))
    end_time = _start_time + datetime.timedelta(seconds=ses_duration_secs)
    return start_time, end_time
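

# Hedged illustration (not part of the original module): a minimal settings/trials structure for
# _get_session_times. The timestamps are made up; real Bpod data carries many more fields.
def _example_get_session_times():
    md = {'SESSION_DATETIME': '2024-01-01T10:00:00'}  # no SESSION_END_TIME, so it is derived
    ses_data = [{'behavior_data': {'Bpod start timestamp': 0.0, 'Trial end timestamp': 1800.0}}]
    # End time is inferred as the start time plus the duration of the last valid trial
    return _get_session_times('example_session', md, ses_data)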


def _get_session_performance(md, ses_data):
    """
    Get session performance from the Bpod data.

    Note: This does not support custom protocols.

    Parameters
    ----------
    md : dict, list of dict
        A session parameters dictionary or list thereof.
    ses_data : dict, list of dict
        A session data dictionary or list thereof.

    Returns
    -------
    int
        The total number of trials across protocols.
    int
        The total number of correct trials across protocols.
    """

    if not any(filter(None, ses_data or None)):
        return None, None

    if isinstance(md, dict):
        ses_data = [ses_data]
        md = [md]
    else:
        assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md)

    n_trials = []
    n_correct = []
    for data, settings in filter(all, zip(ses_data, md)):
        # In some protocols trials start from 0, in others, from 1
        n = data[-1]['trial_num'] + int(data[0]['trial_num'] == 0)  # +1 if starts from 0
        n_trials.append(n)
        # checks that the number of actual trials and labeled number of trials check out
        assert len(data) == n, f'{len(data)} trials in data, however last trial number was {n}'
        # task specific logic
        if 'habituationChoiceWorld' in settings.get('PYBPOD_PROTOCOL', ''):
            n_correct.append(0)
        else:
            n_correct.append(data[-1].get('ntrials_correct', sum(x['trial_correct'] for x in data)))

    return sum(n_trials), sum(n_correct)
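

# Hedged illustration (not part of the original module): a minimal trial list for
# _get_session_performance. Field names mirror those read above; the values are made up.
def _example_get_session_performance():
    md = {'PYBPOD_PROTOCOL': '_iblrig_tasks_trainingChoiceWorld'}  # hypothetical protocol name
    ses_data = [{'trial_num': 1, 'trial_correct': True},
                {'trial_num': 2, 'trial_correct': False}]
    return _get_session_performance(md, ses_data)  # (2, 1) for this made-up data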


def get_local_data_repository(ac):
    """
    Get local data repo name from Globus client.

    Parameters
    ----------
    ac : one.webclient.AlyxClient
        An AlyxClient instance for querying data repositories.

    Returns
    -------
    str
        The (first) data repository associated with the local Globus endpoint ID.
    """
    try:
        assert ac
        globus_id = get_local_endpoint_id()
    except AssertionError:
        return

    data_repo = ac.rest('data-repository', 'list', globus_endpoint_id=globus_id)
    return next((da['name'] for da in data_repo), None)
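

# Hedged usage sketch (not part of the original module): looking up the data repository tied to
# this machine's Globus endpoint. Assumes a local Globus endpoint ID and Alyx credentials are set up.
def _example_get_local_data_repository():
    return get_local_data_repository(AlyxClient())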


def get_lab(session_path, alyx=None):
    """
    Get lab from a session path using the subject name.

    On local lab servers, the lab name is not in the ALF path and the globus endpoint ID may be
    associated with multiple labs, so lab name is fetched from the subjects endpoint.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path from which to determine the lab name.
    alyx : one.webclient.AlyxClient
        An AlyxClient instance for querying data repositories.

    Returns
    -------
    str
        The lab name associated with the session path subject.

    See Also
    --------
    one.remote.globus.get_lab_from_endpoint_id
    """
    alyx = alyx or AlyxClient()
    if not (ref := ConversionMixin.path2ref(session_path)):
        raise ValueError(f'Failed to parse session path: {session_path}')

    labs = [x['lab'] for x in alyx.rest('subjects', 'list', nickname=ref['subject'])]
    if len(labs) == 0:
        raise alferr.AlyxSubjectNotFound(ref['subject'])
    elif len(labs) > 1:  # More than one subject with this nickname
        # use local endpoint ID to find the correct lab
        endpoint_labs = get_lab_from_endpoint_id(alyx=alyx)
        lab = next(x for x in labs if x in endpoint_labs)
    else:
        lab, = labs

    return lab
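

# Hedged usage sketch (not part of the original module): resolving the lab for a local session.
# The session path is a hypothetical placeholder; the subject must exist on Alyx.
def _example_get_lab():
    return get_lab('/data/Subjects/SW_001/2024-01-01/001', alyx=AlyxClient())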