Coverage for ibllib/pipes/local_server.py: 73% (114 statements)


1"""Lab server pipeline construction and task runner. 

2 

3This is the module called by the job services on the lab servers. See 

4iblscripts/deploy/serverpc/crons for the service scripts that employ this module. 

5""" 

6import logging 

7import time 

8from datetime import datetime 

9from pathlib import Path 

10import re 

11import subprocess 

12import sys 

13import traceback 

14import importlib 

15import importlib.metadata 

16 

17from one.api import ONE 

18from one.webclient import AlyxClient 

19from one.remote.globus import get_lab_from_endpoint_id, get_local_endpoint_id 

20from one.alf.spec import is_session_path 

21from one.alf.path import session_path_parts 

22 

23from ibllib import __version__ as ibllib_version 

24from ibllib.pipes import tasks 

25from ibllib.time import date2isostr 

26from ibllib.oneibl.registration import IBLRegistrationClient 

27from ibllib.oneibl.data_handlers import get_local_data_repository 

28from ibllib.io.session_params import read_params 

29from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session 

30 

31_logger = logging.getLogger(__name__) 

32LARGE_TASKS = [ 

33 'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess' 

34] 

35 

36 


def _run_command(cmd):
    """Run a shell command and return its stripped stdout, or None on a non-zero exit code."""
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None
    else:
        return info.decode('utf-8').strip()
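
# Illustrative sketch of the expected behaviour (comments only; the commands
# are arbitrary examples, not taken from the module):
#   _run_command('echo ok')  # -> 'ok'
#   _run_command('false')    # -> None (non-zero return code)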


def _get_volume_usage(vol, label=''):
    """Return the usage of a volume as a dict of GB values, keys prefixed with `label`."""
    cmd = f'df {vol}'
    res = _run_command(cmd)
    # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
    size_list = re.split(' +', res.split('\n')[-1])
    fac = 1024 ** 2
    d = {'total': int(size_list[1]) / fac,
         'used': int(size_list[2]) / fac,
         'available': int(size_list[3]) / fac,
         'volume': size_list[5]}
    return {f"{label}_{k}": d[k] for k in d}
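
# Illustrative sketch of the returned mapping, using the sample `df` output in
# the comment above (figures rounded):
#   _get_volume_usage('/mnt/s0/Data', 'raid')
#   # -> {'raid_total': 1832.8, 'raid_used': 1361.0,
#   #     'raid_available': 471.7, 'raid_volume': '/datadisk'}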


def report_health(alyx):
    """
    Get a few health indicators (versions, disk usage) and write them to the JSON field of the
    data repositories associated with this server's Globus endpoint.

    Parameters
    ----------
    alyx : one.webclient.AlyxClient
        An Alyx instance for updating the data repository records.
    """
    status = {'python_version': sys.version,
              'ibllib_version': ibllib_version,
              'phylib_version': importlib.metadata.version('phylib'),
              'local_time': date2isostr(datetime.now())}
    status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
    status.update(_get_volume_usage('/', 'system'))

    data_repos = alyx.rest('data-repository', 'list', globus_endpoint_id=get_local_endpoint_id())

    for dr in data_repos:
        alyx.json_field_update(endpoint='data-repository', uuid=dr['name'], field_name='json', data=status)
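
# Hedged usage sketch (typically driven by a cron service on the lab server):
#   report_health(AlyxClient(cache_rest=None))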


def job_creator(root_path, one=None, dry=False, rerun=False):
    """
    Create new sessions and pipelines.

    Server function that will look for 'raw_session.flag' files and for each:
    1) create the session on Alyx
    2) create the tasks to be run on Alyx

    For legacy sessions the raw data are registered separately, instead of within a pipeline task.

    Parameters
    ----------
    root_path : str, pathlib.Path
        Main path containing sessions or a session path.
    one : one.api.OneAlyx
        An ONE instance for registering the session(s).
    dry : bool
        If true, simply log the session_path(s) found, without registering anything.
    rerun : bool
        If true and session pipeline tasks already exist, set them all to waiting.

    Returns
    -------
    list of ibllib.pipes.tasks.Pipeline
        The pipelines created.
    list of dicts
        A list of any datasets registered (only for legacy sessions).
    """
    _logger.info('Start looking for new sessions...')
    if not one:
        one = ONE(cache_rest=None)
    rc = IBLRegistrationClient(one=one)
    flag_files = Path(root_path).glob('*/????-??-??/*/raw_session.flag')
    flag_files = filter(lambda x: is_session_path(x.parent), flag_files)
    pipes = []
    all_datasets = []
    for flag_file in flag_files:
        session_path = flag_file.parent
        if session_path_parts(session_path)[1] in ('test', 'test_subject'):
            _logger.debug('skipping test session %s', session_path)
            continue
        _logger.info('creating session for %s', session_path)
        if dry:
            continue
        try:
            # if the subject doesn't exist in the database, skip
            rc.register_session(session_path, file_list=False)

            # NB: all sessions now extracted using dynamic pipeline
            if read_params(session_path) is None:
                # Create legacy experiment description file
                acquisition_description_legacy_session(session_path, save=True)
            pipe = make_pipeline(session_path, one=one)
            if rerun:
                rerun__status__in = '__all__'
            else:
                rerun__status__in = ['Waiting']
            pipe.create_alyx_tasks(rerun__status__in=rerun__status__in)
            flag_file.unlink()
            if pipe is not None:
                pipes.append(pipe)
        except Exception:
            _logger.error('Failed to register session %s:\n%s',
                          session_path.relative_to(root_path), traceback.format_exc())
            continue

    return pipes, all_datasets
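
# Hedged usage sketch (the data root is the conventional one from the docstrings;
# dry=True only logs the sessions found):
#   one = ONE(cache_rest=None)
#   pipes, _ = job_creator('/mnt/s0/Data/Subjects', one=one, dry=True)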


def task_queue(mode='all', lab=None, alyx=None, env=(None,)):
    """
    Query waiting jobs for the specified lab.

    Parameters
    ----------
    mode : {'all', 'small', 'large'}
        Whether to return all waiting tasks, or only the small or large ones, as determined by
        each task class's `job_size` attribute.
    lab : str
        Lab name as per Alyx, otherwise try to infer it from the local Globus installation.
    alyx : one.webclient.AlyxClient
        An Alyx instance.
    env : list
        One or more environments to filter by. See :attr:`ibllib.pipes.tasks.Task.env`.

    Returns
    -------
    list of dict
        A list of Alyx tasks associated with `lab` that have a 'Waiting' status.
    """
    def predicate(task):
        task_class = tasks.str2class(task['executable'])
        return (mode == 'all' or task_class.job_size == mode) and task_class.env in env

    alyx = alyx or AlyxClient(cache_rest=None)
    if lab is None:
        _logger.debug('Trying to infer lab from globus installation')
        lab = get_lab_from_endpoint_id(alyx=alyx)
    if lab is None:
        _logger.error('No lab provided or found')
        return []  # if the lab is none, return an empty task list each time
    data_repo = get_local_data_repository(alyx)
    # Filter for tasks
    waiting_tasks = alyx.rest('tasks', 'list', status='Waiting',
                              django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}',
                              no_cache=True)
    # Filter tasks by size
    filtered_tasks = filter(predicate, waiting_tasks)
    # Order tasks by priority
    sorted_tasks = sorted(filtered_tasks, key=lambda d: d['priority'], reverse=True)

    return sorted_tasks
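
# Hedged usage sketch ('cortexlab' is a placeholder lab name):
#   waiting = task_queue(mode='small', lab='cortexlab', alyx=AlyxClient(cache_rest=None))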


def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Run a list of tasks (task dictionaries from an Alyx query) on a local server.

    Parameters
    ----------
    subjects_path : str, pathlib.Path
        The location of the subject session folders, e.g. '/mnt/s0/Data/Subjects'.
    tasks_dict : list of dict
        A list of tasks to run. Typically the output of `task_queue`.
    one : one.api.OneAlyx
        An instance of ONE.
    dry : bool, default=False
        If true, simply prints the full session paths and task names without running the tasks.
    count : int, default=5
        The maximum number of tasks to run from the tasks_dict list.
    time_out : float, optional
        The time in seconds to run tasks before exiting. If set this will run tasks until the
        timeout has elapsed. NB: Only checks between tasks and will not interrupt a running task.
    kwargs
        See ibllib.pipes.tasks.run_alyx_task.

    Returns
    -------
    list of pathlib.Path
        A list of datasets registered to Alyx.
    """
    if one is None:
        one = ONE(cache_rest=None)
    tstart = time.time()
    c = 0
    last_session = None
    all_datasets = []
    for tdict in tasks_dict:
        # if the count is reached or the time_out has elapsed, break the loop and return
        if c >= count or (time_out and time.time() - tstart > time_out):
            break
        # reconstruct the session local path; as many jobs belong to the same session,
        # cache the result
        if last_session != tdict['session']:
            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
            session_path = Path(subjects_path).joinpath(
                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
            last_session = tdict['session']
        if dry:
            print(session_path, tdict['name'])
        else:
            task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path, one=one, **kwargs)
            if dsets:
                all_datasets.extend(dsets)
            c += 1
    return all_datasets
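
# Hedged usage sketch pairing the queue with the runner (lab name and data root
# are placeholders):
#   one = ONE(cache_rest=None)
#   waiting = task_queue(mode='small', lab='cortexlab', alyx=one.alyx)
#   tasks_runner('/mnt/s0/Data/Subjects', waiting, one=one, count=10)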