1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17import os
18import pickle
19import sys
20
21import mock
22import pytest
23
24from google.auth import _helpers
25from google.auth import exceptions
26from google.auth import transport
27from google.oauth2 import credentials
28
29
30DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
31
32AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")
33
34with open(AUTH_USER_JSON_FILE, "r") as fh:
35    AUTH_USER_INFO = json.load(fh)
36
37
38class TestCredentials(object):
39    TOKEN_URI = "https://example.com/oauth2/token"
40    REFRESH_TOKEN = "refresh_token"
41    RAPT_TOKEN = "rapt_token"
42    CLIENT_ID = "client_id"
43    CLIENT_SECRET = "client_secret"
44
45    @classmethod
46    def make_credentials(cls):
47        return credentials.Credentials(
48            token=None,
49            refresh_token=cls.REFRESH_TOKEN,
50            token_uri=cls.TOKEN_URI,
51            client_id=cls.CLIENT_ID,
52            client_secret=cls.CLIENT_SECRET,
53            rapt_token=cls.RAPT_TOKEN,
54            enable_reauth_refresh=True,
55        )
56
57    def test_default_state(self):
58        credentials = self.make_credentials()
59        assert not credentials.valid
60        # Expiration hasn't been set yet
61        assert not credentials.expired
62        # Scopes aren't required for these credentials
63        assert not credentials.requires_scopes
64        # Test properties
65        assert credentials.refresh_token == self.REFRESH_TOKEN
66        assert credentials.token_uri == self.TOKEN_URI
67        assert credentials.client_id == self.CLIENT_ID
68        assert credentials.client_secret == self.CLIENT_SECRET
69        assert credentials.rapt_token == self.RAPT_TOKEN
70        assert credentials.refresh_handler is None
71
72    def test_refresh_handler_setter_and_getter(self):
73        scopes = ["email", "profile"]
74        original_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_1", None))
75        updated_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_2", None))
76        creds = credentials.Credentials(
77            token=None,
78            refresh_token=None,
79            token_uri=None,
80            client_id=None,
81            client_secret=None,
82            rapt_token=None,
83            scopes=scopes,
84            default_scopes=None,
85            refresh_handler=original_refresh_handler,
86        )
87
88        assert creds.refresh_handler is original_refresh_handler
89
90        creds.refresh_handler = updated_refresh_handler
91
92        assert creds.refresh_handler is updated_refresh_handler
93
94        creds.refresh_handler = None
95
96        assert creds.refresh_handler is None
97
98    def test_invalid_refresh_handler(self):
99        scopes = ["email", "profile"]
100        with pytest.raises(TypeError) as excinfo:
101            credentials.Credentials(
102                token=None,
103                refresh_token=None,
104                token_uri=None,
105                client_id=None,
106                client_secret=None,
107                rapt_token=None,
108                scopes=scopes,
109                default_scopes=None,
110                refresh_handler=object(),
111            )
112
113        assert excinfo.match("The provided refresh_handler is not a callable or None.")
114
115    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
116    @mock.patch(
117        "google.auth._helpers.utcnow",
118        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
119    )
120    def test_refresh_success(self, unused_utcnow, refresh_grant):
121        token = "token"
122        new_rapt_token = "new_rapt_token"
123        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
124        grant_response = {"id_token": mock.sentinel.id_token}
125        refresh_grant.return_value = (
126            # Access token
127            token,
128            # New refresh token
129            None,
130            # Expiry,
131            expiry,
132            # Extra data
133            grant_response,
134            # rapt_token
135            new_rapt_token,
136        )
137
138        request = mock.create_autospec(transport.Request)
139        credentials = self.make_credentials()
140
141        # Refresh credentials
142        credentials.refresh(request)
143
144        # Check jwt grant call.
145        refresh_grant.assert_called_with(
146            request,
147            self.TOKEN_URI,
148            self.REFRESH_TOKEN,
149            self.CLIENT_ID,
150            self.CLIENT_SECRET,
151            None,
152            self.RAPT_TOKEN,
153            True,
154        )
155
156        # Check that the credentials have the token and expiry
157        assert credentials.token == token
158        assert credentials.expiry == expiry
159        assert credentials.id_token == mock.sentinel.id_token
160        assert credentials.rapt_token == new_rapt_token
161
162        # Check that the credentials are valid (have a token and are not
163        # expired)
164        assert credentials.valid
165
166    def test_refresh_no_refresh_token(self):
167        request = mock.create_autospec(transport.Request)
168        credentials_ = credentials.Credentials(token=None, refresh_token=None)
169
170        with pytest.raises(exceptions.RefreshError, match="necessary fields"):
171            credentials_.refresh(request)
172
173        request.assert_not_called()
174
175    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
176    @mock.patch(
177        "google.auth._helpers.utcnow",
178        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
179    )
180    def test_refresh_with_refresh_token_and_refresh_handler(
181        self, unused_utcnow, refresh_grant
182    ):
183        token = "token"
184        new_rapt_token = "new_rapt_token"
185        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
186        grant_response = {"id_token": mock.sentinel.id_token}
187        refresh_grant.return_value = (
188            # Access token
189            token,
190            # New refresh token
191            None,
192            # Expiry,
193            expiry,
194            # Extra data
195            grant_response,
196            # rapt_token
197            new_rapt_token,
198        )
199
200        refresh_handler = mock.Mock()
201        request = mock.create_autospec(transport.Request)
202        creds = credentials.Credentials(
203            token=None,
204            refresh_token=self.REFRESH_TOKEN,
205            token_uri=self.TOKEN_URI,
206            client_id=self.CLIENT_ID,
207            client_secret=self.CLIENT_SECRET,
208            rapt_token=self.RAPT_TOKEN,
209            refresh_handler=refresh_handler,
210        )
211
212        # Refresh credentials
213        creds.refresh(request)
214
215        # Check jwt grant call.
216        refresh_grant.assert_called_with(
217            request,
218            self.TOKEN_URI,
219            self.REFRESH_TOKEN,
220            self.CLIENT_ID,
221            self.CLIENT_SECRET,
222            None,
223            self.RAPT_TOKEN,
224            False,
225        )
226
227        # Check that the credentials have the token and expiry
228        assert creds.token == token
229        assert creds.expiry == expiry
230        assert creds.id_token == mock.sentinel.id_token
231        assert creds.rapt_token == new_rapt_token
232
233        # Check that the credentials are valid (have a token and are not
234        # expired)
235        assert creds.valid
236
237        # Assert refresh handler not called as the refresh token has
238        # higher priority.
239        refresh_handler.assert_not_called()
240
241    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
242    def test_refresh_with_refresh_handler_success_scopes(self, unused_utcnow):
243        expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
244        refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
245        scopes = ["email", "profile"]
246        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
247        request = mock.create_autospec(transport.Request)
248        creds = credentials.Credentials(
249            token=None,
250            refresh_token=None,
251            token_uri=None,
252            client_id=None,
253            client_secret=None,
254            rapt_token=None,
255            scopes=scopes,
256            default_scopes=default_scopes,
257            refresh_handler=refresh_handler,
258        )
259
260        creds.refresh(request)
261
262        assert creds.token == "ACCESS_TOKEN"
263        assert creds.expiry == expected_expiry
264        assert creds.valid
265        assert not creds.expired
266        # Confirm refresh handler called with the expected arguments.
267        refresh_handler.assert_called_with(request, scopes=scopes)
268
269    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
270    def test_refresh_with_refresh_handler_success_default_scopes(self, unused_utcnow):
271        expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
272        original_refresh_handler = mock.Mock(
273            return_value=("UNUSED_TOKEN", expected_expiry)
274        )
275        refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
276        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
277        request = mock.create_autospec(transport.Request)
278        creds = credentials.Credentials(
279            token=None,
280            refresh_token=None,
281            token_uri=None,
282            client_id=None,
283            client_secret=None,
284            rapt_token=None,
285            scopes=None,
286            default_scopes=default_scopes,
287            refresh_handler=original_refresh_handler,
288        )
289
290        # Test newly set refresh_handler is used instead of the original one.
291        creds.refresh_handler = refresh_handler
292        creds.refresh(request)
293
294        assert creds.token == "ACCESS_TOKEN"
295        assert creds.expiry == expected_expiry
296        assert creds.valid
297        assert not creds.expired
298        # default_scopes should be used since no developer provided scopes
299        # are provided.
300        refresh_handler.assert_called_with(request, scopes=default_scopes)
301
302    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
303    def test_refresh_with_refresh_handler_invalid_token(self, unused_utcnow):
304        expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
305        # Simulate refresh handler does not return a valid token.
306        refresh_handler = mock.Mock(return_value=(None, expected_expiry))
307        scopes = ["email", "profile"]
308        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
309        request = mock.create_autospec(transport.Request)
310        creds = credentials.Credentials(
311            token=None,
312            refresh_token=None,
313            token_uri=None,
314            client_id=None,
315            client_secret=None,
316            rapt_token=None,
317            scopes=scopes,
318            default_scopes=default_scopes,
319            refresh_handler=refresh_handler,
320        )
321
322        with pytest.raises(
323            exceptions.RefreshError, match="returned token is not a string"
324        ):
325            creds.refresh(request)
326
327        assert creds.token is None
328        assert creds.expiry is None
329        assert not creds.valid
330        # Confirm refresh handler called with the expected arguments.
331        refresh_handler.assert_called_with(request, scopes=scopes)
332
333    def test_refresh_with_refresh_handler_invalid_expiry(self):
334        # Simulate refresh handler returns expiration time in an invalid unit.
335        refresh_handler = mock.Mock(return_value=("TOKEN", 2800))
336        scopes = ["email", "profile"]
337        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
338        request = mock.create_autospec(transport.Request)
339        creds = credentials.Credentials(
340            token=None,
341            refresh_token=None,
342            token_uri=None,
343            client_id=None,
344            client_secret=None,
345            rapt_token=None,
346            scopes=scopes,
347            default_scopes=default_scopes,
348            refresh_handler=refresh_handler,
349        )
350
351        with pytest.raises(
352            exceptions.RefreshError, match="returned expiry is not a datetime object"
353        ):
354            creds.refresh(request)
355
356        assert creds.token is None
357        assert creds.expiry is None
358        assert not creds.valid
359        # Confirm refresh handler called with the expected arguments.
360        refresh_handler.assert_called_with(request, scopes=scopes)
361
362    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
363    def test_refresh_with_refresh_handler_expired_token(self, unused_utcnow):
364        expected_expiry = datetime.datetime.min + _helpers.REFRESH_THRESHOLD
365        # Simulate refresh handler returns an expired token.
366        refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
367        scopes = ["email", "profile"]
368        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
369        request = mock.create_autospec(transport.Request)
370        creds = credentials.Credentials(
371            token=None,
372            refresh_token=None,
373            token_uri=None,
374            client_id=None,
375            client_secret=None,
376            rapt_token=None,
377            scopes=scopes,
378            default_scopes=default_scopes,
379            refresh_handler=refresh_handler,
380        )
381
382        with pytest.raises(exceptions.RefreshError, match="already expired"):
383            creds.refresh(request)
384
385        assert creds.token is None
386        assert creds.expiry is None
387        assert not creds.valid
388        # Confirm refresh handler called with the expected arguments.
389        refresh_handler.assert_called_with(request, scopes=scopes)
390
391    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
392    @mock.patch(
393        "google.auth._helpers.utcnow",
394        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
395    )
396    def test_credentials_with_scopes_requested_refresh_success(
397        self, unused_utcnow, refresh_grant
398    ):
399        scopes = ["email", "profile"]
400        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
401        token = "token"
402        new_rapt_token = "new_rapt_token"
403        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
404        grant_response = {"id_token": mock.sentinel.id_token, "scope": "email profile"}
405        refresh_grant.return_value = (
406            # Access token
407            token,
408            # New refresh token
409            None,
410            # Expiry,
411            expiry,
412            # Extra data
413            grant_response,
414            # rapt token
415            new_rapt_token,
416        )
417
418        request = mock.create_autospec(transport.Request)
419        creds = credentials.Credentials(
420            token=None,
421            refresh_token=self.REFRESH_TOKEN,
422            token_uri=self.TOKEN_URI,
423            client_id=self.CLIENT_ID,
424            client_secret=self.CLIENT_SECRET,
425            scopes=scopes,
426            default_scopes=default_scopes,
427            rapt_token=self.RAPT_TOKEN,
428            enable_reauth_refresh=True,
429        )
430
431        # Refresh credentials
432        creds.refresh(request)
433
434        # Check jwt grant call.
435        refresh_grant.assert_called_with(
436            request,
437            self.TOKEN_URI,
438            self.REFRESH_TOKEN,
439            self.CLIENT_ID,
440            self.CLIENT_SECRET,
441            scopes,
442            self.RAPT_TOKEN,
443            True,
444        )
445
446        # Check that the credentials have the token and expiry
447        assert creds.token == token
448        assert creds.expiry == expiry
449        assert creds.id_token == mock.sentinel.id_token
450        assert creds.has_scopes(scopes)
451        assert creds.rapt_token == new_rapt_token
452
453        # Check that the credentials are valid (have a token and are not
454        # expired.)
455        assert creds.valid
456
457    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
458    @mock.patch(
459        "google.auth._helpers.utcnow",
460        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
461    )
462    def test_credentials_with_only_default_scopes_requested(
463        self, unused_utcnow, refresh_grant
464    ):
465        default_scopes = ["email", "profile"]
466        token = "token"
467        new_rapt_token = "new_rapt_token"
468        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
469        grant_response = {"id_token": mock.sentinel.id_token}
470        refresh_grant.return_value = (
471            # Access token
472            token,
473            # New refresh token
474            None,
475            # Expiry,
476            expiry,
477            # Extra data
478            grant_response,
479            # rapt token
480            new_rapt_token,
481        )
482
483        request = mock.create_autospec(transport.Request)
484        creds = credentials.Credentials(
485            token=None,
486            refresh_token=self.REFRESH_TOKEN,
487            token_uri=self.TOKEN_URI,
488            client_id=self.CLIENT_ID,
489            client_secret=self.CLIENT_SECRET,
490            default_scopes=default_scopes,
491            rapt_token=self.RAPT_TOKEN,
492            enable_reauth_refresh=True,
493        )
494
495        # Refresh credentials
496        creds.refresh(request)
497
498        # Check jwt grant call.
499        refresh_grant.assert_called_with(
500            request,
501            self.TOKEN_URI,
502            self.REFRESH_TOKEN,
503            self.CLIENT_ID,
504            self.CLIENT_SECRET,
505            default_scopes,
506            self.RAPT_TOKEN,
507            True,
508        )
509
510        # Check that the credentials have the token and expiry
511        assert creds.token == token
512        assert creds.expiry == expiry
513        assert creds.id_token == mock.sentinel.id_token
514        assert creds.has_scopes(default_scopes)
515        assert creds.rapt_token == new_rapt_token
516
517        # Check that the credentials are valid (have a token and are not
518        # expired.)
519        assert creds.valid
520
521    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
522    @mock.patch(
523        "google.auth._helpers.utcnow",
524        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
525    )
526    def test_credentials_with_scopes_returned_refresh_success(
527        self, unused_utcnow, refresh_grant
528    ):
529        scopes = ["email", "profile"]
530        token = "token"
531        new_rapt_token = "new_rapt_token"
532        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
533        grant_response = {
534            "id_token": mock.sentinel.id_token,
535            "scopes": " ".join(scopes),
536        }
537        refresh_grant.return_value = (
538            # Access token
539            token,
540            # New refresh token
541            None,
542            # Expiry,
543            expiry,
544            # Extra data
545            grant_response,
546            # rapt token
547            new_rapt_token,
548        )
549
550        request = mock.create_autospec(transport.Request)
551        creds = credentials.Credentials(
552            token=None,
553            refresh_token=self.REFRESH_TOKEN,
554            token_uri=self.TOKEN_URI,
555            client_id=self.CLIENT_ID,
556            client_secret=self.CLIENT_SECRET,
557            scopes=scopes,
558            rapt_token=self.RAPT_TOKEN,
559            enable_reauth_refresh=True,
560        )
561
562        # Refresh credentials
563        creds.refresh(request)
564
565        # Check jwt grant call.
566        refresh_grant.assert_called_with(
567            request,
568            self.TOKEN_URI,
569            self.REFRESH_TOKEN,
570            self.CLIENT_ID,
571            self.CLIENT_SECRET,
572            scopes,
573            self.RAPT_TOKEN,
574            True,
575        )
576
577        # Check that the credentials have the token and expiry
578        assert creds.token == token
579        assert creds.expiry == expiry
580        assert creds.id_token == mock.sentinel.id_token
581        assert creds.has_scopes(scopes)
582        assert creds.rapt_token == new_rapt_token
583
584        # Check that the credentials are valid (have a token and are not
585        # expired.)
586        assert creds.valid
587
588    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
589    @mock.patch(
590        "google.auth._helpers.utcnow",
591        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
592    )
593    def test_credentials_with_scopes_refresh_failure_raises_refresh_error(
594        self, unused_utcnow, refresh_grant
595    ):
596        scopes = ["email", "profile"]
597        scopes_returned = ["email"]
598        token = "token"
599        new_rapt_token = "new_rapt_token"
600        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
601        grant_response = {
602            "id_token": mock.sentinel.id_token,
603            "scope": " ".join(scopes_returned),
604        }
605        refresh_grant.return_value = (
606            # Access token
607            token,
608            # New refresh token
609            None,
610            # Expiry,
611            expiry,
612            # Extra data
613            grant_response,
614            # rapt token
615            new_rapt_token,
616        )
617
618        request = mock.create_autospec(transport.Request)
619        creds = credentials.Credentials(
620            token=None,
621            refresh_token=self.REFRESH_TOKEN,
622            token_uri=self.TOKEN_URI,
623            client_id=self.CLIENT_ID,
624            client_secret=self.CLIENT_SECRET,
625            scopes=scopes,
626            rapt_token=self.RAPT_TOKEN,
627            enable_reauth_refresh=True,
628        )
629
630        # Refresh credentials
631        with pytest.raises(
632            exceptions.RefreshError, match="Not all requested scopes were granted"
633        ):
634            creds.refresh(request)
635
636        # Check jwt grant call.
637        refresh_grant.assert_called_with(
638            request,
639            self.TOKEN_URI,
640            self.REFRESH_TOKEN,
641            self.CLIENT_ID,
642            self.CLIENT_SECRET,
643            scopes,
644            self.RAPT_TOKEN,
645            True,
646        )
647
648        # Check that the credentials have the token and expiry
649        assert creds.token == token
650        assert creds.expiry == expiry
651        assert creds.id_token == mock.sentinel.id_token
652        assert creds.has_scopes(scopes)
653        assert creds.rapt_token == new_rapt_token
654
655        # Check that the credentials are valid (have a token and are not
656        # expired.)
657        assert creds.valid
658
659    def test_apply_with_quota_project_id(self):
660        creds = credentials.Credentials(
661            token="token",
662            refresh_token=self.REFRESH_TOKEN,
663            token_uri=self.TOKEN_URI,
664            client_id=self.CLIENT_ID,
665            client_secret=self.CLIENT_SECRET,
666            quota_project_id="quota-project-123",
667        )
668
669        headers = {}
670        creds.apply(headers)
671        assert headers["x-goog-user-project"] == "quota-project-123"
672        assert "token" in headers["authorization"]
673
674    def test_apply_with_no_quota_project_id(self):
675        creds = credentials.Credentials(
676            token="token",
677            refresh_token=self.REFRESH_TOKEN,
678            token_uri=self.TOKEN_URI,
679            client_id=self.CLIENT_ID,
680            client_secret=self.CLIENT_SECRET,
681        )
682
683        headers = {}
684        creds.apply(headers)
685        assert "x-goog-user-project" not in headers
686        assert "token" in headers["authorization"]
687
688    def test_with_quota_project(self):
689        creds = credentials.Credentials(
690            token="token",
691            refresh_token=self.REFRESH_TOKEN,
692            token_uri=self.TOKEN_URI,
693            client_id=self.CLIENT_ID,
694            client_secret=self.CLIENT_SECRET,
695            quota_project_id="quota-project-123",
696        )
697
698        new_creds = creds.with_quota_project("new-project-456")
699        assert new_creds.quota_project_id == "new-project-456"
700        headers = {}
701        creds.apply(headers)
702        assert "x-goog-user-project" in headers
703
704    def test_from_authorized_user_info(self):
705        info = AUTH_USER_INFO.copy()
706
707        creds = credentials.Credentials.from_authorized_user_info(info)
708        assert creds.client_secret == info["client_secret"]
709        assert creds.client_id == info["client_id"]
710        assert creds.refresh_token == info["refresh_token"]
711        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
712        assert creds.scopes is None
713
714        scopes = ["email", "profile"]
715        creds = credentials.Credentials.from_authorized_user_info(info, scopes)
716        assert creds.client_secret == info["client_secret"]
717        assert creds.client_id == info["client_id"]
718        assert creds.refresh_token == info["refresh_token"]
719        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
720        assert creds.scopes == scopes
721
722        info["scopes"] = "email"  # single non-array scope from file
723        creds = credentials.Credentials.from_authorized_user_info(info)
724        assert creds.scopes == [info["scopes"]]
725
726        info["scopes"] = ["email", "profile"]  # array scope from file
727        creds = credentials.Credentials.from_authorized_user_info(info)
728        assert creds.scopes == info["scopes"]
729
730        expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
731        info["expiry"] = expiry.isoformat() + "Z"
732        creds = credentials.Credentials.from_authorized_user_info(info)
733        assert creds.expiry == expiry
734        assert creds.expired
735
736    def test_from_authorized_user_file(self):
737        info = AUTH_USER_INFO.copy()
738
739        creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE)
740        assert creds.client_secret == info["client_secret"]
741        assert creds.client_id == info["client_id"]
742        assert creds.refresh_token == info["refresh_token"]
743        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
744        assert creds.scopes is None
745        assert creds.rapt_token is None
746
747        scopes = ["email", "profile"]
748        creds = credentials.Credentials.from_authorized_user_file(
749            AUTH_USER_JSON_FILE, scopes
750        )
751        assert creds.client_secret == info["client_secret"]
752        assert creds.client_id == info["client_id"]
753        assert creds.refresh_token == info["refresh_token"]
754        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
755        assert creds.scopes == scopes
756
757    def test_from_authorized_user_file_with_rapt_token(self):
758        info = AUTH_USER_INFO.copy()
759        file_path = os.path.join(DATA_DIR, "authorized_user_with_rapt_token.json")
760
761        creds = credentials.Credentials.from_authorized_user_file(file_path)
762        assert creds.client_secret == info["client_secret"]
763        assert creds.client_id == info["client_id"]
764        assert creds.refresh_token == info["refresh_token"]
765        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
766        assert creds.scopes is None
767        assert creds.rapt_token == "rapt"
768
769    def test_to_json(self):
770        info = AUTH_USER_INFO.copy()
771        expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
772        info["expiry"] = expiry.isoformat() + "Z"
773        creds = credentials.Credentials.from_authorized_user_info(info)
774        assert creds.expiry == expiry
775
776        # Test with no `strip` arg
777        json_output = creds.to_json()
778        json_asdict = json.loads(json_output)
779        assert json_asdict.get("token") == creds.token
780        assert json_asdict.get("refresh_token") == creds.refresh_token
781        assert json_asdict.get("token_uri") == creds.token_uri
782        assert json_asdict.get("client_id") == creds.client_id
783        assert json_asdict.get("scopes") == creds.scopes
784        assert json_asdict.get("client_secret") == creds.client_secret
785        assert json_asdict.get("expiry") == info["expiry"]
786
787        # Test with a `strip` arg
788        json_output = creds.to_json(strip=["client_secret"])
789        json_asdict = json.loads(json_output)
790        assert json_asdict.get("token") == creds.token
791        assert json_asdict.get("refresh_token") == creds.refresh_token
792        assert json_asdict.get("token_uri") == creds.token_uri
793        assert json_asdict.get("client_id") == creds.client_id
794        assert json_asdict.get("scopes") == creds.scopes
795        assert json_asdict.get("client_secret") is None
796
797        # Test with no expiry
798        creds.expiry = None
799        json_output = creds.to_json()
800        json_asdict = json.loads(json_output)
801        assert json_asdict.get("expiry") is None
802
803    def test_pickle_and_unpickle(self):
804        creds = self.make_credentials()
805        unpickled = pickle.loads(pickle.dumps(creds))
806
807        # make sure attributes aren't lost during pickling
808        assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort()
809
810        for attr in list(creds.__dict__):
811            assert getattr(creds, attr) == getattr(unpickled, attr)
812
813    def test_pickle_and_unpickle_with_refresh_handler(self):
814        expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800)
815        refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
816
817        creds = credentials.Credentials(
818            token=None,
819            refresh_token=None,
820            token_uri=None,
821            client_id=None,
822            client_secret=None,
823            rapt_token=None,
824            refresh_handler=refresh_handler,
825        )
826        unpickled = pickle.loads(pickle.dumps(creds))
827
828        # make sure attributes aren't lost during pickling
829        assert list(creds.__dict__).sort() == list(unpickled.__dict__).sort()
830
831        for attr in list(creds.__dict__):
832            # For the _refresh_handler property, the unpickled creds should be
833            # set to None.
834            if attr == "_refresh_handler":
835                assert getattr(unpickled, attr) is None
836            else:
837                assert getattr(creds, attr) == getattr(unpickled, attr)
838
839    def test_pickle_with_missing_attribute(self):
840        creds = self.make_credentials()
841
842        # remove an optional attribute before pickling
843        # this mimics a pickle created with a previous class definition with
844        # fewer attributes
845        del creds.__dict__["_quota_project_id"]
846
847        unpickled = pickle.loads(pickle.dumps(creds))
848
849        # Attribute should be initialized by `__setstate__`
850        assert unpickled.quota_project_id is None
851
852    # pickles are not compatible across versions
853    @pytest.mark.skipif(
854        sys.version_info < (3, 5),
855        reason="pickle file can only be loaded with Python >= 3.5",
856    )
857    def test_unpickle_old_credentials_pickle(self):
858        # make sure a credentials file pickled with an older
859        # library version (google-auth==1.5.1) can be unpickled
860        with open(
861            os.path.join(DATA_DIR, "old_oauth_credentials_py3.pickle"), "rb"
862        ) as f:
863            credentials = pickle.load(f)
864            assert credentials.quota_project_id is None
865
866
867class TestUserAccessTokenCredentials(object):
868    def test_instance(self):
869        cred = credentials.UserAccessTokenCredentials()
870        assert cred._account is None
871
872        cred = cred.with_account("account")
873        assert cred._account == "account"
874
875    @mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True)
876    def test_refresh(self, get_auth_access_token):
877        get_auth_access_token.return_value = "access_token"
878        cred = credentials.UserAccessTokenCredentials()
879        cred.refresh(None)
880        assert cred.token == "access_token"
881
882    def test_with_quota_project(self):
883        cred = credentials.UserAccessTokenCredentials()
884        quota_project_cred = cred.with_quota_project("project-foo")
885
886        assert quota_project_cred._quota_project_id == "project-foo"
887        assert quota_project_cred._account == cred._account
888
889    @mock.patch(
890        "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True
891    )
892    @mock.patch(
893        "google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True
894    )
895    def test_before_request(self, refresh, apply):
896        cred = credentials.UserAccessTokenCredentials()
897        cred.before_request(mock.Mock(), "GET", "https://example.com", {})
898        refresh.assert_called()
899        apply.assert_called()
900