# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for JWT related methods in oauth2client."""

import os
import tempfile
import time

import mock
import unittest2

from oauth2client import _helpers
from oauth2client import client
from oauth2client import crypt
from oauth2client import file
from oauth2client import service_account
from .http_mock import HttpMockSequence


# NOTE(review): the '[email protected]' literals in this module look like
# e-mail addresses masked by an extraction tool. They are kept verbatim
# because the real values are not recoverable from this file -- confirm
# against the upstream source before relying on them.
__author__ = '[email protected] (Joe Gregorio)'


_FORMATS_TO_CONSTRUCTOR_ARGS = {
    'p12': 'private_key_pkcs12',
    'pem': 'private_key_pkcs8_pem',
}


def data_filename(filename):
    """Return the path of ``filename`` in the ``data`` dir beside this file.

    Args:
        filename: Base name of a file in the test data directory.

    Returns:
        The joined path ``<dir of this module>/data/<filename>``.
    """
    return os.path.join(os.path.dirname(__file__), 'data', filename)


def datafile(filename):
    """Return the contents of a test data file as bytes.

    Args:
        filename: Base name of a file in the test data directory.

    Returns:
        The bytes read from the file (it is opened in binary mode).
    """
    with open(data_filename(filename), 'rb') as file_obj:
        return file_obj.read()


class CryptTests(unittest2.TestCase):
    """Signing, verification and JWT checks with a PKCS#12 key and OpenSSL.

    Subclasses override ``setUp`` to select a different key format
    (``self.format_``) and signer / verifier implementation.
    """

    def setUp(self):
        self.format_ = 'p12'
        self.signer = crypt.OpenSSLSigner
        self.verifier = crypt.OpenSSLVerifier

    def test_sign_and_verify(self):
        self._check_sign_and_verify('privatekey.' + self.format_)

    def test_sign_and_verify_from_converted_pkcs12(self):
        # Tests that following instructions to convert from PKCS12 to
        # PEM works. Only meaningful for the PEM variants; for any other
        # format this test is intentionally a no-op.
        if self.format_ == 'pem':
            self._check_sign_and_verify('pem_from_pkcs12.pem')

    def _check_sign_and_verify(self, private_key_file):
        """Sign with ``private_key_file`` and verify with ``public_cert.pem``.

        Asserts that the signature of ``'foo'`` verifies against ``b'foo'``
        and that a different message or a bogus signature (bytes or text)
        does not verify.

        Args:
            private_key_file: Name of a private key file in the test data
                directory.
        """
        private_key = datafile(private_key_file)
        public_key = datafile('public_cert.pem')

        # We pass in a non-bytes password to make sure all branches
        # are traversed in tests.
        signer = self.signer.from_string(private_key,
                                         password=u'notasecret')
        signature = signer.sign('foo')

        verifier = self.verifier.from_string(public_key, True)
        self.assertTrue(verifier.verify(b'foo', signature))

        self.assertFalse(verifier.verify(b'bar', signature))
        self.assertFalse(verifier.verify(b'foo', b'bad signagure'))
        self.assertFalse(verifier.verify(b'foo', u'bad signagure'))

    def _check_jwt_failure(self, jwt, expected_error):
        """Assert that verifying ``jwt`` raises ``crypt.AppIdentityError``.

        Args:
            jwt: The token passed to ``crypt.verify_signed_jwt_with_certs``.
            expected_error: Substring that must appear in the string form
                of the raised exception.
        """
        public_key = datafile('public_cert.pem')
        certs = {'foo': public_key}
        audience = ('https://www.googleapis.com/auth/id?client_id='
                    '[email protected]')

        with self.assertRaises(crypt.AppIdentityError) as exc_manager:
            crypt.verify_signed_jwt_with_certs(jwt, certs, audience)

        # assertIn reports both operands on failure, unlike assertTrue.
        self.assertIn(expected_error, str(exc_manager.exception))

    def _create_signed_jwt(self):
        """Return a JWT signed with the private key for ``self.format_``.

        The payload carries ``aud``, ``iat`` (now), ``exp`` (now + 300),
        ``user`` and ``metadata`` fields.
        """
        private_key = datafile('privatekey.' + self.format_)
        signer = self.signer.from_string(private_key)
        audience = '[email protected]'
        now = int(time.time())

        return crypt.make_signed_jwt(signer, {
            'aud': audience,
            'iat': now,
            'exp': now + 300,
            'user': 'billy bob',
            'metadata': {'meta': 'data'},
        })

    def test_verify_id_token(self):
        jwt = self._create_signed_jwt()
        public_key = datafile('public_cert.pem')
        certs = {'foo': public_key}
        audience = '[email protected]'
        contents = crypt.verify_signed_jwt_with_certs(jwt, certs, audience)
        self.assertEqual('billy bob', contents['user'])
        self.assertEqual('data', contents['metadata']['meta'])

    def test_verify_id_token_with_certs_uri(self):
        jwt = self._create_signed_jwt()

        http = HttpMockSequence([
            ({'status': '200'}, datafile('certs.json')),
        ])

        contents = client.verify_id_token(
            jwt, '[email protected]',
            http=http)
        self.assertEqual('billy bob', contents['user'])
        self.assertEqual('data', contents['metadata']['meta'])

    def test_verify_id_token_with_certs_uri_default_http(self):
        jwt = self._create_signed_jwt()

        http = HttpMockSequence([
            ({'status': '200'}, datafile('certs.json')),
        ])

        # No ``http`` argument is passed below; the mock is installed as
        # ``oauth2client.transport._CACHED_HTTP`` for the duration of the
        # call instead.
        with mock.patch('oauth2client.transport._CACHED_HTTP', new=http):
            contents = client.verify_id_token(
                jwt, '[email protected]')

        self.assertEqual('billy bob', contents['user'])
        self.assertEqual('data', contents['metadata']['meta'])

    def test_verify_id_token_with_certs_uri_fails(self):
        jwt = self._create_signed_jwt()
        test_email = '[email protected]'

        http = HttpMockSequence([
            ({'status': '404'}, datafile('certs.json')),
        ])

        with self.assertRaises(client.VerifyJwtTokenError):
            client.verify_id_token(jwt, test_email, http=http)

    def test_verify_id_token_bad_tokens(self):
        private_key = datafile('privatekey.' + self.format_)

        # Wrong number of segments
        self._check_jwt_failure('foo', 'Wrong number of segments')

        # Not json
        self._check_jwt_failure('foo.bar.baz', 'Can\'t parse token')

        # Bad signature
        jwt = b'.'.join([b'foo',
                         _helpers._urlsafe_b64encode('{"a":"b"}'),
                         b'baz'])
        self._check_jwt_failure(jwt, 'Invalid token signature')

        # No expiration
        signer = self.signer.from_string(private_key)
        # Same audience URL that _check_jwt_failure verifies against
        # (previously misspelled as 'https:#www...').
        audience = ('https://www.googleapis.com/auth/id?client_id='
                    '[email protected]')
        jwt = crypt.make_signed_jwt(signer, {
            'aud': audience,
            'iat': time.time(),
        })
        self._check_jwt_failure(jwt, 'No exp field in token')

        # No issued at
        jwt = crypt.make_signed_jwt(signer, {
            'aud': 'audience',
            'exp': time.time() + 400,
        })
        self._check_jwt_failure(jwt, 'No iat field in token')

        # Too early
        jwt = crypt.make_signed_jwt(signer, {
            'aud': 'audience',
            'iat': time.time() + 301,
            'exp': time.time() + 400,
        })
        self._check_jwt_failure(jwt, 'Token used too early')

        # Too late
        jwt = crypt.make_signed_jwt(signer, {
            'aud': 'audience',
            'iat': time.time() - 500,
            'exp': time.time() - 301,
        })
        self._check_jwt_failure(jwt, 'Token used too late')

        # Wrong target
        jwt = crypt.make_signed_jwt(signer, {
            'aud': 'somebody else',
            'iat': time.time(),
            'exp': time.time() + 300,
        })
        self._check_jwt_failure(jwt, 'Wrong recipient')

    def test_from_string_non_509_cert(self):
        # Use a private key instead of a certificate to test the other branch
        # of from_string().
        public_key = datafile('privatekey.pem')
        verifier = self.verifier.from_string(public_key, is_x509_cert=False)
        self.assertIsInstance(verifier, self.verifier)


class PEMCryptTestsPyCrypto(CryptTests):
    """Runs the ``CryptTests`` suite with a PEM key and PyCrypto classes."""

    def setUp(self):
        self.format_ = 'pem'
        self.signer = crypt.PyCryptoSigner
        self.verifier = crypt.PyCryptoVerifier


class PEMCryptTestsOpenSSL(CryptTests):
    """Runs the ``CryptTests`` suite with a PEM key and OpenSSL classes."""

    def setUp(self):
        self.format_ = 'pem'
        self.signer = crypt.OpenSSLSigner
        self.verifier = crypt.OpenSSLVerifier


class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
    """Service account credential tests with a PKCS#12 key and OpenSSL.

    ``setUp`` replaces the module-level ``crypt.Signer``; the previous value
    is restored after each test so the override does not leak into other
    tests.
    """

    def setUp(self):
        self.format_ = 'p12'
        # Restore whatever crypt.Signer was before this test ran.
        self.addCleanup(setattr, crypt, 'Signer', crypt.Signer)
        crypt.Signer = crypt.OpenSSLSigner

    def _make_credentials(self):
        """Build ``ServiceAccountCredentials`` from the key for the format.

        The raw private key is also stored on the private attribute that
        matches ``self.format_`` (PEM, or PKCS#12 plus the default
        password).

        Returns:
            The constructed credentials object.

        Raises:
            ValueError: If ``self.format_`` is neither ``'pem'`` nor
                ``'p12'``.
        """
        private_key = datafile('privatekey.' + self.format_)
        signer = crypt.Signer.from_string(private_key)
        credentials = service_account.ServiceAccountCredentials(
            '[email protected]', signer,
            scopes='read+write',
            sub='[email protected]')
        if self.format_ == 'pem':
            credentials._private_key_pkcs8_pem = private_key
        elif self.format_ == 'p12':
            credentials._private_key_pkcs12 = private_key
            credentials._private_key_password = (
                service_account._PASSWORD_DEFAULT)
        else:  # pragma: NO COVER
            raise ValueError('Unexpected format.')
        return credentials

    def test_credentials_good(self):
        credentials = self._make_credentials()
        http = HttpMockSequence([
            ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
            ({'status': '200'}, 'echo_request_headers'),
        ])
        http = credentials.authorize(http)
        resp, content = http.request('http://example.org')
        self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])

    def test_credentials_to_from_json(self):
        credentials = self._make_credentials()
        # Named ``serialized`` rather than ``json`` to avoid shadowing the
        # standard library module name.
        serialized = credentials.to_json()
        restored = client.Credentials.new_from_json(serialized)
        self.assertEqual(credentials._private_key_pkcs12,
                         restored._private_key_pkcs12)
        self.assertEqual(credentials._private_key_password,
                         restored._private_key_password)
        self.assertEqual(credentials._kwargs, restored._kwargs)

    def _credentials_refresh(self, credentials):
        """Issue one authorized request through a mocked response sequence.

        The mock is loaded with, in order: a token response (``1/3w``), a
        401 response, a second token response (``3/3w``) and an
        ``'echo_request_headers'`` entry.

        Args:
            credentials: Credentials used to authorize the mock HTTP object.

        Returns:
            The content returned by the request to ``http://example.org``.
        """
        http = HttpMockSequence([
            ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
            ({'status': '401'}, b''),
            ({'status': '200'}, b'{"access_token":"3/3w","expires_in":3600}'),
            ({'status': '200'}, 'echo_request_headers'),
        ])
        http = credentials.authorize(http)
        _, content = http.request('http://example.org')
        return content

    def test_credentials_refresh_without_storage(self):
        credentials = self._make_credentials()
        content = self._credentials_refresh(credentials)
        self.assertEqual(b'Bearer 3/3w', content[b'Authorization'])

    def test_credentials_refresh_with_storage(self):
        credentials = self._make_credentials()

        filehandle, filename = tempfile.mkstemp()
        # Register removal immediately so the temp file is deleted even if
        # the refresh or the assertion below fails.
        self.addCleanup(os.unlink, filename)
        os.close(filehandle)
        store = file.Storage(filename)
        store.put(credentials)
        credentials.set_store(store)

        content = self._credentials_refresh(credentials)

        self.assertEqual(b'Bearer 3/3w', content[b'Authorization'])


class PEMSignedJwtAssertionCredentialsOpenSSLTests(
        SignedJwtAssertionCredentialsTests):
    """Runs the credential tests with a PEM key and the OpenSSL signer."""

    def setUp(self):
        self.format_ = 'pem'
        # Restore whatever crypt.Signer was before this test ran.
        self.addCleanup(setattr, crypt, 'Signer', crypt.Signer)
        crypt.Signer = crypt.OpenSSLSigner


class PEMSignedJwtAssertionCredentialsPyCryptoTests(
        SignedJwtAssertionCredentialsTests):
    """Runs the credential tests with a PEM key and the PyCrypto signer."""

    def setUp(self):
        self.format_ = 'pem'
        # Restore whatever crypt.Signer was before this test ran.
        self.addCleanup(setattr, crypt, 'Signer', crypt.Signer)
        crypt.Signer = crypt.PyCryptoSigner


class TestHasOpenSSLFlag(unittest2.TestCase):
    """Checks the crypto availability flags exposed by ``client``."""

    def test_true(self):
        self.assertEqual(True, client.HAS_OPENSSL)
        self.assertEqual(True, client.HAS_CRYPTO)