1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import json
16
17import mock
18import pytest
19from six.moves import http_client
20from six.moves import urllib
21
22from google.auth import exceptions
23from google.auth import transport
24from google.oauth2 import sts
25from google.oauth2 import utils
26
27CLIENT_ID = "username"
28CLIENT_SECRET = "password"
29# Base64 encoding of "username:password"
30BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
31
32
33class TestStsClient(object):
34    GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
35    RESOURCE = "https://api.example.com/"
36    AUDIENCE = "urn:example:cooperation-context"
37    SCOPES = ["scope1", "scope2"]
38    REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
39    SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE"
40    SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
41    ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE"
42    ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
43    TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2"
44    ADDON_HEADERS = {"x-client-version": "0.1.2"}
45    ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}}
46    SUCCESS_RESPONSE = {
47        "access_token": "ACCESS_TOKEN",
48        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
49        "token_type": "Bearer",
50        "expires_in": 3600,
51        "scope": "scope1 scope2",
52    }
53    ERROR_RESPONSE = {
54        "error": "invalid_request",
55        "error_description": "Invalid subject token",
56        "error_uri": "https://tools.ietf.org/html/rfc6749",
57    }
58    CLIENT_AUTH_BASIC = utils.ClientAuthentication(
59        utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
60    )
61    CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
62        utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
63    )
64
65    @classmethod
66    def make_client(cls, client_auth=None):
67        return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth)
68
69    @classmethod
70    def make_mock_request(cls, data, status=http_client.OK):
71        response = mock.create_autospec(transport.Response, instance=True)
72        response.status = status
73        response.data = json.dumps(data).encode("utf-8")
74
75        request = mock.create_autospec(transport.Request)
76        request.return_value = response
77
78        return request
79
80    @classmethod
81    def assert_request_kwargs(cls, request_kwargs, headers, request_data):
82        """Asserts the request was called with the expected parameters.
83        """
84        assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT
85        assert request_kwargs["method"] == "POST"
86        assert request_kwargs["headers"] == headers
87        assert request_kwargs["body"] is not None
88        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
89        for (k, v) in body_tuples:
90            assert v.decode("utf-8") == request_data[k.decode("utf-8")]
91        assert len(body_tuples) == len(request_data.keys())
92
93    def test_exchange_token_full_success_without_auth(self):
94        """Test token exchange success without client authentication using full
95        parameters.
96        """
97        client = self.make_client()
98        headers = self.ADDON_HEADERS.copy()
99        headers["Content-Type"] = "application/x-www-form-urlencoded"
100        request_data = {
101            "grant_type": self.GRANT_TYPE,
102            "resource": self.RESOURCE,
103            "audience": self.AUDIENCE,
104            "scope": " ".join(self.SCOPES),
105            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
106            "subject_token": self.SUBJECT_TOKEN,
107            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
108            "actor_token": self.ACTOR_TOKEN,
109            "actor_token_type": self.ACTOR_TOKEN_TYPE,
110            "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
111        }
112        request = self.make_mock_request(
113            status=http_client.OK, data=self.SUCCESS_RESPONSE
114        )
115
116        response = client.exchange_token(
117            request,
118            self.GRANT_TYPE,
119            self.SUBJECT_TOKEN,
120            self.SUBJECT_TOKEN_TYPE,
121            self.RESOURCE,
122            self.AUDIENCE,
123            self.SCOPES,
124            self.REQUESTED_TOKEN_TYPE,
125            self.ACTOR_TOKEN,
126            self.ACTOR_TOKEN_TYPE,
127            self.ADDON_OPTIONS,
128            self.ADDON_HEADERS,
129        )
130
131        self.assert_request_kwargs(request.call_args[1], headers, request_data)
132        assert response == self.SUCCESS_RESPONSE
133
134    def test_exchange_token_partial_success_without_auth(self):
135        """Test token exchange success without client authentication using
136        partial (required only) parameters.
137        """
138        client = self.make_client()
139        headers = {"Content-Type": "application/x-www-form-urlencoded"}
140        request_data = {
141            "grant_type": self.GRANT_TYPE,
142            "audience": self.AUDIENCE,
143            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
144            "subject_token": self.SUBJECT_TOKEN,
145            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
146        }
147        request = self.make_mock_request(
148            status=http_client.OK, data=self.SUCCESS_RESPONSE
149        )
150
151        response = client.exchange_token(
152            request,
153            grant_type=self.GRANT_TYPE,
154            subject_token=self.SUBJECT_TOKEN,
155            subject_token_type=self.SUBJECT_TOKEN_TYPE,
156            audience=self.AUDIENCE,
157            requested_token_type=self.REQUESTED_TOKEN_TYPE,
158        )
159
160        self.assert_request_kwargs(request.call_args[1], headers, request_data)
161        assert response == self.SUCCESS_RESPONSE
162
163    def test_exchange_token_non200_without_auth(self):
164        """Test token exchange without client auth responding with non-200 status.
165        """
166        client = self.make_client()
167        request = self.make_mock_request(
168            status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
169        )
170
171        with pytest.raises(exceptions.OAuthError) as excinfo:
172            client.exchange_token(
173                request,
174                self.GRANT_TYPE,
175                self.SUBJECT_TOKEN,
176                self.SUBJECT_TOKEN_TYPE,
177                self.RESOURCE,
178                self.AUDIENCE,
179                self.SCOPES,
180                self.REQUESTED_TOKEN_TYPE,
181                self.ACTOR_TOKEN,
182                self.ACTOR_TOKEN_TYPE,
183                self.ADDON_OPTIONS,
184                self.ADDON_HEADERS,
185            )
186
187        assert excinfo.match(
188            r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
189        )
190
191    def test_exchange_token_full_success_with_basic_auth(self):
192        """Test token exchange success with basic client authentication using full
193        parameters.
194        """
195        client = self.make_client(self.CLIENT_AUTH_BASIC)
196        headers = self.ADDON_HEADERS.copy()
197        headers["Content-Type"] = "application/x-www-form-urlencoded"
198        headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING)
199        request_data = {
200            "grant_type": self.GRANT_TYPE,
201            "resource": self.RESOURCE,
202            "audience": self.AUDIENCE,
203            "scope": " ".join(self.SCOPES),
204            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
205            "subject_token": self.SUBJECT_TOKEN,
206            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
207            "actor_token": self.ACTOR_TOKEN,
208            "actor_token_type": self.ACTOR_TOKEN_TYPE,
209            "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
210        }
211        request = self.make_mock_request(
212            status=http_client.OK, data=self.SUCCESS_RESPONSE
213        )
214
215        response = client.exchange_token(
216            request,
217            self.GRANT_TYPE,
218            self.SUBJECT_TOKEN,
219            self.SUBJECT_TOKEN_TYPE,
220            self.RESOURCE,
221            self.AUDIENCE,
222            self.SCOPES,
223            self.REQUESTED_TOKEN_TYPE,
224            self.ACTOR_TOKEN,
225            self.ACTOR_TOKEN_TYPE,
226            self.ADDON_OPTIONS,
227            self.ADDON_HEADERS,
228        )
229
230        self.assert_request_kwargs(request.call_args[1], headers, request_data)
231        assert response == self.SUCCESS_RESPONSE
232
233    def test_exchange_token_partial_success_with_basic_auth(self):
234        """Test token exchange success with basic client authentication using
235        partial (required only) parameters.
236        """
237        client = self.make_client(self.CLIENT_AUTH_BASIC)
238        headers = {
239            "Content-Type": "application/x-www-form-urlencoded",
240            "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
241        }
242        request_data = {
243            "grant_type": self.GRANT_TYPE,
244            "audience": self.AUDIENCE,
245            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
246            "subject_token": self.SUBJECT_TOKEN,
247            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
248        }
249        request = self.make_mock_request(
250            status=http_client.OK, data=self.SUCCESS_RESPONSE
251        )
252
253        response = client.exchange_token(
254            request,
255            grant_type=self.GRANT_TYPE,
256            subject_token=self.SUBJECT_TOKEN,
257            subject_token_type=self.SUBJECT_TOKEN_TYPE,
258            audience=self.AUDIENCE,
259            requested_token_type=self.REQUESTED_TOKEN_TYPE,
260        )
261
262        self.assert_request_kwargs(request.call_args[1], headers, request_data)
263        assert response == self.SUCCESS_RESPONSE
264
265    def test_exchange_token_non200_with_basic_auth(self):
266        """Test token exchange with basic client auth responding with non-200
267        status.
268        """
269        client = self.make_client(self.CLIENT_AUTH_BASIC)
270        request = self.make_mock_request(
271            status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
272        )
273
274        with pytest.raises(exceptions.OAuthError) as excinfo:
275            client.exchange_token(
276                request,
277                self.GRANT_TYPE,
278                self.SUBJECT_TOKEN,
279                self.SUBJECT_TOKEN_TYPE,
280                self.RESOURCE,
281                self.AUDIENCE,
282                self.SCOPES,
283                self.REQUESTED_TOKEN_TYPE,
284                self.ACTOR_TOKEN,
285                self.ACTOR_TOKEN_TYPE,
286                self.ADDON_OPTIONS,
287                self.ADDON_HEADERS,
288            )
289
290        assert excinfo.match(
291            r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
292        )
293
294    def test_exchange_token_full_success_with_reqbody_auth(self):
295        """Test token exchange success with request body client authenticaiton
296        using full parameters.
297        """
298        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
299        headers = self.ADDON_HEADERS.copy()
300        headers["Content-Type"] = "application/x-www-form-urlencoded"
301        request_data = {
302            "grant_type": self.GRANT_TYPE,
303            "resource": self.RESOURCE,
304            "audience": self.AUDIENCE,
305            "scope": " ".join(self.SCOPES),
306            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
307            "subject_token": self.SUBJECT_TOKEN,
308            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
309            "actor_token": self.ACTOR_TOKEN,
310            "actor_token_type": self.ACTOR_TOKEN_TYPE,
311            "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
312            "client_id": CLIENT_ID,
313            "client_secret": CLIENT_SECRET,
314        }
315        request = self.make_mock_request(
316            status=http_client.OK, data=self.SUCCESS_RESPONSE
317        )
318
319        response = client.exchange_token(
320            request,
321            self.GRANT_TYPE,
322            self.SUBJECT_TOKEN,
323            self.SUBJECT_TOKEN_TYPE,
324            self.RESOURCE,
325            self.AUDIENCE,
326            self.SCOPES,
327            self.REQUESTED_TOKEN_TYPE,
328            self.ACTOR_TOKEN,
329            self.ACTOR_TOKEN_TYPE,
330            self.ADDON_OPTIONS,
331            self.ADDON_HEADERS,
332        )
333
334        self.assert_request_kwargs(request.call_args[1], headers, request_data)
335        assert response == self.SUCCESS_RESPONSE
336
337    def test_exchange_token_partial_success_with_reqbody_auth(self):
338        """Test token exchange success with request body client authentication
339        using partial (required only) parameters.
340        """
341        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
342        headers = {"Content-Type": "application/x-www-form-urlencoded"}
343        request_data = {
344            "grant_type": self.GRANT_TYPE,
345            "audience": self.AUDIENCE,
346            "requested_token_type": self.REQUESTED_TOKEN_TYPE,
347            "subject_token": self.SUBJECT_TOKEN,
348            "subject_token_type": self.SUBJECT_TOKEN_TYPE,
349            "client_id": CLIENT_ID,
350            "client_secret": CLIENT_SECRET,
351        }
352        request = self.make_mock_request(
353            status=http_client.OK, data=self.SUCCESS_RESPONSE
354        )
355
356        response = client.exchange_token(
357            request,
358            grant_type=self.GRANT_TYPE,
359            subject_token=self.SUBJECT_TOKEN,
360            subject_token_type=self.SUBJECT_TOKEN_TYPE,
361            audience=self.AUDIENCE,
362            requested_token_type=self.REQUESTED_TOKEN_TYPE,
363        )
364
365        self.assert_request_kwargs(request.call_args[1], headers, request_data)
366        assert response == self.SUCCESS_RESPONSE
367
368    def test_exchange_token_non200_with_reqbody_auth(self):
369        """Test token exchange with POST request body client auth responding
370        with non-200 status.
371        """
372        client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
373        request = self.make_mock_request(
374            status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
375        )
376
377        with pytest.raises(exceptions.OAuthError) as excinfo:
378            client.exchange_token(
379                request,
380                self.GRANT_TYPE,
381                self.SUBJECT_TOKEN,
382                self.SUBJECT_TOKEN_TYPE,
383                self.RESOURCE,
384                self.AUDIENCE,
385                self.SCOPES,
386                self.REQUESTED_TOKEN_TYPE,
387                self.ACTOR_TOKEN,
388                self.ACTOR_TOKEN_TYPE,
389                self.ADDON_OPTIONS,
390                self.ADDON_HEADERS,
391            )
392
393        assert excinfo.match(
394            r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
395        )
396