Coverage for ibllib/oneibl/data_handlers.py: 64%

434 statements  

coverage.py v7.8.0, created at 2025-05-07 14:26 +0100

1"""Downloading of task dependent datasets and registration of task output datasets. 

2 

3The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are 

4present and to register and upload the output datasets. For examples on how to run a task using 

5specific data handlers, see :func:`ibllib.pipes.tasks`. 

6""" 

7import logging 

8import pandas as pd 

9from pathlib import Path, PurePosixPath 

10import shutil 

11import os 

12import abc 

13from time import time 

14from copy import copy 

15 

16from one.api import ONE 

17from one.webclient import AlyxClient 

18from one.util import filter_datasets 

19from one.alf.path import add_uuid_string, session_path_parts, get_alf_path 

20from one.alf.cache import _make_datasets_df 

21from iblutil.util import flatten, ensure_list 

22 

23from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository 

24from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher 

25 

26 

27_logger = logging.getLogger(__name__) 

28 

29 

30class ExpectedDataset: 

31 """An expected input or output dataset.""" 

32 inverted = False 

33 

34 def __init__(self, name, collection, register=None, revision=None, unique=True): 

35 """ 

36 An expected input or output dataset. 

37 

38 NB: This should not be instantiated directly, but rather via the `input` or `output` 

39 static method. 

40 

41 Parameters 

42 ---------- 

43 name : str, None 

44 A dataset name or glob pattern. 

45 collection : str, None 

46 An ALF collection or pattern. 

47 register : bool 

48 Whether to register the file. Default is False for input files, True for output 

49 files. 

50 revision : str 

51 An optional revision. 

52 unique : bool 

53 Whether identifier pattern is expected to match a single dataset or several. NB: This currently does not 

54 affect the output of `find_files`. 

55 """ 

56 if not (collection is None or isinstance(collection, str)): 2a abc b @ + = n s t [ o j p ? q S T U V W u . ' v / w x g : y z ; h ] ^ _ ` { | } % A B C D E F G H I , J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

57 collection = '/'.join(collection) 

58 self._identifiers = (collection, revision, name) 2a abc b @ + = n s t [ o j p ? q S T U V W u . ' v / w x g : y z ; h ] ^ _ ` { | } % A B C D E F G H I , J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

59 self.operator = None 2a abc b @ + = n s t [ o j p ? q S T U V W u . ' v / w x g : y z ; h ] ^ _ ` { | } % A B C D E F G H I , J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

60 self._register = register or False 2a abc b @ + = n s t [ o j p ? q S T U V W u . ' v / w x g : y z ; h ] ^ _ ` { | } % A B C D E F G H I , J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

61 self.inverted = False 2a abc b @ + = n s t [ o j p ? q S T U V W u . ' v / w x g : y z ; h ] ^ _ ` { | } % A B C D E F G H I , J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

62 self.name = None 2a abc b @ + = n s t [ o j p ? q S T U V W u . ' v / w x g : y z ; h ] ^ _ ` { | } % A B C D E F G H I , J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

63 self.unique = unique 2a abc b @ + = n s t [ o j p ? q S T U V W u . ' v / w x g : y z ; h ] ^ _ ` { | } % A B C D E F G H I , J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

64 

65 @property 

66 def register(self): 

67 """bool: whether to register the output file.""" 

68 return self._register 1acbnsojqSTUVWuvwxgyzh%ABCDEFGHIJ(XYklerfdimRZ01234*5K6LMN78O9!#$PQ

69 

70 @register.setter 

71 def register(self, value): 

72 """bool: whether to register the output file.""" 

73 if self.operator is not None: 1soj

74 raise AttributeError('cannot set register attribute for operator datasets') 

75 self._register = value 1soj

76 

77 @property 

78 def identifiers(self): 

79 """tuple: the identifying parts of the dataset. 

80 

81 If no operator is applied, the identifiers are (collection, revision, name). 

82 If an operator is applied, a tuple of 3-element tuples is returned. 

83 """ 

84 if self.operator is None: 1cbn[klerfdi

85 return self._identifiers 1cbn[klerfdi

86 # Flatten nested identifiers into tuple of 3-element tuples 

87 identifiers = [] 1bn[d

88 for x in self._identifiers: 1bn[d

89 add = identifiers.extend if x.operator else identifiers.append 1bn[d

90 add(x.identifiers) 1bn[d

91 return tuple(identifiers) 1bn[d

92 

93 @property 

94 def glob_pattern(self): 

95 """str, tuple of str: one or more glob patterns.""" 

96 if self.operator is None: 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e r f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

97 return str(PurePosixPath(*filter(None, self._identifiers))) 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e r f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

98 else: 

99 return tuple(flatten(x.glob_pattern for x in self._identifiers)) 1jr

100 

101 def __repr__(self): 

102 """Represent the dataset object as a string. 

103 

104 If the `name` property is not None, it is returned, otherwise the identifies are used to 

105 format the name. 

106 """ 

107 name = self.__class__.__name__ 

108 if self.name: 

109 return f'<{name}({self.name})>' 

110 if self.operator: 

111 sym = {'or': '|', 'and': '&', 'xor': '^'} 

112 patterns = [d.__repr__() for d in self._identifiers] 

113 pattern = f'{sym[self.operator]:^3}'.join(patterns) 

114 if self.inverted: 

115 pattern = f'~({pattern})' 

116 else: 

117 pattern = ('~' if self.inverted else '') + self.glob_pattern 

118 return f'<{name}({pattern})>' 

119 

120 def find_files(self, session_path, register=False): 

121 """Find files on disk. 

122 

123 Uses glob patterns to find dataset(s) on disk. 

124 

125 Parameters 

126 ---------- 

127 session_path : pathlib.Path, str 

128 A session path within which to glob for the dataset(s). 

129 register : bool 

130 Only return files intended to be registered. 

131 

132 Returns 

133 ------- 

134 bool 

135 True if the dataset is found on disk or is optional. 

136 list of pathlib.Path 

137 A list of matching dataset files. 

138 missing, None, str, set of str 

139 One or more glob patterns that either didn't yield files (or did in the case of inverted datasets). 

140 

141 Notes 

142 ----- 

143 - Currently if `unique` is true and multiple files are found, all files are returned without an exception raised 

144 although this may change in the future. 

145 - If `register` is false, all files are returned regardless of whether they are intended to be registered. 

146 - If `register` is true, an input with register=True may not be returned if part of an OR operation. 

147 - If `inverted` is true, and files are found, the glob pattern is returned as missing. 

148 - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing 

149 patterns. 

150 - Missing (or unexpectedly found) patterns are returned despite the dataset being optional. 

151 """ 

152 session_path = Path(session_path) 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

153 ok, actual_files, missing = False, [], None 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

154 if self.operator is None: 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

155 if register and not self.register: 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

156 return True, actual_files, missing 1asojqSTUVWuvwxgyzh%ABCDEFGHIJ(XYkldimRZ01234*5K6LMN78O9!#$PQ

157 actual_files = sorted(session_path.rglob(self.glob_pattern)) 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

158 # If no revision pattern provided and no files found, search for any revision 

159 if self._identifiers[1] is None and not any(actual_files): 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

160 glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2])) 2a abc b s o j q S T U V W u ' v w x g y z h A B C D E F G H I , J X Y k l e d i m - Z 0 1 2 3 4 ) 5 K 6 L M N 7 8 O 9 ! # P Q

161 actual_files = sorted(session_path.rglob(glob_pattern)) 2a abc b s o j q S T U V W u ' v w x g y z h A B C D E F G H I , J X Y k l e d i m - Z 0 1 2 3 4 ) 5 K 6 L M N 7 8 O 9 ! # P Q

162 ok = any(actual_files) != self.inverted 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

163 if not ok: 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

164 missing = self.glob_pattern 2a abc b s o j q S T U V W u ' v w x g y z h A B C D E F G H I , J X Y k l e d i m - Z 0 1 2 3 4 ) 5 K 6 L M N 7 8 O 9 ! # P Q

165 elif self.operator == 'and': 1cbsojqgefd

166 assert len(self._identifiers) == 2 1sgefd

167 _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers)) 1sgefd

168 ok = all(_ok) 1sgefd

169 actual_files = flatten(_actual_files) 1sgefd

170 missing = set(filter(None, flatten(_missing))) 1sgefd

171 elif self.operator == 'or': 1cbojqg

172 assert len(self._identifiers) == 2 1cboqg

173 missing = set() 1cboqg

174 for d in self._identifiers: 1cboqg

175 ok, actual_files, _missing = d.find_files(session_path, register=register) 1cboqg

176 if ok: 1cboqg

177 break 1cbog

178 if missing is not None: 1cboq

179 missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing) 1cboq

180 elif self.operator == 'xor': 1j

181 assert len(self._identifiers) == 2 1j

182 _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers)) 1j

183 ok = sum(_ok) == 1 # and sum(map(bool, map(len, _actual_files))) == 1 1j

184 # Return only those datasets that are complete if OK 

185 actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files) 1j

186 if ok: 1j

187 missing = set() 1j

188 elif all(_ok): # return all patterns if all present when only one should be, otherwise return all missing 1j

189 missing = set(flatten(self.glob_pattern)) 1j

190 elif not any(_ok): # return all missing glob patterns if none present 1j

191 missing = set(filter(None, flatten(_missing))) 1j

192 elif not isinstance(self.operator, str): 

193 raise TypeError(f'Unrecognized operator type "{type(self.operator)}"') 

194 else: 

195 raise NotImplementedError(f'logical {self.operator.upper()} not implemented') 

196 

197 return ok, actual_files, missing 2a abc b s o j ? q dbS T U V W u . ' v / w x g : y z ; h % A B C D E F G H I , J ( X Y k l e f d i m - R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

198 

199 def filter(self, session_datasets, **kwargs): 

200 """Filter dataset frame by expected datasets. 

201 

202 Parameters 

203 ---------- 

204 session_datasets : pandas.DataFrame 

205 A data frame of session datasets. 

206 kwargs 

207 Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and 

208 ignore_qc_not_set. 

209 

210 Returns 

211 ------- 

212 bool 

213 True if the required dataset(s) are present in the data frame. 

214 pandas.DataFrame 

215 A filtered data frame of containing the expected dataset(s). 

216 """ 

217 # ok, datasets = False, session_datasets.iloc[0:0] 

218 if self.operator is None: 1+tp

219 collection, revision, file = self._identifiers 1+tp

220 if self._identifiers[1] is not None: 1+tp

221 raise NotImplementedError('revisions not yet supported') 

222 datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs) 1+tp

223 ok = datasets.empty == self.inverted 1+tp

224 elif self.operator == 'or': 1t

225 assert len(self._identifiers) == 2 1t

226 for d in self._identifiers: 1t

227 ok, datasets = d.filter(session_datasets, **kwargs) 1t

228 if ok: 1t

229 break 1t

230 elif self.operator == 'xor': 1t

231 assert len(self._identifiers) == 2 1t

232 _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers)) 1t

233 ok = sum(_ok) == 1 1t

234 if ok: 1t

235 # Return only those datasets that are complete. 

236 datasets = _datasets[_ok.index(True)] 1t

237 else: 

238 datasets = pd.concat(_datasets) 1t

239 elif self.operator == 'and': 

240 assert len(self._identifiers) == 2 

241 _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers)) 

242 ok = all(_ok) 

243 datasets = pd.concat(_datasets) 

244 elif not isinstance(self.operator, str): 

245 raise TypeError(f'Unrecognized operator type "{type(self.operator)}"') 

246 else: 

247 raise NotImplementedError(f'logical {self.operator.upper()} not implemented') 

248 return ok, datasets 1+tp

249 

250 def _apply_op(self, op, other): 

251 """Apply an operation between two datasets.""" 

252 # Assert both instances of Input or both instances of Output 

253 if not isinstance(other, (self.__class__, tuple)): 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

254 raise TypeError(f'logical operations not supported between objects of type ' 

255 f'{self.__class__.__name__} and {other.__class__.__name__}') 

256 # Assert operation supported 

257 if op not in {'or', 'xor', 'and'}: 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

258 raise ValueError(op) 

259 # Convert tuple to ExpectDataset instance 

260 if isinstance(other, tuple): 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

261 D = (self.input if isinstance(self, Input) else self.output) 

262 other = D(*other) 

263 # Returned instance should only be optional if both datasets are optional 

264 is_input = isinstance(self, Input) 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

265 if all(isinstance(x, OptionalDataset) for x in (self, other)): 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

266 D = OptionalInput if is_input else OptionalOutput 1nerfd

267 else: 

268 D = Input if is_input else Output 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbd m

269 # Instantiate 'empty' object 

270 d = D(None, None) 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

271 d._identifiers = (self, other) 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

272 d.operator = op 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

273 return d 2a c b @ n s t [ o j q g ] ^ _ ` { | } e bbcbr f d m

274 

275 def __invert__(self): 

276 """Assert dataset doesn't exist on disk.""" 

277 obj = copy(self) 1ag]^_`{|}m

278 obj.inverted = not self.inverted 1ag]^_`{|}m

279 return obj 1ag]^_`{|}m

280 

281 def __or__(self, b): 

282 """Assert either dataset exists or another does, or both exist.""" 

283 return self._apply_op('or', b) 2a c b @ n t [ o q g ] ^ _ ` { | } e bbcbm

284 

285 def __xor__(self, b): 

286 """Assert either dataset exists or another does, not both.""" 

287 return self._apply_op('xor', b) 1tj

288 

289 def __and__(self, b): 

290 """Assert that a second dataset exists together with the first.""" 

291 return self._apply_op('and', b) 1ansg]^_`{|}erfdm

292 

293 @staticmethod 

294 def input(name, collection, required=True, register=False, **kwargs): 

295 """ 

296 Create an expected input dataset. 

297 

298 By default, expected input datasets are not automatically registered. 

299 

300 Parameters 

301 ---------- 

302 name : str 

303 A dataset name or glob pattern. 

304 collection : str, None 

305 An ALF collection or pattern. 

306 required : bool 

307 Whether file must always be present, or is an optional dataset. Default is True. 

308 register : bool 

309 Whether to register the input file. Default is False for input files, True for output 

310 files. 

311 revision : str 

312 An optional revision. 

313 unique : bool 

314 Whether identifier pattern is expected to match a single dataset or several. 

315 

316 Returns 

317 ------- 

318 Input, OptionalInput 

319 An instance of an Input dataset if required is true, otherwise an OptionalInput. 

320 """ 

321 Class = Input if required else OptionalInput 2a abc b @ + n s t [ o j p q S T U V W u v w x g y z h ] ^ _ ` { | } % A B C D E F G H I J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 * 5 K 6 L M N 7 8 O 9 ! # $ P Q

322 obj = Class(name, collection, register=register, **kwargs) 2a abc b @ + n s t [ o j p q S T U V W u v w x g y z h ] ^ _ ` { | } % A B C D E F G H I J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 * 5 K 6 L M N 7 8 O 9 ! # $ P Q

323 return obj 2a abc b @ + n s t [ o j p q S T U V W u v w x g y z h ] ^ _ ` { | } % A B C D E F G H I J ( X Y k l e bbcbr f d i m - R Z 0 1 2 3 4 * 5 K 6 L M N 7 8 O 9 ! # $ P Q

324 

325 @staticmethod 

326 def output(name, collection, required=True, register=True, **kwargs): 

327 """ 

328 Create an expected output dataset. 

329 

330 By default, expected output datasets are automatically registered. 

331 

332 Parameters 

333 ---------- 

334 name : str 

335 A dataset name or glob pattern. 

336 collection : str, None 

337 An ALF collection or pattern. 

338 required : bool 

339 Whether file must always be present, or is an optional dataset. Default is True. 

340 register : bool 

341 Whether to register the output file. Default is False for input files, True for output 

342 files. 

343 revision : str 

344 An optional revision. 

345 unique : bool 

346 Whether identifier pattern is expected to match a single dataset or several. 

347 

348 Returns 

349 ------- 

350 Output, OptionalOutput 

351 An instance of an Output dataset if required is true, otherwise an OptionalOutput. 

352 """ 

353 Class = Output if required else OptionalOutput 1acb+=p?STUVWu.'v/wxg:yz;h%ABCDEFGHI,J(XYkledim-RZ01234)*5K6LMN78O9!#$PQ

354 obj = Class(name, collection, register=register, **kwargs) 1acb+=p?STUVWu.'v/wxg:yz;h%ABCDEFGHI,J(XYkledim-RZ01234)*5K6LMN78O9!#$PQ

355 return obj 1acb+=p?STUVWu.'v/wxg:yz;h%ABCDEFGHI,J(XYkledim-RZ01234)*5K6LMN78O9!#$PQ
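
# Illustrative sketch (editor's addition, not part of ibllib): how expected datasets are
# typically composed with the logical operators defined above. The dataset names, collections
# and session path below are hypothetical examples.
#
#     >>> I = ExpectedDataset.input
#     >>> wheel = I('_ibl_wheel.position.npy', 'alf') & I('_ibl_wheel.timestamps.npy', 'alf')
#     >>> video = I('_iblrig_leftCamera.raw.mp4', 'raw_video_data') | I('_iblrig_leftCamera.raw.avi', 'raw_video_data')
#     >>> no_flag = ~I('transfer_me.flag', 'raw_behavior_data', required=False)
#     >>> ok, files, missing = (wheel & video & no_flag).find_files('/data/subject/2024-01-01/001')
#
# `ok` is True when the composed expression is satisfied on disk, `files` lists the matching
# paths, and `missing` contains the glob pattern(s) that failed (or, for inverted datasets,
# unexpectedly matched).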


class OptionalDataset(ExpectedDataset):
    """An expected dataset that is not strictly required."""

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        True
            Always True as dataset is optional.
        list of pathlib.Path
            A list of matching dataset files.
        missing, None, str, set of str
            One or more glob patterns that either didn't yield files (or did in the case of inverted datasets).

        Notes
        -----
        - Currently if `unique` is true and multiple files are found, all files are returned without an exception
          raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended to be registered.
        - If `inverted` is true, and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing
          patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being optional.
        """
        ok, actual_files, missing = super().find_files(session_path, register=register)
        return True, actual_files, missing

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc,
            ignore_qc_not_set, and assert_unique.

        Returns
        -------
        True
            Always True as dataset is optional.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        ok, datasets = super().filter(session_datasets, **kwargs)
        return True, datasets


class Input(ExpectedDataset):
    """An expected input dataset."""
    pass


class OptionalInput(Input, OptionalDataset):
    """An optional expected input dataset."""
    pass


class Output(ExpectedDataset):
    """An expected output dataset."""
    pass


class OptionalOutput(Output, OptionalDataset):
    """An optional expected output dataset."""
    pass


def _parse_signature(signature):
    """
    Ensure all a signature's expected datasets are instances of ExpectedDataset.

    Parameters
    ----------
    signature : Dict[str, list]
        A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or
        ExpectedDataset instances.

    Returns
    -------
    Dict[str, list of ExpectedDataset]
        A dict containing all tuples converted to ExpectedDataset instances.
    """
    I, O = ExpectedDataset.input, ExpectedDataset.output  # noqa
    inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']]
    outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']]
    return {'input_files': inputs, 'output_files': outputs}
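
# Illustrative sketch (editor's addition): a task signature may mix plain tuples of
# (name, collection, required) with ExpectedDataset instances; `_parse_signature` normalises
# everything to ExpectedDataset. The file and collection names below are hypothetical.
#
#     >>> signature = {
#     ...     'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
#     ...                     ExpectedDataset.input('*Camera.raw.mp4', 'raw_video_data', required=False)],
#     ...     'output_files': [('_ibl_trials.table.pqt', 'alf', True)]}
#     >>> parsed = _parse_signature(signature)
#     >>> all(isinstance(x, ExpectedDataset) for x in parsed['input_files'] + parsed['output_files'])
#     True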


def dataset_from_name(name, datasets):
    """
    From a list of ExpectedDataset instances, return those that match a given name.

    Parameters
    ----------
    name : str
        The name of the dataset.
    datasets : list of ExpectedDataset
        A list of ExpectedDataset instances.

    Returns
    -------
    list of ExpectedDataset
        The ExpectedDataset instances that match the given name.

    """
    matches = []
    for dataset in datasets:
        if dataset.operator is None:
            if dataset._identifiers[2] == name:
                matches.append(dataset)
        else:
            matches.extend(dataset_from_name(name, dataset._identifiers))
    return matches
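
# Illustrative sketch (editor's addition): `dataset_from_name` recurses into composed
# (operator) datasets, so a dataset nested inside an OR/AND expression is still found.
# The names below are hypothetical.
#
#     >>> dsets = [ExpectedDataset.input('a.raw.bin', 'raw_data') |
#     ...          ExpectedDataset.input('a.raw.cbin', 'raw_data')]
#     >>> dataset_from_name('a.raw.cbin', dsets)
#     [<Input(raw_data/a.raw.cbin)>]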


def update_collections(dataset, new_collection, substring=None, unique=None):
    """
    Update the collection of a dataset.

    This updates all nested ExpectedDataset instances with the new collection and returns copies.

    Parameters
    ----------
    dataset : ExpectedDataset
        The dataset to update.
    new_collection : str, list of str
        The new collection or collections.
    substring : str, optional
        An optional substring in the collection to replace with new collection(s). If None, the
        entire collection will be replaced.

    Returns
    -------
    ExpectedDataset
        A copy of the dataset with the updated collection(s).

    """
    after = ensure_list(new_collection)
    D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output
    if dataset.operator is None:
        collection, revision, name = dataset.identifiers
        if revision is not None:
            raise NotImplementedError
        if substring:
            after = [(collection or '').replace(substring, x) or None for x in after]
        if unique is None:
            unique = [not set(name + (x or '')).intersection('*[?') for x in after]
        else:
            unique = [unique] * len(after)
        register = dataset.register
        updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0])
        if len(after) > 1:
            for folder, unq in zip(after[1:], unique[1:]):
                updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq)
    else:
        updated = copy(dataset)
        updated._identifiers = [update_collections(dd, new_collection, substring, unique)
                                for dd in updated._identifiers]
    return updated
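
# Illustrative sketch (editor's addition): remapping the collection of an expected dataset,
# e.g. when the same signature is reused for a different probe collection. The dataset and
# collection names are hypothetical.
#
#     >>> dset = ExpectedDataset.input('spikes.times.npy', 'alf/probe00')
#     >>> update_collections(dset, 'alf/probe01')
#     <Input(alf/probe01/spikes.times.npy)>
#     >>> update_collections(dset, 'probe01', substring='probe00')
#     <Input(alf/probe01/spikes.times.npy)>
#
# Passing a list of new collections returns a copy expecting the dataset in each collection,
# combined with AND.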


class DataHandler(abc.ABC):
    def __init__(self, session_path, signature, one=None):
        """
        Base data handler class
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        self.session_path = session_path
        self.signature = _parse_signature(signature)
        self.one = one
        self.processed = {}  # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)

    def setUp(self, **kwargs):
        """Function to optionally overload to download required data to run task."""
        pass

    def getData(self, one=None):
        """Finds the datasets required for task based on input signatures.

        Parameters
        ----------
        one : one.api.One, optional
            An instance of ONE to use.

        Returns
        -------
        pandas.DataFrame, None
            A data frame of required datasets. An empty frame is returned if no registered datasets are required,
            while None is returned if no instance of ONE is set.
        """
        if self.one is None and one is None:
            return
        one = one or self.one
        session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
        dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
        return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()

    def getOutputFiles(self):
        """
        Return a data frame of output datasets found on disk.

        Returns
        -------
        pandas.DataFrame
            A data frame of the datasets on disk that were specified in signature['output_files'].
        """
        assert self.session_path
        # Next convert datasets to frame
        # Create dataframe of all ALF datasets
        df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id'])
        # Filter outputs
        if len(self.signature['output_files']) == 0:
            return pd.DataFrame()
        present = [file.filter(df)[1] for file in self.signature['output_files']]
        return pd.concat(present).droplevel('eid')

    def uploadData(self, outputs, version):
        """
        Function to optionally overload to upload and register data
        :param outputs: output files from task to register
        :param version: ibllib version
        :return:
        """
        if isinstance(outputs, list):
            versions = [version for _ in outputs]
        else:
            versions = [version]

        return versions

    def cleanUp(self, **kwargs):
        """Function to optionally overload to clean up files after running task."""
        pass
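
# Illustrative sketch (editor's addition): concrete handlers subclass DataHandler and overload
# setUp/uploadData/cleanUp, as the classes below do. This hypothetical handler only checks that
# the expected input files are already present on disk and registers nothing.
#
#     >>> class DryRunDataHandler(DataHandler):
#     ...     def setUp(self, **kwargs):
#     ...         ok = True
#     ...         for expected in self.signature['input_files']:
#     ...             _ok, _files, _missing = expected.find_files(self.session_path)
#     ...             ok &= _ok
#     ...         return ok
#     >>> handler = DryRunDataHandler(session_path, signature)  # session_path/signature assumed defined
#     >>> handler.setUp()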


class LocalDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks locally, with no architecture or db connection
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)


class ServerDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers when all data is available locally

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)

    def uploadData(self, outputs, version, clobber=False, **kwargs):
        """
        Upload and/or register output data.

        This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`.

        Parameters
        ----------
        outputs : list of pathlib.Path
            A set of ALF paths to register to Alyx.
        version : str, list of str
            The version of ibllib used to generate these output files.
        clobber : bool
            If True, re-upload outputs that have already been passed to this method.
        kwargs
            Optional keyword arguments for one.registration.RegistrationClient.register_files.

        Returns
        -------
        list of dicts, dict
            A list of newly created Alyx dataset records or the registration data if dry.
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        # If clobber = False, do not re-upload the outputs that have already been processed
        outputs = ensure_list(outputs)
        to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs))
        records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or []
        if kwargs.get('dry', False):
            return records
        # Store processed outputs
        self.processed.update({k: v for k, v in zip(to_upload, records) if v})
        return [self.processed[x] for x in outputs if x in self.processed]

    def cleanUp(self, **_):
        """Empties and returns the processed dataset map."""
        super().cleanUp()
        processed = self.processed
        self.processed = {}
        return processed
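
# Illustrative sketch (editor's addition): how a task typically drives ServerDataHandler. The
# session path, output files and version string are hypothetical; uploadData skips outputs
# already present in the `processed` map unless clobber=True.
#
#     >>> handler = ServerDataHandler(session_path, signature, one=one)
#     >>> handler.setUp()                                    # no-op: data assumed to be local
#     >>> records = handler.uploadData(output_files, '2.40.0')
#     >>> same = handler.uploadData(output_files, '2.40.0')  # already processed, nothing new registered
#     >>> handler.cleanUp()                                  # returns and empties the processed map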


class ServerGlobusDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus

        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        from one.remote.globus import Globus, get_lab_from_endpoint_id  # noqa
        super().__init__(session_path, signatures, one=one)
        self.globus = Globus(client_name='server', headless=True)

        # on local servers set up the local root path manually as some have different globus config paths
        self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects'

        # Find the lab
        self.lab = get_lab(self.session_path, self.one.alyx)

        # For cortex lab we need to get the endpoint from the ibl alyx
        if self.lab == 'cortexlab':
            alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None)
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx)
        else:
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx)

        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus-sdk."""
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode))
        else:
            df = super().getData(one=self.one)

        if len(df) == 0:
            # If no datasets are found in the cache, work off the local file system only and do
            # not attempt to download any missing data using Globus
            return

        # Check for space on local server. If less than 500 GB, don't download new data
        space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2]
        if space_free < 500e9:
            _logger.warning('Space left on server is < 500GB, won\'t re-download new data')
            return

        rel_sess_path = '/'.join(self.session_path.parts[-3:])
        target_paths = []
        source_paths = []
        for i, d in df.iterrows():
            sess_path = Path(rel_sess_path).joinpath(d['rel_path'])
            full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path)
            if not full_local_path.exists():
                uuid = i
                self.local_paths.append(full_local_path)
                target_paths.append(sess_path)
                source_paths.append(add_uuid_string(sess_path, uuid))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Downloading {sp} to {tp}')
            self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs)

    def cleanUp(self, **_):
        """Clean up, remove the files that were downloaded from Globus once task has completed."""
        for file in self.local_paths:
            os.unlink(file)


class RemoteEC2DataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df, check_hash=False)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via S3 patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        s3_patcher = S3Patcher(one=self.one)
        return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
                                        versions=versions, **kwargs)


class RemoteHttpDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class RemoteAwsDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node.

        This will download missing data from the private IBL S3 AWS data bucket. New datasets are
        uploaded via Globus.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)
        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using AWS boto3."""
        df = super().getData()
        self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows()))

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via Globus
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        # Set up Globus
        from one.remote.globus import Globus  # noqa
        self.globus = Globus(client_name='server', headless=True)
        self.lab = session_path_parts(self.session_path, as_dict=True)['lab']
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            base_url = 'https://alyx.internationalbrainlab.org'
            _logger.warning('Changing Alyx client to %s', base_url)
            ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode)
        else:
            ac = self.one.alyx
        self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac)

        # register datasets
        versions = super().uploadData(outputs, version)
        response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs)

        # upload directly via globus
        source_paths = []
        target_paths = []
        collections = {}

        for dset, out in zip(response, outputs):
            assert Path(out).name == dset['name']
            # set flag to false
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            collection = '/'.join(fr['relative_path'].split('/')[:-1])
            if collection in collections.keys():
                collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}})
            else:
                collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}

            # Set all exists status to false for server file records
            self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False})

            source_paths.append(out)
            target_paths.append(add_uuid_string(fr['relative_path'], dset['id']))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Uploading {sp} to {tp}')
            self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

        for collection, files in collections.items():
            globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True)
            file_names = [str(gl[0]) for gl in globus_files]
            file_sizes = [gl[1] for gl in globus_files]

            for name, details in files.items():
                try:
                    idx = file_names.index(name)
                    size = file_sizes[idx]
                    if size == details['size']:
                        # update the file record if sizes match
                        self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True})
                    else:
                        _logger.warning(f'File {name} found on SDSC but sizes do not match')
                except ValueError:
                    _logger.warning(f'File {name} not found on SDSC')

        return response

        # ftp_patcher = FTPPatcher(one=self.one)
        # return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
        #                                   versions=versions, **kwargs)

    def cleanUp(self, task):
        """Clean up, remove the files that were downloaded from globus once task has completed."""
        if task.status == 0:
            for file in self.local_paths:
                os.unlink(file)


class RemoteGlobusDataHandler(DataHandler):
    """
    Data handler for running tasks on remote compute node. Will download missing data using Globus.

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """
    def __init__(self, session_path, signature, one=None):
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus."""
        # TODO
        pass

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class SDSCDataHandler(DataHandler):
    """
    Data handler for running tasks on SDSC compute node

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH)
        self.root_path = SDSC_ROOT_PATH

    def setUp(self, task):
        """Function to create symlinks to necessary data to run tasks."""
        df = super().getData()

        SDSC_TMP = Path(self.patch_path.joinpath(task.__class__.__name__))
        session_path = Path(get_alf_path(self.session_path))
        for uuid, d in df.iterrows():
            file_path = session_path / d['rel_path']
            file_uuid = add_uuid_string(file_path, uuid)
            file_link = SDSC_TMP.joinpath(file_path)
            file_link.parent.mkdir(exist_ok=True, parents=True)
            try:
                file_link.symlink_to(
                    Path(self.root_path.joinpath(file_uuid)))
            except FileExistsError:
                pass

        task.session_path = SDSC_TMP.joinpath(session_path)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via SDSC patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        sdsc_patcher = SDSCPatcher(one=self.one)
        return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs)

    def cleanUp(self, task):
        """Function to clean up symlinks created to run task."""
        assert self.patch_path.parts[0:4] == task.session_path.parts[0:4]
        shutil.rmtree(task.session_path)
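
# Illustrative sketch (editor's addition): on the SDSC compute node the handler is passed the
# task instance so it can symlink inputs into a scratch tree and point task.session_path at it.
# The task object and its run()/outputs attributes below are hypothetical.
#
#     >>> handler = SDSCDataHandler(session_path, task.signature, one=one)
#     >>> handler.setUp(task)        # symlinks inputs under the patch path, per task class name
#     >>> task.run()
#     >>> handler.uploadData(task.outputs, '2.40.0')
#     >>> handler.cleanUp(task)      # removes the symlinked session tree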


class PopeyeDataHandler(SDSCDataHandler):

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/"))
        self.root_path = Path("/mnt/sdceph/users/ibl/data")

    def uploadData(self, outputs, version, **kwargs):
        raise NotImplementedError(
            "Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task."
        )

    def cleanUp(self, **_):
        """Symlinks are preserved until registration."""
        pass