1# -*- coding: utf-8 -*-
2#
3# Copyright 2015 Google Inc.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Tests for transfer.py."""
18import string
19import unittest
20
21import httplib2
22import mock
23import six
24from six.moves import http_client
25
26from apitools.base.py import base_api
27from apitools.base.py import exceptions
28from apitools.base.py import gzip
29from apitools.base.py import http_wrapper
30from apitools.base.py import transfer
31
32
class TransferTest(unittest.TestCase):
    """Tests for transfer.Download byte ranges and Upload request bodies."""

    def assertRangeAndContentRangeCompatible(self, request, response):
        """Check that the served content-range begins with the asked range.

        Args:
          request: object whose `headers` mapping should carry a 'range'
              value of the form 'bytes=<range>'.
          response: object whose `info` mapping should carry a
              'content-range' value of the form 'bytes <range>/<total>'.
        """
        range_marker = 'bytes='
        self.assertIn('range', request.headers)
        self.assertTrue(request.headers['range'].startswith(range_marker))
        asked = request.headers['range'][len(range_marker):]

        content_range_marker = 'bytes '
        self.assertIn('content-range', response.info)
        content_range = response.info['content-range']
        self.assertTrue(content_range.startswith(content_range_marker))
        # Drop the marker, then keep only what precedes the '/<total>' part.
        served, _, _ = content_range[len(content_range_marker):].partition(
            '/')

        failure_note = ('Request range ({0}) not a prefix of '
                        'response_range ({1})').format(asked, served)
        self.assertTrue(served.startswith(asked), msg=failure_note)

    def testComputeEndByte(self):
        """A chunked request from byte 0 ends at chunksize - 1."""
        chunksize = 10
        download = transfer.Download.FromStream(
            six.StringIO(), chunksize=chunksize, total_size=100)
        end_byte = download._Download__ComputeEndByte(0, end=50)
        self.assertEqual(chunksize - 1, end_byte)

    def testComputeEndByteReturnNone(self):
        """Without chunking or a known total size there is no end byte."""
        download = transfer.Download.FromStream(six.StringIO())
        end_byte = download._Download__ComputeEndByte(0, use_chunks=False)
        self.assertIsNone(end_byte)

    def testComputeEndByteNoChunks(self):
        """Without chunking the end byte is the last byte of the total."""
        total_size = 100
        download = transfer.Download.FromStream(
            six.StringIO(), chunksize=10, total_size=total_size)
        last_byte = total_size - 1
        for end in (None, 1000):
            computed = download._Download__ComputeEndByte(
                0, end=end, use_chunks=False)
            self.assertEqual(last_byte, computed,
                             msg='Failed on end={0}'.format(end))

    def testComputeEndByteNoTotal(self):
        """With no total size the end byte is start + chunksize - 1."""
        download = transfer.Download.FromStream(six.StringIO())
        candidate_chunksizes = (100, download.chunksize)
        for chunksize in candidate_chunksizes:
            download.chunksize = chunksize
            for start in (0, 10):
                expected = download.chunksize + start - 1
                computed = download._Download__ComputeEndByte(start)
                self.assertEqual(
                    expected, computed,
                    msg='Failed on start={0}, chunksize={1}'.format(
                        start, chunksize))

    def testComputeEndByteSmallTotal(self):
        """With the default chunksize, a total of 100 caps the end byte."""
        total_size = 100
        download = transfer.Download.FromStream(
            six.StringIO(), total_size=total_size)
        last_byte = total_size - 1
        for start in (0, 10):
            computed = download._Download__ComputeEndByte(start)
            self.assertEqual(last_byte, computed,
                             msg='Failed on start={0}'.format(start))

    def testDownloadThenStream(self):
        """StreamInChunks after a full download requests 'bytes=26-'."""
        url = 'https://part.one/'
        http = object()
        download = transfer.Download.FromStream(six.StringIO(),
                                                total_size=26)
        download.bytes_http = object()

        # First phase: the initial request fetches all 26 bytes.
        whole_body = http_wrapper.Response(
            info={
                'content-range': 'bytes 0-25/26',
                'status': http_client.OK,
            },
            content=string.ascii_lowercase,
            request_url=url,
        )
        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = whole_body
            download.InitializeDownload(
                http_wrapper.Request(url='https://part.one/'), http=http)
            self.assertEqual(1, make_request.call_count)
            sent = make_request.call_args[0][1]
            self.assertEqual(url, sent.url)
            self.assertRangeAndContentRangeCompatible(sent, whole_body)

        # Second phase: the server answers the follow-up with a 416.
        out_of_range = http_wrapper.Response(
            info={
                'status': http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
            },
            content='error',
            request_url=url,
        )
        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = out_of_range
            download.StreamInChunks()
            self.assertEqual(1, make_request.call_count)
            sent = make_request.call_args[0][1]
            self.assertEqual('bytes=26-', sent.headers['range'])

    def testGetRange(self):
        """GetRange issues one request whose range matches the response."""
        url = 'https://part.one/'
        byte_ranges = [(0, 25), (5, 15), (0, 0), (25, 25)]
        for (start_byte, end_byte) in byte_ranges:
            http = object()
            # auto_transfer=False: InitializeDownload is followed by an
            # explicit GetRange call, and one request in total is expected.
            download = transfer.Download.FromStream(six.StringIO(),
                                                    total_size=26,
                                                    auto_transfer=False)
            download.bytes_http = object()
            canned = http_wrapper.Response(
                info={
                    'content-range': 'bytes %d-%d/26' %
                                     (start_byte, end_byte),
                    'status': http_client.OK,
                },
                content=string.ascii_lowercase[start_byte:end_byte + 1],
                request_url=url,
            )
            with mock.patch.object(http_wrapper, 'MakeRequest',
                                   autospec=True) as make_request:
                make_request.return_value = canned
                download.InitializeDownload(
                    http_wrapper.Request(url='https://part.one/'),
                    http=http)
                download.GetRange(start_byte, end_byte)
                self.assertEqual(1, make_request.call_count)
                sent = make_request.call_args[0][1]
                self.assertEqual(url, sent.url)
                self.assertRangeAndContentRangeCompatible(sent, canned)

    def testNonChunkedDownload(self):
        """A 52-byte download with the default chunksize takes one request."""
        url = 'https://part.one/'
        expected_content = string.ascii_lowercase * 2
        http = object()
        sink = six.StringIO()
        download = transfer.Download.FromStream(sink, total_size=52)
        download.bytes_http = object()

        canned = http_wrapper.Response(
            info={
                'content-range': 'bytes 0-51/52',
                'status': http_client.OK,
            },
            content=string.ascii_lowercase * 2,
            request_url=url,
        )
        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = canned
            download.InitializeDownload(
                http_wrapper.Request(url='https://part.one/'), http=http)
            self.assertEqual(1, make_request.call_count)
            sent = make_request.call_args[0][1]
            self.assertEqual(url, sent.url)
            self.assertRangeAndContentRangeCompatible(sent, canned)
            sink.seek(0)
            self.assertEqual(expected_content, sink.getvalue())

    def testChunkedDownload(self):
        """Two 26-byte chunks are fetched, following content-location."""
        http = object()
        sink = six.StringIO()
        download = transfer.Download.FromStream(
            sink, chunksize=26, total_size=52)
        download.bytes_http = object()

        # An autospec'd mock cannot take an iterable side_effect
        # (http://bugs.python.org/issue17826), hence this URL-keyed
        # function instead of a list of responses.
        def _ReturnBytes(unused_http, http_request,
                         *unused_args, **unused_kwds):
            requested = http_request.url
            if requested == 'https://part.one/':
                return http_wrapper.Response(
                    info={
                        'content-location': 'https://part.two/',
                        'content-range': 'bytes 0-25/52',
                        'status': http_client.PARTIAL_CONTENT,
                    },
                    content=string.ascii_lowercase,
                    request_url='https://part.one/',
                )
            if requested == 'https://part.two/':
                return http_wrapper.Response(
                    info={
                        'content-range': 'bytes 26-51/52',
                        'status': http_client.OK,
                    },
                    content=string.ascii_uppercase,
                    request_url='https://part.two/',
                )
            self.fail('Unknown URL requested: %s' % requested)

        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.side_effect = _ReturnBytes
            download.InitializeDownload(
                http_wrapper.Request(url='https://part.one/'), http=http)
            self.assertEqual(2, make_request.call_count)
            for positional, unused_keywords in make_request.call_args_list:
                # Replay the recorded arguments to rebuild the response
                # that was handed out for this particular request.
                replayed = _ReturnBytes(*positional)
                self.assertRangeAndContentRangeCompatible(
                    positional[1], replayed)
            sink.seek(0)
            self.assertEqual(string.ascii_lowercase + string.ascii_uppercase,
                             sink.getvalue())

    def testMultipartEncoding(self):
        """Table test: upload bodies must reach the request unmangled."""
        # Each entry records a problem seen in the past; the notes next
        # to the entries give the individual histories.
        payloads = [
            # By default Python's mime module rewrites lines starting
            # with "From " as ">From ". Content that is not meant to be
            # encoded that way must not be altered, in both the
            # multipart and the non-multipart case.
            'line one\nFrom \nline two',

            # The http request body for multipart uploads was once held
            # in a `six.StringIO`; with bytes uploaded under Python3
            # that leads to problems such as:
            # https://github.com/GoogleCloudPlatform/gcloud-python/issues/1760
            # The checks below make sure the body is not mangled before
            # it is sent.
            u'name,main_ingredient\nRäksmörgås,Räkor\nBaguette,Bröd',
        ]

        for upload_contents in payloads:
            json_part = '{"body_field_one": 7}'
            upload_bytes = upload_contents.encode('ascii', 'backslashreplace')
            upload_config = base_api.ApiUploadInfo(
                accept=['*/*'],
                max_size=None,
                resumable_multipart=True,
                resumable_path=u'/resumable/upload',
                simple_multipart=True,
                simple_path=u'/upload',
            )
            url_builder = base_api._UrlBuilder('http://www.uploads.com')

            # Multipart case: a request that already has a body forces
            # the multipart upload type.
            multipart_upload = transfer.Upload.FromStream(
                six.BytesIO(upload_bytes),
                'text/plain',
                total_size=len(upload_bytes))
            multipart_request = http_wrapper.Request(
                'http://www.uploads.com',
                headers={'content-type': 'text/plain'},
                body=json_part)
            multipart_upload.ConfigureRequest(
                upload_config, multipart_request, url_builder)
            self.assertEqual(
                'multipart', url_builder.query_params['uploadType'])
            # Take the third '--'-separated piece of the body and drop
            # its first line before comparing against the payload.
            media_part = multipart_request.body.split(b'--')[2]
            media_lines = media_part.splitlines()[1:]
            rewritten_upload_contents = b'\n'.join(media_lines)
            self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))

            # Media case: a request without a body is sent as media.
            media_upload = transfer.Upload.FromStream(
                six.BytesIO(upload_bytes),
                'text/plain',
                total_size=len(upload_bytes))
            media_request = http_wrapper.Request(
                'http://www.uploads.com',
                headers={'content-type': 'text/plain'})
            media_upload.ConfigureRequest(
                upload_config, media_request, url_builder)
            self.assertEqual(url_builder.query_params['uploadType'], 'media')
            self.assertTrue(media_request.body.endswith(upload_bytes))
312
313
class UploadTest(unittest.TestCase):
    """Tests for transfer.Upload: gzip encoding, retries and chunking."""

    def setUp(self):
        """Create the sample data, request and responses shared by tests."""
        # Sample highly compressible data (600 bytes).
        self.sample_data = b'abc' * 200
        # Stream of the sample data.
        self.sample_stream = six.BytesIO(self.sample_data)
        # Sample url_builder.
        self.url_builder = base_api._UrlBuilder('http://www.uploads.com')
        # Sample request.
        self.request = http_wrapper.Request(
            'http://www.uploads.com',
            headers={'content-type': 'text/plain'})
        # Sample successful response.
        self.response = http_wrapper.Response(
            info={'status': http_client.OK,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)
        # Sample failure response.
        self.fail_response = http_wrapper.Response(
            info={'status': http_client.SERVICE_UNAVAILABLE,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)

    def testStreamInChunksCompressed(self):
        """Test that StreamInChunks will handle compression correctly."""
        # Create and configure the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Set the chunk size so the entire stream is uploaded.
        upload.chunksize = len(self.sample_data)
        # Mock the upload to return the sample response.
        with mock.patch.object(transfer.Upload,
                               '_Upload__SendMediaRequest') as mock_result, \
                mock.patch.object(http_wrapper,
                                  'MakeRequest') as make_request:
            mock_result.return_value = self.response
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            upload.StreamInChunks()
            # Ensure the mock was called. This is checked before its
            # recorded calls are indexed so that a missing call fails
            # the assertion instead of raising IndexError.
            self.assertTrue(mock_result.called)
            # Get the uploaded request and end position of the stream.
            (request, _), _ = mock_result.call_args_list[0]
            # Ensure the correct content encoding was set.
            self.assertEqual(request.headers['Content-Encoding'], 'gzip')
            # Ensure the stream was compressed.
            self.assertLess(len(request.body), len(self.sample_data))

    def testStreamMediaCompressedFail(self):
        """Test that non-chunked uploads raise an exception.

        Ensure uploads with the compressed and resumable flags set called from
        StreamMedia raise an exception. Those uploads are unsupported.
        """
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            auto_transfer=True,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Mock the upload to return the sample response.
        with mock.patch.object(http_wrapper,
                               'MakeRequest') as make_request:
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            # Ensure stream media raises an exception when the upload is
            # compressed. Compression is not supported on non-chunked uploads.
            with self.assertRaises(exceptions.InvalidUserInputError):
                upload.StreamMedia()

    def testAutoTransferCompressed(self):
        """Test that automatic transfers are compressed.

        Ensure uploads with the compressed, resumable, and automatic transfer
        flags set call StreamInChunks. StreamInChunks is tested in an earlier
        test.
        """
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Mock the upload to return the sample response.
        with mock.patch.object(transfer.Upload,
                               'StreamInChunks') as mock_result, \
                mock.patch.object(http_wrapper,
                                  'MakeRequest') as make_request:
            mock_result.return_value = self.response
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            # Ensure the mock was called.
            self.assertTrue(mock_result.called)

    def testMultipartCompressed(self):
        """Test that multipart uploads are compressed."""
        # Create the multipart configuration.
        upload_config = base_api.ApiUploadInfo(
            accept=['*/*'],
            max_size=None,
            simple_multipart=True,
            simple_path=u'/upload',)
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        # Set a body to trigger multipart configuration.
        self.request.body = '{"body_field_one": 7}'
        # Configure the request.
        upload.ConfigureRequest(upload_config, self.request, self.url_builder)
        # Ensure the request is a multipart request now.
        self.assertEqual(
            self.url_builder.query_params['uploadType'], 'multipart')
        # Ensure the request is gzip encoded.
        self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
        # Ensure data is compressed.
        self.assertLess(len(self.request.body), len(self.sample_data))
        # Ensure uncompressed data includes the sample data.
        with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f:
            original = f.read()
            self.assertIn(self.sample_data, original)

    def testMediaCompressed(self):
        """Test that media uploads are compressed."""
        # Create the media configuration.
        upload_config = base_api.ApiUploadInfo(
            accept=['*/*'],
            max_size=None,
            simple_multipart=True,
            simple_path=u'/upload',)
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        # Configure the request.
        upload.ConfigureRequest(upload_config, self.request, self.url_builder)
        # Ensure the request is a media request now.
        self.assertEqual(self.url_builder.query_params['uploadType'], 'media')
        # Ensure the request is gzip encoded.
        self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
        # Ensure data is compressed.
        self.assertLess(len(self.request.body), len(self.sample_data))
        # Ensure uncompressed data includes the sample data.
        with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f:
            original = f.read()
            self.assertIn(self.sample_data, original)

    def HttpRequestSideEffect(self, responses=None):
        """Build a side effect for a mocked http `request` method.

        Args:
          responses: iterable of objects with `info` and `content`
              attributes, handed out one per call in the given order.
              None is treated as no responses.

        Returns:
          A callable taking (uri, **kwargs) that asserts the
          'content-length' header equals the length of the request body
          and then returns the next (info, content) pair.
        """
        responses = [(response.info, response.content)
                     for response in (responses or ())]

        def _side_effect(uri, **kwargs):  # pylint: disable=unused-argument
            body = kwargs['body']
            read_func = getattr(body, 'read', None)
            if read_func:
                # If the body is a stream, consume the stream.
                body = read_func()
            self.assertEqual(int(kwargs['headers']['content-length']),
                             len(body))
            return responses.pop(0)
        return _side_effect

    def testRetryRequestChunks(self):
        """Test that StreamInChunks will retry correctly."""
        refresh_response = http_wrapper.Response(
            info={'status': http_wrapper.RESUME_INCOMPLETE,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)

        # Create and configure the upload object.
        bytes_http = httplib2.Http()
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            http=bytes_http)

        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Set the chunk size so the entire stream is uploaded.
        upload.chunksize = len(self.sample_data)
        # Mock the upload to return the sample response.
        with mock.patch.object(bytes_http,
                               'request') as make_request:
            # This side effect also checks the request body.
            responses = [
                self.response,  # Initial request in InitializeUpload().
                self.fail_response,  # 503 status code from server.
                refresh_response,  # Refresh upload progress.
                self.response,  # Successful request.
            ]
            make_request.side_effect = self.HttpRequestSideEffect(responses)

            # Initialization.
            upload.InitializeUpload(self.request, bytes_http)
            upload.StreamInChunks()

            # Ensure the mock was called the correct number of times.
            # assertEqual is used because the assertEquals alias was
            # removed in Python 3.12.
            self.assertEqual(make_request.call_count, len(responses))

    def testStreamInChunks(self):
        """Test StreamInChunks."""
        resume_incomplete_responses = [http_wrapper.Response(
            info={'status': http_wrapper.RESUME_INCOMPLETE,
                  'location': 'http://www.uploads.com',
                  'range': '0-{}'.format(end)},
            content='',
            request_url='http://www.uploads.com',) for end in [199, 399, 599]]
        responses = [
            self.response  # Initial request in InitializeUpload().
        ] + resume_incomplete_responses + [
            self.response,  # Successful request.
        ]
        # Create and configure the upload object.
        bytes_http = httplib2.Http()
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            http=bytes_http)

        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Use a chunk size smaller than the 600-byte sample so that the
        # stream cannot be sent in a single chunk.
        upload.chunksize = 200
        # Mock the upload to return the sample response.
        with mock.patch.object(bytes_http,
                               'request') as make_request:
            # This side effect also checks the request body.
            make_request.side_effect = self.HttpRequestSideEffect(responses)

            # Initialization.
            upload.InitializeUpload(self.request, bytes_http)
            upload.StreamInChunks()

            # Ensure the mock was called the correct number of times.
            # assertEqual is used because the assertEquals alias was
            # removed in Python 3.12.
            self.assertEqual(make_request.call_count, len(responses))
577