Coverage for ibllib/pipes/tasks.py: 87%

446 statements  

coverage.py v7.7.0, created at 2025-03-17 15:25 +0000

1"""The abstract Pipeline and Task superclasses and concrete task runner. 

2 

3Examples 

4-------- 

5 

61. Running a task on your local computer. 

7| Download: via ONE. 

8| Upload: N/A. 

9 

10>>> task = VideoSyncQcBpod(session_path, one=one, location='remote', sync='bpod') 

11>>> task.run() 

12 

132. Running a task on the local server that belongs to a given subject (e.g SWC054 on floferlab). 

14| Download: all data expected to be present. 

15| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer 

16 jobs on Alyx transfer the data. 

17 

18>>> from ibllib.pipes.video_tasks import VideoSyncQcBpod 

19>>> session_path = '/mnt/ibl/s0/Data/Subjects/SWC054/2023-01-01/001' 

20>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod') 

21>>> task.run() 

22>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx)) 

23 

243. Running a task on the local server that belongs to that subject and forcing redownload of 

25missing data. 

26| Download: via Globus (TODO we should change this to use boto3 as globus is slow). 

27| Upload: normal way of registering datasets, filerecords created and bulk sync, bulk transfer 

28 jobs on Alyx transfer the data. 

29 

30>>> task = VideoSyncQcBpod(session_path, one=one, sync='bpod') 

31>>> task.force = True 

32>>> task.run() 

33>>> task.register_datasets(one=one, labs=get_lab(session_path, alyx=ONE().alyx)) 

34>>> task.cleanUp() # Delete the files that have been downloaded 

35 

364. Running a task on the local server that doesn't belongs to that subject 

37(e.g SWC054 on angelakilab). 

38| Download: via boto3, the AWS file records must exist and be set to exists = True. 

39| Upload: via globus, automatically uploads the datasets directly to FlatIron via globus. 

40 Creates FlatIron filerecords and sets these to True once the globus task has completed. 

41 

42>>> task = VideoSyncQcBpod(session_path, one=one, location='AWS', sync='bpod') 

43>>> task.run() 

44>>> task.register_datasets() 

45>>> task.cleanUp() # Delete the files that have been downloaded 

46 

475. Running a task on SDSC. 

48| Download: via creating symlink to relevant datasets on SDSC. 

49| Upload: via copying files to relevant location on SDSC. 

50 

51>>> task = VideoSyncQcBpod(session_path, one=one, location='SDSC', sync='bpod') 

52>>> task.run() 

53>>> response = task.register_datasets() 

54>>> # Here we just make sure filerecords are all correct 

55>>> for resp in response: 

56... fi = next((fr for fr in resp['file_records'] if 'flatiron' in fr['data_repository']), None) 

57... if fi is not None: 

58... if not fi['exists']: 

59... one.alyx.rest('files', 'partial_update', id=fi['id'], data={'exists': True}) 

60... 

61... aws = next((fr for fr in resp['file_records'] if 'aws' in fr['data_repository']), None) 

62... if aws is not None: 

63... one.alyx.rest('files', 'partial_update', id=aws['id'], data={'exists': False}) 

64... 

65... sr = next((fr for fr in resp['file_records'] if 'SR' in fr['data_repository']), None) 

66... if sr is not None: 

67... one.alyx.rest('files', 'partial_update', id=sr['id'], data={'exists': False}) 

68... # Finally remove symlinks once the task has completed 

69... task.cleanUp() 

70 

71""" 

from pathlib import Path
import abc
import logging
import io
import importlib
import time
from collections import OrderedDict
import traceback
import json
from typing import List, Dict

from graphviz import Digraph

import ibllib
from ibllib.oneibl import data_handlers
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.oneibl.registration import get_lab
from iblutil.util import Bunch, flatten, ensure_list
import one.params
from one.api import ONE
from one import webclient
import one.alf.io as alfio
from one.alf.path import ALFPath

_logger = logging.getLogger(__name__)
TASK_STATUS_SET = {'Waiting', 'Held', 'Started', 'Errored', 'Empty', 'Complete', 'Incomplete', 'Abandoned'}

class Task(abc.ABC):
    log = ''  # placeholder to keep the log of the task for registration
    cpu = 1  # CPU resource
    gpu = 0  # GPU resources: as of now, either 0 or 1
    io_charge = 5  # integer percentage
    priority = 30  # integer percentage, 100 means highest priority
    ram = 4  # RAM needed to run (GB)
    one = None  # one instance (optional)
    level = 0  # level in the pipeline hierarchy: level 0 means there is no parent task
    outputs = None  # placeholder for a list of Path containing output files
    time_elapsed_secs = None
    time_out_secs = 3600 * 2  # time-out after which a task is considered dead
    version = ibllib.__version__
    force = False  # whether to re-download missing input files on local server if not present
    job_size = 'small'  # either 'small' or 'large', defines whether task should be run as part of the large or small job services
    env = None  # the environment name within which to run the task (NB: the env is not activated automatically!)
    on_error = 'continue'  # whether to raise an exception on error ('raise') or report the error and continue ('continue')

    def __init__(self, session_path, parents=None, taskid=None, one=None,
                 machine=None, clobber=True, location='server', scratch_folder=None, on_error='continue', **kwargs):
        """
        Base task class.
        :param session_path: session path
        :param parents: list of parent Task instances
        :param taskid: alyx task id
        :param one: one instance
        :param machine: string identifying the machine the task is run on (optional)
        :param clobber: whether or not to overwrite the log on re-run
        :param location: location where task is run. Options are 'server' (lab local servers), 'remote' (remote compute node,
         data required for task downloaded via one), 'AWS' (remote compute node, data required for task downloaded via AWS),
         or 'SDSC' (SDSC flatiron compute node)
        :param scratch_folder: optional: Path where to write intermediate temporary data
        :param on_error: whether to raise an exception on error ('raise') or report the error and continue ('continue')
        :param kwargs: running arguments
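
        Example of chaining tasks via `parents` (a sketch; `DummyChildTask` is a hypothetical
        subclass and concrete task classes may require additional arguments):

        >>> parent = VideoSyncQcBpod(session_path, one=one, sync='bpod')
        >>> child = DummyChildTask(session_path, one=one, parents=[parent])
        >>> child.level  # one greater than the highest parent level
        1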

133 """ 

134 self.on_error = on_error 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

135 self.taskid = taskid 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

136 self.one = one 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

137 self.session_path = session_path 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

138 self.register_kwargs = {} 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

139 if parents: 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

140 self.parents = parents 2b ~ aba d c ) * + , - . / : ; =

141 self.level = max(p.level for p in self.parents) + 1 2b ~ aba d c ) * + , - . / : ; =

142 else: 

143 self.parents = [] 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

144 self.machine = machine 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

145 self.clobber = clobber 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

146 self.location = location 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

147 self.plot_tasks = [] # Plotting task/ tasks to create plot outputs during the task 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

148 self.scratch_folder = scratch_folder 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

149 self.kwargs = kwargs 2b ~ dbbbebfbgbhbibjbabkb_ ` { lb$ f a ^ d S K T U N l z A m B n o p C q r D c ) * + , - . / : ; = x y h V W i X g E F % Y Z 0 G j 1 | mbnb} O 2 3 ob4 H 5 I 6 7 J 8 ' e s k t u v w P 9 Q ! L M ( pbqbrbsbtbR # ubvb

150 

    @property
    def signature(self) -> Dict[str, List]:
        """
        The signature of the task specifies inputs and outputs for the given task.
        For some tasks it is dynamic and calculated. The legacy code specifies those as tuples.
        The preferred way is to use the ExpectedDataset input and output constructors.

        I = ExpectedDataset.input
        O = ExpectedDataset.output
        signature = {
            'input_files': [
                I(name='extract.me.npy', collection='raw_data', required=True, register=False, unique=False),
            ],
            'output_files': [
                O(name='look.atme.npy', collection='shiny_data', required=True, register=True, unique=False)
            ]}
        is equivalent to:
        signature = {
            'input_files': [('extract.me.npy', 'raw_data', True, True)],
            'output_files': [('look.atme.npy', 'shiny_data', True)],
        }
        :return:
        """
        return {'input_files': [], 'output_files': []}

    @property
    def name(self):
        return self.__class__.__name__

    def path2eid(self):
        """
        Fetch the experiment UUID from the Task session path, without using the REST cache.

        This method ensures that the eid will be returned for newly created sessions.

        Returns
        -------
        str
            The experiment UUID corresponding to the session path.
        """
        assert self.session_path and self.one and not self.one.offline
        with webclient.no_cache(self.one.alyx):
            return self.one.path2eid(self.session_path, query_type='remote')

    def run(self, **kwargs):
        """
        --- do not overload, see _run() below ---
        Wraps the _run() method with:
        - error management
        - logging to variable
        - writing a lock file if the GPU is used
        - setting the status property of the object, with the following values:
            0: Complete
           -1: Errored
           -2: Didn't run as a lock was encountered
           -3: Incomplete

        Notes
        -----
        - The `run_alyx_task` will update the Alyx Task status depending on both status and outputs
          (i.e. the output of the subclassed `_run` method):
          Assuming a return value of 0... if Task.outputs is None, the status will be Empty;
          if Task.outputs is a list (empty or otherwise), the status will be Complete.
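
        A minimal sketch of checking the run status (assuming a configured task instance as in
        the module-level examples):

        >>> status = task.run()
        >>> if status == 0 and task.outputs is not None:
        ...     task.register_datasets()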

214 """ 

215 # if task id of one properties are not available, local run only without alyx 

216 use_alyx = self.one is not None and self.taskid is not None 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

217 if use_alyx: 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

218 # check that alyx user is logged in 

219 if not self.one.alyx.is_logged_in: 1ac

220 self.one.alyx.authenticate() 

221 tdict = self.one.alyx.rest('tasks', 'partial_update', id=self.taskid, 1ac

222 data={'status': 'Started'}) 

223 self.log = ('' if not tdict['log'] else tdict['log'] + 1ac

224 '\n\n=============================RERUN=============================\n') 

225 

226 # Setup the console handler with a StringIO object 

227 logger_level = _logger.level 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

228 log_capture_string = io.StringIO() 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

229 ch = logging.StreamHandler(log_capture_string) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

230 str_format = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s' 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

231 ch.setFormatter(logging.Formatter(str_format)) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

232 _logger.parent.addHandler(ch) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

233 _logger.parent.setLevel(logging.INFO) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

234 _logger.info(f'Starting job {self.__class__}') 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

235 if self.machine: 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

236 _logger.info(f'Running on machine: {self.machine}') 1a

237 _logger.info(f'running ibllib version {ibllib.__version__}') 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

238 # setup 

239 start_time = time.time() 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

240 try: 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

241 setup = self.setUp(**kwargs) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

242 self.outputs = self._input_files_to_register() 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

243 _logger.info(f'Setup value is: {setup}') 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

244 self.status = 0 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

245 if not setup: 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

246 # case where outputs are present but don't have input files locally to rerun task 

247 # label task as complete 

248 _, outputs = self.assert_expected_outputs() 

249 else: 

250 # run task 

251 if self.gpu >= 1: 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

252 if not self._creates_lock(): 1fae

253 self.status = -2 1fa

254 _logger.info(f'Job {self.__class__} exited as a lock was found at {self._lock_file_path()}') 1fa

255 new_log = log_capture_string.getvalue() 1fa

256 self.log = new_log if self.clobber else self.log + new_log 1fa

257 _logger.removeHandler(ch) 1fa

258 ch.close() 1fa

259 if self.on_error == 'raise': 1fa

260 raise FileExistsError(f'Job {self.__class__} exited as a lock was found at {self._lock_file_path()}') 

261 return self.status 1fa

262 outputs = self._run(**kwargs) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

263 _logger.info(f'Job {self.__class__} complete') 1b$fadSTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!(R#

264 if outputs is None: 1b$fadSTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!(R#

265 # If run method returns None and no raw input files were registered, self.outputs 

266 # should be None, meaning task will have an 'Empty' status. If run method returns 

267 # a list, the status will be 'Complete' regardless of whether there are output files. 

268 self.outputs = outputs if not self.outputs else self.outputs # ensure None if no inputs registered 1adNR

269 else: 

270 self.outputs.extend(ensure_list(outputs)) # Add output files to list of inputs to register 1b$faSTUlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!(#

271 except Exception as e: 1aKjLM

272 _logger.error(traceback.format_exc()) 1aKjLM

273 _logger.info(f'Job {self.__class__} errored') 1aKjLM

274 self.status = -1 1aKjLM

275 if self.on_error == 'raise': 1aKjLM

276 raise e 

277 

278 self.time_elapsed_secs = time.time() - start_time 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

279 # log the outputs 

280 if isinstance(self.outputs, list): 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

281 nout = len(self.outputs) 1b$faSKTUlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(#

282 elif self.outputs is None: 1adNR

283 nout = 0 1adNR

284 else: 

285 nout = 1 

286 

287 _logger.info(f'N outputs: {nout}') 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

288 _logger.info(f'--- {self.time_elapsed_secs} seconds run-time ---') 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

289 # after the run, capture the log output, amend to any existing logs if not overwrite 

290 new_log = log_capture_string.getvalue() 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

291 self.log = new_log if self.clobber else self.log + new_log 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

292 _logger.removeHandler(ch) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

293 ch.close() 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

294 _logger.setLevel(logger_level) 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

295 # tear down 

296 self.tearDown() 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

297 return self.status 1b$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

298 

    def register_datasets(self, **kwargs):
        """
        Register output datasets from the task to Alyx.

        Parameters
        ----------
        kwargs
            Directly passed to the `DataHandler.uploadData` method.

        Returns
        -------
        list
            The output of the `DataHandler.uploadData` method, e.g. a list of registered datasets.
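
        Example (a sketch following the module-level examples for a local server run; the
        `max_md5_size` value is an arbitrary illustration):

        >>> task.register_datasets(labs=get_lab(session_path, alyx=one.alyx), max_md5_size=1024 ** 3)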

312 """ 

313 _ = self.register_images() 1acH

314 return self.data_handler.uploadData(self.outputs, self.version, **kwargs) 1acH

315 

    def _input_files_to_register(self, assert_all_exist=False):
        """
        Return input datasets to be registered to Alyx.

        These datasets are typically raw data files and are registered even if the task fails to complete.

        Parameters
        ----------
        assert_all_exist : bool
            Raise AssertionError if not all required input datasets exist on disk.

        Returns
        -------
        list of pathlib.Path
            A list of input files to register.

        # TODO This method currently does not support wildcards
        """
        I = data_handlers.ExpectedDataset.input  # noqa
        try:
            # Ensure all input files are ExpectedDataset instances
            input_files = [I(*i) if isinstance(i, tuple) else i for i in self.input_files or []]
        except AttributeError:
            raise RuntimeError('Task.setUp must be run before calling this method.')
        to_register, missing = [], set()
        for dataset in input_files:
            ok, filepaths, _missing = dataset.find_files(self.session_path, register=True)
            to_register.extend(filepaths)
            if not ok and _missing is not None:
                missing.update(_missing) if isinstance(_missing, set) else missing.add(_missing)
        if any(missing):  # NB: These are either glob patterns that have no matches or match files that should be absent
            missing_str = ', '.join(missing)
            if assert_all_exist:
                raise AssertionError(f'Missing required input files: {missing_str}')
            else:
                _logger.error(f'Missing required input files: {missing_str}')
        return to_register

    def register_images(self, **kwargs):
        """
        Register images to the Alyx database.
        :return:
        """
        if self.one and len(self.plot_tasks) > 0:
            for plot_task in self.plot_tasks:
                try:
                    _ = plot_task.register_images(widths=['orig'])
                except Exception:
                    _logger.error(traceback.format_exc())
                    continue

    def rerun(self):
        self.run(overwrite=True)

    def get_signatures(self, **kwargs):
        """
        Parse the task signature into input and output files. This is the default and should be
        overwritten by tasks with dynamic signatures.
        :return:
        """
        signature = data_handlers._parse_signature(self.signature)
        self.input_files = signature['input_files']
        self.output_files = signature['output_files']

    @abc.abstractmethod
    def _run(self, overwrite=False):
        """Execute main task code.

        This method contains a task's principal data processing code and should be implemented
        by each subclass.

        Parameters
        ----------
        overwrite : bool
            When false (default), the task may re-use any intermediate data from a previous run.
            When true, the task will disregard or delete any intermediate files before running.

        Returns
        -------
        pathlib.Path, list of pathlib.Path, None
            One or more files to be registered, or an empty list if no files are registered.
            Within the pipeline, there is a distinction between a job that returns an empty list
            and a job that returns None. If the function returns None, the job will be labelled
            with an "Empty" status in the database; if it returns an empty list, the job is
            considered to have run as expected without producing any datasets.
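
        A minimal sketch of a subclass implementation (hypothetical names, for illustration only):

        >>> class DummyTask(Task):
        ...     def _run(self, overwrite=False):
        ...         out_file = Path(self.session_path).joinpath('alf', 'dummy.output.npy')
        ...         # return the output datasets; returning None instead would mark the task Empty
        ...         return [out_file] if out_file.exists() else []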

400 """ 

401 

    def setUp(self, **kwargs):
        """Get the data handler and ensure all data is available locally to run task."""
        if self.location == 'server':
            self.get_signatures(**kwargs)

            input_status, _ = self.assert_expected_inputs(raise_error=False)
            output_status, _ = self.assert_expected(self.output_files, silent=True)

            if input_status:
                self.data_handler = self.get_data_handler()
                _logger.info('All input files found: running task')
                return True

            if not self.force:
                self.data_handler = self.get_data_handler()
                _logger.warning('Not all input files found locally: will still attempt to rerun task')
                # TODO in the future once we are sure that input output task signatures work properly should return False
                # _logger.info('All output files found but input files required not available locally: task not rerun')
                return True
            else:
                # Attempts to download missing data using globus
                _logger.info('Not all input files found locally: attempting to re-download required files')
                self.data_handler = self.get_data_handler(location='serverglobus')
                self.data_handler.setUp(task=self)
                # Double check we now have the required files to run the task
                # TODO in future should raise error if even after downloading don't have the correct files
                self.assert_expected_inputs(raise_error=False)
                return True
        else:
            self.data_handler = self.get_data_handler()
            self.data_handler.setUp(task=self)
            self.get_signatures(**kwargs)
            self.assert_expected_inputs(raise_error=False)
            return True

    def tearDown(self):
        """
        Function run after run().
        Does not run if a lock is encountered by the task (status -2).
        """
        if self.gpu >= 1:
            if self._lock_file_path().exists():
                self._lock_file_path().unlink()

    def cleanUp(self):
        """
        Function to optionally overload to clean up after the task.
        :return:
        """
        self.data_handler.cleanUp(task=self)

    def assert_expected_outputs(self, raise_error=True):
        """
        After a run, asserts that all signature files are present at least once in the output files.
        Mainly useful for integration tests.
        :return:
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        if not everything_is_fine:
            for out in self.outputs:
                _logger.error(f'{out}')
            if raise_error:
                raise FileNotFoundError('Missing outputs after task completion')

        return everything_is_fine, files

    def assert_expected_inputs(self, raise_error=True, raise_ambiguous=False):
        """
        Check that all the files necessary to run the task are present on disk.

        Parameters
        ----------
        raise_error : bool
            If true, raise FileNotFoundError if required files are missing.
        raise_ambiguous : bool
            If true, raise NotImplementedError if multiple variants of an input dataset are found.

        Returns
        -------
        bool
            True if all required files were found.
        list of pathlib.Path
            A list of file paths that exist on disk.
        """
        _logger.info('Checking input files')
        everything_is_fine, files = self.assert_expected(self.input_files)

        if not everything_is_fine and raise_error:
            raise FileNotFoundError('Missing inputs to run task')

        # Check for duplicate datasets that may complicate extraction.
        # Some sessions may contain revisions and without ONE it's difficult to determine which
        # are the default datasets. Likewise SDSC may contain multiple datasets with different
        # UUIDs in the name after patching data.
        valid_alf_files = filter(ALFPath.is_valid_alf, files)
        variant_datasets = alfio.find_variants(valid_alf_files, extra=False)
        if len(variant_datasets) < len(files):
            _logger.warning('Some files are not ALF datasets and will not be checked for ambiguity')
        if any(map(len, variant_datasets.values())):
            # Keep those with variants and make paths relative to session for logging purposes
            to_frag = lambda x: x.relative_to_session().as_posix()  # noqa
            ambiguous = {
                to_frag(k): [to_frag(x) for x in v]
                for k, v in variant_datasets.items() if any(v)}
            _logger.error('Ambiguous input datasets found: %s', ambiguous)

            if raise_ambiguous or self.location == 'sdsc':  # take no chances on SDSC
                # This could be mitigated by loading the data with OneSDSC
                raise NotImplementedError(
                    'Multiple variant datasets found. Loading for these is undefined.')

        return everything_is_fine, files

    def assert_expected(self, expected_files, silent=False):
        """
        Assert that expected files are present.

        Parameters
        ----------
        expected_files : list of ExpectedDataset
            A list of expected files in the form (file_pattern_str, collection_str, required_bool).
        silent : bool
            If false, log an error for each required file that is not found.

        Returns
        -------
        bool
            True if all required files were found.
        list of pathlib.Path
            A list of file paths that exist on disk.
        """
        if not any(expected_files):
            return True, []
        ok, actual_files, missing = zip(*(x.find_files(self.session_path) for x in expected_files))
        everything_is_fine = all(ok)
        # For unknown reasons only the first file of each expected dataset was returned and this
        # behaviour was preserved after refactoring the code
        files = [file_list[0] for file_list in actual_files if len(file_list) > 0]
        if not everything_is_fine and not silent:
            for missing_pattern in filter(None, flatten(missing)):
                _logger.error('Signature file pattern %s not found', missing_pattern)

        return everything_is_fine, files

    def get_data_handler(self, location=None):
        """
        Get the relevant data handler based on the location argument.
        :return:
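
        Example (a sketch mirroring Task.setUp for a run away from the lab server):

        >>> handler = task.get_data_handler(location='remote')
        >>> handler.setUp(task=task)  # downloads the required input data via ONE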

551 """ 

552 location = str.lower(location or self.location) 1b][$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

553 if location == 'local': 1b][$fadSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O234H5I67J8'esktuvwP9Q!LM(R#

554 return data_handlers.LocalDataHandler(self.session_path, self.signature, one=self.one) 1fH(

555 self.one = self.one or ONE() 1b][$adSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O2345I67J8'esktuvwP9Q!LMR#

556 if location == 'server': 1b][$adSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O2345I67J8'esktuvwP9Q!LMR#

557 dhandler = data_handlers.ServerDataHandler(self.session_path, self.signature, one=self.one) 1b][$adSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O2345I67J8'esktuvwP9Q!LMR#

558 elif location == 'serverglobus': 

559 dhandler = data_handlers.ServerGlobusDataHandler(self.session_path, self.signature, one=self.one) 

560 elif location == 'remote': 

561 dhandler = data_handlers.RemoteHttpDataHandler(self.session_path, self.signature, one=self.one) 

562 elif location == 'aws': 

563 dhandler = data_handlers.RemoteAwsDataHandler(self.session_path, self.signature, one=self.one) 

564 elif location == 'sdsc': 

565 dhandler = data_handlers.SDSCDataHandler(self.session_path, self.signature, one=self.one) 

566 elif location == 'popeye': 

567 dhandler = data_handlers.PopeyeDataHandler(self.session_path, self.signature, one=self.one) 

568 elif location == 'ec2': 

569 dhandler = data_handlers.RemoteEC2DataHandler(self.session_path, self.signature, one=self.one) 

570 else: 

571 raise ValueError(f'Unknown location "{location}"') 

572 return dhandler 1b][$adSKTUNlzAmBnopCqrDcxyhVWiXgEF%YZ0Gj1O2345I67J8'esktuvwP9Q!LMR#

573 

    @staticmethod
    def make_lock_file(taskname='', time_out_secs=7200):
        """Create a GPU lock file with the given timeout in seconds."""
        d = {'start': time.time(), 'name': taskname, 'time_out_secs': time_out_secs}
        with open(Task._lock_file_path(), 'w+') as fid:
            json.dump(d, fid)
        return d

    @staticmethod
    def _lock_file_path():
        """The lock file is in ~/.one/gpu.lock."""
        folder = Path.home().joinpath('.one')
        folder.mkdir(exist_ok=True)
        return folder.joinpath('gpu.lock')

    def _make_lock_file(self):
        """Create a lock file for this task with the current time."""
        return Task.make_lock_file(self.name, self.time_out_secs)

    def is_locked(self):
        """Check if there is a lock file for this given task."""
        lock_file = self._lock_file_path()
        if not lock_file.exists():
            return False

        with open(lock_file) as fid:
            d = json.load(fid)
        now = time.time()
        if (now - d['start']) > d['time_out_secs']:
            lock_file.unlink()
            return False
        else:
            return True

    def _creates_lock(self):
        if self.location == "popeye":
            return True
        if self.is_locked():
            return False
        else:
            self._make_lock_file()
            return True

class Pipeline(abc.ABC):
    """
    Pipeline class: collection of related and potentially interdependent tasks.
    """
    tasks = OrderedDict()
    one = None

    def __init__(self, session_path=None, one=None, eid=None):
        assert session_path or eid
        self.one = one
        if one and one.alyx.cache_mode and one.alyx.default_expiry.seconds > 1:
            _logger.warning('Alyx client REST cache active; this may cause issues with jobs')
        self.eid = eid
        if self.one and not self.one.offline:
            self.data_repo = get_local_data_repository(self.one.alyx)
        else:
            self.data_repo = None

        if session_path:
            self.session_path = session_path
            if not self.eid:
                # eID for newer sessions may not be in cache so use remote query
                self.eid = one.path2eid(session_path, query_type='remote') if self.one else None
        self.label = self.__module__ + '.' + type(self).__name__

    @staticmethod
    def _get_exec_name(obj):
        """
        For a class, get the executable name as it should be stored in Alyx. When the class
        is created dynamically using the type() built-in function, we need to revert to the base
        class to be able to re-instantiate the class from the Alyx dictionary on the client side.
        :param obj:
        :return: string containing the full module plus class name
        """
        if obj.__module__ == 'abc':
            exec_name = f'{obj.__class__.__base__.__module__}.{obj.__class__.__base__.__name__}'
        else:
            exec_name = f'{obj.__module__}.{obj.name}'
        return exec_name

    def make_graph(self, out_dir=None, show=True):
        if not out_dir:
            out_dir = self.one.alyx.cache_dir if self.one else one.params.get().CACHE_DIR
        m = Digraph('G', filename=str(Path(out_dir).joinpath(self.__module__ + '_graphs.gv')))
        m.attr(rankdir='TD')

        e = Digraph(name='cluster_' + self.label)
        e.attr('node', shape='box')
        e.node('root', label=self.label)

        e.attr('node', shape='ellipse')
        for k in self.tasks:
            j = self.tasks[k]
            if len(j.parents) == 0:
                e.edge('root', j.name)
            else:
                [e.edge(p.name, j.name) for p in j.parents]

        m.subgraph(e)
        m.attr(label=r'\n\Pre-processing\n')
        m.attr(fontsize='20')
        if show:
            m.view()
        return m

    def create_alyx_tasks(self, rerun__status__in=None, tasks_list=None):
        """
        Instantiate the pipeline and create the tasks in Alyx, then create the jobs for the session.

        If the jobs already exist, they are left untouched. The re-run parameter will re-init the
        job by emptying the log and setting the status to Waiting.

        Parameters
        ----------
        rerun__status__in : list, str
            To re-run tasks that already exist, specify one or more status strings of tasks to be
            re-run, or '__all__' to re-run all tasks.
        tasks_list : list
            The list of tasks to create on Alyx. If None, uses self.tasks.

        Returns
        -------
        list
            List of Alyx task dictionaries (existing and/or created).
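
        Example (a sketch, assuming `pipe` is an instance of a Pipeline subclass):

        >>> tasks = pipe.create_alyx_tasks(rerun__status__in=['Errored'])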

702 """ 

703 rerun__status__in = ensure_list(rerun__status__in) 1ad?@c

704 if '__all__' in rerun__status__in: 1ad?@c

705 rerun__status__in = [x for x in TASK_STATUS_SET if x != 'Abandoned'] 

706 assert self.eid 1ad?@c

707 if self.one is None: 1ad?@c

708 _logger.warning('No ONE instance found for Alyx connection, set the one property') 

709 return 

710 tasks_alyx_pre = self.one.alyx.rest('tasks', 'list', session=self.eid, graph=self.name, no_cache=True) 1ad?@c

711 tasks_alyx = [] 1ad?@c

712 # creates all the tasks by iterating through the ordered dict 

713 

714 if tasks_list is not None: 1ad?@c

715 task_items = tasks_list 

716 # need to add in the session eid and the parents 

717 else: 

718 task_items = self.tasks.values() 1ad?@c

719 

720 for t in task_items: 1ad?@c

721 # get the parents' alyx ids to reference in the database 

722 if isinstance(t, dict): 1ad?@c

723 t = Bunch(t) 

724 executable = t.executable 

725 arguments = t.arguments 

726 t['time_out_secs'] = t['time_out_sec'] 

727 if len(t.parents) > 0: 

728 pnames = t.parents 

729 else: 

730 executable = self._get_exec_name(t) 1ad?@c

731 arguments = t.kwargs 1ad?@c

732 if len(t.parents): 1ad?@c

733 pnames = [p.name for p in t.parents] 1ad?@c

734 

735 if len(t.parents): 1ad?@c

736 parents_ids = [ta['id'] for ta in tasks_alyx if ta['name'] in pnames] 1ad?@c

737 else: 

738 parents_ids = [] 1ad?@c

739 

740 task_dict = {'executable': executable, 'priority': t.priority, 1ad?@c

741 'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu, 

742 'ram': t.ram, 'module': self.label, 'parents': parents_ids, 

743 'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid, 

744 'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name, 

745 'arguments': arguments} 

746 if self.data_repo: 1ad?@c

747 task_dict.update({'data_repository': self.data_repo}) 

748 # if the task already exists, patch it otherwise, create it 

749 talyx = next(filter(lambda x: x['name'] == t.name, tasks_alyx_pre), []) 1ad?@c

750 if len(talyx) == 0: 1ad?@c

751 talyx = self.one.alyx.rest('tasks', 'create', data=task_dict) 1ad?@c

752 elif talyx['status'] in rerun__status__in: 1a?@

753 talyx = self.one.alyx.rest('tasks', 'partial_update', id=talyx['id'], data=task_dict) 

754 tasks_alyx.append(talyx) 1ad?@c

755 return tasks_alyx 1ad?@c

756 

    def create_tasks_list_from_pipeline(self):
        """
        From a pipeline with tasks, create a list of dictionaries containing the task descriptions
        that can be used to create Alyx tasks.
        :return:
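
        Example (a sketch, assuming `pipe` is an instance of a Pipeline subclass):

        >>> tasks_list = pipe.create_tasks_list_from_pipeline()
        >>> pipe.create_alyx_tasks(tasks_list=tasks_list)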

762 """ 

763 tasks_list = [] 2? @ cb) * + , - . / : ; =

764 for k, t in self.tasks.items(): 2? @ cb) * + , - . / : ; =

765 # get the parents' alyx ids to reference in the database 

766 if len(t.parents): 2? @ cb) * + , - . / : ; =

767 parent_names = [p.name for p in t.parents] 2? @ cb) * + , - . / : ; =

768 else: 

769 parent_names = [] 2? @ cb) * + , - . / : ; =

770 

771 task_dict = {'executable': self._get_exec_name(t), 'priority': t.priority, 2? @ cb) * + , - . / : ; =

772 'io_charge': t.io_charge, 'gpu': t.gpu, 'cpu': t.cpu, 

773 'ram': t.ram, 'module': self.label, 'parents': parent_names, 

774 'level': t.level, 'time_out_sec': t.time_out_secs, 'session': self.eid, 

775 'status': 'Waiting', 'log': None, 'name': t.name, 'graph': self.name, 

776 'arguments': t.kwargs} 

777 if self.data_repo: 2? @ cb) * + , - . / : ; =

778 task_dict.update({'data_repository': self.data_repo}) 

779 

780 tasks_list.append(task_dict) 2? @ cb) * + , - . / : ; =

781 

782 return tasks_list 2? @ cb) * + , - . / : ; =

783 

    def run(self, status__in=('Waiting',), machine=None, clobber=True, **kwargs):
        """
        Get all the session related jobs from Alyx and run them.
        :param status__in: list of status strings to run, e.g.
         ['Waiting', 'Started', 'Errored', 'Empty', 'Complete']
        :param machine: string identifying the machine the task is run on, optional
        :param clobber: bool, if True any existing logs are overwritten, default is True
        :param kwargs: arguments passed downstream to run_alyx_task
        :return: task_deck: list of REST dictionaries of the task endpoints
        :return: all_datasets: list of REST dictionaries of the dataset endpoints
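
        Example (a sketch, assuming `pipe` was constructed with a session path and a ONE instance;
        'my-server' is a placeholder machine name):

        >>> task_deck, all_datasets = pipe.run(status__in=('Waiting', 'Errored'), machine='my-server')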

795 """ 

796 assert self.session_path, 'Pipeline object has to be declared with a session path to run' 1a

797 if self.one is None: 1a

798 _logger.warning('No ONE instance found for Alyx connection, set the one property') 

799 return 

800 task_deck = self.one.alyx.rest('tasks', 'list', session=self.eid, no_cache=True) 1a

801 # [(t['name'], t['level']) for t in task_deck] 

802 all_datasets = [] 1a

803 for i, j in enumerate(task_deck): 1a

804 if j['status'] not in status__in: 1a

805 continue 1a

806 # here we update the status in-place to avoid another hit to the database 

807 task_deck[i], dsets = run_alyx_task(tdict=j, session_path=self.session_path, 1a

808 one=self.one, job_deck=task_deck, 

809 machine=machine, clobber=clobber, **kwargs) 

810 if dsets is not None: 1a

811 all_datasets.extend(dsets) 1a

812 return task_deck, all_datasets 1a

813 

    def rerun_failed(self, **kwargs):
        return self.run(status__in=['Waiting', 'Held', 'Started', 'Errored', 'Empty'], **kwargs)

    def rerun(self, **kwargs):
        return self.run(status__in=[x for x in TASK_STATUS_SET if x != 'Abandoned'], **kwargs)

    @property
    def name(self):
        return self.__class__.__name__

def str2class(task_executable: str):
    """
    Convert task name to class.

    Parameters
    ----------
    task_executable : str
        A Task class name, e.g. 'ibllib.pipes.behavior_tasks.TrialRegisterRaw'.

    Returns
    -------
    class
        The imported class.
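
    Example (a sketch; concrete task constructors may require additional keyword arguments):

    >>> TrialRegisterRaw = str2class('ibllib.pipes.behavior_tasks.TrialRegisterRaw')
    >>> task = TrialRegisterRaw(session_path, one=one)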

839 """ 

840 strmodule, strclass = task_executable.rsplit('.', 1) 2wbxba ybc

841 return getattr(importlib.import_module(strmodule), strclass) 2wbxba ybc

842 

843 

def run_alyx_task(tdict=None, session_path=None, one=None, job_deck=None,
                  max_md5_size=None, machine=None, clobber=True, location='server', mode='log'):
    """
    Runs a single Alyx job and registers output datasets.

    Parameters
    ----------
    tdict : dict
        An Alyx task dictionary to instantiate and run.
    session_path : str, pathlib.Path
        A session path containing the task input data.
    one : one.api.OneAlyx
        An instance of ONE.
    job_deck : list of dict, optional
        A list of all tasks in the same pipeline. If None, queries Alyx to get this.
    max_md5_size : int, optional
        An optional maximum file size in bytes. Files with sizes larger than this will not have
        their MD5 checksum calculated to save time.
    machine : str, optional
        A string identifying the machine the task is run on.
    clobber : bool, default=True
        If true any existing logs are overwritten on Alyx.
    location : {'remote', 'server', 'sdsc', 'aws'}
        Where you are running the task: 'server' - local lab server, 'remote' - any
        compute node/computer, 'sdsc' - Flatiron compute node, 'aws' - a node using data from
        AWS S3.
    mode : {'log', 'raise'}, default='log'
        Behaviour to adopt if an error occurred. If 'raise', it will raise the error at the very
        end of this function (i.e. after having labeled the tasks).

    Returns
    -------
    dict
        The updated task dict.
    list of pathlib.Path
        A list of registered datasets.
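
    Example (a sketch mirroring Pipeline.run, assuming `one`, `eid` and `session_path` are defined):

    >>> task_deck = one.alyx.rest('tasks', 'list', session=eid, no_cache=True)
    >>> for tdict in (t for t in task_deck if t['status'] == 'Waiting'):
    ...     tdict, dsets = run_alyx_task(tdict=tdict, session_path=session_path, one=one, job_deck=task_deck)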

880 """ 

881 registered_dsets = [] 1ac

882 # here we need to check parents' status, get the job_deck if not available 

883 if not job_deck: 1ac

884 job_deck = one.alyx.rest('tasks', 'list', session=tdict['session'], no_cache=True) 1c

885 if len(tdict['parents']): 1ac

886 # check the dependencies 

887 parent_tasks = filter(lambda x: x['id'] in tdict['parents'], job_deck) 1ac

888 parent_statuses = [j['status'] for j in parent_tasks] 1ac

889 # if any of the parent tasks is not complete, throw a warning 

890 if not set(parent_statuses) <= {'Complete', 'Incomplete'}: 1ac

891 _logger.warning(f"{tdict['name']} has unmet dependencies") 1a

892 # if parents are waiting or failed, set the current task status to Held 

893 # once the parents ran, the descendent tasks will be set from Held to Waiting (see below) 

894 if set(parent_statuses).intersection({'Errored', 'Held', 'Empty', 'Waiting', 'Started', 'Abandoned'}): 1a

895 tdict = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Held'}) 1a

896 return tdict, registered_dsets 1a

897 # creates the job from the module name in the database 

898 classe = str2class(tdict['executable']) 1ac

899 tkwargs = tdict.get('arguments') or {} # if the db field is null it returns None 1ac

900 task = classe(session_path, one=one, taskid=tdict['id'], machine=machine, clobber=clobber, 1ac

901 location=location, **tkwargs) 

902 # sets the status flag to started before running 

903 one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data={'status': 'Started'}) 1ac

904 status = task.run() 1ac

905 patch_data = {'time_elapsed_secs': task.time_elapsed_secs, 'log': task.log, 1ac

906 'version': task.version} 

907 # if there is no data to register, set status to Empty 

908 if task.outputs is None: # NB: an empty list is still considered Complete. 1ac

909 patch_data['status'] = 'Empty' 1a

910 # otherwise register data and set (provisional) status to Complete 

911 else: 

912 try: 1ac

913 kwargs = dict(max_md5_size=max_md5_size) 1ac

914 if location == 'server': 1ac

915 # Explicitly pass lab as lab cannot be inferred from path (which the registration client tries to do). 

916 # To avoid making extra REST requests we can also set labs=None if using ONE v1.20.1. 

917 kwargs['labs'] = get_lab(session_path, one.alyx) 1ac

918 registered_dsets = task.register_datasets(**kwargs) 1ac

919 patch_data['status'] = 'Complete' 1ac

920 except Exception: 

921 _logger.error(traceback.format_exc()) 

922 status = -1 

923 

924 # overwrite status to errored 

925 if status == -1: 1ac

926 patch_data['status'] = 'Errored' 1a

927 # Status -2 means a lock was encountered during run, should be rerun 

928 elif status == -2: 1ac

929 patch_data['status'] = 'Waiting' 1a

930 # Status -3 should be returned if a task is Incomplete 

931 elif status == -3: 1ac

932 patch_data['status'] = 'Incomplete' 1a

933 # update task status on Alyx 

934 t = one.alyx.rest('tasks', 'partial_update', id=tdict['id'], data=patch_data) 1ac

935 # check for dependent held tasks 

936 # NB: Assumes dependent tasks are all part of the same session! 

937 next(x for x in job_deck if x['id'] == t['id'])['status'] = t['status'] # Update status in job deck 1ac

938 dependent_tasks = filter(lambda x: t['id'] in x['parents'] and x['status'] == 'Held', job_deck) 1ac

939 for d in dependent_tasks: 1ac

940 assert d['id'] != t['id'], 'task its own parent' 1a

941 # if all their parent tasks now complete, set to waiting 

942 parent_status = [next(x['status'] for x in job_deck if x['id'] == y) for y in d['parents']] 1a

943 if set(parent_status) <= {'Complete', 'Incomplete'}: 1a

944 one.alyx.rest('tasks', 'partial_update', id=d['id'], data={'status': 'Waiting'}) 1a

945 task.cleanUp() 1ac

946 if mode == 'raise' and status != 0: 1ac

947 raise ValueError(task.log) 

948 return t, registered_dsets 1ac