# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import datetime
import json

import mock
import pytest
from six.moves import http_client

from google.auth import _helpers
from google.auth import exceptions
from google.auth import iam
from google.auth import transport
import google.auth.credentials


def make_request(status, data=None):
    """Build a mocked request callable that returns a canned response.

    Args:
        status: Value stored on the mocked response's ``status`` attribute.
        data: Optional JSON-serializable payload. When it is not ``None`` it
            is dumped to JSON, encoded as UTF-8 and stored on the mocked
            response's ``data`` attribute.

    Returns:
        An autospec mock of ``transport.Request`` whose ``return_value`` is
        the mocked ``transport.Response`` instance.
    """
    fake_response = mock.create_autospec(transport.Response, instance=True)
    fake_response.status = status
    if data is not None:
        fake_response.data = json.dumps(data).encode("utf-8")

    fake_request = mock.create_autospec(transport.Request)
    fake_request.return_value = fake_response
    return fake_request


def make_credentials():
    """Return an instance of a minimal ``Credentials`` subclass for tests.

    The instance carries the token ``"token"`` and an expiry of
    ``datetime.datetime.min + _helpers.REFRESH_THRESHOLD``. Its ``refresh``
    does nothing and ``with_quota_project`` raises ``NotImplementedError``.
    """

    class CredentialsImpl(google.auth.credentials.Credentials):
        def __init__(self):
            super(CredentialsImpl, self).__init__()
            # Force refresh
            earliest_expiry = datetime.datetime.min + _helpers.REFRESH_THRESHOLD
            self.token = "token"
            self.expiry = earliest_expiry

        def refresh(self, request):
            # Intentionally a no-op: the token set in __init__ is left as is.
            pass

        def with_quota_project(self, quota_project_id):
            raise NotImplementedError()

    return CredentialsImpl()


class TestSigner(object):
    """Tests for ``iam.Signer``."""

    def test_constructor(self):
        """The constructor stores its three arguments on private attributes."""
        credentials = mock.create_autospec(
            google.auth.credentials.Credentials, instance=True
        )
        email = mock.sentinel.service_account_email

        signer = iam.Signer(mock.sentinel.request, credentials, email)

        assert signer._request == mock.sentinel.request
        assert signer._credentials == credentials
        assert signer._service_account_email == email

    def test_key_id(self):
        """``key_id`` is ``None`` for a freshly constructed signer."""
        signer = iam.Signer(
            mock.sentinel.request,
            mock.sentinel.credentials,
            mock.sentinel.service_account_email,
        )

        assert signer.key_id is None

    def test_sign_bytes(self):
        """``sign`` returns the decoded ``signedBlob`` and sends JSON headers."""
        expected_signature = b"DEADBEEF"
        payload = {
            "signedBlob": base64.b64encode(expected_signature).decode("utf-8")
        }
        request = make_request(http_client.OK, data=payload)
        credentials = make_credentials()
        signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)

        assert signer.sign("123") == expected_signature

        # call_args[1] holds the keyword arguments of the last request call.
        sent_headers = request.call_args[1]["headers"]
        assert sent_headers["Content-Type"] == "application/json"

    def test_sign_bytes_failure(self):
        """``sign`` raises ``TransportError`` on an UNAUTHORIZED response."""
        request = make_request(http_client.UNAUTHORIZED)
        credentials = make_credentials()
        signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)

        with pytest.raises(exceptions.TransportError):
            signer.sign("123")