Coverage for ibllib/pipes/tasks.py: 88%

395 statements  

coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

from pathlib import Path
import abc
import logging
import io
import importlib
import time
from collections import OrderedDict
import traceback
import json

from graphviz import Digraph

import ibllib
from ibllib.oneibl import data_handlers
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.oneibl.registration import get_lab
from iblutil.util import Bunch
import one.params
from one.api import ONE
from one import webclient

_logger = logging.getLogger(__name__)
TASK_STATUS_SET = {'Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete', 'Abandoned'}


class Task(abc.ABC):
    log = ''  # placeholder to keep the log of the task for registration
    cpu = 1  # CPU resource
    gpu = 0  # GPU resources: as of now, either 0 or 1
    io_charge = 5  # integer percentage
    priority = 30  # integer percentage, 100 means highest priority
    ram = 4  # RAM needed to run (GB)
    one = None  # one instance (optional)
    level = 0  # level in the pipeline hierarchy: level 0 means there is no parent task
    outputs = None  # placeholder for a list of Path containing output files
    time_elapsed_secs = None
    time_out_secs = 3600 * 2  # time-out after which a task is considered dead
    version = ibllib.__version__
    signature = {'input_files': [], 'output_files': []}  # list of tuples (filename, collection, required_flag)
    force = False  # whether to re-download input files that are missing when running on a local server
    job_size = 'small'  # either 'small' or 'large', defines whether the task is run by the large or small job services

    def __init__(self, session_path, parents=None, taskid=None, one=None,
                 machine=None, clobber=True, location='server', **kwargs):
        """
        Base task class
        :param session_path: session path
        :param parents: parents
        :param taskid: alyx task id
        :param one: one instance
        :param machine: string identifying the machine the task is run on (optional)
        :param clobber: whether or not to overwrite the log on rerun
        :param location: location where the task is run. Options are 'server' (lab local servers), 'remote' (remote compute
         node, data required for the task downloaded via one), 'AWS' (remote compute node, data required for the task
         downloaded via AWS), or 'SDSC' (SDSC flatiron compute node)  # TODO 'Globus' (remote compute node, data
         required for the task downloaded via Globus)
        :param kwargs: running arguments
        """
        self.taskid = taskid
        self.one = one
        self.session_path = session_path
        self.register_kwargs = {}
        if parents:
            self.parents = parents
            self.level = max(p.level for p in self.parents) + 1
        else:
            self.parents = []
        self.machine = machine
        self.clobber = clobber
        self.location = location
        self.plot_tasks = []  # Plotting tasks used to create plot outputs during the task
        self.kwargs = kwargs

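    # Illustrative note (hedged: TaskA/TaskB are placeholder subclasses, not real ibllib tasks):
    # the parents argument drives the hierarchy level used for scheduling, e.g.
    #
    #     a = TaskA(session_path)               # no parents -> level 0
    #     b = TaskB(session_path, parents=[a])  # level = max(parent levels) + 1 = 1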

    @property
    def name(self):
        return self.__class__.__name__

    def path2eid(self):
        """
        Fetch the experiment UUID from the Task session path, without using the REST cache.

        This method ensures that the eid will be returned for newly created sessions.

        Returns
        -------
        str
            The experiment UUID corresponding to the session path.
        """
        assert self.session_path and self.one and not self.one.offline
        with webclient.no_cache(self.one.alyx):
            return self.one.path2eid(self.session_path, query_type='remote')

    def run(self, **kwargs):
        """
        --- do not overload, see _run() below ---
        Wraps the _run() method with:
        - error management
        - logging to variable
        - writing a lock file if the GPU is used
        - labelling the status property of the object. The status values are:
            0: Complete
           -1: Errored
           -2: Didn't run as a lock was encountered
           -3: Incomplete
        """
        # if the task id or one properties are not available, local run only without alyx
        use_alyx = self.one is not None and self.taskid is not None
        if use_alyx:
            # check that the alyx user is logged in
            if not self.one.alyx.is_logged_in:
                self.one.alyx.authenticate()
            tdict = self.one.alyx.rest('tasks', 'partial_update', id=self.taskid,
                                       data={'status': 'Started'})
            self.log = ('' if not tdict['log'] else tdict['log'] +
                        '\n\n=============================RERUN=============================\n')

        # Setup the console handler with a StringIO object
        logger_level = _logger.level
        log_capture_string = io.StringIO()
        ch = logging.StreamHandler(log_capture_string)
        str_format = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s'
        ch.setFormatter(logging.Formatter(str_format))
        _logger.parent.addHandler(ch)
        _logger.parent.setLevel(logging.INFO)
        _logger.info(f'Starting job {self.__class__}')
        if self.machine:
            _logger.info(f'Running on machine: {self.machine}')
        _logger.info(f'running ibllib version {ibllib.__version__}')
        # setup
        start_time = time.time()
        try:
            setup = self.setUp(**kwargs)
            _logger.info(f'Setup value is: {setup}')
            self.status = 0
            if not setup:
                # case where outputs are present but the input files to rerun the task are not available locally:
                # label the task as complete
                _, self.outputs = self.assert_expected_outputs()
            else:
                # run the task
                if self.gpu >= 1:
                    if not self._creates_lock():
                        self.status = -2
                        _logger.info(f'Job {self.__class__} exited as a lock was found')
                        new_log = log_capture_string.getvalue()
                        self.log = new_log if self.clobber else self.log + new_log
                        _logger.removeHandler(ch)
                        ch.close()
                        return self.status
                self.outputs = self._run(**kwargs)
                _logger.info(f'Job {self.__class__} complete')
        except Exception:
            _logger.error(traceback.format_exc())
            _logger.info(f'Job {self.__class__} errored')
            self.status = -1

        self.time_elapsed_secs = time.time() - start_time
        # log the outputs
        if isinstance(self.outputs, list):
            nout = len(self.outputs)
        elif self.outputs is None:
            nout = 0
        else:
            nout = 1

        _logger.info(f'N outputs: {nout}')
        _logger.info(f'--- {self.time_elapsed_secs} seconds run-time ---')
        # after the run, capture the log output, amend to any existing logs if not overwrite
        new_log = log_capture_string.getvalue()
        self.log = new_log if self.clobber else self.log + new_log
        _logger.removeHandler(ch)
        ch.close()
        _logger.setLevel(logger_level)
        # tear down
        self.tearDown()
        return self.status

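    # The status codes returned by run() are mapped onto Alyx statuses in run_alyx_task() below:
    # 0 -> 'Complete' (or 'Empty' when _run() returned None), -1 -> 'Errored',
    # -2 -> 'Waiting' (a GPU lock was found, rerun later), -3 -> 'Incomplete'.
    # A minimal local call, assuming `MyTask` is a concrete subclass implementing _run():
    #
    #     task = MyTask('/data/subject/2023-01-01/001', one=None, location='local')
    #     if task.run() == 0:
    #         print(task.outputs, task.log)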

    def register_datasets(self, one=None, **kwargs):
        """
        Register output datasets from the task to Alyx
        :param one:
        :param kwargs: directly passed to the register_dataset function
        :return:
        """
        _ = self.register_images()

        return self.data_handler.uploadData(self.outputs, self.version, **kwargs)

    def register_images(self, **kwargs):
        """
        Registers images to the alyx database
        :return:
        """
        if self.one and len(self.plot_tasks) > 0:
            for plot_task in self.plot_tasks:
                try:
                    _ = plot_task.register_images(widths=['orig'])
                except Exception:
                    _logger.error(traceback.format_exc())
                    continue

    def rerun(self):
        self.run(overwrite=True)

    def get_signatures(self, **kwargs):
        """
        Default method to set the input and output file signatures; should be overwritten for each task
        :return:
        """
        self.input_files = self.signature['input_files']
        self.output_files = self.signature['output_files']

    @abc.abstractmethod
    def _run(self, overwrite=False):
        """
        This is the method to implement
        :param overwrite: (bool) whether to re-compute the task output if it already exists
        :return: out_files: files to be registered. Could be a list of files (pathlib.Path),
         a single file (pathlib.Path), an empty list [] or None.
        Within the pipeline, there is a distinction between a job that returns an empty list
        and a job that returns None. If the function returns None, the job will be labelled
        with an "Empty" status in the database; if it returns an empty list, the job ran as
        expected but is not meant to produce any dataset.
        """

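    # A sketch of a concrete subclass (hedged: the class name, dataset names and collections
    # below are illustrations only, not an actual ibllib task):
    #
    #     class TrialsTableTask(Task):
    #         signature = {'input_files': [('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True)],
    #                      'output_files': [('_ibl_trials.table.pqt', 'alf', True)]}
    #
    #         def _run(self, overwrite=False):
    #             out_file = Path(self.session_path).joinpath('alf', '_ibl_trials.table.pqt')
    #             if out_file.exists() and not overwrite:
    #                 return [out_file]  # nothing to recompute, register the existing file
    #             # ... extract the trials table and save it to out_file here ...
    #             return [out_file]      # list of Paths to register; returning None would mean 'Empty'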

    def setUp(self, **kwargs):
        """
        Setup method to get the data handler and ensure all data is available locally to run the task
        :param kwargs:
        :return:
        """
        if self.location == 'server':
            self.get_signatures(**kwargs)

            input_status, _ = self.assert_expected_inputs(raise_error=False)
            output_status, _ = self.assert_expected(self.output_files, silent=True)

            if input_status:
                self.data_handler = self.get_data_handler()
                _logger.info('All input files found: running task')
                return True

            if not self.force:
                self.data_handler = self.get_data_handler()
                _logger.warning('Not all input files found locally: will still attempt to rerun task')
                # TODO in the future once we are sure that input output task signatures work properly should return False
                # _logger.info('All output files found but input files required not available locally: task not rerun')
                return True
            else:
                # Attempts to download missing data using globus
                _logger.info('Not all input files found locally: attempting to re-download required files')
                self.data_handler = self.get_data_handler(location='serverglobus')
                self.data_handler.setUp()
                # Double check we now have the required files to run the task
                # TODO in future should raise error if even after downloading don't have the correct files
                self.assert_expected_inputs(raise_error=False)
                return True
        else:
            self.data_handler = self.get_data_handler()
            self.data_handler.setUp()
            self.get_signatures(**kwargs)
            self.assert_expected_inputs()
            return True


    def tearDown(self):
        """
        Function run at the end of run()
        Does not run if a lock is encountered by the task (status -2)
        """
        if self.gpu >= 1:
            if self._lock_file_path().exists():
                self._lock_file_path().unlink()

    def cleanUp(self):
        """
        Function to optionally overload to clean up
        :return:
        """
        self.data_handler.cleanUp()

    def assert_expected_outputs(self, raise_error=True):
        """
        After a run, asserts that all signature files are present at least once in the output files
        Mainly useful for integration tests
        :return:
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        if not everything_is_fine:
            for out in self.outputs:
                _logger.error(f'{out}')
            if raise_error:
                raise FileNotFoundError("Missing outputs after task completion")

        return everything_is_fine, files

    def assert_expected_inputs(self, raise_error=True):
        """
        Before running a task, check that all the files necessary to run it have already been
        downloaded to, or are present on, the local file system
        :return:
        """
        _logger.info('Checking input files')
        everything_is_fine, files = self.assert_expected(self.input_files)

        if not everything_is_fine and raise_error:
            raise FileNotFoundError('Missing inputs to run task')

        return everything_is_fine, files

    def assert_expected(self, expected_files, silent=False):
        everything_is_fine = True
        files = []
        for expected_file in expected_files:
            actual_files = list(Path(self.session_path).rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
            if len(actual_files) == 0 and expected_file[2]:
                everything_is_fine = False
                if not silent:
                    _logger.error(f'Signature file expected {expected_file} not found')
            else:
                if len(actual_files) != 0:
                    files.append(actual_files[0])

        return everything_is_fine, files

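    # How a signature entry is matched (illustrative; the dataset name below is an assumption):
    #
    #     expected_file = ('spikes.times.npy', 'alf/probe00', True)
    #
    # assert_expected rglobs the session path for 'alf/probe00/spikes.times.npy'; the third
    # element marks the file as required, so a miss sets everything_is_fine to False, whereas
    # a missing optional file (False) is simply skipped.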

    def get_data_handler(self, location=None):
        """
        Gets the relevant data handler based on the location argument
        :return:
        """
        location = str.lower(location or self.location)
        if location == 'local':
            return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one)
        self.one = self.one or ONE()
        if location == 'server':
            dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'serverglobus':
            dhandler = data_handlers.ServerGlobusDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'remote':
            dhandler = data_handlers.RemoteHttpDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'aws':
            dhandler = data_handlers.RemoteAwsDataHandler(self, self.session_path, self.signature, one=self.one)
        elif location == 'sdsc':
            dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one)
        else:
            raise ValueError(f'Unknown location "{location}"')
        return dhandler

    @staticmethod
    def make_lock_file(taskname='', time_out_secs=7200):
        """Creates a GPU lock file with the given time-out in seconds"""
        d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs}
        with open(Task._lock_file_path(), 'w+') as fid:
            json.dump(d, fid)
        return d

    @staticmethod
    def _lock_file_path():
        """the lock file is in ~/.one/gpu.lock"""
        folder = Path.home().joinpath('.one')
        folder.mkdir(exist_ok=True)
        return folder.joinpath('gpu.lock')

    def _make_lock_file(self):
        """creates a lock file with the current time"""
        return Task.make_lock_file(self.name, self.time_out_secs)

    def is_locked(self):
        """Checks if there is a lock file for this given task"""
        lock_file = self._lock_file_path()
        if not lock_file.exists():
            return False

        with open(lock_file) as fid:
            d = json.load(fid)
        now = time.time()
        if (now - d['start']) > d['time_out_secs']:
            lock_file.unlink()
            return False
        else:
            return True

    def _creates_lock(self):
        if self.is_locked():
            return False
        else:
            self._make_lock_file()
            return True

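    # The GPU lock is a small JSON file at ~/.one/gpu.lock, for example (illustrative values):
    #
    #     {"start": 1697025600.0, "name": "ExampleGpuTask", "time_out_secs": 7200}
    #
    # run() only takes the lock for tasks with gpu >= 1; is_locked() deletes a stale lock older
    # than time_out_secs, otherwise the task exits early with status -2.
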

class Pipeline(abc.ABC):
    """
    Pipeline class: collection of related and potentially interdependent tasks
    """
    tasks = OrderedDict()
    one = None

    def __init__(self, session_path=None, one=None, eid=None):
        assert session_path or eid
        self.one = one
        if one and one.alyx.cache_mode and one.alyx.default_expiry.seconds > 1:
            _logger.warning('Alyx client REST cache active; this may cause issues with jobs')
        self.eid = eid
        if self.one and not self.one.offline:
            self.data_repo = get_local_data_repository(self.one.alyx)
        else:
            self.data_repo = None

        if session_path:
            self.session_path = session_path
            if not self.eid:
                # the eID for newer sessions may not be in the cache so use a remote query
                self.eid = one.path2eid(session_path, query_type='remote') if self.one else None
        self.label = self.__module__ + '.' + type(self).__name__

    @staticmethod
    def _get_exec_name(obj):
        """
        For a class, get the executable name as it should be stored in Alyx. When the class
        is created dynamically using the type() built-in function, we need to revert to the base
        class to be able to re-instantiate the class from the alyx dictionary on the client side
        :param obj:
        :return: string containing the full module plus class name
        """
        if obj.__module__ == 'abc':
            exec_name = f'{obj.__class__.__base__.__module__}.{obj.__class__.__base__.__name__}'
        else:
            exec_name = f'{obj.__module__}.{obj.name}'
        return exec_name

    def make_graph(self, out_dir=None, show=True):
        if not out_dir:
            out_dir = self.one.alyx.cache_dir if self.one else one.params.get().CACHE_DIR
        m = Digraph('G', filename=str(Path(out_dir).joinpath(self.__module__ + '_graphs.gv')))
        m.attr(rankdir='TD')

        e = Digraph(name='cluster_' + self.label)
        e.attr('node', shape='box')
        e.node('root', label=self.label)

        e.attr('node', shape='ellipse')
        for k in self.tasks:
            j = self.tasks[k]
            if len(j.parents) == 0:
                e.edge('root', j.name)
            else:
                [e.edge(p.name, j.name) for p in j.parents]

        m.subgraph(e)
        m.attr(label=r'\n\Pre-processing\n')
        m.attr(fontsize='20')
        if show:
            m.view()
        return m

    def create_alyx_tasks(self, rerun__status__in=None, tasks_list=None):
        """
        Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.
        If the jobs already exist, they are left untouched. The re-run parameter will re-init the
        job by emptying the log and setting the status to Waiting.

        Parameters
        ----------
        rerun__status__in : list, str
            To re-run tasks if they already exist, specify one or more status strings to be
            re-run, or '__all__' to re-run all tasks.
        tasks_list : list
            The list of tasks to create on Alyx. If None, uses self.tasks.

        Returns
        -------
        list
            List of Alyx task dictionaries (existing and/or created).
        """
        rerun__status__in = ([rerun__status__in]
                             if isinstance(rerun__status__in, str)
                             else rerun__status__in or [])
        if '__all__' in rerun__status__in:
            rerun__status__in = [x for x in TASK_STATUS_SET if x != 'Abandoned']
        assert self.eid
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        tasks_alyx_pre = self.one.alyx.rest('tasks', 'list', session=self.eid, graph=self.name, no_cache=True)
        tasks_alyx = []
        # creates all the tasks by iterating through the ordered dict

        if tasks_list is not None:
            task_items = tasks_list
            # need to add in the session eid and the parents
        else:
            task_items = self.tasks.values()

        for t in task_items:
            # get the parents' alyx ids to reference in the database
            if isinstance(t, dict):
                t = Bunch(t)
                executable = t.executable
                arguments = t.arguments
                t['time_out_secs'] = t['time_out_sec']
                if len(t.parents) > 0:
                    pnames = t.parents
            else:
                executable = self._get_exec_name(t)
                arguments = t.kwargs
                if len(t.parents):
                    pnames = [p.name for p in t.parents]

            if len(t.parents):
                parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames]
            else:
                parents_ids = []

            task_dict = {'executable': executable, 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parents_ids,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': arguments}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})
            # if the task already exists, patch it, otherwise create it
            talyx = next(filter(lambda x: x['name'] == t.name, tasks_alyx_pre), [])
            if len(talyx) == 0:
                talyx = self.one.alyx.rest('tasks', 'create', data=task_dict)
            elif talyx['status'] in rerun__status__in:
                talyx = self.one.alyx.rest('tasks', 'partial_update', id=talyx['id'], data=task_dict)
            tasks_alyx.append(talyx)
        return tasks_alyx

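    # Illustrative calls (assumes `pipe` is a concrete Pipeline instantiated with a session and ONE):
    #
    #     pipe.create_alyx_tasks()                             # create missing tasks, leave existing ones untouched
    #     pipe.create_alyx_tasks(rerun__status__in='Errored')  # also reset existing Errored tasks to Waiting
    #     pipe.create_alyx_tasks(rerun__status__in='__all__')  # reset every status except Abandoned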

    def create_tasks_list_from_pipeline(self):
        """
        From a pipeline with tasks, create a list of dictionaries containing the task descriptions
        that can be used to create the alyx tasks
        :return:
        """
        tasks_list = []
        for k, t in self.tasks.items():
            # get the parents' alyx ids to reference in the database
            if len(t.parents):
                parent_names = [p.name for p in t.parents]
            else:
                parent_names = []

            task_dict = {'executable': self._get_exec_name(t), 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parent_names,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': t.kwargs}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})

            tasks_list.append(task_dict)

        return tasks_list

    def run(self, status__in=('Waiting',), machine=None, clobber=True, **kwargs):
        """
        Get all the session related jobs from alyx and run them
        :param status__in: list of status strings to run, e.g.
         ['Waiting', 'Started', 'Errored', 'Empty', 'Complete']
        :param machine: string identifying the machine the task is run on, optional
        :param clobber: bool, if True any existing logs are overwritten, default is True
        :param kwargs: arguments passed downstream to run_alyx_task
        :return: task_deck: list of REST dictionaries of the task endpoints
        :return: all_datasets: list of REST dictionaries of the dataset endpoints
        """
        assert self.session_path, 'Pipeline object has to be declared with a session path to run'
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        task_deck = self.one.alyx.rest('tasks', 'list', session=self.eid, no_cache=True)
        # [(t['name'], t['level']) for t in task_deck]
        all_datasets = []
        for i, j in enumerate(task_deck):
            if j['status'] not in status__in:
                continue
            # here we update the status in-place to avoid another hit to the database
            task_deck[i], dsets = run_alyx_task(tdict=j, session_path=self.session_path,
                                                one=self.one, job_deck=task_deck,
                                                machine=machine, clobber=clobber, **kwargs)
            if dsets is not None:
                all_datasets.extend(dsets)
        return task_deck, all_datasets

    def rerun_failed(self, **kwargs):
        return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs)

    def rerun(self, **kwargs):
        return self.run(status__in=[x for x in TASK_STATUS_SET if x != 'Abandoned'], **kwargs)

    @property
    def name(self):
        return self.__class__.__name__

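# A sketch of how a concrete pipeline wires its tasks together (hedged: the class and task names
# below are placeholders for illustration, not real ibllib pipelines):
#
#     class ExamplePipeline(Pipeline):
#         def __init__(self, session_path, one=None, **kwargs):
#             super().__init__(session_path=session_path, one=one, **kwargs)
#             tasks = OrderedDict()
#             tasks['ExampleRawQC'] = ExampleRawQC(session_path, one=one)
#             tasks['ExampleTrials'] = ExampleTrials(session_path, one=one,
#                                                    parents=[tasks['ExampleRawQC']])  # level 1
#             self.tasks = tasks
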

def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
                  max_md5_size=None, machine=None, clobber=True, location='server', mode='log'):
    """
    Runs a single Alyx job and registers output datasets
    :param tdict:
    :param session_path:
    :param one:
    :param job_deck: optional list of job dictionaries belonging to the session. Needed
     to check the dependency status if tdict has a parent field. If tdict has parents and
     job_deck is not provided, the database will be queried
    :param max_md5_size: in bytes, if specified, the md5 checksum is not computed above this
     file size, to save time
    :param machine: string identifying the machine the task is run on, optional
    :param clobber: bool, if True any existing logs are overwritten, default is True
    :param location: where the task is run: 'server' - local lab server, 'remote' - any
     compute node/computer, 'SDSC' - flatiron compute node, 'AWS' - using data from AWS S3
    :param mode: str ('log' or 'raise') behaviour to adopt if an error occurred. If 'raise', the
     error is raised at the very end of this function (i.e. after having labelled the tasks)
    :return:
    """
    registered_dsets = []
    # here we need to check the parents' status, get the job_deck if not available
    if not job_deck:
        job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True)
    if len(tdict['parents']):
        # check the dependencies
        parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck)
        parent_statuses = [j['status'] for j in parent_tasks]
        # if any of the parent tasks is not complete, throw a warning
        if not set(parent_statuses) <= {'Complete', 'Incomplete'}:
            _logger.warning(f"{tdict['name']} has unmet dependencies")
            # if parents are waiting or failed, set the current task status to Held
            # once the parents have run, the dependent tasks will be set from Held to Waiting (see below)
            if set(parent_statuses).intersection({'Errored', 'Held', 'Empty', 'Waiting', 'Started', 'Abandoned'}):
                tdict = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Held'})
                return tdict, registered_dsets
    # creates the job from the module name in the database
    exec_name = tdict['executable']
    strmodule, strclass = exec_name.rsplit('.', 1)
    classe = getattr(importlib.import_module(strmodule), strclass)
    tkwargs = tdict.get('arguments') or {}  # if the db field is null it returns None
    task = classe(session_path, one=one, taskid=tdict['id'], machine=machine, clobber=clobber,
                  location=location, **tkwargs)
    # sets the status flag to started before running
    one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Started'})
    status = task.run()
    patch_data = {'time_elapsed_secs': task.time_elapsed_secs, 'log': task.log,
                  'version': task.version}
    # if there is no data to register, set status to Empty
    if task.outputs is None:
        patch_data['status'] = 'Empty'
    # otherwise register data and set (provisional) status to Complete
    else:
        try:
            kwargs = dict(one=one, max_md5_size=max_md5_size)
            if location == 'server':
                # Explicitly pass lab as lab cannot be inferred from path (which the registration client tries to do).
                # To avoid making extra REST requests we can also set labs=None if using ONE v1.20.1.
                kwargs['labs'] = get_lab(session_path, one.alyx)
            registered_dsets = task.register_datasets(**kwargs)
            patch_data['status'] = 'Complete'
        except Exception:
            _logger.error(traceback.format_exc())
            status = -1

    # overwrite status to errored
    if status == -1:
        patch_data['status'] = 'Errored'
    # Status -2 means a lock was encountered during run, should be rerun
    elif status == -2:
        patch_data['status'] = 'Waiting'
    # Status -3 should be returned if a task is Incomplete
    elif status == -3:
        patch_data['status'] = 'Incomplete'
    # update task status on Alyx
    t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data)
    # check for dependent held tasks
    # NB: Assumes dependent tasks are all part of the same session!
    next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status']  # Update status in job deck
    dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck)
    for d in dependent_tasks:
        assert d['id'] != t['id'], 'task is its own parent'
        # if all their parent tasks are now complete, set to waiting
        parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']]
        if set(parent_status) <= {'Complete', 'Incomplete'}:
            one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'})
    task.cleanUp()
    if mode == 'raise' and status != 0:
        raise ValueError(task.log)
    return t, registered_dsets
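
# Illustrative end-to-end flow on a lab server, mirroring Pipeline.run() above (hedged: `eid`,
# `session_path` and the machine name are placeholders; `one` must be an authenticated ONE instance):
#
#     task_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
#     for i, tdict in enumerate(task_deck):
#         if tdict['status'] != 'Waiting':
#             continue
#         task_deck[i], dsets = run_alyx_task(tdict=tdict, session_path=session_path, one=one,
#                                             job_deck=task_deck, machine='example-node')
#
# Each task is instantiated from tdict['executable'], run, then its status, log and datasets are
# patched back to Alyx; Held tasks whose parents completed are released to Waiting.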