Coverage for ibllib/io/session_params.py: 98%
165 statements
coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
"""A module for handling experiment description files.

Each device computer adds its piece of information, which is then consolidated into the final
acquisition description.

The purpose is 3-fold:
 - provide modularity in the extraction: the acquisition description allows pipelines to be
   built dynamically.
 - assist the copying of the experimental data from device computers to the server computers, in
   a way that each device is independent of the others.
 - make that copying robust: intermediate states (failed copies) are easily recoverable, and the
   completion criterion (i.e. the session is ready to extract) is objective and simple (all
   device files copied).

INGRESS
 - each device computer needs to know the session path on the server.
 - create a device file locally in a queue directory. This will serve as a copy flag.
 - copy the device file to the local server.

EGRESS
 - go through the queue and for each item:
   - if the device file is not on the server create it.
   - once copy is complete aggregate the qc from file.
"""
25import yaml
26import time
27from datetime import datetime
28import logging
29from pathlib import Path
30from copy import deepcopy
32from one.converters import ConversionMixin
33from iblutil.util import flatten
34from packaging import version
36import ibllib.pipes.misc as misc
# Logger named after this module.
_logger = logging.getLogger(name=__name__)
# Specification version that description files are written / patched to (see _patch_file).
SPEC_VERSION = "1.0.0"
def write_yaml(file_path, data):
    """
    Dump `data` to a YAML file, creating any missing parent folders first.

    Parameters
    ----------
    file_path : pathlib.Path
        Destination of the YAML file; an existing file is overwritten.
    data : dict
        The data to serialize with `yaml.safe_dump`.
    """
    target_dir = file_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with file_path.open('w') as stream:
        yaml.safe_dump(data, stream)
def _patch_file(data: dict) -> dict:
    """
    Update older description data to conform to the most recent specification.

    Parameters
    ----------
    data : dict
        The description yaml data. NB: a non-empty dict is modified in place.

    Returns
    -------
    dict
        The patched description data (the same object that was passed in). Empty/None input is
        returned unchanged.
    """
    if not data:
        return data
    file_version = data.get('version', '0')
    if file_version == SPEC_VERSION:
        return data
    # YAML loads unquoted values such as `version: 1.0` as floats, while packaging expects a str
    parsed = version.parse(str(file_version))
    if parsed > version.parse(SPEC_VERSION):
        _logger.warning('Description file generated by more recent code')
    elif parsed <= version.parse('0.1.0'):
        # Change tasks key from dict to list of dicts
        tasks = data.get('tasks')
        if isinstance(tasks, dict):
            data['tasks'] = [{protocol: params} for protocol, params in tasks.items()]
    data['version'] = SPEC_VERSION
    return data
def write_params(session_path, data) -> Path:
    """
    Save acquisition description data as the session's main description file.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session folder in which '_ibl_experiment.description.yaml' is (over)written.
    data : dict
        The acquisition description data to save.

    Returns
    -------
    pathlib.Path
        Location of the written description file.
    """
    description_file = Path(session_path) / '_ibl_experiment.description.yaml'
    write_yaml(description_file, data)
    return description_file
def read_params(path) -> dict:
    """
    Load an experiment description file.

    In addition to reading the yaml data, this function ensures that the specification is the
    most recent one. If the file is missing None is returned. If the file is empty an empty dict
    is returned; malformed YAML raises the yaml library's parsing error.

    Parameters
    ----------
    path : pathlib.Path, str
        The path to the description yaml file (or its containing folder) to load.

    Returns
    -------
    dict, None
        The parsed yaml data, or None if the file was not found.

    Examples
    --------
    # Load a session's _ibl_experiment.description.yaml file

    >>> data = read_params('/home/data/subject/2020-01-01/001')

    # Load a specific device's description file

    >>> data = read_params('/home/data/subject/2020-01-01/001/_devices/behaviour.yaml')
    """
    if (path := Path(path)).is_dir():
        # The wildcard admits variants of the file name, but must not pick up the
        # '_ibl_experiment.description.lock' file that aggregate_device writes next to the
        # description file. Sorting makes the choice deterministic; '.' sorts before '_', so
        # '_ibl_experiment.description.yaml' precedes '_ibl_experiment.description_*' names.
        candidates = sorted(
            f for f in path.glob('_ibl_experiment.description*') if f.suffix != '.lock')
        yaml_file = candidates[0] if candidates else None
    else:
        yaml_file = path if path.exists() else None
    if yaml_file is None:
        _logger.debug('Experiment description not found: %s', path)
        return
    with open(yaml_file, 'r') as fp:
        data = _patch_file(yaml.safe_load(fp) or {})
    return data
def merge_params(a, b, copy=False):
    """
    Given two experiment descriptions, update first with fields in second.

    List fields are extended: all 'tasks' entries of `b` are appended, whereas for other list
    fields (e.g. procedures, projects) only entries not already present are appended, in their
    original order. Dict fields are merged with `b` taking precedence. Any other value in `b`
    replaces the one in `a`. A missing or null field in `a` is treated as empty.

    Parameters
    ----------
    a : dict
        An experiment description dictionary to be updated with fields from `b`.
    b : dict
        An experiment description dictionary to update `a` with
    copy : bool
        If true, return a deep copy of `a` instead of updating directly.

    Returns
    -------
    dict
        A merged dictionary consisting of fields from `a` and `b`.

    Raises
    ------
    AssertionError
        Both `a` and `b` define a 'sync' field and the two differ.
    """
    if copy:
        a = deepcopy(a)
    for key, value in b.items():
        if key == 'sync' and key in a and a[key] != value:
            # Raised explicitly (not with `assert`) so the check survives `python -O`
            raise AssertionError('multiple sync fields defined')
        if isinstance(value, list):
            merged = list(a.get(key) or [])
            if key == 'tasks':
                merged.extend(value)
            else:
                # For procedures and projects, remove duplicates while preserving order
                for item in value:
                    if item not in merged:
                        merged.append(item)
            a[key] = merged
        elif isinstance(value, dict):
            a[key] = {**(a.get(key) or {}), **value}
        else:  # A string
            a[key] = value
    return a
def aggregate_device(file_device, file_acquisition_description, unlink=False):
    """
    Add the contents of a device file to the main acquisition description file.

    A '.lock' file next to the main description file marks an aggregation in progress. If one is
    found, this waits 2 seconds at a time, for up to 4 waits, then treats any remaining lock as
    stale and deletes it. The lock is always removed once this call finishes, even on error.

    Parameters
    ----------
    file_device : pathlib.Path
        The full path to the device yaml file to add to the main description file.
    file_acquisition_description : pathlib.Path
        The full path to the main acquisition description yaml file to add the device file to.
    unlink : bool
        If True, the device file is removed after successful aggregation. The '_devices' folder
        next to the main description file is also removed if it no longer contains any files.

    Returns
    -------
    dict, None
        The aggregated experiment description data, or None if the device file was empty or
        not found (in which case nothing is written).

    Raises
    ------
    AssertionError
        Device file contains a main 'sync' key that is already present in the main description
        file. For an experiment only one main sync device is allowed.
    """
    from datetime import timezone  # the module-level import only brings in `datetime`

    file_lock = file_acquisition_description.with_suffix('.lock')
    # reads in the partial device data
    data_device = read_params(file_device)

    if not data_device:
        _logger.warning('empty device file "%s"', file_device)
        return

    # if a lock file exists, wait for it to be released: up to 4 waits of 2 seconds each
    attempts = 0
    while file_lock.exists() and attempts < 4:
        _logger.info('file lock found, waiting 2 seconds %s', file_lock)
        time.sleep(2)
        attempts += 1

    # if the lock file still exists after waiting, remove it as it's a job that went wrong
    if file_lock.exists():
        with open(file_lock, 'r') as fp:
            _logger.debug('file lock contents: %s', yaml.safe_load(fp))
        _logger.info('stale file lock found, deleting %s', file_lock)
        file_lock.unlink()

    # add in the lock file, add some meta data to ease debugging if one gets stuck
    with open(file_lock, 'w') as fp:
        lock_info = dict(datetime=datetime.now(timezone.utc).isoformat(), file_device=str(file_device))
        yaml.safe_dump(lock_info, fp)

    try:
        # if the acquisition description file already exists, read in the yaml content
        if file_acquisition_description.exists():
            acq_desc = read_params(file_acquisition_description) or {}
        else:
            acq_desc = {}

        # merge the dictionaries (NB: acq_desc modified in place)
        acq_desc = merge_params(acq_desc, data_device)

        with open(file_acquisition_description, 'w') as fp:
            yaml.safe_dump(acq_desc, fp)
    finally:
        # always release the lock, e.g. when merge_params rejects a second sync device;
        # otherwise later calls would stall before deleting it as stale
        file_lock.unlink(missing_ok=True)

    # delete the original file if necessary
    if unlink:
        file_device.unlink()
        stub_folder = file_acquisition_description.with_name('_devices')
        if stub_folder.exists() and not any(stub_folder.glob('*.*')):
            stub_folder.rmdir()

    return acq_desc
def get_cameras(sess_params):
    """
    Return the names of the cameras defined in an experiment description.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description.

    Returns
    -------
    list of str, None
        The keys of the devices.cameras map, or None if no cameras are defined (including when
        the 'devices' or 'cameras' field is missing or null).
    """
    devices = sess_params.get('devices') or {}
    cameras = devices.get('cameras')
    return list(cameras.keys()) if cameras else None
def get_sync_label(sess_params):
    """
    Return the name of the main sync device in an experiment description.

    Returns None when the description is empty or has no (or a null/empty) 'sync' field. When
    several sync devices are defined a warning is logged and the first one is returned.
    """
    if not sess_params:
        return None
    labels = [*(sess_params.get('sync') or {})]
    if not labels:
        return None
    if len(labels) > 1:
        _logger.warning('Multiple sync keys found in experiment description: %s', labels)
    return labels[0]
def get_sync(sess_params):
    """
    Return the main sync device name and its parameters.

    Returns (None, {}) when no sync device is defined; a null parameter map is returned as {}.
    """
    label = get_sync_label(sess_params)
    if not label:
        return None, {}
    return label, sess_params['sync'][label] or {}
def get_sync_values(sess_params):
    """Return the parameters of the main sync device as stored, or None if no sync is defined."""
    label = get_sync_label(sess_params)
    return sess_params['sync'][label] if label else None
def get_sync_collection(sess_params):
    """Return the 'collection' of the main sync device, or None if undefined."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('collection')
def get_sync_extension(sess_params):
    """Return the 'extension' of the main sync device, or None if undefined."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('extension')
def get_sync_namespace(sess_params):
    """Return the 'acquisition_software' of the main sync device, or None if undefined."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('acquisition_software')
def get_task_protocol(sess_params, task_collection=None):
    """
    Fetch the task protocol from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_collection : str, optional
        Return the protocol that corresponds to this collection (returns the first matching
        protocol in the list). If None, all protocols are returned.

    Returns
    -------
    str, set, None
        If task_collection is None, returns the set of task protocols (or None if there are
        none), otherwise returns the first protocol that corresponds to the collection, or None
        if collection not present.
    """
    collections = get_collections({'tasks': sess_params.get('tasks')})
    if task_collection is None:
        return set(collections.keys()) or None  # Return all protocols

    def _matches(collection):
        # A protocol run in several collections maps to a list of collection names
        if isinstance(collection, list):
            return task_collection in collection
        return collection == task_collection

    return next((protocol for protocol, coll in collections.items() if _matches(coll)), None)
def get_task_collection(sess_params, task_protocol=None):
    """
    Fetch the task collection from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the collection that corresponds to this protocol (returns the collection of the
        first matching protocol in the list). If None, all collections are returned.

    Returns
    -------
    str, set, None
        If task_protocol is None, returns the set of collections (or the single collection as a
        str if there is only one), otherwise returns the first collection that corresponds to
        the protocol. None if nothing is found.

    Notes
    -----
    - The order of the set may not be the same as the descriptions tasks order when iterating.
    - A null 'tasks' field or null task parameters are treated as empty.
    """
    tasks = sess_params.get('tasks') or []
    if task_protocol is not None:
        task = next((x for x in tasks if task_protocol in x), None) or {}
        return (task.get(task_protocol) or {}).get('collection')

    # Return set of all task collections
    collections = set()
    for task in tasks:
        params = next(iter(task.values()), None) or {}
        if params.get('collection'):
            collections.add(params['collection'])
    if not collections:
        return None
    return next(iter(collections)) if len(collections) == 1 else collections
def get_task_protocol_number(sess_params, task_protocol=None):
    """
    Fetch the task protocol number from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the number that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all numbers are returned.

    Returns
    -------
    int, list, None
        If task_protocol is None, returns list of all numbers (or the single number if there is
        only one), otherwise returns the first number that corresponds to the protocol. None if
        nothing is found. Numbers stored as strings are converted to int; 0 is a valid number.
    """
    def _to_int(number):
        # Numbers may be stored as strings in the yaml file
        return int(number) if isinstance(number, str) else number

    tasks = sess_params.get('tasks') or []
    if task_protocol is not None:
        task = next((x for x in tasks if task_protocol in x), None) or {}
        return _to_int((task.get(task_protocol) or {}).get('protocol_number'))

    # Return list of all task numbers
    numbers = []
    for task in tasks:
        number = (next(iter(task.values()), None) or {}).get('protocol_number')
        # Only skip missing values: filtering on truthiness would also drop protocol number 0
        if number is None or number == '':
            continue
        numbers.append(_to_int(number))
    if not numbers:
        return None
    return numbers[0] if len(numbers) == 1 else numbers
def get_collections(sess_params, flat=False):
    """
    Find all collections associated with the session.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description map.
    flat : bool (False)
        If True, return a flat set of collections, otherwise return a map of device/sync/task

    Returns
    -------
    dict[str, str | list[str]]
        A map of device/sync/task and the corresponding collection name. A key found with
        several distinct collections (e.g. a protocol run twice) maps to a list of them, in
        order of first appearance.
    set[str]
        A set of unique collection names.

    Notes
    -----
    - Assumes only the following data types contained: list, dict, None, str.
    """
    collection_map = {}

    def _add(key, collection):
        # Record a collection for key, promoting to a list only for a new distinct collection
        if key not in collection_map:
            collection_map[key] = collection
            return
        existing = collection_map[key]
        clist = existing if isinstance(existing, list) else [existing]
        if collection not in clist:
            collection_map[key] = clist + [collection]

    def iter_dict(d):
        # Walk nested dicts (and dicts inside lists) collecting every 'collection' entry
        for k, v in d.items():
            if isinstance(v, list):
                for item in v:
                    if isinstance(item, dict):
                        iter_dict(item)
            elif isinstance(v, dict) and 'collection' in v:
                _add(k, v['collection'])
            elif isinstance(v, dict):
                iter_dict(v)

    iter_dict(sess_params or {})
    return set(flatten(collection_map.values())) if flat else collection_map
def get_video_compressed(sess_params):
    """
    Return whether the session videos are compressed.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description.

    Returns
    -------
    bool, None
        The 'compressed' flag (default False) of the last camera in devices.cameras, or None
        if no cameras are defined.
    """
    videos = (sess_params.get('devices') or {}).get('cameras')
    if not videos:
        return None

    # This is all or nothing: assumes either all videos are compressed or none are, so only the
    # last camera listed is inspected
    last_camera = list(videos.values())[-1]
    return (last_camera or {}).get('compressed', False)
def get_remote_stub_name(session_path, device_id=None):
    """
    Get or create device specific file path for the remote experiment.description stub.

    Parameters
    ----------
    session_path : pathlib.Path, str
        A remote session path.
    device_id : str, optional
        A device name, if None the TRANSFER_LABEL parameter is used (defaults to this device's
        hostname with a unique numeric ID)

    Returns
    -------
    pathlib.Path
        The full file path to the remote experiment description stub.

    Raises
    ------
    ValueError
        The session path could not be parsed into an experiment reference.

    Example
    -------
    >>> get_remote_stub_name(Path.home().joinpath('subject', '2020-01-01', '001'), 'host-123')
    Path.home() / 'subject/2020-01-01/001/_devices/2020-01-01_1_subject@host-123.yaml'
    """
    session_path = Path(session_path)  # allow str input; `/` below requires a Path
    device_id = device_id or misc.create_basic_transfer_params()['TRANSFER_LABEL']
    ref = ConversionMixin.path2ref(session_path)
    if not ref:
        raise ValueError(f'Failed to parse session path "{session_path}"')
    exp_ref = '{date}_{sequence:d}_{subject:s}'.format(**ref)
    remote_filename = f'{exp_ref}@{device_id}.yaml'
    return session_path / '_devices' / remote_filename