1 use std::fmt;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::sync::Arc;
5 
6 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
7 use crate::body::Body;
8 #[cfg(feature = "server")]
9 use crate::body::HttpBody;
10 #[cfg(all(feature = "http2", feature = "server"))]
11 use crate::proto::h2::server::H2Stream;
12 use crate::rt::Executor;
13 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
14 use crate::server::server::{new_svc::NewSvcTask, Watcher};
15 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
16 use crate::service::HttpService;
17 
18 #[cfg(feature = "server")]
19 pub trait ConnStreamExec<F, B: HttpBody>: Clone {
execute_h2stream(&mut self, fut: H2Stream<F, B>)20     fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
21 }
22 
23 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
24 pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone {
execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)25     fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>);
26 }
27 
28 pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
29 
30 // Either the user provides an executor for background tasks, or we use
31 // `tokio::spawn`.
32 #[derive(Clone)]
33 pub enum Exec {
34     Default,
35     Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>),
36 }
37 
38 // ===== impl Exec =====
39 
40 impl Exec {
execute<F>(&self, fut: F) where F: Future<Output = ()> + Send + 'static,41     pub(crate) fn execute<F>(&self, fut: F)
42     where
43         F: Future<Output = ()> + Send + 'static,
44     {
45         match *self {
46             Exec::Default => {
47                 #[cfg(feature = "tcp")]
48                 {
49                     tokio::task::spawn(fut);
50                 }
51                 #[cfg(not(feature = "tcp"))]
52                 {
53                     // If no runtime, we need an executor!
54                     panic!("executor must be set")
55                 }
56             }
57             Exec::Executor(ref e) => {
58                 e.execute(Box::pin(fut));
59             }
60         }
61     }
62 }
63 
64 impl fmt::Debug for Exec {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result65     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66         f.debug_struct("Exec").finish()
67     }
68 }
69 
70 #[cfg(feature = "server")]
71 impl<F, B> ConnStreamExec<F, B> for Exec
72 where
73     H2Stream<F, B>: Future<Output = ()> + Send + 'static,
74     B: HttpBody,
75 {
execute_h2stream(&mut self, fut: H2Stream<F, B>)76     fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
77         self.execute(fut)
78     }
79 }
80 
81 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
82 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
83 where
84     NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static,
85     S: HttpService<Body>,
86     W: Watcher<I, S, E>,
87 {
execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)88     fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
89         self.execute(fut)
90     }
91 }
92 
93 // ==== impl Executor =====
94 
95 #[cfg(feature = "server")]
96 impl<E, F, B> ConnStreamExec<F, B> for E
97 where
98     E: Executor<H2Stream<F, B>> + Clone,
99     H2Stream<F, B>: Future<Output = ()>,
100     B: HttpBody,
101 {
execute_h2stream(&mut self, fut: H2Stream<F, B>)102     fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
103         self.execute(fut)
104     }
105 }
106 
107 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
108 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
109 where
110     E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
111     NewSvcTask<I, N, S, E, W>: Future<Output = ()>,
112     S: HttpService<Body>,
113     W: Watcher<I, S, E>,
114 {
execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)115     fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) {
116         self.execute(fut)
117     }
118 }
119 
120 // If http2 is not enable, we just have a stub here, so that the trait bounds
121 // that *would* have been needed are still checked. Why?
122 //
123 // Because enabling `http2` shouldn't suddenly add new trait bounds that cause
124 // a compilation error.
125 #[cfg(not(feature = "http2"))]
126 #[allow(missing_debug_implementations)]
127 pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>);
128 
129 #[cfg(not(feature = "http2"))]
130 impl<F, B, E> Future for H2Stream<F, B>
131 where
132     F: Future<Output = Result<http::Response<B>, E>>,
133     B: crate::body::HttpBody,
134     B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
135     E: Into<Box<dyn std::error::Error + Send + Sync>>,
136 {
137     type Output = ();
138 
poll( self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output>139     fn poll(
140         self: Pin<&mut Self>,
141         _cx: &mut std::task::Context<'_>,
142     ) -> std::task::Poll<Self::Output> {
143         unreachable!()
144     }
145 }
146