1# Copyright 2016 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import base64
15import datetime
16
17import mock
18import pytest
19import responses
20
21from google.auth import _helpers
22from google.auth import exceptions
23from google.auth import jwt
24from google.auth import transport
25from google.auth.compute_engine import credentials
26from google.auth.transport import requests
27
28SAMPLE_ID_TOKEN_EXP = 1584393400
29
30# header: {"alg": "RS256", "typ": "JWT", "kid": "1"}
31# payload: {"iss": "issuer", "iat": 1584393348, "sub": "subject",
32#   "exp": 1584393400,"aud": "audience"}
33SAMPLE_ID_TOKEN = (
34    b"eyJhbGciOiAiUlMyNTYiLCAidHlwIjogIkpXVCIsICJraWQiOiAiMSJ9."
35    b"eyJpc3MiOiAiaXNzdWVyIiwgImlhdCI6IDE1ODQzOTMzNDgsICJzdWIiO"
36    b"iAic3ViamVjdCIsICJleHAiOiAxNTg0MzkzNDAwLCAiYXVkIjogImF1ZG"
37    b"llbmNlIn0."
38    b"OquNjHKhTmlgCk361omRo18F_uY-7y0f_AmLbzW062Q1Zr61HAwHYP5FM"
39    b"316CK4_0cH8MUNGASsvZc3VqXAqub6PUTfhemH8pFEwBdAdG0LhrNkU0H"
40    b"WN1YpT55IiQ31esLdL5q-qDsOPpNZJUti1y1lAreM5nIn2srdWzGXGs4i"
41    b"TRQsn0XkNUCL4RErpciXmjfhMrPkcAjKA-mXQm2fa4jmTlEZFqFmUlym1"
42    b"ozJ0yf5grjN6AslN4OGvAv1pS-_Ko_pGBS6IQtSBC6vVKCUuBfaqNjykg"
43    b"bsxbLa6Fp0SYeYwO8ifEnkRvasVpc1WTQqfRB2JCj5pTBDzJpIpFCMmnQ"
44)
45
46
47class TestCredentials(object):
48    credentials = None
49
50    @pytest.fixture(autouse=True)
51    def credentials_fixture(self):
52        self.credentials = credentials.Credentials()
53
54    def test_default_state(self):
55        assert not self.credentials.valid
56        # Expiration hasn't been set yet
57        assert not self.credentials.expired
58        # Scopes are needed
59        assert self.credentials.requires_scopes
60        # Service account email hasn't been populated
61        assert self.credentials.service_account_email == "default"
62        # No quota project
63        assert not self.credentials._quota_project_id
64
65    @mock.patch(
66        "google.auth._helpers.utcnow",
67        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
68    )
69    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
70    def test_refresh_success(self, get, utcnow):
71        get.side_effect = [
72            {
73                # First request is for sevice account info.
74                "email": "[email protected]",
75                "scopes": ["one", "two"],
76            },
77            {
78                # Second request is for the token.
79                "access_token": "token",
80                "expires_in": 500,
81            },
82        ]
83
84        # Refresh credentials
85        self.credentials.refresh(None)
86
87        # Check that the credentials have the token and proper expiration
88        assert self.credentials.token == "token"
89        assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500))
90
91        # Check the credential info
92        assert self.credentials.service_account_email == "[email protected]"
93        assert self.credentials._scopes == ["one", "two"]
94
95        # Check that the credentials are valid (have a token and are not
96        # expired)
97        assert self.credentials.valid
98
99    @mock.patch(
100        "google.auth._helpers.utcnow",
101        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
102    )
103    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
104    def test_refresh_success_with_scopes(self, get, utcnow):
105        get.side_effect = [
106            {
107                # First request is for sevice account info.
108                "email": "[email protected]",
109                "scopes": ["one", "two"],
110            },
111            {
112                # Second request is for the token.
113                "access_token": "token",
114                "expires_in": 500,
115            },
116        ]
117
118        # Refresh credentials
119        scopes = ["three", "four"]
120        self.credentials = self.credentials.with_scopes(scopes)
121        self.credentials.refresh(None)
122
123        # Check that the credentials have the token and proper expiration
124        assert self.credentials.token == "token"
125        assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500))
126
127        # Check the credential info
128        assert self.credentials.service_account_email == "[email protected]"
129        assert self.credentials._scopes == scopes
130
131        # Check that the credentials are valid (have a token and are not
132        # expired)
133        assert self.credentials.valid
134
135        kwargs = get.call_args[1]
136        assert kwargs == {"params": {"scopes": "three,four"}}
137
138    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
139    def test_refresh_error(self, get):
140        get.side_effect = exceptions.TransportError("http error")
141
142        with pytest.raises(exceptions.RefreshError) as excinfo:
143            self.credentials.refresh(None)
144
145        assert excinfo.match(r"http error")
146
147    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
148    def test_before_request_refreshes(self, get):
149        get.side_effect = [
150            {
151                # First request is for sevice account info.
152                "email": "[email protected]",
153                "scopes": "one two",
154            },
155            {
156                # Second request is for the token.
157                "access_token": "token",
158                "expires_in": 500,
159            },
160        ]
161
162        # Credentials should start as invalid
163        assert not self.credentials.valid
164
165        # before_request should cause a refresh
166        request = mock.create_autospec(transport.Request, instance=True)
167        self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
168
169        # The refresh endpoint should've been called.
170        assert get.called
171
172        # Credentials should now be valid.
173        assert self.credentials.valid
174
175    def test_with_quota_project(self):
176        quota_project_creds = self.credentials.with_quota_project("project-foo")
177
178        assert quota_project_creds._quota_project_id == "project-foo"
179
180    def test_with_scopes(self):
181        assert self.credentials._scopes is None
182
183        scopes = ["one", "two"]
184        self.credentials = self.credentials.with_scopes(scopes)
185
186        assert self.credentials._scopes == scopes
187
188
189class TestIDTokenCredentials(object):
190    credentials = None
191
192    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
193    def test_default_state(self, get):
194        get.side_effect = [
195            {"email": "[email protected]", "scope": ["one", "two"]}
196        ]
197
198        request = mock.create_autospec(transport.Request, instance=True)
199        self.credentials = credentials.IDTokenCredentials(
200            request=request, target_audience="https://example.com"
201        )
202
203        assert not self.credentials.valid
204        # Expiration hasn't been set yet
205        assert not self.credentials.expired
206        # Service account email hasn't been populated
207        assert self.credentials.service_account_email == "[email protected]"
208        # Signer is initialized
209        assert self.credentials.signer
210        assert self.credentials.signer_email == "[email protected]"
211        # No quota project
212        assert not self.credentials._quota_project_id
213
214    @mock.patch(
215        "google.auth._helpers.utcnow",
216        return_value=datetime.datetime.utcfromtimestamp(0),
217    )
218    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
219    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
220    def test_make_authorization_grant_assertion(self, sign, get, utcnow):
221        get.side_effect = [
222            {"email": "[email protected]", "scopes": ["one", "two"]}
223        ]
224        sign.side_effect = [b"signature"]
225
226        request = mock.create_autospec(transport.Request, instance=True)
227        self.credentials = credentials.IDTokenCredentials(
228            request=request, target_audience="https://audience.com"
229        )
230
231        # Generate authorization grant:
232        token = self.credentials._make_authorization_grant_assertion()
233        payload = jwt.decode(token, verify=False)
234
235        # The JWT token signature is 'signature' encoded in base 64:
236        assert token.endswith(b".c2lnbmF0dXJl")
237
238        # Check that the credentials have the token and proper expiration
239        assert payload == {
240            "aud": "https://www.googleapis.com/oauth2/v4/token",
241            "exp": 3600,
242            "iat": 0,
243            "iss": "[email protected]",
244            "target_audience": "https://audience.com",
245        }
246
247    @mock.patch(
248        "google.auth._helpers.utcnow",
249        return_value=datetime.datetime.utcfromtimestamp(0),
250    )
251    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
252    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
253    def test_with_service_account(self, sign, get, utcnow):
254        sign.side_effect = [b"signature"]
255
256        request = mock.create_autospec(transport.Request, instance=True)
257        self.credentials = credentials.IDTokenCredentials(
258            request=request,
259            target_audience="https://audience.com",
260            service_account_email="[email protected]",
261        )
262
263        # Generate authorization grant:
264        token = self.credentials._make_authorization_grant_assertion()
265        payload = jwt.decode(token, verify=False)
266
267        # The JWT token signature is 'signature' encoded in base 64:
268        assert token.endswith(b".c2lnbmF0dXJl")
269
270        # Check that the credentials have the token and proper expiration
271        assert payload == {
272            "aud": "https://www.googleapis.com/oauth2/v4/token",
273            "exp": 3600,
274            "iat": 0,
275            "iss": "[email protected]",
276            "target_audience": "https://audience.com",
277        }
278
279    @mock.patch(
280        "google.auth._helpers.utcnow",
281        return_value=datetime.datetime.utcfromtimestamp(0),
282    )
283    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
284    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
285    def test_additional_claims(self, sign, get, utcnow):
286        get.side_effect = [
287            {"email": "[email protected]", "scopes": ["one", "two"]}
288        ]
289        sign.side_effect = [b"signature"]
290
291        request = mock.create_autospec(transport.Request, instance=True)
292        self.credentials = credentials.IDTokenCredentials(
293            request=request,
294            target_audience="https://audience.com",
295            additional_claims={"foo": "bar"},
296        )
297
298        # Generate authorization grant:
299        token = self.credentials._make_authorization_grant_assertion()
300        payload = jwt.decode(token, verify=False)
301
302        # The JWT token signature is 'signature' encoded in base 64:
303        assert token.endswith(b".c2lnbmF0dXJl")
304
305        # Check that the credentials have the token and proper expiration
306        assert payload == {
307            "aud": "https://www.googleapis.com/oauth2/v4/token",
308            "exp": 3600,
309            "iat": 0,
310            "iss": "[email protected]",
311            "target_audience": "https://audience.com",
312            "foo": "bar",
313        }
314
315    def test_token_uri(self):
316        request = mock.create_autospec(transport.Request, instance=True)
317
318        self.credentials = credentials.IDTokenCredentials(
319            request=request,
320            signer=mock.Mock(),
321            service_account_email="[email protected]",
322            target_audience="https://audience.com",
323        )
324        assert self.credentials._token_uri == credentials._DEFAULT_TOKEN_URI
325
326        self.credentials = credentials.IDTokenCredentials(
327            request=request,
328            signer=mock.Mock(),
329            service_account_email="[email protected]",
330            target_audience="https://audience.com",
331            token_uri="https://example.com/token",
332        )
333        assert self.credentials._token_uri == "https://example.com/token"
334
335    @mock.patch(
336        "google.auth._helpers.utcnow",
337        return_value=datetime.datetime.utcfromtimestamp(0),
338    )
339    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
340    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
341    def test_with_target_audience(self, sign, get, utcnow):
342        get.side_effect = [
343            {"email": "[email protected]", "scopes": ["one", "two"]}
344        ]
345        sign.side_effect = [b"signature"]
346
347        request = mock.create_autospec(transport.Request, instance=True)
348        self.credentials = credentials.IDTokenCredentials(
349            request=request, target_audience="https://audience.com"
350        )
351        self.credentials = self.credentials.with_target_audience("https://actually.not")
352
353        # Generate authorization grant:
354        token = self.credentials._make_authorization_grant_assertion()
355        payload = jwt.decode(token, verify=False)
356
357        # The JWT token signature is 'signature' encoded in base 64:
358        assert token.endswith(b".c2lnbmF0dXJl")
359
360        # Check that the credentials have the token and proper expiration
361        assert payload == {
362            "aud": "https://www.googleapis.com/oauth2/v4/token",
363            "exp": 3600,
364            "iat": 0,
365            "iss": "[email protected]",
366            "target_audience": "https://actually.not",
367        }
368
369        # Check that the signer have been initialized with a Request object
370        assert isinstance(self.credentials._signer._request, transport.Request)
371
372    @responses.activate
373    def test_with_target_audience_integration(self):
374        """ Test that it is possible to refresh credentials
375        generated from `with_target_audience`.
376
377        Instead of mocking the methods, the HTTP responses
378        have been mocked.
379        """
380
381        # mock information about credentials
382        responses.add(
383            responses.GET,
384            "http://metadata.google.internal/computeMetadata/v1/instance/"
385            "service-accounts/default/?recursive=true",
386            status=200,
387            content_type="application/json",
388            json={
389                "scopes": "email",
390                "email": "[email protected]",
391                "aliases": ["default"],
392            },
393        )
394
395        # mock token for credentials
396        responses.add(
397            responses.GET,
398            "http://metadata.google.internal/computeMetadata/v1/instance/"
399            "service-accounts/service-account@example.com/token",
400            status=200,
401            content_type="application/json",
402            json={
403                "access_token": "some-token",
404                "expires_in": 3210,
405                "token_type": "Bearer",
406            },
407        )
408
409        # mock sign blob endpoint
410        signature = base64.b64encode(b"some-signature").decode("utf-8")
411        responses.add(
412            responses.POST,
413            "https://iamcredentials.googleapis.com/v1/projects/-/"
414            "serviceAccounts/service-account@example.com:signBlob?alt=json",
415            status=200,
416            content_type="application/json",
417            json={"keyId": "some-key-id", "signedBlob": signature},
418        )
419
420        id_token = "{}.{}.{}".format(
421            base64.b64encode(b'{"some":"some"}').decode("utf-8"),
422            base64.b64encode(b'{"exp": 3210}').decode("utf-8"),
423            base64.b64encode(b"token").decode("utf-8"),
424        )
425
426        # mock id token endpoint
427        responses.add(
428            responses.POST,
429            "https://www.googleapis.com/oauth2/v4/token",
430            status=200,
431            content_type="application/json",
432            json={"id_token": id_token, "expiry": 3210},
433        )
434
435        self.credentials = credentials.IDTokenCredentials(
436            request=requests.Request(),
437            service_account_email="[email protected]",
438            target_audience="https://audience.com",
439        )
440
441        self.credentials = self.credentials.with_target_audience("https://actually.not")
442
443        self.credentials.refresh(requests.Request())
444
445        assert self.credentials.token is not None
446
447    @mock.patch(
448        "google.auth._helpers.utcnow",
449        return_value=datetime.datetime.utcfromtimestamp(0),
450    )
451    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
452    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
453    def test_with_quota_project(self, sign, get, utcnow):
454        get.side_effect = [
455            {"email": "[email protected]", "scopes": ["one", "two"]}
456        ]
457        sign.side_effect = [b"signature"]
458
459        request = mock.create_autospec(transport.Request, instance=True)
460        self.credentials = credentials.IDTokenCredentials(
461            request=request, target_audience="https://audience.com"
462        )
463        self.credentials = self.credentials.with_quota_project("project-foo")
464
465        assert self.credentials._quota_project_id == "project-foo"
466
467        # Generate authorization grant:
468        token = self.credentials._make_authorization_grant_assertion()
469        payload = jwt.decode(token, verify=False)
470
471        # The JWT token signature is 'signature' encoded in base 64:
472        assert token.endswith(b".c2lnbmF0dXJl")
473
474        # Check that the credentials have the token and proper expiration
475        assert payload == {
476            "aud": "https://www.googleapis.com/oauth2/v4/token",
477            "exp": 3600,
478            "iat": 0,
479            "iss": "[email protected]",
480            "target_audience": "https://audience.com",
481        }
482
483        # Check that the signer have been initialized with a Request object
484        assert isinstance(self.credentials._signer._request, transport.Request)
485
486    @responses.activate
487    def test_with_quota_project_integration(self):
488        """ Test that it is possible to refresh credentials
489        generated from `with_quota_project`.
490
491        Instead of mocking the methods, the HTTP responses
492        have been mocked.
493        """
494
495        # mock information about credentials
496        responses.add(
497            responses.GET,
498            "http://metadata.google.internal/computeMetadata/v1/instance/"
499            "service-accounts/default/?recursive=true",
500            status=200,
501            content_type="application/json",
502            json={
503                "scopes": "email",
504                "email": "[email protected]",
505                "aliases": ["default"],
506            },
507        )
508
509        # mock token for credentials
510        responses.add(
511            responses.GET,
512            "http://metadata.google.internal/computeMetadata/v1/instance/"
513            "service-accounts/service-account@example.com/token",
514            status=200,
515            content_type="application/json",
516            json={
517                "access_token": "some-token",
518                "expires_in": 3210,
519                "token_type": "Bearer",
520            },
521        )
522
523        # mock sign blob endpoint
524        signature = base64.b64encode(b"some-signature").decode("utf-8")
525        responses.add(
526            responses.POST,
527            "https://iamcredentials.googleapis.com/v1/projects/-/"
528            "serviceAccounts/service-account@example.com:signBlob?alt=json",
529            status=200,
530            content_type="application/json",
531            json={"keyId": "some-key-id", "signedBlob": signature},
532        )
533
534        id_token = "{}.{}.{}".format(
535            base64.b64encode(b'{"some":"some"}').decode("utf-8"),
536            base64.b64encode(b'{"exp": 3210}').decode("utf-8"),
537            base64.b64encode(b"token").decode("utf-8"),
538        )
539
540        # mock id token endpoint
541        responses.add(
542            responses.POST,
543            "https://www.googleapis.com/oauth2/v4/token",
544            status=200,
545            content_type="application/json",
546            json={"id_token": id_token, "expiry": 3210},
547        )
548
549        self.credentials = credentials.IDTokenCredentials(
550            request=requests.Request(),
551            service_account_email="[email protected]",
552            target_audience="https://audience.com",
553        )
554
555        self.credentials = self.credentials.with_quota_project("project-foo")
556
557        self.credentials.refresh(requests.Request())
558
559        assert self.credentials.token is not None
560        assert self.credentials._quota_project_id == "project-foo"
561
562    @mock.patch(
563        "google.auth._helpers.utcnow",
564        return_value=datetime.datetime.utcfromtimestamp(0),
565    )
566    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
567    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
568    @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
569    def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow):
570        get.side_effect = [
571            {"email": "[email protected]", "scopes": ["one", "two"]}
572        ]
573        sign.side_effect = [b"signature"]
574        id_token_jwt_grant.side_effect = [
575            ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
576        ]
577
578        request = mock.create_autospec(transport.Request, instance=True)
579        self.credentials = credentials.IDTokenCredentials(
580            request=request, target_audience="https://audience.com"
581        )
582
583        # Refresh credentials
584        self.credentials.refresh(None)
585
586        # Check that the credentials have the token and proper expiration
587        assert self.credentials.token == "idtoken"
588        assert self.credentials.expiry == (datetime.datetime.utcfromtimestamp(3600))
589
590        # Check the credential info
591        assert self.credentials.service_account_email == "[email protected]"
592
593        # Check that the credentials are valid (have a token and are not
594        # expired)
595        assert self.credentials.valid
596
597    @mock.patch(
598        "google.auth._helpers.utcnow",
599        return_value=datetime.datetime.utcfromtimestamp(0),
600    )
601    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
602    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
603    def test_refresh_error(self, sign, get, utcnow):
604        get.side_effect = [
605            {"email": "[email protected]", "scopes": ["one", "two"]}
606        ]
607        sign.side_effect = [b"signature"]
608
609        request = mock.create_autospec(transport.Request, instance=True)
610        response = mock.Mock()
611        response.data = b'{"error": "http error"}'
612        response.status = 500
613        request.side_effect = [response]
614
615        self.credentials = credentials.IDTokenCredentials(
616            request=request, target_audience="https://audience.com"
617        )
618
619        with pytest.raises(exceptions.RefreshError) as excinfo:
620            self.credentials.refresh(request)
621
622        assert excinfo.match(r"http error")
623
624    @mock.patch(
625        "google.auth._helpers.utcnow",
626        return_value=datetime.datetime.utcfromtimestamp(0),
627    )
628    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
629    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
630    @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
631    def test_before_request_refreshes(self, id_token_jwt_grant, sign, get, utcnow):
632        get.side_effect = [
633            {"email": "[email protected]", "scopes": "one two"}
634        ]
635        sign.side_effect = [b"signature"]
636        id_token_jwt_grant.side_effect = [
637            ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
638        ]
639
640        request = mock.create_autospec(transport.Request, instance=True)
641        self.credentials = credentials.IDTokenCredentials(
642            request=request, target_audience="https://audience.com"
643        )
644
645        # Credentials should start as invalid
646        assert not self.credentials.valid
647
648        # before_request should cause a refresh
649        request = mock.create_autospec(transport.Request, instance=True)
650        self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
651
652        # The refresh endpoint should've been called.
653        assert get.called
654
655        # Credentials should now be valid.
656        assert self.credentials.valid
657
658    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
659    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
660    def test_sign_bytes(self, sign, get):
661        get.side_effect = [
662            {"email": "[email protected]", "scopes": ["one", "two"]}
663        ]
664        sign.side_effect = [b"signature"]
665
666        request = mock.create_autospec(transport.Request, instance=True)
667        response = mock.Mock()
668        response.data = b'{"signature": "c2lnbmF0dXJl"}'
669        response.status = 200
670        request.side_effect = [response]
671
672        self.credentials = credentials.IDTokenCredentials(
673            request=request, target_audience="https://audience.com"
674        )
675
676        # Generate authorization grant:
677        signature = self.credentials.sign_bytes(b"some bytes")
678
679        # The JWT token signature is 'signature' encoded in base 64:
680        assert signature == b"signature"
681
682    @mock.patch(
683        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
684    )
685    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
686    def test_get_id_token_from_metadata(self, get, get_service_account_info):
687        get.return_value = SAMPLE_ID_TOKEN
688        get_service_account_info.return_value = {"email": "[email protected]"}
689
690        cred = credentials.IDTokenCredentials(
691            mock.Mock(), "audience", use_metadata_identity_endpoint=True
692        )
693        cred.refresh(request=mock.Mock())
694
695        assert cred.token == SAMPLE_ID_TOKEN
696        assert cred.expiry == datetime.datetime.fromtimestamp(SAMPLE_ID_TOKEN_EXP)
697        assert cred._use_metadata_identity_endpoint
698        assert cred._signer is None
699        assert cred._token_uri is None
700        assert cred._service_account_email == "[email protected]"
701        assert cred._target_audience == "audience"
702        with pytest.raises(ValueError):
703            cred.sign_bytes(b"bytes")
704
705    @mock.patch(
706        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
707    )
708    def test_with_target_audience_for_metadata(self, get_service_account_info):
709        get_service_account_info.return_value = {"email": "[email protected]"}
710
711        cred = credentials.IDTokenCredentials(
712            mock.Mock(), "audience", use_metadata_identity_endpoint=True
713        )
714        cred = cred.with_target_audience("new_audience")
715
716        assert cred._target_audience == "new_audience"
717        assert cred._use_metadata_identity_endpoint
718        assert cred._signer is None
719        assert cred._token_uri is None
720        assert cred._service_account_email == "[email protected]"
721
722    @mock.patch(
723        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
724    )
725    def test_id_token_with_quota_project(self, get_service_account_info):
726        get_service_account_info.return_value = {"email": "[email protected]"}
727
728        cred = credentials.IDTokenCredentials(
729            mock.Mock(), "audience", use_metadata_identity_endpoint=True
730        )
731        cred = cred.with_quota_project("project-foo")
732
733        assert cred._quota_project_id == "project-foo"
734        assert cred._use_metadata_identity_endpoint
735        assert cred._signer is None
736        assert cred._token_uri is None
737        assert cred._service_account_email == "[email protected]"
738
739    @mock.patch(
740        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
741    )
742    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
743    def test_invalid_id_token_from_metadata(self, get, get_service_account_info):
744        get.return_value = "invalid_id_token"
745        get_service_account_info.return_value = {"email": "[email protected]"}
746
747        cred = credentials.IDTokenCredentials(
748            mock.Mock(), "audience", use_metadata_identity_endpoint=True
749        )
750
751        with pytest.raises(ValueError):
752            cred.refresh(request=mock.Mock())
753
754    @mock.patch(
755        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
756    )
757    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
758    def test_transport_error_from_metadata(self, get, get_service_account_info):
759        get.side_effect = exceptions.TransportError("transport error")
760        get_service_account_info.return_value = {"email": "[email protected]"}
761
762        cred = credentials.IDTokenCredentials(
763            mock.Mock(), "audience", use_metadata_identity_endpoint=True
764        )
765
766        with pytest.raises(exceptions.RefreshError) as excinfo:
767            cred.refresh(request=mock.Mock())
768        assert excinfo.match(r"transport error")
769
770    def test_get_id_token_from_metadata_constructor(self):
771        with pytest.raises(ValueError):
772            credentials.IDTokenCredentials(
773                mock.Mock(),
774                "audience",
775                use_metadata_identity_endpoint=True,
776                token_uri="token_uri",
777            )
778        with pytest.raises(ValueError):
779            credentials.IDTokenCredentials(
780                mock.Mock(),
781                "audience",
782                use_metadata_identity_endpoint=True,
783                signer=mock.Mock(),
784            )
785        with pytest.raises(ValueError):
786            credentials.IDTokenCredentials(
787                mock.Mock(),
788                "audience",
789                use_metadata_identity_endpoint=True,
790                additional_claims={"key", "value"},
791            )
792        with pytest.raises(ValueError):
793            credentials.IDTokenCredentials(
794                mock.Mock(),
795                "audience",
796                use_metadata_identity_endpoint=True,
797                service_account_email="[email protected]",
798            )
799