Coverage for ibllib/io/session_params.py: 95%

161 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-17 15:25 +0000

1"""A module for handling experiment description files. 

2 

3Each device computer adds its piece of information and consolidates into the final acquisition 

4description. 

5 

6The purpose is 3-fold: 

7 - provide modularity in the extraction: the acquisition description allows pipelines to be

8 built dynamically.

9 - assist the copying of the experimental data from device computers to the server computers, in 

10 a way that each device is independent from another. 

11 - assist the copying of the experimental data from device computers to the server computers, in 

12 a way that intermediate states (failed copies) are easily recoverable and the completion

13 criterion (i.e. session ready to extract) is objective and simple (all device files copied).

14 

15INGRESS 

16 - each device computer needs to know the session path on the server. 

17 - create a device file locally in a queue directory. This will serve as a copy flag. 

18 - copy the device file to the local server. 

19 

20EGRESS 

21 - go through the queue and for each item: 

22 - if the device file is not on the server create it. 

23 - once copy is complete aggregate the qc from file. 

24""" 

25import yaml 

26import uuid 

27import logging 

28import socket 

29from pathlib import Path 

30from itertools import chain 

31from copy import deepcopy 

32 

33from one.converters import ConversionMixin 

34from iblutil.util import flatten 

35from iblutil.io.params import FileLock 

36from packaging import version 

37 

38_logger = logging.getLogger(__name__) 

39SPEC_VERSION = '1.0.0' 

40 

41 

def write_yaml(file_path, data):
    """
    Dump data to a yaml file, creating any missing parent folders beforehand.

    Parameters
    ----------
    file_path : pathlib.Path
        The full path of the yaml file to write to.
    data : dict
        The data to serialize into the yaml file.

    """
    folder = file_path.parent
    folder.mkdir(parents=True, exist_ok=True)
    with open(file_path, mode='w') as stream:
        yaml.safe_dump(data, stream)

57 

58 

def _patch_file(data: dict) -> dict:
    """
    Bring description data written against an older specification up to date.

    Data that is empty, or whose 'version' field already equals SPEC_VERSION, is returned
    untouched. Otherwise the 'tasks' field is normalized to a list of single-key dicts and the
    'version' field is set to SPEC_VERSION.

    Parameters
    ----------
    data : dict
        The description yaml data. Modified in place.

    Returns
    -------
    dict
        The patched description data (the same object that was passed in).
    """
    if not data:
        return data
    file_version = data.get('version', '0')  # files without a version are treated as '0'
    if file_version == SPEC_VERSION:
        return data

    parsed = version.parse(file_version)
    if parsed > version.parse(SPEC_VERSION):
        _logger.warning('Description file generated by more recent code')
    elif parsed <= version.parse('0.1.0'):
        # Up to 0.1.0 the tasks were stored as a single dict: convert to a list of dicts
        old_tasks = data.get('tasks')
        if isinstance(old_tasks, dict):
            data['tasks'] = [{name: settings} for name, settings in old_tasks.copy().items()]
    # NB: the version is stamped in all cases, including files written by more recent code
    data['version'] = SPEC_VERSION
    if 'tasks' in data:
        # Split any multi-key task dicts so that each list item holds exactly one protocol
        pairs = chain.from_iterable(map(dict.items, data['tasks']))
        data['tasks'] = [{name: settings} for name, settings in pairs]
    return data

85 

86 

def write_params(session_path, data) -> Path:
    """
    Save acquisition description data as a session's _ibl_experiment.description.yaml file.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session folder in which to write the _ibl_experiment.description.yaml file.
    data : dict
        The acquisition description data to save.

    Returns
    -------
    pathlib.Path
        The full path to the saved acquisition description.
    """
    description_path = Path(session_path) / '_ibl_experiment.description.yaml'
    write_yaml(description_path, data)
    return description_path

106 

107 

def read_params(path) -> dict:
    """
    Load an experiment description file.

    After reading the yaml data, the content is passed through `_patch_file` so that it conforms
    to the most recent specification. If the file is missing None is returned. If the file is
    empty an empty dict is returned.

    Parameters
    ----------
    path : pathlib.Path, str
        The path to the description yaml file (or its containing folder) to load. When a folder
        is given, the first file matching '_ibl_experiment.description*.yaml' is loaded.

    Returns
    -------
    dict, None
        The parsed yaml data, or None if the file was not found.

    Examples
    --------
    # Load a session's _ibl_experiment.description.yaml file

    >>> data = read_params('/home/data/subject/2020-01-01/001')

    # Load a specific device's description file

    >>> data = read_params('/home/data/subject/2020-01-01/001/_devices/behaviour.yaml')

    """
    path = Path(path)
    if path.is_dir():
        yaml_file = next(path.glob('_ibl_experiment.description*.yaml'), None)
    elif path.exists():
        yaml_file = path
    else:
        yaml_file = None

    if yaml_file is None:
        _logger.debug('Experiment description not found: %s', path)
        return

    with open(yaml_file, 'r') as stream:
        loaded = yaml.safe_load(stream)
    # An empty file loads as None; substitute an empty dict before patching
    return _patch_file(loaded or {})

148 

149 

def merge_params(a, b, copy=False):
    """
    Given two experiment descriptions, update first with fields in second.

    List fields are concatenated without duplicates, preserving the order of `a` followed by the
    order in which new items appear in `b`. Dict fields are shallow-merged with the values of `b`
    taking precedence. All other fields in `b` simply overwrite those of `a`.

    Parameters
    ----------
    a : dict
        An experiment description dictionary to be updated with fields from `b`.
    b : dict
        An experiment description dictionary to update `a` with
    copy : bool
        If true, return a deep copy of `a` instead of updating directly.

    Returns
    -------
    dict
        A merged dictionary consisting of fields from `a` and `b`.

    Raises
    ------
    AssertionError
        Both inputs contain a 'sync' field and they differ, or an item of a 'tasks' list is not a
        single-key dict.
    """
    def to_hashable(dict_item):
        """Convert protocol -> dict map to hashable tuple of protocol + sorted key value pairs."""
        hashable = (dict_item[0], *chain.from_iterable(sorted(dict_item[1].items())))
        return tuple(tuple(x) if isinstance(x, list) else x for x in hashable)

    if copy:
        a = deepcopy(a)
    for k in b:
        if k == 'sync':
            assert k not in a or a[k] == b[k], 'multiple sync fields defined'
        if isinstance(b[k], list):
            prev = list(a.get(k, []))
            if k == 'tasks':
                # For tasks, keep order and skip duplicates
                # Assert tasks is a list of single value dicts (an empty list is valid)
                assert all(len(task) == 1 for task in chain(prev, b[k]))
                # Get the set of previous tasks
                prev_tasks = set(map(to_hashable, chain.from_iterable(map(dict.items, prev))))
                tasks = chain.from_iterable(map(dict.items, b[k]))
                to_add = [dict([itm]) for itm in tasks if to_hashable(itm) not in prev_tasks]
            else:
                # For procedures and projects, remove duplicates. dict.fromkeys is used instead
                # of a set difference so that the order of the new items is deterministic.
                seen = set(prev)
                to_add = [itm for itm in dict.fromkeys(b[k]) if itm not in seen]
            a[k] = prev + to_add
        elif isinstance(b[k], dict):
            a[k] = {**a.get(k, {}), **b[k]}
        else:  # A string
            a[k] = b[k]
    return a

197 

198 

def aggregate_device(file_device, file_acquisition_description, unlink=False):
    """
    Merge the contents of a device file into the main acquisition description file.

    Parameters
    ----------
    file_device : pathlib.Path
        The full path to the device yaml file to add to the main description file.
    file_acquisition_description : pathlib.Path
        The full path to the main acquisition description yaml file to add the device file to.
    unlink : bool
        If True, the device file is removed after successful aggregation, along with the
        '_devices' folder next to the main description file if it no longer contains any files.

    Returns
    -------
    dict, None
        The aggregated experiment description data, or None if the device file was missing or
        empty.

    Raises
    ------
    AssertionError
        Device file contains a main 'sync' key that is already present in the main description
        file. For an experiment only one main sync device is allowed.
    """
    # Load the partial description written by the device
    data_device = read_params(file_device)
    if not data_device:
        _logger.warning('empty device file "%s"', file_device)
        return

    with FileLock(file_acquisition_description, log=_logger, timeout_action='delete'):
        # Start from the existing main description when there is one
        main_exists = file_acquisition_description.exists()
        acq_desc = read_params(file_acquisition_description) if main_exists else {}

        # Fold the device data into the main description (NB: acq_desc modified in place)
        acq_desc = merge_params(acq_desc, data_device)

        with open(file_acquisition_description, 'w') as stream:
            yaml.safe_dump(acq_desc, stream)

    if not unlink:
        return acq_desc

    # Remove the device file, then the stub folder once nothing is left in it
    file_device.unlink()
    stub_folder = file_acquisition_description.with_name('_devices')
    if stub_folder.exists() and not any(stub_folder.glob('*.*')):
        stub_folder.rmdir()

    return acq_desc

251 

252 

def get_cameras(sess_params):
    """
    Return the names of the cameras defined in an experiment description.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description map.

    Returns
    -------
    list of str, None
        The keys of the devices -> cameras map, or None if no cameras are defined.
    """
    # A key that is present but empty in the yaml file loads as None, hence `or {}`
    devices = sess_params.get('devices') or {}
    cameras = devices.get('cameras')
    return list(cameras) if cameras else None

257 

258 

def get_sync_label(sess_params):
    """
    Return the name of the main sync device in an experiment description.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description map.

    Returns
    -------
    str, None
        The first key of the 'sync' map, or None if the description is empty or has no sync.
        A warning is logged when more than one sync key is present.
    """
    if not sess_params:
        return None
    sync_map = sess_params.get('sync') or {}
    sync_keys = list(sync_map.keys())
    if not sync_keys:
        return None
    first, *others = sync_keys
    if others:
        _logger.warning('Multiple sync keys found in experiment description: %s', sync_keys)
    return first

268 

269 

def get_sync(sess_params):
    """
    Return the main sync device name together with its settings.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description map.

    Returns
    -------
    str, None
        The sync label, or None if no sync is defined.
    dict
        The settings stored under the sync label; empty if there are none.
    """
    label = get_sync_label(sess_params)
    if not label:
        return None, {}
    settings = sess_params['sync'][label]
    return label, settings or {}

275 

276 

def get_sync_values(sess_params):
    """
    Return the settings stored under the main sync device.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description map.

    Returns
    -------
    dict, None
        The value stored under the sync label, or None if no sync is defined.
    """
    label = get_sync_label(sess_params)
    return sess_params['sync'][label] if label else None

281 

282 

def get_sync_collection(sess_params):
    """Return the 'collection' field of the main sync device, or None if not defined."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('collection')

285 

286 

def get_sync_extension(sess_params):
    """Return the 'extension' field of the main sync device, or None if not defined."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('extension')

289 

290 

def get_sync_namespace(sess_params):
    """Return the 'acquisition_software' field of the main sync device, or None if not defined."""
    sync_values = get_sync_values(sess_params) or {}
    return sync_values.get('acquisition_software')

293 

294 

def get_task_protocol(sess_params, task_collection=None):
    """
    Fetch the task protocol from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_collection : str, optional
        Return the protocol that corresponds to this collection (returns the first matching
        protocol in the list). If None, all protocols are returned.

    Returns
    -------
    str, set, None
        If task_collection is None, returns the set of task protocols (or None if there are
        none), otherwise returns the first protocol that corresponds to the collection, or None
        if collection not present.
    """
    collections = get_collections({'tasks': sess_params.get('tasks')})
    if task_collection is None:
        return set(collections) if collections else None  # Return all protocols
    for protocol, collection in collections.items():
        # When a protocol occurs more than once in the tasks list, get_collections maps it to a
        # list of collections rather than a single string, so both forms must be checked.
        if isinstance(collection, list):
            if task_collection in collection:
                return protocol
        elif collection == task_collection:
            return protocol
    return None

321 

322 

def get_task_collection(sess_params, task_protocol=None):
    """
    Fetch the task collection from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the collection that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all collections are returned.

    Returns
    -------
    str, set, None
        If task_protocol is None, returns the set of collections (a single str if there is
        exactly one unique collection, None if there are none), otherwise returns the first
        collection that corresponds to the protocol, or None if protocol not present.

    Notes
    -----
    - The order of the set may not be the same as the descriptions tasks order when iterating.
    """
    # A 'tasks' key with no value loads as None from yaml, hence `or []`
    protocols = sess_params.get('tasks') or []
    if task_protocol is not None:
        # Default to an empty dict so that an unknown protocol yields None instead of raising
        task = next((x for x in protocols if task_protocol in x), {})
        return (task.get(task_protocol) or {}).get('collection')

    # Set of all non-empty task collections; a task's settings may be None
    all_settings = (next(iter(x.values()), None) or {} for x in protocols)
    cset = {c for c in (settings.get('collection') for settings in all_settings) if c}
    if not cset:
        return None
    return next(iter(cset)) if len(cset) == 1 else cset

352 

353 

def get_task_protocol_number(sess_params, task_protocol=None):
    """
    Fetch the task protocol number from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the number that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all numbers are returned.

    Returns
    -------
    int, list of int, None
        If task_protocol is None, returns list of all numbers (a single int if there is exactly
        one, None if there are none), otherwise returns the first number that corresponds to the
        protocol, or None if protocol not present.
    """
    # A 'tasks' key with no value loads as None from yaml, hence `or []`
    protocols = sess_params.get('tasks') or []
    if task_protocol is None:  # Return list of all task numbers
        all_settings = (next(iter(x.values()), None) or {} for x in protocols)
        numbers = (settings.get('protocol_number') for settings in all_settings)
        numbers = [int(x) for x in numbers if x is not None]
        if not numbers:
            return None
        # NB: explicit length check rather than `x or None` so that protocol number 0 is kept
        return numbers[0] if len(numbers) == 1 else numbers
    else:
        task = next((x for x in protocols if task_protocol in x), {})
        number = (task.get(task_protocol) or {}).get('protocol_number')
        return int(number) if isinstance(number, str) else number

381 

382 

def get_collections(sess_params, flat=False):
    """
    Find all collections associated with the session.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description map.
    flat : bool (False)
        If True, return a flat set of collections, otherwise return a map of device/sync/task

    Returns
    -------
    dict[str, str]
        A map of device/sync/task and the corresponding collection name. When the same key
        occurs more than once, the value is a list of its unique collection names in order of
        first occurrence.

    set[str]
        A set of unique collection names.

    Notes
    -----
    - Assumes only the following data types contained: list, dict, None, str.
    - A dict that contains a 'collection' key is not searched any deeper.
    """
    collection_map = {}

    def iter_dict(d):
        for k, v in d.items():
            if isinstance(v, list):
                for item in v:
                    if isinstance(item, dict):
                        iter_dict(item)
            elif isinstance(v, dict) and 'collection' in v:
                # if the key already exists, append the collection name to the list
                if k in collection_map:
                    existing = collection_map[k]
                    # Copy so that a list coming from the input is never modified
                    clist = list(existing) if isinstance(existing, list) else [existing]
                    # Membership test instead of a set round trip keeps the order deterministic
                    if v['collection'] not in clist:
                        clist.append(v['collection'])
                    collection_map[k] = clist
                else:
                    collection_map[k] = v['collection']
            elif isinstance(v, dict):
                iter_dict(v)

    iter_dict(sess_params)
    return set(flatten(collection_map.values())) if flat else collection_map

425 

426 

def get_video_compressed(sess_params):
    """
    Return whether the session videos are flagged as compressed.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description map.

    Returns
    -------
    bool, None
        The 'compressed' field of the last camera listed (False if the field is absent), or None
        if no cameras are defined.
    """
    # Keys that are present but empty in the yaml file load as None, hence `or {}`
    videos = (sess_params.get('devices') or {}).get('cameras')
    if not videos:
        return None

    # This is all or nothing, assumes either all videos or not compressed: only the last
    # camera's settings are consulted.
    last_camera = list(videos.values())[-1] or {}
    return last_camera.get('compressed', False)

437 

438 

def get_remote_stub_name(session_path, device_id=None):
    """
    Build the device specific file path of the remote experiment.description stub.

    Parameters
    ----------
    session_path : pathlib.Path
        A remote session path.
    device_id : str, optional
        A device name. If None (or empty), this device's hostname followed by an underscore and
        the value of `uuid.getnode()` is used.

    Returns
    -------
    pathlib.Path
        The full file path to the remote experiment description stub.

    Example
    -------
    >>> get_remote_stub_name(Path.home().joinpath('subject', '2020-01-01', '001'), 'host-123')
    Path.home() / 'subject/2020-01-01/001/_devices/2020-01-01_1_subject@host-123.yaml'
    """
    if not device_id:
        device_id = f'{socket.gethostname()}_{uuid.getnode()}'
    ref = ConversionMixin.path2ref(session_path)
    exp_ref = '{date}_{sequence:d}_{subject:s}'.format(**ref)
    return session_path / '_devices' / f'{exp_ref}@{device_id}.yaml'