1# Copyright 2017 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import base64
16import datetime
17import json
18
19import mock
20import pytest
21from six.moves import http_client
22
23from google.auth import _helpers
24from google.auth import exceptions
25from google.auth import iam
26from google.auth import transport
27import google.auth.credentials
28
29
30def make_request(status, data=None):
31    response = mock.create_autospec(transport.Response, instance=True)
32    response.status = status
33
34    if data is not None:
35        response.data = json.dumps(data).encode("utf-8")
36
37    request = mock.create_autospec(transport.Request)
38    request.return_value = response
39    return request
40
41
42def make_credentials():
43    class CredentialsImpl(google.auth.credentials.Credentials):
44        def __init__(self):
45            super(CredentialsImpl, self).__init__()
46            self.token = "token"
47            # Force refresh
48            self.expiry = datetime.datetime.min + _helpers.REFRESH_THRESHOLD
49
50        def refresh(self, request):
51            pass
52
53        def with_quota_project(self, quota_project_id):
54            raise NotImplementedError()
55
56    return CredentialsImpl()
57
58
59class TestSigner(object):
60    def test_constructor(self):
61        request = mock.sentinel.request
62        credentials = mock.create_autospec(
63            google.auth.credentials.Credentials, instance=True
64        )
65
66        signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)
67
68        assert signer._request == mock.sentinel.request
69        assert signer._credentials == credentials
70        assert signer._service_account_email == mock.sentinel.service_account_email
71
72    def test_key_id(self):
73        signer = iam.Signer(
74            mock.sentinel.request,
75            mock.sentinel.credentials,
76            mock.sentinel.service_account_email,
77        )
78
79        assert signer.key_id is None
80
81    def test_sign_bytes(self):
82        signature = b"DEADBEEF"
83        encoded_signature = base64.b64encode(signature).decode("utf-8")
84        request = make_request(http_client.OK, data={"signedBlob": encoded_signature})
85        credentials = make_credentials()
86
87        signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)
88
89        returned_signature = signer.sign("123")
90
91        assert returned_signature == signature
92        kwargs = request.call_args[1]
93        assert kwargs["headers"]["Content-Type"] == "application/json"
94
95    def test_sign_bytes_failure(self):
96        request = make_request(http_client.UNAUTHORIZED)
97        credentials = make_credentials()
98
99        signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)
100
101        with pytest.raises(exceptions.TransportError):
102            signer.sign("123")
103