Coverage for ibllib/pipes/local_server.py: 70%

120 statements  

coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1"""Lab server pipeline construction and task runner. 

2 

3This is the module called by the job services on the lab servers. See 

4iblscripts/deploy/serverpc/crons for the service scripts that employ this module. 
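
A minimal sketch of the typical service loop; the subjects path and lab name are
illustrative assumptions, not fixed values:

>>> from ibllib.pipes.local_server import job_creator, task_queue, tasks_runner
>>> pipes, _ = job_creator('/mnt/s0/Data/Subjects')  # register new sessions
>>> waiting = task_queue(mode='small', lab='cortexlab')  # query waiting tasks
>>> datasets = tasks_runner('/mnt/s0/Data/Subjects', waiting, count=5)  # run them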

5""" 

6import logging 

7import time 

8from datetime import datetime 

9from pathlib import Path 

10import re 

11import subprocess 

12import sys 

13import traceback 

14import importlib 

15import importlib.metadata 

16 

17from one.api import ONE 

18from one.webclient import AlyxClient 

19from one.remote.globus import get_lab_from_endpoint_id, get_local_endpoint_id 

20 

21from ibllib import __version__ as ibllib_version 

22from ibllib.io.extractors.base import get_pipeline, get_session_extractor_type 

23from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing 

24from ibllib.time import date2isostr 

25from ibllib.oneibl.registration import IBLRegistrationClient 

26from ibllib.oneibl.data_handlers import get_local_data_repository 

27from ibllib.io.session_params import read_params 

28from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session 

29 

_logger = logging.getLogger(__name__)
LARGE_TASKS = [
    'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'
]


def _get_pipeline_class(session_path, one):
    """Return an instantiated extraction pipeline appropriate for the session."""
    pipeline = get_pipeline(session_path)
    if pipeline == 'training':
        PipelineClass = training_preprocessing.TrainingExtractionPipeline
    elif pipeline == 'ephys':
        PipelineClass = ephys_preprocessing.EphysExtractionPipeline
    else:
        # try to find a custom extractor in the personal projects extraction classes
        import projects.base
        task_type = get_session_extractor_type(session_path)
        PipelineClass = projects.base.get_pipeline(task_type)
    _logger.info(f"Using {PipelineClass} pipeline for {session_path}")
    return PipelineClass(session_path=session_path, one=one)


def _run_command(cmd):
    """
    Run a shell command and return its decoded, stripped stdout, or None if the
    command exits with a non-zero status.
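
    Examples
    --------
    A minimal sketch, assuming a POSIX shell is available:

    >>> _run_command('echo hello')
    'hello'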

    """
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None
    else:
        return info.decode('utf-8').strip()


def _get_volume_usage(vol, label=''):
    """
    Return the total, used and available space (in GiB) of the volume containing
    `vol`, along with the volume name, as a dict with keys prefixed by `label`.
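
    Examples
    --------
    A minimal sketch, assuming a Unix system where ``df /`` succeeds:

    >>> stats = _get_volume_usage('/', 'system')
    >>> sorted(stats)
    ['system_available', 'system_total', 'system_used', 'system_volume']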

    """
    cmd = f'df {vol}'
    res = _run_command(cmd)
    if res is None:  # df failed; return an empty dict rather than raising on None
        return {}
    # e.g. size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
    size_list = re.split(' +', res.split('\n')[-1])
    fac = 1024 ** 2  # df reports KiB blocks; convert to GiB
    d = {'total': int(size_list[1]) / fac,
         'used': int(size_list[2]) / fac,
         'available': int(size_list[3]) / fac,
         'volume': size_list[5]}
    return {f"{label}_{k}": d[k] for k in d}


def report_health(one):
    """
    Get a few health indicators (versions, disk usage) and update the JSON field
    of the corresponding Alyx data repositories with them.
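
    A minimal sketch, assuming ONE is configured for the lab's Alyx database:

    >>> from one.api import ONE
    >>> report_health(ONE(cache_rest=None))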

77 """ 

78 status = {'python_version': sys.version, 

79 'ibllib_version': ibllib_version, 

80 'phylib_version': importlib.metadata.version('phylib'), 

81 'local_time': date2isostr(datetime.now())} 

82 status.update(_get_volume_usage('/mnt/s0/Data', 'raid')) 

83 status.update(_get_volume_usage('/', 'system')) 

84 

85 data_repos = one.alyx.rest('data-repository', 'list', globus_endpoint_id=get_local_endpoint_id()) 

86 

87 for dr in data_repos: 

88 one.alyx.json_field_update(endpoint='data-repository', uuid=dr['name'], field_name='json', data=status) 

89 

90 


def job_creator(root_path, one=None, dry=False, rerun=False):
    """
    Create new sessions and pipelines.

    Server function that will look for 'raw_session.flag' files and for each:
    1) create the session on Alyx
    2) create the tasks to be run on Alyx

    For legacy sessions the raw data are registered separately, instead of within a pipeline task.

    Parameters
    ----------
    root_path : str, pathlib.Path
        Main path containing sessions or a session path.
    one : one.api.OneAlyx
        An ONE instance for registering the session(s).
    dry : bool
        If true, simply log the session_path(s) found, without registering anything.
    rerun : bool
        If true and session pipeline tasks already exist, set them all to waiting.

    Returns
    -------
    list of ibllib.pipes.tasks.Pipeline
        The pipelines created.
    list of dicts
        A list of any datasets registered (only for legacy sessions).
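
    Examples
    --------
    A dry-run sketch; the subjects path is an illustrative assumption for a lab server:

    >>> pipes, datasets = job_creator('/mnt/s0/Data/Subjects', dry=True)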

118 """ 

119 _logger.info('Start looking for new sessions...') 1dba

120 if not one: 1dba

121 one = ONE(cache_rest=None) 

122 rc = IBLRegistrationClient(one=one) 1dba

123 flag_files = list(Path(root_path).glob('**/raw_session.flag')) 1dba

124 pipes = [] 1dba

125 all_datasets = [] 1dba

126 for flag_file in flag_files: 1dba

127 session_path = flag_file.parent 1dba

128 _logger.info(f'creating session for {session_path}') 1dba

129 if dry: 1dba

130 continue 

131 try: 1dba

132 # if the subject doesn't exist in the database, skip 

133 rc.register_session(session_path, file_list=False) 1dba

134 

135 # NB: all sessions now extracted using dynamic pipeline 

136 if read_params(session_path) is None: 1dba

137 # Create legacy experiment description file 

138 acquisition_description_legacy_session(session_path, save=True) 1da

139 pipe = make_pipeline(session_path, one=one) 1dba

140 if rerun: 1dba

141 rerun__status__in = '__all__' 

142 else: 

143 rerun__status__in = ['Waiting'] 1dba

144 pipe.create_alyx_tasks(rerun__status__in=rerun__status__in) 1dba

145 flag_file.unlink() 1dba

146 if pipe is not None: 1dba

147 pipes.append(pipe) 1dba

148 except Exception: 1d

149 _logger.error('Failed to register session %s:\n%s', session_path.relative_to(root_path), traceback.format_exc()) 1d

150 continue 1d

151 

152 return pipes, all_datasets 1dba

153 

154 


def task_queue(mode='all', lab=None, alyx=None, env=(None,)):
    """
    Query waiting jobs from the specified lab.

    Parameters
    ----------
    mode : {'all', 'small', 'large'}
        Whether to return all waiting tasks, or only small or large (specified in LARGE_TASKS) jobs.
    lab : str
        Lab name as per Alyx, otherwise try to infer from the local Globus installation.
    alyx : one.webclient.AlyxClient
        An Alyx instance.
    env : list
        One or more environments to filter by. See :attr:`ibllib.pipes.tasks.Task.env`.

    Returns
    -------
    list of dict
        A list of Alyx tasks associated with `lab` that have a 'Waiting' status.
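
    Examples
    --------
    A minimal sketch; the lab name is an illustrative assumption:

    >>> small_jobs = task_queue(mode='small', lab='cortexlab')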

174 """ 

175 def predicate(task): 1e

176 classe = tasks.str2class(task['executable']) 1e

177 return (mode == 'all' or classe.job_size == mode) and classe.env in env 1e

178 

179 alyx = alyx or AlyxClient(cache_rest=None) 1e

180 if lab is None: 1e

181 _logger.debug('Trying to infer lab from globus installation') 

182 lab = get_lab_from_endpoint_id(alyx=alyx) 

183 if lab is None: 1e

184 _logger.error('No lab provided or found') 

185 return # if the lab is none, this will return empty tasks each time 

186 data_repo = get_local_data_repository(alyx) 1e

187 # Filter for tasks 

188 waiting_tasks = alyx.rest('tasks', 'list', status='Waiting', 1e

189 django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True) 

190 # Filter tasks by size 

191 filtered_tasks = filter(predicate, waiting_tasks) 1e

192 # Order tasks by priority 

193 sorted_tasks = sorted(filtered_tasks, key=lambda d: d['priority'], reverse=True) 1e

194 

195 return sorted_tasks 1e

196 

197 


def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Run a list of tasks (task dictionaries from an Alyx query) on a local server.

    Parameters
    ----------
    subjects_path : str, pathlib.Path
        The location of the subject session folders, e.g. '/mnt/s0/Data/Subjects'.
    tasks_dict : list of dict
        A list of tasks to run. Typically the output of `task_queue`.
    one : one.api.OneAlyx
        An instance of ONE.
    dry : bool, default=False
        If true, simply prints the full session paths and task names without running the tasks.
    count : int, default=5
        The maximum number of tasks to run from the tasks_dict list.
    time_out : float, optional
        The time in seconds to run tasks before exiting. If set this will run tasks until the
        timeout has elapsed. NB: Only checks between tasks and will not interrupt a running task.
    **kwargs
        See ibllib.pipes.tasks.run_alyx_task.

    Returns
    -------
    list of pathlib.Path
        A list of datasets registered to Alyx.
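
    Examples
    --------
    A minimal sketch chaining `task_queue` and `tasks_runner`; the lab name and
    subjects path are illustrative assumptions:

    >>> waiting = task_queue(mode='all', lab='cortexlab')
    >>> datasets = tasks_runner('/mnt/s0/Data/Subjects', waiting, count=10, dry=True)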

224 """ 

225 if one is None: 1ba

226 one = ONE(cache_rest=None) 

227 tstart = time.time() 1ba

228 c = 0 1ba

229 last_session = None 1ba

230 all_datasets = [] 1ba

231 for tdict in tasks_dict: 1ba

232 # if the count is reached or if the time_out has been elapsed, break the loop and return 

233 if c >= count or (time_out and time.time() - tstart > time_out): 1ba

234 break 

235 # reconstruct the session local path. As many jobs belong to the same session 

236 # cache the result 

237 if last_session != tdict['session']: 1ba

238 ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0] 1ba

239 session_path = Path(subjects_path).joinpath( 1ba

240 Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3))) 

241 last_session = tdict['session'] 1ba

242 if dry: 1ba

243 print(session_path, tdict['name']) 1a

244 else: 

245 task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path, one=one, **kwargs) 1ba

246 if dsets: 1ba

247 all_datasets.extend(dsets) 1ba

248 c += 1 1ba

249 return all_datasets 1ba