1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.s3benchmarks; 17 18 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.BENCHMARK_ITERATIONS; 19 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.COPY_SUFFIX; 20 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.PRE_WARMUP_ITERATIONS; 21 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.PRE_WARMUP_RUNS; 22 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.WARMUP_KEY; 23 import static software.amazon.awssdk.transfer.s3.SizeConstant.MB; 24 import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError; 25 26 import com.amazonaws.ClientConfiguration; 27 import com.amazonaws.services.s3.AmazonS3; 28 import com.amazonaws.services.s3.AmazonS3Client; 29 import com.amazonaws.services.s3.transfer.Copy; 30 import com.amazonaws.services.s3.transfer.Download; 31 import com.amazonaws.services.s3.transfer.TransferManager; 32 import com.amazonaws.services.s3.transfer.TransferManagerBuilder; 33 import com.amazonaws.services.s3.transfer.Upload; 34 import java.io.File; 35 import java.io.IOException; 36 import java.nio.file.Files; 37 import java.util.ArrayList; 38 import java.util.List; 39 import java.util.concurrent.ExecutorService; 40 import java.util.concurrent.Executors; 41 import software.amazon.awssdk.testutils.RandomTempFile; 42 import software.amazon.awssdk.utils.Logger; 43 44 abstract class V1BaseTransferManagerBenchmark implements TransferManagerBenchmark { 45 private static final int MAX_CONCURRENCY = 100; 46 private static final Logger logger = Logger.loggerFor("TransferManagerBenchmark"); 47 48 protected final TransferManager transferManager; 49 protected final AmazonS3 s3Client; 50 protected final String bucket; 51 protected final String key; 52 protected final int iteration; 53 protected final String path; 54 private final File tmpFile; 55 private final ExecutorService executorService; 56 V1BaseTransferManagerBenchmark(TransferManagerBenchmarkConfig config)57 V1BaseTransferManagerBenchmark(TransferManagerBenchmarkConfig config) { 58 logger.info(() -> "Benchmark config: " + config); 59 Long partSizeInMb = config.partSizeInMb() == null ? null : config.partSizeInMb() * MB; 60 s3Client = AmazonS3Client.builder() 61 .withClientConfiguration(new ClientConfiguration().withMaxConnections(MAX_CONCURRENCY)) 62 .build(); 63 executorService = Executors.newFixedThreadPool(MAX_CONCURRENCY); 64 transferManager = TransferManagerBuilder.standard() 65 .withMinimumUploadPartSize(partSizeInMb) 66 .withS3Client(s3Client) 67 .withExecutorFactory(() -> executorService) 68 .build(); 69 bucket = config.bucket(); 70 key = config.key(); 71 path = config.filePath(); 72 iteration = config.iteration() == null ? BENCHMARK_ITERATIONS : config.iteration(); 73 try { 74 tmpFile = new RandomTempFile(20 * MB); 75 } catch (IOException e) { 76 logger.error(() -> "Failed to create the file"); 77 throw new RuntimeException("Failed to create the temp file", e); 78 } 79 } 80 81 @Override run()82 public void run() { 83 try { 84 warmUp(); 85 additionalWarmup(); 86 doRunBenchmark(); 87 } catch (Exception e) { 88 logger.error(() -> "Exception occurred", e); 89 } finally { 90 cleanup(); 91 } 92 } 93 additionalWarmup()94 protected void additionalWarmup() { 95 // default to no-op 96 } 97 doRunBenchmark()98 protected abstract void doRunBenchmark(); 99 cleanup()100 private void cleanup() { 101 executorService.shutdown(); 102 transferManager.shutdownNow(); 103 s3Client.shutdown(); 104 } 105 warmUp()106 private void warmUp() { 107 logger.info(() -> "Starting to warm up"); 108 109 for (int i = 0; i < PRE_WARMUP_ITERATIONS; i++) { 110 warmUpUploadBatch(); 111 warmUpDownloadBatch(); 112 warmUpCopyBatch(); 113 114 try { 115 Thread.sleep(500); 116 } catch (InterruptedException e) { 117 Thread.currentThread().interrupt(); 118 logger.warn(() -> "Thread interrupted when waiting for completion", e); 119 } 120 } 121 logger.info(() -> "Ending warm up"); 122 } 123 warmUpCopyBatch()124 private void warmUpCopyBatch() { 125 List<Copy> uploads = new ArrayList<>(); 126 for (int i = 0; i < 3; i++) { 127 uploads.add(transferManager.copy(bucket, WARMUP_KEY, bucket, WARMUP_KEY + COPY_SUFFIX)); 128 } 129 130 uploads.forEach(u -> { 131 try { 132 u.waitForCopyResult(); 133 } catch (InterruptedException e) { 134 Thread.currentThread().interrupt(); 135 logger.error(() -> "Thread interrupted ", e); 136 } 137 }); 138 } 139 warmUpDownloadBatch()140 private void warmUpDownloadBatch() { 141 List<Download> downloads = new ArrayList<>(); 142 List<File> tmpFiles = new ArrayList<>(); 143 for (int i = 0; i < PRE_WARMUP_RUNS; i++) { 144 File tmpFile = RandomTempFile.randomUncreatedFile(); 145 tmpFiles.add(tmpFile); 146 downloads.add(transferManager.download(bucket, WARMUP_KEY, tmpFile)); 147 } 148 149 downloads.forEach(u -> { 150 try { 151 u.waitForCompletion(); 152 } catch (InterruptedException e) { 153 Thread.currentThread().interrupt(); 154 logger.error(() -> "Thread interrupted ", e); 155 } 156 }); 157 158 tmpFiles.forEach(f -> runAndLogError(logger.logger(), "Deleting file failed", () -> Files.delete(f.toPath()))); 159 } 160 warmUpUploadBatch()161 private void warmUpUploadBatch() { 162 List<Upload> uploads = new ArrayList<>(); 163 for (int i = 0; i < PRE_WARMUP_RUNS; i++) { 164 uploads.add(transferManager.upload(bucket, WARMUP_KEY, tmpFile)); 165 } 166 167 uploads.forEach(u -> { 168 try { 169 u.waitForUploadResult(); 170 } catch (InterruptedException e) { 171 Thread.currentThread().interrupt(); 172 logger.error(() -> "Thread interrupted ", e); 173 } 174 }); 175 } 176 } 177