Coverage for ibllib/oneibl/patcher.py: 34% (281 statements)

import abc
import ftplib
from pathlib import Path, PurePosixPath, WindowsPath
import subprocess
import logging
from getpass import getpass
import shutil

import globus_sdk
import iblutil.io.params as iopar
from one.alf.files import get_session_path, add_uuid_string
from one.alf.spec import is_uuid_string
from one import params
from one.converters import path_from_dataset
from one.remote import globus

from ibllib.oneibl.registration import register_dataset

_logger = logging.getLogger(__name__)

FLAT_IRON_GLOBUS_ID = 'ab2d064c-413d-11eb-b188-0ee0d5d9299f'
FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
FLATIRON_MOUNT = '/mnt/ibl'
FTP_HOST = 'test.alyx.internationalbrainlab.org'
FTP_PORT = 21
DMZ_REPOSITORY = 'ibl_patcher'  # in Alyx, the name of the repository containing the patched file records
SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl')
SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp')


def _run_command(cmd, dry=True):
    """Run a shell command; in dry mode only log the command and return success."""
    _logger.info(cmd)
    if dry:
        return 0, '', ''
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    info, error = p.communicate()
    if p.returncode != 0:
        _logger.error(error)
        raise RuntimeError(error)
    return p.returncode, info, error


def sdsc_globus_path_from_dataset(dset):
    """
    Returns the SDSC Globus file path from a dset record or a list of dset records from REST
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True)


def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH):
    """
    Returns the SDSC file path from a dset record or a list of dset records from REST
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) prefix path such as the ONE download directory or the SDSC root
    """
    return path_from_dataset(dset, root_path=root_path, uuid=True)


def globus_path_from_dataset(dset, repository=None, uuid=False):
    """
    Returns the local ONE file path from a dset record or a list of dset records from REST
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param repository: (optional) repository name of the file record (if None, the first
     file record with a URL is used)
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)
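

# Illustrative sketch (not part of the module): how the three helpers above differ for the
# same dataset record. The UUID below is hypothetical; a real `dset` comes from the Alyx
# REST endpoint.
#
#   dset = one.alyx.rest('datasets', 'read', id='00000000-0000-0000-0000-000000000000')
#   globus_path_from_dataset(dset)       # /<lab>/Subjects/<subject>/<date>/<number>/alf/obj.attr.ext
#   sdsc_globus_path_from_dataset(dset)  # same, but with the dataset UUID before the extension
#   sdsc_path_from_dataset(dset)         # /mnt/ibl/<lab>/Subjects/... with the UUID before the extension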


class Patcher(abc.ABC):
    def __init__(self, one=None):
        assert one
        self.one = one

    def _patch_dataset(self, path, dset_id=None, dry=False, ftp=False):
        """
        This private method gets the dataset information from Alyx, computes the local
        and remote paths and initiates the file copy.
        """
        path = Path(path)
        if dset_id is None:
            # server-side file names carry the dataset UUID before the extension,
            # e.g. obj.attr.01234567-89ab-cdef-0123-456789abcdef.ext
            dset_id = path.name.split('.')[-2]
            if not is_uuid_string(dset_id):
                dset_id = None
        assert dset_id
        assert is_uuid_string(dset_id)
        assert path.exists()
        dset = self.one.alyx.rest('datasets', 'read', id=dset_id)
        fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
        remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path'])
        remote_path = add_uuid_string(remote_path, dset_id).as_posix()
        if remote_path.startswith('/'):
            full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path)
        else:
            full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
        if isinstance(path, WindowsPath) and not ftp:
            # on Windows, replace the drive map with a Globus URI, e.g. C:/ -> /~/C/
            path = globus.as_globus_path(path)
        status = self._scp(path, full_remote_path, dry=dry)[0]
        return status

    def register_dataset(self, file_list, **kwargs):
        """
        Registers a set of files belonging to a single session, on the server only
        :param file_list: (list of pathlib.Path)
        :param created_by: (string) name of the user in Alyx (defaults to 'root')
        :param repository: (string, optional) name of the server repository in Alyx
        :param versions: (list of strings, optional) version tags (defaults to the ibllib version)
        :param dry: (bool) False by default
        :return: the registration response
        """
        return register_dataset(file_list, one=self.one, server_only=True, exists=True, **kwargs)

    def register_datasets(self, file_list, **kwargs):
        """
        Same as register_dataset but works with files belonging to different sessions
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])  # e.g. 'KS022_2019-12-10_001'
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i + 1}/{nses} {label}, registering {len(_files)} files")
            responses.append(self.register_dataset(_files, **kwargs))
        return responses

    def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs):
        """
        Creates a new dataset on FlatIron and uploads it from an arbitrary location.
        Rules for creation/patching are the same as those that apply for registration via
        Alyx, as this uses the registration endpoint to get the dataset.
        An existing file (same session and path relative to session) will be patched.
        :param file_list: full file path; must be within an ALF session folder
         (subject/date/number). Can also be a list of full file paths belonging to the
         same session.
        :param repository: Alyx server repository name
        :param created_by: Alyx username for the dataset (optional, defaults to root)
        :param ftp: flag for the FTPPatcher: when True, the Windows path is not adjusted
         in _patch_dataset
        :return: the registration response, a list of dataset records
        """
        # first register the file
        if not isinstance(file_list, list):
            file_list = [Path(file_list)]
        assert len(set(get_session_path(f) for f in file_list)) == 1
        assert all(Path(f).exists() for f in file_list)
        response = self.register_dataset(file_list, dry=dry, **kwargs)
        if dry:
            return
        # from the dataset info, set the FlatIron file record to exists=True
        for p, d in zip(file_list, response):
            self._patch_dataset(p, dset_id=d['id'], dry=dry, ftp=ftp)
        return response

    def patch_datasets(self, file_list, **kwargs):
        """
        Same as patch_dataset but works with files belonging to several sessions
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i + 1}/{nses} {label}, registering {len(_files)} files")
            responses.extend(self.patch_dataset(_files, **kwargs))
        return responses

    @abc.abstractmethod
    def _scp(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    def _rm(self, *args, **kwargs):
        pass
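

# Illustrative sketch (not part of ibllib): the smallest concrete Patcher. Subclasses only
# provide `_scp` (copy one file to its FlatIron location) and `_rm` (remove one), both
# returning a tuple whose first element is 0 on success, as `_patch_dataset` expects.
#
# class LoggingPatcher(Patcher):
#     """Patcher that only logs the copies and removals it would perform."""
#     def _scp(self, local_path, remote_path, dry=True):
#         _logger.info(f"would copy {local_path} -> {remote_path}")
#         return 0, ''
#
#     def _rm(self, flatiron_path, dry=True):
#         _logger.info(f"would remove {flatiron_path}")
#         return 0, ''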


class GlobusPatcher(Patcher):
    """
    Requires Globus access keys
    """

    def __init__(self, client_name='default', one=None, label='ibllib patch'):
        assert one
        self.local_endpoint = getattr(globus.load_client_params(f'globus.{client_name}'),
                                      'local_endpoint', globus.get_local_endpoint_id())
        self.transfer_client = globus.create_globus_client(client_name)
        self.label = label
        # transfers/deletes from the current computer to FlatIron: mandatory and executed first
        self.globus_transfer = globus_sdk.TransferData(
            self.transfer_client, self.local_endpoint, FLAT_IRON_GLOBUS_ID, verify_checksum=True,
            sync_level='checksum', label=label)
        self.globus_delete = globus_sdk.DeleteData(
            self.transfer_client, FLAT_IRON_GLOBUS_ID, verify_checksum=True,
            sync_level='checksum', label=label)
        # get a dictionary of data repositories from Alyx (with Globus ids)
        self.repos = {r['name']: r for r in one.alyx.rest('data-repository', 'list')}
        # transfers/deletes from FlatIron to optional third parties to synchronize / delete
        self.globus_transfers_locals = {}
        self.globus_deletes_locals = {}
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        _logger.info(f"Globus copy {local_path} to {remote_path}")
        if not dry:
            if isinstance(self.globus_transfer, globus_sdk.TransferData):
                self.globus_transfer.add_item(local_path, remote_path)
            else:
                self.globus_transfer.path_src.append(local_path)
                self.globus_transfer.path_dest.append(remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT)))
        _logger.info(f"Globus del {flatiron_path}")
        if not dry:
            if isinstance(self.globus_delete, globus_sdk.DeleteData):
                self.globus_delete.add_item(flatiron_path)
            else:
                self.globus_delete.path.append(flatiron_path)
        return 0, ''

    def patch_datasets(self, file_list, **kwargs):
        """
        Calls the super method that registers the datasets and queues the transfer from the
        current computer to FlatIron. Then creates individual transfer items for each local
        server so that, after the update on FlatIron, the local server files are also updated
        :param file_list:
        :param kwargs:
        :return:
        """
        responses = super().patch_datasets(file_list, **kwargs)
        for dset in responses:
            # get the FlatIron path
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            flatiron_path = self.repos[fr['data_repository']]['globus_path']
            flatiron_path = Path(flatiron_path).joinpath(fr['relative_path'])
            flatiron_path = add_uuid_string(flatiron_path, dset['id']).as_posix()
            # loop over the remaining repositories (local servers) and create a transfer
            # from FlatIron to the local server
            for fr in dset['file_records']:
                if fr['data_repository'] == DMZ_REPOSITORY:
                    continue
                repo_gid = self.repos[fr['data_repository']]['globus_endpoint_id']
                if repo_gid == FLAT_IRON_GLOBUS_ID:
                    continue
                # if there is no transfer already created, initialize it
                if repo_gid not in self.globus_transfers_locals:
                    self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData(
                        self.transfer_client, FLAT_IRON_GLOBUS_ID, repo_gid, verify_checksum=True,
                        sync_level='checksum', label=f"{self.label} on {fr['data_repository']}")
                # get the local server path and create the transfer item
                local_server_path = self.repos[fr['data_repository']]['globus_path']
                local_server_path = Path(local_server_path).joinpath(fr['relative_path'])
                self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path)
        return responses

    def launch_transfers(self, local_servers=False):
        """
        patcher.launch_transfers()
        Launches the Globus transfer and delete from the local patch computer to FlatIron
        :param local_servers: (bool) if True, sync the local servers after the main transfer
        :return: None
        """
        gtc = self.transfer_client

        def _wait_for_task(resp):
            # polls the task via patcher.transfer_client.get_task(task_id=...); in the
            # TransferResponse, 'status' goes from 'ACTIVE' to 'SUCCEEDED' or 'FAILED'.
            # The fields of interest are 'completion_time' (None until the task ends),
            # 'nice_status' ('OK', or e.g. 'VERIFY_CHECKSUM' on a checksum error) and
            # 'fatal_error' (None unless the task was cancelled or errored)
            while True:
                tinfo = gtc.get_task(task_id=resp['task_id'])
                if tinfo and tinfo['completion_time'] is not None:
                    break
                _ = gtc.task_wait(task_id=resp['task_id'], timeout=30)
            if tinfo and tinfo['fatal_error'] is not None:
                raise ConnectionError(f"Globus transfer failed \n {tinfo}")

        # handle the transfers first
        if len(self.globus_transfer['DATA']) > 0:
            # launch the transfer
            _wait_for_task(gtc.submit_transfer(self.globus_transfer))
            # re-initialize the globus_transfer property
            self.globus_transfer = globus_sdk.TransferData(
                gtc,
                self.globus_transfer['source_endpoint'],
                self.globus_transfer['destination_endpoint'],
                label=self.globus_transfer['label'],
                verify_checksum=True, sync_level='checksum')

        # do the same for deletes
        if len(self.globus_delete['DATA']) > 0:
            _wait_for_task(gtc.submit_delete(self.globus_delete))
            self.globus_delete = globus_sdk.DeleteData(
                gtc,
                endpoint=self.globus_delete['endpoint'],
                label=self.globus_delete['label'],
                verify_checksum=True, sync_level='checksum')

        # launch the local transfers and local deletes
        if local_servers:
            self.launch_transfers_secondary()

    def launch_transfers_secondary(self):
        """
        patcher.launch_transfers_secondary()
        Launches the Globus transfers from FlatIron to third-party repositories (local servers)
        This should run after the main transfer from the patch computer to FlatIron
        :return: None
        """
        for transfer in self.globus_transfers_locals.values():
            if len(transfer['DATA']) > 0:
                self.transfer_client.submit_transfer(transfer)
        for delete in self.globus_deletes_locals.values():
            if len(delete['DATA']) > 0:
                self.transfer_client.submit_delete(delete)
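

# Illustrative sketch (not part of ibllib): a typical GlobusPatcher run. The file path and
# repository name are hypothetical, and this assumes ONE and both Globus endpoints are
# already set up; `repository` is forwarded to the registration endpoint.
#
# from one.api import ONE
# patcher = GlobusPatcher(one=ONE(), label='example patch')
# patcher.patch_datasets(['/data/Subjects/KS022/2019-12-10/001/alf/spikes.times.npy'],
#                        repository='flatiron_cortexlab')
# patcher.launch_transfers(local_servers=True)  # blocks until the FlatIron transfer completes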


class SSHPatcher(Patcher):
    """
    Requires SSH key access to FlatIron
    """
    def __init__(self, one=None):
        res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls", dry=False)
        if res[0] != 0:
            raise PermissionError("Could not connect to FlatIron via SSH. Check your RSA keys")
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \
              f" mkdir -p {remote_path.parent}; "
        cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}"
        return _run_command(cmd, dry=dry)

    def _rm(self, flatiron_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}"
        return _run_command(cmd, dry=dry)
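

# Illustrative sketch (not part of ibllib): patching a single file over SSH. Assumes ONE is
# configured and SSH keys for datauser@ibl.flatironinstitute.org are installed; the path and
# repository name are hypothetical.
#
# from one.api import ONE
# patcher = SSHPatcher(one=ONE())  # raises PermissionError if the SSH check fails
# patcher.patch_dataset('/data/Subjects/KS022/2019-12-10/001/alf/spikes.times.npy',
#                       repository='flatiron_cortexlab')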


class FTPPatcher(Patcher):
    """
    This is used to register from anywhere without write access to FlatIron
    """
    def __init__(self, one=None):
        super().__init__(one=one)
        alyx = self.one.alyx
        if not getattr(alyx._par, 'FTP_DATA_SERVER_LOGIN', False):
            alyx._par = self.setup(par=alyx._par, silent=alyx.silent)
        login, pwd = (one.alyx._par.FTP_DATA_SERVER_LOGIN, one.alyx._par.FTP_DATA_SERVER_PWD)
        self.ftp = ftplib.FTP_TLS(host=FTP_HOST, user=login, passwd=pwd)
        # self.ftp.ssl_version = ssl.PROTOCOL_TLSv1
        # self.ftp.auth()
        self.ftp.prot_p()
        self.ftp.login(login, pwd)
        # pre-fetch the repositories so as not to query them for every file registered
        self.repositories = self.one.alyx.rest("data-repository", "list")

    @staticmethod
    def setup(par=None, silent=False):
        """
        Set up (and save) FTP login parameters
        :param par: A parameters object to modify; if None the default Webclient parameters
         are loaded
        :param silent: If True, the defaults are used with no user input prompt
        :return: the modified parameters object
        """
        DEFAULTS = {
            "FTP_DATA_SERVER": "ftp://ibl.flatironinstitute.org",
            "FTP_DATA_SERVER_LOGIN": "iblftp",
            "FTP_DATA_SERVER_PWD": None
        }
        if par is None:
            par = params.get(silent=silent)
        par = iopar.as_dict(par)

        if silent:
            DEFAULTS.update(par)
            par = DEFAULTS
        else:
            for k in DEFAULTS.keys():
                cpar = par.get(k, DEFAULTS[k])
                # iterate through the params; the password is prompted without echo
                if 'PWD' not in k:
                    par[k] = input(f'Param {k}, current value is ["{cpar}"]:') or cpar
                else:
                    prompt = f'Param {k} (leave empty to leave unchanged):'
                    par[k] = getpass(prompt) or cpar

        # get the client key
        client = par.get('ALYX_URL', None)
        client_key = params._key_from_url(client) if client else params.get_default_client()
        # save the parameters
        params.save(par, client_key)  # client params
        return iopar.from_dict(par)
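
    # Illustrative sketch (not part of ibllib): non-interactive credential setup. With
    # silent=True any saved values take precedence over DEFAULTS, so the password must
    # already have been saved once through the interactive prompt.
    #
    #   par = FTPPatcher.setup(silent=True)
    #   par.FTP_DATA_SERVER_LOGIN  # -> 'iblftp' unless overridden in the saved params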

    def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY,
                       **kwargs):
        # overrides the superclass to register against the DMZ repository with the FTP flag set
        response = super().patch_dataset(path, created_by=created_by, dry=dry,
                                         repository=repository, ftp=True, **kwargs)
        # need to patch the file records to be consistent
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                            fr['relative_path'] == fr_server['relative_path'],
                                 frs))
            reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'],
                                     self.repositories))
            relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath(
                PurePosixPath(fr_ftp['relative_path'])))[1:]
            # 1) if there was already a file, the registration created a duplicate
            fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                             fr['relative_path'] == relative_path, frs))  # NOQA
            if len(fr_2del) == 1:
                self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id'])
            # 2) the patched FTP file record needs to be prepended with the server repository path
            self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'],
                               data={'relative_path': relative_path, 'exists': True})
            # 3) the server file is labelled as not existing
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response

    def _scp(self, local_path, remote_path, dry=True):
        # e.g. remote_path = '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001'
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        self.mktree(remote_path.parent)
        self.ftp.pwd()
        _logger.info(f"FTP upload {local_path}")
        with open(local_path, 'rb') as fid:
            self.ftp.storbinary(f'STOR {local_path.name}', fid)
        return 0, ''

    def mktree(self, remote_path):
        """Browse to the tree on the FTP server, making directories on the way"""
        if str(remote_path) != '.':
            try:
                self.ftp.cwd(str(remote_path))
            except ftplib.error_perm:
                self.mktree(PurePosixPath(remote_path.parent))
                self.ftp.mkd(str(remote_path))
                self.ftp.cwd(str(remote_path))

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server.")


class SDSCPatcher(Patcher):
    """
    This is used to patch data on the SDSC server
    """
    def __init__(self, one=None):
        assert one
        super().__init__(one=one)

    def patch_datasets(self, file_list, **kwargs):
        response = super().patch_datasets(file_list, **kwargs)

        # TODO check the file records to see if they have local server ones
        # If they do then need to remove file record and delete file from local server??

        return response

    def _scp(self, local_path, remote_path, dry=True):
        _logger.info(f"Copy {local_path} to {remote_path}")
        if not dry:
            if not Path(remote_path).parent.exists():
                Path(remote_path).parent.mkdir(exist_ok=True, parents=True)
            shutil.copy(local_path, remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server")