Coverage for ibllib/pipes/local_server.py: 60%

143 statements  

coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

import time
from datetime import datetime
from pathlib import Path
import pkg_resources
import re
import subprocess
import sys
import traceback
import importlib

from one.api import ONE
from one.webclient import AlyxClient
from one.remote.globus import get_lab_from_endpoint_id
from iblutil.util import setup_logger

from ibllib.io.extractors.base import get_pipeline, get_task_protocol, get_session_extractor_type
from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
from ibllib.time import date2isostr
from ibllib.oneibl.registration import IBLRegistrationClient, register_session_raw_data, get_lab
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.io.session_params import read_params
from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session

_logger = setup_logger(__name__, level='INFO')
LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC']


def _get_pipeline_class(session_path, one):
    pipeline = get_pipeline(session_path)
    if pipeline == 'training':
        PipelineClass = training_preprocessing.TrainingExtractionPipeline
    elif pipeline == 'ephys':
        PipelineClass = ephys_preprocessing.EphysExtractionPipeline
    else:
        # try to find a custom extractor in the personal projects extraction classes
        import projects.base
        task_type = get_session_extractor_type(session_path)
        PipelineClass = projects.base.get_pipeline(task_type)
    _logger.info(f"Using {PipelineClass} pipeline for {session_path}")
    return PipelineClass(session_path=session_path, one=one)
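
# Usage sketch (illustrative, not part of the module): for a standard training
# session this returns a TrainingExtractionPipeline instance. The session path
# below is an assumption.
#
#   >>> from pathlib import Path
#   >>> from one.api import ONE
#   >>> pipe = _get_pipeline_class(Path('/mnt/s0/Data/Subjects/SW001/2020-01-01/001'), ONE())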


def _run_command(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None
    else:
        return info.decode('utf-8').strip()


def _get_volume_usage(vol, label=''):
    cmd = f'df {vol}'
    res = _run_command(cmd)
    # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
    size_list = re.split(' +', res.split('\n')[-1])
    fac = 1024 ** 2
    d = {'total': int(size_list[1]) / fac,
         'used': int(size_list[2]) / fac,
         'available': int(size_list[3]) / fac,
         'volume': size_list[5]}
    return {f"{label}_{k}": d[k] for k in d}
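
# Usage sketch (illustrative): for the df output shown in the comment above, and
# assuming df reports 1 KiB blocks, the returned values come out in GiB, roughly
# (values rounded):
#
#   >>> _get_volume_usage('/datadisk', 'raid')
#   {'raid_total': 1832.8, 'raid_used': 1361.0, 'raid_available': 471.7, 'raid_volume': '/datadisk'}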


def report_health(one):
    """
    Get a few health indicators and update the json field of the corresponding Alyx lab(s) with them.
    """
    status = {'python_version': sys.version,
              'ibllib_version': pkg_resources.get_distribution("ibllib").version,
              'phylib_version': pkg_resources.get_distribution("phylib").version,
              'local_time': date2isostr(datetime.now())}
    status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
    status.update(_get_volume_usage('/', 'system'))

    lab_names = get_lab_from_endpoint_id(alyx=one.alyx)
    for ln in lab_names:
        one.alyx.json_field_update(endpoint='labs', uuid=ln, field_name='json', data=status)
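
# Usage sketch (illustrative): typically invoked periodically (e.g. by a cron job)
# on the local server; the ONE instantiation below is an assumption.
#
#   >>> from one.api import ONE
#   >>> report_health(ONE(cache_rest=None))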


def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
    """
    Create new sessions and pipelines.

    Server function that will look for 'raw_session.flag' files and, for each:
    1) create the session on Alyx
    2) create the tasks to be run on Alyx

    For legacy sessions the raw data are registered separately, instead of within a pipeline task.

    Parameters
    ----------
    root_path : str, pathlib.Path
        Main path containing sessions or a session path.
    one : one.api.OneAlyx
        An ONE instance for registering the session(s).
    dry : bool
        If true, simply log the session_path(s) found, without registering anything.
    rerun : bool
        If true and session pipeline tasks already exist, set them all to waiting.
    max_md5_size : int
        (legacy sessions) The maximum file size to calculate the MD5 hash sum for.

    Returns
    -------
    list of ibllib.pipes.tasks.Pipeline
        The pipelines created.
    list of dicts
        A list of any datasets registered (only for legacy sessions).
    """
    _logger.info('#' * 110)
    _logger.info('Start looking for new sessions...')
    _logger.info('#' * 110)
    if not one:
        one = ONE(cache_rest=None)
    rc = IBLRegistrationClient(one=one)
    flag_files = list(Path(root_path).glob('**/raw_session.flag'))
    pipes = []
    all_datasets = []
    for flag_file in flag_files:
        session_path = flag_file.parent
        _logger.info(f'creating session for {session_path}')
        if dry:
            continue
        try:
            # if the subject doesn't exist in the database, skip
            rc.register_session(session_path, file_list=False)

            # See if we need to create a dynamic pipeline
            experiment_description_file = read_params(session_path)
            if experiment_description_file is not None:
                pipe = make_pipeline(session_path, one=one)
            else:
                # Create legacy experiment description file
                acquisition_description_legacy_session(session_path, save=True)
                lab = get_lab(session_path, one.alyx)  # Can be set to None to do this Alyx-side if using ONE v1.20.1
                _, dsets = register_session_raw_data(session_path, one=one, max_md5_size=max_md5_size, labs=lab)
                if dsets:
                    all_datasets.extend(dsets)
                pipe = _get_pipeline_class(session_path, one)
                if pipe is None:
                    task_protocol = get_task_protocol(session_path)
                    _logger.info(f'Session task protocol {task_protocol} has no matching pipeline pattern {session_path}')
            if rerun:
                rerun__status__in = '__all__'
            else:
                rerun__status__in = ['Waiting']
            pipe.create_alyx_tasks(rerun__status__in=rerun__status__in)
            flag_file.unlink()
            if pipe is not None:
                pipes.append(pipe)
        except Exception:
            _logger.error(traceback.format_exc())
            _logger.warning(f'Creating session / registering raw datasets {session_path} errored')
            continue

    return pipes, all_datasets
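
# Usage sketch (illustrative): scan a local subjects folder for 'raw_session.flag'
# files and register the corresponding sessions. The root path is an assumption;
# with dry=True nothing is registered, the sessions found are only logged.
#
#   >>> from one.api import ONE
#   >>> pipes, dsets = job_creator('/mnt/s0/Data/Subjects', one=ONE(cache_rest=None), dry=True)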


def task_queue(mode='all', lab=None, alyx=None):
    """
    Query waiting jobs from the specified lab.

    :param mode: whether to return all waiting tasks ('all'), or only the small or large jobs
     ('small' / 'large'), as determined by each task class's ``job_size`` attribute
    :param lab: lab name as per Alyx, otherwise try to infer it from the local Globus installation
    :param alyx: an AlyxClient instance
    :return: list of waiting task dictionaries, sorted by descending priority
    """
    alyx = alyx or AlyxClient(cache_rest=None)
    if lab is None:
        _logger.debug('Trying to infer lab from globus installation')
        lab = get_lab_from_endpoint_id(alyx=alyx)
    if lab is None:
        _logger.error('No lab provided or found')
        return  # if the lab is none, this will return empty tasks each time
    data_repo = get_local_data_repository(alyx)
    # Filter for tasks
    tasks_all = alyx.rest('tasks', 'list', status='Waiting',
                          django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True)
    if mode == 'all':
        waiting_tasks = tasks_all
    else:
        small_jobs = []
        large_jobs = []
        for t in tasks_all:
            strmodule, strclass = t['executable'].rsplit('.', 1)
            classe = getattr(importlib.import_module(strmodule), strclass)
            job_size = classe.job_size
            if job_size == 'small':
                small_jobs.append(t)
            else:
                large_jobs.append(t)
        if mode == 'small':
            waiting_tasks = small_jobs
        elif mode == 'large':
            waiting_tasks = large_jobs

    # Order tasks by priority
    sorted_tasks = sorted(waiting_tasks, key=lambda d: d['priority'], reverse=True)

    return sorted_tasks
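
# Usage sketch (illustrative): list the small waiting jobs for a given lab, highest
# priority first. The lab name is an assumption.
#
#   >>> waiting = task_queue(mode='small', lab='cortexlab')
#   >>> [(t['name'], t['priority']) for t in waiting[:3]]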


def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Run a list of tasks (task dictionaries from an Alyx query) on a local server.

    :param subjects_path: local root path containing the subject folders
    :param tasks_dict: list of task dictionaries, as returned by task_queue
    :param one: an ONE instance; if None, one is instantiated with cache_rest=None
    :param dry: if True, only print the session path and task name without running anything
    :param count: maximum number of tasks to run
    :param time_out: maximum run time in seconds; checked between tasks, the loop stops once exceeded
    :param kwargs: passed on to tasks.run_alyx_task
    :return: list of dataset dictionaries
    """
    if one is None:
        one = ONE(cache_rest=None)
    tstart = time.time()
    c = 0
    last_session = None
    all_datasets = []
    for tdict in tasks_dict:
        # if the count is reached or if the time_out has elapsed, break the loop and return
        if c >= count or (time_out and time.time() - tstart > time_out):
            break
        # reconstruct the session local path; as many jobs belong to the same session,
        # cache the result
        if last_session != tdict['session']:
            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
            session_path = Path(subjects_path).joinpath(
                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
            last_session = tdict['session']
        if dry:
            print(session_path, tdict['name'])
        else:
            task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path,
                                              one=one, **kwargs)
            if dsets:
                all_datasets.extend(dsets)
            c += 1
    return all_datasets
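
# Usage sketch (illustrative): the typical local-server loop chains task_queue and
# tasks_runner. The lab name, subjects path and limits below are assumptions.
#
#   >>> from one.api import ONE
#   >>> one = ONE(cache_rest=None)
#   >>> waiting = task_queue(mode='small', lab='cortexlab', alyx=one.alyx)
#   >>> datasets = tasks_runner('/mnt/s0/Data/Subjects', waiting, one=one, count=20, time_out=3600)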