1"""Lab server pipeline construction and task runner.
3This is the module called by the job services on the lab servers. See
4iblscripts/deploy/serverpc/crons for the service scripts that employ this module.
5"""
import logging
import time
from datetime import datetime
from pathlib import Path
import re
import subprocess
import sys
import traceback
import importlib
import importlib.metadata

from one.api import ONE
from one.webclient import AlyxClient
from one.remote.globus import get_lab_from_endpoint_id, get_local_endpoint_id
from one.alf.spec import is_session_path
from one.alf.path import session_path_parts

from ibllib import __version__ as ibllib_version
from ibllib.pipes import tasks
from ibllib.time import date2isostr
from ibllib.oneibl.registration import IBLRegistrationClient
from ibllib.oneibl.data_handlers import get_local_data_repository
from ibllib.io.session_params import read_params
from ibllib.pipes.dynamic_pipeline import make_pipeline, acquisition_description_legacy_session

_logger = logging.getLogger(__name__)
LARGE_TASKS = [
    'EphysVideoCompress', 'TrainingVideoCompress', 'SpikeSorting', 'EphysDLC', 'MesoscopePreprocess'
]


def _run_command(cmd):
    # Run a shell command; return its decoded stdout, or None if the command failed.
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    info, error = process.communicate()
    if process.returncode != 0:
        return None
    else:
        return info.decode('utf-8').strip()


def _get_volume_usage(vol, label=''):
    cmd = f'df {vol}'
    res = _run_command(cmd)
    # size_list = ['/dev/sdc1', '1921802500', '1427128132', '494657984', '75%', '/datadisk']
    size_list = re.split(' +', res.split('\n')[-1])
    fac = 1024 ** 2
    d = {'total': int(size_list[1]) / fac,
         'used': int(size_list[2]) / fac,
         'available': int(size_list[3]) / fac,
         'volume': size_list[5]}
    return {f"{label}_{k}": d[k] for k in d}


def report_health(alyx):
    """
    Get a few indicators and label the json field of the corresponding lab with them.
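
    Examples
    --------
    A minimal invocation sketch, assuming Alyx credentials are already configured
    on this machine:

    >>> from one.webclient import AlyxClient
    >>> report_health(AlyxClient(cache_rest=None))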
63 """
    status = {'python_version': sys.version,
              'ibllib_version': ibllib_version,
              'phylib_version': importlib.metadata.version('phylib'),
              'local_time': date2isostr(datetime.now())}
    status.update(_get_volume_usage('/mnt/s0/Data', 'raid'))
    status.update(_get_volume_usage('/', 'system'))

    data_repos = alyx.rest('data-repository', 'list', globus_endpoint_id=get_local_endpoint_id())

    # update the JSON field of each data repository attached to this Globus endpoint
    for dr in data_repos:
        alyx.json_field_update(endpoint='data-repository', uuid=dr['name'], field_name='json', data=status)


def job_creator(root_path, one=None, dry=False, rerun=False):
    """
    Create new sessions and pipelines.

    Server function that will look for 'raw_session.flag' files and for each:
    1) create the session on Alyx
    2) create the tasks to be run on Alyx

    For legacy sessions the raw data are registered separately, instead of within a pipeline task.

    Parameters
    ----------
    root_path : str, pathlib.Path
        Main path containing sessions or a session path.
    one : one.api.OneAlyx
        An ONE instance for registering the session(s).
    dry : bool
        If true, simply log the session_path(s) found, without registering anything.
    rerun : bool
        If true and session pipeline tasks already exist, set them all to waiting.

    Returns
    -------
    list of ibllib.pipes.tasks.Pipeline
        The pipelines created.
    list of dicts
        A list of any datasets registered (only for legacy sessions).
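
    Examples
    --------
    A minimal usage sketch (the subjects path is hypothetical; with `dry=True`
    sessions are only logged, not registered):

    >>> from one.api import ONE
    >>> one = ONE(cache_rest=None)
    >>> pipes, datasets = job_creator('/mnt/s0/Data/Subjects', one=one, dry=True)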
104 """
    _logger.info('Start looking for new sessions...')
    if not one:
        one = ONE(cache_rest=None)
    rc = IBLRegistrationClient(one=one)
    flag_files = Path(root_path).glob('*/????-??-??/*/raw_session.flag')
    flag_files = filter(lambda x: is_session_path(x.parent), flag_files)
    pipes = []
    all_datasets = []
    for flag_file in flag_files:
        session_path = flag_file.parent
        if session_path_parts(session_path)[1] in ('test', 'test_subject'):
            _logger.debug('skipping test session %s', session_path)
            continue
        _logger.info(f'creating session for {session_path}')
        if dry:
            continue
        try:
            # if the subject doesn't exist in the database, skip
            rc.register_session(session_path, file_list=False)

            # NB: all sessions now extracted using dynamic pipeline
            if read_params(session_path) is None:
                # Create legacy experiment description file
                acquisition_description_legacy_session(session_path, save=True)
            pipe = make_pipeline(session_path, one=one)
            if rerun:
                rerun__status__in = '__all__'
            else:
                rerun__status__in = ['Waiting']
            pipe.create_alyx_tasks(rerun__status__in=rerun__status__in)
            flag_file.unlink()
            if pipe is not None:
                pipes.append(pipe)
        except Exception:
            _logger.error('Failed to register session %s:\n%s', session_path.relative_to(root_path), traceback.format_exc())
            continue

    return pipes, all_datasets


def task_queue(mode='all', lab=None, alyx=None, env=(None,)):
    """
    Query waiting jobs from the specified lab.

    Parameters
    ----------
    mode : {'all', 'small', 'large'}
        Whether to return all waiting tasks, or only small or large (specified in LARGE_TASKS) jobs.
    lab : str
        Lab name as per Alyx, otherwise try to infer from local Globus install.
    alyx : one.webclient.AlyxClient
        An Alyx instance.
    env : list
        One or more environments to filter by. See :prop:`ibllib.pipes.tasks.Task.env`.

    Returns
    -------
    list of dict
        A list of Alyx tasks associated with `lab` that have a 'Waiting' status.
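
    Examples
    --------
    A minimal usage sketch fetching small jobs only (the lab name is hypothetical):

    >>> from one.webclient import AlyxClient
    >>> waiting = task_queue(mode='small', lab='mylab', alyx=AlyxClient(cache_rest=None))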
164 """
    def predicate(task):
        classe = tasks.str2class(task['executable'])
        return (mode == 'all' or classe.job_size == mode) and classe.env in env

    alyx = alyx or AlyxClient(cache_rest=None)
    if lab is None:
        _logger.debug('Trying to infer lab from globus installation')
        lab = get_lab_from_endpoint_id(alyx=alyx)
    if lab is None:
        _logger.error('No lab provided or found')
        return  # if the lab is None, this will return empty tasks each time
    data_repo = get_local_data_repository(alyx)
    # Filter for tasks
    waiting_tasks = alyx.rest('tasks', 'list', status='Waiting',
                              django=f'session__lab__name__in,{lab},data_repository__name,{data_repo}', no_cache=True)
    # Filter tasks by size
    filtered_tasks = filter(predicate, waiting_tasks)
    # Order tasks by priority
    sorted_tasks = sorted(filtered_tasks, key=lambda d: d['priority'], reverse=True)

    return sorted_tasks


def tasks_runner(subjects_path, tasks_dict, one=None, dry=False, count=5, time_out=None, **kwargs):
    """
    Run a list of tasks (task dictionaries from an Alyx query) on a local server.

    Parameters
    ----------
    subjects_path : str, pathlib.Path
        The location of the subject session folders, e.g. '/mnt/s0/Data/Subjects'.
    tasks_dict : list of dict
        A list of tasks to run. Typically the output of `task_queue`.
    one : one.api.OneAlyx
        An instance of ONE.
    dry : bool, default=False
        If true, simply prints the full session paths and task names without running the tasks.
    count : int, default=5
        The maximum number of tasks to run from the tasks_dict list.
    time_out : float, optional
        The time in seconds to run tasks before exiting. If set this will run tasks until the
        timeout has elapsed. NB: Only checks between tasks and will not interrupt a running task.
    kwargs
        See ibllib.pipes.tasks.run_alyx_task.

    Returns
    -------
    list of pathlib.Path
        A list of datasets registered to Alyx.
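
    Examples
    --------
    A minimal usage sketch chaining `task_queue` (the lab name and subjects path
    are hypothetical; `dry=True` prints the tasks without running them):

    >>> from one.api import ONE
    >>> one = ONE(cache_rest=None)
    >>> queue = task_queue(lab='mylab', alyx=one.alyx)
    >>> datasets = tasks_runner('/mnt/s0/Data/Subjects', queue, one=one, count=2, dry=True)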
214 """
    if one is None:
        one = ONE(cache_rest=None)
    tstart = time.time()
    c = 0
    last_session = None
    all_datasets = []
    for tdict in tasks_dict:
        # if the count is reached or the time_out has elapsed, break the loop and return
        if c >= count or (time_out and time.time() - tstart > time_out):
            break
        # reconstruct the session local path; as many jobs belong to the same session,
        # cache the result
        if last_session != tdict['session']:
            ses = one.alyx.rest('sessions', 'list', django=f"pk,{tdict['session']}")[0]
            session_path = Path(subjects_path).joinpath(
                Path(ses['subject'], ses['start_time'][:10], str(ses['number']).zfill(3)))
            last_session = tdict['session']
        if dry:
            print(session_path, tdict['name'])
        else:
            task, dsets = tasks.run_alyx_task(tdict=tdict, session_path=session_path, one=one, **kwargs)
            if dsets:
                all_datasets.extend(dsets)
            c += 1
    return all_datasets