1 use std::fmt; 2 use std::future::Future; 3 use std::pin::Pin; 4 use std::sync::Arc; 5 6 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] 7 use crate::body::Body; 8 #[cfg(feature = "server")] 9 use crate::body::HttpBody; 10 #[cfg(all(feature = "http2", feature = "server"))] 11 use crate::proto::h2::server::H2Stream; 12 use crate::rt::Executor; 13 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] 14 use crate::server::server::{new_svc::NewSvcTask, Watcher}; 15 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] 16 use crate::service::HttpService; 17 18 #[cfg(feature = "server")] 19 pub trait ConnStreamExec<F, B: HttpBody>: Clone { execute_h2stream(&mut self, fut: H2Stream<F, B>)20 fn execute_h2stream(&mut self, fut: H2Stream<F, B>); 21 } 22 23 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] 24 pub trait NewSvcExec<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>>: Clone { execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)25 fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>); 26 } 27 28 pub(crate) type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>; 29 30 // Either the user provides an executor for background tasks, or we use 31 // `tokio::spawn`. 32 #[derive(Clone)] 33 pub enum Exec { 34 Default, 35 Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>), 36 } 37 38 // ===== impl Exec ===== 39 40 impl Exec { execute<F>(&self, fut: F) where F: Future<Output = ()> + Send + 'static,41 pub(crate) fn execute<F>(&self, fut: F) 42 where 43 F: Future<Output = ()> + Send + 'static, 44 { 45 match *self { 46 Exec::Default => { 47 #[cfg(feature = "tcp")] 48 { 49 tokio::task::spawn(fut); 50 } 51 #[cfg(not(feature = "tcp"))] 52 { 53 // If no runtime, we need an executor! 54 panic!("executor must be set") 55 } 56 } 57 Exec::Executor(ref e) => { 58 e.execute(Box::pin(fut)); 59 } 60 } 61 } 62 } 63 64 impl fmt::Debug for Exec { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 66 f.debug_struct("Exec").finish() 67 } 68 } 69 70 #[cfg(feature = "server")] 71 impl<F, B> ConnStreamExec<F, B> for Exec 72 where 73 H2Stream<F, B>: Future<Output = ()> + Send + 'static, 74 B: HttpBody, 75 { execute_h2stream(&mut self, fut: H2Stream<F, B>)76 fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { 77 self.execute(fut) 78 } 79 } 80 81 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] 82 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec 83 where 84 NewSvcTask<I, N, S, E, W>: Future<Output = ()> + Send + 'static, 85 S: HttpService<Body>, 86 W: Watcher<I, S, E>, 87 { execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)88 fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { 89 self.execute(fut) 90 } 91 } 92 93 // ==== impl Executor ===== 94 95 #[cfg(feature = "server")] 96 impl<E, F, B> ConnStreamExec<F, B> for E 97 where 98 E: Executor<H2Stream<F, B>> + Clone, 99 H2Stream<F, B>: Future<Output = ()>, 100 B: HttpBody, 101 { execute_h2stream(&mut self, fut: H2Stream<F, B>)102 fn execute_h2stream(&mut self, fut: H2Stream<F, B>) { 103 self.execute(fut) 104 } 105 } 106 107 #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] 108 impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E 109 where 110 E: Executor<NewSvcTask<I, N, S, E, W>> + Clone, 111 NewSvcTask<I, N, S, E, W>: Future<Output = ()>, 112 S: HttpService<Body>, 113 W: Watcher<I, S, E>, 114 { execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>)115 fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) { 116 self.execute(fut) 117 } 118 } 119 120 // If http2 is not enable, we just have a stub here, so that the trait bounds 121 // that *would* have been needed are still checked. Why? 122 // 123 // Because enabling `http2` shouldn't suddenly add new trait bounds that cause 124 // a compilation error. 125 #[cfg(not(feature = "http2"))] 126 #[allow(missing_debug_implementations)] 127 pub struct H2Stream<F, B>(std::marker::PhantomData<(F, B)>); 128 129 #[cfg(not(feature = "http2"))] 130 impl<F, B, E> Future for H2Stream<F, B> 131 where 132 F: Future<Output = Result<http::Response<B>, E>>, 133 B: crate::body::HttpBody, 134 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>, 135 E: Into<Box<dyn std::error::Error + Send + Sync>>, 136 { 137 type Output = (); 138 poll( self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output>139 fn poll( 140 self: Pin<&mut Self>, 141 _cx: &mut std::task::Context<'_>, 142 ) -> std::task::Poll<Self::Output> { 143 unreachable!() 144 } 145 } 146