Coverage for ibllib/oneibl/data_handlers.py: 63%

434 statements  

coverage.py v7.7.0, created at 2025-03-17 15:25 +0000

1"""Downloading of task dependent datasets and registration of task output datasets. 

2 

3The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are 

4present and to register and upload the output datasets. For examples on how to run a task using 

5specific data handlers, see :func:`ibllib.pipes.tasks`. 

6""" 

7import logging 

8import pandas as pd 

9from pathlib import Path, PurePosixPath 

10import shutil 

11import os 

12import abc 

13from time import time 

14from copy import copy 

15 

16from one.api import ONE 

17from one.webclient import AlyxClient 

18from one.util import filter_datasets 

19from one.alf.path import add_uuid_string, session_path_parts, get_alf_path 

20from one.alf.cache import _make_datasets_df 

21from iblutil.util import flatten, ensure_list 

22 

23from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository 

24from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher 

25 

26 

27_logger = logging.getLogger(__name__) 

28 

29 

30class ExpectedDataset: 

31 """An expected input or output dataset.""" 

32 inverted = False 

33 

34 def __init__(self, name, collection, register=None, revision=None, unique=True): 

35 """ 

36 An expected input or output dataset. 

37 

38 NB: This should not be instantiated directly, but rather via the `input` or `output` 

39 static method. 

40 

41 Parameters 

42 ---------- 

43 name : str, None 

44 A dataset name or glob pattern. 

45 collection : str, None 

46 An ALF collection or pattern. 

47 register : bool 

48 Whether to register the file. Default is False for input files, True for output 

49 files. 

50 revision : str 

51 An optional revision. 

52 unique : bool 

53 Whether identifier pattern is expected to match a single dataset or several. NB: This currently does not 

54 affect the output of `find_files`. 

55 """ 

56 if not (collection is None or isinstance(collection, str)): 2a ~ c b ? + ; m s t @ r n o = p S T U V W u - ' v . w x g / y z : h [ ] ^ _ ` { | % A B C D E F G H I , J ( X Y j k d abbbq f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

57 collection = '/'.join(collection) 

58 self._identifiers = (collection, revision, name) 2a ~ c b ? + ; m s t @ r n o = p S T U V W u - ' v . w x g / y z : h [ ] ^ _ ` { | % A B C D E F G H I , J ( X Y j k d abbbq f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

59 self.operator = None 2a ~ c b ? + ; m s t @ r n o = p S T U V W u - ' v . w x g / y z : h [ ] ^ _ ` { | % A B C D E F G H I , J ( X Y j k d abbbq f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

60 self._register = register or False 2a ~ c b ? + ; m s t @ r n o = p S T U V W u - ' v . w x g / y z : h [ ] ^ _ ` { | % A B C D E F G H I , J ( X Y j k d abbbq f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

61 self.inverted = False 2a ~ c b ? + ; m s t @ r n o = p S T U V W u - ' v . w x g / y z : h [ ] ^ _ ` { | % A B C D E F G H I , J ( X Y j k d abbbq f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

62 self.name = None 2a ~ c b ? + ; m s t @ r n o = p S T U V W u - ' v . w x g / y z : h [ ] ^ _ ` { | % A B C D E F G H I , J ( X Y j k d abbbq f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

63 self.unique = unique 2a ~ c b ? + ; m s t @ r n o = p S T U V W u - ' v . w x g / y z : h [ ] ^ _ ` { | % A B C D E F G H I , J ( X Y j k d abbbq f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

64 

65 @property 

66 def register(self): 

67 """bool: whether to register the output file.""" 

68 return self._register 1acbmpSTUVWuvwxgyzh%ABCDEFGHIJ(XYjkdqfielRZ01234*5K6LMN78O9!#$PQ

69 

70 @register.setter 

71 def register(self, value): 

72 """bool: whether to register the output file.""" 

73 if self.operator is not None: 

74 raise AttributeError('cannot set register attribute for operator datasets') 

75 self._register = value 

76 

77 @property 

78 def identifiers(self): 

79 """tuple: the identifying parts of the dataset. 

80 

81 If no operator is applied, the identifiers are (collection, revision, name). 

82 If an operator is applied, a tuple of 3-element tuples is returned. 

83 """ 

84 if self.operator is None: 1cbm@jkdqfie

85 return self._identifiers 1cbm@jkdqfie

86 # Flatten nested identifiers into tuple of 3-element tuples 

87 identifiers = [] 1bm@e

88 for x in self._identifiers: 1bm@e

89 add = identifiers.extend if x.operator else identifiers.append 1bm@e

90 add(x.identifiers) 1bm@e

91 return tuple(identifiers) 1bm@e

92 

93 @property 

94 def glob_pattern(self): 

95 """str, tuple of str: one or more glob patterns.""" 

96 if self.operator is None: 2a ~ c b s r n = p cbS T U V W u - ' v . w x g / y z : h % A B C D E F G H I , J ( X Y j k d q f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

97 return str(PurePosixPath(*filter(None, self._identifiers))) 2a ~ c b s r n = p cbS T U V W u - ' v . w x g / y z : h % A B C D E F G H I , J ( X Y j k d q f i e l R Z 0 1 2 3 4 ) * 5 K 6 L M N 7 8 O 9 ! # $ P Q

98 else: 

99 return tuple(flatten(x.glob_pattern for x in self._identifiers)) 1nq

100 
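
    # Example (a minimal sketch; the dataset name and collection below are illustrative):
    #
    #   >>> d = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf')
    #   >>> d.glob_pattern
    #   'alf/_ibl_trials.table.pqt'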

    def __repr__(self):
        """Represent the dataset object as a string.

        If the `name` property is not None, it is returned, otherwise the identifiers are used to
        format the name.
        """
        name = self.__class__.__name__
        if self.name:
            return f'<{name}({self.name})>'
        if self.operator:
            sym = {'or': '|', 'and': '&', 'xor': '^'}
            patterns = [d.__repr__() for d in self._identifiers]
            pattern = f'{sym[self.operator]:^3}'.join(patterns)
            if self.inverted:
                pattern = f'~({pattern})'
        else:
            pattern = ('~' if self.inverted else '') + self.glob_pattern
        return f'<{name}({pattern})>'

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        bool
            True if the dataset is found on disk or is optional.
        list of pathlib.Path
            A list of matching dataset files.
        missing, None, str, set of str
            One or more glob patterns that either didn't yield files (or did in the case of inverted datasets).

        Notes
        -----
        - Currently if `unique` is true and multiple files are found, all files are returned without an exception
          raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended to be registered.
        - If `inverted` is true, and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing
          patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being optional.
        """
        session_path = Path(session_path)
        ok, actual_files, missing = False, [], None
        if self.operator is None:
            if register and not self.register:
                return True, actual_files, missing
            actual_files = sorted(session_path.rglob(self.glob_pattern))
            # If no revision pattern provided and no files found, search for any revision
            if self._identifiers[1] is None and not any(actual_files):
                glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2]))
                actual_files = sorted(session_path.rglob(glob_pattern))
            ok = any(actual_files) != self.inverted
            if not ok:
                missing = self.glob_pattern
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers))
            ok = all(_ok)
            actual_files = flatten(_actual_files)
            missing = set(filter(None, flatten(_missing)))
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            missing = set()
            for d in self._identifiers:
                ok, actual_files, _missing = d.find_files(session_path)
                if ok:
                    break
                if missing is not None:
                    missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing)
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers))
            ok = sum(_ok) == 1  # and sum(map(bool, map(len, _actual_files))) == 1
            # Return only those datasets that are complete if OK
            actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files)
            if ok:
                missing = set()
            elif all(_ok):  # return all patterns if all present when only one should be, otherwise return all missing
                missing = set(flatten(self.glob_pattern))
            elif not any(_ok):  # return all missing glob patterns if none present
                missing = set(filter(None, flatten(_missing)))
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')

        return ok, actual_files, missing
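
    # Example usage (a sketch; the session path and dataset pattern below are hypothetical):
    #
    #   >>> d = ExpectedDataset.input('*.ap.cbin', 'raw_ephys_data/probe??')
    #   >>> ok, files, missing = d.find_files('/data/Subjects/KS022/2019-12-10/001')
    #   >>> ok       # True if at least one file matched (for a non-inverted dataset)
    #   >>> missing  # the glob pattern that yielded no files, else None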

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and
            ignore_qc_not_set.

        Returns
        -------
        bool
            True if the required dataset(s) are present in the data frame.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        # ok, datasets = False, session_datasets.iloc[0:0]
        if self.operator is None:
            collection, revision, file = self._identifiers
            if self._identifiers[1] is not None:
                raise NotImplementedError('revisions not yet supported')
            datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs)
            ok = datasets.empty == self.inverted
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            for d in self._identifiers:
                ok, datasets = d.filter(session_datasets, **kwargs)
                if ok:
                    break
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = sum(_ok) == 1
            if ok:
                # Return only those datasets that are complete.
                datasets = _datasets[_ok.index(True)]
            else:
                datasets = pd.concat(_datasets)
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = all(_ok)
            datasets = pd.concat(_datasets)
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')
        return ok, datasets

    def _apply_op(self, op, other):
        """Apply an operation between two datasets."""
        # Assert both instances of Input or both instances of Output
        if not isinstance(other, (self.__class__, tuple)):
            raise TypeError(f'logical operations not supported between objects of type '
                            f'{self.__class__.__name__} and {other.__class__.__name__}')
        # Assert operation supported
        if op not in {'or', 'xor', 'and'}:
            raise ValueError(op)
        # Convert tuple to ExpectedDataset instance
        if isinstance(other, tuple):
            D = (self.input if isinstance(self, Input) else self.output)
            other = D(*other)
        # Returned instance should only be optional if both datasets are optional
        is_input = isinstance(self, Input)
        if all(isinstance(x, OptionalDataset) for x in (self, other)):
            D = OptionalInput if is_input else OptionalOutput
        else:
            D = Input if is_input else Output
        # Instantiate 'empty' object
        d = D(None, None)
        d._identifiers = (self, other)
        d.operator = op
        return d

    def __invert__(self):
        """Assert dataset doesn't exist on disk."""
        obj = copy(self)
        obj.inverted = not self.inverted
        return obj

    def __or__(self, b):
        """Assert either dataset exists or another does, or both exist."""
        return self._apply_op('or', b)

    def __xor__(self, b):
        """Assert either dataset exists or another does, not both."""
        return self._apply_op('xor', b)

    def __and__(self, b):
        """Assert that a second dataset exists together with the first."""
        return self._apply_op('and', b)
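
    # Operator composition example (a sketch; dataset names and collections are hypothetical):
    #
    #   >>> spikes = ExpectedDataset.input('spikes.times.npy', 'alf/probe00')
    #   >>> clusters = ExpectedDataset.input('clusters.channels.npy', 'alf/probe00')
    #   >>> both = spikes & clusters    # both datasets must be present
    #   >>> either = spikes | clusters  # at least one must be present
    #   >>> absent = ~spikes            # asserts the dataset is NOT on disk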

    @staticmethod
    def input(name, collection, required=True, register=False, **kwargs):
        """
        Create an expected input dataset.

        By default, expected input datasets are not automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the input file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Input, OptionalInput
            An instance of an Input dataset if required is true, otherwise an OptionalInput.
        """
        Class = Input if required else OptionalInput
        obj = Class(name, collection, register=register, **kwargs)
        return obj

    @staticmethod
    def output(name, collection, required=True, register=True, **kwargs):
        """
        Create an expected output dataset.

        By default, expected output datasets are automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the output file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Output, OptionalOutput
            An instance of an Output dataset if required is true, otherwise an OptionalOutput.
        """
        Class = Output if required else OptionalOutput
        obj = Class(name, collection, register=register, **kwargs)
        return obj


class OptionalDataset(ExpectedDataset):
    """An expected dataset that is not strictly required."""

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        True
            Always True as dataset is optional.
        list of pathlib.Path
            A list of matching dataset files.
        missing, None, str, set of str
            One or more glob patterns that either didn't yield files (or did in the case of inverted datasets).

        Notes
        -----
        - Currently if `unique` is true and multiple files are found, all files are returned without an exception
          raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended to be registered.
        - If `inverted` is true, and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing
          patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being optional.
        """
        ok, actual_files, missing = super().find_files(session_path, register=register)
        return True, actual_files, missing

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc,
            ignore_qc_not_set, and assert_unique.

        Returns
        -------
        True
            Always True as dataset is optional.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        ok, datasets = super().filter(session_datasets, **kwargs)
        return True, datasets


class Input(ExpectedDataset):
    """An expected input dataset."""
    pass


class OptionalInput(Input, OptionalDataset):
    """An optional expected input dataset."""
    pass


class Output(ExpectedDataset):
    """An expected output dataset."""
    pass


class OptionalOutput(Output, OptionalDataset):
    """An optional expected output dataset."""
    pass
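
# Required vs. optional (a sketch; the patterns and collections below are illustrative):
#
#   >>> ExpectedDataset.input('*.ap.meta', 'raw_ephys_data/probe00')                  # -> Input
#   >>> ExpectedDataset.input('*.ap.meta', 'raw_ephys_data/probe00', required=False)  # -> OptionalInput
#   >>> ExpectedDataset.output('spikes.times.npy', 'alf/probe00', required=False)     # -> OptionalOutput
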

def _parse_signature(signature):
    """
    Ensure all of a signature's expected datasets are instances of ExpectedDataset.

    Parameters
    ----------
    signature : Dict[str, list]
        A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or
        ExpectedDataset instances.

    Returns
    -------
    Dict[str, list of ExpectedDataset]
        A dict containing all tuples converted to ExpectedDataset instances.
    """
    I, O = ExpectedDataset.input, ExpectedDataset.output  # noqa
    inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']]
    outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']]
    return {'input_files': inputs, 'output_files': outputs}
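
# The static constructors are typically aliased when building a task signature
# (a sketch; the dataset names and collections below are illustrative):
#
#   >>> I, O = ExpectedDataset.input, ExpectedDataset.output  # noqa
#   >>> signature = {
#   ...     'input_files': [I('_iblrig_taskData.raw.*', 'raw_behavior_data')],
#   ...     'output_files': [O('*trials.table.pqt', 'alf')]}
#   >>> _parse_signature(signature)  # plain tuples in the lists would be converted here
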

def dataset_from_name(name, datasets):
    """
    From a list of ExpectedDataset instances, return those that match a given name.

    Parameters
    ----------
    name : str
        The name of the dataset.
    datasets : list of ExpectedDataset
        A list of ExpectedDataset instances.

    Returns
    -------
    list of ExpectedDataset
        The ExpectedDataset instances that match the given name.

    """
    matches = []
    for dataset in datasets:
        if dataset.operator is None:
            if dataset._identifiers[2] == name:
                matches.append(dataset)
        else:
            matches.extend(dataset_from_name(name, dataset._identifiers))
    return matches
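
# Example (a sketch; the dataset names below are illustrative):
#
#   >>> trials = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf')
#   >>> wheel = ExpectedDataset.input('_ibl_wheel.position.npy', 'alf')
#   >>> dataset_from_name('_ibl_wheel.position.npy', [trials, wheel])
#   [<Input(alf/_ibl_wheel.position.npy)>]
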

def update_collections(dataset, new_collection, substring=None, unique=None):
    """
    Update the collection of a dataset.

    This updates all nested ExpectedDataset instances with the new collection and returns copies.

    Parameters
    ----------
    dataset : ExpectedDataset
        The dataset to update.
    new_collection : str, list of str
        The new collection or collections.
    substring : str, optional
        An optional substring in the collection to replace with new collection(s). If None, the
        entire collection will be replaced.

    Returns
    -------
    ExpectedDataset
        A copy of the dataset with the updated collection(s).

    """
    after = ensure_list(new_collection)
    D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output
    if dataset.operator is None:
        collection, revision, name = dataset.identifiers
        if revision is not None:
            raise NotImplementedError
        if substring:
            after = [(collection or '').replace(substring, x) or None for x in after]
        if unique is None:
            unique = [not set(name + (x or '')).intersection('*[?') for x in after]
        else:
            unique = [unique] * len(after)
        register = dataset.register
        updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0])
        if len(after) > 1:
            for folder, unq in zip(after[1:], unique[1:]):
                updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq)
    else:
        updated = copy(dataset)
        updated._identifiers = [update_collections(dd, new_collection, substring, unique)
                                for dd in updated._identifiers]
    return updated
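
# Example (a sketch; the collections below are illustrative):
#
#   >>> d = ExpectedDataset.input('spikes.times.npy', 'alf/probe00')
#   >>> update_collections(d, 'alf/probe00/pykilosort').identifiers
#   ('alf/probe00/pykilosort', None, 'spikes.times.npy')
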

class DataHandler(abc.ABC):
    def __init__(self, session_path, signature, one=None):
        """
        Base data handler class
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        self.session_path = session_path
        self.signature = _parse_signature(signature)
        self.one = one
        self.processed = {}  # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)

    def setUp(self, **kwargs):
        """Function to optionally overload to download required data to run task."""
        pass

    def getData(self, one=None):
        """Finds the datasets required for task based on input signatures.

        Parameters
        ----------
        one : one.api.One, optional
            An instance of ONE to use.

        Returns
        -------
        pandas.DataFrame, None
            A data frame of required datasets. An empty frame is returned if no registered datasets are required,
            while None is returned if no instance of ONE is set.
        """
        if self.one is None and one is None:
            return
        one = one or self.one
        session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
        dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
        return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()

    def getOutputFiles(self):
        """
        Return a data frame of output datasets found on disk.

        Returns
        -------
        pandas.DataFrame
            A data frame of the datasets on disk that were specified in signature['output_files'].
        """
        assert self.session_path
        # Next convert datasets to frame
        # Create dataframe of all ALF datasets
        df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id'])
        # Filter outputs
        if len(self.signature['output_files']) == 0:
            return pd.DataFrame()
        present = [file.filter(df)[1] for file in self.signature['output_files']]
        return pd.concat(present).droplevel('eid')

    def uploadData(self, outputs, version):
        """
        Function to optionally overload to upload and register data
        :param outputs: output files from task to register
        :param version: ibllib version
        :return:
        """
        if isinstance(outputs, list):
            versions = [version for _ in outputs]
        else:
            versions = [version]

        return versions

    def cleanUp(self, **kwargs):
        """Function to optionally overload to clean up files after running task."""
        pass
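
# Typical life cycle as driven by a task (a sketch; the handler class, paths and
# variables below are illustrative):
#
#   >>> handler = ServerDataHandler(session_path, task.signature, one=one)  # see below
#   >>> handler.setUp()                    # ensure required input datasets are present
#   >>> # ... the task runs and produces its output files ...
#   >>> records = handler.uploadData(output_files, version)
#   >>> handler.cleanUp()
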

class LocalDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks locally, with no architecture or db connection
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)


class ServerDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers when all data is available locally

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)

    def uploadData(self, outputs, version, clobber=False, **kwargs):
        """
        Upload and/or register output data.

        This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`.

        Parameters
        ----------
        outputs : list of pathlib.Path
            A set of ALF paths to register to Alyx.
        version : str, list of str
            The version of ibllib used to generate these output files.
        clobber : bool
            If True, re-upload outputs that have already been passed to this method.
        kwargs
            Optional keyword arguments for one.registration.RegistrationClient.register_files.

        Returns
        -------
        list of dicts, dict
            A list of newly created Alyx dataset records or the registration data if dry.
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        # If clobber = False, do not re-upload the outputs that have already been processed
        outputs = ensure_list(outputs)
        to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs))
        records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or []
        if kwargs.get('dry', False):
            return records
        # Store processed outputs
        self.processed.update({k: v for k, v in zip(to_upload, records) if v})
        return [self.processed[x] for x in outputs if x in self.processed]

    def cleanUp(self, **_):
        """Empties and returns the processed dataset map."""
        super().cleanUp()
        processed = self.processed
        self.processed = {}
        return processed
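
# Re-registration behaviour (a sketch): outputs already passed to uploadData are not
# re-registered; their cached Alyx records are returned instead, unless clobber=True.
#
#   >>> handler = ServerDataHandler(session_path, signature, one=one)
#   >>> recs = handler.uploadData(outputs, version)                # registers and caches records
#   >>> recs = handler.uploadData(outputs, version)                # returns cached records only
#   >>> recs = handler.uploadData(outputs, version, clobber=True)  # forces re-registration
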

class ServerGlobusDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus

        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        from one.remote.globus import Globus, get_lab_from_endpoint_id  # noqa
        super().__init__(session_path, signatures, one=one)
        self.globus = Globus(client_name='server', headless=True)

        # on local servers set up the local root path manually as some have different globus config paths
        self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects'

        # Find the lab
        self.lab = get_lab(self.session_path, self.one.alyx)

        # For cortex lab we need to get the endpoint from the ibl alyx
        if self.lab == 'cortexlab':
            alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None)
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx)
        else:
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx)

        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus-sdk."""
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode))
        else:
            df = super().getData(one=self.one)

        if len(df) == 0:
            # If no datasets found in the cache only work off local file system do not attempt to
            # download any missing data using Globus
            return

        # Check for space on local server. If less than 500 GB don't download new data
        space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2]
        if space_free < 500e9:
            _logger.warning('Space left on server is < 500GB, won\'t re-download new data')
            return

        rel_sess_path = '/'.join(self.session_path.parts[-3:])
        target_paths = []
        source_paths = []
        for i, d in df.iterrows():
            sess_path = Path(rel_sess_path).joinpath(d['rel_path'])
            full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path)
            if not full_local_path.exists():
                uuid = i
                self.local_paths.append(full_local_path)
                target_paths.append(sess_path)
                source_paths.append(add_uuid_string(sess_path, uuid))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Downloading {sp} to {tp}')
            self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs)

    def cleanUp(self, **_):
        """Clean up, remove the files that were downloaded from Globus once the task has completed."""
        for file in self.local_paths:
            os.unlink(file)


class RemoteEC2DataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df, check_hash=False)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via S3 patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        s3_patcher = S3Patcher(one=self.one)
        return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
                                        versions=versions, **kwargs)


class RemoteHttpDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class RemoteAwsDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node.

        This will download missing data from the private IBL S3 AWS data bucket. New datasets are
        uploaded via Globus.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)
        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using AWS boto3."""
        df = super().getData()
        self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows()))

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        # Set up Globus
        from one.remote.globus import Globus  # noqa
        self.globus = Globus(client_name='server', headless=True)
        self.lab = session_path_parts(self.session_path, as_dict=True)['lab']
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            base_url = 'https://alyx.internationalbrainlab.org'
            _logger.warning('Changing Alyx client to %s', base_url)
            ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode)
        else:
            ac = self.one.alyx
        self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac)

        # register datasets
        versions = super().uploadData(outputs, version)
        response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs)

        # upload directly via globus
        source_paths = []
        target_paths = []
        collections = {}

        for dset, out in zip(response, outputs):
            assert Path(out).name == dset['name']
            # set flag to false
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            collection = '/'.join(fr['relative_path'].split('/')[:-1])
            if collection in collections.keys():
                collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}})
            else:
                collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}

            # Set all exists status to false for server file records
            self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False})

            source_paths.append(out)
            target_paths.append(add_uuid_string(fr['relative_path'], dset['id']))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Uploading {sp} to {tp}')
            self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

        for collection, files in collections.items():
            globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True)
            file_names = [str(gl[0]) for gl in globus_files]
            file_sizes = [gl[1] for gl in globus_files]

            for name, details in files.items():
                try:
                    idx = file_names.index(name)
                    size = file_sizes[idx]
                    if size == details['size']:
                        # update the file record if sizes match
                        self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True})
                    else:
                        _logger.warning(f'File {name} found on SDSC but sizes do not match')
                except ValueError:
                    _logger.warning(f'File {name} not found on SDSC')

        return response

        # ftp_patcher = FTPPatcher(one=self.one)
        # return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
        #                                   versions=versions, **kwargs)

    def cleanUp(self, task):
        """Clean up, remove the files that were downloaded from AWS once the task has completed."""
        if task.status == 0:
            for file in self.local_paths:
                os.unlink(file)


class RemoteGlobusDataHandler(DataHandler):
    """
    Data handler for running tasks on remote compute node. Will download missing data using Globus.

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """
    def __init__(self, session_path, signature, one=None):
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """Function to download necessary data to run tasks using Globus."""
        # TODO
        pass

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class SDSCDataHandler(DataHandler):
    """
    Data handler for running tasks on SDSC compute node

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH)
        self.root_path = SDSC_ROOT_PATH

    def setUp(self, task):
        """Function to create symlinks to necessary data to run tasks."""
        df = super().getData()

        SDSC_TMP = Path(self.patch_path.joinpath(task.__class__.__name__))
        session_path = Path(get_alf_path(self.session_path))
        for uuid, d in df.iterrows():
            file_path = session_path / d['rel_path']
            file_uuid = add_uuid_string(file_path, uuid)
            file_link = SDSC_TMP.joinpath(file_path)
            file_link.parent.mkdir(exist_ok=True, parents=True)
            try:
                file_link.symlink_to(
                    Path(self.root_path.joinpath(file_uuid)))
            except FileExistsError:
                pass

        task.session_path = SDSC_TMP.joinpath(session_path)
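
    # Sketch of the layout produced by setUp (paths illustrative): for a required dataset
    # <session>/alf/spikes.times.npy with UUID <uuid>, a symlink is created at
    #   <patch_path>/<TaskClassName>/<lab>/Subjects/<subject>/<date>/<number>/alf/spikes.times.npy
    # pointing to
    #   <root_path>/<lab>/Subjects/<subject>/<date>/<number>/alf/spikes.times.<uuid>.npy
    # and task.session_path is repointed at the temporary session folder under <patch_path>.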

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via SDSC patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        sdsc_patcher = SDSCPatcher(one=self.one)
        return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs)

    def cleanUp(self, task):
        """Function to clean up symlinks created to run task."""
        assert self.patch_path.parts[0:4] == task.session_path.parts[0:4]
        shutil.rmtree(task.session_path)


class PopeyeDataHandler(SDSCDataHandler):

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/"))
        self.root_path = Path("/mnt/sdceph/users/ibl/data")

    def uploadData(self, outputs, version, **kwargs):
        raise NotImplementedError(
            "Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task."
        )

    def cleanUp(self, **_):
        """Symlinks are preserved until registration."""
        pass