
1"""A module for ad-hoc dataset modification and registration. 

2 

3Unlike the DataHandler class in oneibl.data_handlers, the Patcher class allows one to fully remove 

4datasets (delete them from the database and repositories), and to overwrite datasets on both the 

5main repositories and the local servers. Additionally the Patcher can handle datasets from 

6multiple sessions at once. 

7 

8Examples 

9-------- 

10Delete a dataset from Alyx and all associated repositories. 

11 

12>>> dataset_id = 'f4aafe6c-a7ab-4390-82cd-2c0e245322a5' 

13>>> task_ids, files_by_repo = IBLGlobusPatcher(AlyxClient(), 'admin').delete_dataset(dataset_id) 

14 

15Patch some local datasets using Globus 

16 

17>>> from one.api import ONE 

18>>> patcher = GlobusPatcher('admin', ONE(), label='UCLA audio times patch') 

19>>> responses = patcher.patch_datasets(file_paths) # register the new datasets to Alyx 

20>>> patcher.launch_transfers(local_servers=True) # transfer to all remote repositories 

21 

22""" 

import abc
import ftplib
from pathlib import Path, PurePosixPath, WindowsPath
from collections import defaultdict
from itertools import starmap
from subprocess import Popen, PIPE, STDOUT
import subprocess
import logging
from getpass import getpass
import shutil

import globus_sdk
import iblutil.io.params as iopar
from one.alf.files import get_session_path, add_uuid_string
from one.alf.spec import is_uuid_string, is_uuid
from one import params
from one.webclient import AlyxClient
from one.converters import path_from_dataset
from one.remote import globus
from one.remote.aws import url2uri
from one.util import ensure_list

from ibllib.oneibl.registration import register_dataset

_logger = logging.getLogger(__name__)

FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
FLATIRON_MOUNT = '/mnt/ibl'
FTP_HOST = 'test.alyx.internationalbrainlab.org'
FTP_PORT = 21
DMZ_REPOSITORY = 'ibl_patcher'  # in Alyx, the name of the repository containing the patched file records
SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl')
SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp')


def _run_command(cmd, dry=True):
    _logger.info(cmd)
    if dry:
        return 0, '', ''
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    info, error = p.communicate()
    if p.returncode != 0:
        _logger.error(error)
        raise RuntimeError(error)
    return p.returncode, info, error
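
# Example (illustrative sketch): with dry=True (the default) the command is only logged and a
# success tuple is returned; with dry=False it runs through a shell and a non-zero exit status
# raises a RuntimeError.
# >>> _run_command('ls /mnt/ibl', dry=True)
# (0, '', '')
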

def sdsc_globus_path_from_dataset(dset):
    """
    Returns the SDSC Globus file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True)


def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH):
    """
    Returns the SDSC file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) the prefix path such as the ONE download directory or SDSC root
    """
    return path_from_dataset(dset, root_path=root_path, uuid=True)


def globus_path_from_dataset(dset, repository=None, uuid=False):
    """
    Returns the local ONE file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param repository: (optional) repository name of the file record (if None, takes
     the first file record with a URL)
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)
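
# Example (sketch; the dataset ID is the one from the module docstring and the printed paths
# are hypothetical -- actual values depend on the Alyx file records):
# >>> dset = one.alyx.rest('datasets', 'read', id='f4aafe6c-a7ab-4390-82cd-2c0e245322a5')
# >>> sdsc_path_from_dataset(dset)    # e.g. /mnt/ibl/<lab>/Subjects/.../spikes.times.<uuid>.npy
# >>> globus_path_from_dataset(dset)  # e.g. /<lab>/Subjects/.../spikes.times.npy
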

class Patcher(abc.ABC):
    def __init__(self, one=None):
        assert one
        self.one = one

    def _patch_dataset(self, path, dset_id=None, revision=None, dry=False, ftp=False):
        """
        This private method gets the dataset information from Alyx, computes the local
        and remote paths and initiates the file copy.
        """
        path = Path(path)
        if dset_id is None:
            dset_id = path.name.split('.')[-2]
            if not is_uuid_string(dset_id):
                dset_id = None
        assert dset_id
        assert is_uuid_string(dset_id)
        # If the revision is not None then we need to add the revision into the path. Note that
        # moving the file is handled by the ONE registration client.
        if revision is not None and f'#{revision}' not in str(path):
            path = path.parent.joinpath(f'#{revision}#', path.name)
        assert path.exists()
        dset = self.one.alyx.rest('datasets', 'read', id=dset_id)
        fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
        remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path'])
        remote_path = add_uuid_string(remote_path, dset_id).as_posix()
        if remote_path.startswith('/'):
            full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path)
        else:
            full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
        if isinstance(path, WindowsPath) and not ftp:
            # On Windows replace drive map with Globus uri, e.g. C:/ -> /~/C/
            path = globus.as_globus_path(path)
        status = self._scp(path, full_remote_path, dry=dry)[0]
        return status
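
    # Filename convention (sketch, hypothetical dataset): FlatIron copies carry the dataset
    # UUID before the extension, e.g. 'spikes.times.f4aafe6c-a7ab-4390-82cd-2c0e245322a5.npy',
    # which is why path.name.split('.')[-2] above recovers the dataset ID when none is given.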

    def register_dataset(self, file_list, **kwargs):
        """
        Registers a set of files belonging to a single session, on the server repository only.
        :param file_list: (list of pathlib.Path)
        :param created_by: (string) name of user in Alyx (defaults to 'root')
        :param repository: optional: (string) name of the server repository in Alyx
        :param versions: optional (list of strings): version tags (defaults to ibllib version)
        :param dry: (bool) False by default
        :return: the registration response, a list of dataset records
        """
        return register_dataset(file_list, one=self.one, server_only=True, exists=True, **kwargs)

    def register_datasets(self, file_list, **kwargs):
        """
        Same as register_dataset but works with files belonging to different sessions.
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i + 1}/{nses} {label}, registering {len(_files)} files")
            responses.append(self.register_dataset(_files, **kwargs))
        return responses
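
    # Example (sketch; the paths are hypothetical): files from different sessions are grouped
    # and registered one session at a time.
    # >>> patcher.register_datasets([
    # ...     Path('/data/cortexlab/Subjects/KS005/2019-04-01/001/alf/spikes.times.npy'),
    # ...     Path('/data/cortexlab/Subjects/KS005/2019-04-02/001/alf/spikes.times.npy')])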

    def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs):
        """
        Creates a new dataset on FlatIron and uploads it from an arbitrary location.
        Rules for creation/patching are the same as those that apply for registration via Alyx,
        as this uses the registration endpoint to get the dataset.
        An existing file (same session and path relative to session) will be patched.
        :param file_list: full file path. Must be within an ALF session folder (subject/date/number);
         can also be a list of full file paths belonging to the same session.
        :param server_repository: Alyx server repository name
        :param created_by: Alyx username for the dataset (optional, defaults to root)
        :param ftp: flag for when the FTPPatcher is used: don't adjust the Windows path in
         _patch_dataset when ftp=True
        :return: the registration response, a list of dataset records
        """
        # first register the file
        if not isinstance(file_list, list):
            file_list = [Path(file_list)]
        assert len(set([get_session_path(f) for f in file_list])) == 1
        assert all([Path(f).exists() for f in file_list])
        response = ensure_list(self.register_dataset(file_list, dry=dry, **kwargs))
        if dry:
            return
        # from the dataset info, set the FlatIron exists flag to True
        for p, d in zip(file_list, response):
            self._patch_dataset(p, dset_id=d['id'], revision=d['revision'], dry=dry, ftp=ftp)
        return response

    def patch_datasets(self, file_list, **kwargs):
        """
        Same as patch_dataset but works with files belonging to several sessions.
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f'{i + 1}/{nses} {label}, registering {len(_files)} files')
            responses.extend(self.patch_dataset(_files, **kwargs))
        return responses

    @abc.abstractmethod
    def _scp(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    def _rm(self, *args, **kwargs):
        pass
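
# A minimal concrete subclass (illustrative sketch only, not part of the library): subclasses
# provide _scp, which copies a file to its FlatIron location, and _rm, which removes one.
# >>> class EchoPatcher(Patcher):
# ...     def _scp(self, local_path, remote_path, dry=True):
# ...         print(f'would copy {local_path} -> {remote_path}')
# ...         return 0, ''
# ...     def _rm(self, flatiron_path, dry=True):
# ...         print(f'would remove {flatiron_path}')
# ...         return 0, ''
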

class GlobusPatcher(Patcher, globus.Globus):
    """
    Requires Globus access keys.
    """

    def __init__(self, client_name='default', one=None, label='ibllib patch'):
        assert one and not one.offline
        Patcher.__init__(self, one=one)
        globus.Globus.__init__(self, client_name)
        self.label = label
        # get a dictionary of data repositories from Alyx (with globus ids)
        self.fetch_endpoints_from_alyx(one.alyx)
        flatiron_id = self.endpoints['flatiron_cortexlab']['id']
        if 'flatiron' not in self.endpoints:
            self.add_endpoint(flatiron_id, 'flatiron', root_path='/')
            self.endpoints['flatiron'] = self.endpoints['flatiron_cortexlab']
        # transfers/delete from the current computer to the flatiron: mandatory and executed first
        local_id = self.endpoints['local']['id']
        self.globus_transfer = globus_sdk.TransferData(
            self.client, local_id, flatiron_id, verify_checksum=True, sync_level='checksum', label=label)
        self.globus_delete = globus_sdk.DeleteData(self.client, flatiron_id, label=label)
        # transfers/delete from flatiron to optional third parties to synchronize / delete
        self.globus_transfers_locals = {}
        self.globus_deletes_locals = {}
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        _logger.info(f"Globus copy {local_path} to {remote_path}")
        local_path = globus.as_globus_path(local_path)
        if not dry:
            if isinstance(self.globus_transfer, globus_sdk.TransferData):
                self.globus_transfer.add_item(local_path, remote_path.as_posix())
            else:
                self.globus_transfer.path_src.append(local_path)
                self.globus_transfer.path_dest.append(remote_path.as_posix())
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT)))
        _logger.info(f'Globus del {flatiron_path}')
        if not dry:
            if isinstance(self.globus_delete, globus_sdk.DeleteData):
                self.globus_delete.add_item(flatiron_path.as_posix())
            else:
                self.globus_delete.path.append(flatiron_path.as_posix())
        return 0, ''

    def patch_datasets(self, file_list, **kwargs):
        """
        Calls the super method that registers the datasets and queues them on the main
        computer-to-FlatIron Globus transfer. Then creates individual transfer items for each
        local server so that, after the update on FlatIron, the local server files are also updated.
        :param file_list: (list of pathlib.Path) files to patch
        :param kwargs: keyword arguments passed to Patcher.patch_datasets
        :return: the registration response, a list of dataset records
        """
        responses = super().patch_datasets(file_list, **kwargs)
        for dset in responses:
            # get the flatiron path
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix()
            flatiron_path = self.to_address(relative_path, fr['data_repository'])
            # loop over the remaining repositories (local servers) and create a transfer
            # from flatiron to the local server
            for fr in dset['file_records']:
                if fr['data_repository'] == DMZ_REPOSITORY:
                    continue
                if fr['data_repository'] not in self.endpoints:
                    continue
                repo_gid = self.endpoints[fr['data_repository']]['id']
                flatiron_id = self.endpoints['flatiron']['id']
                if repo_gid == flatiron_id:
                    continue
                # if there is no transfer already created, initialize it
                if repo_gid not in self.globus_transfers_locals:
                    self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData(
                        self.client, flatiron_id, repo_gid, verify_checksum=True,
                        sync_level='checksum', label=f"{self.label} on {fr['data_repository']}")
                # get the local server path and create the transfer item
                local_server_path = self.to_address(fr['relative_path'], fr['data_repository'])
                self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path)
        return responses

    def launch_transfers(self, local_servers=False):
        """
        patcher.launch_transfers()
        Launches the Globus transfer and delete tasks from the local patch computer to FlatIron.
        :param local_servers: (bool, False) if True, sync the local servers after the main transfer
        :return: None
        """
        gtc = self.client

        def _wait_for_task(resp):
            # patcher.transfer_client.get_task(task_id='364fbdd2-4deb-11eb-8ffb-0a34088e79f9')
            # Abridged examples of the TransferResponse records returned by get_task; the loop
            # below polls 'completion_time' and then inspects 'fatal_error':
            #  - good status:    status='ACTIVE', nice_status='OK', completion_time=None, fatal_error=None
            #  - checksum error: status='ACTIVE', nice_status='VERIFY_CHECKSUM', is_ok=False, fatal_error=None
            #  - finished task:  status='SUCCEEDED', completion_time set, fatal_error=None
            #  - errored task:   status='FAILED', completion_time set, fatal_error={'code': 'CANCELED', 'description': 'canceled'}

            while True:
                tinfo = gtc.get_task(task_id=resp['task_id'])
                if tinfo and tinfo['completion_time'] is not None:
                    break
                _ = gtc.task_wait(task_id=resp['task_id'], timeout=30)
            if tinfo and tinfo['fatal_error'] is not None:
                raise ConnectionError(f"Globus transfer failed \n {tinfo}")

        # handle the transfers first
        if len(self.globus_transfer['DATA']) > 0:
            # launch the transfer
            _wait_for_task(gtc.submit_transfer(self.globus_transfer))
            # re-initialize the globus_transfer property
            self.globus_transfer = globus_sdk.TransferData(
                gtc,
                self.globus_transfer['source_endpoint'],
                self.globus_transfer['destination_endpoint'],
                label=self.globus_transfer['label'],
                verify_checksum=True, sync_level='checksum')

        # do the same for deletes
        if len(self.globus_delete['DATA']) > 0:
            _wait_for_task(gtc.submit_delete(self.globus_delete))
            self.globus_delete = globus_sdk.DeleteData(
                gtc,
                endpoint=self.globus_delete['endpoint'],
                label=self.globus_delete['label'])

        # launch the local transfers and local deletes
        if local_servers:
            self.launch_transfers_secondary()

    def launch_transfers_secondary(self):
        """
        patcher.launch_transfers_secondary()
        Launches the Globus transfers from FlatIron to third-party repositories (local servers).
        This should run after the main transfer from the patch computer to FlatIron.
        :return: None
        """
        for lt in self.globus_transfers_locals:
            transfer = self.globus_transfers_locals[lt]
            if len(transfer['DATA']) > 0:
                self.client.submit_transfer(transfer)
        for ld in self.globus_deletes_locals:
            delete = self.globus_deletes_locals[ld]
            if len(delete['DATA']) > 0:
                self.client.submit_delete(delete)
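
# Typical flow (sketch; assumes the Globus endpoints are registered in Alyx and a 'local'
# endpoint is configured for this machine):
# >>> patcher = GlobusPatcher('admin', ONE(), label='example patch')
# >>> patcher.patch_datasets(file_paths)            # register and queue computer -> FlatIron items
# >>> patcher.launch_transfers(local_servers=True)  # submit, wait, then sync the local servers
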

class IBLGlobusPatcher(Patcher, globus.Globus):
    """This is a replacement for the GlobusPatcher class, utilizing the ONE Globus class.

    The GlobusPatcher class is more complicated but has the advantage of being able to launch
    transfers independently of registration, although it remains to be seen whether this is useful.
    """
    def __init__(self, alyx=None, client_name='default'):
        """
        Parameters
        ----------
        alyx : one.webclient.AlyxClient
            An instance of Alyx to use.
        client_name : str, default='default'
            The Globus client name.
        """
        self.alyx = alyx or AlyxClient()
        globus.Globus.__init__(self, client_name=client_name)  # NB: we don't init Patcher as we're not using ONE

    def delete_dataset(self, dataset, dry=False):
        """
        Delete a dataset off Alyx and remove its file records from all Globus repositories.

        Parameters
        ----------
        dataset : uuid.UUID, str, dict
            The dataset record or ID to delete.
        dry : bool
            If true, the dataset is not deleted and the file paths that would be removed are returned.

        Returns
        -------
        list of uuid.UUID
            A list of Globus delete task IDs if dry is false.
        dict of str
            A map of data repository names and relative paths of the deleted files.
        """
        if is_uuid(dataset):
            did = dataset
            dataset = self.alyx.rest('datasets', 'read', id=did)
        else:
            did = dataset['url'].split('/')[-1]

        def is_aws(repository_name):
            return repository_name.startswith('aws_')

        files_by_repo = defaultdict(list)  # str -> [pathlib.PurePosixPath]
        s3_files = []
        file_records = filter(lambda x: x['exists'], dataset['file_records'])
        for record in file_records:
            repo = self.repo_from_alyx(record['data_repository'], self.alyx)
            # Handle S3 files
            if not repo['globus_endpoint_id'] or repo['repository_type'] != 'Fileserver':
                if is_aws(repo['name']):
                    s3_files.append(url2uri(record['data_url']))
                    files_by_repo[repo['name']].append(PurePosixPath(record['relative_path']))
                else:
                    _logger.error('Unable to delete from %s', repo['name'])
            else:
                # Handle Globus files
                if repo['name'] not in self.endpoints:
                    self.add_endpoint(repo['name'], alyx=self.alyx)
                filepath = PurePosixPath(record['relative_path'])
                if 'flatiron' in repo['name']:
                    filepath = add_uuid_string(filepath, did)
                files_by_repo[repo['name']].append(filepath)

        # Remove S3 files
        if s3_files:
            cmd = ['aws', 's3', 'rm', *s3_files, '--profile', 'ibladmin']
            if dry:
                cmd.append('--dryrun')
            if _logger.level > logging.DEBUG:
                log_function = _logger.error
                cmd.append('--only-show-errors')  # Suppress verbose output
            else:
                log_function = _logger.debug
                cmd.append('--no-progress')  # Suppress progress info, estimated time, etc.
            _logger.debug(' '.join(cmd))
            process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
            with process.stdout:
                for line in iter(process.stdout.readline, b''):
                    log_function(line.decode().strip())
            assert process.wait() == 0

        if dry:
            return [], files_by_repo

        # Remove Globus files
        globus_files_map = filter(lambda x: not is_aws(x[0]), files_by_repo.items())
        task_ids = list(starmap(self.delete_data, map(reversed, globus_files_map)))

        # Delete the dataset from Alyx
        self.alyx.rest('datasets', 'delete', id=did)
        return task_ids, files_by_repo
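
# Example (sketch; dataset ID as in the module docstring): a dry run returns the files that
# would be removed from each repository, without deleting anything.
# >>> patcher = IBLGlobusPatcher(AlyxClient(), 'admin')
# >>> _, files_by_repo = patcher.delete_dataset('f4aafe6c-a7ab-4390-82cd-2c0e245322a5', dry=True)
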

class SSHPatcher(Patcher):
    """
    Requires SSH key access on the FlatIron server.
    """
    def __init__(self, one=None):
        res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls")
        if res[0] != 0:
            raise PermissionError("Could not connect to FlatIron via SSH. Check your RSA keys")
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \
              f" mkdir -p {remote_path.parent}; "
        cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}"
        return _run_command(cmd, dry=dry)

    def _rm(self, flatiron_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}"
        return _run_command(cmd, dry=dry)
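
# Example (sketch; requires SSH key access to the FlatIron server):
# >>> patcher = SSHPatcher(one=ONE())
# >>> patcher.patch_datasets(file_paths)  # registers, then copies each file over scp
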

class FTPPatcher(Patcher):
    """
    This is used to register datasets from anywhere, without write access to FlatIron.
    """
    def __init__(self, one=None):
        super().__init__(one=one)
        alyx = self.one.alyx
        if not getattr(alyx._par, 'FTP_DATA_SERVER_LOGIN', False):
            alyx._par = self.setup(par=alyx._par, silent=alyx.silent)
        login, pwd = (one.alyx._par.FTP_DATA_SERVER_LOGIN, one.alyx._par.FTP_DATA_SERVER_PWD)
        self.ftp = ftplib.FTP_TLS(host=FTP_HOST, user=login, passwd=pwd)
        # self.ftp.ssl_version = ssl.PROTOCOL_TLSv1
        # self.ftp.auth()
        self.ftp.prot_p()
        self.ftp.login(login, pwd)
        # pre-fetch the repositories so as not to query them for every file registered
        self.repositories = self.one.alyx.rest("data-repository", "list")

    @staticmethod
    def setup(par=None, silent=False):
        """
        Set up (and save) FTP login parameters.
        :param par: A parameters object to modify; if None the default Webclient parameters
         are loaded
        :param silent: If true, the defaults are used with no user input prompt
        :return: the modified parameters object
        """
        DEFAULTS = {
            "FTP_DATA_SERVER": "ftp://ibl.flatironinstitute.org",
            "FTP_DATA_SERVER_LOGIN": "iblftp",
            "FTP_DATA_SERVER_PWD": None
        }
        if par is None:
            par = params.get(silent=silent)
        par = iopar.as_dict(par)

        if silent:
            DEFAULTS.update(par)
            par = DEFAULTS
        else:
            for k in DEFAULTS.keys():
                cpar = par.get(k, DEFAULTS[k])
                # Iterate through non-password pars; skip url if client url already provided
                if 'PWD' not in k:
                    par[k] = input(f'Param {k}, current value is ["{cpar}"]:') or cpar
                else:
                    prompt = f'Param {k} (leave empty to leave unchanged):'
                    par[k] = getpass(prompt) or cpar

        # Get the client key
        client = par.get('ALYX_URL', None)
        client_key = params._key_from_url(client) if client else params.get_default_client()
        # Save the parameters
        params.save(par, client_key)  # Client params
        return iopar.from_dict(par)
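
    # Example (sketch): in silent mode the defaults are kept for any missing parameters (the
    # password stays unset) and the result is saved with the client parameters.
    # >>> par = FTPPatcher.setup(silent=True)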

    def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY,
                       **kwargs):
        # overrides the superclass just to remove the server repository argument
        response = super().patch_dataset(path, created_by=created_by, dry=dry,
                                         repository=repository, ftp=True, **kwargs)
        # need to patch the file records to be consistent
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                            fr['relative_path'] == fr_server['relative_path'], frs))
            reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'],
                                     self.repositories))
            relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath(
                PurePosixPath(fr_ftp['relative_path'])))[1:]
            # 1) if there was already a file, the registration created a duplicate
            fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                             fr['relative_path'] == relative_path, frs))  # NOQA
            if len(fr_2del) == 1:
                self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id'])
            # 2) the patched FTP file record needs to be prepended with the server repository path
            self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'],
                               data={'relative_path': relative_path, 'exists': True})
            # 3) the server file is labelled as not existing
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response

    def _scp(self, local_path, remote_path, dry=True):
        # remote_path = '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001'
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        # local_path
        self.mktree(remote_path.parent)
        self.ftp.pwd()
        _logger.info(f"FTP upload {local_path}")
        with open(local_path, 'rb') as fid:
            self.ftp.storbinary(f'STOR {local_path.name}', fid)
        return 0, ''

    def mktree(self, remote_path):
        """Browse to the tree on the FTP server, making directories on the way."""
        if str(remote_path) != '.':
            try:
                self.ftp.cwd(str(remote_path))
            except ftplib.error_perm:
                self.mktree(PurePosixPath(remote_path.parent))
                self.ftp.mkd(str(remote_path))
                self.ftp.cwd(str(remote_path))

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server.")

class SDSCPatcher(Patcher):
    """
    This is used to patch data on the SDSC server.
    """
    def __init__(self, one=None):
        assert one
        super().__init__(one=one)

    def patch_datasets(self, file_list, **kwargs):
        response = super().patch_datasets(file_list, **kwargs)

        # TODO check the file records to see if they have local server ones
        # If they do then need to remove file record and delete file from local server??

        return response

    def _scp(self, local_path, remote_path, dry=True):
        _logger.info(f"Copy {local_path} to {remote_path}")
        if not dry:
            if not Path(remote_path).parent.exists():
                Path(remote_path).parent.mkdir(exist_ok=True, parents=True)
            shutil.copy(local_path, remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server")