# ibllib/pipes/tasks.py

1"""The abstract Pipeline and Task superclasses and concrete task runner. 

2 

3Examples 

4-------- 

5 

61. Running a task on your local computer. 

7| Download: via ONE. 

8| Upload: N/A. 

9 

10>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod') 

11>>> task.run() 

12 

132. Running a task on the local server that belongs to a given subject (e.g SWC054 on floferlab). 

14| Download: all data expected to be present. 

15| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer 

16 jobs on Alyx transfer the data. 

17 

18>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod 

19>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001' 

20>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod') 

21>>> task.run() 

22>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx)) 

23 

243. Running a task on the local server that belongs to that subject and forcing redownload of 

25missing data. 

26| Download: via Globus (TODO we should change this to use boto3 as globus is slow). 

27| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer 

28 jobs on Alyx transfer the data. 

29 

30>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod') 

31>>> task.force = True 

32>>> task.run() 

33>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx)) 

34>>> task.cleanUp() # Delete the files that have been downloaded 

35 

364. Running a task on the local server that doesn't belongs to that subject 

37(e.g SWC054 on angelakilab). 

38| Download: via boto3, the AWS file records must exist and be set to exists = True. 

39| Upload: via globus, automatically uploads the datasets directly to FlatIron via globus. 

40 Creates FlatIron filerecords and sets these to True once the globus task has completed. 

41 

42>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod') 

43>>> task.run() 

44>>> task.register_datasets() 

45>>> task.cleanUp() # Delete the files that have been downloaded 

46 

475. Running a task on SDSC. 

48| Download: via creating symlink to relevant datasets on SDSC. 

49| Upload: via copying files to relevant location on SDSC. 

50 

51>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod') 

52>>> task.run() 

53>>> response = task.register_datasets() 

54>>> # Here we just make sure filerecords are all correct 

55>>> for resp in response: 

56... fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None) 

57... if fi is not None: 

58... if not fi['exists']: 

59... one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True}) 

60... 

61... aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None) 

62... if aws is not None: 

63... one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False}) 

64... 

65... sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None) 

66... if sr is not None: 

67... one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False}) 

68... # Finally remove symlinks once the task has completed 

69... task.cleanUp() 

70 

71""" 

from pathlib import Path
import abc
import logging
import io
import importlib
import time
from collections import OrderedDict
import traceback
import json

from graphviz import Digraph

import ibllib
from ibllib.oneibl import data_handlers
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.oneibl.registration import get_lab
from iblutil.util import Bunch
import one.params
from one.api import ONE
from one.util import ensure_list
from one import webclient

_logger = logging.getLogger(__name__)
TASK_STATUS_SET = {'Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete', 'Abandoned'}


class Task(abc.ABC):
    log = ''  # placeholder to keep the log of the task for registration
    cpu = 1  # CPU resource
    gpu = 0  # GPU resources: as of now, either 0 or 1
    io_charge = 5  # integer percentage
    priority = 30  # integer percentage, 100 means highest priority
    ram = 4  # RAM needed to run (GB)
    one = None  # one instance (optional)
    level = 0  # level in the pipeline hierarchy: level 0 means there is no parent task
    outputs = None  # placeholder for a list of Path containing output files
    time_elapsed_secs = None
    time_out_secs = 3600 * 2  # time-out after which a task is considered dead
    version = ibllib.__version__
    signature = {'input_files': [], 'output_files': []}  # list of tuples (filename, collection, required_flag[, register])
    force = False  # whether to re-download missing input files on local server if not present
    job_size = 'small'  # either 'small' or 'large', defines whether task should be run as part of the large or small job services
    env = None  # the environment name within which to run the task (NB: the env is not activated automatically!)

    def __init__(self, session_path, parents=None, taskid=None, one=None,
                 machine=None, clobber=True, location='server', **kwargs):
        """
        Base task class
        :param session_path: session path
        :param parents: parents
        :param taskid: alyx task id
        :param one: one instance
        :param machine:
        :param clobber: whether or not to overwrite the log on rerun
        :param location: location where the task is run. Options are 'server' (lab local servers), 'remote' (remote compute node,
         data required for task downloaded via one), 'AWS' (remote compute node, data required for task downloaded via AWS),
         or 'SDSC' (SDSC FlatIron compute node)  # TODO 'Globus' (remote compute node, data required for task downloaded via Globus)
        :param kwargs: running arguments
        """
        self.taskid = taskid
        self.one = one
        self.session_path = session_path
        self.register_kwargs = {}
        if parents:
            self.parents = parents
            self.level = max(p.level for p in self.parents) + 1
        else:
            self.parents = []
        self.machine = machine
        self.clobber = clobber
        self.location = location
        self.plot_tasks = []  # Plotting tasks to create plot outputs during the task
        self.kwargs = kwargs

    @property
    def name(self):
        return self.__class__.__name__

    def path2eid(self):
        """
        Fetch the experiment UUID from the Task session path, without using the REST cache.

        This method ensures that the eid will be returned for newly created sessions.

        Returns
        -------
        str
            The experiment UUID corresponding to the session path.
        """
        assert self.session_path and self.one and not self.one.offline
        with webclient.no_cache(self.one.alyx):
            return self.one.path2eid(self.session_path, query_type='remote')

    def run(self, **kwargs):
        """
        --- do not overload, see _run() below ---
        Wraps the _run() method with:
        - error management
        - logging to variable
        - writing a lock file if the GPU is used
        - labelling the status property of the object. The status value is labelled as:
            0: Complete
           -1: Errored
           -2: Didn't run as a lock was encountered
           -3: Incomplete

        Notes
        -----
        - The `run_alyx_task` will update the Alyx Task status depending on both status and outputs
          (i.e. the output of the subclassed `_run` method):
          Assuming a return value of 0... if Task.outputs is None, the status will be Empty;
          if Task.outputs is a list (empty or otherwise), the status will be Complete.
        """
        # if the task id or one properties are not available, local run only without Alyx
        use_alyx = self.one is not None and self.taskid is not None
        if use_alyx:
            # check that the Alyx user is logged in
            if not self.one.alyx.is_logged_in:
                self.one.alyx.authenticate()
            tdict = self.one.alyx.rest('tasks', 'partial_update', id=self.taskid,
                                       data={'status': 'Started'})
            self.log = ('' if not tdict['log'] else tdict['log'] +
                        '\n\n=============================RERUN=============================\n')

        # Setup the console handler with a StringIO object
        logger_level = _logger.level
        log_capture_string = io.StringIO()
        ch = logging.StreamHandler(log_capture_string)
        str_format = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s'
        ch.setFormatter(logging.Formatter(str_format))
        _logger.parent.addHandler(ch)
        _logger.parent.setLevel(logging.INFO)
        _logger.info(f'Starting job {self.__class__}')
        if self.machine:
            _logger.info(f'Running on machine: {self.machine}')
        _logger.info(f'running ibllib version {ibllib.__version__}')
        # setup
        start_time = time.time()
        try:
            setup = self.setUp(**kwargs)
            self.outputs = self._input_files_to_register()
            _logger.info(f'Setup value is: {setup}')
            self.status = 0
            if not setup:
                # case where outputs are present but we don't have the input files locally to rerun
                # the task: label the task as complete
                _, outputs = self.assert_expected_outputs()
            else:
                # run the task
                if self.gpu >= 1:
                    if not self._creates_lock():
                        self.status = -2
                        _logger.info(f'Job {self.__class__} exited as a lock was found')
                        new_log = log_capture_string.getvalue()
                        self.log = new_log if self.clobber else self.log + new_log
                        _logger.removeHandler(ch)
                        ch.close()
                        return self.status
                outputs = self._run(**kwargs)
                _logger.info(f'Job {self.__class__} complete')
                if outputs is None:
                    # If the run method returns None and no raw input files were registered, self.outputs
                    # should be None, meaning the task will have an 'Empty' status. If the run method returns
                    # a list, the status will be 'Complete' regardless of whether there are output files.
                    self.outputs = outputs if not self.outputs else self.outputs  # ensure None if no inputs registered
                else:
                    self.outputs.extend(ensure_list(outputs))  # Add output files to the list of inputs to register
        except Exception:
            _logger.error(traceback.format_exc())
            _logger.info(f'Job {self.__class__} errored')
            self.status = -1

        self.time_elapsed_secs = time.time() - start_time
        # log the outputs
        if isinstance(self.outputs, list):
            nout = len(self.outputs)
        elif self.outputs is None:
            nout = 0
        else:
            nout = 1

        _logger.info(f'N outputs: {nout}')
        _logger.info(f'--- {self.time_elapsed_secs} seconds run-time ---')
        # after the run, capture the log output, amend to any existing logs if not overwrite
        new_log = log_capture_string.getvalue()
        self.log = new_log if self.clobber else self.log + new_log
        _logger.removeHandler(ch)
        ch.close()
        _logger.setLevel(logger_level)
        # tear down
        self.tearDown()
        return self.status
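
    # Illustrative sketch (not from the source) of how `run` status codes map
    # onto Alyx task statuses via `run_alyx_task` below; `ExampleTask` is a
    # hypothetical Task subclass:
    # >>> task = ExampleTask(session_path, one=one)
    # >>> status = task.run()  # 0 -> 'Empty' if task.outputs is None, else 'Complete'
    # >>> # -1 -> 'Errored', -2 -> 'Waiting' (lock encountered), -3 -> 'Incomplete'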

    def register_datasets(self, **kwargs):
        """
        Register output datasets from the task to Alyx.

        Parameters
        ----------
        kwargs
            Directly passed to the `DataHandler.uploadData` method.

        Returns
        -------
        list
            The output of the `DataHandler.uploadData` method, e.g. a list of registered datasets.
        """
        _ = self.register_images()
        return self.data_handler.uploadData(self.outputs, self.version, **kwargs)

    def _input_files_to_register(self, assert_all_exist=False):
        """
        Return input datasets to be registered to Alyx.

        These datasets are typically raw data files and are registered even if the task fails to complete.

        Parameters
        ----------
        assert_all_exist
            Raise an AssertionError if not all required input datasets exist on disk.

        Returns
        -------
        list of pathlib.Path
            A list of input files to register.
        """
        try:
            input_files = self.input_files
        except AttributeError:
            raise RuntimeError('Task.setUp must be run before calling this method.')
        to_register, missing = [], []
        for filename, collection, required, _ in filter(lambda f: len(f) > 3 and f[3], input_files):
            filepath = self.session_path.joinpath(collection, filename)
            if filepath.exists():
                to_register.append(filepath)
            elif required:
                missing.append(filepath)
        if any(missing):
            missing_str = ', '.join(map(lambda x: x.relative_to(self.session_path).as_posix(), missing))
            if assert_all_exist:
                raise AssertionError(f'Missing required input files: {missing_str}')
            else:
                _logger.error(f'Missing required input files: {missing_str}')
        return list(set(to_register) - set(missing))

    def register_images(self, **kwargs):
        """
        Registers images to the Alyx database
        :return:
        """
        if self.one and len(self.plot_tasks) > 0:
            for plot_task in self.plot_tasks:
                try:
                    _ = plot_task.register_images(widths=['orig'])
                except Exception:
                    _logger.error(traceback.format_exc())
                    continue

    def rerun(self):
        self.run(overwrite=True)

    def get_signatures(self, **kwargs):
        """
        This is the default but should be overwritten for each task
        :return:
        """
        self.input_files = self.signature['input_files']
        self.output_files = self.signature['output_files']
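
    # Illustrative signature sketch (hypothetical dataset names): each entry is
    # (filename, collection, required_flag), with an optional fourth element
    # marking raw inputs that `_input_files_to_register` registers even if the
    # task fails, e.g.
    # signature = {
    #     'input_files': [('_iblrig_taskData.raw.jsonable', 'raw_behavior_data', True, True)],
    #     'output_files': [('trials.table.pqt', 'alf', True)]
    # }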

    @abc.abstractmethod
    def _run(self, overwrite=False):
        """
        This is the method to implement.
        :param overwrite: (bool) whether to recompute the outputs if they already exist
        :return: out_files: files to be registered. Could be a list of files (pathlib.Path),
         a single file (pathlib.Path), an empty list [] or None.
        Within the pipeline, there is a distinction between a job that returns an empty list
        and a job that returns None. If the function returns None, the job will be labelled
        with an "Empty" status in the database; otherwise, the job is expected not to return
        any dataset.
        """

    def setUp(self, **kwargs):
        """Get the data handler and ensure all data is available locally to run the task."""
        if self.location == 'server':
            self.get_signatures(**kwargs)

            input_status, _ = self.assert_expected_inputs(raise_error=False)
            output_status, _ = self.assert_expected(self.output_files, silent=True)

            if input_status:
                self.data_handler = self.get_data_handler()
                _logger.info('All input files found: running task')
                return True

            if not self.force:
                self.data_handler = self.get_data_handler()
                _logger.warning('Not all input files found locally: will still attempt to rerun task')
                # TODO in the future, once we are sure that input/output task signatures work properly, should return False
                # _logger.info('All output files found but required input files not available locally: task not rerun')
                return True
            else:
                # Attempt to download missing data using Globus
                _logger.info('Not all input files found locally: attempting to re-download required files')
                self.data_handler = self.get_data_handler(location='serverglobus')
                self.data_handler.setUp()
                # Double check we now have the required files to run the task
                # TODO in the future, should raise an error if the correct files are still missing after downloading
                self.assert_expected_inputs(raise_error=False)
                return True
        else:
            self.data_handler = self.get_data_handler()
            self.data_handler.setUp()
            self.get_signatures(**kwargs)
            self.assert_expected_inputs()
            return True

    def tearDown(self):
        """
        Function run after _run().
        Does not run if a lock is encountered by the task (status -2).
        """
        if self.gpu >= 1:
            if self._lock_file_path().exists():
                self._lock_file_path().unlink()

    def cleanUp(self):
        """
        Function to optionally overload to clean up
        :return:
        """
        self.data_handler.cleanUp()

    def assert_expected_outputs(self, raise_error=True):
        """
        After a run, asserts that all signature files are present at least once in the output files.
        Mainly useful for integration tests.
        :return:
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        if not everything_is_fine:
            for out in self.outputs:
                _logger.error(f'{out}')
            if raise_error:
                raise FileNotFoundError("Missing outputs after task completion")

        return everything_is_fine, files

    def assert_expected_inputs(self, raise_error=True):
        """
        Before running a task, check that all the files necessary to run it have been downloaded
        or are already present on the local file system.
        :return:
        """
        _logger.info('Checking input files')
        everything_is_fine, files = self.assert_expected(self.input_files)

        if not everything_is_fine and raise_error:
            raise FileNotFoundError('Missing inputs to run task')

        return everything_is_fine, files

    def assert_expected(self, expected_files, silent=False):
        everything_is_fine = True
        files = []
        for expected_file in expected_files:
            actual_files = list(Path(self.session_path).rglob(str(Path(*filter(None, reversed(expected_file[:2]))))))
            # Account for revisions
            if len(actual_files) == 0:
                collection = '/'.join(filter(None, (expected_file[1], '#*')))  # append pound with wildcard
                expected_revision = (expected_file[0], collection, expected_file[2])
                actual_files = list(
                    Path(self.session_path).rglob(str(Path(*filter(None, reversed(expected_revision[:2])))))
                )
            if len(actual_files) == 0 and expected_file[2]:
                everything_is_fine = False
                if not silent:
                    _logger.error(f'Signature file expected {expected_file} not found')
            else:
                if len(actual_files) != 0:
                    files.append(actual_files[0])

        return everything_is_fine, files

    def get_data_handler(self, location=None):
        """
        Gets the relevant data handler based on the location argument
        :return:
        """
        location = str.lower(location or self.location)
        if location == 'local':
            return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one)
        self.one = self.one or ONE()
        if location == 'server':
            dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'serverglobus':
            dhandler = data_handlers.ServerGlobusDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'remote':
            dhandler = data_handlers.RemoteHttpDataHandler(self.session_path, self.signature, one=self.one)
        elif location == 'aws':
            dhandler = data_handlers.RemoteAwsDataHandler(self, self.session_path, self.signature, one=self.one)
        elif location == 'sdsc':
            dhandler = data_handlers.SDSCDataHandler(self, self.session_path, self.signature, one=self.one)
        elif location == 'popeye':
            dhandler = data_handlers.PopeyeDataHandler(self, self.session_path, self.signature, one=self.one)
        else:
            raise ValueError(f'Unknown location "{location}"')
        return dhandler

    @staticmethod
    def make_lock_file(taskname='', time_out_secs=7200):
        """Creates a GPU lock file with the given time-out in seconds."""
        d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs}
        with open(Task._lock_file_path(), 'w+') as fid:
            json.dump(d, fid)
        return d

    @staticmethod
    def _lock_file_path():
        """The lock file is in ~/.one/gpu.lock."""
        folder = Path.home().joinpath('.one')
        folder.mkdir(exist_ok=True)
        return folder.joinpath('gpu.lock')

    def _make_lock_file(self):
        """Creates a lock file with the current time."""
        return Task.make_lock_file(self.name, self.time_out_secs)

    def is_locked(self):
        """Checks if there is a lock file for this given task."""
        lock_file = self._lock_file_path()
        if not lock_file.exists():
            return False

        with open(lock_file) as fid:
            d = json.load(fid)
        now = time.time()
        if (now - d['start']) > d['time_out_secs']:
            lock_file.unlink()
            return False
        else:
            return True
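
    # The lock file written by `make_lock_file` is plain JSON; illustrative
    # contents of ~/.one/gpu.lock (values are made up):
    # {"start": 1720443000.0, "name": "ExampleTask", "time_out_secs": 7200}
    # `is_locked` deletes the lock and returns False once time.time() - start
    # exceeds time_out_secs, so a crashed GPU task cannot hold the lock forever.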

    def _creates_lock(self):
        if self.location == "popeye":
            return True
        if self.is_locked():
            return False
        else:
            self._make_lock_file()
            return True

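
# A minimal sketch of a concrete Task subclass (illustrative only; the class is
# hypothetical and not part of this module). `_run` returns the files to
# register; returning None instead would give the task an 'Empty' status.
class ExampleRegisterRaw(Task):
    priority = 100
    job_size = 'small'
    signature = {
        'input_files': [('_iblrig_taskSettings.raw.json', 'raw_behavior_data', True)],
        'output_files': [('_iblrig_taskSettings.raw.json', 'raw_behavior_data', True)]
    }

    def _run(self, overwrite=False):
        # the raw file already exists on disk (session_path assumed a pathlib.Path);
        # simply return it so that `run` adds it to self.outputs for registration
        return [self.session_path.joinpath('raw_behavior_data', '_iblrig_taskSettings.raw.json')]
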

class Pipeline(abc.ABC):
    """
    Pipeline class: a collection of related and potentially interdependent tasks
    """
    tasks = OrderedDict()
    one = None

    def __init__(self, session_path=None, one=None, eid=None):
        assert session_path or eid
        self.one = one
        if one and one.alyx.cache_mode and one.alyx.default_expiry.seconds > 1:
            _logger.warning('Alyx client REST cache active; this may cause issues with jobs')
        self.eid = eid
        if self.one and not self.one.offline:
            self.data_repo = get_local_data_repository(self.one.alyx)
        else:
            self.data_repo = None

        if session_path:
            self.session_path = session_path
            if not self.eid:
                # the eID for newer sessions may not be in the cache so use a remote query
                self.eid = one.path2eid(session_path, query_type='remote') if self.one else None
        self.label = self.__module__ + '.' + type(self).__name__

    @staticmethod
    def _get_exec_name(obj):
        """
        For a class, get the executable name as it should be stored in Alyx. When the class
        is created dynamically using the type() built-in function, we need to revert to the base
        class to be able to re-instantiate the class from the Alyx dictionary on the client side.
        :param obj:
        :return: string containing the full module plus class name
        """
        if obj.__module__ == 'abc':
            exec_name = f'{obj.__class__.__base__.__module__}.{obj.__class__.__base__.__name__}'
        else:
            exec_name = f'{obj.__module__}.{obj.name}'
        return exec_name

    def make_graph(self, out_dir=None, show=True):
        if not out_dir:
            out_dir = self.one.alyx.cache_dir if self.one else one.params.get().CACHE_DIR
        m = Digraph('G', filename=str(Path(out_dir).joinpath(self.__module__ + '_graphs.gv')))
        m.attr(rankdir='TD')

        e = Digraph(name='cluster_' + self.label)
        e.attr('node', shape='box')
        e.node('root', label=self.label)

        e.attr('node', shape='ellipse')
        for k in self.tasks:
            j = self.tasks[k]
            if len(j.parents) == 0:
                e.edge('root', j.name)
            else:
                [e.edge(p.name, j.name) for p in j.parents]

        m.subgraph(e)
        m.attr(label=r'\n\Pre-processing\n')
        m.attr(fontsize='20')
        if show:
            m.view()
        return m

    def create_alyx_tasks(self, rerun__status__in=None, tasks_list=None):
        """
        Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.

        If the jobs already exist, they are left untouched. The re-run parameter will re-init the
        job by emptying the log and setting the status to Waiting.

        Parameters
        ----------
        rerun__status__in : list, str
            To re-run tasks if they already exist, specify one or more status strings that will be
            re-run, or '__all__' to re-run all tasks.
        tasks_list : list
            The list of tasks to create on Alyx. If None, uses self.tasks.

        Returns
        -------
        list
            List of Alyx task dictionaries (existing and/or created).
        """
        rerun__status__in = ([rerun__status__in]
                             if isinstance(rerun__status__in, str)
                             else rerun__status__in or [])
        if '__all__' in rerun__status__in:
            rerun__status__in = [x for x in TASK_STATUS_SET if x != 'Abandoned']
        assert self.eid
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        tasks_alyx_pre = self.one.alyx.rest('tasks', 'list', session=self.eid, graph=self.name, no_cache=True)
        tasks_alyx = []
        # create all the tasks by iterating through the ordered dict

        if tasks_list is not None:
            task_items = tasks_list
            # need to add in the session eid and the parents
        else:
            task_items = self.tasks.values()

        for t in task_items:
            # get the parents' Alyx ids to reference in the database
            if isinstance(t, dict):
                t = Bunch(t)
                executable = t.executable
                arguments = t.arguments
                t['time_out_secs'] = t['time_out_sec']
                if len(t.parents) > 0:
                    pnames = t.parents
            else:
                executable = self._get_exec_name(t)
                arguments = t.kwargs
                if len(t.parents):
                    pnames = [p.name for p in t.parents]

            if len(t.parents):
                parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames]
            else:
                parents_ids = []

            task_dict = {'executable': executable, 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parents_ids,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': arguments}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})
            # if the task already exists patch it, otherwise create it
            talyx = next(filter(lambda x: x['name'] == t.name, tasks_alyx_pre), [])
            if len(talyx) == 0:
                talyx = self.one.alyx.rest('tasks', 'create', data=task_dict)
            elif talyx['status'] in rerun__status__in:
                talyx = self.one.alyx.rest('tasks', 'partial_update', id=talyx['id'], data=task_dict)
            tasks_alyx.append(talyx)
        return tasks_alyx

    def create_tasks_list_from_pipeline(self):
        """
        From a pipeline with tasks, creates a list of dictionaries containing the task descriptions,
        which can be uploaded to create Alyx tasks.
        :return:
        """
        tasks_list = []
        for k, t in self.tasks.items():
            # get the parents' names to reference in the database
            if len(t.parents):
                parent_names = [p.name for p in t.parents]
            else:
                parent_names = []

            task_dict = {'executable': self._get_exec_name(t), 'priority': t.priority,
                         'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu,
                         'ram': t.ram, 'module': self.label, 'parents': parent_names,
                         'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid,
                         'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name,
                         'arguments': t.kwargs}
            if self.data_repo:
                task_dict.update({'data_repository': self.data_repo})

            tasks_list.append(task_dict)

        return tasks_list

    def run(self, status__in=('Waiting',), machine=None, clobber=True, **kwargs):
        """
        Get all the session-related jobs from Alyx and run them.
        :param status__in: list of status strings to run, e.g.
         ['Waiting', 'Started', 'Errored', 'Empty', 'Complete']
        :param machine: string identifying the machine the task is run on, optional
        :param clobber: bool, if True any existing logs are overwritten, default is True
        :param kwargs: arguments passed downstream to run_alyx_task
        :return: task_deck: list of REST dictionaries of the task endpoints
        :return: all_datasets: list of REST dictionaries of the dataset endpoints
        """
        assert self.session_path, 'Pipeline object has to be declared with a session path to run'
        if self.one is None:
            _logger.warning('No ONE instance found for Alyx connection, set the one property')
            return
        task_deck = self.one.alyx.rest('tasks', 'list', session=self.eid, no_cache=True)
        # [(t['name'], t['level']) for t in task_deck]
        all_datasets = []
        for i, j in enumerate(task_deck):
            if j['status'] not in status__in:
                continue
            # here we update the status in-place to avoid another hit to the database
            task_deck[i], dsets = run_alyx_task(tdict=j, session_path=self.session_path,
                                                one=self.one, job_deck=task_deck,
                                                machine=machine, clobber=clobber, **kwargs)
            if dsets is not None:
                all_datasets.extend(dsets)
        return task_deck, all_datasets

    def rerun_failed(self, **kwargs):
        return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs)

    def rerun(self, **kwargs):
        return self.run(status__in=[x for x in TASK_STATUS_SET if x != 'Abandoned'], **kwargs)

    @property
    def name(self):
        return self.__class__.__name__

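
# A minimal sketch of a Pipeline subclass (illustrative only; the classes are
# hypothetical). Tasks are declared in an OrderedDict; declaring a task with
# `parents` raises its level, and `run_alyx_task` holds it until the parents
# have completed.
class ExamplePipeline(Pipeline):
    def __init__(self, session_path, one=None, eid=None):
        super().__init__(session_path, one=one, eid=eid)
        tasks = OrderedDict()
        tasks['ExampleRegisterRaw'] = ExampleRegisterRaw(self.session_path, one=one)
        # a downstream task would be declared with
        # parents=[tasks['ExampleRegisterRaw']], giving it level 1
        self.tasks = tasks
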

def str2class(task_executable: str):
    """
    Convert a task name to a class.

    Parameters
    ----------
    task_executable : str
        A Task class name, e.g. 'ibllib.pipes.behavior_tasks.TrialRegisterRaw'.

    Returns
    -------
    class
        The imported class.
    """
    strmodule, strclass = task_executable.rsplit('.', 1)
    return getattr(importlib.import_module(strmodule), strclass)
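
# For example (sketch using the class path from the docstring above; `one` and
# `session_path` are assumed to be defined):
# >>> TrialRegisterRaw = str2class('ibllib.pipes.behavior_tasks.TrialRegisterRaw')
# >>> task = TrialRegisterRaw(session_path, one=one)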

def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
                  max_md5_size=None, machine=None, clobber=True, location='server', mode='log'):
    """
    Runs a single Alyx job and registers output datasets.

    Parameters
    ----------
    tdict : dict
        An Alyx task dictionary to instantiate and run.
    session_path : str, pathlib.Path
        A session path containing the task input data.
    one : one.api.OneAlyx
        An instance of ONE.
    job_deck : list of dict, optional
        A list of all tasks in the same pipeline. If None, queries Alyx to get this.
    max_md5_size : int, optional
        An optional maximum file size in bytes. Files with sizes larger than this will not have
        their MD5 checksum calculated to save time.
    machine : str, optional
        A string identifying the machine the task is run on.
    clobber : bool, default=True
        If True, any existing logs are overwritten on Alyx.
    location : {'remote', 'server', 'sdsc', 'aws'}
        Where you are running the task: 'server' - local lab server, 'remote' - any
        compute node/computer, 'sdsc' - FlatIron compute node, 'aws' - using data from an AWS S3
        node.
    mode : {'log', 'raise'}, default='log'
        Behaviour to adopt if an error occurs. If 'raise', the error will be raised at the very
        end of this function (i.e. after having labelled the tasks).

    Returns
    -------
    dict
        The updated task dict.
    list of pathlib.Path
        A list of registered datasets.
    """
    registered_dsets = []
    # here we need to check the parents' status; get the job_deck if not available
    if not job_deck:
        job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True)
    if len(tdict['parents']):
        # check the dependencies
        parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck)
        parent_statuses = [j['status'] for j in parent_tasks]
        # if any of the parent tasks is not complete, throw a warning
        if not set(parent_statuses) <= {'Complete', 'Incomplete'}:
            _logger.warning(f"{tdict['name']} has unmet dependencies")
            # if parents are waiting or failed, set the current task status to Held
            # once the parents have run, the dependent tasks will be set from Held to Waiting (see below)
            if set(parent_statuses).intersection({'Errored', 'Held', 'Empty', 'Waiting', 'Started', 'Abandoned'}):
                tdict = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Held'})
            return tdict, registered_dsets
    # create the job from the module name in the database
    classe = str2class(tdict['executable'])
    tkwargs = tdict.get('arguments') or {}  # if the db field is null it returns None
    task = classe(session_path, one=one, taskid=tdict['id'], machine=machine, clobber=clobber,
                  location=location, **tkwargs)
    # set the status flag to Started before running
    one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Started'})
    status = task.run()
    patch_data = {'time_elapsed_secs': task.time_elapsed_secs, 'log': task.log,
                  'version': task.version}
    # if there is no data to register, set status to Empty
    if task.outputs is None:  # NB: an empty list is still considered Complete.
        patch_data['status'] = 'Empty'
    # otherwise register data and set (provisional) status to Complete
    else:
        try:
            kwargs = dict(max_md5_size=max_md5_size)
            if location == 'server':
                # Explicitly pass lab as lab cannot be inferred from path (which the registration client tries to do).
                # To avoid making extra REST requests we can also set labs=None if using ONE v1.20.1.
                kwargs['labs'] = get_lab(session_path, one.alyx)
            registered_dsets = task.register_datasets(**kwargs)
            patch_data['status'] = 'Complete'
        except Exception:
            _logger.error(traceback.format_exc())
            status = -1

    # overwrite status to Errored
    if status == -1:
        patch_data['status'] = 'Errored'
    # Status -2 means a lock was encountered during the run; the task should be rerun
    elif status == -2:
        patch_data['status'] = 'Waiting'
    # Status -3 should be returned if a task is Incomplete
    elif status == -3:
        patch_data['status'] = 'Incomplete'
    # update the task status on Alyx
    t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data)
    # check for dependent held tasks
    # NB: Assumes dependent tasks are all part of the same session!
    next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status']  # Update status in job deck
    dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck)
    for d in dependent_tasks:
        assert d['id'] != t['id'], 'task its own parent'
        # if all of their parent tasks are now complete, set them to Waiting
        parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']]
        if set(parent_status) <= {'Complete', 'Incomplete'}:
            one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'})
    task.cleanUp()
    if mode == 'raise' and status != 0:
        raise ValueError(task.log)
    return t, registered_dsets
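
# Typical usage (illustrative sketch; `one`, `eid` and `session_path` are
# assumed to be defined): fetch the session's task deck from Alyx, pick a
# waiting task and run it, mirroring what Pipeline.run does for each task.
# >>> task_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
# >>> tdict = next(t for t in task_deck if t['status'] == 'Waiting')
# >>> tdict, datasets = run_alyx_task(tdict=tdict, session_path=session_path,
# ...                                 one=one, job_deck=task_deck)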