# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re

import mock
import pytest
from six.moves import http_client
from six.moves import urllib

from google.auth import exceptions
from google.auth import transport
from google.oauth2 import sts
from google.oauth2 import utils

CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="


class TestStsClient(object):
    """Tests for ``sts.Client.exchange_token``.

    Each client authentication mode (none, basic, request body) is exercised
    with a full parameter set, a required-only parameter set, and a non-200
    error response. All HTTP traffic goes through a mocked transport request.
    """

    GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
    RESOURCE = "https://api.example.com/"
    AUDIENCE = "urn:example:cooperation-context"
    SCOPES = ["scope1", "scope2"]
    REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
    SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE"
    SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
    ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE"
    ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
    TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2"
    ADDON_HEADERS = {"x-client-version": "0.1.2"}
    ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}}
    SUCCESS_RESPONSE = {
        "access_token": "ACCESS_TOKEN",
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": "scope1 scope2",
    }
    ERROR_RESPONSE = {
        "error": "invalid_request",
        "error_description": "Invalid subject token",
        "error_uri": "https://tools.ietf.org/html/rfc6749",
    }
    CLIENT_AUTH_BASIC = utils.ClientAuthentication(
        utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
    )
    CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
        utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
    )

    @classmethod
    def make_client(cls, client_auth=None):
        """Builds an STS client bound to the test token exchange endpoint.

        Args:
            client_auth: Optional ``utils.ClientAuthentication`` handed to the
                client constructor; ``None`` means no client authentication.

        Returns:
            The ``sts.Client`` under test.
        """
        return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth)

    @classmethod
    def make_mock_request(cls, data, status=http_client.OK):
        """Builds a mock transport request that returns a canned response.

        Args:
            data: JSON-serializable payload used as the response body.
            status: HTTP status code reported by the mocked response.

        Returns:
            An autospec'd ``transport.Request`` whose call returns a mocked
            ``transport.Response`` carrying ``status`` and the UTF-8 encoded
            JSON dump of ``data``.
        """
        response = mock.create_autospec(transport.Response, instance=True)
        response.status = status
        response.data = json.dumps(data).encode("utf-8")

        request = mock.create_autospec(transport.Request)
        request.return_value = response

        return request

    @classmethod
    def assert_request_kwargs(cls, request_kwargs, headers, request_data):
        """Asserts the request was called with the expected parameters.

        Args:
            request_kwargs: Keyword arguments captured from the mocked
                request call (``url``, ``method``, ``headers``, ``body``).
            headers: The exact headers mapping expected on the request.
            request_data: Mapping of the expected form fields to their
                expected (unquoted) string values.
        """
        assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT
        assert request_kwargs["method"] == "POST"
        assert request_kwargs["headers"] == headers
        assert request_kwargs["body"] is not None
        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
        # Compare the complete list of pairs rather than checking each pair
        # and the count separately: a body that repeats one field while
        # omitting another would otherwise slip through, and an unexpected
        # field would surface as a KeyError instead of an assertion failure.
        actual_fields = sorted(
            (k.decode("utf-8"), v.decode("utf-8")) for (k, v) in body_tuples
        )
        assert actual_fields == sorted(request_data.items())

    @classmethod
    def _full_request_data(cls):
        """Returns the form fields expected when every parameter is supplied."""
        return {
            "grant_type": cls.GRANT_TYPE,
            "resource": cls.RESOURCE,
            "audience": cls.AUDIENCE,
            "scope": " ".join(cls.SCOPES),
            "requested_token_type": cls.REQUESTED_TOKEN_TYPE,
            "subject_token": cls.SUBJECT_TOKEN,
            "subject_token_type": cls.SUBJECT_TOKEN_TYPE,
            "actor_token": cls.ACTOR_TOKEN,
            "actor_token_type": cls.ACTOR_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(cls.ADDON_OPTIONS)),
        }

    @classmethod
    def _partial_request_data(cls):
        """Returns the form fields expected for the required-only call."""
        return {
            "grant_type": cls.GRANT_TYPE,
            "audience": cls.AUDIENCE,
            "requested_token_type": cls.REQUESTED_TOKEN_TYPE,
            "subject_token": cls.SUBJECT_TOKEN,
            "subject_token_type": cls.SUBJECT_TOKEN_TYPE,
        }

    @classmethod
    def _exchange_full(cls, client, request):
        """Calls ``exchange_token`` positionally with every parameter set."""
        return client.exchange_token(
            request,
            cls.GRANT_TYPE,
            cls.SUBJECT_TOKEN,
            cls.SUBJECT_TOKEN_TYPE,
            cls.RESOURCE,
            cls.AUDIENCE,
            cls.SCOPES,
            cls.REQUESTED_TOKEN_TYPE,
            cls.ACTOR_TOKEN,
            cls.ACTOR_TOKEN_TYPE,
            cls.ADDON_OPTIONS,
            cls.ADDON_HEADERS,
        )

    @classmethod
    def _exchange_partial(cls, client, request):
        """Calls ``exchange_token`` by keyword with the partial parameter set."""
        return client.exchange_token(
            request,
            grant_type=cls.GRANT_TYPE,
            subject_token=cls.SUBJECT_TOKEN,
            subject_token_type=cls.SUBJECT_TOKEN_TYPE,
            audience=cls.AUDIENCE,
            requested_token_type=cls.REQUESTED_TOKEN_TYPE,
        )

    @classmethod
    def _assert_full_exchange_raises_oauth_error(cls, client):
        """Asserts a full exchange against an error response raises OAuthError.

        The mocked request answers with ``ERROR_RESPONSE`` and a 400 status;
        the raised error must mention the error code, description and URI.
        """
        request = cls.make_mock_request(
            status=http_client.BAD_REQUEST, data=cls.ERROR_RESPONSE
        )

        with pytest.raises(exceptions.OAuthError) as excinfo:
            cls._exchange_full(client, request)

        # ``match`` performs a regex search, so escape the literal message to
        # keep "." and friends from matching arbitrary characters.
        assert excinfo.match(
            re.escape(
                "Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
            )
        )

    def test_exchange_token_full_success_without_auth(self):
        """Test token exchange success without client authentication using full
        parameters.
        """
        client = self.make_client()
        headers = self.ADDON_HEADERS.copy()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        request_data = self._full_request_data()
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = self._exchange_full(client, request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_partial_success_without_auth(self):
        """Test token exchange success without client authentication using
        partial (required only) parameters.
        """
        client = self.make_client()
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        request_data = self._partial_request_data()
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = self._exchange_partial(client, request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_non200_without_auth(self):
        """Test token exchange without client auth responding with non-200 status.
        """
        client = self.make_client()

        self._assert_full_exchange_raises_oauth_error(client)

    def test_exchange_token_full_success_with_basic_auth(self):
        """Test token exchange success with basic client authentication using full
        parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_BASIC)
        headers = self.ADDON_HEADERS.copy()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING)
        request_data = self._full_request_data()
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = self._exchange_full(client, request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_partial_success_with_basic_auth(self):
        """Test token exchange success with basic client authentication using
        partial (required only) parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_BASIC)
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
        }
        request_data = self._partial_request_data()
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = self._exchange_partial(client, request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_non200_with_basic_auth(self):
        """Test token exchange with basic client auth responding with non-200
        status.
        """
        client = self.make_client(self.CLIENT_AUTH_BASIC)

        self._assert_full_exchange_raises_oauth_error(client)

    def test_exchange_token_full_success_with_reqbody_auth(self):
        """Test token exchange success with request body client authentication
        using full parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
        headers = self.ADDON_HEADERS.copy()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        # Request-body authentication sends the credentials as form fields.
        request_data = self._full_request_data()
        request_data["client_id"] = CLIENT_ID
        request_data["client_secret"] = CLIENT_SECRET
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = self._exchange_full(client, request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_partial_success_with_reqbody_auth(self):
        """Test token exchange success with request body client authentication
        using partial (required only) parameters.
        """
        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        # Request-body authentication sends the credentials as form fields.
        request_data = self._partial_request_data()
        request_data["client_id"] = CLIENT_ID
        request_data["client_secret"] = CLIENT_SECRET
        request = self.make_mock_request(
            status=http_client.OK, data=self.SUCCESS_RESPONSE
        )

        response = self._exchange_partial(client, request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert response == self.SUCCESS_RESPONSE

    def test_exchange_token_non200_with_reqbody_auth(self):
        """Test token exchange with POST request body client auth responding
        with non-200 status.
        """
        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)

        self._assert_full_exchange_raises_oauth_error(client)