Coverage for ibllib/oneibl/patcher.py: 34%

281 statements  

coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

import abc
import ftplib
from pathlib import Path, PurePosixPath, WindowsPath
import subprocess
import logging
from getpass import getpass
import shutil

import globus_sdk
import iblutil.io.params as iopar
from one.alf.files import get_session_path, add_uuid_string
from one.alf.spec import is_uuid_string
from one import params
from one.converters import path_from_dataset
from one.remote import globus

from ibllib.oneibl.registration import register_dataset

_logger = logging.getLogger(__name__)

FLAT_IRON_GLOBUS_ID = 'ab2d064c-413d-11eb-b188-0ee0d5d9299f'
FLATIRON_HOST = 'ibl.flatironinstitute.org'
FLATIRON_PORT = 61022
FLATIRON_USER = 'datauser'
FLATIRON_MOUNT = '/mnt/ibl'
FTP_HOST = 'test.alyx.internationalbrainlab.org'
FTP_PORT = 21
DMZ_REPOSITORY = 'ibl_patcher'  # in Alyx, the repository name containing the patched file records
SDSC_ROOT_PATH = PurePosixPath('/mnt/ibl')
SDSC_PATCH_PATH = PurePosixPath('/home/datauser/temp')

def _run_command(cmd, dry=True):
    _logger.info(cmd)
    if dry:
        return 0, '', ''
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    info, error = p.communicate()
    if p.returncode != 0:
        _logger.error(error)
        raise RuntimeError(error)
    return p.returncode, info, error
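A minimal illustration of the dry-run contract above (the command is arbitrary and purely illustrative): with dry=True the command is only logged, while dry=False runs it through the shell and raises a RuntimeError on a non-zero exit code.

# Sketch: exercising _run_command with a hypothetical command
rc, out, err = _run_command('echo hello', dry=True)    # dry run: only logged, returns (0, '', '')
rc, out, err = _run_command('echo hello', dry=False)   # executed: returns (0, b'hello\n', b'')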

def sdsc_globus_path_from_dataset(dset):
    """
    Returns the SDSC Globus file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=None, uuid=True)


def sdsc_path_from_dataset(dset, root_path=SDSC_ROOT_PATH):
    """
    Returns the SDSC file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param root_path: (optional) the prefix path, such as the ONE download directory or the SDSC root
    """
    return path_from_dataset(dset, root_path=root_path, uuid=True)


def globus_path_from_dataset(dset, repository=None, uuid=False):
    """
    Returns the local ONE file path from a dset record or a list of dset records from REST.
    :param dset: dset dictionary or list of dictionaries from the Alyx REST endpoint
    :param repository: (optional) repository name of the file record (if None, takes
     the first file record with a URL)
    """
    return path_from_dataset(dset, root_path=PurePosixPath('/'), repository=repository, uuid=uuid)


class Patcher(abc.ABC):
    def __init__(self, one=None):
        assert one
        self.one = one

    def _patch_dataset(self, path, dset_id=None, dry=False, ftp=False):
        """
        This private method gets the dataset information from Alyx, computes the local
        and remote paths and initiates the file copy.
        """
        path = Path(path)
        if dset_id is None:
            dset_id = path.name.split('.')[-2]
            if not is_uuid_string(dset_id):
                dset_id = None
        assert dset_id
        assert is_uuid_string(dset_id)
        assert path.exists()
        dset = self.one.alyx.rest('datasets', "read", id=dset_id)
        fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
        remote_path = Path(fr['data_repository_path']).joinpath(fr['relative_path'])
        remote_path = add_uuid_string(remote_path, dset_id).as_posix()
        if remote_path.startswith('/'):
            full_remote_path = PurePosixPath(FLATIRON_MOUNT + remote_path)
        else:
            full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
        if isinstance(path, WindowsPath) and not ftp:
            # On Windows replace drive map with Globus uri, e.g. C:/ -> /~/C/
            path = globus.as_globus_path(path)
        status = self._scp(path, full_remote_path, dry=dry)[0]
        return status

    def register_dataset(self, file_list, **kwargs):
        """
        Registers a set of files belonging to a single session, only on the server.
        :param file_list: (list of pathlib.Path)
        :param created_by: (string) name of user in Alyx (defaults to 'root')
        :param repository: (optional) (string) name of the server repository in Alyx
        :param versions: (optional) (list of strings) version tags (defaults to ibllib version)
        :param dry: (bool) False by default
        :return: the registration response
        """
        return register_dataset(file_list, one=self.one, server_only=True, exists=True, **kwargs)

    def register_datasets(self, file_list, **kwargs):
        """
        Same as register_dataset but works with files belonging to different sessions.
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i}/{nses} {label}, registering {len(_files)} files")
            responses.append(self.register_dataset(_files, **kwargs))
        return responses

    def patch_dataset(self, file_list, dry=False, ftp=False, **kwargs):
        """
        Creates a new dataset on FlatIron and uploads it from an arbitrary location.
        Rules for creation/patching are the same as those that apply for registration via Alyx,
        as this uses the registration endpoint to get the dataset.
        An existing file (same session and path relative to session) will be patched.
        :param file_list: full file path. Must be within an ALF session folder (subject/date/number);
         can also be a list of full file paths belonging to the same session.
        :param server_repository: Alyx server repository name
        :param created_by: Alyx username for the dataset (optional, defaults to root)
        :param ftp: flag for the case when the FTPPatcher is used; the Windows path is not
         adjusted in _patch_dataset when ftp=True
        :return: the registration response, a list of dataset records
        """
        # first register the file
        if not isinstance(file_list, list):
            file_list = [Path(file_list)]
        assert len(set([get_session_path(f) for f in file_list])) == 1
        assert all([Path(f).exists() for f in file_list])
        response = self.register_dataset(file_list, dry=dry, **kwargs)
        if dry:
            return
        # from the dataset info, set the FlatIron flag to exists=True
        for p, d in zip(file_list, response):
            self._patch_dataset(p, dset_id=d['id'], dry=dry, ftp=ftp)
        return response

    def patch_datasets(self, file_list, **kwargs):
        """
        Same as patch_dataset but works with files belonging to several different sessions.
        """
        register_dict = {}
        # creates a dictionary of sessions with one file list per session
        for f in file_list:
            session_path = get_session_path(f)
            label = '_'.join(session_path.parts[-3:])
            if label in register_dict:
                register_dict[label]['files'].append(f)
            else:
                register_dict[label] = {'session_path': session_path, 'files': [f]}
        responses = []
        nses = len(register_dict)
        for i, label in enumerate(register_dict):
            _files = register_dict[label]['files']
            _logger.info(f"{i}/{nses} {label}, registering {len(_files)} files")
            responses.extend(self.patch_dataset(_files, **kwargs))
        return responses

    @abc.abstractmethod
    def _scp(self, *args, **kwargs):
        pass

    @abc.abstractmethod
    def _rm(self, *args, **kwargs):
        pass
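As a concrete illustration of the remote path that _patch_dataset assembles above, here is a minimal sketch; the relative path and dataset UUID are made up, and it assumes add_uuid_string inserts the dataset UUID just before the file extension.

# Sketch only: deriving the FlatIron remote path (hypothetical relative path and UUID)
rel_path = Path('zadorlab/Subjects/flowers/2018-07-13/001/alf/spikes.times.npy')
dset_id = '00000000-1111-2222-3333-444444444444'
remote_path = add_uuid_string(rel_path, dset_id).as_posix()
full_remote_path = PurePosixPath(FLATIRON_MOUNT, remote_path)
# /mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001/alf/spikes.times.00000000-1111-2222-3333-444444444444.npy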

class GlobusPatcher(Patcher):
    """
    Requires Globus access keys.
    """

    def __init__(self, client_name='default', one=None, label='ibllib patch'):
        assert one
        self.local_endpoint = getattr(globus.load_client_params(f'globus.{client_name}'),
                                      'local_endpoint', globus.get_local_endpoint_id())
        self.transfer_client = globus.create_globus_client(client_name)
        self.label = label
        # transfers/deletes from the current computer to FlatIron: mandatory and executed first
        self.globus_transfer = globus_sdk.TransferData(
            self.transfer_client, self.local_endpoint, FLAT_IRON_GLOBUS_ID, verify_checksum=True,
            sync_level='checksum', label=label)
        self.globus_delete = globus_sdk.DeleteData(
            self.transfer_client, FLAT_IRON_GLOBUS_ID, verify_checksum=True,
            sync_level='checksum', label=label)
        # get a dictionary of data repositories from Alyx (with globus ids)
        self.repos = {r['name']: r for r in one.alyx.rest('data-repository', 'list')}
        # transfers/deletes from FlatIron to optional third parties to synchronize / delete
        self.globus_transfers_locals = {}
        self.globus_deletes_locals = {}
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        _logger.info(f"Globus copy {local_path} to {remote_path}")
        if not dry:
            if isinstance(self.globus_transfer, globus_sdk.TransferData):
                self.globus_transfer.add_item(local_path, remote_path)
            else:
                self.globus_transfer.path_src.append(local_path)
                self.globus_transfer.path_dest.append(remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        flatiron_path = Path('/').joinpath(flatiron_path.relative_to(Path(FLATIRON_MOUNT)))
        _logger.info(f"Globus del {flatiron_path}")
        if not dry:
            if isinstance(self.globus_delete, globus_sdk.DeleteData):
                self.globus_delete.add_item(flatiron_path)
            else:
                self.globus_delete.path.append(flatiron_path)
        return 0, ''

    def patch_datasets(self, file_list, **kwargs):
        """
        Calls the super method, which registers the datasets and adds them to the transfer
        from the current computer to FlatIron. Then creates individual transfer items for each
        local server, so that after the update on FlatIron the local server files are also updated.
        :param file_list: (list of pathlib.Path) files to register and patch
        :param kwargs: keyword arguments passed on to patch_dataset
        :return: the registration responses, a list of dataset records
        """
        responses = super().patch_datasets(file_list, **kwargs)
        for dset in responses:
            # get the flatiron path
            fr = next(fr for fr in dset['file_records'] if 'flatiron' in fr['data_repository'])
            flatiron_path = self.repos[fr['data_repository']]['globus_path']
            flatiron_path = Path(flatiron_path).joinpath(fr['relative_path'])
            flatiron_path = add_uuid_string(flatiron_path, dset['id']).as_posix()
            # loop over the remaining repositories (local servers) and create a transfer
            # from flatiron to the local server
            for fr in dset['file_records']:
                if fr['data_repository'] == DMZ_REPOSITORY:
                    continue
                repo_gid = self.repos[fr['data_repository']]['globus_endpoint_id']
                if repo_gid == FLAT_IRON_GLOBUS_ID:
                    continue
                # if there is no transfer already created, initialize it
                if repo_gid not in self.globus_transfers_locals:
                    self.globus_transfers_locals[repo_gid] = globus_sdk.TransferData(
                        self.transfer_client, FLAT_IRON_GLOBUS_ID, repo_gid, verify_checksum=True,
                        sync_level='checksum', label=f"{self.label} on {fr['data_repository']}")
                # get the local server path and create the transfer item
                local_server_path = self.repos[fr['data_repository']]['globus_path']
                local_server_path = Path(local_server_path).joinpath(fr['relative_path'])
                self.globus_transfers_locals[repo_gid].add_item(flatiron_path, local_server_path)
        return responses

    def launch_transfers(self, local_servers=False):
        """
        patcher.launch_transfers()
        Launches the Globus transfer and delete tasks from the local patch computer to FlatIron.
        :param local_servers: (bool, False by default) if True, sync the local servers after the
         main transfer
        :return: None
        """
        gtc = self.transfer_client


        def _wait_for_task(resp):
            # patcher.transfer_client.get_task(task_id='364fbdd2-4deb-11eb-8ffb-0a34088e79f9')
            # on a good status:
            # Out[22]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3011090432, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 873268, 'encrypt_data': False, 'fatal_error': None, 'faults': 6, 'files': 186, 'files_skipped': 12, 'files_transferred': 76, 'history_deleted': False, 'is_ok': True, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'OK', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'OK', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 98, 'subtasks_retrying': 0, 'subtasks_succeeded': 274, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            # on a checksum error
            # Out[26]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 3715901232, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': None, 'deadline': '2021-01-06T18:10:05+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 912410, 'encrypt_data': False, 'fatal_error': None, 'faults': 7, 'files': 186, 'files_skipped': 12, 'files_transferred': 102, 'history_deleted': False, 'is_ok': False, 'is_paused': False, 'key': 'active,2021-01-03T17:52:34.427087', 'label': '3B analog sync patch', 'nice_status': 'VERIFY_CHECKSUM', 'nice_status_details': None, 'nice_status_expires_in': -1, 'nice_status_short_description': 'checksum verification failed', 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'ACTIVE', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 72, 'subtasks_retrying': 0, 'subtasks_succeeded': 300, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            # on a finished task
            # Out[4]: TransferResponse({'bytes_checksummed': 377736912, 'bytes_transferred': 4998806664, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T20:04:50+00:00', 'deadline': '2021-01-06T19:11:00+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 629960, 'encrypt_data': False, 'fatal_error': None, 'faults': 15, 'files': 186, 'files_skipped': 12, 'files_transferred': 174, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T20:04:49.540956', 'label': '3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:52:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'SUCCEEDED', 'subtasks_canceled': 0, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 372, 'subtasks_total': 372, 'symlinks': 0, 'sync_level': 3, 'task_id': '364fbdd2-4deb-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            # on an errored task
            # Out[10]: TransferResponse({'bytes_checksummed': 0, 'bytes_transferred': 0, 'canceled_by_admin': None, 'canceled_by_admin_message': None, 'command': 'API 0.10', 'completion_time': '2021-01-03T17:39:00+00:00', 'deadline': '2021-01-04T17:37:34+00:00', 'delete_destination_extra': False, 'destination_endpoint': 'simonsfoundation#ibl', 'destination_endpoint_display_name': 'IBL Flatiron SDSC Data', 'destination_endpoint_id': 'ab2d064c-413d-11eb-b188-0ee0d5d9299f', 'directories': 0, 'effective_bytes_per_second': 0, 'encrypt_data': False, 'fatal_error': {'code': 'CANCELED', 'description': 'canceled'}, 'faults': 2, 'files': 6, 'files_skipped': 0, 'files_transferred': 0, 'history_deleted': False, 'is_ok': None, 'is_paused': False, 'key': 'complete,2021-01-03T17:38:59.697413', 'label': 'test 3B analog sync patch', 'nice_status': None, 'nice_status_details': None, 'nice_status_expires_in': None, 'nice_status_short_description': None, 'owner_id': 'e633663a-8561-4a5d-ac92-f198d43b14dc', 'preserve_timestamp': False, 'recursive_symlinks': 'ignore', 'request_time': '2021-01-03T17:37:34+00:00', 'source_endpoint': 'internationalbrainlab#916c2766-bd2a-11ea-8f22-0a21f750d19b', 'source_endpoint_display_name': 'olivier_laptop', 'source_endpoint_id': '916c2766-bd2a-11ea-8f22-0a21f750d19b', 'status': 'FAILED', 'subtasks_canceled': 6, 'subtasks_expired': 0, 'subtasks_failed': 0, 'subtasks_pending': 0, 'subtasks_retrying': 0, 'subtasks_succeeded': 6, 'subtasks_total': 12, 'symlinks': 0, 'sync_level': 3, 'task_id': '5706dd2c-4dea-11eb-8ffb-0a34088e79f9', 'type': 'TRANSFER', 'username': 'internationalbrainlab', 'verify_checksum': True})  # noqa
            while True:
                tinfo = gtc.get_task(task_id=resp['task_id'])
                if tinfo and tinfo['completion_time'] is not None:
                    break
                _ = gtc.task_wait(task_id=resp['task_id'], timeout=30)
            if tinfo and tinfo['fatal_error'] is not None:
                raise ConnectionError(f"Globus transfer failed \n {tinfo}")


        # handles the transfers first
        if len(self.globus_transfer['DATA']) > 0:
            # launch the transfer
            _wait_for_task(gtc.submit_transfer(self.globus_transfer))
            # re-initialize the globus_transfer property
            self.globus_transfer = globus_sdk.TransferData(
                gtc,
                self.globus_transfer['source_endpoint'],
                self.globus_transfer['destination_endpoint'],
                label=self.globus_transfer['label'],
                verify_checksum=True, sync_level='checksum')

        # do the same for deletes
        if len(self.globus_delete['DATA']) > 0:
            _wait_for_task(gtc.submit_delete(self.globus_delete))
            self.globus_delete = globus_sdk.DeleteData(
                gtc,
                endpoint=self.globus_delete['endpoint'],
                label=self.globus_delete['label'],
                verify_checksum=True, sync_level='checksum')

        # launch the local transfers and local deletes
        if local_servers:
            self.launch_transfers_secondary()

    def launch_transfers_secondary(self):
        """
        patcher.launch_transfers_secondary()
        Launches the Globus transfers from FlatIron to third-party repositories (local servers).
        This should run after the main transfer from the patch computer to FlatIron completes.
        :return: None
        """
        for lt in self.globus_transfers_locals:
            transfer = self.globus_transfers_locals[lt]
            if len(transfer['DATA']) > 0:
                self.transfer_client.submit_transfer(transfer)
        for ld in self.globus_deletes_locals:
            delete = self.globus_deletes_locals[ld]
            if len(delete['DATA']) > 0:
                self.transfer_client.submit_delete(delete)

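For orientation, a hedged sketch of the overall GlobusPatcher workflow implied by the methods above; the ONE instance, file path and registration keyword arguments are hypothetical, and the repository name would have to match an existing Alyx data repository.

# Sketch only: register datasets, queue the Globus items, then launch the transfers
from one.api import ONE

one = ONE()
patcher = GlobusPatcher(client_name='default', one=one, label='example patch')
patcher.patch_datasets(['/data/flowers/2018-07-13/001/alf/spikes.times.npy'],  # hypothetical path
                       repository='flatiron_zadorlab', created_by='root')      # hypothetical kwargs
patcher.launch_transfers(local_servers=True)  # waits for the FlatIron task, then syncs local servers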

class SSHPatcher(Patcher):
    """
    Requires SSH key access to the FlatIron server.
    """
    def __init__(self, one=None):
        res = _run_command(f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} ls")
        if res[0] != 0:
            raise PermissionError("Could not connect to FlatIron via SSH. Check your RSA keys")
        super().__init__(one=one)

    def _scp(self, local_path, remote_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST}" \
              f" mkdir -p {remote_path.parent}; "
        cmd += f"scp -P {FLATIRON_PORT} {local_path} {FLATIRON_USER}@{FLATIRON_HOST}:{remote_path}"
        return _run_command(cmd, dry=dry)

    def _rm(self, flatiron_path, dry=True):
        cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} rm {flatiron_path}"
        return _run_command(cmd, dry=dry)

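To make the SSH mechanics concrete, a small sketch of the command strings that _scp composes (the local and remote paths are hypothetical; with dry=True they are only logged):

# Sketch: command strings built by SSHPatcher._scp (hypothetical paths)
remote = PurePosixPath('/mnt/ibl/flowers/2018-07-13/001/alf/spikes.times.npy')
cmd = f"ssh -p {FLATIRON_PORT} {FLATIRON_USER}@{FLATIRON_HOST} mkdir -p {remote.parent}; "
cmd += f"scp -P {FLATIRON_PORT} /local/spikes.times.npy {FLATIRON_USER}@{FLATIRON_HOST}:{remote}"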

class FTPPatcher(Patcher):
    """
    This is used to register datasets from anywhere without write access to FlatIron.
    """
    def __init__(self, one=None):
        super().__init__(one=one)
        alyx = self.one.alyx
        if not getattr(alyx._par, 'FTP_DATA_SERVER_LOGIN', False):
            alyx._par = self.setup(par=alyx._par, silent=alyx.silent)
        login, pwd = (one.alyx._par.FTP_DATA_SERVER_LOGIN, one.alyx._par.FTP_DATA_SERVER_PWD)
        self.ftp = ftplib.FTP_TLS(host=FTP_HOST, user=login, passwd=pwd)
        # self.ftp.ssl_version = ssl.PROTOCOL_TLSv1
        # self.ftp.auth()
        self.ftp.prot_p()
        self.ftp.login(login, pwd)
        # pre-fetch the repositories so as not to query them for every file registered
        self.repositories = self.one.alyx.rest("data-repository", "list")

    @staticmethod
    def setup(par=None, silent=False):
        """
        Set up (and save) FTP login parameters
        :param par: A parameters object to modify; if None the default Webclient parameters are
         loaded
        :param silent: If true, the defaults are used with no user input prompt
        :return: the modified parameters object
        """
        DEFAULTS = {
            "FTP_DATA_SERVER": "ftp://ibl.flatironinstitute.org",
            "FTP_DATA_SERVER_LOGIN": "iblftp",
            "FTP_DATA_SERVER_PWD": None
        }
        if par is None:
            par = params.get(silent=silent)
        par = iopar.as_dict(par)

        if silent:
            DEFAULTS.update(par)
            par = DEFAULTS
        else:
            for k in DEFAULTS.keys():
                cpar = par.get(k, DEFAULTS[k])
                # Iterate through non-password pars; skip url if client url already provided
                if 'PWD' not in k:
                    par[k] = input(f'Param {k}, current value is ["{cpar}"]:') or cpar
                else:
                    prompt = f'Param {k} (leave empty to leave unchanged):'
                    par[k] = getpass(prompt) or cpar

        # Get the client key
        client = par.get('ALYX_URL', None)
        client_key = params._key_from_url(client) if client else params.get_default_client()
        # Save the parameters
        params.save(par, client_key)  # Client params
        return iopar.from_dict(par)

    def create_dataset(self, path, created_by='root', dry=False, repository=DMZ_REPOSITORY,
                       **kwargs):
        # overrides the superclass just to remove the server repository argument
        response = super().patch_dataset(path, created_by=created_by, dry=dry,
                                         repository=repository, ftp=True, **kwargs)
        # need to patch the file records to be consistent
        for ds in response:
            frs = ds['file_records']
            fr_server = next(filter(lambda fr: 'flatiron' in fr['data_repository'], frs))
            fr_ftp = next(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                            fr['relative_path'] == fr_server['relative_path'], frs))
            reposerver = next(filter(lambda rep: rep['name'] == fr_server['data_repository'],
                                     self.repositories))
            relative_path = str(PurePosixPath(reposerver['globus_path']).joinpath(
                PurePosixPath(fr_ftp['relative_path'])))[1:]
            # 1) if there was already a file, the registration created a duplicate
            fr_2del = list(filter(lambda fr: fr['data_repository'] == DMZ_REPOSITORY and
                                             fr['relative_path'] == relative_path, frs))  # NOQA
            if len(fr_2del) == 1:
                self.one.alyx.rest('files', 'delete', id=fr_2del[0]['id'])
            # 2) the patch ftp file needs to be prepended with the server repository path
            self.one.alyx.rest('files', 'partial_update', id=fr_ftp['id'],
                               data={'relative_path': relative_path, 'exists': True})
            # 3) the server file is labeled as not existing
            self.one.alyx.rest('files', 'partial_update', id=fr_server['id'],
                               data={'exists': False})
        return response

    def _scp(self, local_path, remote_path, dry=True):
        # e.g. remote_path = '/mnt/ibl/zadorlab/Subjects/flowers/2018-07-13/001'
        remote_path = PurePosixPath('/').joinpath(
            remote_path.relative_to(PurePosixPath(FLATIRON_MOUNT))
        )
        self.mktree(remote_path.parent)
        self.ftp.pwd()
        _logger.info(f"FTP upload {local_path}")
        with open(local_path, 'rb') as fid:
            self.ftp.storbinary(f'STOR {local_path.name}', fid)
        return 0, ''

    def mktree(self, remote_path):
        """Browse to the tree on the FTP server, making directories on the way."""
        if str(remote_path) != '.':
            try:
                self.ftp.cwd(str(remote_path))
            except ftplib.error_perm:
                self.mktree(PurePosixPath(remote_path.parent))
                self.ftp.mkd(str(remote_path))
                self.ftp.cwd(str(remote_path))

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server.")

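A hedged usage sketch of the FTP route (the file path is hypothetical; on first use, setup() prompts for the FTP credentials and saves them alongside the ONE parameters):

# Sketch only: register a file on the server and upload it to the DMZ over FTP (hypothetical path)
from one.api import ONE

ftp_patcher = FTPPatcher(one=ONE())
ftp_patcher.create_dataset(path='/data/flowers/2018-07-13/001/alf/spikes.times.npy',
                           created_by='root')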

class SDSCPatcher(Patcher):
    """
    This is used to patch data on the SDSC server.
    """
    def __init__(self, one=None):
        assert one
        super().__init__(one=one)

    def patch_datasets(self, file_list, **kwargs):
        response = super().patch_datasets(file_list, **kwargs)

        # TODO check the file records to see if they have local server ones
        # If they do then need to remove file record and delete file from local server??

        return response

    def _scp(self, local_path, remote_path, dry=True):
        _logger.info(f"Copy {local_path} to {remote_path}")
        if not dry:
            if not Path(remote_path).parent.exists():
                Path(remote_path).parent.mkdir(exist_ok=True, parents=True)
            shutil.copy(local_path, remote_path)
        return 0, ''

    def _rm(self, flatiron_path, dry=True):
        raise PermissionError("This Patcher does not have admin permissions to remove data "
                              "from the FlatIron server")