# -*- coding: utf-8 -*-
#
# Copyright 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for transfer.py."""
import string
import unittest

import httplib2
import mock
import six
from six.moves import http_client

from apitools.base.py import base_api
from apitools.base.py import exceptions
from apitools.base.py import gzip
from apitools.base.py import http_wrapper
from apitools.base.py import transfer


class TransferTest(unittest.TestCase):
    """Tests for transfer.Download and for Upload.ConfigureRequest."""

    def assertRangeAndContentRangeCompatible(self, request, response):
        """Assert the request's range is a prefix of the response's range.

        Args:
          request: An object whose `headers` mapping must contain a 'range'
              entry of the form 'bytes=<range>'.
          response: An object whose `info` mapping must contain a
              'content-range' entry of the form 'bytes <range>/<total>'.
        """
        request_prefix = 'bytes='
        self.assertIn('range', request.headers)
        self.assertTrue(request.headers['range'].startswith(request_prefix))
        request_range = request.headers['range'][len(request_prefix):]

        response_prefix = 'bytes '
        self.assertIn('content-range', response.info)
        response_header = response.info['content-range']
        self.assertTrue(response_header.startswith(response_prefix))
        # Drop the '/<total>' suffix so only '<start>-<end>' is compared.
        response_range = (
            response_header[len(response_prefix):].partition('/')[0])

        msg = ('Request range ({0}) not a prefix of '
               'response_range ({1})').format(
                   request_range, response_range)
        self.assertTrue(response_range.startswith(request_range), msg=msg)

    def testComputeEndByte(self):
        """Test that the end byte is limited to one chunk."""
        total_size = 100
        chunksize = 10
        download = transfer.Download.FromStream(
            six.StringIO(), chunksize=chunksize, total_size=total_size)
        self.assertEqual(chunksize - 1,
                         download._Download__ComputeEndByte(0, end=50))

    def testComputeEndByteReturnNone(self):
        """Test that no end byte is computed without chunks or total size."""
        download = transfer.Download.FromStream(six.StringIO())
        self.assertIsNone(
            download._Download__ComputeEndByte(0, use_chunks=False))

    def testComputeEndByteNoChunks(self):
        """Test that the end byte is capped by total_size without chunks."""
        total_size = 100
        download = transfer.Download.FromStream(
            six.StringIO(), chunksize=10, total_size=total_size)
        for end in (None, 1000):
            self.assertEqual(
                total_size - 1,
                download._Download__ComputeEndByte(0, end=end,
                                                   use_chunks=False),
                msg='Failed on end={0}'.format(end))

    def testComputeEndByteNoTotal(self):
        """Test that the end byte is start + chunksize - 1 with no total."""
        download = transfer.Download.FromStream(six.StringIO())
        default_chunksize = download.chunksize
        for chunksize in (100, default_chunksize):
            download.chunksize = chunksize
            for start in (0, 10):
                self.assertEqual(
                    download.chunksize + start - 1,
                    download._Download__ComputeEndByte(start),
                    msg='Failed on start={0}, chunksize={1}'.format(
                        start, chunksize))

    def testComputeEndByteSmallTotal(self):
        """Test that the end byte is total_size - 1 for a small total."""
        total_size = 100
        download = transfer.Download.FromStream(six.StringIO(),
                                                total_size=total_size)
        for start in (0, 10):
            self.assertEqual(total_size - 1,
                             download._Download__ComputeEndByte(start),
                             msg='Failed on start={0}'.format(start))

    def testDownloadThenStream(self):
        """Test that StreamInChunks after a download requests 'bytes=26-'."""
        bytes_http = object()
        http = object()
        download_stream = six.StringIO()
        download = transfer.Download.FromStream(download_stream,
                                                total_size=26)
        download.bytes_http = bytes_http
        base_url = 'https://part.one/'
        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = http_wrapper.Response(
                info={
                    'content-range': 'bytes 0-25/26',
                    'status': http_client.OK,
                },
                content=string.ascii_lowercase,
                request_url=base_url,
            )
            request = http_wrapper.Request(url=base_url)
            download.InitializeDownload(request, http=http)
            self.assertEqual(1, make_request.call_count)
            received_request = make_request.call_args[0][1]
            self.assertEqual(base_url, received_request.url)
            self.assertRangeAndContentRangeCompatible(
                received_request, make_request.return_value)

        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = http_wrapper.Response(
                info={
                    'status': http_client.REQUESTED_RANGE_NOT_SATISFIABLE,
                },
                content='error',
                request_url=base_url,
            )
            download.StreamInChunks()
            self.assertEqual(1, make_request.call_count)
            received_request = make_request.call_args[0][1]
            self.assertEqual('bytes=26-', received_request.headers['range'])

    def testGetRange(self):
        """Test GetRange for several (start, end) pairs of a 26-byte body."""
        for (start_byte, end_byte) in [(0, 25), (5, 15), (0, 0), (25, 25)]:
            bytes_http = object()
            http = object()
            download_stream = six.StringIO()
            download = transfer.Download.FromStream(download_stream,
                                                    total_size=26,
                                                    auto_transfer=False)
            download.bytes_http = bytes_http
            base_url = 'https://part.one/'
            with mock.patch.object(http_wrapper, 'MakeRequest',
                                   autospec=True) as make_request:
                make_request.return_value = http_wrapper.Response(
                    info={
                        'content-range': 'bytes %d-%d/26' %
                                         (start_byte, end_byte),
                        'status': http_client.OK,
                    },
                    content=string.ascii_lowercase[start_byte:end_byte + 1],
                    request_url=base_url,
                )
                request = http_wrapper.Request(url=base_url)
                download.InitializeDownload(request, http=http)
                download.GetRange(start_byte, end_byte)
                self.assertEqual(1, make_request.call_count)
                received_request = make_request.call_args[0][1]
                self.assertEqual(base_url, received_request.url)
                self.assertRangeAndContentRangeCompatible(
                    received_request, make_request.return_value)

    def testNonChunkedDownload(self):
        """Test a 52-byte download that completes in a single request."""
        bytes_http = object()
        http = object()
        download_stream = six.StringIO()
        download = transfer.Download.FromStream(download_stream, total_size=52)
        download.bytes_http = bytes_http
        base_url = 'https://part.one/'

        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.return_value = http_wrapper.Response(
                info={
                    'content-range': 'bytes 0-51/52',
                    'status': http_client.OK,
                },
                content=string.ascii_lowercase * 2,
                request_url=base_url,
            )
            request = http_wrapper.Request(url=base_url)
            download.InitializeDownload(request, http=http)
            self.assertEqual(1, make_request.call_count)
            received_request = make_request.call_args[0][1]
            self.assertEqual(base_url, received_request.url)
            self.assertRangeAndContentRangeCompatible(
                received_request, make_request.return_value)
            download_stream.seek(0)
            self.assertEqual(string.ascii_lowercase * 2,
                             download_stream.getvalue())

    def testChunkedDownload(self):
        """Test a two-chunk download whose second chunk uses a new URL.

        The first response carries a 'content-location' header pointing at
        a second URL; the second chunk is expected to be served from there.
        """
        bytes_http = object()
        http = object()
        download_stream = six.StringIO()
        download = transfer.Download.FromStream(
            download_stream, chunksize=26, total_size=52)
        download.bytes_http = bytes_http

        # Setting autospec on a mock with an iterable side_effect is
        # currently broken (http://bugs.python.org/issue17826), so
        # instead we write a little function.
        def _ReturnBytes(unused_http, http_request,
                         *unused_args, **unused_kwds):
            url = http_request.url
            if url == 'https://part.one/':
                return http_wrapper.Response(
                    info={
                        'content-location': 'https://part.two/',
                        'content-range': 'bytes 0-25/52',
                        'status': http_client.PARTIAL_CONTENT,
                    },
                    content=string.ascii_lowercase,
                    request_url='https://part.one/',
                )
            elif url == 'https://part.two/':
                return http_wrapper.Response(
                    info={
                        'content-range': 'bytes 26-51/52',
                        'status': http_client.OK,
                    },
                    content=string.ascii_uppercase,
                    request_url='https://part.two/',
                )
            else:
                self.fail('Unknown URL requested: %s' % url)

        with mock.patch.object(http_wrapper, 'MakeRequest',
                               autospec=True) as make_request:
            make_request.side_effect = _ReturnBytes
            request = http_wrapper.Request(url='https://part.one/')
            download.InitializeDownload(request, http=http)
            self.assertEqual(2, make_request.call_count)
            # Replay each recorded call through _ReturnBytes to recover the
            # response that was paired with it.
            for call in make_request.call_args_list:
                self.assertRangeAndContentRangeCompatible(
                    call[0][1], _ReturnBytes(*call[0]))
            download_stream.seek(0)
            self.assertEqual(string.ascii_lowercase + string.ascii_uppercase,
                             download_stream.getvalue())

    def testMultipartEncoding(self):
        """Test that upload contents are not mangled in the request body."""
        # This is really a table test for various issues we've seen in
        # the past; see notes below for particular histories.

        test_cases = [
            # Python's mime module by default encodes lines that start
            # with "From " as ">From ", which we need to make sure we
            # don't run afoul of when sending content that isn't
            # intended to be so encoded. This test calls out that we
            # get this right. We test for both the multipart and
            # non-multipart case.
            'line one\nFrom \nline two',

            # We had originally used a `six.StringIO` to hold the http
            # request body in the case of a multipart upload; for
            # bytes being uploaded in Python3, however, this causes
            # issues like this:
            # https://github.com/GoogleCloudPlatform/gcloud-python/issues/1760
            # We test below to ensure that we don't end up mangling
            # the body before sending.
            u'name,main_ingredient\nRäksmörgås,Räkor\nBaguette,Bröd',
        ]

        for upload_contents in test_cases:
            multipart_body = '{"body_field_one": 7}'
            upload_bytes = upload_contents.encode('ascii', 'backslashreplace')
            upload_config = base_api.ApiUploadInfo(
                accept=['*/*'],
                max_size=None,
                resumable_multipart=True,
                resumable_path=u'/resumable/upload',
                simple_multipart=True,
                simple_path=u'/upload',
            )
            url_builder = base_api._UrlBuilder('http://www.uploads.com')

            # Test multipart: having a body argument in http_request forces
            # multipart here.
            upload = transfer.Upload.FromStream(
                six.BytesIO(upload_bytes),
                'text/plain',
                total_size=len(upload_bytes))
            http_request = http_wrapper.Request(
                'http://www.uploads.com',
                headers={'content-type': 'text/plain'},
                body=multipart_body)
            upload.ConfigureRequest(upload_config, http_request, url_builder)
            self.assertEqual(
                'multipart', url_builder.query_params['uploadType'])
            # Take the third '--'-delimited section of the body and drop
            # its first line before comparing against the uploaded bytes.
            rewritten_upload_contents = b'\n'.join(
                http_request.body.split(b'--')[2].splitlines()[1:])
            self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))

            # Test non-multipart (aka media): no body argument means this is
            # sent as media.
            upload = transfer.Upload.FromStream(
                six.BytesIO(upload_bytes),
                'text/plain',
                total_size=len(upload_bytes))
            http_request = http_wrapper.Request(
                'http://www.uploads.com',
                headers={'content-type': 'text/plain'})
            upload.ConfigureRequest(upload_config, http_request, url_builder)
            self.assertEqual(url_builder.query_params['uploadType'], 'media')
            rewritten_upload_contents = http_request.body
            self.assertTrue(rewritten_upload_contents.endswith(upload_bytes))


class UploadTest(unittest.TestCase):
    """Tests for transfer.Upload, including gzip-encoded uploads."""

    def setUp(self):
        """Create the sample data, stream, request and canned responses."""
        # Sample highly compressible data.
        self.sample_data = b'abc' * 200
        # Stream of the sample data.
        self.sample_stream = six.BytesIO(self.sample_data)
        # Sample url_builder.
        self.url_builder = base_api._UrlBuilder('http://www.uploads.com')
        # Sample request.
        self.request = http_wrapper.Request(
            'http://www.uploads.com',
            headers={'content-type': 'text/plain'})
        # Sample successful response.
        self.response = http_wrapper.Response(
            info={'status': http_client.OK,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)
        # Sample failure response.
        self.fail_response = http_wrapper.Response(
            info={'status': http_client.SERVICE_UNAVAILABLE,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)

    def testStreamInChunksCompressed(self):
        """Test that StreamInChunks will handle compression correctly."""
        # Create and configure the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Set the chunk size so the entire stream is uploaded.
        upload.chunksize = len(self.sample_data)
        # Mock the upload to return the sample response.
        with mock.patch.object(transfer.Upload,
                               '_Upload__SendMediaRequest') as mock_result, \
                mock.patch.object(http_wrapper,
                                  'MakeRequest') as make_request:
            mock_result.return_value = self.response
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            upload.StreamInChunks()
            # Ensure the mock was called. This is checked before the call
            # arguments are unpacked so that a missing call is reported as
            # an assertion failure rather than an IndexError.
            self.assertTrue(mock_result.called)
            # Get the uploaded request and end position of the stream.
            (request, _), _ = mock_result.call_args_list[0]
            # Ensure the correct content encoding was set.
            self.assertEqual(request.headers['Content-Encoding'], 'gzip')
            # Ensure the stream was compressed.
            self.assertLess(len(request.body), len(self.sample_data))

    def testStreamMediaCompressedFail(self):
        """Test that non-chunked uploads raise an exception.

        Ensure uploads with the compressed and resumable flags set called from
        StreamMedia raise an exception. Those uploads are unsupported.
        """
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            auto_transfer=True,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Mock the upload to return the sample response.
        with mock.patch.object(http_wrapper,
                               'MakeRequest') as make_request:
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            # Ensure stream media raises an exception when the upload is
            # compressed. Compression is not supported on non-chunked uploads.
            with self.assertRaises(exceptions.InvalidUserInputError):
                upload.StreamMedia()

    def testAutoTransferCompressed(self):
        """Test that automatic transfers are compressed.

        Ensure uploads with the compressed, resumable, and automatic transfer
        flags set call StreamInChunks. StreamInChunks is tested in an earlier
        test.
        """
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Mock the upload to return the sample response.
        with mock.patch.object(transfer.Upload,
                               'StreamInChunks') as mock_result, \
                mock.patch.object(http_wrapper,
                                  'MakeRequest') as make_request:
            mock_result.return_value = self.response
            make_request.return_value = self.response

            # Initialization.
            upload.InitializeUpload(self.request, 'http')
            # Ensure the mock was called.
            self.assertTrue(mock_result.called)

    def testMultipartCompressed(self):
        """Test that multipart uploads are compressed."""
        # Create the multipart configuration.
        upload_config = base_api.ApiUploadInfo(
            accept=['*/*'],
            max_size=None,
            simple_multipart=True,
            simple_path=u'/upload',)
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        # Set a body to trigger multipart configuration.
        self.request.body = '{"body_field_one": 7}'
        # Configure the request.
        upload.ConfigureRequest(upload_config, self.request, self.url_builder)
        # Ensure the request is a multipart request now.
        self.assertEqual(
            self.url_builder.query_params['uploadType'], 'multipart')
        # Ensure the request is gzip encoded.
        self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
        # Ensure data is compressed
        self.assertLess(len(self.request.body), len(self.sample_data))
        # Ensure uncompressed data includes the sample data.
        with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f:
            original = f.read()
            self.assertIn(self.sample_data, original)

    def testMediaCompressed(self):
        """Test that media uploads are compressed."""
        # Create the media configuration.
        upload_config = base_api.ApiUploadInfo(
            accept=['*/*'],
            max_size=None,
            simple_multipart=True,
            simple_path=u'/upload',)
        # Create the upload object.
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            gzip_encoded=True)
        # Configure the request.
        upload.ConfigureRequest(upload_config, self.request, self.url_builder)
        # Ensure the request is a media request now.
        self.assertEqual(self.url_builder.query_params['uploadType'], 'media')
        # Ensure the request is gzip encoded.
        self.assertEqual(self.request.headers['Content-Encoding'], 'gzip')
        # Ensure data is compressed
        self.assertLess(len(self.request.body), len(self.sample_data))
        # Ensure uncompressed data includes the sample data.
        with gzip.GzipFile(fileobj=six.BytesIO(self.request.body)) as f:
            original = f.read()
            self.assertIn(self.sample_data, original)

    def HttpRequestSideEffect(self, responses=None):
        """Build a side effect that replays canned responses in order.

        Each call of the returned function also asserts that the request's
        'content-length' header matches the length of its body.

        Args:
          responses: Iterable of objects with `info` and `content`
              attributes. None (the default) is treated as no responses.

        Returns:
          A function accepting `uri` plus keyword arguments (`body` and
          `headers` are required) that returns the next (info, content)
          pair, removing it from the queue.
        """
        # The declared default of None must not be iterated directly.
        responses = [(response.info, response.content)
                     for response in (responses or [])]

        def _side_effect(uri, **kwargs):  # pylint: disable=unused-argument
            body = kwargs['body']
            read_func = getattr(body, 'read', None)
            if read_func:
                # If the body is a stream, consume the stream.
                body = read_func()
            self.assertEqual(int(kwargs['headers']['content-length']),
                             len(body))
            return responses.pop(0)
        return _side_effect

    def testRetryRequestChunks(self):
        """Test that StreamInChunks will retry correctly."""
        refresh_response = http_wrapper.Response(
            info={'status': http_wrapper.RESUME_INCOMPLETE,
                  'location': 'http://www.uploads.com'},
            content='',
            request_url='http://www.uploads.com',)

        # Create and configure the upload object.
        bytes_http = httplib2.Http()
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            http=bytes_http)

        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Set the chunk size so the entire stream is uploaded.
        upload.chunksize = len(self.sample_data)
        # Mock the HTTP transport so each request receives the next canned
        # response.
        with mock.patch.object(bytes_http,
                               'request') as make_request:
            # This side effect also checks the request body.
            responses = [
                self.response,  # Initial request in InitializeUpload().
                self.fail_response,  # 503 status code from server.
                refresh_response,  # Refresh upload progress.
                self.response,  # Successful request.
            ]
            make_request.side_effect = self.HttpRequestSideEffect(responses)

            # Initialization.
            upload.InitializeUpload(self.request, bytes_http)
            upload.StreamInChunks()

            # Ensure the mock was called the correct number of times.
            self.assertEqual(make_request.call_count, len(responses))

    def testStreamInChunks(self):
        """Test StreamInChunks."""
        resume_incomplete_responses = [http_wrapper.Response(
            info={'status': http_wrapper.RESUME_INCOMPLETE,
                  'location': 'http://www.uploads.com',
                  'range': '0-{}'.format(end)},
            content='',
            request_url='http://www.uploads.com',) for end in [199, 399, 599]]
        responses = [
            self.response  # Initial request in InitializeUpload().
        ] + resume_incomplete_responses + [
            self.response,  # Successful request.
        ]
        # Create and configure the upload object.
        bytes_http = httplib2.Http()
        upload = transfer.Upload(
            stream=self.sample_stream,
            mime_type='text/plain',
            total_size=len(self.sample_data),
            close_stream=False,
            http=bytes_http)

        upload.strategy = transfer.RESUMABLE_UPLOAD
        # Use a chunk size smaller than the 600-byte sample data so the
        # stream cannot be sent in a single chunk.
        upload.chunksize = 200
        # Mock the HTTP transport so each request receives the next canned
        # response.
        with mock.patch.object(bytes_http,
                               'request') as make_request:
            # This side effect also checks the request body.
            make_request.side_effect = self.HttpRequestSideEffect(responses)

            # Initialization.
            upload.InitializeUpload(self.request, bytes_http)
            upload.StreamInChunks()

            # Ensure the mock was called the correct number of times.
            self.assertEqual(make_request.call_count, len(responses))