1 use crate::load;
2 use futures_util::pin_mut;
3 use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
4 use tower_test::{assert_request_eq, mock};
5 
6 use super::*;
7 
8 #[tokio::test]
basic()9 async fn basic() {
10     // start the pool
11     let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
12     pin_mut!(handle);
13 
14     let mut pool = mock::Spawn::new(Builder::new().build(mock, ()));
15     assert_pending!(pool.poll_ready());
16 
17     // give the pool a backing service
18     let (svc1_m, svc1) = mock::pair();
19     pin_mut!(svc1);
20 
21     assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
22     assert_ready_ok!(pool.poll_ready());
23 
24     // send a request to the one backing service
25     let mut fut = task::spawn(pool.call(()));
26 
27     assert_pending!(fut.poll());
28     assert_request_eq!(svc1, ()).send_response("foobar");
29     assert_eq!(assert_ready_ok!(fut.poll()), "foobar");
30 }
31 
32 #[tokio::test]
high_load()33 async fn high_load() {
34     // start the pool
35     let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
36     pin_mut!(handle);
37 
38     let pool = Builder::new()
39         .urgency(1.0) // so _any_ Pending will add a service
40         .underutilized_below(0.0) // so no Ready will remove a service
41         .max_services(Some(2))
42         .build(mock, ());
43     let mut pool = mock::Spawn::new(pool);
44     assert_pending!(pool.poll_ready());
45 
46     // give the pool a backing service
47     let (svc1_m, svc1) = mock::pair();
48     pin_mut!(svc1);
49 
50     svc1.allow(1);
51     assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
52     assert_ready_ok!(pool.poll_ready());
53 
54     // make the one backing service not ready
55     let mut fut1 = task::spawn(pool.call(()));
56 
57     // if we poll_ready again, pool should notice that load is increasing
58     // since urgency == 1.0, it should immediately enter high load
59     assert_pending!(pool.poll_ready());
60     // it should ask the maker for another service, so we give it one
61     let (svc2_m, svc2) = mock::pair();
62     pin_mut!(svc2);
63 
64     svc2.allow(1);
65     assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
66 
67     // the pool should now be ready again for one more request
68     assert_ready_ok!(pool.poll_ready());
69     let mut fut2 = task::spawn(pool.call(()));
70 
71     assert_pending!(pool.poll_ready());
72 
73     // the pool should _not_ try to add another service
74     // sicen we have max_services(2)
75     assert_pending!(handle.as_mut().poll_request());
76 
77     // let see that each service got one request
78     assert_request_eq!(svc1, ()).send_response("foo");
79     assert_request_eq!(svc2, ()).send_response("bar");
80     assert_eq!(assert_ready_ok!(fut1.poll()), "foo");
81     assert_eq!(assert_ready_ok!(fut2.poll()), "bar");
82 }
83 
84 #[tokio::test]
low_load()85 async fn low_load() {
86     // start the pool
87     let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
88     pin_mut!(handle);
89 
90     let pool = Builder::new()
91         .urgency(1.0) // so any event will change the service count
92         .build(mock, ());
93 
94     let mut pool = mock::Spawn::new(pool);
95 
96     assert_pending!(pool.poll_ready());
97 
98     // give the pool a backing service
99     let (svc1_m, svc1) = mock::pair();
100     pin_mut!(svc1);
101 
102     svc1.allow(1);
103     assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
104     assert_ready_ok!(pool.poll_ready());
105 
106     // cycling a request should now work
107     let mut fut = task::spawn(pool.call(()));
108 
109     assert_request_eq!(svc1, ()).send_response("foo");
110     assert_eq!(assert_ready_ok!(fut.poll()), "foo");
111     // and pool should now not be ready (since svc1 isn't ready)
112     // it should immediately try to add another service
113     // which we give it
114     assert_pending!(pool.poll_ready());
115     let (svc2_m, svc2) = mock::pair();
116     pin_mut!(svc2);
117 
118     svc2.allow(1);
119     assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
120     // pool is now ready
121     // which (because of urgency == 1.0) should immediately cause it to drop a service
122     // it'll drop svc1, so it'll still be ready
123     assert_ready_ok!(pool.poll_ready());
124     // and even with another ready, it won't drop svc2 since its now the only service
125     assert_ready_ok!(pool.poll_ready());
126 
127     // cycling a request should now work on svc2
128     let mut fut = task::spawn(pool.call(()));
129 
130     assert_request_eq!(svc2, ()).send_response("foo");
131     assert_eq!(assert_ready_ok!(fut.poll()), "foo");
132 
133     // and again (still svc2)
134     svc2.allow(1);
135     assert_ready_ok!(pool.poll_ready());
136     let mut fut = task::spawn(pool.call(()));
137 
138     assert_request_eq!(svc2, ()).send_response("foo");
139     assert_eq!(assert_ready_ok!(fut.poll()), "foo");
140 }
141 
142 #[tokio::test]
failing_service()143 async fn failing_service() {
144     // start the pool
145     let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
146     pin_mut!(handle);
147 
148     let pool = Builder::new()
149         .urgency(1.0) // so _any_ Pending will add a service
150         .underutilized_below(0.0) // so no Ready will remove a service
151         .build(mock, ());
152 
153     let mut pool = mock::Spawn::new(pool);
154 
155     assert_pending!(pool.poll_ready());
156 
157     // give the pool a backing service
158     let (svc1_m, svc1) = mock::pair();
159     pin_mut!(svc1);
160 
161     svc1.allow(1);
162     assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
163     assert_ready_ok!(pool.poll_ready());
164 
165     // one request-response cycle
166     let mut fut = task::spawn(pool.call(()));
167 
168     assert_request_eq!(svc1, ()).send_response("foo");
169     assert_eq!(assert_ready_ok!(fut.poll()), "foo");
170 
171     // now make svc1 fail, so it has to be removed
172     svc1.send_error("ouch");
173     // polling now should recognize the failed service,
174     // try to create a new one, and then realize the maker isn't ready
175     assert_pending!(pool.poll_ready());
176     // then we release another service
177     let (svc2_m, svc2) = mock::pair();
178     pin_mut!(svc2);
179 
180     svc2.allow(1);
181     assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
182 
183     // the pool should now be ready again
184     assert_ready_ok!(pool.poll_ready());
185     // and a cycle should work (and go through svc2)
186     let mut fut = task::spawn(pool.call(()));
187 
188     assert_request_eq!(svc2, ()).send_response("bar");
189     assert_eq!(assert_ready_ok!(fut.poll()), "bar");
190 }
191