Coverage for ibllib/io/session_params.py: 98%

165 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

"""A module for handling experiment description files.

Each device computer contributes its own piece of information, which is consolidated into the
final acquisition description.

The purpose is 3-fold:
 - provide modularity in the extraction: the acquisition description allows pipelines to be
   built dynamically.
 - assist the copying of the experimental data from device computers to the server computers, in
   a way that each device is independent from the others.
 - make that copying robust: intermediate states (failed copies) are easily recovered from, and
   the completion criterion (i.e. session ready to extract) is objective and simple (all device
   files copied).

INGRESS
 - each device computer needs to know the session path on the server.
 - create a device file locally in a queue directory. This will serve as a copy flag.
 - copy the device file to the local server.

EGRESS
 - go through the queue and for each item:
   - if the device file is not on the server, create it.
   - once the copy is complete, aggregate the QC from the file.
"""

25import yaml 

26import time 

27from datetime import datetime 

28import logging 

29from pathlib import Path 

30from copy import deepcopy 

31 

32from one.converters import ConversionMixin 

33from iblutil.util import flatten 

34from packaging import version 

35 

36import ibllib.pipes.misc as misc 

37 

38 

_logger = logging.getLogger(__name__)
# Current version of the experiment description specification. When a description file is read,
# `_patch_file` rewrites a differing `version` key to this value.
SPEC_VERSION = '1.0.0'

41 

42 

def write_yaml(file_path, data):
    """
    Write a device file. This is basically just a yaml dump that ensures the folder tree exists.

    Parameters
    ----------
    file_path : pathlib.Path, str
        The full path to the description yaml file to write to.
    data : dict
        The data to write to the yaml file.

    """
    # Accept plain strings as well as Path objects (`.parent` only exists on Path)
    file_path = Path(file_path)
    file_path.parent.mkdir(exist_ok=True, parents=True)
    with open(file_path, 'w') as fp:
        yaml.safe_dump(data, fp)

58 

59 

def _patch_file(data: dict) -> dict:
    """
    Update older description data to conform to the most recent specification.

    The data are modified in place and the `version` key is set to SPEC_VERSION. A warning is
    logged if the data were written by a more recent specification than this code supports.

    Parameters
    ----------
    data : dict
        The description yaml data.

    Returns
    -------
    dict
        The patched description data (the same object that was passed in).
    """
    if not data:
        return data
    # A missing or empty version key is treated as the oldest specification
    file_version = data.get('version') or '0'
    if file_version == SPEC_VERSION:
        return data
    # YAML parses an unquoted version such as `1.0` as a float; packaging expects a str
    parsed = version.parse(str(file_version))
    if parsed > version.parse(SPEC_VERSION):
        _logger.warning('Description file generated by more recent code')
    elif parsed <= version.parse('0.1.0'):
        # Change tasks key from dict to list of dicts
        if isinstance(data.get('tasks'), dict):
            data['tasks'] = [{name: params} for name, params in data['tasks'].copy().items()]
    data['version'] = SPEC_VERSION
    return data

83 

84 

def write_params(session_path, data) -> Path:
    """
    Save experiment description data as `_ibl_experiment.description.yaml` in a session folder.

    Parameters
    ----------
    session_path : str, pathlib.Path
        The session folder in which to write the description file.
    data : dict
        The acquisition description data to save.

    Returns
    -------
    pathlib.Path
        The full path of the file that was written.
    """
    target = Path(session_path) / '_ibl_experiment.description.yaml'
    write_yaml(target, data)
    return target

104 

105 

def read_params(path) -> dict:
    """
    Load an experiment description file.

    In addition to reading the yaml data, this function ensures that the specification is the
    most recent one. If the file is missing, None is returned. If the file is empty, an empty
    dict is returned. Invalid YAML raises the parser's exception.

    Parameters
    ----------
    path : pathlib.Path, str
        The path to the description yaml file (or its containing folder) to load.

    Returns
    -------
    dict, None
        The parsed yaml data, or None if the file was not found.

    Examples
    --------
    # Load a session's _ibl_experiment.description.yaml file

    >>> data = read_params('/home/data/subject/2020-01-01/001')

    # Load a specific device's description file

    >>> data = read_params('/home/data/subject/2020-01-01/001/_devices/behaviour.yaml')

    """
    path = Path(path)
    if path.is_dir():
        # Sort so that the choice is deterministic when several files match the pattern
        # (glob order depends on the file system).
        yaml_file = next(iter(sorted(path.glob('_ibl_experiment.description*'))), None)
    else:
        yaml_file = path if path.exists() else None
    if not yaml_file:
        _logger.debug('Experiment description not found: %s', path)
        return None

    with open(yaml_file, 'r') as fp:
        # An empty file loads as None; treat it as an empty description
        data = yaml.safe_load(fp) or {}
    return _patch_file(data)

146 

147 

def merge_params(a, b, copy=False):
    """
    Given two experiment descriptions, update first with fields in second.

    List fields are concatenated: 'tasks' entries from `b` are always appended, while for other
    list fields (e.g. procedures, projects) only values not already present are appended, in the
    order they first appear in `b`. Dict fields are shallow-merged with `b` taking precedence.
    Any other value in `b` overwrites the one in `a`.

    Parameters
    ----------
    a : dict
        An experiment description dictionary to be updated with fields from `b`.
    b : dict
        An experiment description dictionary to update `a` with.
    copy : bool
        If true, return a deep copy of `a` instead of updating directly.

    Returns
    -------
    dict
        A merged dictionary consisting of fields from `a` and `b`.

    Raises
    ------
    AssertionError
        Both `a` and `b` define a 'sync' field and the two differ.
    """
    if copy:
        a = deepcopy(a)
    for k, value in b.items():
        if k == 'sync' and k in a and a[k] != value:
            # Raised explicitly (rather than with `assert`) so the check survives `python -O`
            raise AssertionError('multiple sync fields defined')
        if isinstance(value, list):
            # `or []` also covers keys present with a null value
            prev = list(a.get(k) or [])
            if k == 'tasks':
                to_add = value
            else:
                # For procedures and projects, remove duplicates while keeping a stable order
                seen = set(prev)
                to_add = [x for x in dict.fromkeys(value) if x not in seen]
            a[k] = prev + list(to_add)
        elif isinstance(value, dict):
            a[k] = {**(a.get(k) or {}), **value}
        else:  # A string
            a[k] = value
    return a

181 

182 

def aggregate_device(file_device, file_acquisition_description, unlink=False):
    """
    Add the contents of a device file to the main acquisition description file.

    A `.lock` file next to the main description file guards against concurrent writes. If a
    lock is present, this waits up to 4 x 2 seconds before treating it as stale and deleting it.
    The lock is always released, even if merging fails.

    Parameters
    ----------
    file_device : pathlib.Path, str
        The full path to the device yaml file to add to the main description file.
    file_acquisition_description : pathlib.Path, str
        The full path to the main acquisition description yaml file to add the device file to.
    unlink : bool
        If True, the device file is removed after successful aggregation, and the `_devices`
        folder next to the main description file is removed if it then holds no files.

    Returns
    -------
    dict, None
        The aggregated experiment description data, or None if the device file was empty or
        missing.

    Raises
    ------
    AssertionError
        Device file contains a main 'sync' key that is already present in the main description
        file. For an experiment only one main sync device is allowed.
    """
    from datetime import timezone  # the module-level import only brings in `datetime`

    file_device = Path(file_device)
    file_acquisition_description = Path(file_acquisition_description)
    file_lock = file_acquisition_description.with_suffix('.lock')

    # read in the partial device data
    data_device = read_params(file_device)
    if not data_device:
        _logger.warning('empty device file "%s"', file_device)
        return

    # if a lock file exists, wait (up to 4 times, 2 seconds each) for it to be released
    attempts = 0
    while file_lock.exists() and attempts < 4:
        _logger.info('file lock found, waiting 2 seconds %s', file_lock)
        time.sleep(2)
        attempts += 1

    # if the lock still exists after waiting, remove it as it's a job that went wrong
    if file_lock.exists():
        with open(file_lock, 'r') as fp:
            _logger.debug('file lock contents: %s', yaml.safe_load(fp))
        _logger.info('stale file lock found, deleting %s', file_lock)
        file_lock.unlink()

    # add in the lock file, with some metadata to ease debugging if one gets stuck
    with open(file_lock, 'w') as fp:
        lock_info = dict(datetime=datetime.now(timezone.utc).isoformat(), file_device=str(file_device))
        yaml.safe_dump(lock_info, fp)

    try:
        # if the acquisition description file already exists, read in the yaml content
        if file_acquisition_description.exists():
            acq_desc = read_params(file_acquisition_description)
        else:
            acq_desc = {}
        # merge the dictionaries (NB: acq_desc modified in place)
        acq_desc = merge_params(acq_desc, data_device)
        with open(file_acquisition_description, 'w') as fp:
            yaml.safe_dump(acq_desc, fp)
    finally:
        # always release the lock, otherwise a failed merge (e.g. conflicting sync keys) would
        # leave a stale lock that makes the next call wait before deleting it
        file_lock.unlink(missing_ok=True)

    # delete the original file if necessary
    if unlink:
        file_device.unlink()
        stub_folder = file_acquisition_description.with_name('_devices')
        if stub_folder.exists() and not any(stub_folder.glob('*.*')):
            stub_folder.rmdir()

    return acq_desc

257 

258 

def get_cameras(sess_params):
    """
    Return the camera labels listed in an experiment description.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description.

    Returns
    -------
    list of str, None
        The keys of `devices.cameras`, or None if no cameras are defined.
    """
    # `or {}` also covers a `devices:` key present with a null value
    cameras = (sess_params.get('devices') or {}).get('cameras')
    return list(cameras.keys()) if cameras else None

263 

264 

def get_sync_label(sess_params):
    """
    Return the name of the main sync device in an experiment description.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    str, None
        The first key of the 'sync' field, or None if there is no description or no sync device.
        A warning is logged when more than one sync key is present.
    """
    if not sess_params:
        return None
    labels = [*(sess_params.get('sync') or {})]
    if not labels:
        return None
    if len(labels) > 1:
        _logger.warning('Multiple sync keys found in experiment description: %s', labels)
    return labels[0]

274 

275 

def get_sync(sess_params):
    """
    Return the main sync device label together with its parameters.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    tuple of (str, dict) or (None, dict)
        The sync label and its parameter dict (empty if null), or (None, {}) when no sync device
        is defined.
    """
    label = get_sync_label(sess_params)
    if not label:
        return None, {}
    return label, sess_params['sync'][label] or {}

281 

282 

def get_sync_values(sess_params):
    """
    Return the parameters of the main sync device.

    Parameters
    ----------
    sess_params : dict, None
        The loaded experiment description.

    Returns
    -------
    dict, None
        The value stored under the sync label (may itself be None), or None if no sync device
        is defined.
    """
    sync_label = get_sync_label(sess_params)
    return sess_params['sync'][sync_label] if sync_label else None

287 

288 

def _get_sync_field(sess_params, field):
    """Return `field` from the main sync device parameters, or None if it is absent."""
    sync_values = get_sync_values(sess_params)
    return (sync_values or {}).get(field)


def get_sync_collection(sess_params):
    """Return the collection of the main sync device, or None if undefined."""
    return _get_sync_field(sess_params, 'collection')


def get_sync_extension(sess_params):
    """Return the file extension of the main sync device, or None if undefined."""
    return _get_sync_field(sess_params, 'extension')


def get_sync_namespace(sess_params):
    """Return the acquisition software of the main sync device, or None if undefined."""
    return _get_sync_field(sess_params, 'acquisition_software')

299 

300 

def get_task_protocol(sess_params, task_collection=None):
    """
    Fetch the task protocol from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_collection : str, optional
        Return the protocol that corresponds to this collection (returns the first matching
        protocol in the list). If None, all protocols are returned.

    Returns
    -------
    str, set, None
        If task_collection is None, returns the set of task protocols (or None if there are
        none). Otherwise returns the first protocol that corresponds to the collection, or None
        if the collection is not present.
    """
    collections = get_collections({'tasks': sess_params.get('tasks')})
    if task_collection is None:
        return set(collections.keys()) or None  # Return all protocols
    for protocol, collection in collections.items():
        # get_collections returns a list when a protocol is run in several collections
        if collection == task_collection or (
                isinstance(collection, list) and task_collection in collection):
            return protocol
    return None

327 

328 

def get_task_collection(sess_params, task_protocol=None):
    """
    Fetch the task collection from an experiment description dict.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the collection that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all collections are returned.

    Returns
    -------
    str, set, None
        If task_protocol is None, returns the set of collections (a single str when there is
        exactly one, None when there are none). Otherwise returns the first collection that
        corresponds to the protocol, or None if the protocol is not present.

    Notes
    -----
    - The order of the set may not be the same as the descriptions tasks order when iterating.
    """
    # `or []` also covers a `tasks:` key present with a null value
    protocols = sess_params.get('tasks') or []
    if task_protocol is not None:
        task = next((x for x in protocols if task_protocol in x), None)
        if task is None:
            return None
        return (task.get(task_protocol) or {}).get('collection')
    # Return set of all task collections; each task is a single-item {protocol: params} dict
    # whose params may be null
    cset = set(filter(None, ((next(iter(x.values()), None) or {}).get('collection') for x in protocols)))
    return (next(iter(cset)) if len(cset) == 1 else cset) or None

358 

359 

def get_task_protocol_number(sess_params, task_protocol=None):
    """
    Fetch the task protocol number from an experiment description dict.

    Numbers stored as strings are converted to int.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment.description file.
    task_protocol : str, optional
        Return the number that corresponds to this protocol (returns the first matching
        protocol in the list). If None, all numbers are returned.

    Returns
    -------
    int, list, None
        If task_protocol is None, returns the list of all numbers (a single int when there is
        exactly one, None when there are none). Otherwise returns the first number that
        corresponds to the protocol, or None if the protocol is not present.
    """
    def _as_int(number):
        # Numbers may be stored as strings in the yaml file
        return int(number) if isinstance(number, str) else number

    # `or []` also covers a `tasks:` key present with a null value
    protocols = sess_params.get('tasks') or []
    if task_protocol is not None:
        task = next((x for x in protocols if task_protocol in x), None)
        if task is None:
            return None
        return _as_int((task.get(task_protocol) or {}).get('protocol_number'))

    # Return all task numbers. Only missing/empty values are dropped: 0 is a valid protocol number.
    raw = ((next(iter(x.values()), None) or {}).get('protocol_number') for x in protocols)
    numbers = [_as_int(n) for n in raw if n not in (None, '')]
    if not numbers:
        return None
    return numbers[0] if len(numbers) == 1 else numbers

387 

388 

def get_collections(sess_params, flat=False):
    """
    Find all collections associated with the session.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description map.
    flat : bool (False)
        If True, return a flat set of collections, otherwise return a map of device/sync/task.

    Returns
    -------
    dict[str, str]
        A map of device/sync/task and the corresponding collection name. When the same key
        occurs with several collections (e.g. a protocol run twice), the value is a list of the
        unique collection names in the order they were encountered.

    set[str]
        A set of unique collection names.

    Notes
    -----
    - Assumes only the following data types contained: list, dict, None, str.
    """
    collection_map = {}

    def iter_dict(d):
        for k, v in d.items():
            if isinstance(v, list):
                for item in filter(lambda x: isinstance(x, dict), v):
                    iter_dict(item)
            elif isinstance(v, dict) and 'collection' in v:
                # if the key already exists, append the collection name to the list
                if k in collection_map:
                    existing = collection_map[k]
                    clist = existing if isinstance(existing, list) else [existing]
                    # dict.fromkeys removes duplicates while keeping a deterministic order
                    collection_map[k] = list(dict.fromkeys(clist + [v['collection']]))
                else:
                    collection_map[k] = v['collection']
            elif isinstance(v, dict):
                iter_dict(v)

    iter_dict(sess_params)
    return set(flatten(collection_map.values())) if flat else collection_map

431 

432 

def get_video_compressed(sess_params):
    """
    Return whether the session videos are compressed.

    Parameters
    ----------
    sess_params : dict
        The loaded experiment description.

    Returns
    -------
    bool, None
        The 'compressed' flag of the cameras (False when unset), or None if no cameras are
        defined.
    """
    # `or {}` also covers a `devices:` key present with a null value
    videos = (sess_params.get('devices') or {}).get('cameras')
    if not videos:
        return None

    # This is all or nothing: assumes either all videos are compressed or none are, so the
    # value of the last camera is returned.
    compressed = False
    for vals in videos.values():
        # A camera entry may be null in the yaml file
        compressed = (vals or {}).get('compressed', False)
    return compressed

443 

444 

def get_remote_stub_name(session_path, device_id=None):
    """
    Get or create device specific file path for the remote experiment.description stub.

    Parameters
    ----------
    session_path : pathlib.Path, str
        A remote session path.
    device_id : str, optional
        A device name. If None, the TRANSFER_LABEL parameter is used (defaults to this device's
        hostname with a unique numeric ID).

    Returns
    -------
    pathlib.Path
        The full file path to the remote experiment description stub.

    Example
    -------
    >>> get_remote_stub_name(Path.home().joinpath('subject', '2020-01-01', '001'), 'host-123')
    PosixPath('/home/user/subject/2020-01-01/001/_devices/2020-01-01_1_subject@host-123.yaml')
    """
    # Accept plain strings too; the `/` operator below requires a Path
    session_path = Path(session_path)
    device_id = device_id or misc.create_basic_transfer_params()['TRANSFER_LABEL']
    exp_ref = '{date}_{sequence:d}_{subject:s}'.format(**ConversionMixin.path2ref(session_path))
    remote_filename = f'{exp_ref}@{device_id}.yaml'
    return session_path / '_devices' / remote_filename