1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import os
16import sys
17
18import mock
19import OpenSSL
20import pytest
21from six.moves import http_client
22import urllib3
23
24from google.auth import environment_vars
25from google.auth import exceptions
26import google.auth.credentials
27import google.auth.transport._mtls_helper
28import google.auth.transport.urllib3
29from google.oauth2 import service_account
30from tests.transport import compliance
31
32
class TestRequestResponse(compliance.RequestResponseTests):
    """Runs the inherited ``RequestResponseTests`` cases against urllib3's ``Request``."""

    def make_request(self):
        """Return the ``Request`` under test, backed by a real ``PoolManager``."""
        return google.auth.transport.urllib3.Request(urllib3.PoolManager())

    def test_timeout(self):
        """The ``timeout`` argument is forwarded to the pool's ``request`` call."""
        pool = mock.create_autospec(urllib3.PoolManager)
        transport_request = google.auth.transport.urllib3.Request(pool)

        transport_request(url="http://example.com", method="GET", timeout=5)

        # call_args is an (args, kwargs) pair; index 1 holds the keyword arguments.
        forwarded_kwargs = pool.request.call_args[1]
        assert forwarded_kwargs["timeout"] == 5
44
45
def test__make_default_http_with_certifi():
    """The default pool manager carries a ``cert_reqs`` setting.

    ``certifi`` is not patched out here, so this relies on it being importable
    in the test environment.
    """
    pool_kwargs = google.auth.transport.urllib3._make_default_http().connection_pool_kw
    assert "cert_reqs" in pool_kwargs
49
50
@mock.patch.object(google.auth.transport.urllib3, "certifi", new=None)
def test__make_default_http_without_certifi():
    """With ``certifi`` replaced by ``None``, no ``cert_reqs`` setting is applied."""
    pool_kwargs = google.auth.transport.urllib3._make_default_http().connection_pool_kw
    assert "cert_reqs" not in pool_kwargs
55
56
class CredentialsStub(google.auth.credentials.Credentials):
    """Fake credentials that write their token verbatim into request headers.

    ``refresh`` appends ``"1"`` to the token, which lets tests tell apart the
    requests sent before and after a refresh.
    """

    def __init__(self, token="token"):
        super(CredentialsStub, self).__init__()
        self.token = token

    def apply(self, headers, token=None):
        """Store the current token under the ``authorization`` header key."""
        # The ``token`` argument is accepted but ignored; the stub always uses
        # its own current token.
        current_token = self.token
        headers["authorization"] = current_token

    def before_request(self, request, method, url, headers):
        """Apply the token to ``headers``; the other arguments are unused."""
        self.apply(headers)

    def refresh(self, request):
        """Simulate a refresh by appending ``"1"`` to the token."""
        self.token = self.token + "1"

    def with_quota_project(self, quota_project_id):
        """Not supported by this stub."""
        raise NotImplementedError()
73
74
class HttpStub(object):
    """Minimal HTTP stand-in that records calls and replays canned responses.

    Attributes are plain containers so tests can inspect them directly:
    ``responses`` is the queue of replies still to be served, ``requests`` the
    list of recorded ``urlopen`` calls, and ``headers`` the supplied headers
    (or a new empty dict when none, or a falsy value, is given).
    """

    def __init__(self, responses, headers=None):
        self.headers = headers if headers else {}
        self.requests = []
        self.responses = responses

    def urlopen(self, method, url, body=None, headers=None, **kwargs):
        """Record the call, then return the next queued response.

        The call is stored as a ``(method, url, body, headers, kwargs)`` tuple
        before the response is taken from the front of ``responses``.
        """
        recorded_call = (method, url, body, headers, kwargs)
        self.requests.append(recorded_call)
        next_response = self.responses.pop(0)
        return next_response
84
85
class ResponseStub(object):
    """Bare response object exposing only ``status`` and ``data``.

    ``status`` defaults to ``http_client.OK`` and ``data`` to ``None``.
    """

    def __init__(self, status=http_client.OK, data=None):
        self.status, self.data = status, data
90
91
class TestMakeMutualTlsHttp(object):
    """Tests for ``google.auth.transport.urllib3._make_mutual_tls_http``.

    ``pytest.public_cert_bytes`` / ``pytest.private_key_bytes`` are not defined
    in this module; presumably the suite's conftest attaches them to ``pytest``.
    """

    def test_success(self):
        """A cert/key pair yields a ``urllib3.PoolManager``."""
        cert, key = pytest.public_cert_bytes, pytest.private_key_bytes

        http = google.auth.transport.urllib3._make_mutual_tls_http(cert, key)

        assert isinstance(http, urllib3.PoolManager)

    def test_crypto_error(self):
        """Unparseable cert/key bytes surface as ``OpenSSL.crypto.Error``."""
        bad_cert, bad_key = b"invalid cert", b"invalid key"
        with pytest.raises(OpenSSL.crypto.Error):
            google.auth.transport.urllib3._make_mutual_tls_http(bad_cert, bad_key)

    @mock.patch.dict("sys.modules", {"OpenSSL.crypto": None})
    def test_import_error(self):
        """With ``OpenSSL.crypto`` made unimportable, ``ImportError`` is raised."""
        cert, key = pytest.public_cert_bytes, pytest.private_key_bytes
        with pytest.raises(ImportError):
            google.auth.transport.urllib3._make_mutual_tls_http(cert, key)
111
112
class TestAuthorizedHttp(object):
    """Tests for ``google.auth.transport.urllib3.AuthorizedHttp``."""

    TEST_URL = "http://example.com"

    def test_authed_http_defaults(self):
        """Without an explicit ``http``, a ``urllib3.PoolManager`` is used."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            mock.sentinel.credentials
        )

        assert authed_http.credentials == mock.sentinel.credentials
        assert isinstance(authed_http.http, urllib3.PoolManager)

    def test_urlopen_no_refresh(self):
        """An OK response is returned as-is and does not trigger a refresh."""
        credentials = mock.Mock(wraps=CredentialsStub())
        response = ResponseStub()
        http = HttpStub([response])

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, http=http
        )

        result = authed_http.urlopen("GET", self.TEST_URL)

        assert result == response
        assert credentials.before_request.called
        assert not credentials.refresh.called
        assert http.requests == [
            ("GET", self.TEST_URL, None, {"authorization": "token"}, {})
        ]

    def test_urlopen_refresh(self):
        """An UNAUTHORIZED response causes a refresh and a retried request."""
        credentials = mock.Mock(wraps=CredentialsStub())
        final_response = ResponseStub(status=http_client.OK)
        # First request will 401, second request will succeed.
        http = HttpStub([ResponseStub(status=http_client.UNAUTHORIZED), final_response])

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, http=http
        )

        # Request the shared TEST_URL (the value the assertions below compare
        # against) and keep the response under its own name instead of
        # rebinding ``authed_http``.
        result = authed_http.urlopen("GET", self.TEST_URL)

        assert result == final_response
        assert credentials.before_request.call_count == 2
        assert credentials.refresh.called
        # The second recorded request carries the refreshed token ("token1").
        assert http.requests == [
            ("GET", self.TEST_URL, None, {"authorization": "token"}, {}),
            ("GET", self.TEST_URL, None, {"authorization": "token1"}, {}),
        ]

    def test_urlopen_no_default_host(self):
        """Without ``default_host``, the self-signed JWT is created with ``None``."""
        credentials = mock.create_autospec(service_account.Credentials)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(credentials)

        authed_http.credentials._create_self_signed_jwt.assert_called_once_with(None)

    def test_urlopen_with_default_host(self):
        """With ``default_host``, the JWT is created for ``https://<host>/``."""
        default_host = "pubsub.googleapis.com"
        credentials = mock.create_autospec(service_account.Credentials)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, default_host=default_host
        )

        authed_http.credentials._create_self_signed_jwt.assert_called_once_with(
            "https://{}/".format(default_host)
        )

    def test_proxies(self):
        """Context-manager calls and ``headers`` are proxied to the wrapped http."""
        http = mock.create_autospec(urllib3.PoolManager)
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(None, http=http)

        with authed_http:
            pass

        assert http.__enter__.called
        assert http.__exit__.called

        authed_http.headers = mock.sentinel.headers
        assert authed_http.headers == http.headers

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    def test_configure_mtls_channel_with_callback(self, mock_make_mutual_tls_http):
        """A callback-supplied cert/key pair is used to build the mTLS http."""
        callback = mock.Mock()
        callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock(), http=mock.Mock()
        )

        # A UserWarning is expected here; presumably it is emitted because an
        # ``http`` object was supplied by the caller -- confirm against the
        # implementation.
        with pytest.warns(UserWarning):
            with mock.patch.dict(
                os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
            ):
                is_mtls = authed_http.configure_mtls_channel(callback)

        assert is_mtls
        mock_make_mutual_tls_http.assert_called_once_with(
            cert=pytest.public_cert_bytes, key=pytest.private_key_bytes
        )

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_with_metadata(
        self, mock_get_client_cert_and_key, mock_make_mutual_tls_http
    ):
        """With no callback, the cert/key from the helper builds the mTLS http."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        mock_get_client_cert_and_key.return_value = (
            True,
            pytest.public_cert_bytes,
            pytest.private_key_bytes,
        )
        with mock.patch.dict(
            os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
        ):
            is_mtls = authed_http.configure_mtls_channel()

        assert is_mtls
        mock_get_client_cert_and_key.assert_called_once()
        mock_make_mutual_tls_http.assert_called_once_with(
            cert=pytest.public_cert_bytes, key=pytest.private_key_bytes
        )

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_non_mtls(
        self, mock_get_client_cert_and_key, mock_make_mutual_tls_http
    ):
        """When the helper reports no client cert, no mTLS http is created."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        mock_get_client_cert_and_key.return_value = (False, None, None)
        with mock.patch.dict(
            os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
        ):
            is_mtls = authed_http.configure_mtls_channel()

        assert not is_mtls
        mock_get_client_cert_and_key.assert_called_once()
        mock_make_mutual_tls_http.assert_not_called()

    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_exceptions(self, mock_get_client_cert_and_key):
        """Helper errors and a missing OpenSSL both raise MutualTLSChannelError."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        # Scenario 1: the cert/key helper itself fails.
        mock_get_client_cert_and_key.side_effect = exceptions.ClientCertError()
        with pytest.raises(exceptions.MutualTLSChannelError):
            with mock.patch.dict(
                os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
            ):
                authed_http.configure_mtls_channel()

        # Scenario 2: OpenSSL cannot be imported. Clear the side effect first:
        # a mock's side_effect takes precedence over its return_value, so if it
        # stayed in place the helper would keep raising ClientCertError and
        # this scenario would pass without the import failure being the cause.
        mock_get_client_cert_and_key.side_effect = None
        mock_get_client_cert_and_key.return_value = (False, None, None)
        with mock.patch.dict("sys.modules"):
            # A ``None`` entry in sys.modules makes ``import OpenSSL`` raise
            # ImportError; patch.dict restores the real entry afterwards.
            sys.modules["OpenSSL"] = None
            with pytest.raises(exceptions.MutualTLSChannelError):
                with mock.patch.dict(
                    os.environ,
                    {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"},
                ):
                    authed_http.configure_mtls_channel()

    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_without_client_cert_env(
        self, get_client_cert_and_key
    ):
        """With the opt-in variable unset, no client cert source is consulted."""
        callback = mock.Mock()

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock(), http=mock.Mock()
        )

        with mock.patch.dict(os.environ):
            # Make sure the variable really is unset, whatever environment the
            # test run was launched from; patch.dict restores it afterwards.
            os.environ.pop(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, None)

            # Test the callback is not called if
            # GOOGLE_API_USE_CLIENT_CERTIFICATE is not set.
            is_mtls = authed_http.configure_mtls_channel(callback)
            assert not is_mtls
            callback.assert_not_called()

            # Test ADC client cert is not used if
            # GOOGLE_API_USE_CLIENT_CERTIFICATE is not set. No callback is
            # passed, so this call covers the no-callback path instead of
            # repeating the check above.
            is_mtls = authed_http.configure_mtls_channel()
            assert not is_mtls
            get_client_cert_and_key.assert_not_called()
308