Coverage for ibllib/pipes/tasks.py: 87%
446 statements
coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1"""The abstract Pipeline and Task superclasses and concrete task runner.
3Examples
4--------
61. Running a task on your local computer.
7| Download: via ONE.
8| Upload: N/A.
10>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod')
11>>> task.run()
132. Running a task on the local server that belongs to a given subject (e.g SWC054 on floferlab).
14| Download: all data expected to be present.
15| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer
16 jobs on Alyx transfer the data.
18>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod
19>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001'
20>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
21>>> task.run()
22>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
243. Running a task on the local server that belongs to that subject and forcing redownload of
25missing data.
26| Download: via Globus (TODO we should change this to use boto3 as globus is slow).
27| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer
28 jobs on Alyx transfer the data.
30>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
31>>> task.force = True
32>>> task.run()
33>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
34>>> task.cleanUp() # Delete the files that have been downloaded
364. Running a task on the local server that doesn't belongs to that subject
37(e.g SWC054 on angelakilab).
38| Download: via boto3, the AWS file records must exist and be set to exists = True.
39| Upload: via globus, automatically uploads the datasets directly to FlatIron via globus.
40 Creates FlatIron filerecords and sets these to True once the globus task has completed.
42>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod')
43>>> task.run()
44>>> task.register_datasets()
45>>> task.cleanUp() # Delete the files that have been downloaded
475. Running a task on SDSC.
48| Download: via creating symlink to relevant datasets on SDSC.
49| Upload: via copying files to relevant location on SDSC.
51>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod')
52>>> task.run()
53>>> response = task.register_datasets()
54>>> # Here we just make sure filerecords are all correct
55>>> for resp in response:
56... fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None)
57... if fi is not None:
58... if not fi['exists']:
59... one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True})
60...
61... aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None)
62... if aws is not None:
63... one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False})
64...
65... sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None)
66... if sr is not None:
67... one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False})
68... # Finally remove symlinks once the task has completed
69... task.cleanUp()
71"""
from pathlib import Path
import abc
import logging
import io
import importlib
import time
from collections import OrderedDict
import traceback
import json
from typing import List, Dict

from graphviz import Digraph

import ibllib
from ibllib.oneibl import data_handlers
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.oneibl.registration import get_lab
from iblutil.util import Bunch, flatten, ensure_list
import one.params
from one.api import ONE
from one import webclient
import one.alf.io as alfio
from one.alf.path import ALFPath

_logger = logging.getLogger(__name__)
TASK_STATUS_SET = {'Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete', 'Abandoned'}


class Task(abc.ABC):
    log = ''  # placeholder to keep the log of the task for registration
    cpu = 1  # CPU resource
    gpu = 0  # GPU resources: as of now, either 0 or 1
    io_charge = 5  # integer percentage
    priority = 30  # integer percentage, 100 means highest priority
    ram = 4  # RAM needed to run (GB)
    one = None  # one instance (optional)
    level = 0  # level in the pipeline hierarchy: level 0 means there is no parent task
    outputs = None  # placeholder for a list of Path containing output files
    time_elapsed_secs = None
    time_out_secs = 3600 * 2  # time-out after which a task is considered dead
    version = ibllib.__version__
    force = False  # whether to re-download missing input files on local server if not present
    job_size = 'small'  # either 'small' or 'large', defines whether task should be run as part of the large or small job services
    env = None  # the environment name within which to run the task (NB: the env is not activated automatically!)
    on_error = 'continue'  # whether to raise an exception on error ('raise') or report the error and continue ('continue')

    def __init__(self, session_path, parents=None, taskid=None, one=None,
                 machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs):
        """
        Base task class
        :param session_path: session path
        :param parents: parents
        :param taskid: alyx task id
        :param one: one instance
        :param machine: string identifying the machine the task is run on
        :param clobber: whether or not to overwrite the log on rerun
        :param location: location where the task is run. Options are 'server' (lab local servers), 'remote' (remote
            compute node, data required for the task downloaded via ONE), 'AWS' (remote compute node, data required
            for the task downloaded via AWS), or 'SDSC' (SDSC FlatIron compute node)
        :param scratch_folder: optional: Path where to write intermediate temporary data
        :param kwargs: running arguments
        """
        self.on_error = on_error
        self.taskid = taskid
        self.one = one
        self.session_path = session_path
        self.register_kwargs = {}
        if parents:
            self.parents = parents
            self.level = max(p.level for p in self.parents) + 1
        else:
            self.parents = []
        self.machine = machine
        self.clobber = clobber
        self.location = location
        self.plot_tasks = []  # Plotting task/tasks to create plot outputs during the task
        self.scratch_folder = scratch_folder
        self.kwargs = kwargs

    @property
    def signature(self) -> Dict[str, List]:
        """
        The signature of the task specifies inputs and outputs for the given task.
        For some tasks it is dynamic and calculated. The legacy code specifies those as tuples.
        The preferred way is to use the ExpectedDataset input and output constructors.

        I = ExpectedDataset.input
        O = ExpectedDataset.output
        signature = {
            'input_files': [
                I(name='extract.me.npy', collection='raw_data', required=True, register=False, unique=False),
            ],
            'output_files': [
                O(name='look.atme.npy', collection='shiny_data', required=True, register=True, unique=False)
            ]}
        is equivalent to:
        signature = {
            'input_files': [('extract.me.npy', 'raw_data', True, True)],
            'output_files': [('look.atme.npy', 'shiny_data', True)],
        }
        :return:
        """
        return {'input_files': [], 'output_files': []}

    @property
    def name(self):
        return self.__class__.__name__

    def path2eid(self):
        """
        Fetch the experiment UUID from the Task session path, without using the REST cache.

        This method ensures that the eid will be returned for newly created sessions.

        Returns
        -------
        str
            The experiment UUID corresponding to the session path.
        """
        assert self.session_path and self.one and not self.one.offline
        with webclient.no_cache(self.one.alyx):
            return self.one.path2eid(self.session_path, query_type='remote')

    def run(self, **kwargs):
        """
        --- do not overload, see _run() below ---
        Wraps the _run() method with:
        - error management
        - logging to variable
        - writing a lock file if the GPU is used
        - labelling the status property of the object. The status value is labelled as:
            0: Complete
           -1: Errored
           -2: Didn't run as a lock was encountered
           -3: Incomplete

        Notes
        -----
        - `run_alyx_task` will update the Alyx Task status depending on both status and outputs
          (i.e. the output of the subclassed `_run` method):
          Assuming a return value of 0... if Task.outputs is None, the status will be Empty;
          if Task.outputs is a list (empty or otherwise), the status will be Complete.
        """
        # if the task id or one properties are not available, run locally only, without Alyx
        use_alyx = self.one is not None and self.taskid is not None
        if use_alyx:
            # check that the alyx user is logged in
            if not self.one.alyx.is_logged_in:
                self.one.alyx.authenticate()
            tdict = self.one.alyx.rest('tasks', 'partial_update', id=self.taskid,
                                       data={'status': 'Started'})
            self.log = ('' if not tdict['log'] else tdict['log'] +
                        '\n\n=============================RERUN=============================\n')

        # Setup the console handler with a StringIO object
        logger_level = _logger.level
        log_capture_string = io.StringIO()
        ch = logging.StreamHandler(log_capture_string)
        str_format = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s'
        ch.setFormatter(logging.Formatter(str_format))
        _logger.parent.addHandler(ch)
        _logger.parent.setLevel(logging.INFO)
        _logger.info(f'Starting job {self.__class__}')
        if self.machine:
            _logger.info(f'Running on machine: {self.machine}')
        _logger.info(f'running ibllib version {ibllib.__version__}')
        # setup
        start_time = time.time()
        try:
            setup = self.setUp(**kwargs)
            self.outputs = self._input_files_to_register()
            _logger.info(f'Setup value is: {setup}')
            self.status = 0
            if not setup:
                # case where outputs are present but the input files are not available locally to rerun the task:
                # label the task as complete
                _, outputs = self.assert_expected_outputs()
            else:
                # run the task
                if self.gpu >= 1:
                    if not self._creates_lock():
                        self.status = -2
                        _logger.info(f'Job {self.__class__} exited as a lock was found at {self._lock_file_path()}')
                        new_log = log_capture_string.getvalue()
                        self.log = new_log if self.clobber else self.log + new_log
                        _logger.removeHandler(ch)
                        ch.close()
                        if self.on_error == 'raise':
                            raise FileExistsError(f'Job {self.__class__} exited as a lock was found at {self._lock_file_path()}')
                        return self.status
                outputs = self._run(**kwargs)
                _logger.info(f'Job {self.__class__} complete')
                if outputs is None:
                    # If the run method returns None and no raw input files were registered, self.outputs
                    # should be None, meaning the task will have an 'Empty' status. If the run method returns
                    # a list, the status will be 'Complete' regardless of whether there are output files.
                    self.outputs = outputs if not self.outputs else self.outputs  # ensure None if no inputs registered
                else:
                    self.outputs.extend(ensure_list(outputs))  # Add output files to list of inputs to register
        except Exception as e:
            _logger.error(traceback.format_exc())
            _logger.info(f'Job {self.__class__} errored')
            self.status = -1
            if self.on_error == 'raise':
                raise e

        self.time_elapsed_secs = time.time() - start_time
        # log the outputs
        if isinstance(self.outputs, list):
            nout = len(self.outputs)
        elif self.outputs is None:
            nout = 0
        else:
            nout = 1

        _logger.info(f'N outputs: {nout}')
        _logger.info(f'--- {self.time_elapsed_secs} seconds run-time ---')
        # after the run, capture the log output, amend to any existing logs if not overwrite
        new_log = log_capture_string.getvalue()
        self.log = new_log if self.clobber else self.log + new_log
        _logger.removeHandler(ch)
        ch.close()
        _logger.setLevel(logger_level)
        # tear down
        self.tearDown()
        return self.status
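
    # Illustrative usage (not part of the runner code): a typical local run checks the integer
    # status returned by `run` (0 Complete, -1 Errored, -2 lock encountered, -3 Incomplete)
    # before registering outputs, as in the module-level examples above.
    # >>> status = task.run()
    # >>> if status == 0:
    # ...     task.register_datasets()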

    def register_datasets(self, **kwargs):
        """
        Register output datasets from the task to Alyx.

        Parameters
        ----------
        kwargs
            Directly passed to the `DataHandler.uploadData` method.

        Returns
        -------
        list
            The output of the `DataHandler.uploadData` method, e.g. a list of registered datasets.
        """
        _ = self.register_images()
        return self.data_handler.uploadData(self.outputs, self.version, **kwargs)

    def _input_files_to_register(self, assert_all_exist=False):
        """
        Return input datasets to be registered to Alyx.

        These datasets are typically raw data files and are registered even if the task fails to complete.

        Parameters
        ----------
        assert_all_exist : bool
            Raise AssertionError if not all required input datasets exist on disk.

        Returns
        -------
        list of pathlib.Path
            A list of input files to register.

        # TODO This method currently does not support wildcards
        """
        I = data_handlers.ExpectedDataset.input  # noqa
        try:
            # Ensure all input files are ExpectedDataset instances
            input_files = [I(*i) if isinstance(i, tuple) else i for i in self.input_files or []]
        except AttributeError:
            raise RuntimeError('Task.setUp must be run before calling this method.')
        to_register, missing = [], set()
        for dataset in input_files:
            ok, filepaths, _missing = dataset.find_files(self.session_path, register=True)
            to_register.extend(filepaths)
            if not ok and _missing is not None:
                missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing)
        if any(missing):  # NB: These are either glob patterns that have no matches or match files that should be absent
            missing_str = ', '.join(missing)
            if assert_all_exist:
                raise AssertionError(f'Missing required input files: {missing_str}')
            else:
                _logger.error(f'Missing required input files: {missing_str}')
        return to_register

    def register_images(self, **kwargs):
        """
        Register images to the Alyx database.
        :return:
        """
        if self.one and len(self.plot_tasks) > 0:
            for plot_task in self.plot_tasks:
                try:
                    _ = plot_task.register_images(widths=['orig'])
                except Exception:
                    _logger.error(traceback.format_exc())
                    continue

    def rerun(self):
        self.run(overwrite=True)

    def get_signatures(self, **kwargs):
        """
        Parse the task signature into input and output file lists.
        This is the default behaviour and should be overwritten for each task where needed.
        :return:
        """
        signature = data_handlers._parse_signature(self.signature)
        self.input_files = signature['input_files']
        self.output_files = signature['output_files']

    @abc.abstractmethod
    def _run(self, overwrite=False):
        """Execute main task code.

        This method contains a task's principal data processing code and should be implemented
        by each subclass.

        Parameters
        ----------
        overwrite : bool
            When false (default), the task may re-use any intermediate data from a previous run.
            When true, the task will disregard or delete any intermediate files before running.

        Returns
        -------
        pathlib.Path, list of pathlib.Path, None
            One or more files to be registered, or an empty list if no files are registered.
            Within the pipeline, there is a distinction between a job that returns an empty list
            and a job that returns None. If the function returns None, the job will be labelled
            with an "Empty" status in the database; if it returns an empty list, the job completed
            with the expected behaviour of not returning any dataset.
        """

    def setUp(self, **kwargs):
        """Get the data handler and ensure all data is available locally to run task."""
        if self.location == 'server':
            self.get_signatures(**kwargs)

            input_status, _ = self.assert_expected_inputs(raise_error=False)
            output_status, _ = self.assert_expected(self.output_files, silent=True)

            if input_status:
                self.data_handler = self.get_data_handler()
                _logger.info('All input files found: running task')
                return True

            if not self.force:
                self.data_handler = self.get_data_handler()
                _logger.warning('Not all input files found locally: will still attempt to rerun task')
                # TODO in the future, once we are sure that input/output task signatures work properly, this should return False
                # _logger.info('All output files found but input files required not available locally: task not rerun')
                return True
            else:
                # Attempt to download missing data using globus
                _logger.info('Not all input files found locally: attempting to re-download required files')
                self.data_handler = self.get_data_handler(location='serverglobus')
                self.data_handler.setUp(task=self)
                # Double check we now have the required files to run the task
                # TODO in future should raise an error if even after downloading we don't have the correct files
                self.assert_expected_inputs(raise_error=False)
                return True
        else:
            self.data_handler = self.get_data_handler()
            self.data_handler.setUp(task=self)
            self.get_signatures(**kwargs)
            self.assert_expected_inputs(raise_error=False)
            return True

    def tearDown(self):
        """
        Function called at the end of run().
        Does not run if a lock is encountered by the task (status -2).
        """
        if self.gpu >= 1:
            if self._lock_file_path().exists():
                self._lock_file_path().unlink()

    def cleanUp(self):
        """
        Function to optionally overload to clean up.
        :return:
        """
        self.data_handler.cleanUp(task=self)

    def assert_expected_outputs(self, raise_error=True):
        """
        After a run, asserts that all signature files are present at least once in the output files.
        Mainly useful for integration tests.
        :return:
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        if not everything_is_fine:
            for out in self.outputs:
                _logger.error(f'{out}')
            if raise_error:
                raise FileNotFoundError('Missing outputs after task completion')

        return everything_is_fine, files

    def assert_expected_inputs(self, raise_error=True, raise_ambiguous=False):
        """
        Check that all the files necessary to run the task are present on disk.

        Parameters
        ----------
        raise_error : bool
            If true, raise FileNotFoundError if required files are missing.
        raise_ambiguous : bool
            If true, raise NotImplementedError if multiple variants of a dataset are found.

        Returns
        -------
        bool
            True if all required files were found.
        list of pathlib.Path
            A list of file paths that exist on disk.
        """
        _logger.info('Checking input files')
        everything_is_fine, files = self.assert_expected(self.input_files)

        if not everything_is_fine and raise_error:
            raise FileNotFoundError('Missing inputs to run task')

        # Check for duplicate datasets that may complicate extraction.
        # Some sessions may contain revisions and without ONE it's difficult to determine which
        # are the default datasets. Likewise SDSC may contain multiple datasets with different
        # UUIDs in the name after patching data.
        valid_alf_files = filter(ALFPath.is_valid_alf, files)
        variant_datasets = alfio.find_variants(valid_alf_files, extra=False)
        if len(variant_datasets) < len(files):
            _logger.warning('Some files are not ALF datasets and will not be checked for ambiguity')
        if any(map(len, variant_datasets.values())):
            # Keep those with variants and make paths relative to the session for logging purposes
            to_frag = lambda x: x.relative_to_session().as_posix()  # noqa
            ambiguous = {
                to_frag(k): [to_frag(x) for x in v]
                for k, v in variant_datasets.items() if any(v)}
            _logger.error('Ambiguous input datasets found: %s', ambiguous)

            if raise_ambiguous or self.location == 'sdsc':  # take no chances on SDSC
                # This could be mitigated by loading the data with OneSDSC
                raise NotImplementedError(
                    'Multiple variant datasets found. Loading for these is undefined.')

        return everything_is_fine, files

    def assert_expected(self, expected_files, silent=False):
        """
        Assert that expected files are present.

        Parameters
        ----------
        expected_files : list of ExpectedDataset
            A list of expected files in the form (file_pattern_str, collection_str, required_bool).
        silent : bool
            If false, log an error for each required file that is not found.

        Returns
        -------
        bool
            True if all required files were found.
        list of pathlib.Path
            A list of file paths that exist on disk.
        """
        if not any(expected_files):
            return True, []
        ok, actual_files, missing = zip(*(x.find_files(self.session_path) for x in expected_files))
        everything_is_fine = all(ok)
        # For unknown reasons only the first file of each expected dataset was returned and this
        # behaviour was preserved after refactoring the code
        files = [file_list[0] for file_list in actual_files if len(file_list) > 0]
        if not everything_is_fine and not silent:
            for missing_pattern in filter(None, flatten(missing)):
                _logger.error('Signature file pattern %s not found', missing_pattern)

        return everything_is_fine, files

    def get_data_handler(self, location=None):
        """
        Gets the relevant data handler based on the location argument.
        :return:
        """
        location = str.lower(location or self.location)
        if location == 'local':
            return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one)
        self.one = self.one or ONE()
        if location == 'server':
            dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'serverglobus':
            dhandler = data_handlers.ServerGlobusDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'remote':
            dhandler = data_handlers.RemoteHttpDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'aws':
            dhandler = data_handlers.RemoteAwsDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'sdsc':
            dhandler = data_handlers.SDSCDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'popeye':
            dhandler = data_handlers.PopeyeDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'ec2':
            dhandler = data_handlers.RemoteEC2DataHandler(self.session_path, self.signature, one=self.one)
        else:
            raise ValueError(f'Unknown location "{location}"')
        return dhandler
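
    # Illustrative usage: the handler class is picked from the case-insensitive location
    # string, e.g. 'SDSC' resolves to an SDSCDataHandler.
    # >>> handler = task.get_data_handler('SDSC')
    # >>> isinstance(handler, data_handlers.SDSCDataHandler)
    # True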

    @staticmethod
    def make_lock_file(taskname='', time_out_secs=7200):
        """Creates a GPU lock file with the given time-out (in seconds)."""
        d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs}
        with open(Task._lock_file_path(), 'w+') as fid:
            json.dump(d, fid)
        return d

    @staticmethod
    def _lock_file_path():
        """The lock file is in ~/.one/gpu.lock."""
        folder = Path.home().joinpath('.one')
        folder.mkdir(exist_ok=True)
        return folder.joinpath('gpu.lock')
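
    # Illustrative usage: the GPU lock is a small JSON file written to ~/.one/gpu.lock;
    # `is_locked` below treats it as stale once `time_out_secs` has elapsed.
    # >>> Task.make_lock_file('MyGpuTask', time_out_secs=7200)
    # {'start': ..., 'name': 'MyGpuTask', 'time_out_secs': 7200}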

    def _make_lock_file(self):
        """Creates a lock file with the current time."""
        return Task.make_lock_file(self.name, self.time_out_secs)

    def is_locked(self):
        """Checks if there is a lock file for this given task."""
        lock_file = self._lock_file_path()
        if not lock_file.exists():
            return False

        with open(lock_file) as fid:
            d = json.load(fid)
        now = time.time()
        if (now - d['start']) > d['time_out_secs']:
            lock_file.unlink()
            return False
        else:
            return True

    def _creates_lock(self):
        if self.location == "popeye":
            return True
        if self.is_locked():
            return False
        else:
            self._make_lock_file()
            return True


class Pipeline(abc.ABC):
    """
    Pipeline class: collection of related and potentially interdependent tasks
    """
    tasks = OrderedDict()
    one = None

    def __init__(self, session_path=None, one=None, eid=None):
        assert session_path or eid
        self.one = one
        if one and one.alyx.cache_mode and one.alyx.default_expiry.seconds > 1:
            _logger.warning('Alyx client REST cache active; this may cause issues with jobs')
        self.eid = eid
        if self.one and not self.one.offline:
            self.data_repo = get_local_data_repository(self.one.alyx)
        else:
            self.data_repo = None

        if session_path:
            self.session_path = session_path
            if not self.eid:
                # the eid for newer sessions may not be in the cache, so use a remote query
                self.eid = one.path2eid(session_path, query_type='remote') if self.one else None
        self.label = self.__module__ + '.' + type(self).__name__

    @staticmethod
    def _get_exec_name(obj):
        """
        For a class, get the executable name as it should be stored in Alyx. When the class
        is created dynamically using the type() built-in function, we need to revert to the base
        class so that the class can be re-instantiated from the Alyx dictionary on the client side.
        :param obj:
        :return: string containing the full module plus class name
        """
        if obj.__module__ == 'abc':
            exec_name = f'{obj.__class__.__base__.__module__}.{obj.__class__.__base__.__name__}'
        else:
            exec_name = f'{obj.__module__}.{obj.name}'
        return exec_name

    def make_graph(self, out_dir=None, show=True):
        if not out_dir:
            out_dir = self.one.alyx.cache_dir if self.one else one.params.get().CACHE_DIR
        m = Digraph('G', filename=str(Path(out_dir).joinpath(self.__module__ + '_graphs.gv')))
        m.attr(rankdir='TD')

        e = Digraph(name='cluster_' + self.label)
        e.attr('node', shape='box')
        e.node('root', label=self.label)

        e.attr('node', shape='ellipse')
        for k in self.tasks:
            j = self.tasks[k]
            if len(j.parents) == 0:
                e.edge('root', j.name)
            else:
                [e.edge(p.name, j.name) for p in j.parents]

        m.subgraph(e)
        m.attr(label=r'\n\Pre-processing\n')
        m.attr(fontsize='20')
        if show:
            m.view()
        return m

    def create_alyx_tasks(self, rerun__status__in=None, tasks_list=None):
        """
        Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.

        If the jobs already exist, they are left untouched. The re-run parameter will re-init the
        job by emptying the log and setting the status to Waiting.

        Parameters
        ----------
        rerun__status__in : list, str
            To re-run tasks if they already exist, specify one or more status strings of tasks
            that will be re-run, or '__all__' to re-run all tasks.
        tasks_list : list
            The list of tasks to create on Alyx. If None, uses self.tasks.

        Returns
        -------
        list
            List of Alyx task dictionaries (existing and/or created).
        """
        rerun__status__in = ensure_list(rerun__status__in)
        if '__all__' in rerun__status__in:
            rerun__status__in = [x for x in TASK_STATUS_SET if x != 'Abandoned']
        assert self.eid
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        tasks_alyx_pre = self.one.alyx.rest('tasks', 'list', session=self.eid, graph=self.name, no_cache=True)
        tasks_alyx = []
        # create all the tasks by iterating through the ordered dict

        if tasks_list is not None:
            task_items = tasks_list
            # need to add in the session eid and the parents
        else:
            task_items = self.tasks.values()

        for t in task_items:
            # get the parents' alyx ids to reference in the database
            if isinstance(t, dict):
                t = Bunch(t)
                executable = t.executable
                arguments = t.arguments
                t['time_out_secs'] = t['time_out_sec']
                if len(t.parents) > 0:
                    pnames = t.parents
            else:
                executable = self._get_exec_name(t)
                arguments = t.kwargs
                if len(t.parents):
                    pnames = [p.name for p in t.parents]

            if len(t.parents):
                parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames]
            else:
                parents_ids = []

            task_dict = {'executable': executable, 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parents_ids,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': arguments}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})
            # if the task already exists, patch it; otherwise, create it
            talyx = next(filter(lambda x: x['name'] == t.name, tasks_alyx_pre), [])
            if len(talyx) == 0:
                talyx = self.one.alyx.rest('tasks', 'create', data=task_dict)
            elif talyx['status'] in rerun__status__in:
                talyx = self.one.alyx.rest('tasks', 'partial_update', id=talyx['id'], data=task_dict)
            tasks_alyx.append(talyx)
        return tasks_alyx

    def create_tasks_list_from_pipeline(self):
        """
        From a pipeline with tasks, create a list of dictionaries containing the task descriptions
        that can be used to create Alyx tasks.
        :return:
        """
        tasks_list = []
        for k, t in self.tasks.items():
            # get the parents' names to reference in the database
            if len(t.parents):
                parent_names = [p.name for p in t.parents]
            else:
                parent_names = []

            task_dict = {'executable': self._get_exec_name(t), 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parent_names,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': t.kwargs}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})

            tasks_list.append(task_dict)

        return tasks_list
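
    # Illustrative example of one returned entry (values depend on the concrete pipeline; the
    # task name below is the one used in the module-level examples, the numbers are the Task
    # class defaults):
    # {'executable': 'ibllib.pipes.video_tasks.VideoSyncQcBpod', 'priority': 30,
    #  'io_charge': 5, 'gpu': 0, 'cpu': 1, 'ram': 4, 'module': <pipeline label>,
    #  'parents': [], 'level': 0, 'time_out_sec': 7200, 'session': <eid>,
    #  'status': 'Waiting', 'log': None, 'name': 'VideoSyncQcBpod',
    #  'graph': <pipeline name>, 'arguments': {}}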

    def run(self, status__in=('Waiting',), machine=None, clobber=True, **kwargs):
        """
        Get all the session-related jobs from Alyx and run them.
        :param status__in: list of status strings to run, e.g.
         ['Waiting', 'Started', 'Errored', 'Empty', 'Complete']
        :param machine: string identifying the machine the task is run on, optional
        :param clobber: bool, if True any existing logs are overwritten, default is True
        :param kwargs: arguments passed downstream to run_alyx_task
        :return: jalyx: list of REST dictionaries of the job endpoints
        :return: job_deck: list of REST dictionaries of the jobs endpoints
        :return: all_datasets: list of REST dictionaries of the dataset endpoints
        """
        assert self.session_path, 'Pipeline object has to be declared with a session path to run'
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        task_deck = self.one.alyx.rest('tasks', 'list', session=self.eid, no_cache=True)
        # [(t['name'], t['level']) for t in task_deck]
        all_datasets = []
        for i, j in enumerate(task_deck):
            if j['status'] not in status__in:
                continue
            # here we update the status in-place to avoid another hit to the database
            task_deck[i], dsets = run_alyx_task(tdict=j, session_path=self.session_path,
                                                one=self.one, job_deck=task_deck,
                                                machine=machine, clobber=clobber, **kwargs)
            if dsets is not None:
                all_datasets.extend(dsets)
        return task_deck, all_datasets

    def rerun_failed(self, **kwargs):
        return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs)

    def rerun(self, **kwargs):
        return self.run(status__in=[x for x in TASK_STATUS_SET if x != 'Abandoned'], **kwargs)

    @property
    def name(self):
        return self.__class__.__name__
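
# Illustrative sketch of how a concrete pipeline is typically assembled (task classes and
# names here are hypothetical): a subclass populates `self.tasks` with Task instances and
# encodes dependencies through `parents`, which also sets each task's level.
# >>> class MyPipeline(Pipeline):
# ...     def __init__(self, session_path, one=None, **kwargs):
# ...         super().__init__(session_path=session_path, one=one, **kwargs)
# ...         tasks = OrderedDict()
# ...         tasks['TaskA'] = TaskA(self.session_path, one=self.one)
# ...         tasks['TaskB'] = TaskB(self.session_path, one=self.one, parents=[tasks['TaskA']])
# ...         self.tasks = tasks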


def str2class(task_executable: str):
    """
    Convert a task name to a class.

    Parameters
    ----------
    task_executable : str
        A Task class name, e.g. 'ibllib.pipes.behavior_tasks.TrialRegisterRaw'.

    Returns
    -------
    class
        The imported class.
    """
    strmodule, strclass = task_executable.rsplit('.', 1)
    return getattr(importlib.import_module(strmodule), strclass)
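
# Illustrative usage: recovering a Task class from the executable string stored on an Alyx
# task, as `run_alyx_task` does below.
# >>> TrialRegisterRaw = str2class('ibllib.pipes.behavior_tasks.TrialRegisterRaw')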


def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
                  max_md5_size=None, machine=None, clobber=True, location='server', mode='log'):
    """
    Runs a single Alyx job and registers output datasets.

    Parameters
    ----------
    tdict : dict
        An Alyx task dictionary to instantiate and run.
    session_path : str, pathlib.Path
        A session path containing the task input data.
    one : one.api.OneAlyx
        An instance of ONE.
    job_deck : list of dict, optional
        A list of all tasks in the same pipeline. If None, queries Alyx to get this.
    max_md5_size : int, optional
        An optional maximum file size in bytes. Files with sizes larger than this will not have
        their MD5 checksum calculated to save time.
    machine : str, optional
        A string identifying the machine the task is run on.
    clobber : bool, default=True
        If true any existing logs are overwritten on Alyx.
    location : {'remote', 'server', 'sdsc', 'aws'}
        Where you are running the task, 'server' - local lab server, 'remote' - any
        compute node/computer, 'sdsc' - FlatIron compute node, 'aws' - using data from an AWS S3
        node.
    mode : {'log', 'raise'}, default='log'
        Behaviour to adopt if an error occurred. If 'raise', it will raise the error at the very
        end of this function (i.e. after having labelled the tasks).

    Returns
    -------
    dict
        The updated task dict.
    list of pathlib.Path
        A list of registered datasets.
    """
    registered_dsets = []
    # here we need to check the parents' status; get the job_deck if not available
    if not job_deck:
        job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True)
    if len(tdict['parents']):
        # check the dependencies
        parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck)
        parent_statuses = [j['status'] for j in parent_tasks]
        # if any of the parent tasks is not complete, throw a warning
        if not set(parent_statuses) <= {'Complete', 'Incomplete'}:
            _logger.warning(f"{tdict['name']} has unmet dependencies")
            # if parents are waiting or failed, set the current task status to Held
            # once the parents have run, the dependent tasks will be set from Held to Waiting (see below)
            if set(parent_statuses).intersection({'Errored', 'Held', 'Empty', 'Waiting', 'Started', 'Abandoned'}):
                tdict = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Held'})
                return tdict, registered_dsets
    # creates the job from the module name in the database
    classe = str2class(tdict['executable'])
    tkwargs = tdict.get('arguments') or {}  # if the db field is null it returns None
    task = classe(session_path, one=one, taskid=tdict['id'], machine=machine, clobber=clobber,
                  location=location, **tkwargs)
    # sets the status flag to started before running
    one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Started'})
    status = task.run()
    patch_data = {'time_elapsed_secs': task.time_elapsed_secs, 'log': task.log,
                  'version': task.version}
    # if there is no data to register, set status to Empty
    if task.outputs is None:  # NB: an empty list is still considered Complete.
        patch_data['status'] = 'Empty'
    # otherwise register data and set (provisional) status to Complete
    else:
        try:
            kwargs = dict(max_md5_size=max_md5_size)
            if location == 'server':
                # Explicitly pass lab as lab cannot be inferred from path (which the registration client tries to do).
                # To avoid making extra REST requests we can also set labs=None if using ONE v1.20.1.
                kwargs['labs'] = get_lab(session_path, one.alyx)
            registered_dsets = task.register_datasets(**kwargs)
            patch_data['status'] = 'Complete'
        except Exception:
            _logger.error(traceback.format_exc())
            status = -1

    # overwrite status to errored
    if status == -1:
        patch_data['status'] = 'Errored'
    # Status -2 means a lock was encountered during run, should be rerun
    elif status == -2:
        patch_data['status'] = 'Waiting'
    # Status -3 should be returned if a task is Incomplete
    elif status == -3:
        patch_data['status'] = 'Incomplete'
    # update the task status on Alyx
    t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data)
    # check for dependent held tasks
    # NB: Assumes dependent tasks are all part of the same session!
    next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status']  # Update status in job deck
    dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck)
    for d in dependent_tasks:
        assert d['id'] != t['id'], 'task its own parent'
        # if all their parent tasks are now complete, set to waiting
        parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']]
        if set(parent_status) <= {'Complete', 'Incomplete'}:
            one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'})
    task.cleanUp()
    if mode == 'raise' and status != 0:
        raise ValueError(task.log)
    return t, registered_dsets
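
# Illustrative usage (assumes `one`, `eid` and `session_path` are already defined): running a
# single Alyx task dictionary outside of a Pipeline, much like Pipeline.run does.
# >>> job_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
# >>> tdict = next(t for t in job_deck if t['status'] == 'Waiting')
# >>> tdict, datasets = run_alyx_task(tdict=tdict, session_path=session_path,
# ...                                 one=one, job_deck=job_deck)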