"""A module for ad-hoc dataset modification and registration.

Unlike the DataHandler class in oneibl.data_handlers, the Patcher class allows one to fully remove
datasets (delete them from the database and repositories), and to overwrite datasets on both the
main repositories and the local servers. Additionally the Patcher can handle datasets from
multiple sessions at once.

Examples
--------
Delete a dataset from Alyx and all associated repositories.

>>> dataset_id = 'f4aafe6c-a7ab-4390-82cd-2c0e245322a5'
>>> task_ids, files_by_repo = IBLGlobusPatcher(AlyxClient(), 'admin').delete_dataset(dataset_id)

Patch some local datasets using Globus

>>> from one.api import ONE
>>> patcher = GlobusPatcher('admin', ONE(), label='UCLA audio times patch')
>>> responses = patcher.patch_datasets(file_paths)  # register the new datasets to Alyx
>>> patcher.launch_transfers(local_servers=True)  # transfer to all remote repositories
"""
import abc
import ftplib
from pathlib import Path, PurePosixPath, WindowsPath
from collections import defaultdict
from itertools import starmap
from subprocess import Popen, PIPE, STDOUT
import subprocess
import logging
from getpass import getpass
import shutil

import globus_sdk
import iblutil.io.params as iopar
from iblutil.util import ensure_list
from one.alf.path import get_session_path, add_uuid_string, full_path_parts
from one.alf.spec import is_uuid_string, is_uuid
from one import params
from one.webclient import AlyxClient
from one.converters import path_from_dataset
from one.remote import globus
from one.remote.aws import url2uri, get_s3_from_alyx

from ibllib.oneibl.registration import register_dataset

_logger = logging.getLogger(__name__)

FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
FLATIRON_MOUNT = '/mnt/ibl'
FTP_HOST = 'test.alyx.internationalbrainlab.org'
FTP_PORT = 21
DMZ_REPOSITORY = 'ibl_patcher'  # in alyx, the repository name containing the patched filerecords
SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl')
SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp')

def _run_command(cmd, dry=True):
    _logger.info(cmd)
    if dry:
        return 0, '', ''
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    info, error = p.communicate()
    if p.returncode != 0:
        _logger.error(error)
        raise RuntimeError(error)
    return p.returncode, info, error

def sdsc_globus_path_from_dataset(dset):
    """
    Returns the SDSC Globus file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True)


def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH):
    """
    Returns the SDSC file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) the prefix path such as one download directory or SDSC root
    """
    return path_from_dataset(dset, root_path=root_path, uuid=True)


def globus_path_from_dataset(dset, repository=None, uuid=False):
    """
    Returns the local ONE file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param repository: (optional) repository name of the file record (if None, will take
     the first file record with a URL)
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)

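# Example (sketch): given a dataset record read from Alyx, the SDSC helpers resolve the file
# location with the dataset UUID inserted before the file extension. The dataset ID and the
# session path below are purely illustrative.
#
#     dset = one.alyx.rest('datasets', 'read', id='f4aafe6c-a7ab-4390-82cd-2c0e245322a5')
#     sdsc_path_from_dataset(dset)
#     # -> /mnt/ibl/<lab>/Subjects/<subject>/<date>/<number>/alf/
#     #    spikes.times.f4aafe6c-a7ab-4390-82cd-2c0e245322a5.npy
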
class Patcher(abc.ABC):
    def __init__(self, one=None):
        assert one
        self.one = one

    def _patch_dataset(self, path, dset_id=None, revision=None, dry=False, ftp=False):
        """
        This private method gets the dataset information from Alyx, computes the local
        and remote paths and initiates the file copy.
        """
        path = Path(path)
        if dset_id is None:
            dset_id = path.name.split('.')[-2]
            if not is_uuid_string(dset_id):
                dset_id = None
        assert dset_id
        assert is_uuid_string(dset_id)
        # If the revision is not None then we need to add the revision into the path. Note the
        # moving of the file is handled by the ONE registration client.
        if revision and f'#{revision}' not in str(path):
            path = path.parent.joinpath(f'#{revision}#', path.name)
        assert path.exists()
        dset = self.one.alyx.rest('datasets', 'read', id=dset_id)
        fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
        remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path'])
        remote_path = add_uuid_string(remote_path, dset_id).as_posix()
        if remote_path.startswith('/'):
            full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path)
        else:
            full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
        if isinstance(path, WindowsPath) and not ftp:
            # On Windows replace drive map with Globus uri, e.g. C:/ -> /~/C/
            path = globus.as_globus_path(path)
        status = self._scp(path, full_remote_path, dry=dry)[0]
        return status

    def register_dataset(self, file_list, **kwargs):
        """
        Registers a set of files belonging to a single session, only on the server.
        :param file_list: (list of pathlib.Path)
        :param created_by: (string) name of user in Alyx (defaults to 'root')
        :param repository: optional: (string) name of the server repository in Alyx
        :param versions: optional (list of strings): version tags (defaults to ibllib version)
        :param dry: (bool) False by default
        :return:
        """
        return register_dataset(file_list, one=self.one, server_only=True, exists=True, **kwargs)

    def register_datasets(self, file_list, **kwargs):
        """
        Same as register_dataset but works with files belonging to different sessions
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i + 1}/{nses} {label}, registering {len(_files)} files")
            responses.append(self.register_dataset(_files, **kwargs))
        return responses

    def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs):
        """
        Creates a new dataset on FlatIron and uploads it from an arbitrary location.
        Rules for creation/patching are the same that apply for registration via Alyx
        as this uses the registration endpoint to get the dataset.
        An existing file (same session and path relative to session) will be patched.
        :param file_list: full file path. Must be within an ALF session folder
         (subject/date/number); can also be a list of full file paths belonging to the same session
        :param server_repository: Alyx server repository name
        :param created_by: Alyx username for the dataset (optional, defaults to root)
        :param ftp: flag for case when using FTPPatcher. Don't adjust Windows path in
         _patch_dataset when ftp=True
        :return: the registration response, a list of dataset records
        """
        # first register the file
        if not isinstance(file_list, list):
            file_list = [Path(file_list)]
        assert len(set(map(get_session_path, file_list))) == 1
        assert all(Path(f).exists() for f in file_list)
        response = ensure_list(self.register_dataset(file_list, dry=dry, **kwargs))
        if dry:
            return
        # from the dataset info, set the FlatIron flag to exists=True
        for p, d in zip(file_list, response):
            try:
                self._patch_dataset(p, dset_id=d['id'], revision=d['revision'], dry=dry, ftp=ftp)
            except Exception as e:
                raise Exception(f'Error registering file {p}') from e
        return response

    def patch_datasets(self, file_list, **kwargs):
        """Same as the patch_dataset method but works with several sessions."""
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f'{i + 1}/{nses} {label}, registering {len(_files)} files')
            responses.extend(self.patch_dataset(_files, **kwargs))
        return responses

    @abc.abstractmethod
    def _scp(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    def _rm(self, *args, **kwargs):
        pass


class GlobusPatcher(Patcher, globus.Globus):
    """
    Requires GLOBUS keys access
    """
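    # Usage sketch, mirroring the module-level example above; 'admin' stands in for whatever
    # Globus client name is configured locally:
    #
    #     patcher = GlobusPatcher('admin', ONE(), label='UCLA audio times patch')
    #     patcher.patch_datasets(file_paths)            # register to Alyx and queue transfers
    #     patcher.launch_transfers(local_servers=True)  # FlatIron first, then local servers
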
    def __init__(self, client_name='default', one=None, label='ibllib patch'):
        assert one and not one.offline
        Patcher.__init__(self, one=one)
        globus.Globus.__init__(self, client_name)
        self.label = label
        # get a dictionary of data repositories from Alyx (with globus ids)
        self.fetch_endpoints_from_alyx(one.alyx)
        flatiron_id = self.endpoints['flatiron_cortexlab']['id']
        if 'flatiron' not in self.endpoints:
            self.add_endpoint(flatiron_id, 'flatiron', root_path='/')
            self.endpoints['flatiron'] = self.endpoints['flatiron_cortexlab']
        # transfers/delete from the current computer to the flatiron: mandatory and executed first
        local_id = self.endpoints['local']['id']
        self.globus_transfer = globus_sdk.TransferData(
            self.client, local_id, flatiron_id, verify_checksum=True, sync_level='checksum', label=label)
        self.globus_delete = globus_sdk.DeleteData(self.client, flatiron_id, label=label)
        # transfers/delete from flatiron to optional third parties to synchronize / delete
        self.globus_transfers_locals = {}
        self.globus_deletes_locals = {}
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        _logger.info(f"Globus copy {local_path} to {remote_path}")
        local_path = globus.as_globus_path(local_path)
        if not dry:
            if isinstance(self.globus_transfer, globus_sdk.TransferData):
                self.globus_transfer.add_item(local_path, remote_path.as_posix())
            else:
                self.globus_transfer.path_src.append(local_path)
                self.globus_transfer.path_dest.append(remote_path.as_posix())
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT)))
        _logger.info(f'Globus del {flatiron_path}')
        if not dry:
            if isinstance(self.globus_delete, globus_sdk.DeleteData):
                self.globus_delete.add_item(flatiron_path.as_posix())
            else:
                self.globus_delete.path.append(flatiron_path.as_posix())
        return 0, ''

    def patch_datasets(self, file_list, **kwargs):
        """
        Calls the super method that registers the datasets and queues the transfers from the
        current computer to FlatIron. Then creates individual transfer items for each local
        server so that, after the update on FlatIron, the local server files are also updated.
        :param file_list:
        :param kwargs:
        :return:
        """
        responses = super().patch_datasets(file_list, **kwargs)
        for dset in responses:
            # get the flatiron path
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()
            flatiron_path = self.to_address(relative_path, fr['data_repository'])
            # loop over the remaining repositories (local servers) and create a transfer
            # from flatiron to the local server
            for fr in dset['file_records']:
                if fr['data_repository'] == DMZ_REPOSITORY:
                    continue
                if fr['data_repository'] not in self.endpoints:
                    continue
                repo_gid = self.endpoints[fr['data_repository']]['id']
                flatiron_id = self.endpoints['flatiron']['id']
                if repo_gid == flatiron_id:
                    continue
                # if there is no transfer already created, initialize it
                if repo_gid not in self.globus_transfers_locals:
                    self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData(
                        self.client, flatiron_id, repo_gid, verify_checksum=True,
                        sync_level='checksum', label=f"{self.label} on {fr['data_repository']}")
                # get the local server path and create the transfer item
                local_server_path = self.to_address(fr['relative_path'], fr['data_repository'])
                self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path)
        return responses

    def launch_transfers(self, local_servers=False):
        """
        patcher.launch_transfers()
        Launches the Globus transfer and delete from the local patch computer to FlatIron
        :param: local_servers (False): if True, sync the local servers after the main transfer
        :return: None
        """
        gtc = self.client

        def _wait_for_task(resp):
            # patcher.transfer_client.get_task(task_id='364fbdd2-4deb-11eb-8ffb-0a34088e79f9')
            # Abridged examples of the TransferResponse fields relevant to the loop below:
            # - active, healthy transfer: 'status': 'ACTIVE', 'nice_status': 'OK',
            #   'completion_time': None, 'fatal_error': None
            # - checksum error: 'status': 'ACTIVE', 'nice_status': 'VERIFY_CHECKSUM',
            #   'nice_status_short_description': 'checksum verification failed'
            # - finished task: 'status': 'SUCCEEDED', 'completion_time' set, 'fatal_error': None
            # - errored (cancelled) task: 'status': 'FAILED', 'completion_time' set,
            #   'fatal_error': {'code': 'CANCELED', 'description': 'canceled'}
            while True:
                tinfo = gtc.get_task(task_id=resp['task_id'])
                if tinfo and tinfo['completion_time'] is not None:
                    break
                _ = gtc.task_wait(task_id=resp['task_id'], timeout=30)
            if tinfo and tinfo['fatal_error'] is not None:
                raise ConnectionError(f"Globus transfer failed \n {tinfo}")

        # handles the transfers first
        if len(self.globus_transfer['DATA']) > 0:
            # launch the transfer
            _wait_for_task(gtc.submit_transfer(self.globus_transfer))
            # re-initialize the globus_transfer property
            self.globus_transfer = globus_sdk.TransferData(
                gtc,
                self.globus_transfer['source_endpoint'],
                self.globus_transfer['destination_endpoint'],
                label=self.globus_transfer['label'],
                verify_checksum=True, sync_level='checksum')

        # do the same for deletes
        if len(self.globus_delete['DATA']) > 0:
            _wait_for_task(gtc.submit_delete(self.globus_delete))
            self.globus_delete = globus_sdk.DeleteData(
                gtc,
                endpoint=self.globus_delete['endpoint'],
                label=self.globus_delete['label'])

        # launch the local transfers and local deletes
        if local_servers:
            self.launch_transfers_secondary()

    def launch_transfers_secondary(self):
        """
        patcher.launch_transfers_secondary()
        Launches the Globus transfers from FlatIron to third-party repositories (local servers).
        This should run after the main transfer from the patch computer to FlatIron.
        :return: None
        """
        for lt in self.globus_transfers_locals:
            transfer = self.globus_transfers_locals[lt]
            if len(transfer['DATA']) > 0:
                self.client.submit_transfer(transfer)
        for ld in self.globus_deletes_locals:
            delete = self.globus_deletes_locals[ld]
            if len(delete['DATA']) > 0:
                self.client.submit_delete(delete)


class IBLGlobusPatcher(Patcher, globus.Globus):
    """This is a replacement for the GlobusPatcher class, utilizing the ONE Globus class.

    The GlobusPatcher class is more complicated but has the advantage of being able to launch
    transfers independently of registration, although it remains to be seen whether this is useful.
    """
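    # Usage sketch, from the module-level example; the dataset ID is illustrative and 'admin'
    # stands in for the locally configured Globus client name:
    #
    #     patcher = IBLGlobusPatcher(AlyxClient(), 'admin')
    #     task_ids, files_by_repo = patcher.delete_dataset('f4aafe6c-a7ab-4390-82cd-2c0e245322a5')
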
    def __init__(self, alyx=None, client_name='default'):
        """
        A Globus patcher for IBL data.

        Parameters
        ----------
        alyx : one.webclient.AlyxClient
            An instance of Alyx to use.
        client_name : str, default='default'
            The Globus client name.
        """
        self.alyx = alyx or AlyxClient()
        globus.Globus.__init__(self, client_name=client_name)  # NB we don't init Patcher as we're not using ONE

    def delete_dataset(self, dataset, dry=False, aws_profile='ibladmin'):
        """
        Delete a dataset off Alyx and remove file record from all Globus repositories.

        Parameters
        ----------
        dataset : uuid.UUID, str, dict
            The dataset record or ID to delete.
        dry : bool
            If true, dataset is not deleted and file paths that would be removed are returned.
        aws_profile : str
            The AWS profile name to use for S3 deletion.

        Returns
        -------
        list of uuid.UUID
            A list of Globus delete task IDs if dry is false.
        dict of str
            A map of data repository names and relative paths of the deleted files.
        """
        if is_uuid(dataset):
            did = dataset
            dataset = self.alyx.rest('datasets', 'read', id=did)
        else:
            did = dataset['url'].split('/')[-1]

        def is_aws(repository_name):
            return repository_name.startswith('aws_')

        files_by_repo = defaultdict(list)  # str -> [pathlib.PurePosixPath]
        s3_files = []
        file_records = filter(lambda x: x['exists'], dataset['file_records'])
        for record in file_records:
            repo = self.repo_from_alyx(record['data_repository'], self.alyx)
            # Handle S3 files
            if not repo['globus_endpoint_id'] or repo['repository_type'] != 'Fileserver':
                if is_aws(repo['name']):
                    s3_files.append(url2uri(record['data_url']))
                    files_by_repo[repo['name']].append(PurePosixPath(record['relative_path']))
                else:
                    _logger.error('Unable to delete from %s', repo['name'])
            else:
                # Handle Globus files
                if repo['name'] not in self.endpoints:
                    self.add_endpoint(repo['name'], alyx=self.alyx)
                filepath = PurePosixPath(record['relative_path'])
                if 'flatiron' in repo['name']:
                    filepath = add_uuid_string(filepath, did)
                files_by_repo[repo['name']].append(filepath)

        # Remove S3 files
        if s3_files:
            cmd = ['aws', 's3', 'rm', *s3_files, '--profile', aws_profile]
            if dry:
                cmd.append('--dryrun')
            if _logger.level > logging.DEBUG:
                log_function = _logger.error
                cmd.append('--only-show-errors')  # Suppress verbose output
            else:
                log_function = _logger.debug
            _logger.debug(' '.join(cmd))
            process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
            with process.stdout:
                for line in iter(process.stdout.readline, b''):
                    log_function(line.decode().strip())
            assert process.wait() == 0

        if dry:
            return [], files_by_repo

        # Remove Globus files
        globus_files_map = filter(lambda x: not is_aws(x[0]), files_by_repo.items())
        task_ids = list(starmap(self.delete_data, map(reversed, globus_files_map)))

        # Delete the dataset from Alyx
        self.alyx.rest('datasets', 'delete', id=did)
        return task_ids, files_by_repo

    def _rm(self):
        raise NotImplementedError('Use delete_dataset instead')

    def _scp(self):
        raise NotImplementedError('Use transfer_data instead')

    def patch_dataset(self):
        raise NotImplementedError

    def patch_datasets(self):
        raise NotImplementedError


class SSHPatcher(Patcher):
    """
    Requires SSH keys access on the FlatIron
    """
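    # Usage sketch (assumes working SSH/RSA keys for the FlatIron data user; the 'repository'
    # keyword is forwarded to register_dataset as documented above, and the value here is
    # hypothetical):
    #
    #     patcher = SSHPatcher(one=ONE())
    #     patcher.patch_datasets(file_paths, repository='flatiron_examplelab')
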
    def __init__(self, one=None):
        res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls")
        if res[0] != 0:
            raise PermissionError("Could not connect to the Flatiron via SSH. Check your RSA keys")
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \
              f" mkdir -p {remote_path.parent}; "
        cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}"
        return _run_command(cmd, dry=dry)

    def _rm(self, flatiron_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}"
        return _run_command(cmd, dry=dry)


class FTPPatcher(Patcher):
    """
    This is used to register from anywhere without write access to FlatIron
    """
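    # Usage sketch (FTP credentials are prompted for and stored by FTPPatcher.setup on first
    # use; the file path is illustrative):
    #
    #     patcher = FTPPatcher(one=ONE())
    #     patcher.create_dataset('/data/subject/2021-01-01/001/alf/spikes.times.npy',
    #                            created_by='root')
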
    def __init__(self, one=None):
        super().__init__(one=one)
        alyx = self.one.alyx
        if not getattr(alyx._par, 'FTP_DATA_SERVER_LOGIN', False):
            alyx._par = self.setup(par=alyx._par, silent=alyx.silent)
        login, pwd = (one.alyx._par.FTP_DATA_SERVER_LOGIN, one.alyx._par.FTP_DATA_SERVER_PWD)
        self.ftp = ftplib.FTP_TLS(host=FTP_HOST, user=login, passwd=pwd)
        # self.ftp.ssl_version = ssl.PROTOCOL_TLSv1
        # self.ftp.auth()
        self.ftp.prot_p()
        self.ftp.login(login, pwd)
        # pre-fetch the repositories so as not to query them for every file registered
        self.repositories = self.one.alyx.rest("data-repository", "list")

    @staticmethod
    def setup(par=None, silent=False):
        """
        Set up (and save) FTP login parameters
        :param par: A parameters object to modify, if None the default Webclient parameters are
        loaded
        :param silent: If true, the defaults are used with no user input prompt
        :return: the modified parameters object
        """
        DEFAULTS = {
            "FTP_DATA_SERVER": "ftp://ibl.flatironinstitute.org",
            "FTP_DATA_SERVER_LOGIN": "iblftp",
            "FTP_DATA_SERVER_PWD": None
        }
        if par is None:
            par = params.get(silent=silent)
        par = iopar.as_dict(par)

        if silent:
            DEFAULTS.update(par)
            par = DEFAULTS
        else:
            for k in DEFAULTS.keys():
                cpar = par.get(k, DEFAULTS[k])
                # Iterate through non-password pars; skip url if client url already provided
                if 'PWD' not in k:
                    par[k] = input(f'Param {k}, current value is ["{cpar}"]:') or cpar
                else:
                    prompt = f'Param {k} (leave empty to leave unchanged):'
                    par[k] = getpass(prompt) or cpar

        # Get the client key
        client = par.get('ALYX_URL', None)
        client_key = params._key_from_url(client) if client else params.get_default_client()
        # Save the parameters
        params.save(par, client_key)  # Client params
        return iopar.from_dict(par)

    def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY,
                       **kwargs):
        # overrides the superclass just to remove the server repository argument
        response = super().patch_dataset(path, created_by=created_by, dry=dry,
                                         repository=repository, ftp=True, **kwargs)
        # need to patch the file records to be consistent
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                            fr['relative_path'] == fr_server['relative_path'], frs))
            reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'],
                                     self.repositories))
            relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath(
                PurePosixPath(fr_ftp['relative_path'])))[1:]
            # 1) if there was already a file, the registration created a duplicate
            fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                             fr['relative_path'] == relative_path, frs))  # NOQA
            if len(fr_2del) == 1:
                self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id'])
            # 2) the patch ftp file needs to be prepended with the server repository path
            self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'],
                               data={'relative_path': relative_path, 'exists': True})
            # 3) the server file is labeled as not existing
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response

    def _scp(self, local_path, remote_path, dry=True):
        # remote_path example: '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001'
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        self.mktree(remote_path.parent)
        self.ftp.pwd()
        _logger.info(f"FTP upload {local_path}")
        with open(local_path, 'rb') as fid:
            self.ftp.storbinary(f'STOR {local_path.name}', fid)
        return 0, ''

    def mktree(self, remote_path):
        """Browse to the tree on the FTP server, making directories on the way."""
        if str(remote_path) != '.':
            try:
                self.ftp.cwd(str(remote_path))
            except ftplib.error_perm:
                self.mktree(PurePosixPath(remote_path.parent))
                self.ftp.mkd(str(remote_path))
                self.ftp.cwd(str(remote_path))

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server.")


class SDSCPatcher(Patcher):
    """
    This is used to patch data on the SDSC server
    """
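    # Usage sketch (intended to be run on the SDSC/FlatIron server itself, where the data
    # partition is mounted locally; the file paths are illustrative):
    #
    #     patcher = SDSCPatcher(one=ONE())
    #     patcher.patch_datasets(file_paths)
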
    def __init__(self, one=None):
        assert one
        super().__init__(one=one)

    def patch_datasets(self, file_list, **kwargs):
        response = super().patch_datasets(file_list, **kwargs)

        # TODO check the file records to see if they have local server ones
        # If they do then need to remove file record and delete file from local server??

        return response

    def _scp(self, local_path, remote_path, dry=True):
        _logger.info(f"Copy {local_path} to {remote_path}")
        if not dry:
            if not Path(remote_path).parent.exists():
                Path(remote_path).parent.mkdir(exist_ok=True, parents=True)
            shutil.copy(local_path, remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server")


class S3Patcher(Patcher):

    def __init__(self, one=None):
        assert one
        super().__init__(one=one)
        self.s3_repo = 's3_patcher'
        self.s3_path = 'patcher'

        # Instantiate boto connection
        self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo)

    def check_datasets(self, file_list):
        # Check whether the datasets already exist on Alyx; existing datasets are not patched
        # unless force=True.
        exists = []
        for file in file_list:
            collection = full_path_parts(file, as_dict=True)['collection']
            dset = self.one.alyx.rest('datasets', 'list', session=self.one.path2eid(file), name=file.name,
                                      collection=collection, clobber=True)
            if len(dset) > 0:
                exists.append(file)

        return exists

    def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs):

        exists = self.check_datasets(file_list)
        if len(exists) > 0 and not force:
            _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist, to overwrite set force=True')
            return

        response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs)
        # TODO in an ideal case the flatiron filerecord won't be altered when we register this dataset. This requires
        #  changing the alyx.data.register_view
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            # Update the flatiron file record to be false
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
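    # Usage sketch (assumes the 's3_patcher' data repository referenced above is configured in
    # Alyx with S3 credentials; the file paths are illustrative):
    #
    #     patcher = S3Patcher(one=ONE())
    #     patcher.patch_dataset(file_paths, created_by='root', force=False)
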
    def _scp(self, local_path, remote_path, dry=True):
        aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT))
        _logger.info(f'Transferring file {local_path} to {aws_remote_path}')
        self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path)))

        return 0, ''

    def _rm(self, *args, **kwargs):
        raise PermissionError("This Patcher does not have admin permissions to remove data.")