1# Copyright 2016 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import datetime 16import json 17import os 18import pickle 19import sys 20 21import mock 22import pytest 23 24from google.auth import _helpers 25from google.auth import exceptions 26from google.auth import transport 27from google.oauth2 import credentials 28 29 30DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data") 31 32AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json") 33 34with open(AUTH_USER_JSON_FILE, "r") as fh: 35 AUTH_USER_INFO = json.load(fh) 36 37 38class TestCredentials(object): 39 TOKEN_URI = "https://example.com/oauth2/token" 40 REFRESH_TOKEN = "refresh_token" 41 RAPT_TOKEN = "rapt_token" 42 CLIENT_ID = "client_id" 43 CLIENT_SECRET = "client_secret" 44 45 @classmethod 46 def make_credentials(cls): 47 return credentials.Credentials( 48 token=None, 49 refresh_token=cls.REFRESH_TOKEN, 50 token_uri=cls.TOKEN_URI, 51 client_id=cls.CLIENT_ID, 52 client_secret=cls.CLIENT_SECRET, 53 rapt_token=cls.RAPT_TOKEN, 54 enable_reauth_refresh=True, 55 ) 56 57 def test_default_state(self): 58 credentials = self.make_credentials() 59 assert not credentials.valid 60 # Expiration hasn't been set yet 61 assert not credentials.expired 62 # Scopes aren't required for these credentials 63 assert not credentials.requires_scopes 64 # Test properties 65 assert credentials.refresh_token == self.REFRESH_TOKEN 66 assert credentials.token_uri == self.TOKEN_URI 67 assert credentials.client_id == self.CLIENT_ID 68 assert credentials.client_secret == self.CLIENT_SECRET 69 assert credentials.rapt_token == self.RAPT_TOKEN 70 assert credentials.refresh_handler is None 71 72 def test_refresh_handler_setter_and_getter(self): 73 scopes = ["email", "profile"] 74 original_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_1", None)) 75 updated_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_2", None)) 76 creds = credentials.Credentials( 77 token=None, 78 refresh_token=None, 79 token_uri=None, 80 client_id=None, 81 client_secret=None, 82 rapt_token=None, 83 scopes=scopes, 84 default_scopes=None, 85 refresh_handler=original_refresh_handler, 86 ) 87 88 assert creds.refresh_handler is original_refresh_handler 89 90 creds.refresh_handler = updated_refresh_handler 91 92 assert creds.refresh_handler is updated_refresh_handler 93 94 creds.refresh_handler = None 95 96 assert creds.refresh_handler is None 97 98 def test_invalid_refresh_handler(self): 99 scopes = ["email", "profile"] 100 with pytest.raises(TypeError) as excinfo: 101 credentials.Credentials( 102 token=None, 103 refresh_token=None, 104 token_uri=None, 105 client_id=None, 106 client_secret=None, 107 rapt_token=None, 108 scopes=scopes, 109 default_scopes=None, 110 refresh_handler=object(), 111 ) 112 113 assert excinfo.match("The provided refresh_handler is not a callable or None.") 114 115 @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) 116 @mock.patch( 117 "google.auth._helpers.utcnow", 118 return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, 119 ) 120 def test_refresh_success(self, unused_utcnow, refresh_grant): 121 token = "token" 122 new_rapt_token = "new_rapt_token" 123 expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) 124 grant_response = {"id_token": mock.sentinel.id_token} 125 refresh_grant.return_value = ( 126 # Access token 127 token, 128 # New refresh token 129 None, 130 # Expiry, 131 expiry, 132 # Extra data 133 grant_response, 134 # rapt_token 135 new_rapt_token, 136 ) 137 138 request = mock.create_autospec(transport.Request) 139 credentials = self.make_credentials() 140 141 # Refresh credentials 142 credentials.refresh(request) 143 144 # Check jwt grant call. 145 refresh_grant.assert_called_with( 146 request, 147 self.TOKEN_URI, 148 self.REFRESH_TOKEN, 149 self.CLIENT_ID, 150 self.CLIENT_SECRET, 151 None, 152 self.RAPT_TOKEN, 153 True, 154 ) 155 156 # Check that the credentials have the token and expiry 157 assert credentials.token == token 158 assert credentials.expiry == expiry 159 assert credentials.id_token == mock.sentinel.id_token 160 assert credentials.rapt_token == new_rapt_token 161 162 # Check that the credentials are valid (have a token and are not 163 # expired) 164 assert credentials.valid 165 166 def test_refresh_no_refresh_token(self): 167 request = mock.create_autospec(transport.Request) 168 credentials_ = credentials.Credentials(token=None, refresh_token=None) 169 170 with pytest.raises(exceptions.RefreshError, match="necessary fields"): 171 credentials_.refresh(request) 172 173 request.assert_not_called() 174 175 @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) 176 @mock.patch( 177 "google.auth._helpers.utcnow", 178 return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, 179 ) 180 def test_refresh_with_refresh_token_and_refresh_handler( 181 self, unused_utcnow, refresh_grant 182 ): 183 token = "token" 184 new_rapt_token = "new_rapt_token" 185 expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) 186 grant_response = {"id_token": mock.sentinel.id_token} 187 refresh_grant.return_value = ( 188 # Access token 189 token, 190 # New refresh token 191 None, 192 # Expiry, 193 expiry, 194 # Extra data 195 grant_response, 196 # rapt_token 197 new_rapt_token, 198 ) 199 200 refresh_handler = mock.Mock() 201 request = mock.create_autospec(transport.Request) 202 creds = credentials.Credentials( 203 token=None, 204 refresh_token=self.REFRESH_TOKEN, 205 token_uri=self.TOKEN_URI, 206 client_id=self.CLIENT_ID, 207 client_secret=self.CLIENT_SECRET, 208 rapt_token=self.RAPT_TOKEN, 209 refresh_handler=refresh_handler, 210 ) 211 212 # Refresh credentials 213 creds.refresh(request) 214 215 # Check jwt grant call. 216 refresh_grant.assert_called_with( 217 request, 218 self.TOKEN_URI, 219 self.REFRESH_TOKEN, 220 self.CLIENT_ID, 221 self.CLIENT_SECRET, 222 None, 223 self.RAPT_TOKEN, 224 False, 225 ) 226 227 # Check that the credentials have the token and expiry 228 assert creds.token == token 229 assert creds.expiry == expiry 230 assert creds.id_token == mock.sentinel.id_token 231 assert creds.rapt_token == new_rapt_token 232 233 # Check that the credentials are valid (have a token and are not 234 # expired) 235 assert creds.valid 236 237 # Assert refresh handler not called as the refresh token has 238 # higher priority. 239 refresh_handler.assert_not_called() 240 241 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 242 def test_refresh_with_refresh_handler_success_scopes(self, unused_utcnow): 243 expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800) 244 refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry)) 245 scopes = ["email", "profile"] 246 default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] 247 request = mock.create_autospec(transport.Request) 248 creds = credentials.Credentials( 249 token=None, 250 refresh_token=None, 251 token_uri=None, 252 client_id=None, 253 client_secret=None, 254 rapt_token=None, 255 scopes=scopes, 256 default_scopes=default_scopes, 257 refresh_handler=refresh_handler, 258 ) 259 260 creds.refresh(request) 261 262 assert creds.token == "ACCESS_TOKEN" 263 assert creds.expiry == expected_expiry 264 assert creds.valid 265 assert not creds.expired 266 # Confirm refresh handler called with the expected arguments. 267 refresh_handler.assert_called_with(request, scopes=scopes) 268 269 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 270 def test_refresh_with_refresh_handler_success_default_scopes(self, unused_utcnow): 271 expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800) 272 original_refresh_handler = mock.Mock( 273 return_value=("UNUSED_TOKEN", expected_expiry) 274 ) 275 refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry)) 276 default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] 277 request = mock.create_autospec(transport.Request) 278 creds = credentials.Credentials( 279 token=None, 280 refresh_token=None, 281 token_uri=None, 282 client_id=None, 283 client_secret=None, 284 rapt_token=None, 285 scopes=None, 286 default_scopes=default_scopes, 287 refresh_handler=original_refresh_handler, 288 ) 289 290 # Test newly set refresh_handler is used instead of the original one. 291 creds.refresh_handler = refresh_handler 292 creds.refresh(request) 293 294 assert creds.token == "ACCESS_TOKEN" 295 assert creds.expiry == expected_expiry 296 assert creds.valid 297 assert not creds.expired 298 # default_scopes should be used since no developer provided scopes 299 # are provided. 300 refresh_handler.assert_called_with(request, scopes=default_scopes) 301 302 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 303 def test_refresh_with_refresh_handler_invalid_token(self, unused_utcnow): 304 expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800) 305 # Simulate refresh handler does not return a valid token. 306 refresh_handler = mock.Mock(return_value=(None, expected_expiry)) 307 scopes = ["email", "profile"] 308 default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] 309 request = mock.create_autospec(transport.Request) 310 creds = credentials.Credentials( 311 token=None, 312 refresh_token=None, 313 token_uri=None, 314 client_id=None, 315 client_secret=None, 316 rapt_token=None, 317 scopes=scopes, 318 default_scopes=default_scopes, 319 refresh_handler=refresh_handler, 320 ) 321 322 with pytest.raises( 323 exceptions.RefreshError, match="returned token is not a string" 324 ): 325 creds.refresh(request) 326 327 assert creds.token is None 328 assert creds.expiry is None 329 assert not creds.valid 330 # Confirm refresh handler called with the expected arguments. 331 refresh_handler.assert_called_with(request, scopes=scopes) 332 333 def test_refresh_with_refresh_handler_invalid_expiry(self): 334 # Simulate refresh handler returns expiration time in an invalid unit. 335 refresh_handler = mock.Mock(return_value=("TOKEN", 2800)) 336 scopes = ["email", "profile"] 337 default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] 338 request = mock.create_autospec(transport.Request) 339 creds = credentials.Credentials( 340 token=None, 341 refresh_token=None, 342 token_uri=None, 343 client_id=None, 344 client_secret=None, 345 rapt_token=None, 346 scopes=scopes, 347 default_scopes=default_scopes, 348 refresh_handler=refresh_handler, 349 ) 350 351 with pytest.raises( 352 exceptions.RefreshError, match="returned expiry is not a datetime object" 353 ): 354 creds.refresh(request) 355 356 assert creds.token is None 357 assert creds.expiry is None 358 assert not creds.valid 359 # Confirm refresh handler called with the expected arguments. 360 refresh_handler.assert_called_with(request, scopes=scopes) 361 362 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) 363 def test_refresh_with_refresh_handler_expired_token(self, unused_utcnow): 364 expected_expiry = datetime.datetime.min + _helpers.REFRESH_THRESHOLD 365 # Simulate refresh handler returns an expired token. 366 refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry)) 367 scopes = ["email", "profile"] 368 default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] 369 request = mock.create_autospec(transport.Request) 370 creds = credentials.Credentials( 371 token=None, 372 refresh_token=None, 373 token_uri=None, 374 client_id=None, 375 client_secret=None, 376 rapt_token=None, 377 scopes=scopes, 378 default_scopes=default_scopes, 379 refresh_handler=refresh_handler, 380 ) 381 382 with pytest.raises(exceptions.RefreshError, match="already expired"): 383 creds.refresh(request) 384 385 assert creds.token is None 386 assert creds.expiry is None 387 assert not creds.valid 388 # Confirm refresh handler called with the expected arguments. 389 refresh_handler.assert_called_with(request, scopes=scopes) 390 391 @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) 392 @mock.patch( 393 "google.auth._helpers.utcnow", 394 return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, 395 ) 396 def test_credentials_with_scopes_requested_refresh_success( 397 self, unused_utcnow, refresh_grant 398 ): 399 scopes = ["email", "profile"] 400 default_scopes = ["https://www.googleapis.com/auth/cloud-platform"] 401 token = "token" 402 new_rapt_token = "new_rapt_token" 403 expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) 404 grant_response = {"id_token": mock.sentinel.id_token, "scope": "email profile"} 405 refresh_grant.return_value = ( 406 # Access token 407 token, 408 # New refresh token 409 None, 410 # Expiry, 411 expiry, 412 # Extra data 413 grant_response, 414 # rapt token 415 new_rapt_token, 416 ) 417 418 request = mock.create_autospec(transport.Request) 419 creds = credentials.Credentials( 420 token=None, 421 refresh_token=self.REFRESH_TOKEN, 422 token_uri=self.TOKEN_URI, 423 client_id=self.CLIENT_ID, 424 client_secret=self.CLIENT_SECRET, 425 scopes=scopes, 426 default_scopes=default_scopes, 427 rapt_token=self.RAPT_TOKEN, 428 enable_reauth_refresh=True, 429 ) 430 431 # Refresh credentials 432 creds.refresh(request) 433 434 # Check jwt grant call. 435 refresh_grant.assert_called_with( 436 request, 437 self.TOKEN_URI, 438 self.REFRESH_TOKEN, 439 self.CLIENT_ID, 440 self.CLIENT_SECRET, 441 scopes, 442 self.RAPT_TOKEN, 443 True, 444 ) 445 446 # Check that the credentials have the token and expiry 447 assert creds.token == token 448 assert creds.expiry == expiry 449 assert creds.id_token == mock.sentinel.id_token 450 assert creds.has_scopes(scopes) 451 assert creds.rapt_token == new_rapt_token 452 453 # Check that the credentials are valid (have a token and are not 454 # expired.) 455 assert creds.valid 456 457 @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) 458 @mock.patch( 459 "google.auth._helpers.utcnow", 460 return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, 461 ) 462 def test_credentials_with_only_default_scopes_requested( 463 self, unused_utcnow, refresh_grant 464 ): 465 default_scopes = ["email", "profile"] 466 token = "token" 467 new_rapt_token = "new_rapt_token" 468 expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) 469 grant_response = {"id_token": mock.sentinel.id_token} 470 refresh_grant.return_value = ( 471 # Access token 472 token, 473 # New refresh token 474 None, 475 # Expiry, 476 expiry, 477 # Extra data 478 grant_response, 479 # rapt token 480 new_rapt_token, 481 ) 482 483 request = mock.create_autospec(transport.Request) 484 creds = credentials.Credentials( 485 token=None, 486 refresh_token=self.REFRESH_TOKEN, 487 token_uri=self.TOKEN_URI, 488 client_id=self.CLIENT_ID, 489 client_secret=self.CLIENT_SECRET, 490 default_scopes=default_scopes, 491 rapt_token=self.RAPT_TOKEN, 492 enable_reauth_refresh=True, 493 ) 494 495 # Refresh credentials 496 creds.refresh(request) 497 498 # Check jwt grant call. 499 refresh_grant.assert_called_with( 500 request, 501 self.TOKEN_URI, 502 self.REFRESH_TOKEN, 503 self.CLIENT_ID, 504 self.CLIENT_SECRET, 505 default_scopes, 506 self.RAPT_TOKEN, 507 True, 508 ) 509 510 # Check that the credentials have the token and expiry 511 assert creds.token == token 512 assert creds.expiry == expiry 513 assert creds.id_token == mock.sentinel.id_token 514 assert creds.has_scopes(default_scopes) 515 assert creds.rapt_token == new_rapt_token 516 517 # Check that the credentials are valid (have a token and are not 518 # expired.) 519 assert creds.valid 520 521 @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) 522 @mock.patch( 523 "google.auth._helpers.utcnow", 524 return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, 525 ) 526 def test_credentials_with_scopes_returned_refresh_success( 527 self, unused_utcnow, refresh_grant 528 ): 529 scopes = ["email", "profile"] 530 token = "token" 531 new_rapt_token = "new_rapt_token" 532 expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) 533 grant_response = { 534 "id_token": mock.sentinel.id_token, 535 "scopes": " ".join(scopes), 536 } 537 refresh_grant.return_value = ( 538 # Access token 539 token, 540 # New refresh token 541 None, 542 # Expiry, 543 expiry, 544 # Extra data 545 grant_response, 546 # rapt token 547 new_rapt_token, 548 ) 549 550 request = mock.create_autospec(transport.Request) 551 creds = credentials.Credentials( 552 token=None, 553 refresh_token=self.REFRESH_TOKEN, 554 token_uri=self.TOKEN_URI, 555 client_id=self.CLIENT_ID, 556 client_secret=self.CLIENT_SECRET, 557 scopes=scopes, 558 rapt_token=self.RAPT_TOKEN, 559 enable_reauth_refresh=True, 560 ) 561 562 # Refresh credentials 563 creds.refresh(request) 564 565 # Check jwt grant call. 566 refresh_grant.assert_called_with( 567 request, 568 self.TOKEN_URI, 569 self.REFRESH_TOKEN, 570 self.CLIENT_ID, 571 self.CLIENT_SECRET, 572 scopes, 573 self.RAPT_TOKEN, 574 True, 575 ) 576 577 # Check that the credentials have the token and expiry 578 assert creds.token == token 579 assert creds.expiry == expiry 580 assert creds.id_token == mock.sentinel.id_token 581 assert creds.has_scopes(scopes) 582 assert creds.rapt_token == new_rapt_token 583 584 # Check that the credentials are valid (have a token and are not 585 # expired.) 586 assert creds.valid 587 588 @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True) 589 @mock.patch( 590 "google.auth._helpers.utcnow", 591 return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD, 592 ) 593 def test_credentials_with_scopes_refresh_failure_raises_refresh_error( 594 self, unused_utcnow, refresh_grant 595 ): 596 scopes = ["email", "profile"] 597 scopes_returned = ["email"] 598 token = "token" 599 new_rapt_token = "new_rapt_token" 600 expiry = _helpers.utcnow() + datetime.timedelta(seconds=500) 601 grant_response = { 602 "id_token": mock.sentinel.id_token, 603 "scope": " ".join(scopes_returned), 604 } 605 refresh_grant.return_value = ( 606 # Access token 607 token, 608 # New refresh token 609 None, 610 # Expiry, 611 expiry, 612 # Extra data 613 grant_response, 614 # rapt token 615 new_rapt_token, 616 ) 617 618 request = mock.create_autospec(transport.Request) 619 creds = credentials.Credentials( 620 token=None, 621 refresh_token=self.REFRESH_TOKEN, 622 token_uri=self.TOKEN_URI, 623 client_id=self.CLIENT_ID, 624 client_secret=self.CLIENT_SECRET, 625 scopes=scopes, 626 rapt_token=self.RAPT_TOKEN, 627 enable_reauth_refresh=True, 628 ) 629 630 # Refresh credentials 631 with pytest.raises( 632 exceptions.RefreshError, match="Not all requested scopes were granted" 633 ): 634 creds.refresh(request) 635 636 # Check jwt grant call. 637 refresh_grant.assert_called_with( 638 request, 639 self.TOKEN_URI, 640 self.REFRESH_TOKEN, 641 self.CLIENT_ID, 642 self.CLIENT_SECRET, 643 scopes, 644 self.RAPT_TOKEN, 645 True, 646 ) 647 648 # Check that the credentials have the token and expiry 649 assert creds.token == token 650 assert creds.expiry == expiry 651 assert creds.id_token == mock.sentinel.id_token 652 assert creds.has_scopes(scopes) 653 assert creds.rapt_token == new_rapt_token 654 655 # Check that the credentials are valid (have a token and are not 656 # expired.) 657 assert creds.valid 658 659 def test_apply_with_quota_project_id(self): 660 creds = credentials.Credentials( 661 token="token", 662 refresh_token=self.REFRESH_TOKEN, 663 token_uri=self.TOKEN_URI, 664 client_id=self.CLIENT_ID, 665 client_secret=self.CLIENT_SECRET, 666 quota_project_id="quota-project-123", 667 ) 668 669 headers = {} 670 creds.apply(headers) 671 assert headers["x-goog-user-project"] == "quota-project-123" 672 assert "token" in headers["authorization"] 673 674 def test_apply_with_no_quota_project_id(self): 675 creds = credentials.Credentials( 676 token="token", 677 refresh_token=self.REFRESH_TOKEN, 678 token_uri=self.TOKEN_URI, 679 client_id=self.CLIENT_ID, 680 client_secret=self.CLIENT_SECRET, 681 ) 682 683 headers = {} 684 creds.apply(headers) 685 assert "x-goog-user-project" not in headers 686 assert "token" in headers["authorization"] 687 688 def test_with_quota_project(self): 689 creds = credentials.Credentials( 690 token="token", 691 refresh_token=self.REFRESH_TOKEN, 692 token_uri=self.TOKEN_URI, 693 client_id=self.CLIENT_ID, 694 client_secret=self.CLIENT_SECRET, 695 quota_project_id="quota-project-123", 696 ) 697 698 new_creds = creds.with_quota_project("new-project-456") 699 assert new_creds.quota_project_id == "new-project-456" 700 headers = {} 701 creds.apply(headers) 702 assert "x-goog-user-project" in headers 703 704 def test_from_authorized_user_info(self): 705 info = AUTH_USER_INFO.copy() 706 707 creds = credentials.Credentials.from_authorized_user_info(info) 708 assert creds.client_secret == info["client_secret"] 709 assert creds.client_id == info["client_id"] 710 assert creds.refresh_token == info["refresh_token"] 711 assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT 712 assert creds.scopes is None 713 714 scopes = ["email", "profile"] 715 creds = credentials.Credentials.from_authorized_user_info(info, scopes) 716 assert creds.client_secret == info["client_secret"] 717 assert creds.client_id == info["client_id"] 718 assert creds.refresh_token == info["refresh_token"] 719 assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT 720 assert creds.scopes == scopes 721 722 info["scopes"] = "email" # single non-array scope from file 723 creds = credentials.Credentials.from_authorized_user_info(info) 724 assert creds.scopes == [info["scopes"]] 725 726 info["scopes"] = ["email", "profile"] # array scope from file 727 creds = credentials.Credentials.from_authorized_user_info(info) 728 assert creds.scopes == info["scopes"] 729 730 expiry = datetime.datetime(2020, 8, 14, 15, 54, 1) 731 info["expiry"] = expiry.isoformat() + "Z" 732 creds = credentials.Credentials.from_authorized_user_info(info) 733 assert creds.expiry == expiry 734 assert creds.expired 735 736 def test_from_authorized_user_file(self): 737 info = AUTH_USER_INFO.copy() 738 739 creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE) 740 assert creds.client_secret == info["client_secret"] 741 assert creds.client_id == info["client_id"] 742 assert creds.refresh_token == info["refresh_token"] 743 assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT 744 assert creds.scopes is None 745 assert creds.rapt_token is None 746 747 scopes = ["email", "profile"] 748 creds = credentials.Credentials.from_authorized_user_file( 749 AUTH_USER_JSON_FILE, scopes 750 ) 751 assert creds.client_secret == info["client_secret"] 752 assert creds.client_id == info["client_id"] 753 assert creds.refresh_token == info["refresh_token"] 754 assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT 755 assert creds.scopes == scopes 756 757 def test_from_authorized_user_file_with_rapt_token(self): 758 info = AUTH_USER_INFO.copy() 759 file_path = os.path.join(DATA_DIR, "authorized_user_with_rapt_token.json") 760 761 creds = credentials.Credentials.from_authorized_user_file(file_path) 762 assert creds.client_secret == info["client_secret"] 763 assert creds.client_id == info["client_id"] 764 assert creds.refresh_token == info["refresh_token"] 765 assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT 766 assert creds.scopes is None 767 assert creds.rapt_token == "rapt" 768 769 def test_to_json(self): 770 info = AUTH_USER_INFO.copy() 771 expiry = datetime.datetime(2020, 8, 14, 15, 54, 1) 772 info["expiry"] = expiry.isoformat() + "Z" 773 creds = credentials.Credentials.from_authorized_user_info(info) 774 assert creds.expiry == expiry 775 776 # Test with no `strip` arg 777 json_output = creds.to_json() 778 json_asdict = json.loads(json_output) 779 assert json_asdict.get("token") == creds.token 780 assert json_asdict.get("refresh_token") == creds.refresh_token 781 assert json_asdict.get("token_uri") == creds.token_uri 782 assert json_asdict.get("client_id") == creds.client_id 783 assert json_asdict.get("scopes") == creds.scopes 784 assert json_asdict.get("client_secret") == creds.client_secret 785 assert json_asdict.get("expiry") == info["expiry"] 786 787 # Test with a `strip` arg 788 json_output = creds.to_json(strip=["client_secret"]) 789 json_asdict = json.loads(json_output) 790 assert json_asdict.get("token") == creds.token 791 assert json_asdict.get("refresh_token") == creds.refresh_token 792 assert json_asdict.get("token_uri") == creds.token_uri 793 assert json_asdict.get("client_id") == creds.client_id 794 assert json_asdict.get("scopes") == creds.scopes 795 assert json_asdict.get("client_secret") is None 796 797 # Test with no expiry 798 creds.expiry = None 799 json_output = creds.to_json() 800 json_asdict = json.loads(json_output) 801 assert json_asdict.get("expiry") is None 802 803 def test_pickle_and_unpickle(self): 804 creds = self.make_credentials() 805 unpickled = pickle.loads(pickle.dumps(creds)) 806 807 # make sure attributes aren't lost during pickling 808 assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort() 809 810 for attr in list(creds.__dict__): 811 assert getattr(creds, attr) == getattr(unpickled, attr) 812 813 def test_pickle_and_unpickle_with_refresh_handler(self): 814 expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800) 815 refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry)) 816 817 creds = credentials.Credentials( 818 token=None, 819 refresh_token=None, 820 token_uri=None, 821 client_id=None, 822 client_secret=None, 823 rapt_token=None, 824 refresh_handler=refresh_handler, 825 ) 826 unpickled = pickle.loads(pickle.dumps(creds)) 827 828 # make sure attributes aren't lost during pickling 829 assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort() 830 831 for attr in list(creds.__dict__): 832 # For the _refresh_handler property, the unpickled creds should be 833 # set to None. 834 if attr == "_refresh_handler": 835 assert getattr(unpickled, attr) is None 836 else: 837 assert getattr(creds, attr) == getattr(unpickled, attr) 838 839 def test_pickle_with_missing_attribute(self): 840 creds = self.make_credentials() 841 842 # remove an optional attribute before pickling 843 # this mimics a pickle created with a previous class definition with 844 # fewer attributes 845 del creds.__dict__["_quota_project_id"] 846 847 unpickled = pickle.loads(pickle.dumps(creds)) 848 849 # Attribute should be initialized by `__setstate__` 850 assert unpickled.quota_project_id is None 851 852 # pickles are not compatible across versions 853 @pytest.mark.skipif( 854 sys.version_info < (3, 5), 855 reason="pickle file can only be loaded with Python >= 3.5", 856 ) 857 def test_unpickle_old_credentials_pickle(self): 858 # make sure a credentials file pickled with an older 859 # library version (google-auth==1.5.1) can be unpickled 860 with open( 861 os.path.join(DATA_DIR, "old_oauth_credentials_py3.pickle"), "rb" 862 ) as f: 863 credentials = pickle.load(f) 864 assert credentials.quota_project_id is None 865 866 867class TestUserAccessTokenCredentials(object): 868 def test_instance(self): 869 cred = credentials.UserAccessTokenCredentials() 870 assert cred._account is None 871 872 cred = cred.with_account("account") 873 assert cred._account == "account" 874 875 @mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True) 876 def test_refresh(self, get_auth_access_token): 877 get_auth_access_token.return_value = "access_token" 878 cred = credentials.UserAccessTokenCredentials() 879 cred.refresh(None) 880 assert cred.token == "access_token" 881 882 def test_with_quota_project(self): 883 cred = credentials.UserAccessTokenCredentials() 884 quota_project_cred = cred.with_quota_project("project-foo") 885 886 assert quota_project_cred._quota_project_id == "project-foo" 887 assert quota_project_cred._account == cred._account 888 889 @mock.patch( 890 "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True 891 ) 892 @mock.patch( 893 "google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True 894 ) 895 def test_before_request(self, refresh, apply): 896 cred = credentials.UserAccessTokenCredentials() 897 cred.before_request(mock.Mock(), "GET", "https://example.com", {}) 898 refresh.assert_called() 899 apply.assert_called() 900