Coverage for ibllib/pipes/local_server.py: 60%
143 statements
coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
import time
from datetime import datetime
from pathlib import Path
import pkg_resources
import re
import subprocess
import sys
import traceback
import importlib

from one.api import ONE
from one.webclient import AlyxClient
from one.remote.globus import get_lab_from_endpoint_id
from iblutil.util import setup_logger

from ibllib.io.extractors.base import get_pipeline, get_task_protocol, get_session_extractor_type
from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
from ibllib.time import date2isostr
from ibllib.oneibl.registration import IBLRegistrationClient, register_session_raw_data, get_lab
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.io.session_params import read_params
from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session

_logger = setup_logger(__name__, level='INFO')
LARGE_TASKS = ['EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC']


def _get_pipeline_class(session_path, one):
    """Return an instantiated legacy extraction pipeline matching the session's task protocol."""
    pipeline = get_pipeline(session_path)
    if pipeline == 'training':
        PipelineClass = training_preprocessing.TrainingExtractionPipeline
    elif pipeline == 'ephys':
        PipelineClass = ephys_preprocessing.EphysExtractionPipeline
    else:
        # otherwise, look for a custom extractor pipeline in the personal projects repository
        import projects.base
        task_type = get_session_extractor_type(session_path)
        PipelineClass = projects.base.get_pipeline(task_type)
    _logger.info(f"Using {PipelineClass} pipeline for {session_path}")
    return PipelineClass(session_path=session_path, one=one)
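
# Illustrative usage sketch (the session path below is a hypothetical example following the
# standard subject/date/number layout used on local servers):
#
#     pipe = _get_pipeline_class(Path('/mnt/s0/Data/Subjects/ZM_1150/2019-05-07/001'), one)
#     pipe.create_alyx_tasks()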


def _run_command(cmd):
    """Run a shell command and return its decoded stdout, or None if the command failed."""
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None
    else:
        return info.decode('utf-8').strip()


def _get_volume_usage(vol, label=''):
    """Return the total, used and available space (in GB) of a volume, with keys prefixed by `label`."""
    cmd = f'df {vol}'
    res = _run_command(cmd)
    # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
    size_list = re.split(' +', res.split('\n')[-1])
    fac = 1024 ** 2
    d = {'total': int(size_list[1]) / fac,
         'used': int(size_list[2]) / fac,
         'available': int(size_list[3]) / fac,
         'volume': size_list[5]}
    return {f"{label}_{k}": d[k] for k in d}
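
# Minimal usage sketch (assumes a Unix-like system where `df` is on the path; the mount point is
# a placeholder):
#
#     >>> _get_volume_usage('/mnt/s0/Data', label='raid')
#     {'raid_total': ..., 'raid_used': ..., 'raid_available': ..., 'raid_volume': '/datadisk'}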


def report_health(one):
    """
    Gather a few health indicators (package versions, disk usage, local time) and write them to
    the JSON field of the corresponding lab(s) on Alyx.
    """
    status = {'python_version': sys.version,
              'ibllib_version': pkg_resources.get_distribution("ibllib").version,
              'phylib_version': pkg_resources.get_distribution("phylib").version,
              'local_time': date2isostr(datetime.now())}
    status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
    status.update(_get_volume_usage('/', 'system'))

    lab_names = get_lab_from_endpoint_id(alyx=one.alyx)
    for ln in lab_names:
        one.alyx.json_field_update(endpoint='labs', uuid=ln, field_name='json', data=status)
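
# Hedged example of how this might be called, e.g. from a periodic job on the local server
# (assumes ONE is already configured against the lab's Alyx instance):
#
#     >>> from one.api import ONE
#     >>> report_health(ONE(cache_rest=None))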


def job_creator(root_path, one=None, dry=False, rerun=False, max_md5_size=None):
    """
    Create new sessions and pipelines.

    Server function that will look for 'raw_session.flag' files and, for each:
     1) create the session on Alyx
     2) create the tasks to be run on Alyx

    For legacy sessions the raw data are registered separately, instead of within a pipeline task.

    Parameters
    ----------
    root_path : str, pathlib.Path
        Main path containing sessions or a session path.
    one : one.api.OneAlyx
        An ONE instance for registering the session(s).
    dry : bool
        If true, simply log the session_path(s) found, without registering anything.
    rerun : bool
        If true and session pipeline tasks already exist, set them all to waiting.
    max_md5_size : int
        (legacy sessions) The maximum file size to calculate the MD5 hash sum for.

    Returns
    -------
    list of ibllib.pipes.tasks.Pipeline
        The pipelines created.
    list of dicts
        A list of any datasets registered (only for legacy sessions).
    """
    _logger.info('#' * 110)
    _logger.info('Start looking for new sessions...')
    _logger.info('#' * 110)
    if not one:
        one = ONE(cache_rest=None)
    rc = IBLRegistrationClient(one=one)
    flag_files = list(Path(root_path).glob('**/raw_session.flag'))
    pipes = []
    all_datasets = []
    for flag_file in flag_files:
        session_path = flag_file.parent
        _logger.info(f'creating session for {session_path}')
        if dry:
            continue
        try:
            # if the subject doesn't exist in the database, skip
            rc.register_session(session_path, file_list=False)

            # See if we need to create a dynamic pipeline
            experiment_description_file = read_params(session_path)
            if experiment_description_file is not None:
                pipe = make_pipeline(session_path, one=one)
            else:
                # Create legacy experiment description file
                acquisition_description_legacy_session(session_path, save=True)
                lab = get_lab(session_path, one.alyx)  # Can be set to None to do this Alyx-side if using ONE v1.20.1
                _, dsets = register_session_raw_data(session_path, one=one, max_md5_size=max_md5_size, labs=lab)
                if dsets:
                    all_datasets.extend(dsets)
                pipe = _get_pipeline_class(session_path, one)
                if pipe is None:
                    task_protocol = get_task_protocol(session_path)
                    _logger.info(f'Session task protocol {task_protocol} has no matching pipeline pattern {session_path}')
            if rerun:
                rerun__status__in = '__all__'
            else:
                rerun__status__in = ['Waiting']
            pipe.create_alyx_tasks(rerun__status__in=rerun__status__in)
            flag_file.unlink()
            if pipe is not None:
                pipes.append(pipe)
        except Exception:
            _logger.error(traceback.format_exc())
            _logger.warning(f'Creating session / registering raw datasets {session_path} errored')
            continue

    return pipes, all_datasets
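
# Illustrative sketch of the server-side call (the subjects directory below is a placeholder for
# the local data root):
#
#     >>> from one.api import ONE
#     >>> pipes, datasets = job_creator('/mnt/s0/Data/Subjects', one=ONE(cache_rest=None))
#     >>> len(pipes)  # one pipeline per newly flagged session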


def task_queue(mode='all', lab=None, alyx=None):
    """
    Query waiting jobs from the specified lab.

    :param mode: 'all', 'small' or 'large'; whether to return all waiting tasks, or only the small
     or large ones (job size is read from each task class's `job_size` attribute)
    :param lab: lab name as per Alyx, otherwise try to infer from the local globus install
    :param alyx: AlyxClient instance
    :return: list of waiting task dictionaries, sorted by descending priority
    """
    alyx = alyx or AlyxClient(cache_rest=None)
    if lab is None:
        _logger.debug('Trying to infer lab from globus installation')
        lab = get_lab_from_endpoint_id(alyx=alyx)
    if lab is None:
        _logger.error('No lab provided or found')
        return  # if the lab is None, this will return empty tasks each time
    data_repo = get_local_data_repository(alyx)
    # Filter for waiting tasks belonging to this lab and data repository
    tasks_all = alyx.rest('tasks', 'list', status='Waiting',
                          django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True)
    if mode == 'all':
        waiting_tasks = tasks_all
    else:
        small_jobs = []
        large_jobs = []
        for t in tasks_all:
            strmodule, strclass = t['executable'].rsplit('.', 1)
            classe = getattr(importlib.import_module(strmodule), strclass)
            job_size = classe.job_size
            if job_size == 'small':
                small_jobs.append(t)
            else:
                large_jobs.append(t)
        if mode == 'small':
            waiting_tasks = small_jobs
        elif mode == 'large':
            waiting_tasks = large_jobs

    # Order tasks by priority
    sorted_tasks = sorted(waiting_tasks, key=lambda d: d['priority'], reverse=True)

    return sorted_tasks
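
# Example sketch, e.g. for a worker that only handles small jobs (the lab name is a placeholder):
#
#     >>> waiting = task_queue(mode='small', lab='cortexlab', alyx=AlyxClient(cache_rest=None))
#     >>> [t['name'] for t in waiting]  # highest priority first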


def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Run a list of tasks (task dictionaries from an Alyx query) on a local server.

    :param subjects_path: path to the root folder containing the subjects' session folders
    :param tasks_dict: list of task dictionaries as returned by the Alyx REST query (cf. task_queue)
    :param one: ONE instance; if None a new one is instantiated
    :param dry: if True, only print the session path and task name instead of running the task
    :param count: maximum number of tasks to run
    :param time_out: between each task, if time elapsed is greater than time out, returns (seconds)
    :param kwargs: passed on to tasks.run_alyx_task
    :return: list of dataset dictionaries
    """
    if one is None:
        one = ONE(cache_rest=None)
    tstart = time.time()
    c = 0
    last_session = None
    all_datasets = []
    for tdict in tasks_dict:
        # if the count is reached or the time_out has elapsed, break the loop and return
        if c >= count or (time_out and time.time() - tstart > time_out):
            break
        # reconstruct the local session path; as many jobs belong to the same session,
        # cache the result
        if last_session != tdict['session']:
            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
            session_path = Path(subjects_path).joinpath(
                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
            last_session = tdict['session']
        if dry:
            print(session_path, tdict['name'])
        else:
            task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path,
                                              one=one, **kwargs)
            if dsets:
                all_datasets.extend(dsets)
            c += 1
    return all_datasets
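
# Typical pairing with task_queue on a local server (illustrative; the lab name and subjects path
# are placeholders):
#
#     >>> queue = task_queue(mode='all', lab='cortexlab', alyx=one.alyx)
#     >>> datasets = tasks_runner('/mnt/s0/Data/Subjects', queue, one=one, count=20)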