Coverage for ibllib/io/globus.py: 48%
48 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1"""TODO: This entire module may be removed in favour of one.remote.globus"""
2import re
3import sys
4import os
5from pathlib import Path
7import globus_sdk as globus
8from iblutil.io import params
def as_globus_path(path):
    """
    Format a path so that it can be passed to the Globus TransferClient.

    NB: If using a tilde in the path, the home folder of your Globus Connect instance must be
    the same as the OS home dir.

    On Windows a path that already looks like a Globus path (a leading single upper-case drive
    letter folder, e.g. '/E/...') is returned untouched; on other platforms any absolute path is
    returned untouched.  Everything else is resolved against the current working directory and,
    when the resolved path has a drive, rewritten as '/<drive letter>/...'.

    :param path: A path str or Path instance
    :return: A formatted path string

    Examples (each call is followed by the value it returns):
        # A Windows path
        as_globus_path('E:\\FlatIron\\integration')
        '/E/FlatIron/integration'

        # A relative POSIX path (the result depends on the current working directory)
        as_globus_path('../data/integration')
        '/mnt/data/integration'

        # A globus path
        as_globus_path('/E/FlatIron/integration')
        '/E/FlatIron/integration'

    TODO Remove in favour of one.remote.globus.as_globus_path
    """
    path_str = str(path)
    if sys.platform in ('win32', 'cygwin'):
        # On Windows only the '/<DRIVE>/...' form counts as already formatted
        already_formatted = re.match(r'/[A-Z]($|/)', path_str)
    else:
        already_formatted = Path(path_str).is_absolute()
    if already_formatted:
        return path_str

    resolved = Path(path_str).resolve()
    if not resolved.drive:
        return str(resolved)
    # Strip the first colon (the drive separator) and prefix with a slash: 'E:/x' -> '/E/x'
    return '/' + resolved.as_posix().replace(':', '', 1)
def _login(globus_client_id, refresh_tokens=False):
    """
    Run the interactive Globus native-app login flow and return the transfer tokens.

    The authorisation URL is printed and the user is prompted (via stdin) for the code obtained
    after logging in.

    :param globus_client_id: The client ID passed to globus.NativeAppAuthClient
    :param refresh_tokens: Forwarded to oauth2_start_flow
    :return: A dict with keys 'refresh_token', 'access_token' and 'expires_at_seconds', taken
     from the 'transfer.api.globus.org' entry of the token response
    """
    # TODO Import from one.remote.globus
    auth_client = globus.NativeAppAuthClient(globus_client_id)
    auth_client.oauth2_start_flow(refresh_tokens=refresh_tokens)

    login_url = auth_client.oauth2_get_authorize_url()
    print('Please go to this URL and login: {0}'.format(login_url))
    code = input('Please enter the code you get after login here: ').strip()

    response = auth_client.oauth2_exchange_code_for_tokens(code)
    transfer_tokens = response.by_resource_server['transfer.api.globus.org']

    # Keep only the fields that are stored / used by the other login helpers
    wanted = ('refresh_token', 'access_token', 'expires_at_seconds')
    return {field: transfer_tokens[field] for field in wanted}
def login(globus_client_id):
    """
    Log in interactively and return a Globus TransferClient authorised with the access token.

    The login flow is started with refresh_tokens=False.

    :param globus_client_id: The client ID passed on to _login
    :return: A globus.TransferClient using an AccessTokenAuthorizer
    """
    # TODO Import from one.remote.globus
    access_token = _login(globus_client_id, refresh_tokens=False)['access_token']
    return globus.TransferClient(authorizer=globus.AccessTokenAuthorizer(access_token))
def setup(globus_client_id, str_app='globus/default'):
    """
    Log in interactively, requesting refresh tokens, and save the tokens with params.write.

    :param globus_client_id: The client ID passed on to _login
    :param str_app: The parameter name the token dict is written under
    """
    # TODO Import from one.remote.globus
    # Lookup and manage consents there
    # https://auth.globus.org/v2/web/consents
    params.write(str_app, _login(globus_client_id, refresh_tokens=True))
def login_auto(globus_client_id, str_app='globus/default'):
    """
    Return a Globus TransferClient using the refresh token previously saved under `str_app`.

    :param globus_client_id: The client ID passed to globus.NativeAppAuthClient
    :param str_app: The parameter name the token is read from
    :return: A globus.TransferClient using a RefreshTokenAuthorizer
    :raises ValueError: The stored token is empty or lacks one of the fields 'refresh_token',
     'access_token' or 'expires_at_seconds'
    """
    # TODO Import from one.remote.globus
    token = params.read(str_app, {})
    required_fields = {'refresh_token', 'access_token', 'expires_at_seconds'}
    if not token or not required_fields.issubset(token.as_dict()):
        raise ValueError("Token file doesn't exist, run ibllib.io.globus.setup first")

    auth_client = globus.NativeAppAuthClient(globus_client_id)
    auth_client.oauth2_start_flow(refresh_tokens=True)
    return globus.TransferClient(
        authorizer=globus.RefreshTokenAuthorizer(token.refresh_token, auth_client)
    )
def get_local_endpoint():
    """
    Read the local Globus endpoint ID from the 'client-id.txt' file.

    On Windows and Cygwin the file is looked up in '%LOCALAPPDATA%/Globus Connect'; on every
    other platform in '~/.globusonline/lta'.

    :return: The content of the file with surrounding whitespace stripped
    """
    # TODO Remove in favour of one.remote.globus.get_local_endpoint_id
    if sys.platform in ('win32', 'cygwin'):
        config_dir = Path(os.environ['LOCALAPPDATA']) / "Globus Connect"
    else:
        config_dir = Path.home() / ".globusonline" / "lta"
    with open(config_dir.joinpath("client-id.txt"), 'r') as id_file:
        return id_file.read().strip()