1 #[path = "../support.rs"]
2 mod support;
3 use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
4 use tower::limit::concurrency::ConcurrencyLimitLayer;
5 use tower_test::{assert_request_eq, mock};
6 
7 #[tokio::test(flavor = "current_thread")]
basic_service_limit_functionality_with_poll_ready()8 async fn basic_service_limit_functionality_with_poll_ready() {
9     let _t = support::trace_init();
10     let limit = ConcurrencyLimitLayer::new(2);
11     let (mut service, mut handle) = mock::spawn_layer(limit);
12 
13     assert_ready_ok!(service.poll_ready());
14     let r1 = service.call("hello 1");
15 
16     assert_ready_ok!(service.poll_ready());
17     let r2 = service.call("hello 2");
18 
19     assert_pending!(service.poll_ready());
20 
21     assert!(!service.is_woken());
22 
23     // The request gets passed through
24     assert_request_eq!(handle, "hello 1").send_response("world 1");
25 
26     // The next request gets passed through
27     assert_request_eq!(handle, "hello 2").send_response("world 2");
28 
29     // There are no more requests
30     assert_pending!(handle.poll_request());
31 
32     assert_eq!(r1.await.unwrap(), "world 1");
33 
34     assert!(service.is_woken());
35 
36     // Another request can be sent
37     assert_ready_ok!(service.poll_ready());
38 
39     let r3 = service.call("hello 3");
40 
41     assert_pending!(service.poll_ready());
42 
43     assert_eq!(r2.await.unwrap(), "world 2");
44 
45     // The request gets passed through
46     assert_request_eq!(handle, "hello 3").send_response("world 3");
47 
48     assert_eq!(r3.await.unwrap(), "world 3");
49 }
50 
51 #[tokio::test(flavor = "current_thread")]
basic_service_limit_functionality_without_poll_ready()52 async fn basic_service_limit_functionality_without_poll_ready() {
53     let _t = support::trace_init();
54     let limit = ConcurrencyLimitLayer::new(2);
55     let (mut service, mut handle) = mock::spawn_layer(limit);
56 
57     assert_ready_ok!(service.poll_ready());
58     let r1 = service.call("hello 1");
59 
60     assert_ready_ok!(service.poll_ready());
61     let r2 = service.call("hello 2");
62 
63     assert_pending!(service.poll_ready());
64 
65     // The request gets passed through
66     assert_request_eq!(handle, "hello 1").send_response("world 1");
67 
68     assert!(!service.is_woken());
69 
70     // The next request gets passed through
71     assert_request_eq!(handle, "hello 2").send_response("world 2");
72 
73     assert!(!service.is_woken());
74 
75     // There are no more requests
76     assert_pending!(handle.poll_request());
77 
78     assert_eq!(r1.await.unwrap(), "world 1");
79 
80     assert!(service.is_woken());
81 
82     // One more request can be sent
83     assert_ready_ok!(service.poll_ready());
84     let r4 = service.call("hello 4");
85 
86     assert_pending!(service.poll_ready());
87 
88     assert_eq!(r2.await.unwrap(), "world 2");
89     assert!(service.is_woken());
90 
91     // The request gets passed through
92     assert_request_eq!(handle, "hello 4").send_response("world 4");
93 
94     assert_eq!(r4.await.unwrap(), "world 4");
95 }
96 
97 #[tokio::test(flavor = "current_thread")]
request_without_capacity()98 async fn request_without_capacity() {
99     let _t = support::trace_init();
100     let limit = ConcurrencyLimitLayer::new(0);
101     let (mut service, _) = mock::spawn_layer::<(), (), _>(limit);
102 
103     assert_pending!(service.poll_ready());
104 }
105 
106 #[tokio::test(flavor = "current_thread")]
reserve_capacity_without_sending_request()107 async fn reserve_capacity_without_sending_request() {
108     let _t = support::trace_init();
109     let limit = ConcurrencyLimitLayer::new(1);
110     let (mut s1, mut handle) = mock::spawn_layer(limit);
111 
112     let mut s2 = s1.clone();
113 
114     // Reserve capacity in s1
115     assert_ready_ok!(s1.poll_ready());
116 
117     // Service 2 cannot get capacity
118     assert_pending!(s2.poll_ready());
119 
120     // s1 sends the request, then s2 is able to get capacity
121     let r1 = s1.call("hello");
122 
123     assert_request_eq!(handle, "hello").send_response("world");
124 
125     assert_pending!(s2.poll_ready());
126 
127     r1.await.unwrap();
128 
129     assert_ready_ok!(s2.poll_ready());
130 }
131 
132 #[tokio::test(flavor = "current_thread")]
service_drop_frees_capacity()133 async fn service_drop_frees_capacity() {
134     let _t = support::trace_init();
135     let limit = ConcurrencyLimitLayer::new(1);
136     let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
137 
138     let mut s2 = s1.clone();
139 
140     // Reserve capacity in s1
141     assert_ready_ok!(s1.poll_ready());
142 
143     // Service 2 cannot get capacity
144     assert_pending!(s2.poll_ready());
145 
146     drop(s1);
147 
148     assert!(s2.is_woken());
149     assert_ready_ok!(s2.poll_ready());
150 }
151 
152 #[tokio::test(flavor = "current_thread")]
response_error_releases_capacity()153 async fn response_error_releases_capacity() {
154     let _t = support::trace_init();
155     let limit = ConcurrencyLimitLayer::new(1);
156     let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit);
157 
158     let mut s2 = s1.clone();
159 
160     // Reserve capacity in s1
161     assert_ready_ok!(s1.poll_ready());
162 
163     // s1 sends the request, then s2 is able to get capacity
164     let r1 = s1.call("hello");
165 
166     assert_request_eq!(handle, "hello").send_error("boom");
167 
168     r1.await.unwrap_err();
169 
170     assert_ready_ok!(s2.poll_ready());
171 }
172 
173 #[tokio::test(flavor = "current_thread")]
response_future_drop_releases_capacity()174 async fn response_future_drop_releases_capacity() {
175     let _t = support::trace_init();
176     let limit = ConcurrencyLimitLayer::new(1);
177     let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit);
178 
179     let mut s2 = s1.clone();
180 
181     // Reserve capacity in s1
182     assert_ready_ok!(s1.poll_ready());
183 
184     // s1 sends the request, then s2 is able to get capacity
185     let r1 = s1.call("hello");
186 
187     assert_pending!(s2.poll_ready());
188 
189     drop(r1);
190 
191     assert_ready_ok!(s2.poll_ready());
192 }
193 
194 #[tokio::test(flavor = "current_thread")]
multi_waiters()195 async fn multi_waiters() {
196     let _t = support::trace_init();
197     let limit = ConcurrencyLimitLayer::new(1);
198     let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
199     let mut s2 = s1.clone();
200     let mut s3 = s1.clone();
201 
202     // Reserve capacity in s1
203     assert_ready_ok!(s1.poll_ready());
204 
205     // s2 and s3 are not ready
206     assert_pending!(s2.poll_ready());
207     assert_pending!(s3.poll_ready());
208 
209     drop(s1);
210 
211     assert!(s2.is_woken());
212     assert!(!s3.is_woken());
213 
214     drop(s2);
215 
216     assert!(s3.is_woken());
217 }
218