Coverage for ibllib/io/session_params.py: 95%
161 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1"""A module for handling experiment description files.
3Each device computer adds its piece of information and consolidates into the final acquisition
4description.
6The purpose is 3-fold:
7 - provide modularity in the extraction: the acquisition description allows pipelines to be
8 built dynamically.
9 - assist the copying of the experimental data from device computers to the server computers, in
10 a way that each device is independent from another.
11 - assist the copying of the experimental data from device computers to the server computers, in
12 a way that intermediate states (failed copies) are easily recoverable from and completion
13 criteria (ie. session ready to extract) is objective and simple (all device files copied).
15INGRESS
16 - each device computer needs to know the session path on the server.
17 - create a device file locally in a queue directory. This will serve as a copy flag.
18 - copy the device file to the local server.
20EGRESS
21 - go through the queue and for each item:
22 - if the device file is not on the server create it.
23 - once copy is complete aggregate the qc from file.
24"""
25import yaml
26import uuid
27import logging
28import socket
29from pathlib import Path
30from itertools import chain
31from copy import deepcopy
33from one.converters import ConversionMixin
34from iblutil.util import flatten
35from iblutil.io.params import FileLock
36from packaging import version
38_logger = logging.getLogger(__name__)
39SPEC_VERSION = '1.0.0'
def write_yaml(file_path, data):
    """
    Write a device file, creating any missing parent folders first.

    Parameters
    ----------
    file_path : pathlib.Path, str
        The full path to the description yaml file to write to.
    data : dict
        The data to write to the yaml file.
    """
    # Coerce to a Path so that plain string paths are accepted as well
    file_path = Path(file_path)
    file_path.parent.mkdir(exist_ok=True, parents=True)
    with open(file_path, 'w') as fp:
        yaml.safe_dump(data, fp)
59def _patch_file(data: dict) -> dict:
60 """
61 Update older description data to conform to the most recent specification.
63 Parameters
64 ----------
65 data : dict
66 The description yaml data.
68 Returns
69 -------
70 dict
71 The patched description data.
72 """
73 if data and (v := data.get('version', '0')) != SPEC_VERSION: 2a F c qb8 Y S Z V 0 b 9 n o p q d e R g f r 1 2 3 4 i T W N X Db% rb
74 if version.parse(v) > version.parse(SPEC_VERSION): 2c qb8 S 9
75 _logger.warning('Description file generated by more recent code') 2qb
76 elif version.parse(v) <= version.parse('0.1.0'): 2c qb8 S 9
77 # Change tasks key from dict to list of dicts
78 if 'tasks' in data and isinstance(data['tasks'], dict): 2c qb8 S 9
79 data['tasks'] = [{k: v} for k, v in data['tasks'].copy().items()] 2qb9
80 data['version'] = SPEC_VERSION 2c qb8 S 9
81 # Ensure all items in tasks list are single value dicts
82 if 'tasks' in data: 2c qb8 S 9
83 data['tasks'] = [{k: v} for k, v in chain.from_iterable(map(dict.items, data['tasks']))] 2qb8 9
84 return data 2a F c qb8 Y S Z V 0 b 9 n o p q d e R g f r 1 2 3 4 i T W N X Db% rb
def write_params(session_path, data) -> Path:
    """
    Save acquisition description data as the session's _ibl_experiment.description.yaml file.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session path in which to write the _ibl_experiment.description.yaml file.
    data : dict
        The acquisition description data to save.

    Returns
    -------
    pathlib.Path
        The full path to the saved acquisition description.
    """
    description_file = Path(session_path) / '_ibl_experiment.description.yaml'
    write_yaml(description_file, data)
    return description_file
def read_params(path) -> dict:
    """
    Load an experiment description file.

    After reading the yaml data, the content is patched so that it conforms to the most recent
    specification. If the file is missing None is returned. If the file is empty an empty dict
    is returned.

    Parameters
    ----------
    path : pathlib.Path, str
        The path to the description yaml file (or its containing folder) to load.

    Returns
    -------
    dict, None
        The parsed yaml data, or None if the file was not found.

    Examples
    --------
    # Load a session's _ibl_experiment.description.yaml file

    >>> data = read_params('/home/data/subject/2020-01-01/001')

    # Load a specific device's description file

    >>> data = read_params('/home/data/subject/2020-01-01/001/_devices/behaviour.yaml')

    """
    path = Path(path)
    if path.is_dir():
        # Take the first matching description file in the folder, if there is one
        yaml_file = next(path.glob('_ibl_experiment.description*.yaml'), None)
    elif path.exists():
        yaml_file = path
    else:
        yaml_file = None

    if not yaml_file:
        _logger.debug('Experiment description not found: %s', path)
        return None

    with open(yaml_file, 'r') as fp:
        loaded = yaml.safe_load(fp)
    # An empty file loads as None; substitute an empty dict before patching
    return _patch_file(loaded or {})
def merge_params(a, b, copy=False):
    """
    Given two experiment descriptions, update first with fields in second.

    List fields are concatenated, keeping the order of both inputs and skipping items already
    present in `a`. Dict fields are shallow-merged with values from `b` taking precedence. Any
    other field in `b` overwrites the one in `a`.

    Parameters
    ----------
    a : dict
        An experiment description dictionary to be updated with fields from `b`.
    b : dict
        An experiment description dictionary to update `a` with
    copy : bool
        If true, return a deep copy of `a` instead of updating directly.

    Returns
    -------
    dict
        A merged dictionary consisting of fields from `a` and `b`.

    Raises
    ------
    AssertionError
        Both descriptions contain a 'sync' field and they differ, or a 'tasks' list contains
        an item that is not a single value dict.
    """
    def to_hashable(dict_item):
        """Convert protocol -> dict map to hashable tuple of protocol + sorted key value pairs."""
        protocol, pars = dict_item
        # A protocol without parameters may be None; treat it as an empty map
        hashable = (protocol, *chain.from_iterable(sorted((pars or {}).items())))
        return tuple(tuple(x) if isinstance(x, list) else x for x in hashable)

    if copy:
        a = deepcopy(a)
    for k in b:
        # Raised explicitly (rather than via `assert`) so the check survives `python -O`
        if k == 'sync' and k in a and a[k] != b[k]:
            raise AssertionError('multiple sync fields defined')
        if isinstance(b[k], list):
            prev = list(a.get(k) or [])  # tolerate a missing or None field in `a`
            if k == 'tasks':
                # For tasks, keep order and skip duplicates
                if any(len(x) != 1 for x in chain(prev, b[k])):
                    raise AssertionError('tasks must be a list of single value dicts')
                # Get the set of previous tasks
                prev_tasks = set(map(to_hashable, chain.from_iterable(map(dict.items, prev))))
                tasks = chain.from_iterable(map(dict.items, b[k]))
                to_add = [dict([itm]) for itm in tasks if to_hashable(itm) not in prev_tasks]
            else:
                # For procedures and projects, remove duplicates while keeping a stable order
                seen = set(prev)
                to_add = [x for x in dict.fromkeys(b[k]) if x not in seen]
            a[k] = prev + to_add
        elif isinstance(b[k], dict):
            a[k] = {**(a.get(k) or {}), **b[k]}
        else:  # A string
            a[k] = b[k]
    return a
def aggregate_device(file_device, file_acquisition_description, unlink=False):
    """
    Merge the contents of a device file into the main acquisition description file.

    Parameters
    ----------
    file_device : pathlib.Path
        The full path to the device yaml file to add to the main description file.
    file_acquisition_description : pathlib.Path
        The full path to the main acquisition description yaml file to add the device file to.
    unlink : bool
        If True, the device file is removed after successful aggregation.

    Returns
    -------
    dict
        The aggregated experiment description data, or None if the device file was missing or
        empty.

    Raises
    ------
    AssertionError
        Device file contains a main 'sync' key that is already present in the main description
        file. For an experiment only one main sync device is allowed.
    """
    # Load the partial description written by the device
    device_desc = read_params(file_device)
    if not device_desc:
        _logger.warning('empty device file "%s"', file_device)
        return None

    with FileLock(file_acquisition_description, log=_logger, timeout_action='delete'):
        # Start from the existing main description when there is one
        have_main = file_acquisition_description.exists()
        main_desc = read_params(file_acquisition_description) if have_main else {}

        # merge_params updates its first argument in place and returns it
        main_desc = merge_params(main_desc, device_desc)

        with open(file_acquisition_description, 'w') as fp:
            yaml.safe_dump(main_desc, fp)

    if unlink:
        # Remove the device file, then the stub folder once no '*.*' files remain in it
        file_device.unlink()
        devices_dir = file_acquisition_description.with_name('_devices')
        if devices_dir.exists() and not any(devices_dir.glob('*.*')):
            devices_dir.rmdir()

    return main_desc
def get_cameras(sess_params):
    """
    Return the names of the cameras listed in an experiment description.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    list of str, None
        The keys of the devices -> cameras map, or None if there are no cameras.
    """
    # Tolerate a missing description and a 'devices' key that is present but None
    devices = (sess_params or {}).get('devices') or {}
    cameras = devices.get('cameras')
    return list(cameras) if cameras else None
def get_sync_label(sess_params):
    """
    Return the name of the main sync device in an experiment description.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    str, None
        The first key of the 'sync' map, or None if the description or its sync map is empty.
        A warning is logged when more than one sync key is present.
    """
    if not sess_params:
        return None
    sync_keys = [*(sess_params.get('sync') or {}).keys()]
    if not sync_keys:
        return None
    if len(sync_keys) > 1:
        _logger.warning('Multiple sync keys found in experiment description: %s', sync_keys)
    return sync_keys[0]
def get_sync(sess_params):
    """
    Return the main sync device name together with its parameters.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    str, None
        The sync label, or None if no sync device is defined.
    dict
        The parameters stored under that label (an empty dict if none).
    """
    label = get_sync_label(sess_params)
    if not label:
        return None, {}
    return label, sess_params['sync'][label] or {}
def get_sync_values(sess_params):
    """
    Return the parameters stored under the main sync device.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    dict, None
        The value under the sync label, or None if no sync device is defined.
    """
    label = get_sync_label(sess_params)
    return sess_params['sync'][label] if label else None
def get_sync_collection(sess_params):
    """Return the 'collection' value of the main sync device, or None if not defined."""
    values = get_sync_values(sess_params)
    return values.get('collection') if values else None
def get_sync_extension(sess_params):
    """Return the 'extension' value of the main sync device, or None if not defined."""
    values = get_sync_values(sess_params)
    return values.get('extension') if values else None
def get_sync_namespace(sess_params):
    """Return the 'acquisition_software' value of the main sync device, or None if not defined."""
    values = get_sync_values(sess_params)
    return values.get('acquisition_software') if values else None
def get_task_protocol(sess_params, task_collection=None):
    """
    Fetch the task protocol from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_collection : str, optional
        Return the protocol that corresponds to this collection (returns the first matching
        protocol in the list). If None, all protocols are returned.

    Returns
    -------
    str, set, None
        If task_collection is None, returns the set of task protocols (None if there are no
        tasks with a collection), otherwise returns the first protocol that corresponds to the
        collection, or None if collection not present.
    """
    collections = get_collections({'tasks': sess_params.get('tasks')})
    if task_collection is None:
        return set(collections) or None  # Return all protocols
    for protocol, collection in collections.items():
        if collection == task_collection:
            return protocol
        # get_collections stores a list of collections when a protocol key occurs more than once
        if isinstance(collection, list) and task_collection in collection:
            return protocol
    return None
def get_task_collection(sess_params, task_protocol=None):
    """
    Fetch the task collection from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the collection that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all collections are returned.

    Returns
    -------
    str, set, None
        If task_protocol is None, returns the set of collections (or the collection itself when
        there is exactly one), otherwise returns the first collection that corresponds to the
        protocol, or None if protocol not present.

    Notes
    -----
    - The order of the set may not be the same as the descriptions tasks order when iterating.
    """
    protocols = sess_params.get('tasks') or []
    if task_protocol is not None:
        # Default to an empty map so that an unknown protocol yields None instead of raising
        task = next((x for x in protocols if task_protocol in x), {})
        return (task.get(task_protocol) or {}).get('collection')
    # Return set of all task collections; a task whose parameters are None has no collection
    task_pars = ((next(iter(x.values()), None) or {}) for x in protocols)
    cset = set(filter(None, (pars.get('collection') for pars in task_pars)))
    return (next(iter(cset)) if len(cset) == 1 else cset) or None
def get_task_protocol_number(sess_params, task_protocol=None):
    """
    Fetch the task protocol number from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the number that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all numbers are returned.

    Returns
    -------
    int, list of int, None
        If task_protocol is None, returns list of all numbers (or the number itself when there
        is exactly one, None when there are none), otherwise returns the first number that
        corresponds to the protocol, or None if protocol not present.
    """
    protocols = sess_params.get('tasks') or []
    if task_protocol is None:  # Return all task numbers
        # A task whose parameters are None simply has no protocol number
        task_pars = ((next(iter(x.values()), None) or {}) for x in protocols)
        raw_numbers = (pars.get('protocol_number') for pars in task_pars)
        numbers = [int(x) for x in raw_numbers if x is not None]
        if not numbers:
            return None
        # Test the list length rather than the value: protocol number 0 is valid but falsy
        return numbers[0] if len(numbers) == 1 else numbers
    task = next((x for x in protocols if task_protocol in x), {})
    number = (task.get(task_protocol) or {}).get('protocol_number')
    return int(number) if isinstance(number, str) else number
def get_collections(sess_params, flat=False):
    """
    Find all collections associated with the session.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description map.
    flat : bool (False)
        If True, return a flat set of collections, otherwise return a map of device/sync/task
        and the corresponding collection name.

    Returns
    -------
    dict[str, str | list[str]]
        A map of device/sync/task and the corresponding collection name. When the same key
        occurs more than once, the value is a list of its collections in order of appearance.

    set[str]
        A set of unique collection names.

    Notes
    -----
    - Assumes only the following data types contained: list, dict, None, str.
    """
    collection_map = {}

    def iter_dict(d):
        for k, v in d.items():
            if isinstance(v, list):
                for item in v:
                    if isinstance(item, dict):
                        iter_dict(item)
            elif isinstance(v, dict) and 'collection' in v:
                collection = v['collection']
                if k in collection_map:
                    # Key seen before: keep a list of its collections, in order, without repeats
                    prev = collection_map[k]
                    clist = list(prev) if isinstance(prev, list) else [prev]
                    if collection not in clist:
                        clist.append(collection)
                    collection_map[k] = clist
                else:
                    collection_map[k] = collection
            elif isinstance(v, dict):
                iter_dict(v)

    iter_dict(sess_params)
    return set(flatten(collection_map.values())) if flat else collection_map
def get_video_compressed(sess_params):
    """
    Return whether the session videos are compressed.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    bool, None
        The 'compressed' value of the last camera listed (False if the key is absent), or None
        if no cameras are defined.
    """
    # Tolerate a missing description and a 'devices' key that is present but None
    videos = ((sess_params or {}).get('devices') or {}).get('cameras')
    if not videos:
        return None

    # This is all or nothing, assumes either all videos or not compressed
    compressed = False
    for vals in videos.values():
        compressed = (vals or {}).get('compressed', False)
    return compressed
def get_remote_stub_name(session_path, device_id=None):
    """
    Build the device specific file path for the remote experiment.description stub.

    Parameters
    ----------
    session_path : pathlib.Path
        A remote session path.
    device_id : str, optional
        A device name. If not given, this machine's hostname joined by an underscore to the
        value of `uuid.getnode()` is used.

    Returns
    -------
    pathlib.Path
        The full file path to the remote experiment description stub.

    Example
    -------
    >>> get_remote_stub_name(Path.home().joinpath('subject', '2020-01-01', '001'), 'host-123')
    Path.home() / 'subject/2020-01-01/001/_devices/2020-01-01_1_subject@host-123.yaml'
    """
    if not device_id:
        device_id = f'{socket.gethostname()}_{uuid.getnode()}'
    ref = ConversionMixin.path2ref(session_path)
    exp_ref = '{date}_{sequence:d}_{subject:s}'.format(**ref)
    return session_path / '_devices' / f'{exp_ref}@{device_id}.yaml'