# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

import mock
import OpenSSL
import pytest
from six.moves import http_client
import urllib3

from google.auth import environment_vars
from google.auth import exceptions
import google.auth.credentials
import google.auth.transport._mtls_helper
import google.auth.transport.urllib3
from google.oauth2 import service_account
from tests.transport import compliance


class TestRequestResponse(compliance.RequestResponseTests):
    """Runs the shared transport compliance suite against the urllib3 Request."""

    def make_request(self):
        """Return a urllib3-backed Request wrapping a fresh PoolManager."""
        http = urllib3.PoolManager()
        return google.auth.transport.urllib3.Request(http)

    def test_timeout(self):
        """The ``timeout`` keyword is passed through to ``http.request``."""
        http = mock.create_autospec(urllib3.PoolManager)
        request = google.auth.transport.urllib3.Request(http)
        request(url="http://example.com", method="GET", timeout=5)

        assert http.request.call_args[1]["timeout"] == 5


def test__make_default_http_with_certifi():
    """With certifi importable, the default pool carries a ``cert_reqs`` option."""
    http = google.auth.transport.urllib3._make_default_http()
    assert "cert_reqs" in http.connection_pool_kw


@mock.patch.object(google.auth.transport.urllib3, "certifi", new=None)
def test__make_default_http_without_certifi():
    """With ``certifi`` patched to None, no ``cert_reqs`` option is set."""
    http = google.auth.transport.urllib3._make_default_http()
    assert "cert_reqs" not in http.connection_pool_kw


class CredentialsStub(google.auth.credentials.Credentials):
    """Minimal credentials whose token is written verbatim into the headers.

    ``refresh`` appends ``"1"`` to the token so tests can tell a refreshed
    token apart from the initial one.
    """

    def __init__(self, token="token"):
        super(CredentialsStub, self).__init__()
        self.token = token

    def apply(self, headers, token=None):
        # The ``token`` argument is ignored; the stub always uses its own.
        headers["authorization"] = self.token

    def before_request(self, request, method, url, headers):
        self.apply(headers)

    def refresh(self, request):
        self.token += "1"

    def with_quota_project(self, quota_project_id):
        raise NotImplementedError()


class HttpStub(object):
    """Fake HTTP object that records ``urlopen`` calls and replays responses.

    Args:
        responses: List of canned responses. It is consumed front-to-back,
            one element per ``urlopen`` call (the list itself is mutated).
        headers: Optional mapping stored as ``self.headers``; defaults to
            an empty dict.
    """

    def __init__(self, responses, headers=None):
        self.responses = responses
        self.requests = []
        self.headers = headers or {}

    def urlopen(self, method, url, body=None, headers=None, **kwargs):
        """Record the call as a tuple and return the next canned response."""
        self.requests.append((method, url, body, headers, kwargs))
        return self.responses.pop(0)


class ResponseStub(object):
    """Bare response object exposing only ``status`` and ``data``."""

    def __init__(self, status=http_client.OK, data=None):
        self.status = status
        self.data = data


class TestMakeMutualTlsHttp(object):
    """Tests for ``google.auth.transport.urllib3._make_mutual_tls_http``."""

    def test_success(self):
        """Valid cert/key bytes produce a ``urllib3.PoolManager``."""
        http = google.auth.transport.urllib3._make_mutual_tls_http(
            pytest.public_cert_bytes, pytest.private_key_bytes
        )
        assert isinstance(http, urllib3.PoolManager)

    def test_crypto_error(self):
        """Invalid cert/key bytes raise ``OpenSSL.crypto.Error``."""
        with pytest.raises(OpenSSL.crypto.Error):
            google.auth.transport.urllib3._make_mutual_tls_http(
                b"invalid cert", b"invalid key"
            )

    @mock.patch.dict("sys.modules", {"OpenSSL.crypto": None})
    def test_import_error(self):
        """With ``OpenSSL.crypto`` blocked in ``sys.modules``, ImportError is raised."""
        with pytest.raises(ImportError):
            google.auth.transport.urllib3._make_mutual_tls_http(
                pytest.public_cert_bytes, pytest.private_key_bytes
            )


class TestAuthorizedHttp(object):
    """Tests for ``google.auth.transport.urllib3.AuthorizedHttp``."""

    TEST_URL = "http://example.com"

    def test_authed_http_defaults(self):
        """Without an explicit ``http``, a ``PoolManager`` is used."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            mock.sentinel.credentials
        )

        assert authed_http.credentials == mock.sentinel.credentials
        assert isinstance(authed_http.http, urllib3.PoolManager)

    def test_urlopen_no_refresh(self):
        """A successful response is returned as-is and no refresh happens."""
        credentials = mock.Mock(wraps=CredentialsStub())
        response = ResponseStub()
        http = HttpStub([response])

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, http=http
        )

        result = authed_http.urlopen("GET", self.TEST_URL)

        assert result == response
        assert credentials.before_request.called
        assert not credentials.refresh.called
        assert http.requests == [
            ("GET", self.TEST_URL, None, {"authorization": "token"}, {})
        ]

    def test_urlopen_refresh(self):
        """A 401 triggers a credentials refresh and one retry."""
        credentials = mock.Mock(wraps=CredentialsStub())
        final_response = ResponseStub(status=http_client.OK)
        # First request will 401, second request will succeed.
        http = HttpStub([ResponseStub(status=http_client.UNAUTHORIZED), final_response])

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, http=http
        )

        # Keep the response in its own name rather than rebinding
        # ``authed_http``, and use TEST_URL so the request matches the
        # expectations below by construction.
        result = authed_http.urlopen("GET", self.TEST_URL)

        assert result == final_response
        assert credentials.before_request.call_count == 2
        assert credentials.refresh.called
        assert http.requests == [
            ("GET", self.TEST_URL, None, {"authorization": "token"}, {}),
            ("GET", self.TEST_URL, None, {"authorization": "token1"}, {}),
        ]

    def test_urlopen_no_default_host(self):
        """Without ``default_host``, the self-signed JWT is requested with None."""
        credentials = mock.create_autospec(service_account.Credentials)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(credentials)

        authed_http.credentials._create_self_signed_jwt.assert_called_once_with(None)

    def test_urlopen_with_default_host(self):
        """With ``default_host``, the JWT audience is ``https://<host>/``."""
        default_host = "pubsub.googleapis.com"
        credentials = mock.create_autospec(service_account.Credentials)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials, default_host=default_host
        )

        authed_http.credentials._create_self_signed_jwt.assert_called_once_with(
            "https://{}/".format(default_host)
        )

    def test_proxies(self):
        """Context-manager use and ``headers`` are delegated to the wrapped http."""
        http = mock.create_autospec(urllib3.PoolManager)
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(None, http=http)

        with authed_http:
            pass

        assert http.__enter__.called
        assert http.__exit__.called

        authed_http.headers = mock.sentinel.headers
        assert authed_http.headers == http.headers

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    def test_configure_mtls_channel_with_callback(self, mock_make_mutual_tls_http):
        """A callback supplying cert/key yields an mTLS channel (with a warning)."""
        callback = mock.Mock()
        callback.return_value = (pytest.public_cert_bytes, pytest.private_key_bytes)

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock(), http=mock.Mock()
        )

        with pytest.warns(UserWarning):
            with mock.patch.dict(
                os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
            ):
                is_mtls = authed_http.configure_mtls_channel(callback)

        assert is_mtls
        mock_make_mutual_tls_http.assert_called_once_with(
            cert=pytest.public_cert_bytes, key=pytest.private_key_bytes
        )

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_with_metadata(
        self, mock_get_client_cert_and_key, mock_make_mutual_tls_http
    ):
        """When the helper reports a cert/key, an mTLS http object is built from it."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        mock_get_client_cert_and_key.return_value = (
            True,
            pytest.public_cert_bytes,
            pytest.private_key_bytes,
        )
        with mock.patch.dict(
            os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
        ):
            is_mtls = authed_http.configure_mtls_channel()

        assert is_mtls
        mock_get_client_cert_and_key.assert_called_once()
        mock_make_mutual_tls_http.assert_called_once_with(
            cert=pytest.public_cert_bytes, key=pytest.private_key_bytes
        )

    @mock.patch("google.auth.transport.urllib3._make_mutual_tls_http", autospec=True)
    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_non_mtls(
        self, mock_get_client_cert_and_key, mock_make_mutual_tls_http
    ):
        """When the helper reports no cert/key, no mTLS http object is built."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        mock_get_client_cert_and_key.return_value = (False, None, None)
        with mock.patch.dict(
            os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
        ):
            is_mtls = authed_http.configure_mtls_channel()

        assert not is_mtls
        mock_get_client_cert_and_key.assert_called_once()
        mock_make_mutual_tls_http.assert_not_called()

    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_exceptions(self, mock_get_client_cert_and_key):
        """Failures while configuring mTLS surface as ``MutualTLSChannelError``."""
        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock()
        )

        # Scenario 1: the cert/key helper itself fails.
        mock_get_client_cert_and_key.side_effect = exceptions.ClientCertError()
        with pytest.raises(exceptions.MutualTLSChannelError):
            with mock.patch.dict(
                os.environ, {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"}
            ):
                authed_http.configure_mtls_channel()

        # Scenario 2: OpenSSL cannot be imported.
        # On a Mock, ``side_effect`` takes precedence over ``return_value``,
        # so it must be cleared here. Otherwise the ClientCertError from
        # scenario 1 would still be raised and this scenario would pass
        # without ever exercising the missing-OpenSSL path.
        mock_get_client_cert_and_key.side_effect = None
        mock_get_client_cert_and_key.return_value = (False, None, None)
        with mock.patch.dict("sys.modules"):
            # A ``None`` entry in sys.modules makes ``import OpenSSL`` fail.
            sys.modules["OpenSSL"] = None
            with pytest.raises(exceptions.MutualTLSChannelError):
                with mock.patch.dict(
                    os.environ,
                    {environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE: "true"},
                ):
                    authed_http.configure_mtls_channel()

    @mock.patch(
        "google.auth.transport._mtls_helper.get_client_cert_and_key", autospec=True
    )
    def test_configure_mtls_channel_without_client_cert_env(
        self, get_client_cert_and_key
    ):
        """Without GOOGLE_API_USE_CLIENT_CERTIFICATE, no client cert is looked up."""
        callback = mock.Mock()

        authed_http = google.auth.transport.urllib3.AuthorizedHttp(
            credentials=mock.Mock(), http=mock.Mock()
        )

        # patch.dict restores os.environ on exit; removing the variable inside
        # it keeps the test independent of the developer's/CI environment.
        with mock.patch.dict(os.environ):
            os.environ.pop(environment_vars.GOOGLE_API_USE_CLIENT_CERTIFICATE, None)

            # Test the callback is not called if
            # GOOGLE_API_USE_CLIENT_CERTIFICATE is not set.
            is_mtls = authed_http.configure_mtls_channel(callback)
            assert not is_mtls
            callback.assert_not_called()

            # Test ADC client cert is not used if
            # GOOGLE_API_USE_CLIENT_CERTIFICATE is not set. No callback is
            # passed, so this covers the no-callback path rather than
            # repeating the call above.
            is_mtls = authed_http.configure_mtls_channel()
            assert not is_mtls
            get_client_cert_and_key.assert_not_called()