Coverage for ibllib/oneibl/registration.py: 88%
250 statements
coverage.py v7.8.0, created at 2025-05-07 14:26 +0100
1from pathlib import Path
2import datetime
3import logging
4import itertools
6from packaging import version
7from requests import HTTPError
9from one.alf.path import get_session_path, folder_parts, get_alf_path
10from one.registration import RegistrationClient, get_dataset_type
11from one.remote.globus import get_local_endpoint_id, get_lab_from_endpoint_id
12from one.webclient import AlyxClient, no_cache
13from one.converters import ConversionMixin, datasets2records
14import one.alf.exceptions as alferr
15from one.api import ONE
16from iblutil.util import ensure_list
18import ibllib
19from ibllib.time import isostr2date
20import ibllib.io.raw_data_loaders as raw
21from ibllib.io import session_params
23_logger = logging.getLogger(__name__)
24EXCLUDED_EXTENSIONS = ['.flag', '.error', '.avi']
25REGISTRATION_GLOB_PATTERNS = ['_ibl_experiment.description.yaml',
26 'alf/**/*.*.*',
27 'raw_behavior_data/**/_iblrig_*.*',
28 'raw_task_data_*/**/_iblrig_*.*',
29 'raw_passive_data/**/_iblrig_*.*',
30 'raw_behavior_data/**/_iblmic_*.*',
31 'raw_video_data/**/_iblrig_*.*',
32 'raw_video_data/**/_ibl_*.*',
33 'raw_ephys_data/**/_iblrig_*.*',
34 'raw_ephys_data/**/_spikeglx_*.*',
35 'raw_ephys_data/**/_iblqc_*.*',
 36 'spikesorters/**/_kilosort_*.*',
38 'raw_widefield_data/**/_ibl_*.*',
39 'raw_photometry_data/**/_neurophotometrics_*.*',
40 ]
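# Note (a sketch of how these constants are used below): find_files() globs each pattern
# relative to the session path, e.g. Path(session_path).glob('raw_video_data/**/_iblrig_*.*'),
# then drops any match whose suffix is in EXCLUDED_EXTENSIONS (for instance '.avi' files).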
43def register_dataset(file_list, one=None, exists=False, versions=None, **kwargs):
44 """
45 Registers a set of files belonging to a session only on the server.
47 Parameters
48 ----------
49 file_list : list, str, pathlib.Path
50 A filepath (or list thereof) of ALF datasets to register to Alyx.
51 one : one.api.OneAlyx
52 An instance of ONE.
53 exists : bool
54 Whether files exist in the repository. May be set to False when registering files
55 before copying to the repository.
56 versions : str, list of str
57 Optional version tags, defaults to the current ibllib version.
58 kwargs
59 Optional keyword arguments for one.registration.RegistrationClient.register_files.
61 Returns
62 -------
63 list of dicts, dict
64 A list of newly created Alyx dataset records or the registration data if dry.
66 Notes
67 -----
68 - If a repository is passed, server_only will be set to True.
70 See Also
71 --------
72 one.registration.RegistrationClient.register_files
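    Examples
    --------
    A minimal sketch, assuming a configured ONE instance and an existing local session
    folder (the dataset path below is hypothetical):

    >>> from one.api import ONE
    >>> one = ONE()
    >>> files = ['/data/Subjects/SW001/2023-01-01/001/alf/_ibl_trials.table.pqt']
    >>> records = register_dataset(files, one=one, exists=False)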
73 """
74 if not file_list: 1lkia
75 return 1ia
76 elif isinstance(file_list, (str, Path)): 1lkia
77 file_list = [file_list] 1k
79 assert len(set(get_session_path(f) for f in file_list)) == 1 1lkia
80 assert all(Path(f).exists() for f in file_list) 1lkia
82 client = IBLRegistrationClient(one) 1lkia
84 # Check for protected datasets
85 def _get_protected(pr_status): 1lkia
 86 if isinstance(pr_status, list): 1lkia
87 pr = any(d['status_code'] == 403 for d in pr_status) 1lkia
88 else:
 89 pr = pr_status['status_code'] == 403
91 return pr 1lkia
93 # Account for cases where we are connected to cortex lab database
94 if one.alyx.base_url == 'https://alyx.cortexlab.net': 1lkia
95 try:
96 _one = ONE(base_url='https://alyx.internationalbrainlab.org', mode='remote', cache_rest=one.alyx.cache_mode)
97 protected_status = IBLRegistrationClient(_one).check_protected_files(file_list)
98 protected = _get_protected(protected_status)
99 except HTTPError as err:
100 if "[Errno 500] /check-protected: 'A base session for" in str(err):
101 # If we get an error due to the session not existing, we take this to mean no datasets are protected
102 protected = False
103 else:
104 raise err
105 else:
106 protected_status = client.check_protected_files(file_list) 1lkia
107 protected = _get_protected(protected_status) 1lkia
109 # If we find a protected dataset, and we don't have a force=True flag, raise an error
110 if protected and not kwargs.pop('force', False): 1lkia
111 raise FileExistsError('Protected datasets were found in the file list. To force the registration of datasets ' 1k
112 'add the force=True argument.')
114 # If the repository is specified then for the registration client we want server_only=True to
115 # make sure we don't make any other repositories for the lab
116 if kwargs.get('repository') and not kwargs.get('server_only', False): 1lkia
117 kwargs['server_only'] = True
119 return client.register_files(file_list, versions=versions or ibllib.__version__, exists=exists, **kwargs) 1lkia
122def register_session_raw_data(session_path, one=None, overwrite=False, **kwargs):
123 """
 124 Registers all raw data files within a session to Alyx. Only files that match the Alyx
 125 registration patterns are selected.
127 Parameters
128 ----------
129 session_path : str, pathlib.Path
130 The local session path.
131 one : one.api.OneAlyx
132 An instance of ONE.
133 overwrite : bool
 134 If set to True, datasets that are already registered will be patched (this can take a
 135 very long time). If set to False (default), already registered data are skipped.
136 **kwargs
137 Optional keyword arguments for one.registration.RegistrationClient.register_files.
139 Returns
140 -------
141 list of pathlib.Path
142 A list of raw dataset paths.
143 list of dicts, dict
144 A list of newly created Alyx dataset records or the registration data if dry.
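    Examples
    --------
    A minimal sketch, assuming a configured ONE instance and a local session containing
    raw data collections (the session path is hypothetical):

    >>> from one.api import ONE
    >>> one = ONE()
    >>> paths, records = register_session_raw_data('/data/Subjects/SW001/2023-01-01/001', one=one)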
145 """
146 # Clear rest cache to make sure we have the latest entries
147 one.alyx.clear_rest_cache() 1h
148 client = IBLRegistrationClient(one) 1h
149 session_path = Path(session_path) 1h
150 eid = one.path2eid(session_path, query_type='remote') # needs to make sure we're up to date 1h
151 if not eid: 1h
152 raise alferr.ALFError(f'Session does not exist on Alyx: {get_alf_path(session_path)}') 1h
153 # find all files that are in a raw data collection
154 file_list = [f for f in client.find_files(session_path) 1h
155 if f.relative_to(session_path).as_posix().startswith('raw')]
156 # unless overwrite is True, filter out the datasets that already exist
157 if not overwrite: 1h
158 # query the database for existing datasets on the session and allowed dataset types
159 dsets = datasets2records(one.alyx.rest('datasets', 'list', session=eid, no_cache=True)) 1h
160 already_registered = list(map(session_path.joinpath, dsets['rel_path'])) 1h
161 file_list = list(filter(lambda f: f not in already_registered, file_list)) 1h
163 kwargs['repository'] = get_local_data_repository(one.alyx) 1h
164 kwargs['server_only'] = True 1h
166 response = client.register_files(file_list, versions=ibllib.__version__, exists=False, **kwargs) 1h
167 return file_list, response 1h
170class IBLRegistrationClient(RegistrationClient):
171 """
172 Object that keeps the ONE instance and provides method to create sessions and register data.
173 """
175 def register_session(self, ses_path, file_list=True, projects=None, procedures=None, register_reward=True):
176 """
177 Register an IBL Bpod session in Alyx.
179 Parameters
180 ----------
181 ses_path : str, pathlib.Path
182 The local session path.
183 file_list : bool, list
184 An optional list of file paths to register. If True, all valid files within the
185 session folder are registered. If False, no files are registered.
186 projects: str, list
187 The project(s) to which the experiment belongs (optional).
188 procedures : str, list
189 An optional list of procedures, e.g. 'Behavior training/tasks'.
190 register_reward : bool
 191 If true, register all water administrations found in the settings files, provided none
 192 are already present for this session.
194 Returns
195 -------
196 dict
197 An Alyx session record.
198 list of dict, None
199 Alyx file records (or None if file_list is False).
201 Notes
202 -----
203 For a list of available projects:
204 >>> sorted(proj['name'] for proj in one.alyx.rest('projects', 'list'))
205 For a list of available procedures:
206 >>> sorted(proc['name'] for proc in one.alyx.rest('procedures', 'list'))
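    Examples
    --------
    A minimal sketch, assuming a configured ONE instance (the session path and project
    name are hypothetical):

    >>> client = IBLRegistrationClient(one)
    >>> session, records = client.register_session(
    ...     '/data/Subjects/SW001/2023-01-01/001', projects='ibl_neuropixel_brainwide_01')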
207 """
208 if isinstance(ses_path, str): 1edcfab
209 ses_path = Path(ses_path) 1c
211 # Read in the experiment description file if it exists and get projects and procedures from here
212 experiment_description_file = session_params.read_params(ses_path) 1edcfab
213 _, subject, date, number, *_ = folder_parts(ses_path) 1edcfab
214 if experiment_description_file is None: 1edcfab
215 collections = ['raw_behavior_data'] 1ec
216 else:
217 # Combine input projects/procedures with those in experiment description
218 projects = list({*experiment_description_file.get('projects', []), *(projects or [])}) 1dfab
219 procedures = list({*experiment_description_file.get('procedures', []), *(procedures or [])}) 1dfab
220 collections = session_params.get_task_collection(experiment_description_file) 1dfab
222 # Read narrative.txt
223 if (narrative_file := ses_path.joinpath('narrative.txt')).exists(): 1edcfab
224 with narrative_file.open('r') as f:
225 narrative = f.read()
226 else:
227 narrative = '' 1edcfab
229 # query Alyx endpoints for subject, error if not found
230 subject = self.assert_exists(subject, 'subjects') 1edcfab
232 # look for a session from the same subject, same number on the same day
233 with no_cache(self.one.alyx): 1edcfab
234 session_id, session = self.one.search(subject=subject['nickname'], 1edcfab
235 date_range=date,
236 number=number,
237 details=True, query_type='remote')
238 if collections is None: # No task data 1edcfab
239 assert len(session) != 0, 'no session on Alyx and no tasks in experiment description' 1f
240 # Fetch the full session JSON and assert that some basic information is present.
241 # Basically refuse to extract the data if key information is missing
242 session_details = self.one.alyx.rest('sessions', 'read', id=session_id[0], no_cache=True) 1f
243 required = ('location', 'start_time', 'lab', 'users') 1f
244 missing = [k for k in required if not session_details[k]] 1f
245 assert not any(missing), 'missing session information: ' + ', '.join(missing) 1f
246 task_protocols = task_data = settings = [] 1f
247 json_field = start_time = end_time = None 1f
248 users = session_details['users'] 1f
249 n_trials = n_correct_trials = 0 1f
250 else: # Get session info from task data
251 collections = sorted(ensure_list(collections)) 1edcab
252 # read meta data from the rig for the session from the task settings file
253 task_data = (raw.load_bpod(ses_path, collection) for collection in collections) 1edcab
254 # Filter collections where settings file was not found
255 if not (task_data := list(zip(*filter(lambda x: x[0] is not None, task_data)))): 1edcab
256 raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.') 1e
257 settings, task_data = task_data 1edcab
258 if len(settings) != len(collections): 1edcab
259 raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path} Abort.')
261 # Do some validation
262 assert len({x['SUBJECT_NAME'] for x in settings}) == 1 and settings[0]['SUBJECT_NAME'] == subject['nickname'] 1edcab
263 assert len({x['SESSION_DATE'] for x in settings}) == 1 and settings[0]['SESSION_DATE'] == date 1edcab
264 assert len({x['SESSION_NUMBER'] for x in settings}) == 1 and settings[0]['SESSION_NUMBER'] == number 1edcab
265 assert len({x['IS_MOCK'] for x in settings}) == 1 1edcab
266 assert len({md['PYBPOD_BOARD'] for md in settings}) == 1 1edcab
267 assert len({md.get('IBLRIG_VERSION') for md in settings}) == 1 1edcab
268 # assert len({md['IBLRIG_VERSION_TAG'] for md in settings}) == 1
270 users = [] 1edcab
271 for user in filter(lambda x: x and x[1], map(lambda x: x.get('PYBPOD_CREATOR'), settings)): 1edcab
272 user = self.assert_exists(user[0], 'users') # user is list of [username, uuid] 1edcab
273 users.append(user['username']) 1edcab
275 # extract information about session duration and performance
276 start_time, end_time = _get_session_times(str(ses_path), settings, task_data) 1edcab
277 n_trials, n_correct_trials = _get_session_performance(settings, task_data) 1edcab
279 # TODO Add task_protocols to Alyx sessions endpoint
280 task_protocols = [md['PYBPOD_PROTOCOL'] + md['IBLRIG_VERSION'] for md in settings] 1edcab
281 # unless specified label the session projects with subject projects
282 projects = subject['projects'] if projects is None else projects 1edcab
283 # makes sure projects is a list
284 projects = [projects] if isinstance(projects, str) else projects 1edcab
286 # unless specified label the session procedures with task protocol lookup
287 procedures = [procedures] if isinstance(procedures, str) else (procedures or []) 1edcab
288 json_fields_names = ['IS_MOCK', 'IBLRIG_VERSION'] 1edcab
289 json_field = {k: settings[0].get(k) for k in json_fields_names} 1edcab
290 for field in ('PROJECT_EXTRACTION_VERSION', 'TASK_VERSION'): 1edcab
291 if value := settings[0].get(field): 1edcab
292 # Add these fields only if they exist and are not None
293 json_field[field] = value
294 # The poo count field is only updated if the field is defined in at least one of the settings
295 poo_counts = [md.get('POOP_COUNT') for md in settings if md.get('POOP_COUNT') is not None] 1edcab
296 if poo_counts: 1edcab
297 json_field['POOP_COUNT'] = int(sum(poo_counts)) 1dab
298 # Get the session start delay if available, needed for the training status
299 session_delay = [md.get('SESSION_DELAY_START') for md in settings 1edcab
300 if md.get('SESSION_DELAY_START') is not None]
301 if session_delay: 1edcab
302 json_field['SESSION_DELAY_START'] = int(sum(session_delay))
304 if not len(session): # Create session and weighings 1edcfab
305 ses_ = {'subject': subject['nickname'], 1edcb
306 'users': users or [subject['responsible_user']],
307 'location': settings[0]['PYBPOD_BOARD'],
308 'procedures': procedures,
309 'lab': subject['lab'],
310 'projects': projects,
311 'type': 'Experiment',
312 'task_protocol': '/'.join(task_protocols),
313 'number': number,
314 'start_time': self.ensure_ISO8601(start_time),
315 'end_time': self.ensure_ISO8601(end_time) if end_time else None,
316 'n_correct_trials': n_correct_trials,
317 'n_trials': n_trials,
318 'narrative': narrative,
319 'json': json_field
320 }
321 session = self.one.alyx.rest('sessions', 'create', data=ses_) 1edcb
322 # Submit weights
323 for md in filter(lambda md: md.get('SUBJECT_WEIGHT') is not None, settings): 1edcb
324 user = md.get('PYBPOD_CREATOR') 1edcb
325 if isinstance(user, list): 1edcb
326 user = user[0] 1edcb
327 if user not in users: 1edcb
328 user = self.one.alyx.user
329 self.register_weight(subject['nickname'], md['SUBJECT_WEIGHT'], 1edcb
330 date_time=md['SESSION_DATETIME'], user=user)
331 else: # if session exists update a few key fields
332 data = {'procedures': procedures, 'projects': projects, 1cfa
333 'n_correct_trials': n_correct_trials, 'n_trials': n_trials}
334 if len(narrative) > 0: 1cfa
335 data['narrative'] = narrative
336 if task_protocols: 1cfa
337 data['task_protocol'] = '/'.join(task_protocols) 1ca
338 if end_time: 1cfa
339 data['end_time'] = self.ensure_ISO8601(end_time) 1ca
340 if start_time: 1cfa
341 data['start_time'] = self.ensure_ISO8601(start_time) 1ca
343 session = self.one.alyx.rest('sessions', 'partial_update', id=session_id[0], data=data) 1cfa
344 if json_field: 1cfa
345 session['json'] = self.one.alyx.json_field_update('sessions', session['id'], data=json_field) 1ca
347 _logger.info(session['url'] + ' ') 1edcfab
348 # create associated water administration if not found
349 if register_reward and not session['wateradmin_session_related'] and any(task_data): 1edcfab
350 for md, d in filter(all, zip(settings, task_data)): 1ab
351 _, _end_time = _get_session_times(ses_path, md, d) 1ab
352 user = md.get('PYBPOD_CREATOR') 1ab
353 user = user[0] if user[0] in users else self.one.alyx.user 1ab
354 volume = d[-1].get('water_delivered', sum(x['reward_amount'] for x in d)) / 1000 1ab
355 if volume > 0: 1ab
356 self.register_water_administration( 1ab
357 subject['nickname'], volume, date_time=_end_time or end_time, user=user,
358 session=session['id'], water_type=md.get('REWARD_TYPE') or 'Water')
359 # at this point the session has been created. If create only, exit
360 if not file_list: 1edcfab
361 return session, None 1eab
363 # register all files that match the Alyx patterns and file_list
364 if any(settings): 1dcf
365 rename_files_compatibility(ses_path, settings[0]['IBLRIG_VERSION']) 1dc
366 F = filter(lambda x: self._register_bool(x.name, file_list), self.find_files(ses_path)) 1dcf
367 recs = self.register_files(F, created_by=users[0] if users else None, versions=ibllib.__version__) 1dcf
368 return session, recs 1dcf
370 @staticmethod
371 def _register_bool(fn, file_list):
372 if isinstance(file_list, bool): 1dcf
373 return file_list 1dcf
374 if isinstance(file_list, str):
375 file_list = [file_list]
376 return any(str(fil) in fn for fil in file_list)
378 def find_files(self, session_path):
379 """Similar to base class method but further filters by name and extension.
 381 In addition to finding files that match the dataset type patterns, this excludes files
 382 whose extension is in EXCLUDED_EXTENSIONS, or that don't match the patterns in
 383 REGISTRATION_GLOB_PATTERNS.
385 Parameters
386 ----------
387 session_path : str, pathlib.Path
388 The session path to search.
390 Yields
391 -------
392 pathlib.Path
393 File paths that match the dataset type patterns in Alyx and registration glob patterns.
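    Examples
    --------
    A minimal sketch (the session path is hypothetical):

    >>> client = IBLRegistrationClient(one)
    >>> files = list(client.find_files(Path('/data/Subjects/SW001/2023-01-01/001')))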
394 """
395 files = itertools.chain.from_iterable(session_path.glob(x) for x in REGISTRATION_GLOB_PATTERNS) 1dcfh
396 for file in filter(lambda x: x.suffix not in EXCLUDED_EXTENSIONS, files): 1dcfh
397 try: 1dcfh
398 get_dataset_type(file, self.dtypes) 1dcfh
399 yield file 1dcfh
400 except ValueError as ex:
401 _logger.error(ex)
404def rename_files_compatibility(ses_path, version_tag):
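    """Rename files for compatibility across iblrig versions (a descriptive summary).

    For iblrig versions <= 3.2.3, `_ibl_trials.iti_duration.npy` files are renamed to
    `_ibl_trials.itiDuration.npy`. `_iblrig_taskCodeFiles.raw.zip` files are renamed to
    `_iblrig_codeFiles.raw.zip` regardless of version. Does nothing if version_tag is empty.
    """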
405 if not version_tag: 1dc
406 return
407 if version.parse(version_tag) <= version.parse('3.2.3'): 1dc
408 task_code = ses_path.glob('**/_ibl_trials.iti_duration.npy')
409 for fn in task_code:
410 fn.replace(fn.parent.joinpath('_ibl_trials.itiDuration.npy'))
411 task_code = ses_path.glob('**/_iblrig_taskCodeFiles.raw.zip') 1dc
412 for fn in task_code: 1dc
413 fn.replace(fn.parent.joinpath('_iblrig_codeFiles.raw.zip'))
416def _get_session_times(fn, md, ses_data):
417 """
418 Get session start and end time from the Bpod data.
420 Parameters
421 ----------
422 fn : str, pathlib.Path
423 Session/task identifier. Only used in warning logs.
424 md : dict, list of dict
425 A session parameters dictionary or list thereof.
426 ses_data : dict, list of dict
427 A session data dictionary or list thereof.
429 Returns
430 -------
431 datetime.datetime
432 The datetime of the start of the session.
433 datetime.datetime
 434 The datetime of the end of the session, or None if ses_data is None.
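    Examples
    --------
    A minimal sketch with hypothetical Bpod settings and trial data:

    >>> md = {'SESSION_DATETIME': '2023-01-01T12:00:00.000000'}
    >>> data = [{'behavior_data': {'Bpod start timestamp': 0., 'Trial end timestamp': 600.}}]
    >>> start, end = _get_session_times('SW001/2023-01-01/001', md, data)  # end = start + 10 min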
435 """
436 if isinstance(md, dict): 1edcabjn
437 start_time = _start_time = isostr2date(md['SESSION_DATETIME']) 1abjn
438 end_time = isostr2date(md['SESSION_END_TIME']) if md.get('SESSION_END_TIME') else None 1abjn
439 else:
440 start_time = isostr2date(md[0]['SESSION_DATETIME']) 1edcabj
441 _start_time = isostr2date(md[-1]['SESSION_DATETIME']) 1edcabj
442 end_time = isostr2date(md[-1]['SESSION_END_TIME']) if md[-1].get('SESSION_END_TIME') else None 1edcabj
443 assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md) 1edcabj
444 assert len(md) == 1 or start_time < _start_time 1edcabj
445 ses_data = ses_data[-1] 1edcabj
446 if not ses_data or end_time is not None: 1edcabjn
447 return start_time, end_time 1edc
448 c = ses_duration_secs = 0 1abjn
449 for sd in reversed(ses_data): 1abjn
450 ses_duration_secs = (sd['behavior_data']['Trial end timestamp'] - 1abjn
451 sd['behavior_data']['Bpod start timestamp'])
452 if ses_duration_secs < (6 * 3600): 1abjn
453 break 1abjn
454 c += 1
455 if c: 1abjn
456 _logger.warning(('Trial end timestamps of last %i trials above 6 hours '
457 '(most likely corrupt): %s'), c, str(fn))
458 end_time = _start_time + datetime.timedelta(seconds=ses_duration_secs) 1abjn
459 return start_time, end_time 1abjn
462def _get_session_performance(md, ses_data):
463 """
464 Get performance about the session from Bpod data.
465 Note: This does not support custom protocols.
467 Parameters
468 ----------
469 md : dict, list of dict
470 A session parameters dictionary or list thereof.
471 ses_data : dict, list of dict
472 A session data dictionary or list thereof.
474 Returns
475 -------
476 int
477 The total number of trials across protocols.
478 int
479 The total number of correct trials across protocols.
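    Examples
    --------
    A minimal sketch with hypothetical settings and trial data (trial numbers starting at 1):

    >>> md = {'PYBPOD_PROTOCOL': '_iblrig_tasks_trainingChoiceWorld'}
    >>> data = [{'trial_num': 1, 'trial_correct': True}, {'trial_num': 2, 'trial_correct': False}]
    >>> n_trials, n_correct = _get_session_performance(md, data)  # (2, 1)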
480 """
482 if not any(filter(None, ses_data or None)): 1edcabm
483 return None, None 1edc
485 if isinstance(md, dict): 1abm
486 ses_data = [ses_data] 1m
487 md = [md] 1m
488 else:
489 assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md) 1abm
491 n_trials = [] 1abm
492 n_correct = [] 1abm
493 for data, settings in filter(all, zip(ses_data, md)): 1abm
494 # In some protocols trials start from 0, in others, from 1
495 n = data[-1]['trial_num'] + int(data[0]['trial_num'] == 0) # +1 if starts from 0 1abm
496 n_trials.append(n) 1abm
497 # checks that the number of actual trials and labeled number of trials check out
498 assert len(data) == n, f'{len(data)} trials in data, however last trial number was {n}' 1abm
499 # task specific logic
500 if 'habituationChoiceWorld' in settings.get('PYBPOD_PROTOCOL', ''): 1abm
501 n_correct.append(0) 1m
502 else:
503 n_correct.append(data[-1].get('ntrials_correct', sum(x['trial_correct'] for x in data))) 1abm
505 return sum(n_trials), sum(n_correct) 1abm
508def get_local_data_repository(ac):
509 """
510 Get local data repo name from Globus client.
512 Parameters
513 ----------
514 ac : one.webclient.AlyxClient
515 An AlyxClient instance for querying data repositories.
517 Returns
518 -------
519 str
520 The (first) data repository associated with the local Globus endpoint ID.
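    Examples
    --------
    A minimal sketch, assuming a local Globus endpoint registered on Alyx:

    >>> repo_name = get_local_data_repository(AlyxClient())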
521 """
522 try: 1gqipah
523 assert ac 1gqipah
524 globus_id = get_local_endpoint_id() 1gipah
525 except AssertionError: 1q
526 return 1q
528 data_repo = ac.rest('data-repository', 'list', globus_endpoint_id=globus_id) 1gipah
529 return next((da['name'] for da in data_repo), None) 1gipah
532def get_lab(session_path, alyx=None):
533 """
534 Get lab from a session path using the subject name.
 536 On local lab servers, the lab name is not in the ALF path and the Globus endpoint ID may be
 537 associated with multiple labs, so the lab name is fetched from the subjects endpoint.
539 Parameters
540 ----------
541 session_path : str, pathlib.Path
542 The session path from which to determine the lab name.
543 alyx : one.webclient.AlyxClient
544 An AlyxClient instance for querying data repositories.
546 Returns
547 -------
548 str
549 The lab name associated with the session path subject.
551 See Also
552 --------
553 one.remote.globus.get_lab_from_endpoint_id
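    Examples
    --------
    A minimal sketch (the session path is hypothetical):

    >>> lab = get_lab('/data/Subjects/SW001/2023-01-01/001', alyx=AlyxClient())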
554 """
555 alyx = alyx or AlyxClient() 1oa
556 if not (ref := ConversionMixin.path2ref(session_path)): 1oa
557 raise ValueError(f'Failed to parse session path: {session_path}') 1o
559 labs = [x['lab'] for x in alyx.rest('subjects', 'list', nickname=ref['subject'])] 1oa
560 if len(labs) == 0: 1oa
561 raise alferr.AlyxSubjectNotFound(ref['subject']) 1o
562 elif len(labs) > 1: # More than one subject with this nickname 1oa
563 # use local endpoint ID to find the correct lab
564 endpoint_labs = get_lab_from_endpoint_id(alyx=alyx) 1o
565 lab = next(x for x in labs if x in endpoint_labs) 1o
566 else:
567 lab, = labs 1oa
569 return lab 1oa