Coverage for ibllib/oneibl/data_handlers.py: 66%

443 statements  

coverage.py v7.9.1, created at 2025-07-02 18:55 +0100

1"""Downloading of task dependent datasets and registration of task output datasets. 

2 

3The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are 

4present and to register and upload the output datasets. For examples on how to run a task using 

5specific data handlers, see :func:`ibllib.pipes.tasks`. 

6""" 

7import logging 

8import pandas as pd 

9from pathlib import Path, PurePosixPath 

10import shutil 

11import os 

12import abc 

13from time import time 

14from copy import copy 

15 

16from one.api import ONE 

17from one.webclient import AlyxClient 

18from one.util import filter_datasets 

19from one.alf.path import add_uuid_string, get_alf_path, ensure_alf_path 

20from one.alf.cache import _make_datasets_df 

21from iblutil.util import flatten, ensure_list 

22 

23from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository 

24from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher 

25 

26 

27_logger = logging.getLogger(__name__) 

28 

29 

30class ExpectedDataset: 

31 """An expected input or output dataset.""" 

32 inverted = False 

33 

34 def __init__(self, name, collection, register=None, revision=None, unique=True): 

35 """ 

36 An expected input or output dataset. 

37 

38 NB: This should not be instantiated directly, but rather via the `input` or `output` 

39 static method. 

40 

41 Parameters 

42 ---------- 

43 name : str, None 

44 A dataset name or glob pattern. 

45 collection : str, None 

46 An ALF collection or pattern. 

47 register : bool 

48 Whether to register the file. Default is False for input files, True for output 

49 files. 

50 revision : str 

51 An optional revision. 

52 unique : bool 

53 Whether identifier pattern is expected to match a single dataset or several. NB: This currently does not 

54 affect the output of `find_files`. 

55 """ 

56 if not (collection is None or isinstance(collection, str)):

57 collection = '/'.join(collection)

58 self._identifiers = (collection, revision, name)

59 self.operator = None

60 self._register = register or False

61 self.inverted = False

62 self.name = None

63 self.unique = unique
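
A minimal usage sketch (editorial illustration; the dataset names and collections are hypothetical), showing how expected datasets are declared through the static constructors rather than this initializer:

from ibllib.oneibl.data_handlers import ExpectedDataset

trials = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf')    # required input, not registered
wheel = ExpectedDataset.output('_ibl_wheel.position.npy', 'alf')  # required output, registered
assert trials.register is False and wheel.register is True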

64 

65 @property 

66 def register(self): 

67 """bool: whether to register the output file.""" 

68 return self._register

69 

70 @register.setter 

71 def register(self, value): 

72 """bool: whether to register the output file.""" 

73 if self.operator is not None: 1tqn

74 raise AttributeError('cannot set register attribute for operator datasets') 

75 self._register = value 1tqn

76 

77 @property 

78 def identifiers(self): 

79 """tuple: the identifying parts of the dataset. 

80 

81 If no operator is applied, the identifiers are (collection, revision, name). 

82 If an operator is applied, a tuple of 3-element tuples is returned. 

83 """ 

84 if self.operator is None: 1cbp]mlesfdi

85 return self._identifiers 1cbp]mlesfdi

86 # Flatten nested identifiers into tuple of 3-element tuples 

87 identifiers = [] 1bp]d

88 for x in self._identifiers: 1bp]d

89 add = identifiers.extend if x.operator else identifiers.append 1bp]d

90 add(x.identifiers) 1bp]d

91 return tuple(identifiers) 1bp]d
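
A short sketch of the flattening behaviour described above (dataset names are illustrative):

from ibllib.oneibl.data_handlers import ExpectedDataset

a = ExpectedDataset.input('obj.attrA.npy', 'alf')
b = ExpectedDataset.input('obj.attrB.npy', 'raw_task_data_00')
assert a.identifiers == ('alf', None, 'obj.attrA.npy')
assert (a & b).identifiers == (('alf', None, 'obj.attrA.npy'),
                               ('raw_task_data_00', None, 'obj.attrB.npy'))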

92 

93 @property 

94 def glob_pattern(self): 

95 """str, tuple of str: one or more glob patterns.""" 

96 if self.operator is None:

97 return str(PurePosixPath(*filter(None, self._identifiers)))

98 else: 

99 return tuple(flatten(x.glob_pattern for x in self._identifiers)) 1ns

100 

101 def __repr__(self): 

102 """Represent the dataset object as a string. 

103 

104 If the `name` property is not None, it is returned, otherwise the identifiers are used to 

105 format the name. 

106 """ 

107 name = self.__class__.__name__ 

108 if self.name: 

109 return f'<{name}({self.name})>' 

110 if self.operator: 

111 sym = {'or': '|', 'and': '&', 'xor': '^'} 

112 patterns = [d.__repr__() for d in self._identifiers] 

113 pattern = f'{sym[self.operator]:^3}'.join(patterns) 

114 if self.inverted: 

115 pattern = f'~({pattern})' 

116 else: 

117 pattern = ('~' if self.inverted else '') + self.glob_pattern 

118 return f'<{name}({pattern})>' 

119 

120 def find_files(self, session_path, register=False): 

121 """Find files on disk. 

122 

123 Uses glob patterns to find dataset(s) on disk. 

124 

125 Parameters 

126 ---------- 

127 session_path : pathlib.Path, str 

128 A session path within which to glob for the dataset(s). 

129 register : bool 

130 Only return files intended to be registered. 

131 

132 Returns 

133 ------- 

134 bool 

135 True if the dataset is found on disk or is optional. 

136 list of pathlib.Path 

137 A list of matching dataset files. 

138 missing, None, str, set of str 

139 One or more glob patterns that either didn't yield files (or did in the case of inverted datasets). 

140 

141 Notes 

142 ----- 

143 - Currently, if `unique` is true and multiple files are found, all files are returned without an exception being 

144 raised, although this may change in the future. 

145 - If `register` is false, all files are returned regardless of whether they are intended to be registered. 

146 - If `register` is true, an input with register=True may not be returned if part of an OR operation. 

147 - If `inverted` is true, and files are found, the glob pattern is returned as missing. 

148 - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing 

149 patterns. 

150 - Missing (or unexpectedly found) patterns are returned despite the dataset being optional. 

151 """ 

152 session_path = Path(session_path)

153 ok, actual_files, missing = False, [], None

154 if self.operator is None:

155 if register and not self.register:

156 return True, actual_files, missing

157 actual_files = sorted(session_path.rglob(self.glob_pattern))

158 # If no revision pattern provided and no files found, search for any revision 

159 if self._identifiers[1] is None and not any(actual_files):

160 glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2]))

161 actual_files = sorted(session_path.rglob(glob_pattern))

162 ok = any(actual_files) != self.inverted

163 if not ok:

164 missing = self.glob_pattern

165 elif self.operator == 'and': 1cbtqnrgfd

166 assert len(self._identifiers) == 2 1tgfd

167 _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers)) 1tgfd

168 ok = all(_ok) 1tgfd

169 actual_files = flatten(_actual_files) 1tgfd

170 missing = set(filter(None, flatten(_missing))) 1tgfd

171 elif self.operator == 'or': 1cbqnrg

172 assert len(self._identifiers) == 2 1cbqrg

173 missing = set() 1cbqrg

174 for d in self._identifiers: 1cbqrg

175 ok, actual_files, _missing = d.find_files(session_path, register=register) 1cbqrg

176 if ok: 1cbqrg

177 break 1cbqg

178 if missing is not None: 1cbqr

179 missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing) 1cbqr

180 elif self.operator == 'xor': 1n

181 assert len(self._identifiers) == 2 1n

182 _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers)) 1n

183 ok = sum(_ok) == 1 # and sum(map(bool, map(len, _actual_files))) == 1 1n

184 # Return only those datasets that are complete if OK 

185 actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files) 1n

186 if ok: 1n

187 missing = set() 1n

188 elif all(_ok): # return all patterns if all present when only one should be, otherwise return all missing 1n

189 missing = set(flatten(self.glob_pattern)) 1n

190 elif not any(_ok): # return all missing glob patterns if none present 1n

191 missing = set(filter(None, flatten(_missing))) 1n

192 elif not isinstance(self.operator, str): 

193 raise TypeError(f'Unrecognized operator type "{type(self.operator)}"') 

194 else: 

195 raise NotImplementedError(f'logical {self.operator.upper()} not implemented') 

196 

197 return ok, actual_files, missing
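
A sketch of calling find_files on a local session (the session path and dataset name are hypothetical):

from pathlib import Path
from ibllib.oneibl.data_handlers import ExpectedDataset

dset = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf')
session_path = Path('/data/Subjects/SW001/2024-01-01/001')  # hypothetical session folder
ok, files, missing = dset.find_files(session_path)
if not ok:
    print(f'missing pattern(s): {missing}')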

198 

199 def filter(self, session_datasets, **kwargs): 

200 """Filter dataset frame by expected datasets. 

201 

202 Parameters 

203 ---------- 

204 session_datasets : pandas.DataFrame 

205 A data frame of session datasets. 

206 kwargs 

207 Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and 

208 ignore_qc_not_set. 

209 

210 Returns 

211 ------- 

212 bool 

213 True if the required dataset(s) are present in the data frame. 

214 pandas.DataFrame 

215 A filtered data frame containing the expected dataset(s). 

216 """ 

217 # ok, datasets = False, session_datasets.iloc[0:0] 

218 if self.operator is None: 1,uhk

219 collection, revision, file = self._identifiers 1,uhk

220 if self._identifiers[1] is not None: 1,uhk

221 raise NotImplementedError('revisions not yet supported') 

222 datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs) 1,uhk

223 ok = datasets.empty == self.inverted 1,uhk

224 elif self.operator == 'or': 1u

225 assert len(self._identifiers) == 2 1u

226 for d in self._identifiers: 1u

227 ok, datasets = d.filter(session_datasets, **kwargs) 1u

228 if ok: 1u

229 break 1u

230 elif self.operator == 'xor': 1u

231 assert len(self._identifiers) == 2 1u

232 _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers)) 1u

233 ok = sum(_ok) == 1 1u

234 if ok: 1u

235 # Return only those datasets that are complete. 

236 datasets = _datasets[_ok.index(True)] 1u

237 else: 

238 datasets = pd.concat(_datasets) 1u

239 elif self.operator == 'and': 

240 assert len(self._identifiers) == 2 

241 _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers)) 

242 ok = all(_ok) 

243 datasets = pd.concat(_datasets) 

244 elif not isinstance(self.operator, str): 

245 raise TypeError(f'Unrecognized operator type "{type(self.operator)}"') 

246 else: 

247 raise NotImplementedError(f'logical {self.operator.upper()} not implemented') 

248 return ok, datasets 1,uhk
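
A sketch of filtering a frame of session datasets obtained through ONE (the experiment ID is a placeholder and a configured ONE instance is assumed):

from one.api import ONE
from ibllib.oneibl.data_handlers import ExpectedDataset

one = ONE()
eid = 'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee'  # placeholder experiment UUID
session_datasets = one.list_datasets(eid, details=True)
dset = ExpectedDataset.input('*spikes.times*', 'alf/probe00', unique=False)
ok, datasets = dset.filter(session_datasets)
print('found' if ok else 'missing', len(datasets), 'dataset(s)')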

249 

250 def _apply_op(self, op, other): 

251 """Apply an operation between two datasets.""" 

252 # Assert both instances of Input or both instances of Output 

253 if not isinstance(other, (self.__class__, tuple)):

254 raise TypeError(f'logical operations not supported between objects of type ' 

255 f'{self.__class__.__name__} and {other.__class__.__name__}') 

256 # Assert operation supported 

257 if op not in {'or', 'xor', 'and'}:

258 raise ValueError(op) 

259 # Convert tuple to ExpectedDataset instance 

260 if isinstance(other, tuple):

261 D = (self.input if isinstance(self, Input) else self.output) 

262 other = D(*other) 

263 # Returned instance should only be optional if both datasets are optional 

264 is_input = isinstance(self, Input)

265 if all(isinstance(x, OptionalDataset) for x in (self, other)):

266 D = OptionalInput if is_input else OptionalOutput

267 else: 

268 D = Input if is_input else Output

269 # Instantiate 'empty' object 

270 d = D(None, None)

271 d._identifiers = (self, other)

272 d.operator = op

273 return d

274 

275 def __invert__(self): 

276 """Assert dataset doesn't exist on disk.""" 

277 obj = copy(self) 1ag^_`{|}~o

278 obj.inverted = not self.inverted 1ag^_`{|}~o

279 return obj 1ag^_`{|}~o

280 

281 def __or__(self, b): 

282 """Assert either dataset exists or another does, or both exist.""" 

283 return self._apply_op('or', b) 2a c b ? p u ] q r g ^ _ ` { | } ~ e cbdbo

284 

285 def __xor__(self, b): 

286 """Assert either dataset exists or another does, not both.""" 

287 return self._apply_op('xor', b) 1un

288 

289 def __and__(self, b): 

290 """Assert that a second dataset exists together with the first.""" 

291 return self._apply_op('and', b) 1aptg^_`{|}~esfdo
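
A sketch of the logical operators in combination (names and collections illustrative):

from ibllib.oneibl.data_handlers import ExpectedDataset

I = ExpectedDataset.input
raw = I('*.ap.cbin', 'raw_ephys_data/probe00') | I('*.ap.bin', 'raw_ephys_data/probe00')  # either format
meta = I('*.ap.meta', 'raw_ephys_data/probe00') & I('*.wiring.json', 'raw_ephys_data/probe00')  # both required
one_of = I('a.npy', 'alf') ^ I('b.npy', 'alf')  # exactly one expected
absent = ~I('*.sync.npy', 'raw_ephys_data/probe00')  # asserts the dataset is NOT present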

292 

293 @staticmethod 

294 def input(name, collection, required=True, register=False, **kwargs): 

295 """ 

296 Create an expected input dataset. 

297 

298 By default, expected input datasets are not automatically registered. 

299 

300 Parameters 

301 ---------- 

302 name : str 

303 A dataset name or glob pattern. 

304 collection : str, None 

305 An ALF collection or pattern. 

306 required : bool 

307 Whether file must always be present, or is an optional dataset. Default is True. 

308 register : bool 

309 Whether to register the input file. Default is False for input files, True for output 

310 files. 

311 revision : str 

312 An optional revision. 

313 unique : bool 

314 Whether identifier pattern is expected to match a single dataset or several. 

315 

316 Returns 

317 ------- 

318 Input, OptionalInput 

319 An instance of an Input dataset if required is true, otherwise an OptionalInput. 

320 """ 

321 Class = Input if required else OptionalInput

322 obj = Class(name, collection, register=register, **kwargs)

323 return obj

324 

325 @staticmethod 

326 def output(name, collection, required=True, register=True, **kwargs): 

327 """ 

328 Create an expected output dataset. 

329 

330 By default, expected output datasets are automatically registered. 

331 

332 Parameters 

333 ---------- 

334 name : str 

335 A dataset name or glob pattern. 

336 collection : str, None 

337 An ALF collection or pattern. 

338 required : bool 

339 Whether file must always be present, or is an optional dataset. Default is True. 

340 register : bool 

341 Whether to register the output file. Default is False for input files, True for output 

342 files. 

343 revision : str 

344 An optional revision. 

345 unique : bool 

346 Whether identifier pattern is expected to match a single dataset or several. 

347 

348 Returns 

349 ------- 

350 Output, OptionalOutput 

351 An instance of an Output dataset if required is true, otherwise an OptionalOutput. 

352 """ 

353 Class = Output if required else OptionalOutput

354 obj = Class(name, collection, register=register, **kwargs)

355 return obj
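
A brief sketch contrasting required and optional datasets returned by these constructors (names illustrative):

from ibllib.oneibl.data_handlers import ExpectedDataset, OptionalInput, OptionalOutput

opt_in = ExpectedDataset.input('*.camera.times.npy', 'alf', required=False)
opt_out = ExpectedDataset.output('laserStimulation.intervals.npy', 'alf', required=False)
assert isinstance(opt_in, OptionalInput) and isinstance(opt_out, OptionalOutput)
# Optional datasets always report success from find_files and filter,
# although any unmatched glob patterns are still returned as missing.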

356 

357 

358class OptionalDataset(ExpectedDataset): 

359 """An expected dataset that is not strictly required.""" 

360 

361 def find_files(self, session_path, register=False): 

362 """Find files on disk. 

363 

364 Uses glob patterns to find dataset(s) on disk. 

365 

366 Parameters 

367 ---------- 

368 session_path : pathlib.Path, str 

369 A session path within which to glob for the dataset(s). 

370 register : bool 

371 Only return files intended to be registered. 

372 

373 Returns 

374 ------- 

375 True 

376 Always True as dataset is optional. 

377 list of pathlib.Path 

378 A list of matching dataset files. 

379 missing, None, str, set of str 

380 One or more glob patterns that either didn't yield files (or did in the case of inverted datasets). 

381 

382 Notes 

383 ----- 

384 - Currently, if `unique` is true and multiple files are found, all files are returned without an exception being 

385 raised, although this may change in the future. 

386 - If `register` is false, all files are returned regardless of whether they are intended to be registered. 

387 - If `inverted` is true, and files are found, the glob pattern is returned as missing. 

388 - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing 

389 patterns. 

390 - Missing (or unexpectedly found) patterns are returned despite the dataset being optional. 

391 """ 

392 ok, actual_files, missing = super().find_files(session_path, register=register)

393 return True, actual_files, missing

394 

395 def filter(self, session_datasets, **kwargs): 

396 """Filter dataset frame by expected datasets. 

397 

398 Parameters 

399 ---------- 

400 session_datasets : pandas.DataFrame 

401 A data frame of session datasets. 

402 kwargs 

403 Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, 

404 ignore_qc_not_set, and assert_unique. 

405 

406 Returns 

407 ------- 

408 True 

409 Always True as dataset is optional. 

410 pandas.DataFrame 

411 A filtered data frame containing the expected dataset(s). 

412 """ 

413 ok, datasets = super().filter(session_datasets, **kwargs) 1h

414 return True, datasets 1h

415 

416 

417class Input(ExpectedDataset): 

418 """An expected input dataset.""" 

419 pass 

420 

421 

422class OptionalInput(Input, OptionalDataset): 

423 """An optional expected input dataset.""" 

424 pass 

425 

426 

427class Output(ExpectedDataset): 

428 """An expected output dataset.""" 

429 pass 

430 

431 

432class OptionalOutput(Output, OptionalDataset): 

433 """An optional expected output dataset.""" 

434 pass 

435 

436 

437def _parse_signature(signature): 

438 """ 

439 Ensure all of a signature's expected datasets are instances of ExpectedDataset. 

440 

441 Parameters 

442 ---------- 

443 signature : Dict[str, list] 

444 A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or 

445 ExpectedDataset instances. 

446 

447 Returns 

448 ------- 

449 Dict[str, list of ExpectedDataset] 

450 A dict containing all tuples converted to ExpectedDataset instances. 

451 """ 

452 I, O = ExpectedDataset.input, ExpectedDataset.output # noqa

453 inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']]

454 outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']]

455 return {'input_files': inputs, 'output_files': outputs}
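
A sketch of a signature accepted by this function; it is normally called by DataHandler rather than directly, and the dataset names are illustrative:

from ibllib.oneibl.data_handlers import ExpectedDataset, _parse_signature

signature = {
    'input_files': [
        ('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True),  # (name, collection, required)
        ExpectedDataset.input('*.camera.raw.mp4', 'raw_video_data', required=False),
    ],
    'output_files': [('_ibl_trials.table.pqt', 'alf', True)],
}
parsed = _parse_signature(signature)
assert all(isinstance(d, ExpectedDataset) for d in parsed['input_files'] + parsed['output_files'])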

456 

457 

458def dataset_from_name(name, datasets): 

459 """ 

460 From a list of ExpectedDataset instances, return those that match a given name. 

461 

462 Parameters 

463 ---------- 

464 name : str, function 

465 The name of the dataset or a function to match the dataset name. 

466 datasets : list of ExpectedDataset 

467 A list of ExpectedDataset instances. 

468 

469 Returns 

470 ------- 

471 list of ExpectedDataset 

472 The ExpectedDataset instances that match the given name. 

473 

474 """ 

475 matches = [] 1cb?ef

476 for dataset in datasets: 1cb?ef

477 if dataset.operator is None: 1cb?ef

478 if isinstance(name, str): 1cb?ef

479 if dataset._identifiers[2] == name: 1cb?ef

480 matches.append(dataset) 1cb?ef

481 else: 

482 if name(dataset._identifiers[2]): 1?

483 matches.append(dataset) 1?

484 else: 

485 matches.extend(dataset_from_name(name, dataset._identifiers)) 1cb?ef

486 return matches 1cb?ef
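
A sketch of matching by exact name or by predicate (names illustrative):

from ibllib.oneibl.data_handlers import ExpectedDataset, dataset_from_name

I = ExpectedDataset.input
datasets = [I('spikes.times.npy', 'alf/probe00') & I('spikes.clusters.npy', 'alf/probe00'),
            I('clusters.channels.npy', 'alf/probe00')]
assert len(dataset_from_name('spikes.times.npy', datasets)) == 1
assert len(dataset_from_name(lambda name: name.startswith('spikes.'), datasets)) == 2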

487 

488 

489def update_collections(dataset, new_collection, substring=None, unique=None, exact_match=False): 

490 """ 

491 Update the collection of a dataset. 

492 

493 This updates all nested ExpectedDataset instances with the new collection and returns copies. 

494 

495 Parameters 

496 ---------- 

497 dataset : ExpectedDataset 

498 The dataset to update. 

499 new_collection : str, list of str 

500 The new collection or collections. 

501 substring : str, optional 

502 An optional substring in the collection to replace with new collection(s). If None, the 

503 entire collection will be replaced. 

504 unique : bool, optional 

505 When provided, this will be used to set the `unique` attribute of the new dataset(s). If 

506 None, the `unique` attribute will be set to True if the collection does not contain 

507 wildcards. 

508 exact_match : bool 

509 If True, the collection will be replaced only if it contains `substring`. 

510 

511 Returns 

512 ------- 

513 ExpectedDataset 

514 A copy of the dataset with the updated collection(s). 

515 

516 """ 

517 after = ensure_list(new_collection) 1cbpmlesfdi

518 D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output 1cbpmlesfdi

519 if dataset.operator is None: 1cbpmlesfdi

520 collection, revision, name = dataset.identifiers 1cbpmlesfdi

521 if revision is not None: 1cbpmlesfdi

522 raise NotImplementedError 1p

523 if substring: 1cbpmlesfdi

524 if exact_match and substring not in collection: 1cbpmlesfdi

525 after = [collection] 1cbpledi

526 else: 

527 after = [(collection or '').replace(substring, x) or None for x in after] 1cbpmlesfdi

528 if unique is None: 1cbpmlesfdi

529 unique = [not set(name + (x or '')).intersection('*[?') for x in after] 1cbpmlesfdi

530 else: 

531 unique = [unique] * len(after) 1p

532 register = dataset.register 1cbpmlesfdi

533 updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0]) 1cbpmlesfdi

534 if len(after) > 1: 1cbpmlesfdi

535 for folder, unq in zip(after[1:], unique[1:]): 1pesfd

536 updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq) 1pesfd

537 else: 

538 updated = copy(dataset) 1cbpe

539 updated._identifiers = [update_collections(dd, new_collection, substring, unique, exact_match) 1cbpe

540 for dd in updated._identifiers] 

541 return updated 1cbpmlesfdi
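
A sketch of remapping collections (the probe labels here are illustrative):

from ibllib.oneibl.data_handlers import ExpectedDataset, update_collections

dset = ExpectedDataset.input('spikes.times.npy', 'alf/probe00')
moved = update_collections(dset, 'alf/probe01')                        # replace the whole collection
relabelled = update_collections(dset, 'probe01', substring='probe00')  # replace only the probe label
assert moved.identifiers[0] == relabelled.identifiers[0] == 'alf/probe01'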

542 

543 

544class DataHandler(abc.ABC): 

545 def __init__(self, session_path, signature, one=None): 

546 """ 

547 Base data handler class 

548 :param session_path: path to session 

549 :param signature: input and output file signatures 

550 :param one: ONE instance 

551 """ 

552 self.session_path = ensure_alf_path(session_path)

553 self.signature = _parse_signature(signature)

554 self.one = one

555 self.processed = {} # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)

556 

557 def setUp(self, **kwargs): 

558 """Function to optionally overload to download required data to run task.""" 

559 pass 2fbS %

560 

561 def getData(self, one=None): 

562 """Finds the datasets required for task based on input signatures. 

563 

564 Parameters 

565 ---------- 

566 one : one.api.One, optional 

567 An instance of ONE to use. 

568 

569 Returns 

570 ------- 

571 pandas.DataFrame, None 

572 A data frame of required datasets. An empty frame is returned if no registered datasets are required, 

573 while None is returned if no instance of ONE is set. 

574 """ 

575 if self.one is None and one is None: 1,hk

576 return 1,

577 one = one or self.one 1,hk

578 session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True) 1,hk

579 dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']] 1,hk

580 return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates() 1,hk

581 

582 def getOutputFiles(self, session_path=None): 

583 """ 

584 Return a data frame of output datasets found on disk. 

585 

586 Returns 

587 ------- 

588 pandas.DataFrame 

589 A data frame of the datasets found on disk that were specified in signature['output_files']. 

590 """ 

591 session_path = self.session_path if session_path is None else session_path 1hk

592 assert session_path 1hk

593 # Next convert datasets to frame 

594 # Create dataframe of all ALF datasets 

595 df = _make_datasets_df(session_path, hash_files=False).set_index(['eid', 'id']) 1hk

596 # Filter outputs 

597 if len(self.signature['output_files']) == 0: 1hk

598 return pd.DataFrame() 

599 present = [file.filter(df)[1] for file in self.signature['output_files']] 1hk

600 return pd.concat(present).droplevel('eid') 1hk

601 

602 def uploadData(self, outputs, version): 

603 """ 

604 Function to optionally overload to upload and register data 

605 :param outputs: output files from task to register 

606 :param version: ibllib version 

607 :return: 

608 """ 

609 if isinstance(outputs, list): 2@ abj S

610 versions = [version for _ in outputs] 2@ abj S

611 else: 

612 versions = [version] 

613 

614 return versions 2@ abj S

615 

616 def cleanUp(self, **kwargs): 

617 """Function to optionally overload to clean up files after running task.""" 

618 pass 2abj
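
A minimal sketch of a custom handler, assuming all data are already on disk and nothing needs registering (a hypothetical class, not part of this module):

from ibllib.oneibl.data_handlers import DataHandler

class DryRunDataHandler(DataHandler):
    """Hypothetical handler that neither downloads nor registers anything."""

    def setUp(self, **kwargs):
        pass  # data assumed to be present locally

    def uploadData(self, outputs, version, **kwargs):
        versions = super().uploadData(outputs, version)  # expand the version per output
        return list(zip(outputs, versions))  # report only, no registration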

619 

620 

621class LocalDataHandler(DataHandler): 

622 def __init__(self, session_path, signatures, one=None): 

623 """ 

624 Data handler for running tasks locally, with no architecture or db connection 

625 :param session_path: path to session 

626 :param signature: input and output file signatures 

627 :param one: ONE instance 

628 """ 

629 super().__init__(session_path, signatures, one=one) 2fbS %

630 

631 

632class ServerDataHandler(DataHandler): 

633 def __init__(self, session_path, signatures, one=None): 

634 """ 

635 Data handler for running tasks on lab local servers when all data is available locally 

636 

637 :param session_path: path to session 

638 :param signature: input and output file signatures 

639 :param one: ONE instance 

640 """ 

641 super().__init__(session_path, signatures, one=one)

642 

643 def uploadData(self, outputs, version, clobber=False, **kwargs): 

644 """ 

645 Upload and/or register output data. 

646 

647 This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`. 

648 

649 Parameters 

650 ---------- 

651 outputs : list of pathlib.Path 

652 A set of ALF paths to register to Alyx. 

653 version : str, list of str 

654 The version of ibllib used to generate these output files. 

655 clobber : bool 

656 If True, re-upload outputs that have already been passed to this method. 

657 kwargs 

658 Optional keyword arguments for one.registration.RegistrationClient.register_files. 

659 

660 Returns 

661 ------- 

662 list of dicts, dict 

663 A list of newly created Alyx dataset records or the registration data if dry. 

664 """ 

665 versions = super().uploadData(outputs, version) 2@ abj

666 data_repo = get_local_data_repository(self.one.alyx) 2@ abj

667 # If clobber = False, do not re-upload the outputs that have already been processed 

668 outputs = ensure_list(outputs) 2@ abj

669 to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs)) 2@ abj

670 records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or [] 2@ abj

671 if kwargs.get('dry', False): 2@ abj

672 return records 1@

673 # Store processed outputs 

674 self.processed.update({k: v for k, v in zip(to_upload, records) if v}) 2@ abj

675 return [self.processed[x] for x in outputs if x in self.processed] 2@ abj

676 

677 def cleanUp(self, **_): 

678 """Empties and returns the processed dataset mep.""" 

679 super().cleanUp() 2abj

680 processed = self.processed 2abj

681 self.processed = {} 2abj

682 return processed 2abj
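
A sketch of how a task might drive this handler, assuming a configured ONE/Alyx connection; the session path, signature and version string are hypothetical:

from one.api import ONE
from ibllib.oneibl.data_handlers import ServerDataHandler

one = ONE()
session_path = '/mnt/s0/Data/Subjects/SW001/2024-01-01/001'  # hypothetical local session
signature = {'input_files': [], 'output_files': [('_ibl_trials.table.pqt', 'alf', True)]}
handler = ServerDataHandler(session_path, signature, one=one)
outputs = [session_path + '/alf/_ibl_trials.table.pqt']
records = handler.uploadData(outputs, version='X.Y.Z', clobber=False)  # registers the outputs with Alyx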

683 

684 

685class ServerGlobusDataHandler(DataHandler): 

686 def __init__(self, session_path, signatures, one=None): 

687 """ 

688 Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus 

689 

690 :param session_path: path to session 

691 :param signatures: input and output file signatures 

692 :param one: ONE instance 

693 """ 

694 from one.remote.globus import Globus, get_lab_from_endpoint_id # noqa 

695 super().__init__(session_path, signatures, one=one) 

696 self.globus = Globus(client_name='server', headless=True) 

697 

698 # on local servers set up the local root path manually as some have different globus config paths 

699 self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects' 

700 

701 # Find the lab 

702 self.lab = get_lab(self.session_path, self.one.alyx) 

703 

704 # For cortex lab we need to get the endpoint from the ibl alyx 

705 if self.lab == 'cortexlab': 

706 alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None) 

707 self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx) 

708 else: 

709 self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx) 

710 

711 self.local_paths = [] 

712 

713 def setUp(self, **_): 

714 """Function to download necessary data to run tasks using globus-sdk.""" 

715 if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url: 

716 df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode)) 

717 else: 

718 df = super().getData(one=self.one) 

719 

720 if len(df) == 0: 

721 # If no datasets are found in the cache, work off the local file system only and do not 

722 # attempt to download any missing data using Globus 

723 return 

724 

725 # Check for space on the local server. If less than 500 GB is free, don't download new data 

726 space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2] 

727 if space_free < 500e9: 

728 _logger.warning('Space left on server is < 500GB, won\'t re-download new data') 

729 return 

730 

731 rel_sess_path = self.session_path.session_path_short() 

732 target_paths = [] 

733 source_paths = [] 

734 for i, d in df.iterrows(): 

735 sess_path = Path(rel_sess_path).joinpath(d['rel_path']) 

736 full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path) 

737 if not full_local_path.exists(): 

738 uuid = i 

739 self.local_paths.append(full_local_path) 

740 target_paths.append(sess_path) 

741 source_paths.append(add_uuid_string(sess_path, uuid)) 

742 

743 if len(target_paths) != 0: 

744 ts = time() 

745 for sp, tp in zip(source_paths, target_paths): 

746 _logger.info(f'Downloading {sp} to {tp}') 

747 self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths) 

748 _logger.debug(f'Complete. Time elapsed {time() - ts}') 

749 

750 def uploadData(self, outputs, version, **kwargs): 

751 """ 

752 Function to upload and register data of completed task 

753 :param outputs: output files from task to register 

754 :param version: ibllib version 

755 :return: output info of registered datasets 

756 """ 

757 versions = super().uploadData(outputs, version) 

758 data_repo = get_local_data_repository(self.one.alyx) 

759 return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs) 

760 

761 def cleanUp(self, **_): 

762 """Clean up, remove the files that were downloaded from Globus once task has completed.""" 

763 for file in self.local_paths: 

764 os.unlink(file) 

765 

766 

767class RemoteEC2DataHandler(DataHandler): 

768 def __init__(self, session_path, signature, one=None): 

769 """ 

770 Data handler for running tasks on remote compute node. Will download missing data via http using ONE 

771 

772 :param session_path: path to session 

773 :param signature: input and output file signatures 

774 :param one: ONE instance 

775 """ 

776 super().__init__(session_path, signature, one=one) 

777 

778 def setUp(self, check_hash=True, **_): 

779 """ 

780 Function to download necessary data to run tasks using ONE 

781 :return: 

782 """ 

783 df = super().getData() 

784 self.one._check_filesystem(df, check_hash=check_hash) 

785 

786 def uploadData(self, outputs, version, **kwargs): 

787 """ 

788 Function to upload and register data of completed task via S3 patcher 

789 :param outputs: output files from task to register 

790 :param version: ibllib version 

791 :return: output info of registered datasets 

792 """ 

793 versions = super().uploadData(outputs, version) 

794 s3_patcher = S3Patcher(one=self.one) 

795 return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user, 

796 versions=versions, **kwargs) 

797 

798 

799class RemoteHttpDataHandler(DataHandler): 

800 def __init__(self, session_path, signature, one=None): 

801 """ 

802 Data handler for running tasks on remote compute node. Will download missing data via http using ONE 

803 

804 :param session_path: path to session 

805 :param signature: input and output file signatures 

806 :param one: ONE instance 

807 """ 

808 super().__init__(session_path, signature, one=one) 

809 

810 def setUp(self, **_): 

811 """ 

812 Function to download necessary data to run tasks using ONE 

813 :return: 

814 """ 

815 df = super().getData() 

816 self.one._check_filesystem(df) 

817 

818 def uploadData(self, outputs, version, **kwargs): 

819 """ 

820 Function to upload and register data of completed task via FTP patcher 

821 :param outputs: output files from task to register 

822 :param version: ibllib version 

823 :return: output info of registered datasets 

824 """ 

825 versions = super().uploadData(outputs, version) 

826 ftp_patcher = FTPPatcher(one=self.one) 

827 return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, 

828 versions=versions, **kwargs) 

829 

830 

831class RemoteAwsDataHandler(DataHandler): 

832 def __init__(self, session_path, signature, one=None): 

833 """ 

834 Data handler for running tasks on remote compute node. 

835 

836 This will download missing data from the private IBL S3 AWS data bucket. New datasets are 

837 uploaded via Globus. 

838 

839 :param session_path: path to session 

840 :param signature: input and output file signatures 

841 :param one: ONE instance 

842 """ 

843 super().__init__(session_path, signature, one=one) 

844 self.local_paths = [] 

845 

846 def setUp(self, **_): 

847 """Function to download necessary data to run tasks using AWS boto3.""" 

848 df = super().getData() 

849 self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows())) 

850 

851 def uploadData(self, outputs, version, **kwargs): 

852 """ 

853 Function to upload and register data of completed task via FTP patcher 

854 :param outputs: output files from task to register 

855 :param version: ibllib version 

856 :return: output info of registered datasets 

857 """ 

858 # Set up Globus 

859 from one.remote.globus import Globus # noqa 

860 self.globus = Globus(client_name=kwargs.pop('client_name', 'server'), headless=True) 

861 self.lab = self.session_path.lab 

862 if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url: 

863 base_url = 'https://alyx.internationalbrainlab.org' 

864 _logger.warning('Changing Alyx client to %s', base_url) 

865 ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode) 

866 else: 

867 ac = self.one.alyx 

868 self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac) 

869 

870 # register datasets 

871 versions = super().uploadData(outputs, version) 

872 response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs) 

873 

874 # upload directly via globus 

875 source_paths = [] 

876 target_paths = [] 

877 collections = {} 

878 

879 for dset, out in zip(response, outputs): 

880 assert Path(out).name == dset['name'] 

881 # set flag to false 

882 fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository']) 

883 collection = '/'.join(fr['relative_path'].split('/')[:-1]) 

884 if collection in collections.keys(): 

885 collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}) 

886 else: 

887 collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}} 

888 

889 # Set all exists status to false for server file records 

890 self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False}) 

891 

892 source_paths.append(out) 

893 target_paths.append(add_uuid_string(fr['relative_path'], dset['id'])) 

894 

895 if len(target_paths) != 0: 

896 ts = time() 

897 for sp, tp in zip(source_paths, target_paths): 

898 _logger.info(f'Uploading {sp} to {tp}') 

899 self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths) 

900 _logger.debug(f'Complete. Time elapsed {time() - ts}') 

901 

902 for collection, files in collections.items(): 

903 globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True) 

904 file_names = [str(gl[0]) for gl in globus_files] 

905 file_sizes = [gl[1] for gl in globus_files] 

906 

907 for name, details in files.items(): 

908 try: 

909 idx = file_names.index(name) 

910 size = file_sizes[idx] 

911 if size == details['size']: 

912 # update the file record if sizes match 

913 self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True}) 

914 else: 

915 _logger.warning(f'File {name} found on SDSC but sizes do not match') 

916 except ValueError: 

917 _logger.warning(f'File {name} not found on SDSC') 

918 

919 return response 

920 

921 # ftp_patcher = FTPPatcher(one=self.one) 

922 # return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, 

923 # versions=versions, **kwargs) 

924 

925 def cleanUp(self, task): 

926 """Clean up, remove the files that were downloaded from globus once task has completed.""" 

927 if task.status == 0: 

928 for file in self.local_paths: 

929 os.unlink(file) 

930 

931 

932class RemoteGlobusDataHandler(DataHandler): 

933 """ 

934 Data handler for running tasks on remote compute node. Will download missing data using Globus. 

935 

936 :param session_path: path to session 

937 :param signature: input and output file signatures 

938 :param one: ONE instance 

939 """ 

940 def __init__(self, session_path, signature, one=None): 

941 super().__init__(session_path, signature, one=one) 

942 

943 def setUp(self, **_): 

944 """Function to download necessary data to run tasks using globus.""" 

945 # TODO 

946 pass 

947 

948 def uploadData(self, outputs, version, **kwargs): 

949 """ 

950 Function to upload and register data of completed task via FTP patcher 

951 :param outputs: output files from task to register 

952 :param version: ibllib version 

953 :return: output info of registered datasets 

954 """ 

955 versions = super().uploadData(outputs, version) 

956 ftp_patcher = FTPPatcher(one=self.one) 

957 return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user, 

958 versions=versions, **kwargs) 

959 

960 

961class SDSCDataHandler(DataHandler): 

962 """ 

963 Data handler for running tasks on the SDSC compute node 

964 

965 :param session_path: path to session 

966 :param signature: input and output file signatures 

967 :param one: ONE instance 

968 """ 

969 

970 def __init__(self, session_path, signatures, one=None): 

971 super().__init__(session_path, signatures, one=one) 1hk

972 self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH) 1hk

973 self.root_path = SDSC_ROOT_PATH 1hk

974 self.linked_files = [] # List of symlinks created to run tasks 1hk

975 

976 def setUp(self, task, **_): 

977 """Function to create symlinks to necessary data to run tasks.""" 

978 df = super().getData() 1hk

979 

980 SDSC_TMP = ensure_alf_path(self.patch_path.joinpath(task.__class__.__name__)) 1hk

981 session_path = Path(get_alf_path(self.session_path)) 1hk

982 for uuid, d in df.iterrows(): 1hk

983 file_path = session_path / d['rel_path'] 1hk

984 file_uuid = add_uuid_string(file_path, uuid) 1hk

985 file_link = SDSC_TMP.joinpath(file_path) 1hk

986 file_link.parent.mkdir(exist_ok=True, parents=True) 1hk

987 try: # TODO append link to task attribute 1hk

988 file_link.symlink_to( 1hk

989 Path(self.root_path.joinpath(file_uuid))) 

990 self.linked_files.append(file_link) 1hk

991 except FileExistsError: 

992 pass 

993 task.session_path = SDSC_TMP.joinpath(session_path) 1hk

994 # If one of the symlinked input files is also an expected output, raise here to avoid overwriting 

995 # In the future we may instead copy the data under this condition 

996 assert self.getOutputFiles(session_path=task.session_path).shape[0] == 0, ( 1hk

997 "On SDSC patcher, output files should be distinct from input files to avoid overwriting") 

998 

999 def uploadData(self, outputs, version, **kwargs): 

1000 """ 

1001 Function to upload and register data of completed task via SDSC patcher 

1002 :param outputs: output files from task to register 

1003 :param version: ibllib version 

1004 :return: output info of registered datasets 

1005 """ 

1006 versions = super().uploadData(outputs, version) 

1007 sdsc_patcher = SDSCPatcher(one=self.one) 

1008 return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs) 

1009 

1010 def cleanUp(self, task): 

1011 """Function to clean up symlinks created to run task.""" 

1012 assert self.patch_path.parts[0:4] == task.session_path.parts[0:4] 1h

1013 shutil.rmtree(task.session_path) 1h

1014 

1015 

1016class PopeyeDataHandler(SDSCDataHandler): 

1017 

1018 def __init__(self, session_path, signatures, one=None): 

1019 super().__init__(session_path, signatures, one=one) 

1020 self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/")) 

1021 self.root_path = Path("/mnt/sdceph/users/ibl/data") 

1022 

1023 def uploadData(self, outputs, version, **kwargs): 

1024 raise NotImplementedError( 

1025 "Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task." 

1026 ) 

1027 

1028 def cleanUp(self, **_): 

1029 """Symlinks are preserved until registration.""" 

1030 pass