Coverage for ibllib/pipes/local_server.py: 70%
120 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1"""Lab server pipeline construction and task runner.
3This is the module called by the job services on the lab servers. See
4iblscripts/deploy/serverpc/crons for the service scripts that employ this module.
5"""
6import logging
7import time
8from datetime import datetime
9from pathlib import Path
10import re
11import subprocess
12import sys
13import traceback
14import importlib
15import importlib.metadata
17from one.api import ONE
18from one.webclient import AlyxClient
19from one.remote.globus import get_lab_from_endpoint_id, get_local_endpoint_id
21from ibllib import __version__ as ibllib_version
22from ibllib.io.extractors.base import get_pipeline, get_session_extractor_type
23from ibllib.pipes import tasks, training_preprocessing, ephys_preprocessing
24from ibllib.time import date2isostr
25from ibllib.oneibl.registration import IBLRegistrationClient
26from ibllib.oneibl.data_handlers import get_local_data_repository
27from ibllib.io.session_params import read_params
28from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session
30_logger = logging.getLogger(__name__)
31LARGE_TASKS = [
32 'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'
33]
def _get_pipeline_class(session_path, one):
    """Instantiate the legacy extraction pipeline matching a session.

    Dispatches on the session's pipeline type ('training' or 'ephys');
    anything else falls back to a custom pipeline looked up in the personal
    projects extraction repository.
    """
    pipeline = get_pipeline(session_path)
    if pipeline == 'training':
        klass = training_preprocessing.TrainingExtractionPipeline
    elif pipeline == 'ephys':
        klass = ephys_preprocessing.EphysExtractionPipeline
    else:
        # try and look if there is a custom extractor in the personal projects extraction class
        import projects.base
        task_type = get_session_extractor_type(session_path)
        klass = projects.base.get_pipeline(task_type)
    _logger.info(f"Using {klass} pipeline for {session_path}")
    return klass(session_path=session_path, one=one)
51def _run_command(cmd):
52 process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
53 stderr=subprocess.PIPE)
54 info, error = process.communicate()
55 if process.returncode != 0:
56 return None
57 else:
58 return info.decode('utf-8').strip()
def _get_volume_usage(vol, label=''):
    """Return disk usage statistics for a volume via the `df` command.

    Parameters
    ----------
    vol : str
        Mount point or path to query, e.g. '/mnt/s0/Data'.
    label : str
        Prefix for the returned keys, e.g. 'raid' -> 'raid_total'.

    Returns
    -------
    dict
        Keys '{label}_total', '{label}_used', '{label}_available' (sizes
        divided by 1024**2; GiB assuming df's default 1K blocks — TODO
        confirm intended unit) and '{label}_volume' (the mounted device
        path). Empty dict if the `df` command failed.
    """
    cmd = f'df {vol}'
    res = _run_command(cmd)
    if res is None:
        # df failed (e.g. volume not mounted); previously this raised an
        # AttributeError on res.split — degrade gracefully instead so the
        # health report can still be sent
        _logger.warning('Command "%s" failed; skipping volume usage for %s', cmd, vol)
        return {}
    # last df output line, e.g.:
    # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
    size_list = re.split(' +', res.split('\n')[-1])
    fac = 1024 ** 2
    d = {'total': int(size_list[1]) / fac,
         'used': int(size_list[2]) / fac,
         'available': int(size_list[3]) / fac,
         'volume': size_list[5]}
    return {f"{label}_{k}": d[k] for k in d}
def report_health(one):
    """
    Get a few indicators and label the json field of the corresponding lab with them.
    """
    status = {
        'python_version': sys.version,
        'ibllib_version': ibllib_version,
        'phylib_version': importlib.metadata.version('phylib'),
        'local_time': date2isostr(datetime.now()),
    }
    # add disk usage for the data RAID and the system volume
    for vol, label in (('/mnt/s0/Data', 'raid'), ('/', 'system')):
        status.update(_get_volume_usage(vol, label))

    # stamp the status onto every data repository tied to this Globus endpoint
    # NOTE(review): dr['name'] is passed as the `uuid` lookup — presumably the
    # data-repository endpoint is keyed by name; confirm against Alyx API
    data_repos = one.alyx.rest('data-repository', 'list', globus_endpoint_id=get_local_endpoint_id())
    for dr in data_repos:
        one.alyx.json_field_update(endpoint='data-repository', uuid=dr['name'], field_name='json', data=status)
def job_creator(root_path, one=None, dry=False, rerun=False):
    """
    Create new sessions and pipelines.

    Server function that will look for 'raw_session.flag' files and for each:
    1) create the session on Alyx
    2) create the tasks to be run on Alyx

    For legacy sessions the raw data are registered separately, instead of within a pipeline task.

    Parameters
    ----------
    root_path : str, pathlib.Path
        Main path containing sessions or a session path.
    one : one.api.OneAlyx
        An ONE instance for registering the session(s).
    dry : bool
        If true, simply log the session_path(s) found, without registering anything.
    rerun : bool
        If true and session pipeline tasks already exist, set them all to waiting.

    Returns
    -------
    list of ibllib.pipes.tasks.Pipeline
        The pipelines created.
    list of dicts
        A list of any datasets registered (only for legacy sessions)
    """
    _logger.info('Start looking for new sessions...')
    one = one or ONE(cache_rest=None)
    registration_client = IBLRegistrationClient(one=one)
    pipes = []
    all_datasets = []
    for flag_file in Path(root_path).glob('**/raw_session.flag'):
        session_path = flag_file.parent
        _logger.info(f'creating session for {session_path}')
        if dry:
            continue
        try:
            # if the subject doesn't exist in the database, skip
            registration_client.register_session(session_path, file_list=False)

            # NB: all sessions now extracted using dynamic pipeline
            if read_params(session_path) is None:
                # Create legacy experiment description file
                acquisition_description_legacy_session(session_path, save=True)
            pipe = make_pipeline(session_path, one=one)
            statuses = '__all__' if rerun else ['Waiting']
            pipe.create_alyx_tasks(rerun__status__in=statuses)
            flag_file.unlink()
            if pipe is not None:
                pipes.append(pipe)
        except Exception:
            _logger.error('Failed to register session %s:\n%s', session_path.relative_to(root_path), traceback.format_exc())
            continue

    return pipes, all_datasets
def task_queue(mode='all', lab=None, alyx=None, env=(None,)):
    """
    Query waiting jobs from the specified Lab

    Parameters
    ----------
    mode : {'all', 'small', 'large'}
        Whether to return all waiting tasks, or only small or large (specified in LARGE_TASKS) jobs.
    lab : str
        Lab name as per Alyx, otherwise try to infer from local Globus install.
    alyx : one.webclient.AlyxClient
        An Alyx instance.
    env : list
        One or more environments to filter by. See :prop:`ibllib.pipes.tasks.Task.env`.

    Returns
    -------
    list of dict
        A list of Alyx tasks associated with `lab` that have a 'Waiting' status.
    """
    alyx = alyx or AlyxClient(cache_rest=None)
    if lab is None:
        _logger.debug('Trying to infer lab from globus installation')
        lab = get_lab_from_endpoint_id(alyx=alyx)
    if lab is None:
        _logger.error('No lab provided or found')
        return  # if the lab is none, this will return empty tasks each time
    data_repo = get_local_data_repository(alyx)
    # Filter for tasks
    waiting_tasks = alyx.rest('tasks', 'list', status='Waiting',
                              django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True)

    def wanted(task):
        # keep tasks matching both the requested job size and environment
        task_class = tasks.str2class(task['executable'])
        return (mode == 'all' or task_class.job_size == mode) and task_class.env in env

    # Filter tasks by size/environment, then order by descending priority
    return sorted(filter(wanted, waiting_tasks), key=lambda t: t['priority'], reverse=True)
def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Function to run a list of tasks (task dictionary from Alyx query) on a local server

    Parameters
    ----------
    subjects_path : str, pathlib.Path
        The location of the subject session folders, e.g. '/mnt/s0/Data/Subjects'.
    tasks_dict : list of dict
        A list of tasks to run. Typically the output of `task_queue`.
    one : one.api.OneAlyx
        An instance of ONE.
    dry : bool, default=False
        If true, simply prints the full session paths and task names without running the tasks.
    count : int, default=5
        The maximum number of tasks to run from the tasks_dict list.
    time_out : float, optional
        The time in seconds to run tasks before exiting. If set this will run tasks until the
        timeout has elapsed. NB: Only checks between tasks and will not interrupt a running task.
    **kwargs
        See ibllib.pipes.tasks.run_alyx_task.

    Returns
    -------
    list of pathlib.Path
        A list of datasets registered to Alyx.
    """
    if one is None:
        one = ONE(cache_rest=None)
    t_start = time.time()
    n_run = 0
    cached_eid = None
    all_datasets = []
    for tdict in tasks_dict:
        # stop once the task budget or the optional time budget is exhausted
        if n_run >= count or (time_out and time.time() - t_start > time_out):
            break
        # reconstruct the session local path; many jobs belong to the same
        # session so the lookup result is cached between iterations
        if cached_eid != tdict['session']:
            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
            session_path = Path(subjects_path).joinpath(
                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
            cached_eid = tdict['session']
        if dry:
            print(session_path, tdict['name'])
        else:
            task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path, one=one, **kwargs)
            if dsets:
                all_datasets.extend(dsets)
            n_run += 1
    return all_datasets