1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.s3benchmarks;
17 
18 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.BENCHMARK_ITERATIONS;
19 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.COPY_SUFFIX;
20 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.PRE_WARMUP_ITERATIONS;
21 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.PRE_WARMUP_RUNS;
22 import static software.amazon.awssdk.s3benchmarks.BenchmarkUtils.WARMUP_KEY;
23 import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
24 import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;
25 
26 import com.amazonaws.ClientConfiguration;
27 import com.amazonaws.services.s3.AmazonS3;
28 import com.amazonaws.services.s3.AmazonS3Client;
29 import com.amazonaws.services.s3.transfer.Copy;
30 import com.amazonaws.services.s3.transfer.Download;
31 import com.amazonaws.services.s3.transfer.TransferManager;
32 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
33 import com.amazonaws.services.s3.transfer.Upload;
34 import java.io.File;
35 import java.io.IOException;
36 import java.nio.file.Files;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import software.amazon.awssdk.testutils.RandomTempFile;
42 import software.amazon.awssdk.utils.Logger;
43 
44 abstract class V1BaseTransferManagerBenchmark implements TransferManagerBenchmark {
45     private static final int MAX_CONCURRENCY = 100;
46     private static final Logger logger = Logger.loggerFor("TransferManagerBenchmark");
47 
48     protected final TransferManager transferManager;
49     protected final AmazonS3 s3Client;
50     protected final String bucket;
51     protected final String key;
52     protected final int iteration;
53     protected final String path;
54     private final File tmpFile;
55     private final ExecutorService executorService;
56 
V1BaseTransferManagerBenchmark(TransferManagerBenchmarkConfig config)57     V1BaseTransferManagerBenchmark(TransferManagerBenchmarkConfig config) {
58         logger.info(() -> "Benchmark config: " + config);
59         Long partSizeInMb = config.partSizeInMb() == null ? null : config.partSizeInMb() * MB;
60         s3Client = AmazonS3Client.builder()
61                                  .withClientConfiguration(new ClientConfiguration().withMaxConnections(MAX_CONCURRENCY))
62                                  .build();
63         executorService = Executors.newFixedThreadPool(MAX_CONCURRENCY);
64         transferManager = TransferManagerBuilder.standard()
65                                                 .withMinimumUploadPartSize(partSizeInMb)
66                                                 .withS3Client(s3Client)
67                                                 .withExecutorFactory(() -> executorService)
68                                                 .build();
69         bucket = config.bucket();
70         key = config.key();
71         path = config.filePath();
72         iteration = config.iteration() == null ? BENCHMARK_ITERATIONS : config.iteration();
73         try {
74             tmpFile = new RandomTempFile(20 * MB);
75         } catch (IOException e) {
76             logger.error(() -> "Failed to create the file");
77             throw new RuntimeException("Failed to create the temp file", e);
78         }
79     }
80 
81     @Override
run()82     public void run() {
83         try {
84             warmUp();
85             additionalWarmup();
86             doRunBenchmark();
87         } catch (Exception e) {
88             logger.error(() -> "Exception occurred", e);
89         } finally {
90             cleanup();
91         }
92     }
93 
additionalWarmup()94     protected void additionalWarmup() {
95         // default to no-op
96     }
97 
doRunBenchmark()98     protected abstract void doRunBenchmark();
99 
cleanup()100     private void cleanup() {
101         executorService.shutdown();
102         transferManager.shutdownNow();
103         s3Client.shutdown();
104     }
105 
warmUp()106     private void warmUp() {
107         logger.info(() -> "Starting to warm up");
108 
109         for (int i = 0; i < PRE_WARMUP_ITERATIONS; i++) {
110             warmUpUploadBatch();
111             warmUpDownloadBatch();
112             warmUpCopyBatch();
113 
114             try {
115                 Thread.sleep(500);
116             } catch (InterruptedException e) {
117                 Thread.currentThread().interrupt();
118                 logger.warn(() -> "Thread interrupted when waiting for completion", e);
119             }
120         }
121         logger.info(() -> "Ending warm up");
122     }
123 
warmUpCopyBatch()124     private void warmUpCopyBatch() {
125         List<Copy> uploads = new ArrayList<>();
126         for (int i = 0; i < 3; i++) {
127             uploads.add(transferManager.copy(bucket, WARMUP_KEY, bucket, WARMUP_KEY + COPY_SUFFIX));
128         }
129 
130         uploads.forEach(u -> {
131             try {
132                 u.waitForCopyResult();
133             } catch (InterruptedException e) {
134                 Thread.currentThread().interrupt();
135                 logger.error(() -> "Thread interrupted ", e);
136             }
137         });
138     }
139 
warmUpDownloadBatch()140     private void warmUpDownloadBatch() {
141         List<Download> downloads = new ArrayList<>();
142         List<File> tmpFiles = new ArrayList<>();
143         for (int i = 0; i < PRE_WARMUP_RUNS; i++) {
144             File tmpFile = RandomTempFile.randomUncreatedFile();
145             tmpFiles.add(tmpFile);
146             downloads.add(transferManager.download(bucket, WARMUP_KEY, tmpFile));
147         }
148 
149         downloads.forEach(u -> {
150             try {
151                 u.waitForCompletion();
152             } catch (InterruptedException e) {
153                 Thread.currentThread().interrupt();
154                 logger.error(() -> "Thread interrupted ", e);
155             }
156         });
157 
158         tmpFiles.forEach(f -> runAndLogError(logger.logger(), "Deleting file failed", () -> Files.delete(f.toPath())));
159     }
160 
warmUpUploadBatch()161     private void warmUpUploadBatch() {
162         List<Upload> uploads = new ArrayList<>();
163         for (int i = 0; i < PRE_WARMUP_RUNS; i++) {
164             uploads.add(transferManager.upload(bucket, WARMUP_KEY, tmpFile));
165         }
166 
167         uploads.forEach(u -> {
168             try {
169                 u.waitForUploadResult();
170             } catch (InterruptedException e) {
171                 Thread.currentThread().interrupt();
172                 logger.error(() -> "Thread interrupted ", e);
173             }
174         });
175     }
176 }
177