Removed superflous (object)
[pillar-svnman.git] / svnman / remote.py
1 import typing
2
3 import attr
4 import requests
5
6 from pillar import attrs_extra
7
8 from . import exceptions
9
10
11 @attr.s
12 class RepoDescription:
13     repo_id: str = attr.ib(validator=attr.validators.instance_of(str))
14     access: typing.List[str] = attr.ib(validator=attr.validators.instance_of(list))
15
16
17 @attr.s
18 class API:
19     # The remote URL and credentials are separate. This way we can log the
20     # URL that is used in requests without worrying about leaking creds.
21     remote_url: str = attr.ib(validator=attr.validators.instance_of(str))
22     """URL of the remote SVNMan API.
23     
24     Should probably end in '/api/'.
25     """
26
27     username: str = attr.ib(validator=attr.validators.instance_of(str))
28     """Username for authenticating ourselves with the API."""
29     password: str = attr.ib(validator=attr.validators.instance_of(str), repr=False)
30     """Password for authenticating ourselves with the API."""
31
32     _log = attrs_extra.log('%s.Remote' % __name__)
33     _session = requests.Session()
34
35     def __attrs_post_init__(self):
36         from requests.adapters import HTTPAdapter
37         self._session.mount('/', HTTPAdapter(max_retries=10))
38
39     def _request(self, method: str, rel_url: str, **kwargs) -> requests.Response:
40         """Performs a HTTP request on the API server."""
41
42         from urllib.parse import urljoin
43
44         abs_url = urljoin(self.remote_url, rel_url)
45         self._log.getChild('request').info('%s %s', method, abs_url)
46
47         auth = (self.username, self.password) if self.username or self.password else None
48         return self._session.request(method, abs_url, auth=auth, **kwargs)
49
50     def fetch_repo(self, repo_id: str) -> RepoDescription:
51         """Fetches repository information from the remote."""
52
53         resp = self._request('GET', f'repo/{repo_id}')
54
55         if resp.status_code == requests.codes.conflict:
56             raise exceptions.RepoAlreadyExists(repo_id)
57         resp.raise_for_status()
58
59         return RepoDescription(**resp.json())
60
61     def modify_access(self,
62                       repo_id: str,
63                       grant: typing.List[typing.Tuple[str, str]],
64                       revoke: typing.List[str]):
65         """Modifies user access to the repository.
66
67         Does not return anything; no exception means exection was ok.
68
69         :param repo_id: the repository ID
70         :param grant: list of (username password) tuples. The passwords should be BCrypt-hashed.
71         :param revoke: list of usernames.
72         """
73
74         grants = [{'username': u, 'password': p} for u, p in grant]
75
76         resp = self._request('POST', f'repo/{repo_id}/access', json={
77             'grant': grants,
78             'revoke': revoke,
79         })
80         resp.raise_for_status()