Coverage for ibllib/pipes/tasks.py: 87%
428 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1"""The abstract Pipeline and Task superclasses and concrete task runner.
3Examples
4--------
61. Running a task on your local computer.
7| Download: via ONE.
8| Upload: N/A.
10>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod')
11>>> task.run()
132. Running a task on the local server that belongs to a given subject (e.g. SWC054 on floferlab).
14| Download: all data expected to be present.
15| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer
16 jobs on Alyx transfer the data.
18>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod
19>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001'
20>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
21>>> task.run()
22>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
243. Running a task on the local server that belongs to that subject and forcing redownload of
25missing data.
26| Download: via Globus (TODO we should change this to use boto3 as globus is slow).
27| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer
28 jobs on Alyx transfer the data.
30>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod')
31>>> task.force = True
32>>> task.run()
33>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx))
34>>> task.cleanUp() # Delete the files that have been downloaded
364. Running a task on the local server that doesn't belong to that subject
37(e.g. SWC054 on angelakilab).
38| Download: via boto3, the AWS file records must exist and be set to exists = True.
39| Upload: via globus, automatically uploads the datasets directly to FlatIron via globus.
40 Creates FlatIron filerecords and sets these to True once the globus task has completed.
42>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod')
43>>> task.run()
44>>> task.register_datasets()
45>>> task.cleanUp() # Delete the files that have been downloaded
475. Running a task on SDSC.
48| Download: via creating symlink to relevant datasets on SDSC.
49| Upload: via copying files to relevant location on SDSC.
51>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod')
52>>> task.run()
53>>> response = task.register_datasets()
54>>> # Here we just make sure filerecords are all correct
55>>> for resp in response:
56... fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None)
57... if fi is not None:
58... if not fi['exists']:
59... one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True})
60...
61... aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None)
62... if aws is not None:
63... one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False})
64...
65... sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None)
66... if sr is not None:
67... one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False})
68... # Finally remove symlinks once the task has completed
69... task.cleanUp()
71"""
72from pathlib import Path
73import abc
74import logging
75import io
76import importlib
77import time
78from _collections import OrderedDict
79import traceback
80import json
82from graphviz import Digraph
84import ibllib
85from ibllib.oneibl import data_handlers
86from ibllib.oneibl.data_handlers import get_local_data_repository
87from ibllib.oneibl.registration import get_lab
88from iblutil.util import Bunch
89import one.params
90from one.api import ONE
91from one.util import ensure_list
92from one import webclient
94_logger = logging.getLogger(__name__)
95TASK_STATUS_SET = {'Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete', 'Abandoned'}
class Task(abc.ABC):
    """Abstract base class for a pipeline task; subclasses must implement `_run`."""
    log = ''  # placeholder to keep the log of the task for registration
    cpu = 1  # CPU resource
    gpu = 0  # GPU resources: as of now, either 0 or 1
    io_charge = 5  # integer percentage
    priority = 30  # integer percentage, 100 means highest priority
    ram = 4  # RAM needed to run (GB)
    one = None  # one instance (optional)
    level = 0  # level in the pipeline hierarchy: level 0 means there is no parent task
    outputs = None  # placeholder for a list of Path containing output files
    time_elapsed_secs = None
    time_out_secs = 3600 * 2  # time-out after which a task is considered dead
    version = ibllib.__version__
    signature = {'input_files': [], 'output_files': []}  # list of tuples (filename, collection, required_flag[, register])
    force = False  # whether to re-download missing input files on local server if not present
    job_size = 'small'  # either 'small' or 'large', defines whether task should be run as part of the large or small job services
    env = None  # the environment name within which to run the task (NB: the env is not activated automatically!)
116 def __init__(self, session_path, parents=None, taskid=None, one=None,
117 machine=None, clobber=True, location='server', **kwargs):
118 """
119 Base task class
120 :param session_path: session path
121 :param parents: parents
122 :param taskid: alyx task id
123 :param one: one instance
124 :param machine:
125 :param clobber: whether or not to overwrite log on rerun
126 :param location: location where task is run. Options are 'server' (lab local servers'), 'remote' (remote compute node,
127 data required for task downloaded via one), 'AWS' (remote compute node, data required for task downloaded via AWS),
128 or 'SDSC' (SDSC flatiron compute node) # TODO 'Globus' (remote compute node, data required for task downloaded via Globus)
129 :param args: running arguments
130 """
131 self.taskid = taskid 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
132 self.one = one 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
133 self.session_path = session_path 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
134 self.register_kwargs = {} 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
135 if parents: 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
136 self.parents = parents 2c nba d ] ^ _ ` { | } ~ abbbb
137 self.level = max(p.level for p in self.parents) + 1 2c nba d ] ^ _ ` { | } ~ abbbb
138 else:
139 self.parents = [] 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
140 self.machine = machine 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
141 self.clobber = clobber 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
142 self.location = location 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
143 self.plot_tasks = [] # Plotting task/ tasks to create plot outputs during the task 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
144 self.kwargs = kwargs 2c rbqbsbtbubvbwbxbnbybobzb+ f a mbH g I D J K E h L v i M j k l N m n O d ] ^ _ ` { | } ~ abbb, P x Q R y S T U V W X - Y z A $ AbBbCb/ % ' DbZ 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbEbFbGbHbu = ) 9 ! b w # IbJb
146 @property
147 def name(self):
148 return self.__class__.__name__ 2c qbf a @ [ pbd ] ^ _ ` { | } ~ abbbe b
150 def path2eid(self):
151 """
152 Fetch the experiment UUID from the Task session path, without using the REST cache.
154 This method ensures that the eid will be returned for newly created sessions.
156 Returns
157 -------
158 str
159 The experiment UUID corresponding to the session path.
160 """
161 assert self.session_path and self.one and not self.one.offline 2ob
162 with webclient.no_cache(self.one.alyx): 2ob
163 return self.one.path2eid(self.session_path, query_type='remote') 2ob
165 def run(self, **kwargs):
166 """
167 --- do not overload, see _run() below---
168 wraps the _run() method with
169 - error management
170 - logging to variable
171 - writing a lock file if the GPU is used
172 - labels the status property of the object. The status value is labeled as:
173 0: Complete
174 -1: Errored
175 -2: Didn't run as a lock was encountered
176 -3: Incomplete
178 Notes
179 -----
180 - The `run_alyx_task` will update the Alyx Task status depending on both status and outputs
181 (i.e. the output of subclassed `_run` method):
182 Assuming a return value of 0... if Task.outputs is None, the status will be Empty;
183 if Task.outputs is a list (empty or otherwise), the status will be Complete.
184 """
185 # if task id of one properties are not available, local run only without alyx
186 use_alyx = self.one is not None and self.taskid is not None 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
187 if use_alyx: 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
188 # check that alyx user is logged in
189 if not self.one.alyx.is_logged_in: 1adb
190 self.one.alyx.authenticate()
191 tdict = self.one.alyx.rest('tasks', 'partial_update', id=self.taskid, 1adb
192 data={'status': 'Started'})
193 self.log = ('' if not tdict['log'] else tdict['log'] + 1adb
194 '\n\n=============================RERUN=============================\n')
196 # Setup the console handler with a StringIO object
197 logger_level = _logger.level 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
198 log_capture_string = io.StringIO() 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
199 ch = logging.StreamHandler(log_capture_string) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
200 str_format = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s' 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
201 ch.setFormatter(logging.Formatter(str_format)) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
202 _logger.parent.addHandler(ch) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
203 _logger.parent.setLevel(logging.INFO) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
204 _logger.info(f'Starting job {self.__class__}') 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
205 if self.machine: 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
206 _logger.info(f'Running on machine: {self.machine}') 1a
207 _logger.info(f'running ibllib version {ibllib.__version__}') 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
208 # setup
209 start_time = time.time() 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
210 try: 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
211 setup = self.setUp(**kwargs) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
212 self.outputs = self._input_files_to_register() 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
213 _logger.info(f'Setup value is: {setup}') 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
214 self.status = 0 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
215 if not setup: 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
216 # case where outputs are present but don't have input files locally to rerun task
217 # label task as complete
218 _, outputs = self.assert_expected_outputs()
219 else:
220 # run task
221 if self.gpu >= 1: 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
222 if not self._creates_lock(): 1fae
223 self.status = -2 1fa
224 _logger.info(f'Job {self.__class__} exited as a lock was found') 1fa
225 new_log = log_capture_string.getvalue() 1fa
226 self.log = new_log if self.clobber else self.log + new_log 1fa
227 _logger.removeHandler(ch) 1fa
228 ch.close() 1fa
229 return self.status 1fa
230 outputs = self._run(**kwargs) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
231 _logger.info(f'Job {self.__class__} complete') 1c?+faHgIJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678*:;u=)9!bw#
232 if outputs is None: 1c?+faHgIJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678*:;u=)9!bw#
233 # If run method returns None and no raw input files were registered, self.outputs
234 # should be None, meaning task will have an 'Empty' status. If run method returns
235 # a list, the status will be 'Complete' regardless of whether there are output files.
236 self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered 1aE:bw
237 else:
238 self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register 1c?+faHgIJKhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678*:;u=)9!b#
239 except Exception: 1aDAFG
240 _logger.error(traceback.format_exc()) 1aDAFG
241 _logger.info(f'Job {self.__class__} errored') 1aDAFG
242 self.status = -1 1aDAFG
244 self.time_elapsed_secs = time.time() - start_time 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
245 # log the outputs
246 if isinstance(self.outputs, list): 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
247 nout = len(self.outputs) 1c?+faHgIDJKhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!b#
248 elif self.outputs is None: 1aE:bw
249 nout = 0 1aE:bw
250 else:
251 nout = 1
253 _logger.info(f'N outputs: {nout}') 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
254 _logger.info(f'--- {self.time_elapsed_secs} seconds run-time ---') 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
255 # after the run, capture the log output, amend to any existing logs if not overwrite
256 new_log = log_capture_string.getvalue() 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
257 self.log = new_log if self.clobber else self.log + new_log 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
258 _logger.removeHandler(ch) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
259 ch.close() 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
260 _logger.setLevel(logger_level) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
261 # tear down
262 self.tearDown() 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
263 return self.status 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
265 def register_datasets(self, **kwargs):
266 """
267 Register output datasets from the task to Alyx.
269 Parameters
270 ----------
271 kwargs
272 Directly passed to the `DataHandler.upload_data` method.
274 Returns
275 -------
276 list
277 The output of the `DataHandler.upload_data` method, e.g. a list of registered datasets.
278 """
279 _ = self.register_images() 1adb
280 return self.data_handler.uploadData(self.outputs, self.version, **kwargs) 1adb
282 def _input_files_to_register(self, assert_all_exist=False):
283 """
284 Return input datasets to be registered to Alyx.
286 These datasets are typically raw data files and are registered even if the task fails to complete.
288 Parameters
289 ----------
290 assert_all_exist
291 Raise AssertionError if not all required input datasets exist on disk.
293 Returns
294 -------
295 list of pathlib.Path
296 A list of input files to register.
297 """
298 try: 2c ? + f a mbH g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; u = ) 9 ! b w #
299 input_files = self.input_files 2c ? + f a mbH g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; u = ) 9 ! b w #
300 except AttributeError: 2mb
301 raise RuntimeError('Task.setUp must be run before calling this method.') 2mb
302 to_register, missing = [], [] 2c ? + f a mbH g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; u = ) 9 ! b w #
303 for filename, collection, required, _ in filter(lambda f: len(f) > 3 and f[3], input_files): 2c ? + f a mbH g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; u = ) 9 ! b w #
304 filepath = self.session_path.joinpath(collection, filename) 2mb
305 if filepath.exists(): 2mb
306 to_register.append(filepath) 2mb
307 elif required: 2mb
308 missing.append(filepath) 2mb
309 if any(missing): 2c ? + f a mbH g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; u = ) 9 ! b w #
310 missing_str = ', '.join(map(lambda x: x.relative_to(self.session_path).as_posix(), missing)) 2mb
311 if assert_all_exist: 2mb
312 raise AssertionError(f'Missing required input files: {missing_str}') 2mb
313 else:
314 _logger.error(f'Missing required input files: {missing_str}') 2mb
315 return list(set(to_register) - set(missing)) 2c ? + f a mbH g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; u = ) 9 ! b w #
317 def register_images(self, **kwargs):
318 """
319 Registers images to alyx database
320 :return:
321 """
322 if self.one and len(self.plot_tasks) > 0: 1adb
323 for plot_task in self.plot_tasks:
324 try:
325 _ = plot_task.register_images(widths=['orig'])
326 except Exception:
327 _logger.error(traceback.format_exc())
328 continue
330 def rerun(self):
331 self.run(overwrite=True)
333 def get_signatures(self, **kwargs):
334 """
335 This is the default but should be overwritten for each task
336 :return:
337 """
338 self.input_files = self.signature['input_files'] 2c + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G : ; cbu = 9 ! b w #
339 self.output_files = self.signature['output_files'] 2c + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G : ; cbu = 9 ! b w #
341 @abc.abstractmethod
342 def _run(self, overwrite=False):
343 """
344 This is the method to implement
345 :param overwrite: (bool) if the output already exists,
346 :return: out_files: files to be registered. Could be a list of files (pathlib.Path),
347 a single file (pathlib.Path) an empty list [] or None.
348 Within the pipeline, there is a distinction between a job that returns an empty list
349 and a job that returns None. If the function returns None, the job will be labeled as
350 "empty" status in the database, otherwise, the job has an expected behaviour of not
351 returning any dataset.
352 """
354 def setUp(self, **kwargs):
355 """Get the data handler and ensure all data is available locally to run task."""
356 if self.location == 'server': 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
357 self.get_signatures(**kwargs) 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
359 input_status, _ = self.assert_expected_inputs(raise_error=False) 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
360 output_status, _ = self.assert_expected(self.output_files, silent=True) 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
362 if input_status: 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
363 self.data_handler = self.get_data_handler() 1c?+aHgIDJKEhLviMjklNmnOd,PQRSTUVWX-YA$/%'Z0.1234(eopqrst5678FG*:u=)9!b#
364 _logger.info('All input files found: running task') 1c?+aHgIDJKEhLviMjklNmnOd,PQRSTUVWX-YA$/%'Z0.1234(eopqrst5678FG*:u=)9!b#
365 return True 1c?+aHgIDJKEhLviMjklNmnOd,PQRSTUVWX-YA$/%'Z0.1234(eopqrst5678FG*:u=)9!b#
367 if not self.force: 1dxyzBCubw
368 self.data_handler = self.get_data_handler() 1dxyzBCubw
369 _logger.warning('Not all input files found locally: will still attempt to rerun task') 1dxyzBCubw
370 # TODO in the future once we are sure that input output task signatures work properly should return False
371 # _logger.info('All output files found but input files required not available locally: task not rerun')
372 return True 1dxyzBCubw
373 else:
374 # Attempts to download missing data using globus
375 _logger.info('Not all input files found locally: attempting to re-download required files')
376 self.data_handler = self.get_data_handler(location='serverglobus')
377 self.data_handler.setUp()
378 # Double check we now have the required files to run the task
379 # TODO in future should raise error if even after downloading don't have the correct files
380 self.assert_expected_inputs(raise_error=False)
381 return True
382 else:
383 self.data_handler = self.get_data_handler() 1f;
384 self.data_handler.setUp() 1f;
385 self.get_signatures(**kwargs) 1f;
386 self.assert_expected_inputs() 1f;
387 return True 1f;
389 def tearDown(self):
390 """
391 Function after runs()
392 Does not run if a lock is encountered by the task (status -2)
393 """
394 if self.gpu >= 1: 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
395 if self._lock_file_path().exists(): 1fe
396 self._lock_file_path().unlink() 1fe
398 def cleanUp(self):
399 """
400 Function to optionally overload to clean up
401 :return:
402 """
403 self.data_handler.cleanUp() 1adb
405 def assert_expected_outputs(self, raise_error=True):
406 """
407 After a run, asserts that all signature files are present at least once in the output files
408 Mainly useful for integration tests
409 :return:
410 """
411 assert self.status == 0 1ghLviMjklNmnOeopqrst
412 _logger.info('Checking output files') 1ghLviMjklNmnOeopqrst
413 everything_is_fine, files = self.assert_expected(self.output_files) 1ghLviMjklNmnOeopqrst
415 if not everything_is_fine: 1ghLviMjklNmnOeopqrst
416 for out in self.outputs:
417 _logger.error(f'{out}')
418 if raise_error:
419 raise FileNotFoundError("Missing outputs after task completion")
421 return everything_is_fine, files 1ghLviMjklNmnOeopqrst
423 def assert_expected_inputs(self, raise_error=True):
424 """
425 Before running a task, check that all the files necessary to run the task have been downloaded/ are on the local file
426 system already
427 :return:
428 """
429 _logger.info('Checking input files') 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
430 everything_is_fine, files = self.assert_expected(self.input_files) 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
432 if not everything_is_fine and raise_error: 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
433 raise FileNotFoundError('Missing inputs to run task')
435 return everything_is_fine, files 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
437 def assert_expected(self, expected_files, silent=False):
438 everything_is_fine = True 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
439 files = [] 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
440 for expected_file in expected_files: 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
441 actual_files = list(Path(self.session_path).rglob(str(Path(*filter(None, reversed(expected_file[:2])))))) 2c ? + H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * ; cbdbebfbgbhbibjbkblbu ) 9 ! b w #
442 # Account for revisions
443 if len(actual_files) == 0: 2c ? + H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * ; cbdbebfbgbhbibjbkblbu ) 9 ! b w #
444 collection = '/'.join(filter(None, (expected_file[1], '#*'))) # append pound with wildcard 2c ? H g I D J K E h v i j k l m n d P x Q R y S T U V W X Y z A $ % ' Z 0 1 B 2 3 C 4 ( o p q r s t 5 6 7 8 F G * cbdbebfbgbhbibjbkblbu ) 9 ! b w #
445 expected_revision = (expected_file[0], collection, expected_file[2]) 2c ? H g I D J K E h v i j k l m n d P x Q R y S T U V W X Y z A $ % ' Z 0 1 B 2 3 C 4 ( o p q r s t 5 6 7 8 F G * cbdbebfbgbhbibjbkblbu ) 9 ! b w #
446 actual_files = list( 2c ? H g I D J K E h v i j k l m n d P x Q R y S T U V W X Y z A $ % ' Z 0 1 B 2 3 C 4 ( o p q r s t 5 6 7 8 F G * cbdbebfbgbhbibjbkblbu ) 9 ! b w #
447 Path(self.session_path).rglob(str(Path(*filter(None, reversed(expected_revision[:2])))))
448 )
449 if len(actual_files) == 0 and expected_file[2]: 2c ? + H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * ; cbdbebfbgbhbibjbkblbu ) 9 ! b w #
450 everything_is_fine = False 2c ? H g I D J K E h i j k l m n d P x Q R y S T U V W X Y z A $ % ' Z 0 1 B 2 3 C 4 o p q r s t 5 6 7 8 F G cbdbebfbgbhbibjbkblbu ) 9 ! b w #
451 if not silent: 2c ? H g I D J K E h i j k l m n d P x Q R y S T U V W X Y z A $ % ' Z 0 1 B 2 3 C 4 o p q r s t 5 6 7 8 F G cbdbebfbgbhbibjbkblbu ) 9 ! b w #
452 _logger.error(f'Signature file expected {expected_file} not found') 2d x y z B C cbdbebfbgbhbibjbkblbu b w
453 else:
454 if len(actual_files) != 0: 2c ? + H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * ; cbdbebfbgbhbibjbkblbu ) 9 ! b w #
455 files.append(actual_files[0]) 2c ? + H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * ; cbdbebfbgbhbibjbkblbu ) 9 ! b w #
457 return everything_is_fine, files 2c ? + f a H g I D J K E h L v i M j k l N m n O d , P x Q R y S T U V W X - Y z A $ / % ' Z 0 . 1 B 2 3 C 4 ( e o p q r s t 5 6 7 8 F G * : ; cbdbebfbgbhbibjbkblbu = ) 9 ! b w #
459 def get_data_handler(self, location=None):
460 """
461 Gets the relevant data handler based on location argument
462 :return:
463 """
464 location = str.lower(location or self.location) 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
465 if location == 'local': 1c?+faHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:;u=)9!bw#
466 return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one) 1f;
467 self.one = self.one or ONE() 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
468 if location == 'server': 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
469 dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one) 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
470 elif location == 'serverglobus':
471 dhandler = data_handlers.ServerGlobusDataHandler(self.session_path, self.signature, one=self.one)
472 elif location == 'remote':
473 dhandler = data_handlers.RemoteHttpDataHandler(self.session_path, self.signature, one=self.one)
474 elif location == 'aws':
475 dhandler = data_handlers.RemoteAwsDataHandler(self, self.session_path, self.signature, one=self.one)
476 elif location == 'sdsc':
477 dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one)
478 elif location == 'popeye':
479 dhandler = data_handlers.PopeyeDataHandler(self, self.session_path, self.signature, one=self.one)
480 else:
481 raise ValueError(f'Unknown location "{location}"')
482 return dhandler 1c?+aHgIDJKEhLviMjklNmnOd,PxQRySTUVWX-YzA$/%'Z0.1B23C4(eopqrst5678FG*:u=)9!bw#
484 @staticmethod
485 def make_lock_file(taskname='', time_out_secs=7200):
486 """Creates a GPU lock file with a timeout of"""
487 d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs} 1fae
488 with open(Task._lock_file_path(), 'w+') as fid: 1fae
489 json.dump(d, fid) 1fae
490 return d 1fae
492 @staticmethod
493 def _lock_file_path():
494 """the lock file is in ~/.one/gpu.lock"""
495 folder = Path.home().joinpath('.one') 1e
496 folder.mkdir(exist_ok=True) 1e
497 return folder.joinpath('gpu.lock') 1e
499 def _make_lock_file(self):
500 """creates a lock file with the current time"""
501 return Task.make_lock_file(self.name, self.time_out_secs) 1fe
503 def is_locked(self):
504 """Checks if there is a lock file for this given task"""
505 lock_file = self._lock_file_path() 1fae
506 if not lock_file.exists(): 1fae
507 return False 1fe
509 with open(lock_file) as fid: 1fa
510 d = json.load(fid) 1fa
511 now = time.time() 1fa
512 if (now - d['start']) > d['time_out_secs']: 1fa
513 lock_file.unlink() 1f
514 return False 1f
515 else:
516 return True 1fa
518 def _creates_lock(self):
519 if self.location == "popeye": 1fae
520 return True
521 if self.is_locked(): 1fae
522 return False 1fa
523 else:
524 self._make_lock_file() 1fe
525 return True 1fe
class Pipeline(abc.ABC):
    """
    Pipeline class: collection of related and potentially interdependent tasks
    """
    tasks = OrderedDict()  # mapping of task name to Task instance
    one = None  # one instance (optional)
535 def __init__(self, session_path=None, one=None, eid=None):
536 assert session_path or eid 2c nba d ] ^ _ ` { | } ~ abbbb
537 self.one = one 2c nba d ] ^ _ ` { | } ~ abbbb
538 if one and one.alyx.cache_mode and one.alyx.default_expiry.seconds > 1: 2c nba d ] ^ _ ` { | } ~ abbbb
539 _logger.warning('Alyx client REST cache active; this may cause issues with jobs')
540 self.eid = eid 2c nba d ] ^ _ ` { | } ~ abbbb
541 if self.one and not self.one.offline: 2c nba d ] ^ _ ` { | } ~ abbbb
542 self.data_repo = get_local_data_repository(self.one.alyx) 2c nba d b
543 else:
544 self.data_repo = None 2nb] ^ _ ` { | } ~ abbb
546 if session_path: 2c nba d ] ^ _ ` { | } ~ abbbb
547 self.session_path = session_path 2c nba d ] ^ _ ` { | } ~ abbbb
548 if not self.eid: 2c nba d ] ^ _ ` { | } ~ abbbb
549 # eID for newer sessions may not be in cache so use remote query
550 self.eid = one.path2eid(session_path, query_type='remote') if self.one else None 2nbd ] ^ _ ` { | } ~ abbbb
551 self.label = self.__module__ + '.' + type(self).__name__ 2c nba d ] ^ _ ` { | } ~ abbbb
553 @staticmethod
554 def _get_exec_name(obj):
555 """
556 For a class, get the executable name as it should be stored in Alyx. When the class
557 is created dynamically using the type() built-in function, need to revert to the base
558 class to be able to re-instantiate the class from the alyx dictionary on the client side
559 :param obj:
560 :return: string containing the full module plus class name
561 """
562 if obj.__module__ == 'abc': 2a @ [ pbd ] ^ _ ` { | } ~ abbbb
563 exec_name = f'{obj.__class__.__base__.__module__}.{obj.__class__.__base__.__name__}' 2@ [ pbd ] ^ _ ` { | } ~ abbbb
564 else:
565 exec_name = f'{obj.__module__}.{obj.name}' 1a
566 return exec_name 2a @ [ pbd ] ^ _ ` { | } ~ abbbb
568 def make_graph(self, out_dir=None, show=True):
569 if not out_dir: 1a
570 out_dir = self.one.alyx.cache_dir if self.one else one.params.get().CACHE_DIR 1a
571 m = Digraph('G', filename=str(Path(out_dir).joinpath(self.__module__ + '_graphs.gv'))) 1a
572 m.attr(rankdir='TD') 1a
574 e = Digraph(name='cluster_' + self.label) 1a
575 e.attr('node', shape='box') 1a
576 e.node('root', label=self.label) 1a
578 e.attr('node', shape='ellipse') 1a
579 for k in self.tasks: 1a
580 j = self.tasks[k] 1a
581 if len(j.parents) == 0: 1a
582 e.edge('root', j.name) 1a
583 else:
584 [e.edge(p.name, j.name) for p in j.parents] 1a
586 m.subgraph(e) 1a
587 m.attr(label=r'\n\Pre-processing\n') 1a
588 m.attr(fontsize='20') 1a
589 if show: 1a
590 m.view()
591 return m 1a
593 def create_alyx_tasks(self, rerun__status__in=None, tasks_list=None):
594 """
595 Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.
597 If the jobs already exist, they are left untouched. The re-run parameter will re-init the
598 job by emptying the log and set the status to Waiting.
600 Parameters
601 ----------
602 rerun__status__in : list, str
603 To re-run tasks if they already exist, specify one or more statuses strings to will be
604 re-run, or '__all__' to re-run all tasks.
605 tasks_list : list
606 The list of tasks to create on Alyx. If None, uses self.tasks.
608 Returns
609 -------
610 list
611 List of Alyx task dictionaries (existing and/or created).
612 """
613 rerun__status__in = ([rerun__status__in] 1a@[db
614 if isinstance(rerun__status__in, str)
615 else rerun__status__in or [])
616 if '__all__' in rerun__status__in: 1a@[db
617 rerun__status__in = [x for x in TASK_STATUS_SET if x != 'Abandoned']
618 assert self.eid 1a@[db
619 if self.one is None: 1a@[db
620 _logger.warning('No ONE instance found for Alyx connection, set the one property')
621 return
622 tasks_alyx_pre = self.one.alyx.rest('tasks', 'list', session=self.eid, graph=self.name, no_cache=True) 1a@[db
623 tasks_alyx = [] 1a@[db
624 # creates all the tasks by iterating through the ordered dict
626 if tasks_list is not None: 1a@[db
627 task_items = tasks_list
628 # need to add in the session eid and the parents
629 else:
630 task_items = self.tasks.values() 1a@[db
632 for t in task_items: 1a@[db
633 # get the parents' alyx ids to reference in the database
634 if isinstance(t, dict): 1a@[db
635 t = Bunch(t)
636 executable = t.executable
637 arguments = t.arguments
638 t['time_out_secs'] = t['time_out_sec']
639 if len(t.parents) > 0:
640 pnames = t.parents
641 else:
642 executable = self._get_exec_name(t) 1a@[db
643 arguments = t.kwargs 1a@[db
644 if len(t.parents): 1a@[db
645 pnames = [p.name for p in t.parents] 1a@[db
647 if len(t.parents): 1a@[db
648 parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames] 1a@[db
649 else:
650 parents_ids = [] 1a@[db
652 task_dict = {'executable': executable, 'priority': t.priority, 1a@[db
653 'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
654 'ram': t.ram, 'module': self.label, 'parents': parents_ids,
655 'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
656 'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
657 'arguments': arguments}
658 if self.data_repo: 1a@[db
659 task_dict.update({'data_repository': self.data_repo})
660 # if the task already exists, patch it otherwise, create it
661 talyx = next(filter(lambda x: x['name'] == t.name, tasks_alyx_pre), []) 1a@[db
662 if len(talyx) == 0: 1a@[db
663 talyx = self.one.alyx.rest('tasks', 'create', data=task_dict) 1a@[db
664 elif talyx['status'] in rerun__status__in: 1a@[
665 talyx = self.one.alyx.rest('tasks', 'partial_update', id=talyx['id'], data=task_dict)
666 tasks_alyx.append(talyx) 1a@[db
667 return tasks_alyx 1a@[db
669 def create_tasks_list_from_pipeline(self):
670 """
671 From a pipeline with tasks, creates a list of dictionaries containing task description that can be used to upload to
672 create alyx tasks
673 :return:
674 """
675 tasks_list = [] 2@ [ pb] ^ _ ` { | } ~ abbb
676 for k, t in self.tasks.items(): 2@ [ pb] ^ _ ` { | } ~ abbb
677 # get the parents' alyx ids to reference in the database
678 if len(t.parents): 2@ [ pb] ^ _ ` { | } ~ abbb
679 parent_names = [p.name for p in t.parents] 2@ [ pb] ^ _ ` { | } ~ abbb
680 else:
681 parent_names = [] 2@ [ pb] ^ _ ` { | } ~ abbb
683 task_dict = {'executable': self._get_exec_name(t), 'priority': t.priority, 2@ [ pb] ^ _ ` { | } ~ abbb
684 'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
685 'ram': t.ram, 'module': self.label, 'parents': parent_names,
686 'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
687 'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
688 'arguments': t.kwargs}
689 if self.data_repo: 2@ [ pb] ^ _ ` { | } ~ abbb
690 task_dict.update({'data_repository': self.data_repo})
692 tasks_list.append(task_dict) 2@ [ pb] ^ _ ` { | } ~ abbb
694 return tasks_list 2@ [ pb] ^ _ ` { | } ~ abbb
696 def run(self, status__in=('Waiting',), machine=None, clobber=True, **kwargs):
697 """
698 Get all the session related jobs from alyx and run them
699 :param status__in: lists of status strings to run in
700 ['Waiting', 'Started', 'Errored', 'Empty', 'Complete']
701 :param machine: string identifying the machine the task is run on, optional
702 :param clobber: bool, if True any existing logs are overwritten, default is True
703 :param kwargs: arguments passed downstream to run_alyx_task
704 :return: jalyx: list of REST dictionaries of the job endpoints
705 :return: job_deck: list of REST dictionaries of the jobs endpoints
706 :return: all_datasets: list of REST dictionaries of the dataset endpoints
707 """
708 assert self.session_path, 'Pipeline object has to be declared with a session path to run' 1a
709 if self.one is None: 1a
710 _logger.warning('No ONE instance found for Alyx connection, set the one property')
711 return
712 task_deck = self.one.alyx.rest('tasks', 'list', session=self.eid, no_cache=True) 1a
713 # [(t['name'], t['level']) for t in task_deck]
714 all_datasets = [] 1a
715 for i, j in enumerate(task_deck): 1a
716 if j['status'] not in status__in: 1a
717 continue 1a
718 # here we update the status in-place to avoid another hit to the database
719 task_deck[i], dsets = run_alyx_task(tdict=j, session_path=self.session_path, 1a
720 one=self.one, job_deck=task_deck,
721 machine=machine, clobber=clobber, **kwargs)
722 if dsets is not None: 1a
723 all_datasets.extend(dsets) 1a
724 return task_deck, all_datasets 1a
726 def rerun_failed(self, **kwargs):
727 return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs) 1a
729 def rerun(self, **kwargs):
731 return self.run(status__in=[x for x in TASK_STATUS_SET if x != 'Abandoned'], **kwargs)
733 @property
734 def name(self):
735 return self.__class__.__name__ 2a @ [ pbd ] ^ _ ` { | } ~ abbbb
def str2class(task_executable: str):
    """
    Convert a fully-qualified task name to its class object.

    Parameters
    ----------
    task_executable : str
        A Task class name, e.g. 'ibllib.pipes.behavior_tasks.TrialRegisterRaw'.

    Returns
    -------
    class
        The imported class.
    """
    module_name, class_name = task_executable.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)
def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
                  max_md5_size=None, machine=None, clobber=True, location='server', mode='log'):
    """
    Runs a single Alyx job and registers output datasets.

    Parameters
    ----------
    tdict : dict
        An Alyx task dictionary to instantiate and run.
    session_path : str, pathlib.Path
        A session path containing the task input data.
    one : one.api.OneAlyx
        An instance of ONE.
    job_deck : list of dict, optional
        A list of all tasks in the same pipeline. If None, queries Alyx to get this.
    max_md5_size : int, optional
        An optional maximum file size in bytes. Files with sizes larger than this will not have
        their MD5 checksum calculated to save time.
    machine : str, optional
        A string identifying the machine the task is run on.
    clobber : bool, default=True
        If true any existing logs are overwritten on Alyx.
    location : {'remote', 'server', 'sdsc', 'aws'}
        Where you are running the task, 'server' - local lab server, 'remote' - any
        compute node/ computer, 'sdsc' - Flatiron compute node, 'aws' - using data from AWS S3
        node.
    mode : {'log', 'raise'}, default='log'
        Behaviour to adopt if an error occurred. If 'raise', it will raise the error at the very
        end of this function (i.e. after having labeled the tasks).

    Returns
    -------
    dict
        The updated task dict.
    list of pathlib.Path
        A list of registered datasets.
    """
    registered_dsets = []
    # here we need to check parents' status, get the job_deck if not available
    if not job_deck:
        job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True)
    if len(tdict['parents']):
        # check the dependencies
        parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck)
        parent_statuses = [j['status'] for j in parent_tasks]
        # if any of the parent tasks is not complete, throw a warning
        if not set(parent_statuses) <= {'Complete', 'Incomplete'}:
            _logger.warning(f"{tdict['name']} has unmet dependencies")
            # if parents are waiting or failed, set the current task status to Held
            # once the parents ran, the descendent tasks will be set from Held to Waiting (see below)
            if set(parent_statuses).intersection({'Errored', 'Held', 'Empty', 'Waiting', 'Started', 'Abandoned'}):
                tdict = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Held'})
            # bail out without running: the task is not ready
            return tdict, registered_dsets
    # creates the job from the module name in the database
    classe = str2class(tdict['executable'])
    tkwargs = tdict.get('arguments') or {}  # if the db field is null it returns None
    task = classe(session_path, one=one, taskid=tdict['id'], machine=machine, clobber=clobber,
                  location=location, **tkwargs)
    # sets the status flag to started before running
    one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Started'})
    status = task.run()
    patch_data = {'time_elapsed_secs': task.time_elapsed_secs, 'log': task.log,
                  'version': task.version}
    # if there is no data to register, set status to Empty
    if task.outputs is None:  # NB: an empty list is still considered Complete.
        patch_data['status'] = 'Empty'
    # otherwise register data and set (provisional) status to Complete
    else:
        try:
            kwargs = dict(max_md5_size=max_md5_size)
            if location == 'server':
                # Explicitly pass lab as lab cannot be inferred from path (which the registration client tries to do).
                # To avoid making extra REST requests we can also set labs=None if using ONE v1.20.1.
                kwargs['labs'] = get_lab(session_path, one.alyx)
            registered_dsets = task.register_datasets(**kwargs)
            patch_data['status'] = 'Complete'
        except Exception:
            # registration failure downgrades the run to Errored below
            _logger.error(traceback.format_exc())
            status = -1

    # overwrite status to errored
    if status == -1:
        patch_data['status'] = 'Errored'
    # Status -2 means a lock was encountered during run, should be rerun
    elif status == -2:
        patch_data['status'] = 'Waiting'
    # Status -3 should be returned if a task is Incomplete
    elif status == -3:
        patch_data['status'] = 'Incomplete'
    # update task status on Alyx
    t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data)
    # check for dependent held tasks
    # NB: Assumes dependent tasks are all part of the same session!
    next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status']  # Update status in job deck
    dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck)
    for d in dependent_tasks:
        assert d['id'] != t['id'], 'task its own parent'
        # if all their parent tasks now complete, set to waiting
        parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']]
        if set(parent_status) <= {'Complete', 'Incomplete'}:
            one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'})
    # remove any files downloaded for the run
    task.cleanUp()
    if mode == 'raise' and status != 0:
        raise ValueError(task.log)
    return t, registered_dsets