1 use std::error::Error as StdError;
2 use std::future::Future;
3 use std::marker::Unpin;
4 use std::pin::Pin;
5 use std::task::{Context, Poll};
6 
7 use bytes::{Buf, Bytes};
8 use http::Request;
9 use tokio::io::{AsyncRead, AsyncWrite};
10 use tracing::{debug, trace};
11 
12 use super::{Http1Transaction, Wants};
13 use crate::body::{Body, DecodedLength, HttpBody};
14 use crate::common;
15 use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
16 use crate::upgrade::OnUpgrade;
17 
18 pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
19     conn: Conn<I, Bs::Data, T>,
20     dispatch: D,
21     body_tx: Option<crate::body::Sender>,
22     body_rx: Pin<Box<Option<Bs>>>,
23     is_closing: bool,
24 }
25 
26 pub(crate) trait Dispatch {
27     type PollItem;
28     type PollBody;
29     type PollError;
30     type RecvItem;
poll_msg( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>31     fn poll_msg(
32         self: Pin<&mut Self>,
33         cx: &mut Context<'_>,
34     ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>35     fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>36     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
should_poll(&self) -> bool37     fn should_poll(&self) -> bool;
38 }
39 
40 cfg_server! {
41     use crate::service::HttpService;
42 
43     pub(crate) struct Server<S: HttpService<B>, B> {
44         in_flight: Pin<Box<Option<S::Future>>>,
45         pub(crate) service: S,
46     }
47 }
48 
49 cfg_client! {
50     pin_project_lite::pin_project! {
51         pub(crate) struct Client<B> {
52             callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
53             #[pin]
54             rx: ClientRx<B>,
55             rx_closed: bool,
56         }
57     }
58 
59     type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
60 }
61 
62 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
63 where
64     D: Dispatch<
65             PollItem = MessageHead<T::Outgoing>,
66             PollBody = Bs,
67             RecvItem = MessageHead<T::Incoming>,
68         > + Unpin,
69     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
70     I: AsyncRead + AsyncWrite + Unpin,
71     T: Http1Transaction + Unpin,
72     Bs: HttpBody + 'static,
73     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
74 {
new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self75     pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
76         Dispatcher {
77             conn,
78             dispatch,
79             body_tx: None,
80             body_rx: Box::pin(None),
81             is_closing: false,
82         }
83     }
84 
85     #[cfg(feature = "server")]
disable_keep_alive(&mut self)86     pub(crate) fn disable_keep_alive(&mut self) {
87         self.conn.disable_keep_alive();
88         if self.conn.is_write_closed() {
89             self.close();
90         }
91     }
92 
into_inner(self) -> (I, Bytes, D)93     pub(crate) fn into_inner(self) -> (I, Bytes, D) {
94         let (io, buf) = self.conn.into_inner();
95         (io, buf, self.dispatch)
96     }
97 
98     /// Run this dispatcher until HTTP says this connection is done,
99     /// but don't call `AsyncWrite::shutdown` on the underlying IO.
100     ///
101     /// This is useful for old-style HTTP upgrades, but ignores
102     /// newer-style upgrade API.
poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> where Self: Unpin,103     pub(crate) fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>
104     where
105         Self: Unpin,
106     {
107         Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
108             if let Dispatched::Upgrade(pending) = ds {
109                 pending.manual();
110             }
111         })
112     }
113 
poll_catch( &mut self, cx: &mut Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>114     fn poll_catch(
115         &mut self,
116         cx: &mut Context<'_>,
117         should_shutdown: bool,
118     ) -> Poll<crate::Result<Dispatched>> {
119         Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
120             // Be sure to alert a streaming body of the failure.
121             if let Some(mut body) = self.body_tx.take() {
122                 body.send_error(crate::Error::new_body("connection error"));
123             }
124             // An error means we're shutting down either way.
125             // We just try to give the error to the user,
126             // and close the connection with an Ok. If we
127             // cannot give it to the user, then return the Err.
128             self.dispatch.recv_msg(Err(e))?;
129             Ok(Dispatched::Shutdown)
130         }))
131     }
132 
poll_inner( &mut self, cx: &mut Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>133     fn poll_inner(
134         &mut self,
135         cx: &mut Context<'_>,
136         should_shutdown: bool,
137     ) -> Poll<crate::Result<Dispatched>> {
138         T::update_date();
139 
140         ready!(self.poll_loop(cx))?;
141 
142         if self.is_done() {
143             if let Some(pending) = self.conn.pending_upgrade() {
144                 self.conn.take_error()?;
145                 return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
146             } else if should_shutdown {
147                 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
148             }
149             self.conn.take_error()?;
150             Poll::Ready(Ok(Dispatched::Shutdown))
151         } else {
152             Poll::Pending
153         }
154     }
155 
poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>156     fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
157         // Limit the looping on this connection, in case it is ready far too
158         // often, so that other futures don't starve.
159         //
160         // 16 was chosen arbitrarily, as that is number of pipelined requests
161         // benchmarks often use. Perhaps it should be a config option instead.
162         for _ in 0..16 {
163             let _ = self.poll_read(cx)?;
164             let _ = self.poll_write(cx)?;
165             let _ = self.poll_flush(cx)?;
166 
167             // This could happen if reading paused before blocking on IO,
168             // such as getting to the end of a framed message, but then
169             // writing/flushing set the state back to Init. In that case,
170             // if the read buffer still had bytes, we'd want to try poll_read
171             // again, or else we wouldn't ever be woken up again.
172             //
173             // Using this instead of task::current() and notify() inside
174             // the Conn is noticeably faster in pipelined benchmarks.
175             if !self.conn.wants_read_again() {
176                 //break;
177                 return Poll::Ready(Ok(()));
178             }
179         }
180 
181         trace!("poll_loop yielding (self = {:p})", self);
182 
183         common::task::yield_now(cx).map(|never| match never {})
184     }
185 
poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>186     fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
187         loop {
188             if self.is_closing {
189                 return Poll::Ready(Ok(()));
190             } else if self.conn.can_read_head() {
191                 ready!(self.poll_read_head(cx))?;
192             } else if let Some(mut body) = self.body_tx.take() {
193                 if self.conn.can_read_body() {
194                     match body.poll_ready(cx) {
195                         Poll::Ready(Ok(())) => (),
196                         Poll::Pending => {
197                             self.body_tx = Some(body);
198                             return Poll::Pending;
199                         }
200                         Poll::Ready(Err(_canceled)) => {
201                             // user doesn't care about the body
202                             // so we should stop reading
203                             trace!("body receiver dropped before eof, draining or closing");
204                             self.conn.poll_drain_or_close_read(cx);
205                             continue;
206                         }
207                     }
208                     match self.conn.poll_read_body(cx) {
209                         Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
210                             Ok(()) => {
211                                 self.body_tx = Some(body);
212                             }
213                             Err(_canceled) => {
214                                 if self.conn.can_read_body() {
215                                     trace!("body receiver dropped before eof, closing");
216                                     self.conn.close_read();
217                                 }
218                             }
219                         },
220                         Poll::Ready(None) => {
221                             // just drop, the body will close automatically
222                         }
223                         Poll::Pending => {
224                             self.body_tx = Some(body);
225                             return Poll::Pending;
226                         }
227                         Poll::Ready(Some(Err(e))) => {
228                             body.send_error(crate::Error::new_body(e));
229                         }
230                     }
231                 } else {
232                     // just drop, the body will close automatically
233                 }
234             } else {
235                 return self.conn.poll_read_keep_alive(cx);
236             }
237         }
238     }
239 
poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>240     fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
241         // can dispatch receive, or does it still care about, an incoming message?
242         match ready!(self.dispatch.poll_ready(cx)) {
243             Ok(()) => (),
244             Err(()) => {
245                 trace!("dispatch no longer receiving messages");
246                 self.close();
247                 return Poll::Ready(Ok(()));
248             }
249         }
250         // dispatch is ready for a message, try to read one
251         match ready!(self.conn.poll_read_head(cx)) {
252             Some(Ok((mut head, body_len, wants))) => {
253                 let body = match body_len {
254                     DecodedLength::ZERO => Body::empty(),
255                     other => {
256                         let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
257                         self.body_tx = Some(tx);
258                         rx
259                     }
260                 };
261                 if wants.contains(Wants::UPGRADE) {
262                     let upgrade = self.conn.on_upgrade();
263                     debug_assert!(!upgrade.is_none(), "empty upgrade");
264                     debug_assert!(
265                         head.extensions.get::<OnUpgrade>().is_none(),
266                         "OnUpgrade already set"
267                     );
268                     head.extensions.insert(upgrade);
269                 }
270                 self.dispatch.recv_msg(Ok((head, body)))?;
271                 Poll::Ready(Ok(()))
272             }
273             Some(Err(err)) => {
274                 debug!("read_head error: {}", err);
275                 self.dispatch.recv_msg(Err(err))?;
276                 // if here, the dispatcher gave the user the error
277                 // somewhere else. we still need to shutdown, but
278                 // not as a second error.
279                 self.close();
280                 Poll::Ready(Ok(()))
281             }
282             None => {
283                 // read eof, the write side will have been closed too unless
284                 // allow_read_close was set to true, in which case just do
285                 // nothing...
286                 debug_assert!(self.conn.is_read_closed());
287                 if self.conn.is_write_closed() {
288                     self.close();
289                 }
290                 Poll::Ready(Ok(()))
291             }
292         }
293     }
294 
poll_write(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>295     fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
296         loop {
297             if self.is_closing {
298                 return Poll::Ready(Ok(()));
299             } else if self.body_rx.is_none()
300                 && self.conn.can_write_head()
301                 && self.dispatch.should_poll()
302             {
303                 if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
304                     let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
305 
306                     // Check if the body knows its full data immediately.
307                     //
308                     // If so, we can skip a bit of bookkeeping that streaming
309                     // bodies need to do.
310                     if let Some(full) = crate::body::take_full_data(&mut body) {
311                         self.conn.write_full_msg(head, full);
312                         return Poll::Ready(Ok(()));
313                     }
314 
315                     let body_type = if body.is_end_stream() {
316                         self.body_rx.set(None);
317                         None
318                     } else {
319                         let btype = body
320                             .size_hint()
321                             .exact()
322                             .map(BodyLength::Known)
323                             .or_else(|| Some(BodyLength::Unknown));
324                         self.body_rx.set(Some(body));
325                         btype
326                     };
327                     self.conn.write_head(head, body_type);
328                 } else {
329                     self.close();
330                     return Poll::Ready(Ok(()));
331                 }
332             } else if !self.conn.can_buffer_body() {
333                 ready!(self.poll_flush(cx))?;
334             } else {
335                 // A new scope is needed :(
336                 if let (Some(mut body), clear_body) =
337                     OptGuard::new(self.body_rx.as_mut()).guard_mut()
338                 {
339                     debug_assert!(!*clear_body, "opt guard defaults to keeping body");
340                     if !self.conn.can_write_body() {
341                         trace!(
342                             "no more write body allowed, user body is_end_stream = {}",
343                             body.is_end_stream(),
344                         );
345                         *clear_body = true;
346                         continue;
347                     }
348 
349                     let item = ready!(body.as_mut().poll_data(cx));
350                     if let Some(item) = item {
351                         let chunk = item.map_err(|e| {
352                             *clear_body = true;
353                             crate::Error::new_user_body(e)
354                         })?;
355                         let eos = body.is_end_stream();
356                         if eos {
357                             *clear_body = true;
358                             if chunk.remaining() == 0 {
359                                 trace!("discarding empty chunk");
360                                 self.conn.end_body()?;
361                             } else {
362                                 self.conn.write_body_and_end(chunk);
363                             }
364                         } else {
365                             if chunk.remaining() == 0 {
366                                 trace!("discarding empty chunk");
367                                 continue;
368                             }
369                             self.conn.write_body(chunk);
370                         }
371                     } else {
372                         *clear_body = true;
373                         self.conn.end_body()?;
374                     }
375                 } else {
376                     // If there's no body_rx, end the body
377                     if self.conn.can_write_body() {
378                         self.conn.end_body()?;
379                     } else {
380                         return Poll::Pending;
381                     }
382                 }
383             }
384         }
385     }
386 
poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>>387     fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
388         self.conn.poll_flush(cx).map_err(|err| {
389             debug!("error writing: {}", err);
390             crate::Error::new_body_write(err)
391         })
392     }
393 
close(&mut self)394     fn close(&mut self) {
395         self.is_closing = true;
396         self.conn.close_read();
397         self.conn.close_write();
398     }
399 
is_done(&self) -> bool400     fn is_done(&self) -> bool {
401         if self.is_closing {
402             return true;
403         }
404 
405         let read_done = self.conn.is_read_closed();
406 
407         if !T::should_read_first() && read_done {
408             // a client that cannot read may was well be done.
409             true
410         } else {
411             let write_done = self.conn.is_write_closed()
412                 || (!self.dispatch.should_poll() && self.body_rx.is_none());
413             read_done && write_done
414         }
415     }
416 }
417 
418 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
419 where
420     D: Dispatch<
421             PollItem = MessageHead<T::Outgoing>,
422             PollBody = Bs,
423             RecvItem = MessageHead<T::Incoming>,
424         > + Unpin,
425     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
426     I: AsyncRead + AsyncWrite + Unpin,
427     T: Http1Transaction + Unpin,
428     Bs: HttpBody + 'static,
429     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
430 {
431     type Output = crate::Result<Dispatched>;
432 
433     #[inline]
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>434     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
435         self.poll_catch(cx, true)
436     }
437 }
438 
439 // ===== impl OptGuard =====
440 
441 /// A drop guard to allow a mutable borrow of an Option while being able to
442 /// set whether the `Option` should be cleared on drop.
443 struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool);
444 
445 impl<'a, T> OptGuard<'a, T> {
new(pin: Pin<&'a mut Option<T>>) -> Self446     fn new(pin: Pin<&'a mut Option<T>>) -> Self {
447         OptGuard(pin, false)
448     }
449 
guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool)450     fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
451         (self.0.as_mut().as_pin_mut(), &mut self.1)
452     }
453 }
454 
455 impl<'a, T> Drop for OptGuard<'a, T> {
drop(&mut self)456     fn drop(&mut self) {
457         if self.1 {
458             self.0.set(None);
459         }
460     }
461 }
462 
463 // ===== impl Server =====
464 
465 cfg_server! {
466     impl<S, B> Server<S, B>
467     where
468         S: HttpService<B>,
469     {
470         pub(crate) fn new(service: S) -> Server<S, B> {
471             Server {
472                 in_flight: Box::pin(None),
473                 service,
474             }
475         }
476 
477         pub(crate) fn into_service(self) -> S {
478             self.service
479         }
480     }
481 
482     // Service is never pinned
483     impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
484 
485     impl<S, Bs> Dispatch for Server<S, Body>
486     where
487         S: HttpService<Body, ResBody = Bs>,
488         S::Error: Into<Box<dyn StdError + Send + Sync>>,
489         Bs: HttpBody,
490     {
491         type PollItem = MessageHead<http::StatusCode>;
492         type PollBody = Bs;
493         type PollError = S::Error;
494         type RecvItem = RequestHead;
495 
496         fn poll_msg(
497             mut self: Pin<&mut Self>,
498             cx: &mut Context<'_>,
499         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
500             let mut this = self.as_mut();
501             let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
502                 let resp = ready!(fut.as_mut().poll(cx)?);
503                 let (parts, body) = resp.into_parts();
504                 let head = MessageHead {
505                     version: parts.version,
506                     subject: parts.status,
507                     headers: parts.headers,
508                     extensions: parts.extensions,
509                 };
510                 Poll::Ready(Some(Ok((head, body))))
511             } else {
512                 unreachable!("poll_msg shouldn't be called if no inflight");
513             };
514 
515             // Since in_flight finished, remove it
516             this.in_flight.set(None);
517             ret
518         }
519 
520         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
521             let (msg, body) = msg?;
522             let mut req = Request::new(body);
523             *req.method_mut() = msg.subject.0;
524             *req.uri_mut() = msg.subject.1;
525             *req.headers_mut() = msg.headers;
526             *req.version_mut() = msg.version;
527             *req.extensions_mut() = msg.extensions;
528             let fut = self.service.call(req);
529             self.in_flight.set(Some(fut));
530             Ok(())
531         }
532 
533         fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
534             if self.in_flight.is_some() {
535                 Poll::Pending
536             } else {
537                 self.service.poll_ready(cx).map_err(|_e| {
538                     // FIXME: return error value.
539                     trace!("service closed");
540                 })
541             }
542         }
543 
544         fn should_poll(&self) -> bool {
545             self.in_flight.is_some()
546         }
547     }
548 }
549 
550 // ===== impl Client =====
551 
552 cfg_client! {
553     impl<B> Client<B> {
554         pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
555             Client {
556                 callback: None,
557                 rx,
558                 rx_closed: false,
559             }
560         }
561     }
562 
563     impl<B> Dispatch for Client<B>
564     where
565         B: HttpBody,
566     {
567         type PollItem = RequestHead;
568         type PollBody = B;
569         type PollError = std::convert::Infallible;
570         type RecvItem = crate::proto::ResponseHead;
571 
572         fn poll_msg(
573             mut self: Pin<&mut Self>,
574             cx: &mut Context<'_>,
575         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
576             let mut this = self.as_mut();
577             debug_assert!(!this.rx_closed);
578             match this.rx.poll_recv(cx) {
579                 Poll::Ready(Some((req, mut cb))) => {
580                     // check that future hasn't been canceled already
581                     match cb.poll_canceled(cx) {
582                         Poll::Ready(()) => {
583                             trace!("request canceled");
584                             Poll::Ready(None)
585                         }
586                         Poll::Pending => {
587                             let (parts, body) = req.into_parts();
588                             let head = RequestHead {
589                                 version: parts.version,
590                                 subject: crate::proto::RequestLine(parts.method, parts.uri),
591                                 headers: parts.headers,
592                                 extensions: parts.extensions,
593                             };
594                             this.callback = Some(cb);
595                             Poll::Ready(Some(Ok((head, body))))
596                         }
597                     }
598                 }
599                 Poll::Ready(None) => {
600                     // user has dropped sender handle
601                     trace!("client tx closed");
602                     this.rx_closed = true;
603                     Poll::Ready(None)
604                 }
605                 Poll::Pending => Poll::Pending,
606             }
607         }
608 
609         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
610             match msg {
611                 Ok((msg, body)) => {
612                     if let Some(cb) = self.callback.take() {
613                         let res = msg.into_response(body);
614                         cb.send(Ok(res));
615                         Ok(())
616                     } else {
617                         // Getting here is likely a bug! An error should have happened
618                         // in Conn::require_empty_read() before ever parsing a
619                         // full message!
620                         Err(crate::Error::new_unexpected_message())
621                     }
622                 }
623                 Err(err) => {
624                     if let Some(cb) = self.callback.take() {
625                         cb.send(Err((err, None)));
626                         Ok(())
627                     } else if !self.rx_closed {
628                         self.rx.close();
629                         if let Some((req, cb)) = self.rx.try_recv() {
630                             trace!("canceling queued request with connection error: {}", err);
631                             // in this case, the message was never even started, so it's safe to tell
632                             // the user that the request was completely canceled
633                             cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
634                             Ok(())
635                         } else {
636                             Err(err)
637                         }
638                     } else {
639                         Err(err)
640                     }
641                 }
642             }
643         }
644 
645         fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
646             match self.callback {
647                 Some(ref mut cb) => match cb.poll_canceled(cx) {
648                     Poll::Ready(()) => {
649                         trace!("callback receiver has dropped");
650                         Poll::Ready(Err(()))
651                     }
652                     Poll::Pending => Poll::Ready(Ok(())),
653                 },
654                 None => Poll::Ready(Err(())),
655             }
656         }
657 
658         fn should_poll(&self) -> bool {
659             self.callback.is_none()
660         }
661     }
662 }
663 
664 #[cfg(test)]
665 mod tests {
666     use super::*;
667     use crate::proto::h1::ClientTransaction;
668     use std::time::Duration;
669 
670     #[test]
client_read_bytes_before_writing_request()671     fn client_read_bytes_before_writing_request() {
672         let _ = pretty_env_logger::try_init();
673 
674         tokio_test::task::spawn(()).enter(|cx, _| {
675             let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();
676 
677             // Block at 0 for now, but we will release this response before
678             // the request is ready to write later...
679             let (mut tx, rx) = crate::client::dispatch::channel();
680             let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
681             let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
682 
683             // First poll is needed to allow tx to send...
684             assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());
685 
686             // Unblock our IO, which has a response before we've sent request!
687             //
688             handle.read(b"HTTP/1.1 200 OK\r\n\r\n");
689 
690             let mut res_rx = tx
691                 .try_send(crate::Request::new(crate::Body::empty()))
692                 .unwrap();
693 
694             tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
695             let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
696                 .expect_err("callback should send error");
697 
698             match (err.0.kind(), err.1) {
699                 (&crate::error::Kind::Canceled, Some(_)) => (),
700                 other => panic!("expected Canceled, got {:?}", other),
701             }
702         });
703     }
704 
705     #[tokio::test]
client_flushing_is_not_ready_for_next_request()706     async fn client_flushing_is_not_ready_for_next_request() {
707         let _ = pretty_env_logger::try_init();
708 
709         let (io, _handle) = tokio_test::io::Builder::new()
710             .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
711             .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
712             .wait(std::time::Duration::from_secs(2))
713             .build_with_handle();
714 
715         let (mut tx, rx) = crate::client::dispatch::channel();
716         let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
717         conn.set_write_strategy_queue();
718 
719         let dispatcher = Dispatcher::new(Client::new(rx), conn);
720         let _dispatcher = tokio::spawn(async move { dispatcher.await });
721 
722         let req = crate::Request::builder()
723             .method("POST")
724             .body(crate::Body::from("reee"))
725             .unwrap();
726 
727         let res = tx.try_send(req).unwrap().await.expect("response");
728         drop(res);
729 
730         assert!(!tx.is_ready());
731     }
732 
733     #[tokio::test]
body_empty_chunks_ignored()734     async fn body_empty_chunks_ignored() {
735         let _ = pretty_env_logger::try_init();
736 
737         let io = tokio_test::io::Builder::new()
738             // no reading or writing, just be blocked for the test...
739             .wait(Duration::from_secs(5))
740             .build();
741 
742         let (mut tx, rx) = crate::client::dispatch::channel();
743         let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
744         let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));
745 
746         // First poll is needed to allow tx to send...
747         assert!(dispatcher.poll().is_pending());
748 
749         let body = {
750             let (mut tx, body) = crate::Body::channel();
751             tx.try_send_data("".into()).unwrap();
752             body
753         };
754 
755         let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();
756 
757         // Ensure conn.write_body wasn't called with the empty chunk.
758         // If it is, it will trigger an assertion.
759         assert!(dispatcher.poll().is_pending());
760     }
761 }
762