# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for ``google.auth.compute_engine.credentials``."""
import base64
import datetime

import mock
import pytest
import responses

from google.auth import _helpers
from google.auth import exceptions
from google.auth import jwt
from google.auth import transport
from google.auth.compute_engine import credentials
from google.auth.transport import requests

# Value of the "exp" claim carried by SAMPLE_ID_TOKEN below.
SAMPLE_ID_TOKEN_EXP = 1584393400

# header: {"alg": "RS256", "typ": "JWT", "kid": "1"}
# payload: {"iss": "issuer", "iat": 1584393348, "sub": "subject",
#   "exp": 1584393400,"aud": "audience"}
SAMPLE_ID_TOKEN = (
    b"eyJhbGciOiAiUlMyNTYiLCAidHlwIjogIkpXVCIsICJraWQiOiAiMSJ9."
    b"eyJpc3MiOiAiaXNzdWVyIiwgImlhdCI6IDE1ODQzOTMzNDgsICJzdWIiO"
    b"iAic3ViamVjdCIsICJleHAiOiAxNTg0MzkzNDAwLCAiYXVkIjogImF1ZG"
    b"llbmNlIn0."
    b"OquNjHKhTmlgCk361omRo18F_uY-7y0f_AmLbzW062Q1Zr61HAwHYP5FM"
    b"316CK4_0cH8MUNGASsvZc3VqXAqub6PUTfhemH8pFEwBdAdG0LhrNkU0H"
    b"WN1YpT55IiQ31esLdL5q-qDsOPpNZJUti1y1lAreM5nIn2srdWzGXGs4i"
    b"TRQsn0XkNUCL4RErpciXmjfhMrPkcAjKA-mXQm2fa4jmTlEZFqFmUlym1"
    b"ozJ0yf5grjN6AslN4OGvAv1pS-_Ko_pGBS6IQtSBC6vVKCUuBfaqNjykg"
    b"bsxbLa6Fp0SYeYwO8ifEnkRvasVpc1WTQqfRB2JCj5pTBDzJpIpFCMmnQ"
)


class TestCredentials(object):
    """Tests for ``credentials.Credentials``.

    A fresh ``Credentials()`` instance is stored on ``self.credentials``
    before every test by the autouse fixture below.
    """

    credentials = None

    @pytest.fixture(autouse=True)
    def credentials_fixture(self):
        """Create a default ``Credentials`` instance for each test."""
        self.credentials = credentials.Credentials()

    def test_default_state(self):
        """A freshly built credential is invalid, unexpired and unscoped."""
        assert not self.credentials.valid
        # Expiration hasn't been set yet
        assert not self.credentials.expired
        # Scopes are needed
        assert self.credentials.requires_scopes
        # Service account email hasn't been populated
        assert self.credentials.service_account_email == "default"
        # No quota project
        assert not self.credentials._quota_project_id

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_success(self, get, utcnow):
        """refresh() stores the token, expiry, email and scopes it fetched."""
        get.side_effect = [
            {
                # First request is for service account info.
                "email": "[email protected]",
                "scopes": ["one", "two"],
            },
            {
                # Second request is for the token.
                "access_token": "token",
                "expires_in": 500,
            },
        ]

        # Refresh credentials
        self.credentials.refresh(None)

        # Check that the credentials have the token and proper expiration
        assert self.credentials.token == "token"
        assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500))

        # Check the credential info
        assert self.credentials.service_account_email == "[email protected]"
        assert self.credentials._scopes == ["one", "two"]

        # Check that the credentials are valid (have a token and are not
        # expired)
        assert self.credentials.valid

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_success_with_scopes(self, get, utcnow):
        """Scopes set via with_scopes() are kept and sent on the last request."""
        get.side_effect = [
            {
                # First request is for service account info.
                "email": "[email protected]",
                "scopes": ["one", "two"],
            },
            {
                # Second request is for the token.
                "access_token": "token",
                "expires_in": 500,
            },
        ]

        # Refresh credentials
        scopes = ["three", "four"]
        self.credentials = self.credentials.with_scopes(scopes)
        self.credentials.refresh(None)

        # Check that the credentials have the token and proper expiration
        assert self.credentials.token == "token"
        assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500))

        # Check the credential info
        assert self.credentials.service_account_email == "[email protected]"
        assert self.credentials._scopes == scopes

        # Check that the credentials are valid (have a token and are not
        # expired)
        assert self.credentials.valid

        # The explicitly requested scopes are passed as a comma-joined
        # query parameter on the most recent metadata call.
        kwargs = get.call_args[1]
        assert kwargs == {"params": {"scopes": "three,four"}}

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_error(self, get):
        """A TransportError from the metadata call surfaces as RefreshError."""
        get.side_effect = exceptions.TransportError("http error")

        with pytest.raises(exceptions.RefreshError) as excinfo:
            self.credentials.refresh(None)

        assert excinfo.match(r"http error")

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_before_request_refreshes(self, get):
        """before_request() on an invalid credential triggers a refresh."""
        get.side_effect = [
            {
                # First request is for service account info.
                "email": "[email protected]",
                "scopes": "one two",
            },
            {
                # Second request is for the token.
                "access_token": "token",
                "expires_in": 500,
            },
        ]

        # Credentials should start as invalid
        assert not self.credentials.valid

        # before_request should cause a refresh
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})

        # The refresh endpoint should've been called.
        assert get.called

        # Credentials should now be valid.
        assert self.credentials.valid

    def test_with_quota_project(self):
        """with_quota_project() returns credentials carrying the project id."""
        quota_project_creds = self.credentials.with_quota_project("project-foo")

        assert quota_project_creds._quota_project_id == "project-foo"

    def test_with_scopes(self):
        """with_scopes() returns credentials carrying the given scopes."""
        assert self.credentials._scopes is None

        scopes = ["one", "two"]
        self.credentials = self.credentials.with_scopes(scopes)

        assert self.credentials._scopes == scopes


class TestIDTokenCredentials(object):
    """Tests for ``credentials.IDTokenCredentials``.

    Each test builds its own instance and stores it on ``self.credentials``
    (or a local ``cred``); there is no shared fixture.
    """

    credentials = None

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_default_state(self, get):
        """A new credential is invalid but already has an email and signer."""
        get.side_effect = [
            {"email": "[email protected]", "scope": ["one", "two"]}
        ]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://example.com"
        )

        assert not self.credentials.valid
        # Expiration hasn't been set yet
        assert not self.credentials.expired
        # Service account email matches the mocked metadata response
        assert self.credentials.service_account_email == "[email protected]"
        # Signer is initialized
        assert self.credentials.signer
        assert self.credentials.signer_email == "[email protected]"
        # No quota project
        assert not self.credentials._quota_project_id

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_make_authorization_grant_assertion(self, sign, get, utcnow):
        """The grant assertion carries the expected claims and signature."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims of the decoded assertion
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://audience.com",
        }

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_with_service_account(self, sign, get, utcnow):
        """An explicit service_account_email is used as the assertion issuer."""
        sign.side_effect = [b"signature"]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request,
            target_audience="https://audience.com",
            service_account_email="[email protected]",
        )

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims of the decoded assertion
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://audience.com",
        }

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_additional_claims(self, sign, get, utcnow):
        """additional_claims are merged into the assertion payload."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request,
            target_audience="https://audience.com",
            additional_claims={"foo": "bar"},
        )

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims of the decoded assertion
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://audience.com",
            "foo": "bar",
        }

    def test_token_uri(self):
        """token_uri defaults to _DEFAULT_TOKEN_URI and can be overridden."""
        request = mock.create_autospec(transport.Request, instance=True)

        self.credentials = credentials.IDTokenCredentials(
            request=request,
            signer=mock.Mock(),
            service_account_email="[email protected]",
            target_audience="https://audience.com",
        )
        assert self.credentials._token_uri == credentials._DEFAULT_TOKEN_URI

        self.credentials = credentials.IDTokenCredentials(
            request=request,
            signer=mock.Mock(),
            service_account_email="[email protected]",
            target_audience="https://audience.com",
            token_uri="https://example.com/token",
        )
        assert self.credentials._token_uri == "https://example.com/token"

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_with_target_audience(self, sign, get, utcnow):
        """with_target_audience() changes the audience claim of the grant."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )
        self.credentials = self.credentials.with_target_audience("https://actually.not")

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims of the decoded assertion
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://actually.not",
        }

        # Check that the signer has been initialized with a Request object
        assert isinstance(self.credentials._signer._request, transport.Request)

    @responses.activate
    def test_with_target_audience_integration(self):
        """Refresh credentials generated from `with_target_audience`.

        Instead of mocking the methods, the HTTP responses
        have been mocked.
        """

        # The service account email used below must match the one embedded
        # in the mocked token and signBlob URLs, otherwise those requests
        # would not hit any registered response.

        # mock information about credentials
        responses.add(
            responses.GET,
            "http://metadata.google.internal/computeMetadata/v1/instance/"
            "service-accounts/default/?recursive=true",
            status=200,
            content_type="application/json",
            json={
                "scopes": "email",
                "email": "service-account@example.com",
                "aliases": ["default"],
            },
        )

        # mock token for credentials
        responses.add(
            responses.GET,
            "http://metadata.google.internal/computeMetadata/v1/instance/"
            "service-accounts/service-account@example.com/token",
            status=200,
            content_type="application/json",
            json={
                "access_token": "some-token",
                "expires_in": 3210,
                "token_type": "Bearer",
            },
        )

        # mock sign blob endpoint
        signature = base64.b64encode(b"some-signature").decode("utf-8")
        responses.add(
            responses.POST,
            "https://iamcredentials.googleapis.com/v1/projects/-/"
            "serviceAccounts/service-account@example.com:signBlob?alt=json",
            status=200,
            content_type="application/json",
            json={"keyId": "some-key-id", "signedBlob": signature},
        )

        id_token = "{}.{}.{}".format(
            base64.b64encode(b'{"some":"some"}').decode("utf-8"),
            base64.b64encode(b'{"exp": 3210}').decode("utf-8"),
            base64.b64encode(b"token").decode("utf-8"),
        )

        # mock id token endpoint
        responses.add(
            responses.POST,
            "https://www.googleapis.com/oauth2/v4/token",
            status=200,
            content_type="application/json",
            json={"id_token": id_token, "expiry": 3210},
        )

        self.credentials = credentials.IDTokenCredentials(
            request=requests.Request(),
            service_account_email="service-account@example.com",
            target_audience="https://audience.com",
        )

        self.credentials = self.credentials.with_target_audience("https://actually.not")

        self.credentials.refresh(requests.Request())

        assert self.credentials.token is not None

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_with_quota_project(self, sign, get, utcnow):
        """with_quota_project() sets the project and keeps the grant intact."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )
        self.credentials = self.credentials.with_quota_project("project-foo")

        assert self.credentials._quota_project_id == "project-foo"

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims of the decoded assertion
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://audience.com",
        }

        # Check that the signer has been initialized with a Request object
        assert isinstance(self.credentials._signer._request, transport.Request)

    @responses.activate
    def test_with_quota_project_integration(self):
        """Refresh credentials generated from `with_quota_project`.

        Instead of mocking the methods, the HTTP responses
        have been mocked.
        """

        # The service account email used below must match the one embedded
        # in the mocked token and signBlob URLs, otherwise those requests
        # would not hit any registered response.

        # mock information about credentials
        responses.add(
            responses.GET,
            "http://metadata.google.internal/computeMetadata/v1/instance/"
            "service-accounts/default/?recursive=true",
            status=200,
            content_type="application/json",
            json={
                "scopes": "email",
                "email": "service-account@example.com",
                "aliases": ["default"],
            },
        )

        # mock token for credentials
        responses.add(
            responses.GET,
            "http://metadata.google.internal/computeMetadata/v1/instance/"
            "service-accounts/service-account@example.com/token",
            status=200,
            content_type="application/json",
            json={
                "access_token": "some-token",
                "expires_in": 3210,
                "token_type": "Bearer",
            },
        )

        # mock sign blob endpoint
        signature = base64.b64encode(b"some-signature").decode("utf-8")
        responses.add(
            responses.POST,
            "https://iamcredentials.googleapis.com/v1/projects/-/"
            "serviceAccounts/service-account@example.com:signBlob?alt=json",
            status=200,
            content_type="application/json",
            json={"keyId": "some-key-id", "signedBlob": signature},
        )

        id_token = "{}.{}.{}".format(
            base64.b64encode(b'{"some":"some"}').decode("utf-8"),
            base64.b64encode(b'{"exp": 3210}').decode("utf-8"),
            base64.b64encode(b"token").decode("utf-8"),
        )

        # mock id token endpoint
        responses.add(
            responses.POST,
            "https://www.googleapis.com/oauth2/v4/token",
            status=200,
            content_type="application/json",
            json={"id_token": id_token, "expiry": 3210},
        )

        self.credentials = credentials.IDTokenCredentials(
            request=requests.Request(),
            service_account_email="service-account@example.com",
            target_audience="https://audience.com",
        )

        self.credentials = self.credentials.with_quota_project("project-foo")

        self.credentials.refresh(requests.Request())

        assert self.credentials.token is not None
        assert self.credentials._quota_project_id == "project-foo"

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
    def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow):
        """refresh() stores the token and expiry returned by the JWT grant."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]
        id_token_jwt_grant.side_effect = [
            ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
        ]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Refresh credentials
        self.credentials.refresh(None)

        # Check that the credentials have the token and proper expiration
        assert self.credentials.token == "idtoken"
        assert self.credentials.expiry == (datetime.datetime.utcfromtimestamp(3600))

        # Check the credential info
        assert self.credentials.service_account_email == "[email protected]"

        # Check that the credentials are valid (have a token and are not
        # expired)
        assert self.credentials.valid

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_refresh_error(self, sign, get, utcnow):
        """A 500 response from the token endpoint raises RefreshError."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        request = mock.create_autospec(transport.Request, instance=True)
        response = mock.Mock()
        response.data = b'{"error": "http error"}'
        response.status = 500
        request.side_effect = [response]

        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        with pytest.raises(exceptions.RefreshError) as excinfo:
            self.credentials.refresh(request)

        assert excinfo.match(r"http error")

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
    def test_before_request_refreshes(self, id_token_jwt_grant, sign, get, utcnow):
        """before_request() on an invalid credential triggers a refresh."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": "one two"}
        ]
        sign.side_effect = [b"signature"]
        id_token_jwt_grant.side_effect = [
            ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
        ]

        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Credentials should start as invalid
        assert not self.credentials.valid

        # before_request should cause a refresh
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})

        # The refresh endpoint should've been called.
        assert get.called

        # Credentials should now be valid.
        assert self.credentials.valid

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_sign_bytes(self, sign, get):
        """sign_bytes() returns the value produced by the patched signer."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        request = mock.create_autospec(transport.Request, instance=True)
        response = mock.Mock()
        response.data = b'{"signature": "c2lnbmF0dXJl"}'
        response.status = 200
        request.side_effect = [response]

        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Sign an arbitrary payload:
        signature = self.credentials.sign_bytes(b"some bytes")

        # The result is the value returned by the patched Signer.sign:
        assert signature == b"signature"

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_get_id_token_from_metadata(self, get, get_service_account_info):
        """With the metadata identity endpoint, the fetched token is used."""
        get.return_value = SAMPLE_ID_TOKEN
        get_service_account_info.return_value = {"email": "[email protected]"}

        cred = credentials.IDTokenCredentials(
            mock.Mock(), "audience", use_metadata_identity_endpoint=True
        )
        cred.refresh(request=mock.Mock())

        assert cred.token == SAMPLE_ID_TOKEN
        # NOTE(review): fromtimestamp() converts using local time; presumably
        # this mirrors the conversion done by the implementation - confirm.
        assert cred.expiry == datetime.datetime.fromtimestamp(SAMPLE_ID_TOKEN_EXP)
        assert cred._use_metadata_identity_endpoint
        assert cred._signer is None
        assert cred._token_uri is None
        assert cred._service_account_email == "[email protected]"
        assert cred._target_audience == "audience"
        # No signer is configured in this mode, so signing is rejected.
        with pytest.raises(ValueError):
            cred.sign_bytes(b"bytes")

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    def test_with_target_audience_for_metadata(self, get_service_account_info):
        """with_target_audience() preserves the metadata-endpoint settings."""
        get_service_account_info.return_value = {"email": "[email protected]"}

        cred = credentials.IDTokenCredentials(
            mock.Mock(), "audience", use_metadata_identity_endpoint=True
        )
        cred = cred.with_target_audience("new_audience")

        assert cred._target_audience == "new_audience"
        assert cred._use_metadata_identity_endpoint
        assert cred._signer is None
        assert cred._token_uri is None
        assert cred._service_account_email == "[email protected]"

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    def test_id_token_with_quota_project(self, get_service_account_info):
        """with_quota_project() preserves the metadata-endpoint settings."""
        get_service_account_info.return_value = {"email": "[email protected]"}

        cred = credentials.IDTokenCredentials(
            mock.Mock(), "audience", use_metadata_identity_endpoint=True
        )
        cred = cred.with_quota_project("project-foo")

        assert cred._quota_project_id == "project-foo"
        assert cred._use_metadata_identity_endpoint
        assert cred._signer is None
        assert cred._token_uri is None
        assert cred._service_account_email == "[email protected]"

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_invalid_id_token_from_metadata(self, get, get_service_account_info):
        """A malformed token from the metadata endpoint raises ValueError."""
        get.return_value = "invalid_id_token"
        get_service_account_info.return_value = {"email": "[email protected]"}

        cred = credentials.IDTokenCredentials(
            mock.Mock(), "audience", use_metadata_identity_endpoint=True
        )

        with pytest.raises(ValueError):
            cred.refresh(request=mock.Mock())

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_transport_error_from_metadata(self, get, get_service_account_info):
        """A TransportError from the metadata call surfaces as RefreshError."""
        get.side_effect = exceptions.TransportError("transport error")
        get_service_account_info.return_value = {"email": "[email protected]"}

        cred = credentials.IDTokenCredentials(
            mock.Mock(), "audience", use_metadata_identity_endpoint=True
        )

        with pytest.raises(exceptions.RefreshError) as excinfo:
            cred.refresh(request=mock.Mock())

        # Must sit outside the ``with`` block: statements placed after the
        # raising call inside the block would never execute.
        assert excinfo.match(r"transport error")

    def test_get_id_token_from_metadata_constructor(self):
        """Options that conflict with the metadata endpoint are rejected.

        Each of token_uri, signer, additional_claims and
        service_account_email raises ValueError when combined with
        ``use_metadata_identity_endpoint=True``.
        """
        with pytest.raises(ValueError):
            credentials.IDTokenCredentials(
                mock.Mock(),
                "audience",
                use_metadata_identity_endpoint=True,
                token_uri="token_uri",
            )
        with pytest.raises(ValueError):
            credentials.IDTokenCredentials(
                mock.Mock(),
                "audience",
                use_metadata_identity_endpoint=True,
                signer=mock.Mock(),
            )
        with pytest.raises(ValueError):
            credentials.IDTokenCredentials(
                mock.Mock(),
                "audience",
                use_metadata_identity_endpoint=True,
                # Claims are a mapping; a set literal here was a typo.
                additional_claims={"key": "value"},
            )
        with pytest.raises(ValueError):
            credentials.IDTokenCredentials(
                mock.Mock(),
                "audience",
                use_metadata_identity_endpoint=True,
                service_account_email="[email protected]",
            )