1 use crate::load;
2 use futures_util::pin_mut;
3 use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
4 use tower_test::{assert_request_eq, mock};
5
6 use super::*;
7
8 #[tokio::test]
basic()9 async fn basic() {
10 // start the pool
11 let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
12 pin_mut!(handle);
13
14 let mut pool = mock::Spawn::new(Builder::new().build(mock, ()));
15 assert_pending!(pool.poll_ready());
16
17 // give the pool a backing service
18 let (svc1_m, svc1) = mock::pair();
19 pin_mut!(svc1);
20
21 assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
22 assert_ready_ok!(pool.poll_ready());
23
24 // send a request to the one backing service
25 let mut fut = task::spawn(pool.call(()));
26
27 assert_pending!(fut.poll());
28 assert_request_eq!(svc1, ()).send_response("foobar");
29 assert_eq!(assert_ready_ok!(fut.poll()), "foobar");
30 }
31
32 #[tokio::test]
high_load()33 async fn high_load() {
34 // start the pool
35 let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
36 pin_mut!(handle);
37
38 let pool = Builder::new()
39 .urgency(1.0) // so _any_ Pending will add a service
40 .underutilized_below(0.0) // so no Ready will remove a service
41 .max_services(Some(2))
42 .build(mock, ());
43 let mut pool = mock::Spawn::new(pool);
44 assert_pending!(pool.poll_ready());
45
46 // give the pool a backing service
47 let (svc1_m, svc1) = mock::pair();
48 pin_mut!(svc1);
49
50 svc1.allow(1);
51 assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
52 assert_ready_ok!(pool.poll_ready());
53
54 // make the one backing service not ready
55 let mut fut1 = task::spawn(pool.call(()));
56
57 // if we poll_ready again, pool should notice that load is increasing
58 // since urgency == 1.0, it should immediately enter high load
59 assert_pending!(pool.poll_ready());
60 // it should ask the maker for another service, so we give it one
61 let (svc2_m, svc2) = mock::pair();
62 pin_mut!(svc2);
63
64 svc2.allow(1);
65 assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
66
67 // the pool should now be ready again for one more request
68 assert_ready_ok!(pool.poll_ready());
69 let mut fut2 = task::spawn(pool.call(()));
70
71 assert_pending!(pool.poll_ready());
72
73 // the pool should _not_ try to add another service
74 // sicen we have max_services(2)
75 assert_pending!(handle.as_mut().poll_request());
76
77 // let see that each service got one request
78 assert_request_eq!(svc1, ()).send_response("foo");
79 assert_request_eq!(svc2, ()).send_response("bar");
80 assert_eq!(assert_ready_ok!(fut1.poll()), "foo");
81 assert_eq!(assert_ready_ok!(fut2.poll()), "bar");
82 }
83
84 #[tokio::test]
low_load()85 async fn low_load() {
86 // start the pool
87 let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
88 pin_mut!(handle);
89
90 let pool = Builder::new()
91 .urgency(1.0) // so any event will change the service count
92 .build(mock, ());
93
94 let mut pool = mock::Spawn::new(pool);
95
96 assert_pending!(pool.poll_ready());
97
98 // give the pool a backing service
99 let (svc1_m, svc1) = mock::pair();
100 pin_mut!(svc1);
101
102 svc1.allow(1);
103 assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
104 assert_ready_ok!(pool.poll_ready());
105
106 // cycling a request should now work
107 let mut fut = task::spawn(pool.call(()));
108
109 assert_request_eq!(svc1, ()).send_response("foo");
110 assert_eq!(assert_ready_ok!(fut.poll()), "foo");
111 // and pool should now not be ready (since svc1 isn't ready)
112 // it should immediately try to add another service
113 // which we give it
114 assert_pending!(pool.poll_ready());
115 let (svc2_m, svc2) = mock::pair();
116 pin_mut!(svc2);
117
118 svc2.allow(1);
119 assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
120 // pool is now ready
121 // which (because of urgency == 1.0) should immediately cause it to drop a service
122 // it'll drop svc1, so it'll still be ready
123 assert_ready_ok!(pool.poll_ready());
124 // and even with another ready, it won't drop svc2 since its now the only service
125 assert_ready_ok!(pool.poll_ready());
126
127 // cycling a request should now work on svc2
128 let mut fut = task::spawn(pool.call(()));
129
130 assert_request_eq!(svc2, ()).send_response("foo");
131 assert_eq!(assert_ready_ok!(fut.poll()), "foo");
132
133 // and again (still svc2)
134 svc2.allow(1);
135 assert_ready_ok!(pool.poll_ready());
136 let mut fut = task::spawn(pool.call(()));
137
138 assert_request_eq!(svc2, ()).send_response("foo");
139 assert_eq!(assert_ready_ok!(fut.poll()), "foo");
140 }
141
142 #[tokio::test]
failing_service()143 async fn failing_service() {
144 // start the pool
145 let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
146 pin_mut!(handle);
147
148 let pool = Builder::new()
149 .urgency(1.0) // so _any_ Pending will add a service
150 .underutilized_below(0.0) // so no Ready will remove a service
151 .build(mock, ());
152
153 let mut pool = mock::Spawn::new(pool);
154
155 assert_pending!(pool.poll_ready());
156
157 // give the pool a backing service
158 let (svc1_m, svc1) = mock::pair();
159 pin_mut!(svc1);
160
161 svc1.allow(1);
162 assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
163 assert_ready_ok!(pool.poll_ready());
164
165 // one request-response cycle
166 let mut fut = task::spawn(pool.call(()));
167
168 assert_request_eq!(svc1, ()).send_response("foo");
169 assert_eq!(assert_ready_ok!(fut.poll()), "foo");
170
171 // now make svc1 fail, so it has to be removed
172 svc1.send_error("ouch");
173 // polling now should recognize the failed service,
174 // try to create a new one, and then realize the maker isn't ready
175 assert_pending!(pool.poll_ready());
176 // then we release another service
177 let (svc2_m, svc2) = mock::pair();
178 pin_mut!(svc2);
179
180 svc2.allow(1);
181 assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
182
183 // the pool should now be ready again
184 assert_ready_ok!(pool.poll_ready());
185 // and a cycle should work (and go through svc2)
186 let mut fut = task::spawn(pool.call(()));
187
188 assert_request_eq!(svc2, ()).send_response("bar");
189 assert_eq!(assert_ready_ok!(fut.poll()), "bar");
190 }
191