Coverage for ibllib/oneibl/data_handlers.py: 66%
443 statements
coverage.py v7.9.1, created at 2025-07-02 18:55 +0100

"""Downloading of task dependent datasets and registration of task output datasets.

The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are
present and to register and upload the output datasets. For examples on how to run a task using
specific data handlers, see :func:`ibllib.pipes.tasks`.
"""
import logging
import pandas as pd
from pathlib import Path, PurePosixPath
import shutil
import os
import abc
from time import time
from copy import copy

from one.api import ONE
from one.webclient import AlyxClient
from one.util import filter_datasets
from one.alf.path import add_uuid_string, get_alf_path, ensure_alf_path
from one.alf.cache import _make_datasets_df
from iblutil.util import flatten, ensure_list

from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository
from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher

_logger = logging.getLogger(__name__)


class ExpectedDataset:
    """An expected input or output dataset."""
    inverted = False

    def __init__(self, name, collection, register=None, revision=None, unique=True):
        """
        An expected input or output dataset.

        NB: This should not be instantiated directly, but rather via the `input` or `output`
        static method.

        Parameters
        ----------
        name : str, None
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        register : bool
            Whether to register the file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether identifier pattern is expected to match a single dataset or several. NB: This
            currently does not affect the output of `find_files`.
        """
        if not (collection is None or isinstance(collection, str)):
            collection = '/'.join(collection)
        self._identifiers = (collection, revision, name)
        self.operator = None
        self._register = register or False
        self.inverted = False
        self.name = None
        self.unique = unique

    @property
    def register(self):
        """bool: whether to register the output file."""
        return self._register

    @register.setter
    def register(self, value):
        """bool: whether to register the output file."""
        if self.operator is not None:
            raise AttributeError('cannot set register attribute for operator datasets')
        self._register = value

    @property
    def identifiers(self):
        """tuple: the identifying parts of the dataset.

        If no operator is applied, the identifiers are (collection, revision, name).
        If an operator is applied, a tuple of 3-element tuples is returned.
        """
        if self.operator is None:
            return self._identifiers
        # Flatten nested identifiers into tuple of 3-element tuples
        identifiers = []
        for x in self._identifiers:
            add = identifiers.extend if x.operator else identifiers.append
            add(x.identifiers)
        return tuple(identifiers)

    @property
    def glob_pattern(self):
        """str, tuple of str: one or more glob patterns."""
        if self.operator is None:
            return str(PurePosixPath(*filter(None, self._identifiers)))
        else:
            return tuple(flatten(x.glob_pattern for x in self._identifiers))

    def __repr__(self):
        """Represent the dataset object as a string.

        If the `name` property is not None, it is returned, otherwise the identifiers are used
        to format the name.
        """
        name = self.__class__.__name__
        if self.name:
            return f'<{name}({self.name})>'
        if self.operator:
            sym = {'or': '|', 'and': '&', 'xor': '^'}
            patterns = [d.__repr__() for d in self._identifiers]
            pattern = f'{sym[self.operator]:^3}'.join(patterns)
            if self.inverted:
                pattern = f'~({pattern})'
        else:
            pattern = ('~' if self.inverted else '') + self.glob_pattern
        return f'<{name}({pattern})>'

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        bool
            True if the dataset is found on disk or is optional.
        list of pathlib.Path
            A list of matching dataset files.
        missing, None, str, set of str
            One or more glob patterns that either didn't yield files (or did in the case of
            inverted datasets).

        Notes
        -----
        - Currently if `unique` is true and multiple files are found, all files are returned
          without an exception being raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended
          to be registered.
        - If `register` is true, an input with register=True may not be returned if part of an
          OR operation.
        - If `inverted` is true and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise
          returns all missing patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being
          optional.
        """
        session_path = Path(session_path)
        ok, actual_files, missing = False, [], None
        if self.operator is None:
            if register and not self.register:
                return True, actual_files, missing
            actual_files = sorted(session_path.rglob(self.glob_pattern))
            # If no revision pattern provided and no files found, search for any revision
            if self._identifiers[1] is None and not any(actual_files):
                glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2]))
                actual_files = sorted(session_path.rglob(glob_pattern))
            ok = any(actual_files) != self.inverted
            if not ok:
                missing = self.glob_pattern
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers))
            ok = all(_ok)
            actual_files = flatten(_actual_files)
            missing = set(filter(None, flatten(_missing)))
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            missing = set()
            for d in self._identifiers:
                ok, actual_files, _missing = d.find_files(session_path, register=register)
                if ok:
                    break
                if missing is not None:
                    missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing)
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers))
            ok = sum(_ok) == 1  # and sum(map(bool, map(len, _actual_files))) == 1
            # Return only those datasets that are complete if OK
            actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files)
            if ok:
                missing = set()
            elif all(_ok):  # return all patterns if all present when only one should be, otherwise return all missing
                missing = set(flatten(self.glob_pattern))
            elif not any(_ok):  # return all missing glob patterns if none present
                missing = set(filter(None, flatten(_missing)))
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')

        return ok, actual_files, missing
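
    # Illustrative sketch of `find_files` for a single expectation (dataset name and session
    # path below are hypothetical; this block is a comment and is not executed):
    #
    #     dset = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf', register=False)
    #     ok, files, missing = dset.find_files('/data/subject/2024-01-01/001')
    #
    # `ok` is True if at least one file matches 'alf/_ibl_trials.table.pqt' (or a revision folder
    # such as 'alf/#2024-01-01#/_ibl_trials.table.pqt'); `files` holds the matches, while
    # `missing` contains the unmatched glob pattern when nothing is found.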

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and
            ignore_qc_not_set.

        Returns
        -------
        bool
            True if the required dataset(s) are present in the data frame.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        # ok, datasets = False, session_datasets.iloc[0:0]
        if self.operator is None:
            collection, revision, file = self._identifiers
            if self._identifiers[1] is not None:
                raise NotImplementedError('revisions not yet supported')
            datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs)
            ok = datasets.empty == self.inverted
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            for d in self._identifiers:
                ok, datasets = d.filter(session_datasets, **kwargs)
                if ok:
                    break
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = sum(_ok) == 1
            if ok:
                # Return only those datasets that are complete.
                datasets = _datasets[_ok.index(True)]
            else:
                datasets = pd.concat(_datasets)
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = all(_ok)
            datasets = pd.concat(_datasets)
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')
        return ok, datasets

    def _apply_op(self, op, other):
        """Apply an operation between two datasets."""
        # Assert both instances of Input or both instances of Output
        if not isinstance(other, (self.__class__, tuple)):
            raise TypeError(f'logical operations not supported between objects of type '
                            f'{self.__class__.__name__} and {other.__class__.__name__}')
        # Assert operation supported
        if op not in {'or', 'xor', 'and'}:
            raise ValueError(op)
        # Convert tuple to ExpectedDataset instance
        if isinstance(other, tuple):
            D = (self.input if isinstance(self, Input) else self.output)
            other = D(*other)
        # Returned instance should only be optional if both datasets are optional
        is_input = isinstance(self, Input)
        if all(isinstance(x, OptionalDataset) for x in (self, other)):
            D = OptionalInput if is_input else OptionalOutput
        else:
            D = Input if is_input else Output
        # Instantiate 'empty' object
        d = D(None, None)
        d._identifiers = (self, other)
        d.operator = op
        return d

    def __invert__(self):
        """Assert dataset doesn't exist on disk."""
        obj = copy(self)
        obj.inverted = not self.inverted
        return obj

    def __or__(self, b):
        """Assert either dataset exists or another does, or both exist."""
        return self._apply_op('or', b)

    def __xor__(self, b):
        """Assert either dataset exists or another does, not both."""
        return self._apply_op('xor', b)

    def __and__(self, b):
        """Assert that a second dataset exists together with the first."""
        return self._apply_op('and', b)

    @staticmethod
    def input(name, collection, required=True, register=False, **kwargs):
        """
        Create an expected input dataset.

        By default, expected input datasets are not automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the input file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Input, OptionalInput
            An instance of an Input dataset if required is true, otherwise an OptionalInput.
        """
        Class = Input if required else OptionalInput
        obj = Class(name, collection, register=register, **kwargs)
        return obj

    @staticmethod
    def output(name, collection, required=True, register=True, **kwargs):
        """
        Create an expected output dataset.

        By default, expected output datasets are automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the output file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Output, OptionalOutput
            An instance of an Output dataset if required is true, otherwise an OptionalOutput.
        """
        Class = Output if required else OptionalOutput
        obj = Class(name, collection, register=register, **kwargs)
        return obj
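

# Illustrative sketch of composing expectations with the operators defined above (dataset names
# and collections are hypothetical; this block is a comment and is not executed):
#
#     I = ExpectedDataset.input
#     # accept a raw ephys file in either the compressed or the uncompressed form
#     raw = I('*.ap.cbin', 'raw_ephys_data/probe00') | I('*.ap.bin', 'raw_ephys_data/probe00')
#     # additionally require spike times and assert that clusters are absent
#     combined = raw & I('spikes.times.npy', 'alf/probe00') & ~I('spikes.clusters.npy', 'alf/probe00')
#     ok, files, missing = combined.find_files('/data/subject/2024-01-01/001')
#
# The `|`, `&` and `^` operators map onto __or__, __and__ and __xor__ and return combined
# ExpectedDataset instances whose `operator` attribute drives `find_files` and `filter`, while
# `~` marks an expectation as inverted (asserting absence). Inputs and outputs cannot be mixed
# within one expression.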


class OptionalDataset(ExpectedDataset):
    """An expected dataset that is not strictly required."""

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        True
            Always True as dataset is optional.
        list of pathlib.Path
            A list of matching dataset files.
        missing, None, str, set of str
            One or more glob patterns that either didn't yield files (or did in the case of
            inverted datasets).

        Notes
        -----
        - Currently if `unique` is true and multiple files are found, all files are returned
          without an exception being raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended
          to be registered.
        - If `inverted` is true and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise
          returns all missing patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being
          optional.
        """
        ok, actual_files, missing = super().find_files(session_path, register=register)
        return True, actual_files, missing

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc,
            ignore_qc_not_set, and assert_unique.

        Returns
        -------
        True
            Always True as dataset is optional.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        ok, datasets = super().filter(session_datasets, **kwargs)
        return True, datasets


class Input(ExpectedDataset):
    """An expected input dataset."""
    pass


class OptionalInput(Input, OptionalDataset):
    """An optional expected input dataset."""
    pass


class Output(ExpectedDataset):
    """An expected output dataset."""
    pass


class OptionalOutput(Output, OptionalDataset):
    """An optional expected output dataset."""
    pass


def _parse_signature(signature):
    """
    Ensure all a signature's expected datasets are instances of ExpectedDataset.

    Parameters
    ----------
    signature : Dict[str, list]
        A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or
        ExpectedDataset instances.

    Returns
    -------
    Dict[str, list of ExpectedDataset]
        A dict containing all tuples converted to ExpectedDataset instances.
    """
    I, O = ExpectedDataset.input, ExpectedDataset.output  # noqa
    inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']]
    outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']]
    return {'input_files': inputs, 'output_files': outputs}
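

# Illustrative sketch of a task signature as consumed by _parse_signature (dataset names and
# collections are hypothetical; this block is a comment and is not executed):
#
#     signature = {
#         'input_files': [
#             ('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True),           # tuple form
#             ExpectedDataset.input('*.wiring.json', 'raw_ephys_data*', False),       # instance form
#         ],
#         'output_files': [('_ibl_trials.table.pqt', 'alf', True)],
#     }
#     parsed = _parse_signature(signature)
#
# Tuples are expanded as (name, collection, required, ...) via ExpectedDataset.input/output, so
# every entry of `parsed['input_files']` and `parsed['output_files']` is an ExpectedDataset.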


def dataset_from_name(name, datasets):
    """
    From a list of ExpectedDataset instances, return those that match a given name.

    Parameters
    ----------
    name : str, function
        The name of the dataset or a function to match the dataset name.
    datasets : list of ExpectedDataset
        A list of ExpectedDataset instances.

    Returns
    -------
    list of ExpectedDataset
        The ExpectedDataset instances that match the given name.

    """
    matches = []
    for dataset in datasets:
        if dataset.operator is None:
            if isinstance(name, str):
                if dataset._identifiers[2] == name:
                    matches.append(dataset)
            else:
                if name(dataset._identifiers[2]):
                    matches.append(dataset)
        else:
            matches.extend(dataset_from_name(name, dataset._identifiers))
    return matches
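

# Illustrative sketch (hypothetical names, reusing the `signature` sketch above; this block is a
# comment and is not executed):
#
#     inputs = _parse_signature(signature)['input_files']
#     trials = dataset_from_name('_ibl_trials.table.pqt', inputs)          # exact name match
#     npy_datasets = dataset_from_name(lambda n: n.endswith('.npy'), inputs)  # predicate match
#
# Operator datasets are searched recursively, so members of an `A | B` expectation are also
# returned when their name matches.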


def update_collections(dataset, new_collection, substring=None, unique=None, exact_match=False):
    """
    Update the collection of a dataset.

    This updates all nested ExpectedDataset instances with the new collection and returns copies.

    Parameters
    ----------
    dataset : ExpectedDataset
        The dataset to update.
    new_collection : str, list of str
        The new collection or collections.
    substring : str, optional
        An optional substring in the collection to replace with new collection(s). If None, the
        entire collection will be replaced.
    unique : bool, optional
        When provided, this will be used to set the `unique` attribute of the new dataset(s). If
        None, the `unique` attribute will be set to True if the collection does not contain
        wildcards.
    exact_match : bool
        If True, the collection will be replaced only if it contains `substring`.

    Returns
    -------
    ExpectedDataset
        A copy of the dataset with the updated collection(s).

    """
    after = ensure_list(new_collection)
    D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output
    if dataset.operator is None:
        collection, revision, name = dataset.identifiers
        if revision is not None:
            raise NotImplementedError
        if substring:
            if exact_match and substring not in collection:
                after = [collection]
            else:
                after = [(collection or '').replace(substring, x) or None for x in after]
        if unique is None:
            unique = [not set(name + (x or '')).intersection('*[?') for x in after]
        else:
            unique = [unique] * len(after)
        register = dataset.register
        updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0])
        if len(after) > 1:
            for folder, unq in zip(after[1:], unique[1:]):
                updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq)
    else:
        updated = copy(dataset)
        updated._identifiers = [update_collections(dd, new_collection, substring, unique, exact_match)
                                for dd in updated._identifiers]
    return updated
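

# Illustrative sketch (hypothetical collections; this block is a comment and is not executed):
#
#     dset = ExpectedDataset.input('spikes.times.npy', 'alf/probe00')
#     moved = update_collections(dset, 'alf/probe01')                       # replace whole collection
#     relabelled = update_collections(dset, 'probe01', substring='probe00')  # replace only the probe label
#
# Passing a list of collections returns a copy whose per-collection expectations are AND-ed
# together, one per collection.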


class DataHandler(abc.ABC):
    def __init__(self, session_path, signature, one=None):
        """
        Base data handler class
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        self.session_path = ensure_alf_path(session_path)
        self.signature = _parse_signature(signature)
        self.one = one
        self.processed = {}  # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)

    def setUp(self, **kwargs):
        """Function to optionally overload to download required data to run task."""
        pass

    def getData(self, one=None):
        """Finds the datasets required for task based on input signatures.

        Parameters
        ----------
        one : one.api.One, optional
            An instance of ONE to use.

        Returns
        -------
        pandas.DataFrame, None
            A data frame of required datasets. An empty frame is returned if no registered
            datasets are required, while None is returned if no instance of ONE is set.
        """
        if self.one is None and one is None:
            return
        one = one or self.one
        session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
        dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
        return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()

    def getOutputFiles(self, session_path=None):
        """
        Return a data frame of output datasets found on disk.

        Returns
        -------
        pandas.DataFrame
            A data frame of datasets on disk that were specified in signature['output_files'].
        """
        session_path = self.session_path if session_path is None else session_path
        assert session_path
        # Next convert datasets to frame
        # Create dataframe of all ALF datasets
        df = _make_datasets_df(session_path, hash_files=False).set_index(['eid', 'id'])
        # Filter outputs
        if len(self.signature['output_files']) == 0:
            return pd.DataFrame()
        present = [file.filter(df)[1] for file in self.signature['output_files']]
        return pd.concat(present).droplevel('eid')

    def uploadData(self, outputs, version):
        """
        Function to optionally overload to upload and register data
        :param outputs: output files from task to register
        :param version: ibllib version
        :return:
        """
        if isinstance(outputs, list):
            versions = [version for _ in outputs]
        else:
            versions = [version]

        return versions

    def cleanUp(self, **kwargs):
        """Function to optionally overload to clean up files after running task."""
        pass


class LocalDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks locally, with no architecture or db connection
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)


class ServerDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers when all data is available locally

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)

    def uploadData(self, outputs, version, clobber=False, **kwargs):
        """
        Upload and/or register output data.

        This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`.

        Parameters
        ----------
        outputs : list of pathlib.Path
            A set of ALF paths to register to Alyx.
        version : str, list of str
            The version of ibllib used to generate these output files.
        clobber : bool
            If True, re-upload outputs that have already been passed to this method.
        kwargs
            Optional keyword arguments for one.registration.RegistrationClient.register_files.

        Returns
        -------
        list of dicts, dict
            A list of newly created Alyx dataset records or the registration data if dry.
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        # If clobber = False, do not re-upload the outputs that have already been processed
        outputs = ensure_list(outputs)
        to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs))
        records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or []
        if kwargs.get('dry', False):
            return records
        # Store processed outputs
        self.processed.update({k: v for k, v in zip(to_upload, records) if v})
        return [self.processed[x] for x in outputs if x in self.processed]

    def cleanUp(self, **_):
        """Empties and returns the processed dataset map."""
        super().cleanUp()
        processed = self.processed
        self.processed = {}
        return processed
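

# Illustrative sketch of the handler life cycle as driven by a task (session path, signature,
# output files, version string and ONE instance are all hypothetical; this block is a comment
# and is not executed):
#
#     handler = ServerDataHandler('/mnt/s0/Data/Subjects/SW000/2024-01-01/001', signature, one=one)
#     handler.setUp()                                           # no-op: data already on the server
#     records = handler.uploadData(output_files, '2.40.0')      # register outputs to Alyx
#     handler.cleanUp()                                         # returns and empties the processed map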


class ServerGlobusDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus

        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        from one.remote.globus import Globus, get_lab_from_endpoint_id  # noqa
        super().__init__(session_path, signatures, one=one)
        self.globus = Globus(client_name='server', headless=True)

        # on local servers set up the local root path manually as some have different globus config paths
        self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects'

        # Find the lab
        self.lab = get_lab(self.session_path, self.one.alyx)

        # For cortex lab we need to get the endpoint from the ibl alyx
        if self.lab == 'cortexlab':
            alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None)
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx)
        else:
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx)

        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus-sdk."""
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode))
        else:
            df = super().getData(one=self.one)

        if len(df) == 0:
            # If no datasets are found in the cache, work off the local file system only and do
            # not attempt to download any missing data using Globus
            return

        # Check for space on local server. If less than 500 GB free, don't download new data
        space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2]
        if space_free < 500e9:
            _logger.warning('Space left on server is < 500GB, won\'t re-download new data')
            return

        rel_sess_path = self.session_path.session_path_short()
        target_paths = []
        source_paths = []
        for i, d in df.iterrows():
            sess_path = Path(rel_sess_path).joinpath(d['rel_path'])
            full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path)
            if not full_local_path.exists():
                uuid = i
                self.local_paths.append(full_local_path)
                target_paths.append(sess_path)
                source_paths.append(add_uuid_string(sess_path, uuid))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Downloading {sp} to {tp}')
            self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs)

    def cleanUp(self, **_):
        """Clean up, remove the files that were downloaded from Globus once task has completed."""
        for file in self.local_paths:
            os.unlink(file)


class RemoteEC2DataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, check_hash=True, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df, check_hash=check_hash)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via S3 patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        s3_patcher = S3Patcher(one=self.one)
        return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
                                        versions=versions, **kwargs)


class RemoteHttpDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class RemoteAwsDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node.

        This will download missing data from the private IBL S3 AWS data bucket. New datasets are
        uploaded via Globus.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)
        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using AWS boto3."""
        df = super().getData()
        self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows()))

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via Globus
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        # Set up Globus
        from one.remote.globus import Globus  # noqa
        self.globus = Globus(client_name=kwargs.pop('client_name', 'server'), headless=True)
        self.lab = self.session_path.lab
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            base_url = 'https://alyx.internationalbrainlab.org'
            _logger.warning('Changing Alyx client to %s', base_url)
            ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode)
        else:
            ac = self.one.alyx
        self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac)

        # register datasets
        versions = super().uploadData(outputs, version)
        response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs)

        # upload directly via globus
        source_paths = []
        target_paths = []
        collections = {}

        for dset, out in zip(response, outputs):
            assert Path(out).name == dset['name']
            # set flag to false
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            collection = '/'.join(fr['relative_path'].split('/')[:-1])
            if collection in collections.keys():
                collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}})
            else:
                collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}

            # Set all exists status to false for server file records
            self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False})

            source_paths.append(out)
            target_paths.append(add_uuid_string(fr['relative_path'], dset['id']))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Uploading {sp} to {tp}')
            self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

        for collection, files in collections.items():
            globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True)
            file_names = [str(gl[0]) for gl in globus_files]
            file_sizes = [gl[1] for gl in globus_files]

            for name, details in files.items():
                try:
                    idx = file_names.index(name)
                    size = file_sizes[idx]
                    if size == details['size']:
                        # update the file record if sizes match
                        self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True})
                    else:
                        _logger.warning(f'File {name} found on SDSC but sizes do not match')
                except ValueError:
                    _logger.warning(f'File {name} not found on SDSC')

        return response

        # ftp_patcher = FTPPatcher(one=self.one)
        # return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
        #                                   versions=versions, **kwargs)

    def cleanUp(self, task):
        """Clean up, remove the files that were downloaded from Globus once task has completed."""
        if task.status == 0:
            for file in self.local_paths:
                os.unlink(file)


class RemoteGlobusDataHandler(DataHandler):
    """
    Data handler for running tasks on remote compute node. Will download missing data using Globus.

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """
    def __init__(self, session_path, signature, one=None):
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus."""
        # TODO
        pass

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class SDSCDataHandler(DataHandler):
    """
    Data handler for running tasks on SDSC compute node

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH)
        self.root_path = SDSC_ROOT_PATH
        self.linked_files = []  # List of symlinks created to run tasks

    def setUp(self, task, **_):
        """Function to create symlinks to necessary data to run tasks."""
        df = super().getData()

        SDSC_TMP = ensure_alf_path(self.patch_path.joinpath(task.__class__.__name__))
        session_path = Path(get_alf_path(self.session_path))
        for uuid, d in df.iterrows():
            file_path = session_path / d['rel_path']
            file_uuid = add_uuid_string(file_path, uuid)
            file_link = SDSC_TMP.joinpath(file_path)
            file_link.parent.mkdir(exist_ok=True, parents=True)
            try:  # TODO append link to task attribute
                file_link.symlink_to(
                    Path(self.root_path.joinpath(file_uuid)))
                self.linked_files.append(file_link)
            except FileExistsError:
                pass
        task.session_path = SDSC_TMP.joinpath(session_path)
        # If one of the symlinked input files is also an expected output, raise here to avoid overwriting
        # In the future we may instead copy the data under this condition
        assert self.getOutputFiles(session_path=task.session_path).shape[0] == 0, (
            "On SDSC patcher, output files should be distinct from input files to avoid overwriting")

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via SDSC patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        sdsc_patcher = SDSCPatcher(one=self.one)
        return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs)

    def cleanUp(self, task):
        """Function to clean up symlinks created to run task."""
        assert self.patch_path.parts[0:4] == task.session_path.parts[0:4]
        shutil.rmtree(task.session_path)


class PopeyeDataHandler(SDSCDataHandler):

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/"))
        self.root_path = Path("/mnt/sdceph/users/ibl/data")

    def uploadData(self, outputs, version, **kwargs):
        raise NotImplementedError(
            "Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task."
        )

    def cleanUp(self, **_):
        """Symlinks are preserved until registration."""
        pass