1 #[path = "../support.rs"]
2 mod support;
3 use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
4 use tower::limit::concurrency::ConcurrencyLimitLayer;
5 use tower_test::{assert_request_eq, mock};
6
7 #[tokio::test(flavor = "current_thread")]
basic_service_limit_functionality_with_poll_ready()8 async fn basic_service_limit_functionality_with_poll_ready() {
9 let _t = support::trace_init();
10 let limit = ConcurrencyLimitLayer::new(2);
11 let (mut service, mut handle) = mock::spawn_layer(limit);
12
13 assert_ready_ok!(service.poll_ready());
14 let r1 = service.call("hello 1");
15
16 assert_ready_ok!(service.poll_ready());
17 let r2 = service.call("hello 2");
18
19 assert_pending!(service.poll_ready());
20
21 assert!(!service.is_woken());
22
23 // The request gets passed through
24 assert_request_eq!(handle, "hello 1").send_response("world 1");
25
26 // The next request gets passed through
27 assert_request_eq!(handle, "hello 2").send_response("world 2");
28
29 // There are no more requests
30 assert_pending!(handle.poll_request());
31
32 assert_eq!(r1.await.unwrap(), "world 1");
33
34 assert!(service.is_woken());
35
36 // Another request can be sent
37 assert_ready_ok!(service.poll_ready());
38
39 let r3 = service.call("hello 3");
40
41 assert_pending!(service.poll_ready());
42
43 assert_eq!(r2.await.unwrap(), "world 2");
44
45 // The request gets passed through
46 assert_request_eq!(handle, "hello 3").send_response("world 3");
47
48 assert_eq!(r3.await.unwrap(), "world 3");
49 }
50
51 #[tokio::test(flavor = "current_thread")]
basic_service_limit_functionality_without_poll_ready()52 async fn basic_service_limit_functionality_without_poll_ready() {
53 let _t = support::trace_init();
54 let limit = ConcurrencyLimitLayer::new(2);
55 let (mut service, mut handle) = mock::spawn_layer(limit);
56
57 assert_ready_ok!(service.poll_ready());
58 let r1 = service.call("hello 1");
59
60 assert_ready_ok!(service.poll_ready());
61 let r2 = service.call("hello 2");
62
63 assert_pending!(service.poll_ready());
64
65 // The request gets passed through
66 assert_request_eq!(handle, "hello 1").send_response("world 1");
67
68 assert!(!service.is_woken());
69
70 // The next request gets passed through
71 assert_request_eq!(handle, "hello 2").send_response("world 2");
72
73 assert!(!service.is_woken());
74
75 // There are no more requests
76 assert_pending!(handle.poll_request());
77
78 assert_eq!(r1.await.unwrap(), "world 1");
79
80 assert!(service.is_woken());
81
82 // One more request can be sent
83 assert_ready_ok!(service.poll_ready());
84 let r4 = service.call("hello 4");
85
86 assert_pending!(service.poll_ready());
87
88 assert_eq!(r2.await.unwrap(), "world 2");
89 assert!(service.is_woken());
90
91 // The request gets passed through
92 assert_request_eq!(handle, "hello 4").send_response("world 4");
93
94 assert_eq!(r4.await.unwrap(), "world 4");
95 }
96
97 #[tokio::test(flavor = "current_thread")]
request_without_capacity()98 async fn request_without_capacity() {
99 let _t = support::trace_init();
100 let limit = ConcurrencyLimitLayer::new(0);
101 let (mut service, _) = mock::spawn_layer::<(), (), _>(limit);
102
103 assert_pending!(service.poll_ready());
104 }
105
106 #[tokio::test(flavor = "current_thread")]
reserve_capacity_without_sending_request()107 async fn reserve_capacity_without_sending_request() {
108 let _t = support::trace_init();
109 let limit = ConcurrencyLimitLayer::new(1);
110 let (mut s1, mut handle) = mock::spawn_layer(limit);
111
112 let mut s2 = s1.clone();
113
114 // Reserve capacity in s1
115 assert_ready_ok!(s1.poll_ready());
116
117 // Service 2 cannot get capacity
118 assert_pending!(s2.poll_ready());
119
120 // s1 sends the request, then s2 is able to get capacity
121 let r1 = s1.call("hello");
122
123 assert_request_eq!(handle, "hello").send_response("world");
124
125 assert_pending!(s2.poll_ready());
126
127 r1.await.unwrap();
128
129 assert_ready_ok!(s2.poll_ready());
130 }
131
132 #[tokio::test(flavor = "current_thread")]
service_drop_frees_capacity()133 async fn service_drop_frees_capacity() {
134 let _t = support::trace_init();
135 let limit = ConcurrencyLimitLayer::new(1);
136 let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
137
138 let mut s2 = s1.clone();
139
140 // Reserve capacity in s1
141 assert_ready_ok!(s1.poll_ready());
142
143 // Service 2 cannot get capacity
144 assert_pending!(s2.poll_ready());
145
146 drop(s1);
147
148 assert!(s2.is_woken());
149 assert_ready_ok!(s2.poll_ready());
150 }
151
152 #[tokio::test(flavor = "current_thread")]
response_error_releases_capacity()153 async fn response_error_releases_capacity() {
154 let _t = support::trace_init();
155 let limit = ConcurrencyLimitLayer::new(1);
156 let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit);
157
158 let mut s2 = s1.clone();
159
160 // Reserve capacity in s1
161 assert_ready_ok!(s1.poll_ready());
162
163 // s1 sends the request, then s2 is able to get capacity
164 let r1 = s1.call("hello");
165
166 assert_request_eq!(handle, "hello").send_error("boom");
167
168 r1.await.unwrap_err();
169
170 assert_ready_ok!(s2.poll_ready());
171 }
172
173 #[tokio::test(flavor = "current_thread")]
response_future_drop_releases_capacity()174 async fn response_future_drop_releases_capacity() {
175 let _t = support::trace_init();
176 let limit = ConcurrencyLimitLayer::new(1);
177 let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit);
178
179 let mut s2 = s1.clone();
180
181 // Reserve capacity in s1
182 assert_ready_ok!(s1.poll_ready());
183
184 // s1 sends the request, then s2 is able to get capacity
185 let r1 = s1.call("hello");
186
187 assert_pending!(s2.poll_ready());
188
189 drop(r1);
190
191 assert_ready_ok!(s2.poll_ready());
192 }
193
194 #[tokio::test(flavor = "current_thread")]
multi_waiters()195 async fn multi_waiters() {
196 let _t = support::trace_init();
197 let limit = ConcurrencyLimitLayer::new(1);
198 let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit);
199 let mut s2 = s1.clone();
200 let mut s3 = s1.clone();
201
202 // Reserve capacity in s1
203 assert_ready_ok!(s1.poll_ready());
204
205 // s2 and s3 are not ready
206 assert_pending!(s2.poll_ready());
207 assert_pending!(s3.poll_ready());
208
209 drop(s1);
210
211 assert!(s2.is_woken());
212 assert!(!s3.is_woken());
213
214 drop(s2);
215
216 assert!(s3.is_woken());
217 }
218