1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.core.retry.conditions;
17 
18 import static org.assertj.core.api.Assertions.assertThat;
19 import static org.junit.jupiter.api.Assertions.fail;
20 
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import org.junit.jupiter.api.Test;
27 import software.amazon.awssdk.core.exception.SdkClientException;
28 import software.amazon.awssdk.core.exception.SdkException;
29 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
30 import software.amazon.awssdk.core.retry.RetryPolicyContext;
31 
32 public class TokenBucketRetryConditionTest {
33     private static final SdkException EXCEPTION = SdkClientException.create("");
34     private static final SdkException EXCEPTION_2 = SdkClientException.create("");
35 
36     @Test
maximumTokensCannotBeExceeded()37     public void maximumTokensCannotBeExceeded() {
38         TokenBucketRetryCondition condition = create(3, e -> 1);
39         for (int i = 1; i < 10; ++i) {
40             condition.requestSucceeded(context(null));
41             assertThat(condition.tokensAvailable()).isEqualTo(3);
42         }
43     }
44 
45     @Test
releasingMoreCapacityThanAvailableSetsCapacityToMax()46     public void releasingMoreCapacityThanAvailableSetsCapacityToMax() {
47         ExecutionAttributes attributes = new ExecutionAttributes();
48 
49         TokenBucketRetryCondition condition = create(11, e -> e == EXCEPTION ? 1 : 3);
50         assertThat(condition.shouldRetry(context(EXCEPTION, attributes))).isTrue();
51         assertThat(condition.tokensAvailable()).isEqualTo(10);
52         assertThat(condition.shouldRetry(context(EXCEPTION_2, attributes))).isTrue();
53         assertThat(condition.tokensAvailable()).isEqualTo(7);
54         condition.requestSucceeded(context(EXCEPTION_2, attributes));
55         assertThat(condition.tokensAvailable()).isEqualTo(10);
56         condition.requestSucceeded(context(EXCEPTION_2, attributes));
57         assertThat(condition.tokensAvailable()).isEqualTo(11);
58     }
59 
60     @Test
nonFirstAttemptsAreNotFree()61     public void nonFirstAttemptsAreNotFree() {
62         TokenBucketRetryCondition condition = create(2, e -> 1);
63 
64         assertThat(condition.shouldRetry(context(EXCEPTION))).isTrue();
65         assertThat(condition.tokensAvailable()).isEqualTo(1);
66 
67         assertThat(condition.shouldRetry(context(EXCEPTION))).isTrue();
68         assertThat(condition.tokensAvailable()).isEqualTo(0);
69 
70         assertThat(condition.shouldRetry(context(EXCEPTION))).isFalse();
71         assertThat(condition.tokensAvailable()).isEqualTo(0);
72     }
73 
74     @Test
exceptionCostIsHonored()75     public void exceptionCostIsHonored() {
76         // EXCEPTION costs 1, anything else costs 10
77         TokenBucketRetryCondition condition = create(20, e -> e == EXCEPTION ? 1 : 10);
78 
79         assertThat(condition.shouldRetry(context(EXCEPTION))).isTrue();
80         assertThat(condition.tokensAvailable()).isEqualTo(19);
81 
82         assertThat(condition.shouldRetry(context(EXCEPTION_2))).isTrue();
83         assertThat(condition.tokensAvailable()).isEqualTo(9);
84 
85         assertThat(condition.shouldRetry(context(EXCEPTION_2))).isFalse();
86         assertThat(condition.tokensAvailable()).isEqualTo(9);
87 
88         assertThat(condition.shouldRetry(context(EXCEPTION))).isTrue();
89         assertThat(condition.tokensAvailable()).isEqualTo(8);
90     }
91 
92     @Test
successReleasesAcquiredCost()93     public void successReleasesAcquiredCost() {
94         ExecutionAttributes attributes = new ExecutionAttributes();
95 
96         TokenBucketRetryCondition condition = create(20, e -> 10);
97 
98         assertThat(condition.shouldRetry(context(EXCEPTION, attributes))).isTrue();
99         assertThat(condition.tokensAvailable()).isEqualTo(10);
100 
101         condition.requestSucceeded(context(EXCEPTION, attributes));
102         assertThat(condition.tokensAvailable()).isEqualTo(20);
103     }
104 
105     @Test
firstRequestSuccessReleasesOne()106     public void firstRequestSuccessReleasesOne() {
107         TokenBucketRetryCondition condition = create(20, e -> 10);
108 
109         assertThat(condition.shouldRetry(context(null))).isTrue();
110         assertThat(condition.tokensAvailable()).isEqualTo(10);
111 
112         condition.requestSucceeded(context(null));
113         assertThat(condition.tokensAvailable()).isEqualTo(11);
114 
115         condition.requestSucceeded(context(null));
116         assertThat(condition.tokensAvailable()).isEqualTo(12);
117     }
118 
119     @Test
conditionSeemsToBeThreadSafe()120     public void conditionSeemsToBeThreadSafe() throws InterruptedException {
121         int bucketSize = 5;
122         TokenBucketRetryCondition condition = create(bucketSize, e -> 1);
123 
124         AtomicInteger concurrentCalls = new AtomicInteger(0);
125         AtomicBoolean failure = new AtomicBoolean(false);
126         int parallelism = bucketSize * 2;
127         ExecutorService executor = Executors.newFixedThreadPool(parallelism);
128         for (int i = 0; i < parallelism; ++i) {
129             executor.submit(() -> {
130                 try {
131                     for (int j = 0; j < 1000; ++j) {
132                         ExecutionAttributes attributes = new ExecutionAttributes();
133                         if (condition.shouldRetry(context(EXCEPTION, attributes))) {
134                             int calls = concurrentCalls.addAndGet(1);
135                             if (calls > bucketSize) {
136                                 failure.set(true);
137                             }
138                             Thread.sleep(1);
139                             concurrentCalls.addAndGet(-1);
140                             condition.requestSucceeded(context(EXCEPTION, attributes));
141                         }
142                         else {
143                             Thread.sleep(1);
144                         }
145                     }
146                 } catch (Throwable t) {
147                     t.printStackTrace();
148                     failure.set(true);
149                 }
150             });
151 
152             // Stagger the threads a bit.
153             Thread.sleep(1);
154         }
155 
156         executor.shutdown();
157         if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
158             fail();
159         }
160 
161         assertThat(failure.get()).isFalse();
162     }
163 
context(SdkException lastException)164     private RetryPolicyContext context(SdkException lastException) {
165         return RetryPolicyContext.builder()
166                                  .executionAttributes(new ExecutionAttributes())
167                                  .exception(lastException)
168                                  .build();
169     }
170 
context(SdkException lastException, ExecutionAttributes attributes)171     private RetryPolicyContext context(SdkException lastException, ExecutionAttributes attributes) {
172         return RetryPolicyContext.builder()
173                                  .executionAttributes(attributes)
174                                  .exception(lastException)
175                                  .build();
176     }
177 
create(int size, TokenBucketExceptionCostFunction function)178     private TokenBucketRetryCondition create(int size, TokenBucketExceptionCostFunction function) {
179         return TokenBucketRetryCondition.builder()
180                                         .tokenBucketSize(size)
181                                         .exceptionCostFunction(function)
182                                         .build();
183     }
184 
185 }