Basic SVN management web UI works.
[pillar-svnman.git] / svnman / remote.py
1 import typing
2
3 import attr
4 import requests
5
6 from pillar import attrs_extra
7
8 from . import exceptions
9
10 # For replacing the hash type indicator, as Apache only
11 # understands BCrypt when using the 2y marker.
12 HASH_TYPES_TO_REPLACE = {'$2a$', '$2b$'}
13
14
15 @attr.s
16 class RepoDescription:
17     repo_id: str = attrs_extra.string()
18     access: typing.List[str] = attr.ib(validator=attr.validators.instance_of(list))
19
20
21 @attr.s
22 class CreateRepo:
23     repo_id: str = attrs_extra.string()
24     project_id: str = attrs_extra.string()
25     creator: str = attrs_extra.string()
26
27
28 @attr.s
29 class API:
30     # The remote URL and credentials are separate. This way we can log the
31     # URL that is used in requests without worrying about leaking creds.
32     remote_url: str = attr.ib(validator=attr.validators.instance_of(str))
33     """URL of the remote SVNMan API.
34     
35     Should probably end in '/api/'.
36     """
37
38     username: str = attr.ib(validator=attr.validators.instance_of(str))
39     """Username for authenticating ourselves with the API."""
40     password: str = attr.ib(validator=attr.validators.instance_of(str), repr=False)
41     """Password for authenticating ourselves with the API."""
42
43     _log = attrs_extra.log('%s.Remote' % __name__)
44     _session = requests.Session()
45
46     def __attrs_post_init__(self):
47         from requests.adapters import HTTPAdapter
48         self._session.mount('/', HTTPAdapter(max_retries=10))
49
50     def _request(self, method: str, rel_url: str, **kwargs) -> requests.Response:
51         """Performs a HTTP request on the API server."""
52
53         from urllib.parse import urljoin
54
55         abs_url = urljoin(self.remote_url, rel_url)
56         self._log.getChild('request').info('%s %s', method, abs_url)
57
58         auth = (self.username, self.password) if self.username or self.password else None
59         return self._session.request(method, abs_url, auth=auth, **kwargs)
60
61     def _raise_for_status(self, resp: requests.Response):
62         """Raises the appropriate exception for the given response."""
63
64         if resp.status_code < 400:
65             return
66
67         exc_class = exceptions.http_error_map[resp.status_code]
68         raise exc_class(resp.text)
69
70     def fetch_repo(self, repo_id: str) -> RepoDescription:
71         """Fetches repository information from the remote."""
72
73         resp = self._request('GET', f'repo/{repo_id}')
74         self._raise_for_status(resp)
75
76         return RepoDescription(**resp.json())
77
78     def create_repo(self, create_repo: CreateRepo) -> str:
79         """Creates a new repository with the given ID.
80
81         Note that the repository ID may be changed by the SVNMan;
82         always use the repo ID as returned by this function.
83
84         :param create_repo: info required by the API
85         :raises svnman.exceptions.RepoAlreadyExists:
86         :returns: the repository ID as returned by the SVNMan.
87         """
88
89         self._log.info('Creating repository %r', create_repo)
90         resp = self._request('POST', 'repo', json=attr.asdict(create_repo))
91         if resp.status_code == requests.codes.conflict:
92             raise exceptions.RepoAlreadyExists(create_repo.repo_id)
93         self._raise_for_status(resp)
94
95         repo_info = resp.json()
96         return repo_info['repo_id']
97
98     def modify_access(self,
99                       repo_id: str,
100                       grant: typing.List[typing.Tuple[str, str]],
101                       revoke: typing.List[str]):
102         """Modifies user access to the repository.
103
104         Does not return anything; no exception means exection was ok.
105
106         :param repo_id: the repository ID
107         :param grant: list of (username password) tuples. The passwords should be BCrypt-hashed.
108         :param revoke: list of usernames.
109         """
110
111         # Replace the hash type indicator, as Apache only gets BCrypt
112         # when using the 2y marker.
113         def changehash(p):
114             if p[:4] in HASH_TYPES_TO_REPLACE:
115                 return f'$2y${p[4:]}'
116             return p
117
118         grants = [{'username': u,
119                    'password': changehash(p)} for u, p in grant]
120
121         self._log.info('Modifying access rules for repository %r: grants=%s revokes=%s',
122                        repo_id, [u for u, p in grant], revoke)
123
124         resp = self._request('POST', f'repo/{repo_id}/access', json={
125             'grant': grants,
126             'revoke': revoke,
127         })
128         self._raise_for_status(resp)
129
130     def delete_repo(self, repo_id: str):
131         """Deletes a repository, cannot be undone through the API."""
132
133         self._log.info('Deleting repository %r', repo_id)
134         resp = self._request('DELETE', f'repo/{repo_id}')
135         self._raise_for_status(resp)