Coverage for ibllib/oneibl/data_handlers.py: 64% (434 statements)

"""Downloading of task dependent datasets and registration of task output datasets.

The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are
present and to register and upload the output datasets. For examples on how to run a task using
specific data handlers, see :func:`ibllib.pipes.tasks`.
"""
import logging
import pandas as pd
from pathlib import Path, PurePosixPath
import shutil
import os
import abc
from time import time
from copy import copy

from one.api import ONE
from one.webclient import AlyxClient
from one.util import filter_datasets
from one.alf.path import add_uuid_string, session_path_parts, get_alf_path
from one.alf.cache import _make_datasets_df
from iblutil.util import flatten, ensure_list

from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository
from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher


_logger = logging.getLogger(__name__)


class ExpectedDataset:
    """An expected input or output dataset."""
    inverted = False

    def __init__(self, name, collection, register=None, revision=None, unique=True):
        """
        An expected input or output dataset.

        NB: This should not be instantiated directly, but rather via the `input` or `output`
        static method.

        Parameters
        ----------
        name : str, None
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        register : bool
            Whether to register the file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether the identifier pattern is expected to match a single dataset or several.
            NB: This currently does not affect the output of `find_files`.
        """
        if not (collection is None or isinstance(collection, str)):
            collection = '/'.join(collection)
        self._identifiers = (collection, revision, name)
        self.operator = None
        self._register = register or False
        self.inverted = False
        self.name = None
        self.unique = unique

    @property
    def register(self):
        """bool: whether to register the output file."""
        return self._register

    @register.setter
    def register(self, value):
        """bool: whether to register the output file."""
        if self.operator is not None:
            raise AttributeError('cannot set register attribute for operator datasets')
        self._register = value

    @property
    def identifiers(self):
        """tuple: the identifying parts of the dataset.

        If no operator is applied, the identifiers are (collection, revision, name).
        If an operator is applied, a tuple of 3-element tuples is returned.
        """
        if self.operator is None:
            return self._identifiers
        # Flatten nested identifiers into tuple of 3-element tuples
        identifiers = []
        for x in self._identifiers:
            add = identifiers.extend if x.operator else identifiers.append
            add(x.identifiers)
        return tuple(identifiers)

    @property
    def glob_pattern(self):
        """str, tuple of str: one or more glob patterns."""
        if self.operator is None:
            return str(PurePosixPath(*filter(None, self._identifiers)))
        else:
            return tuple(flatten(x.glob_pattern for x in self._identifiers))

    def __repr__(self):
        """Represent the dataset object as a string.

        If the `name` property is not None, it is returned, otherwise the identifiers are used
        to format the name.
        """
        name = self.__class__.__name__
        if self.name:
            return f'<{name}({self.name})>'
        if self.operator:
            sym = {'or': '|', 'and': '&', 'xor': '^'}
            patterns = [d.__repr__() for d in self._identifiers]
            pattern = f'{sym[self.operator]:^3}'.join(patterns)
            if self.inverted:
                pattern = f'~({pattern})'
        else:
            pattern = ('~' if self.inverted else '') + self.glob_pattern
        return f'<{name}({pattern})>'

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        bool
            True if the dataset is found on disk or is optional.
        list of pathlib.Path
            A list of matching dataset files.
        None, str, set of str
            One or more glob patterns that either didn't yield files (or did, in the case of
            inverted datasets).

        Notes
        -----
        - Currently if `unique` is true and multiple files are found, all files are returned
          without an exception raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended
          to be registered.
        - If `register` is true, an input with register=True may not be returned if part of an
          OR operation.
        - If `inverted` is true and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise
          returns all missing patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being
          optional.
        """
        session_path = Path(session_path)
        ok, actual_files, missing = False, [], None
        if self.operator is None:
            if register and not self.register:
                return True, actual_files, missing
            actual_files = sorted(session_path.rglob(self.glob_pattern))
            # If no revision pattern provided and no files found, search for any revision
            if self._identifiers[1] is None and not any(actual_files):
                glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2]))
                actual_files = sorted(session_path.rglob(glob_pattern))
            ok = any(actual_files) != self.inverted
            if not ok:
                missing = self.glob_pattern
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers))
            ok = all(_ok)
            actual_files = flatten(_actual_files)
            missing = set(filter(None, flatten(_missing)))
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            missing = set()
            for d in self._identifiers:
                ok, actual_files, _missing = d.find_files(session_path, register=register)
                if ok:
                    break
                if missing is not None:
                    missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing)
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path, register=register), self._identifiers))
            ok = sum(_ok) == 1  # and sum(map(bool, map(len, _actual_files))) == 1
            # Return only those datasets that are complete if OK
            actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files)
            if ok:
                missing = set()
            elif all(_ok):  # return all patterns if all present when only one should be, otherwise return all missing
                missing = set(flatten(self.glob_pattern))
            elif not any(_ok):  # return all missing glob patterns if none present
                missing = set(filter(None, flatten(_missing)))
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')

        return ok, actual_files, missing
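
    # Illustrative usage sketch (not part of the module): calling `find_files` on a single
    # expected dataset. The dataset name, collection and session path below are hypothetical
    # examples rather than values taken from this module.
    #
    #     dset = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf')
    #     ok, files, missing = dset.find_files('/data/Subjects/KS023/2019-12-10/001')
    #     if not ok:
    #         print(f'missing pattern(s): {missing}')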

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and
            ignore_qc_not_set.

        Returns
        -------
        bool
            True if the required dataset(s) are present in the data frame.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        # ok, datasets = False, session_datasets.iloc[0:0]
        if self.operator is None:
            collection, revision, file = self._identifiers
            if self._identifiers[1] is not None:
                raise NotImplementedError('revisions not yet supported')
            datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs)
            ok = datasets.empty == self.inverted
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            for d in self._identifiers:
                ok, datasets = d.filter(session_datasets, **kwargs)
                if ok:
                    break
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = sum(_ok) == 1
            if ok:
                # Return only those datasets that are complete.
                datasets = _datasets[_ok.index(True)]
            else:
                datasets = pd.concat(_datasets)
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = all(_ok)
            datasets = pd.concat(_datasets)
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')
        return ok, datasets
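
    # Illustrative usage sketch (not part of the module): filtering a session's dataset frame
    # with an expected dataset. The ONE instance, session eid and dataset pattern are
    # hypothetical.
    #
    #     one = ONE()
    #     session_datasets = one.list_datasets(eid, details=True)
    #     spikes = ExpectedDataset.input('spikes.times*', 'alf/probe00')
    #     ok, frame = spikes.filter(session_datasets)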

    def _apply_op(self, op, other):
        """Apply an operation between two datasets."""
        # Assert both instances of Input or both instances of Output
        if not isinstance(other, (self.__class__, tuple)):
            raise TypeError(f'logical operations not supported between objects of type '
                            f'{self.__class__.__name__} and {other.__class__.__name__}')
        # Assert operation supported
        if op not in {'or', 'xor', 'and'}:
            raise ValueError(op)
        # Convert tuple to ExpectedDataset instance
        if isinstance(other, tuple):
            D = (self.input if isinstance(self, Input) else self.output)
            other = D(*other)
        # Returned instance should only be optional if both datasets are optional
        is_input = isinstance(self, Input)
        if all(isinstance(x, OptionalDataset) for x in (self, other)):
            D = OptionalInput if is_input else OptionalOutput
        else:
            D = Input if is_input else Output
        # Instantiate 'empty' object
        d = D(None, None)
        d._identifiers = (self, other)
        d.operator = op
        return d

    def __invert__(self):
        """Assert dataset doesn't exist on disk."""
        obj = copy(self)
        obj.inverted = not self.inverted
        return obj

    def __or__(self, b):
        """Assert either dataset exists or another does, or both exist."""
        return self._apply_op('or', b)

    def __xor__(self, b):
        """Assert either dataset exists or another does, not both."""
        return self._apply_op('xor', b)

    def __and__(self, b):
        """Assert that a second dataset exists together with the first."""
        return self._apply_op('and', b)
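
    # Illustrative usage sketch (not part of the module): combining expected datasets with the
    # logical operators defined above. The dataset names and collections are hypothetical.
    #
    #     meta = ExpectedDataset.input('*.meta', 'raw_ephys_data/probe00')
    #     cbin = ExpectedDataset.input('*.cbin', 'raw_ephys_data/probe00')
    #     either = meta | cbin    # at least one of the two must be present
    #     both = meta & cbin      # both must be present
    #     only_one = meta ^ cbin  # exactly one must be present
    #     absent = ~meta          # assert the dataset is NOT present on disk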

    @staticmethod
    def input(name, collection, required=True, register=False, **kwargs):
        """
        Create an expected input dataset.

        By default, expected input datasets are not automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether the file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the input file. Default is False for input files, True for
            output files.
        revision : str
            An optional revision.
        unique : bool
            Whether the identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Input, OptionalInput
            An instance of an Input dataset if required is true, otherwise an OptionalInput.
        """
        Class = Input if required else OptionalInput
        obj = Class(name, collection, register=register, **kwargs)
        return obj

    @staticmethod
    def output(name, collection, required=True, register=True, **kwargs):
        """
        Create an expected output dataset.

        By default, expected output datasets are automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether the file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the output file. Default is False for input files, True for
            output files.
        revision : str
            An optional revision.
        unique : bool
            Whether the identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Output, OptionalOutput
            An instance of an Output dataset if required is true, otherwise an OptionalOutput.
        """
        Class = Output if required else OptionalOutput
        obj = Class(name, collection, register=register, **kwargs)
        return obj
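
    # Illustrative usage sketch (not part of the module): the `input` and `output` factories are
    # the intended way to declare expected datasets. The names and collections are hypothetical.
    #
    #     I, O = ExpectedDataset.input, ExpectedDataset.output
    #     settings = I('_iblrig_taskSettings.raw.json', 'raw_behavior_data', required=True)
    #     camera = I('*Camera.raw.mp4', 'raw_video_data', required=False)
    #     trials = O('_ibl_trials.table.pqt', 'alf', register=True)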


class OptionalDataset(ExpectedDataset):
    """An expected dataset that is not strictly required."""

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        True
            Always True as the dataset is optional.
        list of pathlib.Path
            A list of matching dataset files.
        None, str, set of str
            One or more glob patterns that either didn't yield files (or did, in the case of
            inverted datasets).

        Notes
        -----
        - Currently if `unique` is true and multiple files are found, all files are returned
          without an exception raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended
          to be registered.
        - If `inverted` is true and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise
          returns all missing patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being
          optional.
        """
        ok, actual_files, missing = super().find_files(session_path, register=register)
        return True, actual_files, missing

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc,
            ignore_qc_not_set, and assert_unique.

        Returns
        -------
        True
            Always True as the dataset is optional.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        ok, datasets = super().filter(session_datasets, **kwargs)
        return True, datasets


class Input(ExpectedDataset):
    """An expected input dataset."""
    pass


class OptionalInput(Input, OptionalDataset):
    """An optional expected input dataset."""
    pass


class Output(ExpectedDataset):
    """An expected output dataset."""
    pass


class OptionalOutput(Output, OptionalDataset):
    """An optional expected output dataset."""
    pass


def _parse_signature(signature):
    """
    Ensure all of a signature's expected datasets are instances of ExpectedDataset.

    Parameters
    ----------
    signature : Dict[str, list]
        A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or
        ExpectedDataset instances.

    Returns
    -------
    Dict[str, list of ExpectedDataset]
        A dict containing all tuples converted to ExpectedDataset instances.
    """
    I, O = ExpectedDataset.input, ExpectedDataset.output  # noqa
    inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']]
    outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']]
    return {'input_files': inputs, 'output_files': outputs}
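

# Illustrative sketch (not part of the module): a task signature mixing plain tuples and
# ExpectedDataset instances, as accepted by `_parse_signature`. The file names are hypothetical.
#
#     I = ExpectedDataset.input
#     signature = {
#         'input_files': [
#             ('_iblrig_taskSettings.raw.json', 'raw_behavior_data', True),
#             I('*Camera.raw.mp4', 'raw_video_data', required=False),
#         ],
#         'output_files': [('_ibl_trials.table.pqt', 'alf', True)],
#     }
#     parsed = _parse_signature(signature)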


def dataset_from_name(name, datasets):
    """
    From a list of ExpectedDataset instances, return those that match a given name.

    Parameters
    ----------
    name : str
        The name of the dataset.
    datasets : list of ExpectedDataset
        A list of ExpectedDataset instances.

    Returns
    -------
    list of ExpectedDataset
        The ExpectedDataset instances that match the given name.

    """
    matches = []
    for dataset in datasets:
        if dataset.operator is None:
            if dataset._identifiers[2] == name:
                matches.append(dataset)
        else:
            matches.extend(dataset_from_name(name, dataset._identifiers))
    return matches


def update_collections(dataset, new_collection, substring=None, unique=None):
    """
    Update the collection of a dataset.

    This updates all nested ExpectedDataset instances with the new collection and returns copies.

    Parameters
    ----------
    dataset : ExpectedDataset
        The dataset to update.
    new_collection : str, list of str
        The new collection or collections.
    substring : str, optional
        An optional substring in the collection to replace with new collection(s). If None, the
        entire collection will be replaced.
    unique : bool, optional
        Whether the new collection(s) are expected to match a single dataset. If None, this is
        inferred from the presence of wildcard characters in the name and collection.

    Returns
    -------
    ExpectedDataset
        A copy of the dataset with the updated collection(s).

    """
    after = ensure_list(new_collection)
    D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output
    if dataset.operator is None:
        collection, revision, name = dataset.identifiers
        if revision is not None:
            raise NotImplementedError
        if substring:
            after = [(collection or '').replace(substring, x) or None for x in after]
        if unique is None:
            unique = [not set(name + (x or '')).intersection('*[?') for x in after]
        else:
            unique = [unique] * len(after)
        register = dataset.register
        updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0])
        if len(after) > 1:
            for folder, unq in zip(after[1:], unique[1:]):
                updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq)
    else:
        updated = copy(dataset)
        updated._identifiers = [update_collections(dd, new_collection, substring, unique)
                                for dd in updated._identifiers]
    return updated
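

# Illustrative sketch (not part of the module): moving an expected dataset to a different
# collection, e.g. from one probe folder to another. The names below are hypothetical.
#
#     spikes = ExpectedDataset.output('spikes.times.npy', 'alf/probe00')
#     moved = update_collections(spikes, 'alf/probe01')                     # replace collection
#     renamed = update_collections(spikes, 'probe01', substring='probe00')  # substring swap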


class DataHandler(abc.ABC):
    def __init__(self, session_path, signature, one=None):
        """
        Base data handler class
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        self.session_path = session_path
        self.signature = _parse_signature(signature)
        self.one = one
        self.processed = {}  # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)

    def setUp(self, **kwargs):
        """Function to optionally overload to download required data to run task."""
        pass

    def getData(self, one=None):
        """Finds the datasets required for task based on input signatures.

        Parameters
        ----------
        one : one.api.One, optional
            An instance of ONE to use.

        Returns
        -------
        pandas.DataFrame, None
            A data frame of required datasets. An empty frame is returned if no registered
            datasets are required, while None is returned if no instance of ONE is set.
        """
        if self.one is None and one is None:
            return
        one = one or self.one
        session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
        dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
        return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()

    def getOutputFiles(self):
        """
        Return a data frame of output datasets found on disk.

        Returns
        -------
        pandas.DataFrame
            A data frame of datasets on disk that were specified in signature['output_files'].
        """
        assert self.session_path
        # Next convert datasets to frame
        # Create dataframe of all ALF datasets
        df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id'])
        # Filter outputs
        if len(self.signature['output_files']) == 0:
            return pd.DataFrame()
        present = [file.filter(df)[1] for file in self.signature['output_files']]
        return pd.concat(present).droplevel('eid')

    def uploadData(self, outputs, version):
        """
        Function to optionally overload to upload and register data
        :param outputs: output files from task to register
        :param version: ibllib version
        :return:
        """
        if isinstance(outputs, list):
            versions = [version for _ in outputs]
        else:
            versions = [version]

        return versions

    def cleanUp(self, **kwargs):
        """Function to optionally overload to clean up files after running task."""
        pass


class LocalDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks locally, with no architecture or db connection
        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)


class ServerDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers when all data is available locally

        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)

    def uploadData(self, outputs, version, clobber=False, **kwargs):
        """
        Upload and/or register output data.

        This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`.

        Parameters
        ----------
        outputs : list of pathlib.Path
            A set of ALF paths to register to Alyx.
        version : str, list of str
            The version of ibllib used to generate these output files.
        clobber : bool
            If True, re-upload outputs that have already been passed to this method.
        kwargs
            Optional keyword arguments for one.registration.RegistrationClient.register_files.

        Returns
        -------
        list of dicts, dict
            A list of newly created Alyx dataset records or the registration data if dry.
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        # If clobber = False, do not re-upload the outputs that have already been processed
        outputs = ensure_list(outputs)
        to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs))
        records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or []
        if kwargs.get('dry', False):
            return records
        # Store processed outputs
        self.processed.update({k: v for k, v in zip(to_upload, records) if v})
        return [self.processed[x] for x in outputs if x in self.processed]

    def cleanUp(self, **_):
        """Empties and returns the processed dataset map."""
        super().cleanUp()
        processed = self.processed
        self.processed = {}
        return processed
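

# Illustrative sketch (not part of the module): the typical data handler life cycle as driven by
# a task. The session path, ONE instance, signature and output files are hypothetical.
#
#     handler = ServerDataHandler(session_path, signature, one=one)
#     handler.setUp()                                        # ensure input datasets are present
#     records = handler.uploadData(output_files, version)    # register outputs with Alyx
#     handler.cleanUp()                                      # return and clear processed records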


class ServerGlobusDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus.

        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        from one.remote.globus import Globus, get_lab_from_endpoint_id  # noqa
        super().__init__(session_path, signatures, one=one)
        self.globus = Globus(client_name='server', headless=True)

        # On local servers, set up the local root path manually as some have different Globus config paths
        self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects'

        # Find the lab
        self.lab = get_lab(self.session_path, self.one.alyx)

        # For cortex lab we need to get the endpoint from the IBL Alyx
        if self.lab == 'cortexlab':
            alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None)
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx)
        else:
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx)

        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus-sdk."""
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode))
        else:
            df = super().getData(one=self.one)

        if len(df) == 0:
            # If no datasets are found in the cache, work off the local file system only and do
            # not attempt to download any missing data using Globus
            return

        # Check for space on the local server. If less than 500 GB is free, don't download new data
        space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2]
        if space_free < 500e9:
            _logger.warning('Space left on server is < 500GB, won\'t re-download new data')
            return

        rel_sess_path = '/'.join(self.session_path.parts[-3:])
        target_paths = []
        source_paths = []
        for i, d in df.iterrows():
            sess_path = Path(rel_sess_path).joinpath(d['rel_path'])
            full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path)
            if not full_local_path.exists():
                uuid = i
                self.local_paths.append(full_local_path)
                target_paths.append(sess_path)
                source_paths.append(add_uuid_string(sess_path, uuid))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Downloading {sp} to {tp}')
            self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs)

    def cleanUp(self, **_):
        """Clean up, remove the files that were downloaded from Globus once the task has completed."""
        for file in self.local_paths:
            os.unlink(file)


class RemoteEC2DataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on a remote compute node. Will download missing data via HTTP using ONE.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df, check_hash=False)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via S3 patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        s3_patcher = S3Patcher(one=self.one)
        return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
                                        versions=versions, **kwargs)


class RemoteHttpDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on a remote compute node. Will download missing data via HTTP using ONE.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class RemoteAwsDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on a remote compute node.

        This will download missing data from the private IBL S3 AWS data bucket. New datasets are
        uploaded via Globus.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)
        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using AWS boto3."""
        df = super().getData()
        self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows()))

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via Globus
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        # Set up Globus
        from one.remote.globus import Globus  # noqa
        self.globus = Globus(client_name='server', headless=True)
        self.lab = session_path_parts(self.session_path, as_dict=True)['lab']
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            base_url = 'https://alyx.internationalbrainlab.org'
            _logger.warning('Changing Alyx client to %s', base_url)
            ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode)
        else:
            ac = self.one.alyx
        self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac)

        # Register datasets
        versions = super().uploadData(outputs, version)
        response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs)

        # Upload directly via Globus
        source_paths = []
        target_paths = []
        collections = {}

        for dset, out in zip(response, outputs):
            assert Path(out).name == dset['name']
            # Find the Flatiron file record; its exists flag is set to False below
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            collection = '/'.join(fr['relative_path'].split('/')[:-1])
            if collection in collections.keys():
                collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}})
            else:
                collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}

            # Set all exists status to false for server file records
            self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False})

            source_paths.append(out)
            target_paths.append(add_uuid_string(fr['relative_path'], dset['id']))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Uploading {sp} to {tp}')
            self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

        for collection, files in collections.items():
            globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True)
            file_names = [str(gl[0]) for gl in globus_files]
            file_sizes = [gl[1] for gl in globus_files]

            for name, details in files.items():
                try:
                    idx = file_names.index(name)
                    size = file_sizes[idx]
                    if size == details['size']:
                        # Update the file record if the sizes match
                        self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True})
                    else:
                        _logger.warning(f'File {name} found on SDSC but sizes do not match')
                except ValueError:
                    _logger.warning(f'File {name} not found on SDSC')

        return response

        # ftp_patcher = FTPPatcher(one=self.one)
        # return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
        #                                   versions=versions, **kwargs)

    def cleanUp(self, task):
        """Clean up, remove the files that were downloaded from Globus once the task has completed."""
        if task.status == 0:
            for file in self.local_paths:
                os.unlink(file)


class RemoteGlobusDataHandler(DataHandler):
    """
    Data handler for running tasks on a remote compute node. Will download missing data using Globus.

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """
    def __init__(self, session_path, signature, one=None):
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """Function to download necessary data to run tasks using Globus."""
        # TODO
        pass

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class SDSCDataHandler(DataHandler):
    """
    Data handler for running tasks on SDSC compute node

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH)
        self.root_path = SDSC_ROOT_PATH

    def setUp(self, task):
        """Function to create symlinks to necessary data to run tasks."""
        df = super().getData()

        SDSC_TMP = Path(self.patch_path.joinpath(task.__class__.__name__))
        session_path = Path(get_alf_path(self.session_path))
        for uuid, d in df.iterrows():
            file_path = session_path / d['rel_path']
            file_uuid = add_uuid_string(file_path, uuid)
            file_link = SDSC_TMP.joinpath(file_path)
            file_link.parent.mkdir(exist_ok=True, parents=True)
            try:
                file_link.symlink_to(
                    Path(self.root_path.joinpath(file_uuid)))
            except FileExistsError:
                pass

        task.session_path = SDSC_TMP.joinpath(session_path)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via SDSC patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        sdsc_patcher = SDSCPatcher(one=self.one)
        return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs)

    def cleanUp(self, task):
        """Function to clean up symlinks created to run task."""
        assert self.patch_path.parts[0:4] == task.session_path.parts[0:4]
        shutil.rmtree(task.session_path)


class PopeyeDataHandler(SDSCDataHandler):

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/"))
        self.root_path = Path("/mnt/sdceph/users/ibl/data")

    def uploadData(self, outputs, version, **kwargs):
        raise NotImplementedError(
            "Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task."
        )

    def cleanUp(self, **_):
        """Symlinks are preserved until registration."""
        pass