Coverage for ibllib/oneibl/registration.py: 93%
214 statements
coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1from pathlib import Path
2import json
3import datetime
4import logging
5import itertools
7from pkg_resources import parse_version
8from one.alf.files import get_session_path, folder_parts, get_alf_path
9from one.registration import RegistrationClient, get_dataset_type
10from one.remote.globus import get_local_endpoint_id, get_lab_from_endpoint_id
11from one.webclient import AlyxClient
12from one.converters import ConversionMixin
13import one.alf.exceptions as alferr
14from one.util import datasets2records, ensure_list
16import ibllib
17import ibllib.io.extractors.base
18from ibllib.time import isostr2date
19import ibllib.io.raw_data_loaders as raw
20from ibllib.io import session_params
22_logger = logging.getLogger(__name__)
23EXCLUDED_EXTENSIONS = ['.flag', '.error', '.avi']
24REGISTRATION_GLOB_PATTERNS = ['alf/**/*.*',
25 'raw_behavior_data/**/_iblrig_*.*',
26 'raw_task_data_*/**/_iblrig_*.*',
27 'raw_passive_data/**/_iblrig_*.*',
28 'raw_behavior_data/**/_iblmic_*.*',
29 'raw_video_data/**/_iblrig_*.*',
30 'raw_video_data/**/_ibl_*.*',
31 'raw_ephys_data/**/_iblrig_*.*',
32 'raw_ephys_data/**/_spikeglx_*.*',
33 'raw_ephys_data/**/_iblqc_*.*',
34 'spikesorters/**/_kilosort_*.*',
36 'raw_widefield_data/**/_ibl_*.*',
37 'raw_photometry_data/**/_neurophotometrics_*.*',
38 ]
41def register_dataset(file_list, one=None, exists=False, versions=None, **kwargs):
42 """
43 Registers a set of files belonging to a single session on the Alyx server; no files are copied or moved.
45 Parameters
46 ----------
47 file_list : list, str, pathlib.Path
48 A filepath (or list thereof) of ALF datasets to register to Alyx.
49 one : one.api.OneAlyx
50 An instance of ONE.
51 exists : bool
52 Whether files exist in the repository. May be set to False when registering files
53 before copying to the repository.
54 versions : str, list of str
55 Optional version tags, defaults to the current ibllib version.
56 kwargs
57 Optional keyword arguments for one.registration.RegistrationClient.register_files.
59 Returns
60 -------
61 list of dicts, dict
62 A list of newly created Alyx dataset records or the registration data if dry.
64 Notes
65 -----
66 - If a repository is passed, server_only will be set to True.
68 See Also
69 --------
70 one.registration.RegistrationClient.register_files
71 """
72 if not file_list: 1plcba
73 return 1cba
74 elif isinstance(file_list, (str, Path)): 1plcba
75 file_list = [file_list] 1pl
77 assert len(set(get_session_path(f) for f in file_list)) == 1 1plcba
78 assert all(Path(f).exists() for f in file_list) 1plcba
80 client = IBLRegistrationClient(one) 1plcba
81 # If a repository is specified, force server_only=True for the registration client so
82 # that no file records are created on the lab's other repositories
83 if kwargs.get('repository') and not kwargs.get('server_only', False): 1plcba
84 kwargs['server_only'] = True
86 return client.register_files(file_list, versions=versions or ibllib.__version__, exists=exists, **kwargs) 1plcba
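# Editor's usage sketch (assumed, not part of the module): register local ALF files for
# one session; the ONE instance and file path below are hypothetical.
# >>> from one.api import ONE
# >>> from ibllib.oneibl.registration import register_dataset
# >>> one = ONE()
# >>> files = ['/data/Subjects/example_subject/2023-01-01/001/alf/_ibl_trials.table.pqt']
# >>> records = register_dataset(files, one=one, exists=False)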
89def register_session_raw_data(session_path, one=None, overwrite=False, **kwargs):
90 """
91 Registers all raw data files for a session to Alyx, selecting only the files that
92 match the Alyx registration patterns.
94 Parameters
95 ----------
96 session_path : str, pathlib.Path
97 The local session path.
98 one : one.api.OneAlyx
99 An instance of ONE.
100 overwrite : bool
101 If True, patch datasets that are already registered (this may take a very long time).
102 If False (default), skip any data that are already registered.
103 **kwargs
104 Optional keyword arguments for one.registration.RegistrationClient.register_files.
106 Returns
107 -------
108 list of pathlib.Path
109 A list of raw dataset paths.
110 list of dicts, dict
111 A list of newly created Alyx dataset records or the registration data if dry.
112 """
113 # Clear rest cache to make sure we have the latest entries
114 one.alyx.clear_rest_cache() 1bia
115 client = IBLRegistrationClient(one) 1bia
116 session_path = Path(session_path) 1bia
117 eid = one.path2eid(session_path, query_type='remote') # needs to make sure we're up to date 1bia
118 if not eid: 1bia
119 raise alferr.ALFError(f'Session does not exist on Alyx: {get_alf_path(session_path)}') 1i
120 # find all files that are in a raw data collection
121 file_list = [f for f in client.find_files(session_path) 1bia
122 if f.relative_to(session_path).as_posix().startswith('raw')]
123 # unless overwrite is True, filter out the datasets that already exist
124 if not overwrite: 1bia
125 # query the database for existing datasets on the session and allowed dataset types
126 dsets = datasets2records(one.alyx.rest('datasets', 'list', session=eid, no_cache=True)) 1bia
127 already_registered = list(map(session_path.joinpath, dsets['rel_path'])) 1bia
128 file_list = list(filter(lambda f: f not in already_registered, file_list)) 1bia
130 kwargs['repository'] = get_local_data_repository(one.alyx) 1bia
131 kwargs['server_only'] = True 1bia
133 response = client.register_files(file_list, versions=ibllib.__version__, exists=False, **kwargs) 1bia
134 return file_list, response 1bia
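# Editor's usage sketch (assumed): register all raw data for a local session folder;
# the session path is hypothetical and a remote ONE connection is required.
# >>> from one.api import ONE
# >>> from ibllib.oneibl.registration import register_session_raw_data
# >>> one = ONE()
# >>> files, records = register_session_raw_data(
# ...     '/data/Subjects/example_subject/2023-01-01/001', one=one, overwrite=False)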
137class IBLRegistrationClient(RegistrationClient):
138 """
139 Object that holds the ONE instance and provides methods to create sessions and register data.
140 """
142 def register_session(self, ses_path, file_list=True, projects=None, procedures=None):
143 """
144 Register an IBL Bpod session in Alyx.
146 Parameters
147 ----------
148 ses_path : str, pathlib.Path
149 The local session path.
150 file_list : bool, list
151 An optional list of file paths to register. If True, all valid files within the
152 session folder are registered. If False, no files are registered.
153 projects : str, list
154 The project(s) to which the experiment belongs (optional).
155 procedures : str, list
156 An optional list of procedures, e.g. 'Behavior training/tasks'.
158 Returns
159 -------
160 dict
161 An Alyx session record.
163 Notes
164 -----
165 For a list of available projects:
166 >>> sorted(proj['name'] for proj in one.alyx.rest('projects', 'list'))
167 For a list of available procedures:
168 >>> sorted(proc['name'] for proc in one.alyx.rest('procedures', 'list'))
169 """
170 if isinstance(ses_path, str): 1gfecbda
171 ses_path = Path(ses_path) 1e
173 # Read in the experiment description file if it exists and get projects and procedures from here
174 experiment_description_file = session_params.read_params(ses_path) 1gfecbda
175 if experiment_description_file is None: 1gfecbda
176 collections = ['raw_behavior_data'] 1geba
177 else:
178 projects = experiment_description_file.get('projects', projects) 1fcd
179 procedures = experiment_description_file.get('procedures', procedures) 1fcd
180 collections = ensure_list(session_params.get_task_collection(experiment_description_file)) 1fcd
182 # read meta data from the rig for the session from the task settings file
183 task_data = (raw.load_bpod(ses_path, collection) for collection in sorted(collections)) 1gfecbda
184 # Filter collections where settings file was not found
185 if not (task_data := list(zip(*filter(lambda x: x[0] is not None, task_data)))): 1gfecbda
186 raise ValueError(f'_iblrig_taskSettings.raw.json not found in {ses_path}. Abort.') 1g
187 settings, task_data = task_data 1gfecbda
188 if len(settings) != len(collections): 1gfecbda
189 raise ValueError(f'_iblrig_taskSettings.raw.json missing for one or more collections in {ses_path}. Abort.')
191 # Do some validation
192 _, subject, date, number, *_ = folder_parts(ses_path) 1gfecbda
193 assert len({x['SUBJECT_NAME'] for x in settings}) == 1 and settings[0]['SUBJECT_NAME'] == subject 1gfecbda
194 assert len({x['SESSION_DATE'] for x in settings}) == 1 and settings[0]['SESSION_DATE'] == date 1gfecbda
195 assert len({x['SESSION_NUMBER'] for x in settings}) == 1 and settings[0]['SESSION_NUMBER'] == number 1gfecbda
196 assert len({x['IS_MOCK'] for x in settings}) == 1 1gfecbda
197 assert len({md['PYBPOD_BOARD'] for md in settings}) == 1 1gfecbda
198 assert len({md.get('IBLRIG_VERSION') for md in settings}) == 1 1gfecbda
199 # assert len({md['IBLRIG_VERSION_TAG'] for md in settings}) == 1
201 # query Alyx endpoints for subject, error if not found
202 subject = self.assert_exists(subject, 'subjects') 1gfecbda
204 # look for a session from the same subject, same number on the same day
205 session_id, session = self.one.search(subject=subject['nickname'], 1gfecbda
206 date_range=date,
207 number=number,
208 details=True, query_type='remote')
209 users = [] 1gfecbda
210 for user in filter(None, map(lambda x: x.get('PYBPOD_CREATOR'), settings)): 1gfecbda
211 user = self.assert_exists(user[0], 'users') # user is list of [username, uuid] 1gfecbda
212 users.append(user['username']) 1gfecbda
214 # extract information about session duration and performance
215 start_time, end_time = _get_session_times(str(ses_path), settings, task_data) 1gfecbda
216 n_trials, n_correct_trials = _get_session_performance(settings, task_data) 1gfecbda
218 # TODO Add task_protocols to Alyx sessions endpoint
219 task_protocols = [md['PYBPOD_PROTOCOL'] + md['IBLRIG_VERSION_TAG'] for md in settings] 1gfecbda
220 # unless specified label the session projects with subject projects
221 projects = subject['projects'] if projects is None else projects 1gfecbda
222 # makes sure projects is a list
223 projects = [projects] if isinstance(projects, str) else projects 1gfecbda
225 # unless specified label the session procedures with task protocol lookup
226 procedures = procedures or list(set(filter(None, map(self._alyx_procedure_from_task, task_protocols)))) 1gfecbda
227 procedures = [procedures] if isinstance(procedures, str) else procedures 1gfecbda
228 json_fields_names = ['IS_MOCK', 'IBLRIG_VERSION'] 1gfecbda
229 json_field = {k: settings[0].get(k) for k in json_fields_names} 1gfecbda
230 # The poo count field is only updated if the field is defined in at least one of the settings
231 poo_counts = [md.get('POOP_COUNT') for md in settings if md.get('POOP_COUNT') is not None] 1gfecbda
232 if poo_counts: 1gfecbda
233 json_field['POOP_COUNT'] = int(sum(poo_counts)) 1cbda
235 if not session: # Create session and weighings 1gfecbda
236 ses_ = {'subject': subject['nickname'], 1gfebda
237 'users': users or [subject['responsible_user']],
238 'location': settings[0]['PYBPOD_BOARD'],
239 'procedures': procedures,
240 'lab': subject['lab'],
241 'projects': projects,
242 'type': 'Experiment',
243 'task_protocol': '/'.join(task_protocols),
244 'number': number,
245 'start_time': self.ensure_ISO8601(start_time),
246 'end_time': self.ensure_ISO8601(end_time) if end_time else None,
247 'n_correct_trials': n_correct_trials,
248 'n_trials': n_trials,
249 'json': json_field
250 }
251 session = self.one.alyx.rest('sessions', 'create', data=ses_) 1gfebda
252 # Submit weights
253 for md in filter(lambda md: md.get('SUBJECT_WEIGHT') is not None, settings): 1gfebda
254 user = md.get('PYBPOD_CREATOR') 1gfebda
255 if isinstance(user, list): 1gfebda
256 user = user[0] 1gfebda
257 if user not in users: 1gfebda
258 user = self.one.alyx.user
259 self.register_weight(subject['nickname'], md['SUBJECT_WEIGHT'], 1gfebda
260 date_time=md['SESSION_DATETIME'], user=user)
261 else: # if session exists update the JSON field
262 session = self.one.alyx.rest('sessions', 'read', id=session_id[0], no_cache=True) 1c
263 self.one.alyx.json_field_update('sessions', session['id'], data=json_field) 1c
265 _logger.info(session['url'] + ' ') 1gfecbda
266 # create associated water administration if not found
267 if not session['wateradmin_session_related'] and any(task_data): 1gfecbda
268 for md, d in filter(all, zip(settings, task_data)): 1cbda
269 _, _end_time = _get_session_times(ses_path, md, d) 1cbda
270 user = md.get('PYBPOD_CREATOR') 1cbda
271 user = user[0] if user[0] in users else self.one.alyx.user 1cbda
272 volume = d[-1].get('water_delivered', sum(x['reward_amount'] for x in d)) / 1000 1cbda
273 if volume > 0: 1cbda
274 self.register_water_administration( 1cbda
275 subject['nickname'], volume, date_time=_end_time or end_time, user=user,
276 session=session['id'], water_type=md.get('REWARD_TYPE') or 'Water')
277 # at this point the session has been created. If create only, exit
278 if not file_list: 1gfecbda
279 return session, None 1gcbda
281 # register all files that match the Alyx patterns and file_list
282 rename_files_compatibility(ses_path, settings[0]['IBLRIG_VERSION_TAG']) 1fe
283 F = filter(lambda x: self._register_bool(x.name, file_list), self.find_files(ses_path)) 1fe
284 recs = self.register_files(F, created_by=users[0] if users else None, versions=ibllib.__version__) 1fe
285 return session, recs 1fe
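# Editor's usage sketch (assumed): create or update the Alyx session record for a
# Bpod session folder; the path and project name below are hypothetical.
# >>> from one.api import ONE
# >>> client = IBLRegistrationClient(ONE())
# >>> session, records = client.register_session(
# ...     '/data/Subjects/example_subject/2023-01-01/001', projects='example_project')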
287 @staticmethod
288 def _register_bool(fn, file_list):
289 if isinstance(file_list, bool): 1fe
290 return file_list 1fe
291 if isinstance(file_list, str):
292 file_list = [file_list]
293 return any(str(fil) in fn for fil in file_list)
295 @staticmethod
296 def _alyx_procedure_from_task(task_protocol):
297 task_type = ibllib.io.extractors.base.get_task_extractor_type(task_protocol) 1gemba
298 procedure = _alyx_procedure_from_task_type(task_type) 1gemba
299 return procedure or [] 1gemba
301 def find_files(self, session_path):
302 """Similar to base class method but further filters by name and extension.
304 In addition to matching the Alyx dataset type patterns, this excludes files
305 whose extension is in EXCLUDED_EXTENSIONS or that don't match the patterns in
306 REGISTRATION_GLOB_PATTERNS.
308 Parameters
309 ----------
310 session_path : str, pathlib.Path
311 The session path to search.
313 Yields
314 -------
315 pathlib.Path
316 File paths that match the dataset type patterns in Alyx and registration glob patterns.
317 """
318 files = itertools.chain.from_iterable(session_path.glob(x) for x in REGISTRATION_GLOB_PATTERNS) 1febia
319 for file in filter(lambda x: x.suffix not in EXCLUDED_EXTENSIONS, files): 1febia
320 try: 1febia
321 get_dataset_type(file, self.dtypes) 1febia
322 yield file 1febia
323 except ValueError as ex: 1ba
324 _logger.error(ex) 1ba
327def _alyx_procedure_from_task_type(task_type):
328 lookup = {'biased': 'Behavior training/tasks', 1gemba
329 'biased_opto': 'Behavior training/tasks',
330 'habituation': 'Behavior training/tasks',
331 'training': 'Behavior training/tasks',
332 'ephys': 'Ephys recording with acute probe(s)',
333 'ephys_biased_opto': 'Ephys recording with acute probe(s)',
334 'ephys_passive_opto': 'Ephys recording with acute probe(s)',
335 'ephys_replay': 'Ephys recording with acute probe(s)',
336 'ephys_training': 'Ephys recording with acute probe(s)',
337 'mock_ephys': 'Ephys recording with acute probe(s)',
338 'sync_ephys': 'Ephys recording with acute probe(s)'}
339 try: 1gemba
340 # look if there are tasks in the personal projects repo with procedures
341 import projects.base 1gemba
342 custom_tasks = Path(projects.base.__file__).parent.joinpath('task_type_procedures.json') 1gemba
343 with open(custom_tasks) as fp: 1gemba
344 lookup.update(json.load(fp)) 1gemba
345 except (ModuleNotFoundError, FileNotFoundError):
346 pass
347 if task_type in lookup: 1gemba
348 return lookup[task_type] 1gemba
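# Editor's example based on the built-in lookup above (a personal projects repository
# may extend it); unknown task types fall through and the function returns None.
# >>> _alyx_procedure_from_task_type('training')
# 'Behavior training/tasks'
# >>> _alyx_procedure_from_task_type('ephys')
# 'Ephys recording with acute probe(s)'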
351def rename_files_compatibility(ses_path, version_tag):
352 if not version_tag: 1fe
353 return
354 if parse_version(version_tag) <= parse_version('3.2.3'): 1fe
355 task_code = ses_path.glob('**/_ibl_trials.iti_duration.npy')
356 for fn in task_code:
357 fn.replace(fn.parent.joinpath('_ibl_trials.itiDuration.npy'))
358 task_code = ses_path.glob('**/_iblrig_taskCodeFiles.raw.zip') 1fe
359 for fn in task_code: 1fe
360 fn.replace(fn.parent.joinpath('_iblrig_codeFiles.raw.zip'))
363def _get_session_times(fn, md, ses_data):
364 """
365 Get session start and end time from the Bpod data.
367 Parameters
368 ----------
369 fn : str, pathlib.Path
370 Session/task identifier. Only used in warning logs.
371 md : dict, list of dict
372 A session parameters dictionary or list thereof.
373 ses_data : dict, list of dict
374 A session data dictionary or list thereof.
376 Returns
377 -------
378 datetime.datetime
379 The datetime of the start of the session.
380 datetime.datetime
381 The datetime of the end of the session, or None if ses_data is None.
382 """
383 if isinstance(md, dict): 1gfecbdjan
384 start_time = _start_time = isostr2date(md['SESSION_DATETIME']) 1cbdjan
385 else:
386 start_time = isostr2date(md[0]['SESSION_DATETIME']) 1gfecbdja
387 _start_time = isostr2date(md[-1]['SESSION_DATETIME']) 1gfecbdja
388 assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md) 1gfecbdja
389 assert len(md) == 1 or start_time < _start_time 1gfecbdja
390 ses_data = ses_data[-1] 1gfecbdja
391 if not ses_data: 1gfecbdjan
392 return start_time, None 1gfe
393 c = ses_duration_secs = 0 1cbdjan
394 for sd in reversed(ses_data): 1cbdjan
395 ses_duration_secs = (sd['behavior_data']['Trial end timestamp'] - 1cbdjan
396 sd['behavior_data']['Bpod start timestamp'])
397 if ses_duration_secs < (6 * 3600): 1cbdjan
398 break 1cbdjan
399 c += 1 1a
400 if c: 1cbdjan
401 _logger.warning(('Trial end timestamps of last %i trials above 6 hours ' 1a
402 '(most likely corrupt): %s'), c, str(fn))
403 end_time = _start_time + datetime.timedelta(seconds=ses_duration_secs) 1cbdjan
404 return start_time, end_time 1cbdjan
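# Editor's worked example with made-up Bpod data: a single 600-second trial gives an
# end time ten minutes after the SESSION_DATETIME in the settings.
# >>> md = {'SESSION_DATETIME': '2023-01-01T12:00:00'}
# >>> data = [{'behavior_data': {'Bpod start timestamp': 0.0, 'Trial end timestamp': 600.0}}]
# >>> _get_session_times('example_session', md, data)  # -> (12:00:00, 12:10:00 on 2023-01-01)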
407def _get_session_performance(md, ses_data):
408 """
409 Get session performance (trial counts) from the Bpod data.
410 Note: this does not support custom protocols.
412 Parameters
413 ----------
414 md : dict, list of dict
415 A session parameters dictionary or list thereof.
416 ses_data : dict, list of dict
417 A session data dictionary or list thereof.
419 Returns
420 -------
421 int
422 The total number of trials across protocols.
423 int
424 The total number of correct trials across protocols.
425 """
427 if not any(filter(None, ses_data or None)): 1gfecbdka
428 return None, None 1gfe
430 if isinstance(md, dict): 1cbdka
431 ses_data = [ses_data] 1k
432 md = [md] 1k
433 else:
434 assert isinstance(ses_data, (list, tuple)) and len(ses_data) == len(md) 1cbdka
436 n_trials = [] 1cbdka
437 n_correct = [] 1cbdka
438 for data, settings in filter(all, zip(ses_data, md)): 1cbdka
439 # In some protocols trials start from 0, in others, from 1
440 n = data[-1]['trial_num'] + int(data[0]['trial_num'] == 0) # +1 if starts from 0 1cbdka
441 n_trials.append(n) 1cbdka
442 # check that the actual number of trials matches the last trial number
443 assert len(data) == n, f'{len(data)} trials in data, however last trial number was {n}' 1cbdka
444 # task specific logic
445 if 'habituationChoiceWorld' in settings.get('PYBPOD_PROTOCOL', ''): 1cbdka
446 n_correct.append(0) 1k
447 else:
448 n_correct.append(data[-1].get('ntrials_correct', sum(x['trial_correct'] for x in data))) 1cbdka
450 return sum(n_trials), sum(n_correct) 1cbdka
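# Editor's worked example with made-up trial data: two trials numbered from zero,
# one of them correct, give totals of (2, 1).
# >>> md = {'PYBPOD_PROTOCOL': '_iblrig_tasks_trainingChoiceWorld'}
# >>> data = [{'trial_num': 0, 'trial_correct': True}, {'trial_num': 1, 'trial_correct': False}]
# >>> _get_session_performance(md, data)
# (2, 1)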
453def get_local_data_repository(ac):
454 """
455 Get local data repo name from Globus client.
457 Parameters
458 ----------
459 ac : one.webclient.AlyxClient
460 An AlyxClient instance for querying data repositories.
462 Returns
463 -------
464 str
465 The (first) data repository associated with the local Globus endpoint ID.
466 """
467 try: 1hlcbia
468 assert ac 1hlcbia
469 globus_id = get_local_endpoint_id() 1hlcbia
470 except AssertionError:
471 return
473 data_repo = ac.rest('data-repository', 'list', globus_endpoint_id=globus_id) 1hlcbia
474 return next((da['name'] for da in data_repo), None) 1hlcbia
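# Editor's usage sketch (assumed): the repository name is looked up from the local
# Globus endpoint ID; None is returned when no matching repository exists on Alyx.
# >>> from one.webclient import AlyxClient
# >>> repo_name = get_local_data_repository(AlyxClient())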
477def get_lab(session_path, alyx=None):
478 """
479 Get lab from a session path using the subject name.
481 On local lab servers, the lab name is not in the ALF path and the globus endpoint ID may be
482 associated with multiple labs, so lab name is fetched from the subjects endpoint.
484 Parameters
485 ----------
486 session_path : str, pathlib.Path
487 The session path from which to determine the lab name.
488 alyx : one.webclient.AlyxClient
489 An AlyxClient instance for querying data repositories.
491 Returns
492 -------
493 str
494 The lab name associated with the session path subject.
496 See Also
497 --------
498 one.remote.globus.get_lab_from_endpoint_id
499 """
500 alyx = alyx or AlyxClient() 1ocba
501 if not (ref := ConversionMixin.path2ref(session_path)): 1ocba
502 raise ValueError(f'Failed to parse session path: {session_path}') 1o
504 labs = [x['lab'] for x in alyx.rest('subjects', 'list', nickname=ref['subject'])] 1ocba
505 if len(labs) == 0: 1ocba
506 raise alferr.AlyxSubjectNotFound(ref['subject']) 1o
507 elif len(labs) > 1: # More than one subject with this nickname 1ocba
508 # use local endpoint ID to find the correct lab
509 endpoint_labs = get_lab_from_endpoint_id(alyx=alyx) 1o
510 lab = next(x for x in labs if x in endpoint_labs) 1o
511 else:
512 lab, = labs 1ocba
514 return lab 1ocba
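# Editor's usage sketch (assumed): the session path is hypothetical and the subject
# must exist on Alyx for a lab name to be returned.
# >>> lab = get_lab('/data/Subjects/example_subject/2023-01-01/001')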