Coverage for ibllib/io/session_params.py: 87%
193 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1"""A module for handling experiment description files.
3Each device computer adds its piece of information and consolidates into the final acquisition
4description.
6The purpose is 3-fold:
7 - provide modularity in the extraction: the acquisition description allows pipelines to be
8 built dynamically.
9 - assist the copying of the experimental data from device computers to the server computers, in
10 a way that each device is independent from another.
11 - assist the copying of the experimental data from device computers to the server computers, in
12 a way that intermediate states (failed copies) are easily recoverable from and the completion
13 criterion (i.e. session ready to extract) is objective and simple (all device files copied).
15INGRESS
16 - each device computer needs to know the session path on the server.
17 - create a device file locally in a queue directory. This will serve as a copy flag.
18 - copy the device file to the local server.
20EGRESS
21 - go through the queue and for each item:
22 - if the device file is not on the server create it.
23 - once copy is complete aggregate the qc from file.
24"""
25import yaml
26import time
27from datetime import datetime
28import logging
29from pathlib import Path
30from copy import deepcopy
32from one.converters import ConversionMixin
33from pkg_resources import parse_version
35import ibllib.pipes.misc as misc
# Module-level logger, named after this module
_logger = logging.getLogger(__name__)
# The current version of the experiment description specification. Description data carrying a
# different version are patched and re-stamped with this value by `_patch_file`.
SPEC_VERSION = '1.0.0'
def write_yaml(file_path, data):
    """
    Dump data to a yaml file, creating any missing parent folders beforehand.

    Parameters
    ----------
    file_path : pathlib.Path
        The full path of the yaml file to write to.
    data : dict
        The data to serialize into the yaml file.

    """
    folder = file_path.parent
    folder.mkdir(parents=True, exist_ok=True)
    with open(file_path, 'w') as stream:
        yaml.safe_dump(data, stream)
59def _patch_file(data: dict) -> dict:
60 """
61 Update older description data to conform to the most recent specification.
63 Parameters
64 ----------
65 data : dict
66 The description yaml data.
68 Returns
69 -------
70 dict
71 The patched description data.
72 """
73 if data and (v := data.get('version', '0')) != SPEC_VERSION: 2a d pbW N K M mbnbi j k o b e X p q r s f g I t h u O P Q R m S T G obl 1 c H
74 if parse_version(v) > parse_version(SPEC_VERSION): 2d pbW X
75 _logger.warning('Description file generated by more recent code') 2pb
76 elif parse_version(v) <= parse_version('0.1.0'): 1dWX
77 # Change tasks key from dict to list of dicts
78 if 'tasks' in data and isinstance(data['tasks'], dict): 1dWX
79 data['tasks'] = [{k: v} for k, v in data['tasks'].copy().items()] 1X
80 data['version'] = SPEC_VERSION 2d pbW X
81 return data 2a d pbW N K M mbnbi j k o b e X p q r s f g I t h u O P Q R m S T G obl 1 c H
def write_params(session_path, data) -> Path:
    """
    Save acquisition description data as _ibl_experiment.description.yaml in a session folder.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path in which to write the _ibl_experiment.description.yaml file.
    data : dict
        The acquisition description data to save.

    Returns
    -------
    pathlib.Path
        The full path to the saved acquisition description.
    """
    destination = Path(session_path) / '_ibl_experiment.description.yaml'
    write_yaml(destination, data)
    return destination
def read_params(path) -> dict:
    """
    Load an experiment description file.

    In addition to reading the yaml data, this function passes it through `_patch_file` so that
    it conforms to the most recent specification. If the file is missing None is returned. If the
    file is empty an empty dict is returned.

    Parameters
    ----------
    path : pathlib.Path, str
        The path to the description yaml file (or its containing folder) to load. When a folder
        is given, the first file matching '_ibl_experiment.description*' is loaded.

    Returns
    -------
    dict, None
        The parsed yaml data, or None if the file was not found.

    Examples
    --------
    # Load a session's _ibl_experiment.description.yaml file

    >>> data = read_params('/home/data/subject/2020-01-01/001')

    # Load a specific device's description file

    >>> data = read_params('/home/data/subject/2020-01-01/001/_devices/behaviour.yaml')

    """
    path = Path(path)
    if path.is_dir():
        matches = path.glob('_ibl_experiment.description*')
        yaml_file = next(matches, None)
    elif path.exists():
        yaml_file = path
    else:
        yaml_file = None

    if yaml_file is None:
        _logger.debug('Experiment description not found: %s', path)
        return None

    with open(yaml_file, 'r') as stream:
        loaded = yaml.safe_load(stream)
    # An empty file loads as None; normalize to an empty dict before patching
    return _patch_file(loaded or {})
def merge_params(a, b, copy=False):
    """
    Given two experiment descriptions, update first with fields in second.

    Fields are merged according to the type of the value in `b`:

    - list: the items of `b` are appended to those of `a`. For 'tasks' every item is appended;
      for any other key (e.g. procedures, projects) only items not already present are appended,
      in the order they appear in `b`.
    - dict: a shallow merge where the keys of `b` take precedence.
    - anything else (e.g. a string): the value in `b` replaces the value in `a`.

    Parameters
    ----------
    a : dict
        An experiment description dictionary to be updated with fields from `b`.
    b : dict
        An experiment description dictionary to update `a` with
    copy : bool
        If true, return a deep copy of `a` instead of updating directly.

    Returns
    -------
    dict
        A merged dictionary consisting of fields from `a` and `b`.

    Raises
    ------
    AssertionError
        Both `a` and `b` contain a 'sync' field and the two differ.
    """
    if copy:
        a = deepcopy(a)
    for key, value in b.items():
        if key == 'sync':
            assert key not in a or a[key] == value, 'multiple sync fields defined'
        if isinstance(value, list):
            previous = a.get(key) or []  # tolerate a missing or null field in `a`
            if key == 'tasks':
                # The same task may legitimately be run more than once: keep everything
                additions = list(value)
            else:
                # Only add what is new. NB: a symmetric difference (^) must not be used here as
                # it would re-append the items of `a` that are absent from `b`.
                additions = []
                for item in value:
                    if item not in previous and item not in additions:
                        additions.append(item)
            a[key] = previous + additions
        elif isinstance(value, dict):
            a[key] = {**(a.get(key) or {}), **value}
        else:  # A string
            a[key] = value
    return a
def aggregate_device(file_device, file_acquisition_description, unlink=False):
    """
    Add the contents of a device file to the main acquisition description file.

    A lock file (the main description path with a '.lock' suffix) is held while the main
    description is read, merged and re-written. The lock is released even if this step fails.

    Parameters
    ----------
    file_device : pathlib.Path
        The full path to the device yaml file to add to the main description file.
    file_acquisition_description : pathlib.Path
        The full path to the main acquisition description yaml file to add the device file to.
    unlink : bool
        If True, the device file is removed after successful aggregation.

    Returns
    -------
    dict, None
        The aggregated experiment description data, or None if the device file was missing or
        empty (in which case nothing is written).

    Raises
    ------
    AssertionError
        Device file contains a main 'sync' key that is already present in the main description
        file. For an experiment only one main sync device is allowed.
    """
    file_lock = file_acquisition_description.with_suffix('.lock')
    # reads in the partial device data
    data_device = read_params(file_device)

    if not data_device:
        _logger.warning('empty device file "%s"', file_device)
        return

    # if a lock file exists, wait for it to clear: at most 4 waits of 2 seconds each
    attempts = 0
    while file_lock.exists() and attempts < 4:
        _logger.info('file lock found, waiting 2 seconds %s', file_lock)
        time.sleep(2)
        attempts += 1

    # if the lock still exists after waiting, remove it as it's a job that went wrong
    if file_lock.exists():
        with open(file_lock, 'r') as fp:
            _logger.debug('file lock contents: %s', yaml.safe_load(fp))
        _logger.info('stale file lock found, deleting %s', file_lock)
        file_lock.unlink()

    # add in the lock file, add some meta data to ease debugging if one gets stuck
    with open(file_lock, 'w') as fp:
        yaml.safe_dump(dict(datetime=datetime.utcnow().isoformat(), file_device=str(file_device)), fp)

    try:
        # if the acquisition description file already exists, read in the yaml content
        if file_acquisition_description.exists():
            acq_desc = read_params(file_acquisition_description)
        else:
            acq_desc = {}

        # merge the dictionaries (NB: acq_desc modified in place)
        acq_desc = merge_params(acq_desc, data_device)

        with open(file_acquisition_description, 'w') as fp:
            yaml.safe_dump(acq_desc, fp)
    finally:
        # always release the lock, otherwise a failed merge (e.g. duplicate sync) would leave a
        # stale lock behind that stalls the next aggregation
        file_lock.unlink(missing_ok=True)

    # delete the original file if necessary
    # NOTE(review): nesting reconstructed from a whitespace-mangled listing; confirm that the stub
    # folder clean-up belongs under `if unlink`
    if unlink:
        file_device.unlink()
        stub_folder = file_acquisition_description.with_name('_devices')
        # remove the stub folder once no file (name containing a dot) remains in it
        if stub_folder.exists() and not any(stub_folder.glob('*.*')):
            stub_folder.rmdir()

    return acq_desc
def get_cameras(sess_params):
    """
    Return the names of the cameras listed under devices in an experiment description.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description.

    Returns
    -------
    list, None
        The keys of the devices/cameras map, or None if there are no cameras.
    """
    camera_map = sess_params.get('devices', {}).get('cameras', None)
    if camera_map:
        return [*camera_map.keys()]
    return None
def get_sync_label(sess_params):
    """
    Return the label (first key) of the sync field of an experiment description.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    str, None
        The first key of the 'sync' map, or None if the description is empty or has no sync
        entry. A warning is logged when more than one sync key is present.
    """
    if not sess_params:
        return None
    sync_map = sess_params.get('sync') or {}
    labels = [*sync_map.keys()]
    if not labels:
        return None
    if len(labels) > 1:
        _logger.warning('Multiple sync keys found in experiment description: %s', labels)
    return labels[0]
def get_sync(sess_params):
    """
    Return the sync label along with its parameters.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    str, None
        The sync label, or None if no sync entry was found.
    dict
        The value stored under that label (an empty dict if it is empty or no sync was found).
    """
    label = get_sync_label(sess_params)
    if not label:
        return None, {}
    return label, sess_params['sync'][label] or {}
def get_sync_values(sess_params):
    """
    Return the value stored under the sync label of an experiment description.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    dict, None
        The value stored under the sync label, or None if no sync label was found.
    """
    label = get_sync_label(sess_params)
    return sess_params['sync'][label] if label else None
def get_sync_collection(sess_params):
    """Return the 'collection' field of the sync values, or None if there is none."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('collection')
def get_sync_extension(sess_params):
    """Return the 'extension' field of the sync values, or None if there is none."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('extension')
def get_sync_namespace(sess_params):
    """Return the 'acquisition_software' field of the sync values, or None if there is none."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('acquisition_software')
def get_task_protocol(sess_params, task_collection=None):
    """
    Fetch the task protocol from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_collection : str, optional
        Return the protocol that corresponds to this collection (returns the first matching
        protocol in the list). If None, all protocols are returned.

    Returns
    -------
    str, set, None
        If task_collection is None, returns the set of task protocols, otherwise returns the first
        protocol that corresponds to the collection, or None if collection not present.
    """
    collections = get_collections({'tasks': sess_params.get('tasks')})
    if task_collection is None:
        return set(collections.keys())  # Return all protocols
    for protocol, collection in collections.items():
        # get_collections stores a list of collections when the same protocol occurs more than
        # once, so a plain equality test would never match such a protocol
        if isinstance(collection, list):
            if task_collection in collection:
                return protocol
        elif collection == task_collection:
            return protocol
    return None
def get_task_collection(sess_params, task_protocol=None):
    """
    Fetch the task collection from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the collection that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all collections are returned.

    Returns
    -------
    str, set, None
        If task_protocol is None, returns the set of collections (or the collection itself when
        there is exactly one, or None when there are none), otherwise returns the first
        collection that corresponds to the protocol, or None if protocol not present.

    Notes
    -----
    - The order of the set may not be the same as the descriptions tasks order when iterating.
    """
    protocols = sess_params.get('tasks') or []  # tolerate a missing or null tasks field
    if task_protocol is not None:
        task = next((x for x in protocols if task_protocol in x), None)
        if task is None:  # protocol not present
            return None
        return (task.get(task_protocol) or {}).get('collection')

    # Gather the collection of every task; a task may have no parameters (None) or no collection
    cset = set()
    for task in protocols:
        task_params = next(iter(task.values()), None) or {}
        collection = task_params.get('collection')
        if collection:
            cset.add(collection)
    if not cset:
        return None
    return next(iter(cset)) if len(cset) == 1 else cset
def get_task_protocol_number(sess_params, task_protocol=None):
    """
    Fetch the task protocol number from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the number that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all numbers are returned.

    Returns
    -------
    int, list, None
        If task_protocol is None, returns list of all numbers (or the number itself when there is
        exactly one, or None when there are none), otherwise returns the first number that
        corresponds to the protocol, or None if protocol not present. Numbers stored as strings
        are converted to int.
    """
    def _as_int(number):
        """Convert a protocol number stored as a string to an int."""
        return int(number) if isinstance(number, str) else number

    protocols = sess_params.get('tasks') or []  # tolerate a missing or null tasks field
    if task_protocol is not None:
        task = next((x for x in protocols if task_protocol in x), None)
        if task is None:  # protocol not present
            return None
        return _as_int((task.get(task_protocol) or {}).get('protocol_number'))

    # Gather the number of every task. NB: 0 is a valid protocol number so only missing values
    # are skipped (a truthiness test would silently drop the first protocol of a session)
    numbers = []
    for task in protocols:
        task_params = next(iter(task.values()), None) or {}
        number = task_params.get('protocol_number')
        if number is None or number == '':
            continue
        numbers.append(_as_int(number))
    if not numbers:
        return None
    return numbers[0] if len(numbers) == 1 else numbers
def get_collections(sess_params, flat=False):
    """
    Find all collections associated with the session.

    The description is walked recursively: any dict containing a 'collection' key is recorded
    under the key it is stored at (and is not searched any further); other dicts, and dicts
    found inside lists, are searched in turn.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description map.
    flat : bool (False)
        If True, return a flat list of unique collections, otherwise return a map of device/sync/task

    Returns
    -------
    dict[str, str]
        A map of device/sync/task and the corresponding collection name. When the same key is
        found more than once, the value is a list of the unique collection names instead.

    list[str]
        A flat list of unique collection names.

    Notes
    -----
    - Assumes only the following data types contained: list, dict, None, str.
    """
    collection_map = {}

    def iter_dict(d):
        """Recursively record the collection of each entry of d in collection_map."""
        for k, v in d.items():
            if isinstance(v, list):
                for item in v:
                    if isinstance(item, dict):
                        iter_dict(item)
            elif isinstance(v, dict) and 'collection' in v:
                # if the key already exists, append the collection name to the list
                if k in collection_map:
                    previous = collection_map[k]
                    clist = list(previous) if isinstance(previous, list) else [previous]
                    if v['collection'] not in clist:
                        clist.append(v['collection'])
                    collection_map[k] = clist
                else:
                    collection_map[k] = v['collection']
            elif isinstance(v, dict):
                iter_dict(v)

    iter_dict(sess_params or {})
    if not flat:
        return collection_map

    cflat = []
    for v in collection_map.values():
        if isinstance(v, list):
            cflat.extend(v)
        else:
            cflat.append(v)
    # de-duplicate while keeping the order in which collections were found
    return list(dict.fromkeys(cflat))
def get_video_compressed(sess_params):
    """
    Return whether the session videos are flagged as compressed.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description.

    Returns
    -------
    bool, None
        The 'compressed' field (False if absent) of the last camera listed under
        devices/cameras, or None if there are no cameras.
    """
    cameras = sess_params.get('devices', {}).get('cameras', None)
    if not cameras:
        return None

    # This is all or nothing: videos are assumed to be either all compressed or all not
    # compressed, so only the flag of the last camera is reported
    flags = [settings.get('compressed', False) for settings in cameras.values()]
    return flags[-1]
def get_remote_stub_name(session_path, device_id=None):
    """
    Build the device specific file path of the remote experiment.description stub.

    Parameters
    ----------
    session_path : pathlib.Path
        A remote session path.
    device_id : str, optional
        A device name, if None the TRANSFER_LABEL parameter is used (defaults to this device's
        hostname with a unique numeric ID)

    Returns
    -------
    pathlib.Path
        The full file path to the remote experiment description stub.

    Example
    -------
    >>> get_remote_stub_name(Path.home().joinpath('subject', '2020-01-01', '001'), 'host-123')
    Path.home() / 'subject/2020-01-01/001/_devices/2020-01-01_1_subject@host-123.yaml'
    """
    if not device_id:
        device_id = misc.create_basic_transfer_params()['TRANSFER_LABEL']
    ref = ConversionMixin.path2ref(session_path)
    exp_ref = '{date}_{sequence:d}_{subject:s}'.format(**ref)
    # Stubs live in the session's '_devices' folder, named <exp_ref>@<device_id>.yaml
    return session_path / '_devices' / f'{exp_ref}@{device_id}.yaml'
def prepare_experiment(session_path, acquisition_description=None, local=None, remote=None, device_id=None, overwrite=False):
    r"""
    Copy acquisition description yaml to the server and local transfers folder.

    Nothing is done if the acquisition description is empty. A failure to write to the remote
    location is logged as a warning and does not prevent the local file from being written.

    Parameters
    ----------
    session_path : str, pathlib.Path, pathlib.PurePath
        The RELATIVE session path, e.g. subject/2020-01-01/001. If an absolute path is given only
        its last three parts are used.
    acquisition_description : dict
        The data to write to the experiment.description.yaml file.
    local : str, pathlib.Path
        The path to the local session folders.
        >>> C:\iblrigv8_data\cortexlab\Subjects  # noqa
    remote : str, pathlib.Path
        The path to the remote server session folders. If False, nothing is copied to the remote
        location.
        >>> Y:\Subjects  # noqa
    device_id : str, optional
        A device name, if None the TRANSFER_LABEL parameter is used (defaults to this device's
        hostname with a unique numeric ID)
    overwrite : bool
        If true, overwrite any existing file with the new one, otherwise, update the existing file.
    """
    if not acquisition_description:
        return

    # Determine if user passed in arg for local/remote subject folder locations or pull in from
    # local param file or prompt user if missing data.
    if local is None or remote is None or device_id is None:
        params = misc.create_basic_transfer_params(local_data_path=local, remote_data_path=remote, TRANSFER_LABEL=device_id)
        local, device_id = (params['DATA_FOLDER_PATH'], params['TRANSFER_LABEL'])
        # if the user provides False as an argument, it means the intent is to not copy anything, this
        # won't be preserved by create_basic_transfer_params by default
        remote = False if remote is False else params['REMOTE_DATA_FOLDER_PATH']

    # This is in the docstring but still, if the session path is absolute, we need to make it
    # relative. Convert first so that a str input is supported as documented.
    session_path = Path(session_path)
    if session_path.is_absolute():
        session_path = Path(*session_path.parts[-3:])

    # First attempt to copy to server
    if remote is not False:
        remote_session_path = Path(remote).joinpath(session_path)
        remote_device_path = get_remote_stub_name(remote_session_path, device_id=device_id)
        previous_description = read_params(remote_device_path) if remote_device_path.exists() and not overwrite else {}
        try:
            # `or {}` guards against the stub vanishing between the exists check and the read
            write_yaml(remote_device_path, merge_params(previous_description or {}, acquisition_description))
            _logger.info(f'Written data to remote device at: {remote_device_path}.')
        except Exception as ex:
            # best effort: the remote may be unreachable, the local copy below is still written
            _logger.warning(f'Failed to write data to remote device at: {remote_device_path}. \n {ex}')

    # then create on the local machine
    filename = f'_ibl_experiment.description_{device_id}.yaml'
    local_device_path = Path(local).joinpath(session_path, filename)
    previous_description = read_params(local_device_path) if local_device_path.exists() and not overwrite else {}
    write_yaml(local_device_path, merge_params(previous_description or {}, acquisition_description))
    _logger.info(f'Written data to local session at : {local_device_path}.')