Coverage for ibllib/oneibl/patcher.py: 54%

378 statements  

coverage.py v7.7.0, created at 2025-03-17 15:25 +0000

1"""A module for ad-hoc dataset modification and registration. 

2 

3Unlike the DataHandler class in oneibl.data_handlers, the Patcher class allows one to fully remove 

4datasets (delete them from the database and repositories), and to overwrite datasets on both the 

5main repositories and the local servers. Additionally, the Patcher can handle datasets from 

6multiple sessions at once. 

7 

8Examples 

9-------- 

10Delete a dataset from Alyx and all associated repositories. 

11 

12>>> dataset_id = 'f4aafe6c-a7ab-4390-82cd-2c0e245322a5' 

13>>> task_ids, files_by_repo = IBLGlobusPatcher(AlyxClient(), 'admin').delete_dataset(dataset_id) 

14 

15Patch some local datasets using Globus 

16 

17>>> from one.api import ONE 

18>>> patcher = GlobusPatcher('admin', ONE(), label='UCLA audio times patch') 

19>>> responses = patcher.patch_datasets(file_paths) # register the new datasets to Alyx 

20>>> patcher.launch_transfers(local_servers=True) # transfer to all remote repositories 

21 

22""" 

23import abc 

24import ftplib 

25from pathlib import Path, PurePosixPath, WindowsPath 

26from collections import defaultdict 

27from itertools import starmap 

28from subprocess import Popen, PIPE, STDOUT 

29import subprocess 

30import logging 

31from getpass import getpass 

32import shutil 

33 

34import globus_sdk 

35import iblutil.io.params as iopar 

36from iblutil.util import ensure_list 

37from one.alf.path import get_session_path, add_uuid_string, full_path_parts 

38from one.alf.spec import is_uuid_string, is_uuid 

39from one import params 

40from one.webclient import AlyxClient 

41from one.converters import path_from_dataset 

42from one.remote import globus 

43from one.remote.aws import url2uri, get_s3_from_alyx 

44 

45from ibllib.oneibl.registration import register_dataset 

46 

47_logger = logging.getLogger(__name__) 

48 

49FLATIRON_HOST = 'ibl.flatironinstitute.org' 

50FLATIRON_PORT = 61022 

51FLATIRON_USER = 'datauser' 

52FLATIRON_MOUNT = '/mnt/ibl' 

53FTP_HOST = 'test.alyx.internationalbrainlab.org' 

54FTP_PORT = 21 

55DMZ_REPOSITORY = 'ibl_patcher'  # in Alyx, the repository name containing the patched file records 

56SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl') 

57SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp') 

58 

59 

60def _run_command(cmd, dry=True): 

61 _logger.info(cmd) 

62 if dry: 

63 return 0, '', '' 

64 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

65 info, error = p.communicate() 

66 if p.returncode != 0: 

67 _logger.error(error) 

68 raise RuntimeError(error) 

69 return p.returncode, info, error 
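# Usage sketch (editorial, not part of the original source): a dry run only logs the
# command and returns immediately with a zero exit code and empty output, which is
# convenient for previewing the shell commands a patcher would issue, e.g.
#   >>> _run_command('ls /mnt/ibl', dry=True)
#   (0, '', '')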

70 

71 

72def sdsc_globus_path_from_dataset(dset): 

73 """ 

74 Returns the SDSC Globus file path from a dataset record or a list of dataset records from REST. 

75 :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint 

76 """ 

77 return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True) 1d

78 

79 

80def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH): 

81 """ 

82 Returns the SDSC file path from a dataset record or a list of dataset records from REST. 

83 :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint 

84 :param root_path: (optional) the prefix path, such as the ONE download directory or the SDSC root 

85 """ 

86 return path_from_dataset(dset, root_path=root_path, uuid=True) 1d

87 

88 

89def globus_path_from_dataset(dset, repository=None, uuid=False): 

90 """ 

91 Returns the local ONE file path from a dataset record or a list of dataset records from REST. 

92 :param dset: dataset dictionary or list of dictionaries from the Alyx REST endpoint 

93 :param repository: (optional) repository name of the file record (if None, takes 

94 the first file record with a URL) 

95 """ 

96 return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid) 1d
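# Usage sketch for the three path helpers above (editorial; the dataset UUID is hypothetical):
#   >>> dset = AlyxClient().rest('datasets', 'read', id='f4aafe6c-a7ab-4390-82cd-2c0e245322a5')
#   >>> sdsc_path_from_dataset(dset)         # /mnt/ibl/<lab>/Subjects/.../dataset.<uuid>.ext
#   >>> sdsc_globus_path_from_dataset(dset)  # same UUID-suffixed relative path, rooted at '/'
#   >>> globus_path_from_dataset(dset)       # path of the first file record with a URL, no UUID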

97 

98 

99class Patcher(abc.ABC): 

100 def __init__(self, one=None): 

101 assert one 1ac

102 self.one = one 1ac

103 

104 def _patch_dataset(self, path, dset_id=None, revision=None, dry=False, ftp=False): 

105 """ 

106 This private method gets the dataset information from Alyx, computes the local 

107 and remote paths, and initiates the file copy. 

108 """ 

109 path = Path(path) 1b

110 if dset_id is None: 1b

111 dset_id = path.name.split('.')[-2] 

112 if not is_uuid_string(dset_id): 

113 dset_id = None 

114 assert dset_id 1b

115 assert is_uuid_string(dset_id) 1b

116 # If the revision is not None then we need to add the revision into the path. Note that moving the file 

117 # is handled by the ONE registration client 

118 if revision and f'#{revision}' not in str(path): 1b

119 path = path.parent.joinpath(f'#{revision}#', path.name) 

120 assert path.exists() 1b

121 dset = self.one.alyx.rest('datasets', 'read', id=dset_id) 1b

122 fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository']) 1b

123 remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path']) 1b

124 remote_path = add_uuid_string(remote_path, dset_id).as_posix() 1b

125 if remote_path.startswith('/'): 1b

126 full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path) 1b

127 else: 

128 full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path) 

129 if isinstance(path, WindowsPath) and not ftp: 1b

130 # On Windows replace drive map with Globus uri, e.g. C:/ -> /~/C/ 

131 path = globus.as_globus_path(path) 

132 status = self._scp(path, full_remote_path, dry=dry)[0] 1b

133 return status 1b

134 

135 def register_dataset(self, file_list, **kwargs): 

136 """ 

137 Registers a set of files belonging to a single session, on the server only. 

138 :param file_list: (list of pathlib.Path) 

139 :param created_by: (string) name of user in Alyx (defaults to 'root') 

140 :param repository: (string, optional) name of the server repository in Alyx 

141 :param versions: (list of strings, optional) version tags (defaults to the ibllib version) 

142 :param dry: (bool) False by default 

143 :return: the registration response from Alyx 

144 """ 

145 return register_dataset(file_list, one=self.one, server_only=True, exists=True, **kwargs) 1b

146 

147 def register_datasets(self, file_list, **kwargs): 

148 """ 

149 Same as register_dataset but works with files belonging to different sessions 

150 """ 

151 register_dict = {} 

152 # creates a dictionary of sessions with one file list per session 

153 for f in file_list: 

154 session_path = get_session_path(f) 

155 label = '_'.join(session_path.parts[-3:]) 

156 if label in register_dict: 

157 register_dict[label]['files'].append(f) 

158 else: 

159 register_dict[label] = {'session_path': session_path, 'files': [f]} 

160 responses = [] 

161 nses = len(register_dict) 

162 for i, label in enumerate(register_dict): 

163 _files = register_dict[label]['files'] 

164 _logger.info(f"{i + 1}/{nses} {label}, registering {len(_files)} files") 

165 responses.append(self.register_dataset(_files, **kwargs)) 

166 return responses 

167 

168 def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs): 

169 """ 

170 Creates a new dataset on FlatIron and uploads it from an arbitrary location. 

171 Rules for creation/patching are the same as those that apply for registration via Alyx, 

172 as this uses the registration endpoint to get the dataset. 

173 An existing file (same session and path relative to session) will be patched. 

174 :param file_list: full file path. Must be within an ALF session folder (subject/date/number); 

175 can also be a list of full file paths belonging to the same session. 

176 :param server_repository: Alyx server repository name 

177 :param created_by: Alyx username for the dataset (optional, defaults to 'root') 

178 :param ftp: flag for the case when using the FTPPatcher. The Windows path is not adjusted in 

179 _patch_dataset when ftp=True 

180 :return: the registration response, a list of dataset records 

181 """ 

182 # first register the file 

183 if not isinstance(file_list, list): 1b

184 file_list = [Path(file_list)] 

185 assert len(set(map(get_session_path, file_list))) == 1 1b

186 assert all(Path(f).exists() for f in file_list) 1b

187 response = ensure_list(self.register_dataset(file_list, dry=dry, **kwargs)) 1b

188 if dry: 1b

189 return 

190 # from the dataset info, set flatIron flag to exists=True 

191 for p, d in zip(file_list, response): 1b

192 try: 1b

193 self._patch_dataset(p, dset_id=d['id'], revision=d['revision'], dry=dry, ftp=ftp) 1b

194 except Exception as e: 

195 raise Exception(f'Error registering file {p}') from e 

196 return response 1b

197 

198 def patch_datasets(self, file_list, **kwargs): 

199 """Same as create_dataset method but works with several sessions.""" 

200 register_dict = {} 1b

201 # creates a dictionary of sessions with one file list per session 

202 for f in file_list: 1b

203 session_path = get_session_path(f) 1b

204 label = '_'.join(session_path.parts[-3:]) 1b

205 if label in register_dict: 1b

206 register_dict[label]['files'].append(f) 

207 else: 

208 register_dict[label] = {'session_path': session_path, 'files': [f]} 1b

209 responses = [] 1b

210 nses = len(register_dict) 1b

211 for i, label in enumerate(register_dict): 1b

212 _files = register_dict[label]['files'] 1b

213 _logger.info(f'{i + 1}/{nses} {label}, registering {len(_files)} files') 1b

214 responses.extend(self.patch_dataset(_files, **kwargs)) 1b

215 return responses 1b
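# Usage sketch (editorial; file paths and repository name are hypothetical): files from
# different sessions are grouped automatically and registered/patched one session at a time:
#   >>> files = [Path('/data/labA/Subjects/S1/2021-01-01/001/alf/spikes.times.npy'),
#   ...          Path('/data/labA/Subjects/S1/2021-01-02/001/alf/spikes.times.npy')]
#   >>> responses = patcher.patch_datasets(files, created_by='admin', repository='flatiron_labA')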

216 

217 @abc.abstractmethod 

218 def _scp(self, *args, **kwargs): 

219 pass 

220 

221 @abc.abstractmethod 

222 def _rm(self, *args, **kwargs): 

223 pass 

224 

225 

226class GlobusPatcher(Patcher, globus.Globus): 

227 """ 

228 Patcher that transfers files via Globus. Requires Globus access keys. 

229 

230 """ 

231 

232 def __init__(self, client_name='default', one=None, label='ibllib patch'): 

233 assert one and not one.offline 

234 Patcher.__init__(self, one=one) 

235 globus.Globus.__init__(self, client_name) 

236 self.label = label 

237 # get a dictionary of data repositories from Alyx (with globus ids) 

238 self.fetch_endpoints_from_alyx(one.alyx) 

239 flatiron_id = self.endpoints['flatiron_cortexlab']['id'] 

240 if 'flatiron' not in self.endpoints: 

241 self.add_endpoint(flatiron_id, 'flatiron', root_path='/') 

242 self.endpoints['flatiron'] = self.endpoints['flatiron_cortexlab'] 

243 # transfers/delete from the current computer to the flatiron: mandatory and executed first 

244 local_id = self.endpoints['local']['id'] 

245 self.globus_transfer = globus_sdk.TransferData( 

246 self.client, local_id, flatiron_id, verify_checksum=True, sync_level='checksum', label=label) 

247 self.globus_delete = globus_sdk.DeleteData(self.client, flatiron_id, label=label) 

248 # transfers/delete from flatiron to optional third parties to synchronize / delete 

249 self.globus_transfers_locals = {} 

250 self.globus_deletes_locals = {} 

251 super().__init__(one=one) 

252 

253 def _scp(self, local_path, remote_path, dry=True): 

254 remote_path = PurePosixPath('/').joinpath( 1b

255 remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT)) 

256 ) 

257 _logger.info(f"Globus copy {local_path} to {remote_path}") 1b

258 local_path = globus.as_globus_path(local_path) 1b

259 if not dry: 1b

260 if isinstance(self.globus_transfer, globus_sdk.TransferData): 1b

261 self.globus_transfer.add_item(local_path, remote_path.as_posix()) 1b

262 else: 

263 self.globus_transfer.path_src.append(local_path) 

264 self.globus_transfer.path_dest.append(remote_path.as_posix()) 

265 return 0, '' 1b

266 

267 def _rm(self, flatiron_path, dry=True): 

268 flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT))) 

269 _logger.info(f'Globus del {flatiron_path}') 

270 if not dry: 

271 if isinstance(self.globus_delete, globus_sdk.DeleteData): 

272 self.globus_delete.add_item(flatiron_path.as_posix()) 

273 else: 

274 self.globus_delete.path.append(flatiron_path.as_posix()) 

275 return 0, '' 

276 

277 def patch_datasets(self, file_list, **kwargs): 

278 """ 

279 Calls the super method that registers the datasets and adds them to the transfer from the 

280 current computer to FlatIron. Then creates individual transfer items for each local server 

281 so that, after the update on FlatIron, the local server files are also updated. 

282 :param file_list: list of full file paths to patch 

283 :param kwargs: optional arguments passed through to registration (created_by, repository, dry, ...) 

284 :return: the registration responses, a list of dataset records 

285 """ 

286 responses = super().patch_datasets(file_list, **kwargs) 1b

287 for dset in responses: 1b

288 # get the flatiron path 

289 fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository']) 1b

290 relative_path = add_uuid_string(fr['relative_path'], dset['id']).as_posix() 1b

291 flatiron_path = self.to_address(relative_path, fr['data_repository']) 1b

292 # loop over the remaining repositories (local servers) and create a transfer 

293 # from flatiron to the local server 

294 for fr in dset['file_records']: 1b

295 if fr['data_repository'] == DMZ_REPOSITORY: 1b

296 continue 

297 if fr['data_repository'] not in self.endpoints: 1b

298 continue 

299 repo_gid = self.endpoints[fr['data_repository']]['id'] 1b

300 flatiron_id = self.endpoints['flatiron']['id'] 1b

301 if repo_gid == flatiron_id: 1b

302 continue 1b

303 # if there is no transfer already created, initialize it 

304 if repo_gid not in self.globus_transfers_locals: 1b

305 self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData( 1b

306 self.client, flatiron_id, repo_gid, verify_checksum=True, 

307 sync_level='checksum', label=f"{self.label} on {fr['data_repository']}") 

308 # get the local server path and create the transfer item 

309 local_server_path = self.to_address(fr['relative_path'], fr['data_repository']) 1b

310 self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path) 1b

311 return responses 1b
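# Usage sketch (mirrors the module docstring): after patching, the queued Globus transfers
# still need to be submitted explicitly:
#   >>> responses = patcher.patch_datasets(file_paths)
#   >>> patcher.launch_transfers(local_servers=True)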

312 

313 def launch_transfers(self, local_servers=False): 

314 """ 

315 patcher.launch_transfers() 

316 Launches the Globus transfer and delete from the local patch computer to FlatIron 

317 :param local_servers: (False) if True, sync the local servers after the main transfer 

318 :return: None 

319 """ 

320 gtc = self.client 1b

321 

322 def _wait_for_task(resp): 1b

323 # patcher.transfer_client.get_task(task_id='364fbdd2-4deb-11eb-8ffb-0a34088e79f9') 

324 # on a good status: 

325 # Out[22]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3011090432, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 873268, 'encrypt_data': False, 'fatal_error': None, 'faults': 6, 'files': 186, 'files_skipped': 12, 'files_transferred': 76, 'history_deleted': False, 'is_ok': True, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'OK', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'OK', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 98, 'subtasks_retrying': 0, 'subtasks_succeeded': 274, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa 

326 # on a checksum error 

327 # Out[26]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3715901232, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 912410, 'encrypt_data': False, 'fatal_error': None, 'faults': 7, 'files': 186, 'files_skipped': 12, 'files_transferred': 102, 'history_deleted': False, 'is_ok': False, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'VERIFY_CHECKSUM', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'checksum verification failed', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 72, 'subtasks_retrying': 0, 'subtasks_succeeded': 300, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa 

328 # on a finished task 

329 # Out[4]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 4998806664, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T20:04:50+00:00', 'deadline': '2021-01-06T19:11:00+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 629960, 'encrypt_data': False, 'fatal_error': None, 'faults': 15, 'files': 186, 'files_skipped': 12, 'files_transferred': 174, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T20:04:49.540956', 'label': '3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'SUCCEEDED', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 372, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa 

330 # on an errored task 

331 # Out[10]: TransferResponse({'bytes_checksummed': 0, 'bytes_transferred': 0, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T17:39:00+00:00', 'deadline': '2021-01-04T17:37:34+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 0, 'encrypt_data': False, 'fatal_error': {'code': 'CANCELED', 'description': 'canceled'}, 'faults': 2, 'files': 6, 'files_skipped': 0, 'files_transferred': 0, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T17:38:59.697413', 'label': 'test 3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:37:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'FAILED', 'subtasks_canceled': 6, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 6, 'subtasks_total': 12, 'symlinks': 0, 'sync_level': 3, 'task_id': '5706dd2c-4dea-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True}) # noqa 

332 while True: 1b

333 tinfo = gtc.get_task(task_id=resp['task_id']) 1b

334 if tinfo and tinfo['completion_time'] is not None: 1b

335 break 1b

336 _ = gtc.task_wait(task_id=resp['task_id'], timeout=30) 

337 if tinfo and tinfo['fatal_error'] is not None: 1b

338 raise ConnectionError(f"Globus transfer failed \n {tinfo}") 

339 

340 # handles the transfers first 

341 if len(self.globus_transfer['DATA']) > 0: 1b

342 # launch the transfer 

343 _wait_for_task(gtc.submit_transfer(self.globus_transfer)) 1b

344 # re-initialize the globus_transfer property 

345 self.globus_transfer = globus_sdk.TransferData( 1b

346 gtc, 

347 self.globus_transfer['source_endpoint'], 

348 self.globus_transfer['destination_endpoint'], 

349 label=self.globus_transfer['label'], 

350 verify_checksum=True, sync_level='checksum') 

351 

352 # do the same for deletes 

353 if len(self.globus_delete['DATA']) > 0: 1b

354 _wait_for_task(gtc.submit_delete(self.globus_delete)) 

355 self.globus_delete = globus_sdk.DeleteData( 

356 gtc, 

357 endpoint=self.globus_delete['endpoint'], 

358 label=self.globus_delete['label']) 

359 

360 # launch the local transfers and local deletes 

361 if local_servers: 1b

362 self.launch_transfers_secondary() 1b

363 

364 def launch_transfers_secondary(self): 

365 """ 

366 patcher.launch_transfers_secondary() 

367 Launches the Globus transfers from FlatIron to third-party repositories (local servers). 

368 This should run after the main transfer from the patch computer to FlatIron. 

369 :return: None 

370 """ 

371 for lt in self.globus_transfers_locals: 1b

372 transfer = self.globus_transfers_locals[lt] 1b

373 if len(transfer['DATA']) > 0: 1b

374 self.client.submit_transfer(transfer) 1b

375 for ld in self.globus_deletes_locals: 1b

376 delete = self.globus_deletes_locals[ld] 

377 if len(delete['DATA']) > 0: 

378 self.client.submit_delete(delete) 

379 

380 

381class IBLGlobusPatcher(Patcher, globus.Globus): 

382 """This is a replacement for the GlobusPatcher class, utilizing the ONE Globus class. 

383 

384 The GlobusPatcher class is more complicated but has the advantage of being able to launch 

385 transfers independently to registration, although it remains to be seen whether this is useful. 

386 """ 

387 def __init__(self, alyx=None, client_name='default'): 

388 """ 

389 

390 Parameters 

391 ---------- 

392 alyx : one.webclient.AlyxClient 

393 An instance of Alyx to use. 

394 client_name : str, default='default' 

395 The Globus client name. 

396 """ 

397 self.alyx = alyx or AlyxClient() 

398 globus.Globus.__init__(self, client_name=client_name)  # NB we don't init Patcher as we're not using ONE 

399 

400 def delete_dataset(self, dataset, dry=False): 

401 """ 

402 Delete a dataset off Alyx and remove file record from all Globus repositories. 

403 

404 Parameters 

405 ---------- 

406 dataset : uuid.UUID, str, dict 

407 The dataset record or ID to delete. 

408 dry : bool 

409 If true, dataset is not deleted and file paths that would be removed are returned. 

410 

411 Returns 

412 ------- 

413 list of uuid.UUID 

414 A list of Globus delete task IDs if dry is false. 

415 dict of str 

416 A map of data repository names and relative paths of the deleted files. 

417 """ 

418 if is_uuid(dataset): 

419 did = dataset 

420 dataset = self.alyx.rest('datasets', 'read', id=did) 

421 else: 

422 did = dataset['url'].split('/')[-1] 

423 

424 def is_aws(repository_name): 

425 return repository_name.startswith('aws_') 

426 

427 files_by_repo = defaultdict(list) # str -> [pathlib.PurePosixPath] 

428 s3_files = [] 

429 file_records = filter(lambda x: x['exists'], dataset['file_records']) 

430 for record in file_records: 

431 repo = self.repo_from_alyx(record['data_repository'], self.alyx) 

432 # Handle S3 files 

433 if not repo['globus_endpoint_id'] or repo['repository_type'] != 'Fileserver': 

434 if is_aws(repo['name']): 

435 s3_files.append(url2uri(record['data_url'])) 

436 files_by_repo[repo['name']].append(PurePosixPath(record['relative_path'])) 

437 else: 

438 _logger.error('Unable to delete from %s', repo['name']) 

439 else: 

440 # Handle Globus files 

441 if repo['name'] not in self.endpoints: 

442 self.add_endpoint(repo['name'], alyx=self.alyx) 

443 filepath = PurePosixPath(record['relative_path']) 

444 if 'flatiron' in repo['name']: 

445 filepath = add_uuid_string(filepath, did) 

446 files_by_repo[repo['name']].append(filepath) 

447 

448 # Remove S3 files 

449 if s3_files: 

450 cmd = ['aws', 's3', 'rm', *s3_files, '--profile', 'ibladmin'] 

451 if dry: 

452 cmd.append('--dryrun') 

453 if _logger.level > logging.DEBUG: 

454 log_function = _logger.error 

455 cmd.append('--only-show-errors') # Suppress verbose output 

456 else: 

457 log_function = _logger.debug 

458 cmd.append('--no-progress') # Suppress progress info, estimated time, etc. 

459 _logger.debug(' '.join(cmd)) 

460 process = Popen(cmd, stdout=PIPE, stderr=STDOUT) 

461 with process.stdout: 

462 for line in iter(process.stdout.readline, b''): 

463 log_function(line.decode().strip()) 

464 assert process.wait() == 0 

465 

466 if dry: 

467 return [], files_by_repo 

468 

469 # Remove Globus files 

470 globus_files_map = filter(lambda x: not is_aws(x[0]), files_by_repo.items()) 

471 task_ids = list(starmap(self.delete_data, map(reversed, globus_files_map))) 

472 

473 # Delete the dataset from Alyx 

474 self.alyx.rest('datasets', 'delete', id=did) 

475 return task_ids, files_by_repo 
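# Usage sketch (editorial; the dataset ID is hypothetical): with dry=True nothing is actually
# deleted (the aws command runs with --dryrun and the Globus deletes are skipped), and only the
# files that would be removed are returned, grouped by repository name:
#   >>> patcher = IBLGlobusPatcher(AlyxClient())
#   >>> _, files_by_repo = patcher.delete_dataset('f4aafe6c-a7ab-4390-82cd-2c0e245322a5', dry=True)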

476 

477 

478class SSHPatcher(Patcher): 

479 """ 

480 Requires SSH keys access on the FlatIron 

481 """ 

482 def __init__(self, one=None): 

483 res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls") 

484 if res[0] != 0: 

485 raise PermissionError("Could not connect to the Flatiron via SSH. Check your RSA keys") 

486 super().__init__(one=one) 

487 

488 def _scp(self, local_path, remote_path, dry=True): 

489 cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \ 

490 f" mkdir -p {remote_path.parent}; " 

491 cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}" 

492 return _run_command(cmd, dry=dry) 
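# For a hypothetical file, the composed command has the following shape (placeholders in
# angle brackets are illustrative, not real paths):
#   ssh -p 61022 datauser@ibl.flatironinstitute.org mkdir -p /mnt/ibl/<lab>/Subjects/<...>; \
#   scp -P 61022 /local/file.npy datauser@ibl.flatironinstitute.org:/mnt/ibl/<lab>/Subjects/<...>/file.<uuid>.npy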

493 

494 def _rm(self, flatiron_path, dry=True): 

495 cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}" 

496 return _run_command(cmd, dry=dry) 

497 

498 

499class FTPPatcher(Patcher): 

500 """ 

501 This is used to register datasets from anywhere, without write access to the FlatIron server 

502 """ 

503 def __init__(self, one=None): 

504 super().__init__(one=one) 1c

505 alyx = self.one.alyx 1c

506 if not getattr(alyx._par, 'FTP_DATA_SERVER_LOGIN', False): 1c

507 alyx._par = self.setup(par=alyx._par, silent=alyx.silent) 1c

508 login, pwd = (one.alyx._par.FTP_DATA_SERVER_LOGIN, one.alyx._par.FTP_DATA_SERVER_PWD) 1c

509 self.ftp = ftplib.FTP_TLS(host=FTP_HOST, user=login, passwd=pwd) 1c

510 # self.ftp.ssl_version = ssl.PROTOCOL_TLSv1 

511 # self.ftp.auth() 

512 self.ftp.prot_p() 1c

513 self.ftp.login(login, pwd) 1c

514 # pre-fetch the repositories so as not to query them for every file registered 

515 self.repositories = self.one.alyx.rest("data-repository", "list") 1c

516 

517 @staticmethod 

518 def setup(par=None, silent=False): 

519 """ 

520 Set up (and save) FTP login parameters 

521 :param par: A parameters object to modify, if None the default Webclient parameters are 

522 loaded 

523 :param silent: If true, the defaults are used with no user input prompt 

524 :return: the modified parameters object 

525 """ 

526 DEFAULTS = { 1c

527 "FTP_DATA_SERVER": "ftp://ibl.flatironinstitute.org", 

528 "FTP_DATA_SERVER_LOGIN": "iblftp", 

529 "FTP_DATA_SERVER_PWD": None 

530 } 

531 if par is None: 1c

532 par = params.get(silent=silent) 

533 par = iopar.as_dict(par) 1c

534 

535 if silent: 1c

536 DEFAULTS.update(par) 1c

537 par = DEFAULTS 1c

538 else: 

539 for k in DEFAULTS.keys(): 1c

540 cpar = par.get(k, DEFAULTS[k]) 1c

541 # Prompt for non-password parameters; passwords are read with getpass 

542 if 'PWD' not in k: 1c

543 par[k] = input(f'Param {k}, current value is ["{cpar}"]:') or cpar 1c

544 else: 

545 prompt = f'Param {k} (leave empty to leave unchanged):' 1c

546 par[k] = getpass(prompt) or cpar 1c

547 

548 # Get the client key 

549 client = par.get('ALYX_URL', None) 1c

550 client_key = params._key_from_url(client) if client else params.get_default_client() 1c

551 # Save the parameters 

552 params.save(par, client_key) # Client params 1c

553 return iopar.from_dict(par) 1c
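# Usage sketch (editorial; assumes ONE/Alyx parameters are already configured): with
# silent=True, any missing FTP parameters are filled from the DEFAULTS above and saved
# without prompting (the password stays None until set interactively):
#   >>> par = FTPPatcher.setup(silent=True)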

554 

555 def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY, 

556 **kwargs): 

557 # overrides the superclass just to default the repository argument to the DMZ repository 

558 response = super().patch_dataset(path, created_by=created_by, dry=dry, 

559 repository=repository, ftp=True, **kwargs) 

560 # need to patch the file records to be consistent 

561 for ds in response: 

562 frs = ds['file_records'] 

563 fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs)) 

564 fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and 

565 fr['relative_path'] == fr_server['relative_path'], frs)) 

566 reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'], 

567 self.repositories)) 

568 relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath( 

569 PurePosixPath(fr_ftp['relative_path'])))[1:] 

570 # 1) if there was already a file, the registration created a duplicate 

571 fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and 

572 fr['relative_path'] == relative_path, frs)) # NOQA 

573 if len(fr_2del) == 1: 

574 self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id']) 

575 # 2) the patch ftp file needs to be prepended with the server repository path 

576 self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'], 

577 data={'relative_path': relative_path, 'exists': True}) 

578 # 3) the server file is labeled as not existing 

579 self.one.alyx.rest('files', 'partial_update', id=fr_server['id'], 

580 data={'exists': False}) 

581 return response 

582 

583 def _scp(self, local_path, remote_path, dry=True): 

584 # remote_path = '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001' 

585 remote_path = PurePosixPath('/').joinpath( 

586 remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT)) 

587 ) 

588 # local_path 

589 self.mktree(remote_path.parent) 

590 self.ftp.pwd() 

591 _logger.info(f"FTP upload {local_path}") 

592 with open(local_path, 'rb') as fid: 

593 self.ftp.storbinary(f'STOR {local_path.name}', fid) 

594 return 0, '' 

595 

596 def mktree(self, remote_path): 

597 """ Browse to the tree on the ftp server, making directories on the way""" 

598 if str(remote_path) != '.': 

599 try: 

600 self.ftp.cwd(str(remote_path)) 

601 except ftplib.error_perm: 

602 self.mktree(PurePosixPath(remote_path.parent)) 

603 self.ftp.mkd(str(remote_path)) 

604 self.ftp.cwd(str(remote_path)) 

605 

606 def _rm(self, flatiron_path, dry=True): 

607 raise PermissionError("This Patcher does not have admin permissions to remove data " 

608 "from the FlatIron server. ") 

609 

610 

611class SDSCPatcher(Patcher): 

612 """ 

613 This is used to patch data on the SDSC server 

614 """ 

615 def __init__(self, one=None): 

616 assert one 

617 super().__init__(one=one) 

618 

619 def patch_datasets(self, file_list, **kwargs): 

620 response = super().patch_datasets(file_list, **kwargs) 

621 

622 # TODO check the file records to see if they have local server ones 

623 # If they do then need to remove file record and delete file from local server?? 

624 

625 return response 

626 

627 def _scp(self, local_path, remote_path, dry=True): 

628 

629 _logger.info(f"Copy {local_path} to {remote_path}") 

630 if not dry: 

631 if not Path(remote_path).parent.exists(): 

632 Path(remote_path).parent.mkdir(exist_ok=True, parents=True) 

633 shutil.copy(local_path, remote_path) 

634 return 0, '' 

635 

636 def _rm(self, flatiron_path, dry=True): 

637 raise PermissionError("This Patcher does not have admin permissions to remove data " 

638 "from the FlatIron server") 

639 

640 

641class S3Patcher(Patcher): 

642 

643 def __init__(self, one=None): 

644 assert one 

645 super().__init__(one=one) 

646 self.s3_repo = 's3_patcher' 

647 self.s3_path = 'patcher' 

648 

649 # Instantiate boto connection 

650 self.s3, self.bucket = get_s3_from_alyx(self.one.alyx, repo_name=self.s3_repo) 

651 

652 def check_datasets(self, file_list): 

653 # Check whether the datasets already exist; if they do, don't patch unless force=True. 

654 exists = [] 

655 for file in file_list: 

656 collection = full_path_parts(file, as_dict=True)['collection'] 

657 dset = self.one.alyx.rest('datasets', 'list', session=self.one.path2eid(file), name=file.name, 

658 collection=collection, clobber=True) 

659 if len(dset) > 0: 

660 exists.append(file) 

661 

662 return exists 

663 

664 def patch_dataset(self, file_list, dry=False, ftp=False, force=False, **kwargs): 

665 

666 exists = self.check_datasets(file_list) 

667 if len(exists) > 0 and not force: 

668 _logger.error(f'Files: {", ".join([f.name for f in file_list])} already exist; to overwrite, set force=True') 

669 return 

670 

671 response = super().patch_dataset(file_list, dry=dry, repository=self.s3_repo, ftp=False, **kwargs) 

672 # TODO in an ideal case the flatiron file record won't be altered when we register this dataset. This requires 

673 # changing the alyx.data.register_view 

674 for ds in response: 

675 frs = ds['file_records'] 

676 fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs)) 

677 # Update the flatiron file record to be false 

678 self.one.alyx.rest('files', 'partial_update', id=fr_server['id'], 

679 data={'exists': False}) 
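# Usage sketch (editorial; assumes the 's3_patcher' data repository is configured in Alyx
# and AWS credentials are available):
#   >>> patcher = S3Patcher(one=ONE())
#   >>> patcher.patch_dataset([file_path], created_by='admin', force=True)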

680 

681 def _scp(self, local_path, remote_path, dry=True): 

682 

683 aws_remote_path = Path(self.s3_path).joinpath(remote_path.relative_to(FLATIRON_MOUNT)) 

684 _logger.info(f'Transferring file {local_path} to {aws_remote_path}') 

685 self.s3.Bucket(self.bucket).upload_file(str(PurePosixPath(local_path)), str(PurePosixPath(aws_remote_path))) 

686 

687 return 0, '' 

688 

689 def _rm(self, *args, **kwargs): 

690 raise PermissionError("This Patcher does not have admin permissions to remove data.")