Coverage for ibllib/pipes/tasks.py: 88%
395 statements
coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

from pathlib import Path
import abc
import logging
import io
import importlib
import time
from collections import OrderedDict
import traceback
import json

from graphviz import Digraph

import ibllib
from ibllib.oneibl import data_handlers
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.oneibl.registration import get_lab
from iblutil.util import Bunch
import one.params
from one.api import ONE
from one import webclient

_logger = logging.getLogger(__name__)
TASK_STATUS_SET = {'Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete', 'Abandoned'}


class Task(abc.ABC):
    log = ''  # placeholder to keep the log of the task for registration
    cpu = 1  # CPU resource
    gpu = 0  # GPU resources: as of now, either 0 or 1
    io_charge = 5  # integer percentage
    priority = 30  # integer percentage, 100 means highest priority
    ram = 4  # RAM needed to run (GB)
    one = None  # one instance (optional)
    level = 0  # level in the pipeline hierarchy: level 0 means there is no parent task
    outputs = None  # placeholder for a list of Path containing output files
    time_elapsed_secs = None
    time_out_secs = 3600 * 2  # time-out after which a task is considered dead
    version = ibllib.__version__
    signature = {'input_files': [], 'output_files': []}  # list of tuples (filename, collection, required_flag)
    force = False  # whether or not to re-download missing input files on the local server if not present
    job_size = 'small'  # either 'small' or 'large'; defines whether the task is run by the large or small job services

    def __init__(self, session_path, parents=None, taskid=None, one=None,
                 machine=None, clobber=True, location='server', **kwargs):
        """
        Base task class.

        :param session_path: session path
        :param parents: list of parent tasks
        :param taskid: Alyx task id
        :param one: one instance
        :param machine: string identifying the machine the task is run on (optional)
        :param clobber: whether or not to overwrite the log on rerun
        :param location: location where the task is run. Options are 'server' (lab local servers),
         'remote' (remote compute node, data required for the task downloaded via ONE),
         'AWS' (remote compute node, data required for the task downloaded via AWS), or
         'SDSC' (SDSC Flatiron compute node)  # TODO 'Globus' (remote compute node, data downloaded via Globus)
        :param kwargs: additional running arguments
        """
        self.taskid = taskid
        self.one = one
        self.session_path = session_path
        self.register_kwargs = {}
        if parents:
            self.parents = parents
            self.level = max(p.level for p in self.parents) + 1
        else:
            self.parents = []
        self.machine = machine
        self.clobber = clobber
        self.location = location
        self.plot_tasks = []  # plotting tasks to create plot outputs during the task
        self.kwargs = kwargs

    @property
    def name(self):
        return self.__class__.__name__

    def path2eid(self):
        """
        Fetch the experiment UUID from the Task session path, without using the REST cache.

        This method ensures that the eid will be returned for newly created sessions.

        Returns
        -------
        str
            The experiment UUID corresponding to the session path.
        """
        assert self.session_path and self.one and not self.one.offline
        with webclient.no_cache(self.one.alyx):
            return self.one.path2eid(self.session_path, query_type='remote')

    def run(self, **kwargs):
        """
        --- do not overload, see _run() below ---
        Wraps the _run() method with:
        - error management
        - logging to a variable
        - writing a lock file if the GPU is used
        - labelling the status property of the object. The status value is labelled as:
            0: Complete
           -1: Errored
           -2: Didn't run as a lock was encountered
           -3: Incomplete
        """
        # if the task id or one properties are not available, run locally only, without Alyx
        use_alyx = self.one is not None and self.taskid is not None
        if use_alyx:
            # check that the Alyx user is logged in
            if not self.one.alyx.is_logged_in:
                self.one.alyx.authenticate()
            tdict = self.one.alyx.rest('tasks', 'partial_update', id=self.taskid,
                                       data={'status': 'Started'})
            self.log = ('' if not tdict['log'] else tdict['log'] +
                        '\n\n=============================RERUN=============================\n')

        # Set up the console handler with a StringIO object
        logger_level = _logger.level
        log_capture_string = io.StringIO()
        ch = logging.StreamHandler(log_capture_string)
        str_format = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s'
        ch.setFormatter(logging.Formatter(str_format))
        _logger.parent.addHandler(ch)
        _logger.parent.setLevel(logging.INFO)
        _logger.info(f'Starting job {self.__class__}')
        if self.machine:
            _logger.info(f'Running on machine: {self.machine}')
        _logger.info(f'running ibllib version {ibllib.__version__}')
        # setup
        start_time = time.time()
        try:
            setup = self.setUp(**kwargs)
            _logger.info(f'Setup value is: {setup}')
            self.status = 0
            if not setup:
                # case where outputs are present but the input files to rerun the task are missing locally:
                # label the task as complete
                _, self.outputs = self.assert_expected_outputs()
            else:
                # run the task
                if self.gpu >= 1:
                    if not self._creates_lock():
                        self.status = -2
                        _logger.info(f'Job {self.__class__} exited as a lock was found')
                        new_log = log_capture_string.getvalue()
                        self.log = new_log if self.clobber else self.log + new_log
                        _logger.removeHandler(ch)
                        ch.close()
                        return self.status
                self.outputs = self._run(**kwargs)
                _logger.info(f'Job {self.__class__} complete')
        except Exception:
            _logger.error(traceback.format_exc())
            _logger.info(f'Job {self.__class__} errored')
            self.status = -1

        self.time_elapsed_secs = time.time() - start_time
        # log the outputs
        if isinstance(self.outputs, list):
            nout = len(self.outputs)
        elif self.outputs is None:
            nout = 0
        else:
            nout = 1

        _logger.info(f'N outputs: {nout}')
        _logger.info(f'--- {self.time_elapsed_secs} seconds run-time ---')
        # after the run, capture the log output; append to any existing log if not overwriting
        new_log = log_capture_string.getvalue()
        self.log = new_log if self.clobber else self.log + new_log
        _logger.removeHandler(ch)
        ch.close()
        _logger.setLevel(logger_level)
        # tear down
        self.tearDown()
        return self.status

    def register_datasets(self, one=None, **kwargs):
        """
        Register the output datasets from the task to Alyx.

        :param one: one instance
        :param kwargs: directly passed to the register_dataset function
        :return:
        """
        _ = self.register_images()

        return self.data_handler.uploadData(self.outputs, self.version, **kwargs)

    def register_images(self, **kwargs):
        """
        Registers images to the Alyx database.

        :return:
        """
        if self.one and len(self.plot_tasks) > 0:
            for plot_task in self.plot_tasks:
                try:
                    _ = plot_task.register_images(widths=['orig'])
                except Exception:
                    _logger.error(traceback.format_exc())
                    continue

    def rerun(self):
        self.run(overwrite=True)

    def get_signatures(self, **kwargs):
        """
        Default task signature getter; this should be overwritten by each task.

        :return:
        """
        self.input_files = self.signature['input_files']
        self.output_files = self.signature['output_files']

    @abc.abstractmethod
    def _run(self, overwrite=False):
        """
        This is the method to implement.

        :param overwrite: (bool) whether to recompute the output if it already exists
        :return: out_files: files to be registered. Could be a list of files (pathlib.Path),
         a single file (pathlib.Path), an empty list [] or None.
         Within the pipeline, there is a distinction between a job that returns an empty list
         and a job that returns None. If the function returns None, the job will be labelled with
         an "Empty" status in the database; otherwise the job has the expected behaviour of not
         returning any dataset.
        """

    def setUp(self, **kwargs):
        """
        Setup method to get the data handler and ensure all data is available locally to run the task.

        :param kwargs:
        :return:
        """
        if self.location == 'server':
            self.get_signatures(**kwargs)

            input_status, _ = self.assert_expected_inputs(raise_error=False)
            output_status, _ = self.assert_expected(self.output_files, silent=True)

            if input_status:
                self.data_handler = self.get_data_handler()
                _logger.info('All input files found: running task')
                return True

            if not self.force:
                self.data_handler = self.get_data_handler()
                _logger.warning('Not all input files found locally: will still attempt to rerun task')
                # TODO in the future, once we are sure that input/output task signatures work properly, should return False
                # _logger.info('All output files found but required input files not available locally: task not rerun')
                return True
            else:
                # Attempt to download missing data using Globus
                _logger.info('Not all input files found locally: attempting to re-download required files')
                self.data_handler = self.get_data_handler(location='serverglobus')
                self.data_handler.setUp()
                # Double check we now have the required files to run the task
                # TODO in the future, should raise an error if the correct files are still missing after downloading
                self.assert_expected_inputs(raise_error=False)
                return True
        else:
            self.data_handler = self.get_data_handler()
            self.data_handler.setUp()
            self.get_signatures(**kwargs)
            self.assert_expected_inputs()
            return True

    def tearDown(self):
        """
        Teardown method, run after _run().
        Does not run if a lock is encountered by the task (status -2).
        """
        if self.gpu >= 1:
            if self._lock_file_path().exists():
                self._lock_file_path().unlink()

    def cleanUp(self):
        """
        Function to optionally overload to clean up.

        :return:
        """
        self.data_handler.cleanUp()

    def assert_expected_outputs(self, raise_error=True):
        """
        After a run, asserts that all signature files are present at least once in the output files.
        Mainly useful for integration tests.

        :return:
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        if not everything_is_fine:
            for out in self.outputs:
                _logger.error(f'{out}')
            if raise_error:
                raise FileNotFoundError("Missing outputs after task completion")

        return everything_is_fine, files

    def assert_expected_inputs(self, raise_error=True):
        """
        Before running a task, checks that all the files necessary to run it have been downloaded
        or are already present on the local file system.

        :return:
        """
        _logger.info('Checking input files')
        everything_is_fine, files = self.assert_expected(self.input_files)

        if not everything_is_fine and raise_error:
            raise FileNotFoundError('Missing inputs to run task')

        return everything_is_fine, files

    def assert_expected(self, expected_files, silent=False):
        everything_is_fine = True
        files = []
        for expected_file in expected_files:
            actual_files = list(Path(self.session_path).rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
            if len(actual_files) == 0 and expected_file[2]:
                everything_is_fine = False
                if not silent:
                    _logger.error(f'Signature file expected {expected_file} not found')
            else:
                if len(actual_files) != 0:
                    files.append(actual_files[0])

        return everything_is_fine, files

    def get_data_handler(self, location=None):
        """
        Gets the relevant data handler based on location argument.

        :return:
        """
        location = str.lower(location or self.location)
        if location == 'local':
            return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one)
        self.one = self.one or ONE()
        if location == 'server':
            dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'serverglobus':
            dhandler = data_handlers.ServerGlobusDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'remote':
            dhandler = data_handlers.RemoteHttpDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'aws':
            dhandler = data_handlers.RemoteAwsDataHandler(self, self.session_path, self.signature, one=self.one)
        elif location == 'sdsc':
            dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one)
        else:
            raise ValueError(f'Unknown location "{location}"')
        return dhandler

    @staticmethod
    def make_lock_file(taskname='', time_out_secs=7200):
        """Creates a GPU lock file with the given timeout."""
        d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs}
        with open(Task._lock_file_path(), 'w+') as fid:
            json.dump(d, fid)
        return d

    @staticmethod
    def _lock_file_path():
        """The lock file is in ~/.one/gpu.lock."""
        folder = Path.home().joinpath('.one')
        folder.mkdir(exist_ok=True)
        return folder.joinpath('gpu.lock')

    def _make_lock_file(self):
        """Creates a lock file with the current time."""
        return Task.make_lock_file(self.name, self.time_out_secs)

    def is_locked(self):
        """Checks if there is a lock file for this given task."""
        lock_file = self._lock_file_path()
        if not lock_file.exists():
            return False

        with open(lock_file) as fid:
            d = json.load(fid)
        now = time.time()
        if (now - d['start']) > d['time_out_secs']:
            lock_file.unlink()
            return False
        else:
            return True

    def _creates_lock(self):
        if self.is_locked():
            return False
        else:
            self._make_lock_file()
            return True
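

# --- Illustrative sketch (editor's addition, not part of ibllib) --------------------------------
# A minimal Task subclass, assuming hypothetical dataset names and collections. The signature
# tuples follow the (filename, collection, required_flag) convention documented above; _run()
# returns the files to register, and run() maps the outcome onto the documented status codes
# (0: Complete, -1: Errored, -2: lock encountered, -3: Incomplete).
class ExampleSpikeSortingTask(Task):
    cpu = 4
    gpu = 1  # a non-zero value makes run() use the ~/.one/gpu.lock mechanism above
    job_size = 'large'
    signature = {
        'input_files': [('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True)],
        'output_files': [('spikes.times.npy', 'alf/probe00', True)],
    }

    def _run(self, overwrite=False):
        out_file = Path(self.session_path).joinpath('alf/probe00', 'spikes.times.npy')
        if out_file.exists() and not overwrite:
            return [out_file]  # existing output: register it as-is
        # ... the actual processing would go here ...
        return [out_file]  # returning None instead would label the Alyx task 'Empty'

# Local usage sketch (no Alyx interaction when one/taskid are not set):
#   task = ExampleSpikeSortingTask(Path('/data/subject/2023-01-01/001'), location='local')
#   status = task.run()           # 0 on success, -1 if _run() raised
#   if status == 0:
#       task.register_datasets()  # requires a configured ONE instance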


class Pipeline(abc.ABC):
    """
    Pipeline class: collection of related and potentially interdependent tasks
    """
    tasks = OrderedDict()
    one = None

    def __init__(self, session_path=None, one=None, eid=None):
        assert session_path or eid
        self.one = one
        if one and one.alyx.cache_mode and one.alyx.default_expiry.seconds > 1:
            _logger.warning('Alyx client REST cache active; this may cause issues with jobs')
        self.eid = eid
        if self.one and not self.one.offline:
            self.data_repo = get_local_data_repository(self.one.alyx)
        else:
            self.data_repo = None

        if session_path:
            self.session_path = session_path
            if not self.eid:
                # eID for newer sessions may not be in cache so use remote query
                self.eid = one.path2eid(session_path, query_type='remote') if self.one else None
        self.label = self.__module__ + '.' + type(self).__name__

    @staticmethod
    def _get_exec_name(obj):
        """
        For a class, get the executable name as it should be stored in Alyx. When the class
        is created dynamically using the type() built-in function, we need to revert to the base
        class to be able to re-instantiate the class from the Alyx dictionary on the client side.

        :param obj:
        :return: string containing the full module plus class name
        """
        if obj.__module__ == 'abc':
            exec_name = f'{obj.__class__.__base__.__module__}.{obj.__class__.__base__.__name__}'
        else:
            exec_name = f'{obj.__module__}.{obj.name}'
        return exec_name

    def make_graph(self, out_dir=None, show=True):
        if not out_dir:
            out_dir = self.one.alyx.cache_dir if self.one else one.params.get().CACHE_DIR
        m = Digraph('G', filename=str(Path(out_dir).joinpath(self.__module__ + '_graphs.gv')))
        m.attr(rankdir='TD')

        e = Digraph(name='cluster_' + self.label)
        e.attr('node', shape='box')
        e.node('root', label=self.label)

        e.attr('node', shape='ellipse')
        for k in self.tasks:
            j = self.tasks[k]
            if len(j.parents) == 0:
                e.edge('root', j.name)
            else:
                [e.edge(p.name, j.name) for p in j.parents]

        m.subgraph(e)
        m.attr(label=r'\n\Pre-processing\n')
        m.attr(fontsize='20')
        if show:
            m.view()
        return m

    def create_alyx_tasks(self, rerun__status__in=None, tasks_list=None):
        """
        Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.

        If the jobs already exist, they are left untouched. The rerun parameter will re-init the
        job by emptying the log and setting the status to Waiting.

        Parameters
        ----------
        rerun__status__in : list, str
            To re-run tasks if they already exist, specify one or more status strings to be
            re-run, or '__all__' to re-run all tasks.
        tasks_list : list
            The list of tasks to create on Alyx. If None, uses self.tasks.

        Returns
        -------
        list
            List of Alyx task dictionaries (existing and/or created).
        """
        rerun__status__in = ([rerun__status__in]
                             if isinstance(rerun__status__in, str)
                             else rerun__status__in or [])
        if '__all__' in rerun__status__in:
            rerun__status__in = [x for x in TASK_STATUS_SET if x != 'Abandoned']
        assert self.eid
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        tasks_alyx_pre = self.one.alyx.rest('tasks', 'list', session=self.eid, graph=self.name, no_cache=True)
        tasks_alyx = []
        # create all the tasks by iterating through the ordered dict

        if tasks_list is not None:
            task_items = tasks_list
            # need to add in the session eid and the parents
        else:
            task_items = self.tasks.values()

        for t in task_items:
            # get the parents' alyx ids to reference in the database
            if isinstance(t, dict):
                t = Bunch(t)
                executable = t.executable
                arguments = t.arguments
                t['time_out_secs'] = t['time_out_sec']
                if len(t.parents) > 0:
                    pnames = t.parents
            else:
                executable = self._get_exec_name(t)
                arguments = t.kwargs
                if len(t.parents):
                    pnames = [p.name for p in t.parents]

            if len(t.parents):
                parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames]
            else:
                parents_ids = []

            task_dict = {'executable': executable, 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parents_ids,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': arguments}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})
            # if the task already exists, patch it; otherwise, create it
            talyx = next(filter(lambda x: x['name'] == t.name, tasks_alyx_pre), [])
            if len(talyx) == 0:
                talyx = self.one.alyx.rest('tasks', 'create', data=task_dict)
            elif talyx['status'] in rerun__status__in:
                talyx = self.one.alyx.rest('tasks', 'partial_update', id=talyx['id'], data=task_dict)
            tasks_alyx.append(talyx)
        return tasks_alyx

    def create_tasks_list_from_pipeline(self):
        """
        From a pipeline with tasks, creates a list of dictionaries containing the task descriptions,
        which can be used to create the tasks on Alyx.

        :return:
        """
        tasks_list = []
        for k, t in self.tasks.items():
            # get the parents' alyx ids to reference in the database
            if len(t.parents):
                parent_names = [p.name for p in t.parents]
            else:
                parent_names = []

            task_dict = {'executable': self._get_exec_name(t), 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parent_names,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': t.kwargs}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})

            tasks_list.append(task_dict)

        return tasks_list

    def run(self, status__in=('Waiting',), machine=None, clobber=True, **kwargs):
        """
        Get all the session-related jobs from Alyx and run them.

        :param status__in: list of status strings to run, e.g.
         ['Waiting', 'Started', 'Errored', 'Empty', 'Complete']
        :param machine: string identifying the machine the task is run on, optional
        :param clobber: bool, if True any existing logs are overwritten, default is True
        :param kwargs: arguments passed downstream to run_alyx_task
        :return: task_deck: list of REST dictionaries of the task endpoints
        :return: all_datasets: list of REST dictionaries of the dataset endpoints
        """
        assert self.session_path, 'Pipeline object has to be declared with a session path to run'
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        task_deck = self.one.alyx.rest('tasks', 'list', session=self.eid, no_cache=True)
        # [(t['name'], t['level']) for t in task_deck]
        all_datasets = []
        for i, j in enumerate(task_deck):
            if j['status'] not in status__in:
                continue
            # here we update the status in-place to avoid another hit to the database
            task_deck[i], dsets = run_alyx_task(tdict=j, session_path=self.session_path,
                                                one=self.one, job_deck=task_deck,
                                                machine=machine, clobber=clobber, **kwargs)
            if dsets is not None:
                all_datasets.extend(dsets)
        return task_deck, all_datasets

    def rerun_failed(self, **kwargs):
        return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs)

    def rerun(self, **kwargs):
        return self.run(status__in=[x for x in TASK_STATUS_SET if x != 'Abandoned'], **kwargs)

    @property
    def name(self):
        return self.__class__.__name__
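

# --- Illustrative sketch (editor's addition, not part of ibllib) --------------------------------
# A minimal Pipeline subclass wiring the ExampleSpikeSortingTask sketched above to a second
# hypothetical task. Passing parents= raises the child task's level, and create_alyx_tasks()
# / run_alyx_task() use that dependency to hold the child until its parent is 'Complete' or
# 'Incomplete'.
class ExampleQcTask(Task):
    signature = {'input_files': [('spikes.times.npy', 'alf/probe00', True)], 'output_files': []}

    def _run(self, overwrite=False):
        return None  # nothing to register: the Alyx task will be labelled 'Empty'


class ExamplePipeline(Pipeline):
    def __init__(self, session_path=None, one=None, eid=None):
        super().__init__(session_path=session_path, one=one, eid=eid)
        tasks = OrderedDict()
        tasks['ExampleSpikeSortingTask'] = ExampleSpikeSortingTask(self.session_path, one=self.one)
        tasks['ExampleQcTask'] = ExampleQcTask(
            self.session_path, one=self.one,
            parents=[tasks['ExampleSpikeSortingTask']])  # level 1: held until its parent completes
        self.tasks = tasks

# Usage sketch (assumes a configured ONE instance and a registered Alyx session):
#   pipe = ExamplePipeline(session_path=Path('/data/subject/2023-01-01/001'), one=ONE())
#   pipe.create_alyx_tasks()           # create or patch the session's task records on Alyx
#   task_deck, datasets = pipe.run()   # run every task whose status is 'Waiting'
#   pipe.make_graph(show=False)        # graphviz Digraph of the task dependencies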


def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
                  max_md5_size=None, machine=None, clobber=True, location='server', mode='log'):
    """
    Runs a single Alyx job and registers the output datasets.

    :param tdict: the Alyx task dictionary to run
    :param session_path:
    :param one:
    :param job_deck: optional list of job dictionaries belonging to the session. Needed
     to check the dependency status if tdict has a parent field. If tdict has a parent and
     job_deck is not provided, the database will be queried.
    :param max_md5_size: in bytes; if specified, the md5 checksum will not be computed above this
     file size, to save time
    :param machine: string identifying the machine the task is run on, optional
    :param clobber: bool, if True any existing logs are overwritten, default is True
    :param location: where the task is run: 'server' - local lab server, 'remote' - any
     compute node/computer, 'SDSC' - Flatiron compute node, 'AWS' - using data from AWS S3
    :param mode: str ('log' or 'raise') behaviour to adopt if an error occurred. If 'raise', the
     error is raised at the very end of this function (i.e. after having labelled the tasks)
    :return:
    """
    registered_dsets = []
    # here we need to check the parents' status; get the job_deck if not available
    if not job_deck:
        job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True)
    if len(tdict['parents']):
        # check the dependencies
        parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck)
        parent_statuses = [j['status'] for j in parent_tasks]
        # if any of the parent tasks is not complete, throw a warning
        if not set(parent_statuses) <= {'Complete', 'Incomplete'}:
            _logger.warning(f"{tdict['name']} has unmet dependencies")
            # if parents are waiting or failed, set the current task status to Held
            # once the parents have run, the dependent tasks will be set from Held to Waiting (see below)
            if set(parent_statuses).intersection({'Errored', 'Held', 'Empty', 'Waiting', 'Started', 'Abandoned'}):
                tdict = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Held'})
                return tdict, registered_dsets
    # creates the job from the module name in the database
    exec_name = tdict['executable']
    strmodule, strclass = exec_name.rsplit('.', 1)
    classe = getattr(importlib.import_module(strmodule), strclass)
    tkwargs = tdict.get('arguments') or {}  # if the db field is null it returns None
    task = classe(session_path, one=one, taskid=tdict['id'], machine=machine, clobber=clobber,
                  location=location, **tkwargs)
    # sets the status flag to Started before running
    one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Started'})
    status = task.run()
    patch_data = {'time_elapsed_secs': task.time_elapsed_secs, 'log': task.log,
                  'version': task.version}
    # if there is no data to register, set the status to Empty
    if task.outputs is None:
        patch_data['status'] = 'Empty'
    # otherwise register the data and set the (provisional) status to Complete
    else:
        try:
            kwargs = dict(one=one, max_md5_size=max_md5_size)
            if location == 'server':
                # Explicitly pass the lab as it cannot be inferred from the path (which the registration client tries to do).
                # To avoid making extra REST requests we can also set labs=None if using ONE v1.20.1.
                kwargs['labs'] = get_lab(session_path, one.alyx)
            registered_dsets = task.register_datasets(**kwargs)
            patch_data['status'] = 'Complete'
        except Exception:
            _logger.error(traceback.format_exc())
            status = -1

    # overwrite the status to Errored
    if status == -1:
        patch_data['status'] = 'Errored'
    # status -2 means a lock was encountered during the run: the task should be rerun
    elif status == -2:
        patch_data['status'] = 'Waiting'
    # status -3 should be returned if a task is Incomplete
    elif status == -3:
        patch_data['status'] = 'Incomplete'
    # update the task status on Alyx
    t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data)
    # check for dependent Held tasks
    # NB: this assumes dependent tasks are all part of the same session!
    next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status']  # update the status in the job deck
    dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck)
    for d in dependent_tasks:
        assert d['id'] != t['id'], 'task is its own parent'
        # if all of their parent tasks are now complete, set them to Waiting
        parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']]
        if set(parent_status) <= {'Complete', 'Incomplete'}:
            one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'})
    task.cleanUp()
    if mode == 'raise' and status != 0:
        raise ValueError(task.log)
    return t, registered_dsets
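

# --- Illustrative sketch (editor's addition, not part of ibllib) --------------------------------
# run_alyx_task() is the single-task entry point used by the job services. A hedged driver loop,
# assuming a configured ONE instance and a session whose tasks were created with
# Pipeline.create_alyx_tasks(); it mirrors what Pipeline.run() does above.
def _example_run_waiting_tasks(session_path, one):
    """Run every 'Waiting' task of one session, in the order returned by Alyx."""
    eid = one.path2eid(session_path, query_type='remote')
    job_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
    all_datasets = []
    for tdict in job_deck:
        if tdict['status'] != 'Waiting':
            continue
        # passing job_deck avoids an extra REST query when parent statuses are checked
        _, dsets = run_alyx_task(tdict=tdict, session_path=session_path, one=one,
                                 job_deck=job_deck, location='server', mode='log')
        all_datasets.extend(dsets)
    return all_datasets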