Coverage for ibllib/oneibl/data_handlers.py: 63%
434 statements
coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1"""Downloading of task dependent datasets and registration of task output datasets.
3The DataHandler class is used by the pipes.tasks.Task class to ensure dependent datasets are
4present and to register and upload the output datasets. For examples on how to run a task using
5specific data handlers, see :func:`ibllib.pipes.tasks`.
6"""
7import logging
8import pandas as pd
9from pathlib import Path, PurePosixPath
10import shutil
11import os
12import abc
13from time import time
14from copy import copy
16from one.api import ONE
17from one.webclient import AlyxClient
18from one.util import filter_datasets
19from one.alf.path import add_uuid_string, session_path_parts, get_alf_path
20from one.alf.cache import _make_datasets_df
21from iblutil.util import flatten, ensure_list
23from ibllib.oneibl.registration import register_dataset, get_lab, get_local_data_repository
24from ibllib.oneibl.patcher import FTPPatcher, SDSCPatcher, SDSC_ROOT_PATH, SDSC_PATCH_PATH, S3Patcher
27_logger = logging.getLogger(__name__)


class ExpectedDataset:
    """An expected input or output dataset."""
    inverted = False

    def __init__(self, name, collection, register=None, revision=None, unique=True):
        """
        An expected input or output dataset.

        NB: This should not be instantiated directly, but rather via the `input` or `output`
        static method.

        Parameters
        ----------
        name : str, None
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        register : bool
            Whether to register the file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether the identifier pattern is expected to match a single dataset or several.
            NB: This currently does not affect the output of `find_files`.
        """
        if not (collection is None or isinstance(collection, str)):
            collection = '/'.join(collection)
        self._identifiers = (collection, revision, name)
        self.operator = None
        self._register = register or False
        self.inverted = False
        self.name = None
        self.unique = unique

    @property
    def register(self):
        """bool: whether to register the output file."""
        return self._register

    @register.setter
    def register(self, value):
        """bool: whether to register the output file."""
        if self.operator is not None:
            raise AttributeError('cannot set register attribute for operator datasets')
        self._register = value

    @property
    def identifiers(self):
        """tuple: the identifying parts of the dataset.

        If no operator is applied, the identifiers are (collection, revision, name).
        If an operator is applied, a tuple of 3-element tuples is returned.
        """
        if self.operator is None:
            return self._identifiers
        # Flatten nested identifiers into tuple of 3-element tuples
        identifiers = []
        for x in self._identifiers:
            add = identifiers.extend if x.operator else identifiers.append
            add(x.identifiers)
        return tuple(identifiers)

    @property
    def glob_pattern(self):
        """str, tuple of str: one or more glob patterns."""
        if self.operator is None:
            return str(PurePosixPath(*filter(None, self._identifiers)))
        else:
            return tuple(flatten(x.glob_pattern for x in self._identifiers))

    def __repr__(self):
        """Represent the dataset object as a string.

        If the `name` property is not None, it is returned, otherwise the identifiers are used to
        format the name.
        """
        name = self.__class__.__name__
        if self.name:
            return f'<{name}({self.name})>'
        if self.operator:
            sym = {'or': '|', 'and': '&', 'xor': '^'}
            patterns = [d.__repr__() for d in self._identifiers]
            pattern = f'{sym[self.operator]:^3}'.join(patterns)
            if self.inverted:
                pattern = f'~({pattern})'
        else:
            pattern = ('~' if self.inverted else '') + self.glob_pattern
        return f'<{name}({pattern})>'

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        bool
            True if the dataset is found on disk or is optional.
        list of pathlib.Path
            A list of matching dataset files.
        missing, None, str, set of str
            One or more glob patterns that either didn't yield files (or did in the case of inverted datasets).

        Notes
        -----
        - Currently, if `unique` is true and multiple files are found, all files are returned without an exception
          raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended to be registered.
        - If `inverted` is true and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing
          patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being optional.
        """
        session_path = Path(session_path)
        ok, actual_files, missing = False, [], None
        if self.operator is None:
            if register and not self.register:
                return True, actual_files, missing
            actual_files = sorted(session_path.rglob(self.glob_pattern))
            # If no revision pattern provided and no files found, search for any revision
            if self._identifiers[1] is None and not any(actual_files):
                glob_pattern = str(PurePosixPath(self._identifiers[0], '#*#', self._identifiers[2]))
                actual_files = sorted(session_path.rglob(glob_pattern))
            ok = any(actual_files) != self.inverted
            if not ok:
                missing = self.glob_pattern
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers))
            ok = all(_ok)
            actual_files = flatten(_actual_files)
            missing = set(filter(None, flatten(_missing)))
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            missing = set()
            for d in self._identifiers:
                ok, actual_files, _missing = d.find_files(session_path)
                if ok:
                    break
                if missing is not None:
                    missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing)
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _actual_files, _missing = zip(*map(lambda x: x.find_files(session_path), self._identifiers))
            ok = sum(_ok) == 1  # and sum(map(bool, map(len, _actual_files))) == 1
            # Return only those datasets that are complete if OK
            actual_files = _actual_files[_ok.index(True)] if ok else flatten(_actual_files)
            if ok:
                missing = set()
            elif all(_ok):  # return all patterns if all present when only one should be, otherwise return all missing
                missing = set(flatten(self.glob_pattern))
            elif not any(_ok):  # return all missing glob patterns if none present
                missing = set(filter(None, flatten(_missing)))
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')

        return ok, actual_files, missing
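
    # Illustrative sketch (the session path and dataset name below are hypothetical, not part of
    # this module): `find_files` globs within a session folder and reports unmatched patterns.
    #   >>> dset = ExpectedDataset.input('_ibl_trials.table.pqt', 'alf')
    #   >>> ok, files, missing = dset.find_files('/data/subject/2023-01-01/001')
    #   >>> ok       # True if at least one file matched (always True for optional datasets)
    #   >>> missing  # e.g. 'alf/_ibl_trials.table.pqt' if nothing matched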

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc, and
            ignore_qc_not_set.

        Returns
        -------
        bool
            True if the required dataset(s) are present in the data frame.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        # ok, datasets = False, session_datasets.iloc[0:0]
        if self.operator is None:
            collection, revision, file = self._identifiers
            if self._identifiers[1] is not None:
                raise NotImplementedError('revisions not yet supported')
            datasets = filter_datasets(session_datasets, file, collection, wildcards=True, assert_unique=self.unique, **kwargs)
            ok = datasets.empty == self.inverted
        elif self.operator == 'or':
            assert len(self._identifiers) == 2
            for d in self._identifiers:
                ok, datasets = d.filter(session_datasets, **kwargs)
                if ok:
                    break
        elif self.operator == 'xor':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = sum(_ok) == 1
            if ok:
                # Return only those datasets that are complete.
                datasets = _datasets[_ok.index(True)]
            else:
                datasets = pd.concat(_datasets)
        elif self.operator == 'and':
            assert len(self._identifiers) == 2
            _ok, _datasets = zip(*map(lambda x: x.filter(session_datasets, **kwargs), self._identifiers))
            ok = all(_ok)
            datasets = pd.concat(_datasets)
        elif not isinstance(self.operator, str):
            raise TypeError(f'Unrecognized operator type "{type(self.operator)}"')
        else:
            raise NotImplementedError(f'logical {self.operator.upper()} not implemented')
        return ok, datasets

    def _apply_op(self, op, other):
        """Apply an operation between two datasets."""
        # Assert both instances of Input or both instances of Output
        if not isinstance(other, (self.__class__, tuple)):
            raise TypeError(f'logical operations not supported between objects of type '
                            f'{self.__class__.__name__} and {other.__class__.__name__}')
        # Assert operation supported
        if op not in {'or', 'xor', 'and'}:
            raise ValueError(op)
        # Convert tuple to ExpectedDataset instance
        if isinstance(other, tuple):
            D = (self.input if isinstance(self, Input) else self.output)
            other = D(*other)
        # Returned instance should only be optional if both datasets are optional
        is_input = isinstance(self, Input)
        if all(isinstance(x, OptionalDataset) for x in (self, other)):
            D = OptionalInput if is_input else OptionalOutput
        else:
            D = Input if is_input else Output
        # Instantiate 'empty' object
        d = D(None, None)
        d._identifiers = (self, other)
        d.operator = op
        return d

    def __invert__(self):
        """Assert dataset doesn't exist on disk."""
        obj = copy(self)
        obj.inverted = not self.inverted
        return obj

    def __or__(self, b):
        """Assert either dataset exists or another does, or both exist."""
        return self._apply_op('or', b)

    def __xor__(self, b):
        """Assert either dataset exists or another does, not both."""
        return self._apply_op('xor', b)

    def __and__(self, b):
        """Assert that a second dataset exists together with the first."""
        return self._apply_op('and', b)

    @staticmethod
    def input(name, collection, required=True, register=False, **kwargs):
        """
        Create an expected input dataset.

        By default, expected input datasets are not automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether the file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the input file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether the identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Input, OptionalInput
            An instance of an Input dataset if required is true, otherwise an OptionalInput.
        """
        Class = Input if required else OptionalInput
        obj = Class(name, collection, register=register, **kwargs)
        return obj

    @staticmethod
    def output(name, collection, required=True, register=True, **kwargs):
        """
        Create an expected output dataset.

        By default, expected output datasets are automatically registered.

        Parameters
        ----------
        name : str
            A dataset name or glob pattern.
        collection : str, None
            An ALF collection or pattern.
        required : bool
            Whether the file must always be present, or is an optional dataset. Default is True.
        register : bool
            Whether to register the output file. Default is False for input files, True for output
            files.
        revision : str
            An optional revision.
        unique : bool
            Whether the identifier pattern is expected to match a single dataset or several.

        Returns
        -------
        Output, OptionalOutput
            An instance of an Output dataset if required is true, otherwise an OptionalOutput.
        """
        Class = Output if required else OptionalOutput
        obj = Class(name, collection, register=register, **kwargs)
        return obj
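
# Usage sketch (the dataset names and collections below are hypothetical, for illustration only):
# expected datasets are typically declared via the `input`/`output` helpers and combined with the
# logical operators defined above before being placed in a task signature.
#   >>> I, O = ExpectedDataset.input, ExpectedDataset.output
#   >>> raw_ap = I('*.ap.cbin', 'raw_ephys_data/probe00') | I('*.ap.bin', 'raw_ephys_data/probe00')
#   >>> spikes = O('spikes.times.npy', 'alf/probe00') & O('spikes.clusters.npy', 'alf/probe00')
#   >>> no_led = ~I('_iblrig_leftCamera.led.npy', 'raw_video_data', required=False)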


class OptionalDataset(ExpectedDataset):
    """An expected dataset that is not strictly required."""

    def find_files(self, session_path, register=False):
        """Find files on disk.

        Uses glob patterns to find dataset(s) on disk.

        Parameters
        ----------
        session_path : pathlib.Path, str
            A session path within which to glob for the dataset(s).
        register : bool
            Only return files intended to be registered.

        Returns
        -------
        True
            Always True as the dataset is optional.
        list of pathlib.Path
            A list of matching dataset files.
        missing, None, str, set of str
            One or more glob patterns that either didn't yield files (or did in the case of inverted datasets).

        Notes
        -----
        - Currently, if `unique` is true and multiple files are found, all files are returned without an exception
          raised, although this may change in the future.
        - If `register` is false, all files are returned regardless of whether they are intended to be registered.
        - If `inverted` is true and files are found, the glob pattern is returned as missing.
        - If XOR, returns all patterns if all are present when only one should be, otherwise returns all missing
          patterns.
        - Missing (or unexpectedly found) patterns are returned despite the dataset being optional.
        """
        ok, actual_files, missing = super().find_files(session_path, register=register)
        return True, actual_files, missing

    def filter(self, session_datasets, **kwargs):
        """Filter dataset frame by expected datasets.

        Parameters
        ----------
        session_datasets : pandas.DataFrame
            A data frame of session datasets.
        kwargs
            Extra arguments for `one.util.filter_datasets`, namely revision_last_before, qc,
            ignore_qc_not_set, and assert_unique.

        Returns
        -------
        True
            Always True as the dataset is optional.
        pandas.DataFrame
            A filtered data frame containing the expected dataset(s).
        """
        ok, datasets = super().filter(session_datasets, **kwargs)
        return True, datasets


class Input(ExpectedDataset):
    """An expected input dataset."""
    pass


class OptionalInput(Input, OptionalDataset):
    """An optional expected input dataset."""
    pass


class Output(ExpectedDataset):
    """An expected output dataset."""
    pass


class OptionalOutput(Output, OptionalDataset):
    """An optional expected output dataset."""
    pass


def _parse_signature(signature):
    """
    Ensure all of a signature's expected datasets are instances of ExpectedDataset.

    Parameters
    ----------
    signature : Dict[str, list]
        A dict with keys {'input_files', 'output_files'} containing lists of tuples and/or
        ExpectedDataset instances.

    Returns
    -------
    Dict[str, list of ExpectedDataset]
        A dict containing all tuples converted to ExpectedDataset instances.
    """
    I, O = ExpectedDataset.input, ExpectedDataset.output  # noqa
    inputs = [i if isinstance(i, ExpectedDataset) else I(*i) for i in signature['input_files']]
    outputs = [o if isinstance(o, ExpectedDataset) else O(*o) for o in signature['output_files']]
    return {'input_files': inputs, 'output_files': outputs}
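
# Illustrative sketch (hypothetical file names): a task signature may mix plain tuples and
# ExpectedDataset instances; `_parse_signature` normalises everything to ExpectedDataset.
#   >>> signature = {
#   ...     'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True)],
#   ...     'output_files': [ExpectedDataset.output('_ibl_trials.table.pqt', 'alf')]}
#   >>> parsed = _parse_signature(signature)
#   >>> all(isinstance(x, ExpectedDataset) for x in parsed['input_files'])
#   True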


def dataset_from_name(name, datasets):
    """
    From a list of ExpectedDataset instances, return those that match a given name.

    Parameters
    ----------
    name : str
        The name of the dataset.
    datasets : list of ExpectedDataset
        A list of ExpectedDataset instances.

    Returns
    -------
    list of ExpectedDataset
        The ExpectedDataset instances that match the given name.

    """
    matches = []
    for dataset in datasets:
        if dataset.operator is None:
            if dataset._identifiers[2] == name:
                matches.append(dataset)
        else:
            matches.extend(dataset_from_name(name, dataset._identifiers))
    return matches


def update_collections(dataset, new_collection, substring=None, unique=None):
    """
    Update the collection of a dataset.

    This updates all nested ExpectedDataset instances with the new collection and returns copies.

    Parameters
    ----------
    dataset : ExpectedDataset
        The dataset to update.
    new_collection : str, list of str
        The new collection or collections.
    substring : str, optional
        An optional substring in the collection to replace with new collection(s). If None, the
        entire collection will be replaced.

    Returns
    -------
    ExpectedDataset
        A copy of the dataset with the updated collection(s).

    """
    after = ensure_list(new_collection)
    D = ExpectedDataset.input if isinstance(dataset, Input) else ExpectedDataset.output
    if dataset.operator is None:
        collection, revision, name = dataset.identifiers
        if revision is not None:
            raise NotImplementedError
        if substring:
            after = [(collection or '').replace(substring, x) or None for x in after]
        if unique is None:
            unique = [not set(name + (x or '')).intersection('*[?') for x in after]
        else:
            unique = [unique] * len(after)
        register = dataset.register
        updated = D(name, after[0], not isinstance(dataset, OptionalDataset), register, unique=unique[0])
        if len(after) > 1:
            for folder, unq in zip(after[1:], unique[1:]):
                updated &= D(name, folder, not isinstance(dataset, OptionalDataset), register, unique=unq)
    else:
        updated = copy(dataset)
        updated._identifiers = [update_collections(dd, new_collection, substring, unique)
                                for dd in updated._identifiers]
    return updated
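
# Illustrative sketch (hypothetical names): swapping the collection of an expected dataset, e.g.
# to point a generic signature at a specific probe collection.
#   >>> dset = ExpectedDataset.input('spikes.times.npy', 'alf/probe??')
#   >>> update_collections(dset, 'alf/probe00').identifiers
#   ('alf/probe00', None, 'spikes.times.npy')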


class DataHandler(abc.ABC):
    def __init__(self, session_path, signature, one=None):
        """
        Base data handler class
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        self.session_path = session_path
        self.signature = _parse_signature(signature)
        self.one = one
        self.processed = {}  # Map of filepaths and their processed records (e.g. upload receipts or Alyx records)

    def setUp(self, **kwargs):
        """Function to optionally overload to download required data to run task."""
        pass

    def getData(self, one=None):
        """Finds the datasets required for task based on input signatures.

        Parameters
        ----------
        one : one.api.One, optional
            An instance of ONE to use.

        Returns
        -------
        pandas.DataFrame, None
            A data frame of required datasets. An empty frame is returned if no registered datasets are required,
            while None is returned if no instance of ONE is set.
        """
        if self.one is None and one is None:
            return
        one = one or self.one
        session_datasets = one.list_datasets(one.path2eid(self.session_path), details=True)
        dfs = [file.filter(session_datasets)[1] for file in self.signature['input_files']]
        return one._cache.datasets.iloc[0:0] if len(dfs) == 0 else pd.concat(dfs).drop_duplicates()

    def getOutputFiles(self):
        """
        Return a data frame of output datasets found on disk.

        Returns
        -------
        pandas.DataFrame
            A data frame of datasets on disk that were specified in signature['output_files'].
        """
        assert self.session_path
        # Next convert datasets to frame
        # Create dataframe of all ALF datasets
        df = _make_datasets_df(self.session_path, hash_files=False).set_index(['eid', 'id'])
        # Filter outputs
        if len(self.signature['output_files']) == 0:
            return pd.DataFrame()
        present = [file.filter(df)[1] for file in self.signature['output_files']]
        return pd.concat(present).droplevel('eid')

    def uploadData(self, outputs, version):
        """
        Function to optionally overload to upload and register data
        :param outputs: output files from task to register
        :param version: ibllib version
        :return:
        """
        if isinstance(outputs, list):
            versions = [version for _ in outputs]
        else:
            versions = [version]

        return versions

    def cleanUp(self, **kwargs):
        """Function to optionally overload to clean up files after running task."""
        pass
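
# Sketch of the typical lifecycle driven by ibllib.pipes.tasks.Task (illustrative only; the
# handler subclass, paths and version string below are placeholders):
#   >>> handler = ServerDataHandler(session_path, task.signature, one=one)
#   >>> handler.setUp()                                   # ensure input datasets are present
#   >>> outputs = task.run()                              # task produces output files on disk
#   >>> records = handler.uploadData(outputs, '2.30.0')   # register/upload the outputs
#   >>> handler.cleanUp()                                 # remove any temporary files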


class LocalDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks locally, with no architecture or db connection
        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)


class ServerDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers when all data is available locally

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signatures, one=one)

    def uploadData(self, outputs, version, clobber=False, **kwargs):
        """
        Upload and/or register output data.

        This is typically called by :meth:`ibllib.pipes.tasks.Task.register_datasets`.

        Parameters
        ----------
        outputs : list of pathlib.Path
            A set of ALF paths to register to Alyx.
        version : str, list of str
            The version of ibllib used to generate these output files.
        clobber : bool
            If True, re-upload outputs that have already been passed to this method.
        kwargs
            Optional keyword arguments for one.registration.RegistrationClient.register_files.

        Returns
        -------
        list of dicts, dict
            A list of newly created Alyx dataset records or the registration data if dry.
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        # If clobber = False, do not re-upload the outputs that have already been processed
        outputs = ensure_list(outputs)
        to_upload = list(filter(None if clobber else lambda x: x not in self.processed, outputs))
        records = register_dataset(to_upload, one=self.one, versions=versions, repository=data_repo, **kwargs) or []
        if kwargs.get('dry', False):
            return records
        # Store processed outputs
        self.processed.update({k: v for k, v in zip(to_upload, records) if v})
        return [self.processed[x] for x in outputs if x in self.processed]

    def cleanUp(self, **_):
        """Empties and returns the processed dataset map."""
        super().cleanUp()
        processed = self.processed
        self.processed = {}
        return processed
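
# Sketch of the clobber behaviour above (hypothetical paths and version): outputs already
# registered by a previous call are skipped unless clobber=True forces re-registration.
#   >>> handler = ServerDataHandler(session_path, signature, one=one)
#   >>> handler.uploadData([Path('alf/_ibl_trials.table.pqt')], '2.30.0')                # registered
#   >>> handler.uploadData([Path('alf/_ibl_trials.table.pqt')], '2.30.0')                # skipped
#   >>> handler.uploadData([Path('alf/_ibl_trials.table.pqt')], '2.30.0', clobber=True)  # re-registered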


class ServerGlobusDataHandler(DataHandler):
    def __init__(self, session_path, signatures, one=None):
        """
        Data handler for running tasks on lab local servers. Will download missing data from SDSC using Globus

        :param session_path: path to session
        :param signatures: input and output file signatures
        :param one: ONE instance
        """
        from one.remote.globus import Globus, get_lab_from_endpoint_id  # noqa
        super().__init__(session_path, signatures, one=one)
        self.globus = Globus(client_name='server', headless=True)

        # on local servers set up the local root path manually as some have different globus config paths
        self.globus.endpoints['local']['root_path'] = '/mnt/s0/Data/Subjects'

        # Find the lab
        self.lab = get_lab(self.session_path, self.one.alyx)

        # For cortex lab we need to get the endpoint from the ibl alyx
        if self.lab == 'cortexlab':
            alyx = AlyxClient(base_url='https://alyx.internationalbrainlab.org', cache_rest=None)
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=alyx)
        else:
            self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=self.one.alyx)

        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus-sdk."""
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            df = super().getData(one=ONE(base_url='https://alyx.internationalbrainlab.org', cache_rest=self.one.alyx.cache_mode))
        else:
            df = super().getData(one=self.one)

        if len(df) == 0:
            # If no datasets are found in the cache, work off the local file system only and do
            # not attempt to download any missing data using Globus
            return

        # Check for space on local server. If less than 500 GB, don't download new data
        space_free = shutil.disk_usage(self.globus.endpoints['local']['root_path'])[2]
        if space_free < 500e9:
            _logger.warning('Space left on server is < 500GB, won\'t re-download new data')
            return

        rel_sess_path = '/'.join(self.session_path.parts[-3:])
        target_paths = []
        source_paths = []
        for i, d in df.iterrows():
            sess_path = Path(rel_sess_path).joinpath(d['rel_path'])
            full_local_path = Path(self.globus.endpoints['local']['root_path']).joinpath(sess_path)
            if not full_local_path.exists():
                uuid = i
                self.local_paths.append(full_local_path)
                target_paths.append(sess_path)
                source_paths.append(add_uuid_string(sess_path, uuid))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Downloading {sp} to {tp}')
            self.globus.mv(f'flatiron_{self.lab}', 'local', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        data_repo = get_local_data_repository(self.one.alyx)
        return register_dataset(outputs, one=self.one, versions=versions, repository=data_repo, **kwargs)

    def cleanUp(self, **_):
        """Clean up, remove the files that were downloaded from Globus once task has completed."""
        for file in self.local_paths:
            os.unlink(file)


class RemoteEC2DataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df, check_hash=False)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via S3 patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        s3_patcher = S3Patcher(one=self.one)
        return s3_patcher.patch_dataset(outputs, created_by=self.one.alyx.user,
                                        versions=versions, **kwargs)


class RemoteHttpDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node. Will download missing data via http using ONE

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """
        Function to download necessary data to run tasks using ONE
        :return:
        """
        df = super().getData()
        self.one._check_filesystem(df)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class RemoteAwsDataHandler(DataHandler):
    def __init__(self, session_path, signature, one=None):
        """
        Data handler for running tasks on remote compute node.

        This will download missing data from the private IBL S3 AWS data bucket. New datasets are
        uploaded via Globus.

        :param session_path: path to session
        :param signature: input and output file signatures
        :param one: ONE instance
        """
        super().__init__(session_path, signature, one=one)
        self.local_paths = []

    def setUp(self, **_):
        """Function to download necessary data to run tasks using AWS boto3."""
        df = super().getData()
        self.local_paths = self.one._download_aws(map(lambda x: x[1], df.iterrows()))

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via Globus
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        # Set up Globus
        from one.remote.globus import Globus  # noqa
        self.globus = Globus(client_name='server', headless=True)
        self.lab = session_path_parts(self.session_path, as_dict=True)['lab']
        if self.lab == 'cortexlab' and 'cortexlab' in self.one.alyx.base_url:
            base_url = 'https://alyx.internationalbrainlab.org'
            _logger.warning('Changing Alyx client to %s', base_url)
            ac = AlyxClient(base_url=base_url, cache_rest=self.one.alyx.cache_mode)
        else:
            ac = self.one.alyx
        self.globus.add_endpoint(f'flatiron_{self.lab}', alyx=ac)

        # register datasets
        versions = super().uploadData(outputs, version)
        response = register_dataset(outputs, one=self.one, server_only=True, versions=versions, **kwargs)

        # upload directly via globus
        source_paths = []
        target_paths = []
        collections = {}

        for dset, out in zip(response, outputs):
            assert Path(out).name == dset['name']
            # set flag to false
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            collection = '/'.join(fr['relative_path'].split('/')[:-1])
            if collection in collections.keys():
                collections[collection].update({f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}})
            else:
                collections[collection] = {f'{dset["name"]}': {'fr_id': fr['id'], 'size': dset['file_size']}}

            # Set all exists status to false for server file records
            self.one.alyx.rest('files', 'partial_update', id=fr['id'], data={'exists': False})

            source_paths.append(out)
            target_paths.append(add_uuid_string(fr['relative_path'], dset['id']))

        if len(target_paths) != 0:
            ts = time()
            for sp, tp in zip(source_paths, target_paths):
                _logger.info(f'Uploading {sp} to {tp}')
            self.globus.mv('local', f'flatiron_{self.lab}', source_paths, target_paths)
            _logger.debug(f'Complete. Time elapsed {time() - ts}')

        for collection, files in collections.items():
            globus_files = self.globus.ls(f'flatiron_{self.lab}', collection, remove_uuid=True, return_size=True)
            file_names = [str(gl[0]) for gl in globus_files]
            file_sizes = [gl[1] for gl in globus_files]

            for name, details in files.items():
                try:
                    idx = file_names.index(name)
                    size = file_sizes[idx]
                    if size == details['size']:
                        # update the file record if sizes match
                        self.one.alyx.rest('files', 'partial_update', id=details['fr_id'], data={'exists': True})
                    else:
                        _logger.warning(f'File {name} found on SDSC but sizes do not match')
                except ValueError:
                    _logger.warning(f'File {name} not found on SDSC')

        return response

        # ftp_patcher = FTPPatcher(one=self.one)
        # return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
        #                                   versions=versions, **kwargs)

    def cleanUp(self, task):
        """Clean up, remove the files that were downloaded from AWS once task has completed."""
        if task.status == 0:
            for file in self.local_paths:
                os.unlink(file)


class RemoteGlobusDataHandler(DataHandler):
    """
    Data handler for running tasks on remote compute node. Will download missing data using Globus.

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """
    def __init__(self, session_path, signature, one=None):
        super().__init__(session_path, signature, one=one)

    def setUp(self, **_):
        """Function to download necessary data to run tasks using globus."""
        # TODO
        pass

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via FTP patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        ftp_patcher = FTPPatcher(one=self.one)
        return ftp_patcher.create_dataset(path=outputs, created_by=self.one.alyx.user,
                                          versions=versions, **kwargs)


class SDSCDataHandler(DataHandler):
    """
    Data handler for running tasks on SDSC compute node

    :param session_path: path to session
    :param signature: input and output file signatures
    :param one: ONE instance
    """

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = os.getenv('SDSC_PATCH_PATH', SDSC_PATCH_PATH)
        self.root_path = SDSC_ROOT_PATH

    def setUp(self, task):
        """Function to create symlinks to necessary data to run tasks."""
        df = super().getData()

        SDSC_TMP = Path(self.patch_path.joinpath(task.__class__.__name__))
        session_path = Path(get_alf_path(self.session_path))
        for uuid, d in df.iterrows():
            file_path = session_path / d['rel_path']
            file_uuid = add_uuid_string(file_path, uuid)
            file_link = SDSC_TMP.joinpath(file_path)
            file_link.parent.mkdir(exist_ok=True, parents=True)
            try:
                file_link.symlink_to(
                    Path(self.root_path.joinpath(file_uuid)))
            except FileExistsError:
                pass

        task.session_path = SDSC_TMP.joinpath(session_path)

    def uploadData(self, outputs, version, **kwargs):
        """
        Function to upload and register data of completed task via SDSC patcher
        :param outputs: output files from task to register
        :param version: ibllib version
        :return: output info of registered datasets
        """
        versions = super().uploadData(outputs, version)
        sdsc_patcher = SDSCPatcher(one=self.one)
        return sdsc_patcher.patch_datasets(outputs, dry=False, versions=versions, **kwargs)

    def cleanUp(self, task):
        """Function to clean up symlinks created to run task."""
        assert self.patch_path.parts[0:4] == task.session_path.parts[0:4]
        shutil.rmtree(task.session_path)


class PopeyeDataHandler(SDSCDataHandler):

    def __init__(self, session_path, signatures, one=None):
        super().__init__(session_path, signatures, one=one)
        self.patch_path = Path(os.getenv('SDSC_PATCH_PATH', "/mnt/sdceph/users/ibl/data/quarantine/tasks/"))
        self.root_path = Path("/mnt/sdceph/users/ibl/data")

    def uploadData(self, outputs, version, **kwargs):
        raise NotImplementedError(
            "Cannot register data from Popeye. Login as Datauser and use the RegisterSpikeSortingSDSC task."
        )

    def cleanUp(self, **_):
        """Symlinks are preserved until registration."""
        pass