//! Server implementation and builder.

mod conn;
mod incoming;
mod recover_error;
#[cfg(feature = "tls")]
#[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
mod tls;
#[cfg(unix)]
mod unix;

pub use super::service::Routes;
pub use super::service::RoutesBuilder;

pub use conn::{Connected, TcpConnectInfo};
#[cfg(feature = "tls")]
pub use tls::ServerTlsConfig;

#[cfg(feature = "tls")]
pub use conn::TlsConnectInfo;

#[cfg(feature = "tls")]
use super::service::TlsAcceptor;

#[cfg(unix)]
pub use unix::UdsConnectInfo;

pub use incoming::TcpIncoming;

#[cfg(feature = "tls")]
pub(crate) use tokio_rustls::server::TlsStream;

#[cfg(feature = "tls")]
use crate::transport::Error;

use self::recover_error::RecoverError;
use super::service::{GrpcTimeout, ServerIo};
use crate::body::BoxBody;
use crate::server::NamedService;
use bytes::Bytes;
use http::{Request, Response};
use http_body::Body as _;
use hyper::{server::accept, Body};
use pin_project::pin_project;
use std::{
    convert::Infallible,
    fmt,
    future::{self, Future},
    marker::PhantomData,
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::{ready, Context, Poll},
    time::Duration,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::Stream;
use tower::{
    layer::util::{Identity, Stack},
    layer::Layer,
    limit::concurrency::ConcurrencyLimitLayer,
    util::Either,
    Service, ServiceBuilder,
};

type BoxHttpBody = http_body::combinators::UnsyncBoxBody<Bytes, crate::Error>;
type BoxService = tower::util::BoxService<Request<Body>, Response<BoxHttpBody>, crate::Error>;
type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;

const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20;

/// A default batteries-included `transport` server.
///
/// This is a wrapper around [`hyper::Server`] that exposes a builder-pattern API
/// ([`Server`]) with the configuration needed for a fully featured HTTP/2-based
/// gRPC server. It should serve as a good out-of-the-box HTTP/2 server for use
/// with tonic, and also as a reference implementation and starting point for
/// anyone who wants to build a more complex and/or specialized server.
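///
/// # Example
///
/// A minimal sketch of a typical setup; here an empty [`Routes`] stands in for
/// the generated gRPC services a real server would register with
/// [`Server::add_service`]:
///
/// ```no_run
/// # use tonic::transport::{server::Routes, Server};
/// # async fn run() -> Result<(), tonic::transport::Error> {
/// let addr = "[::1]:50051".parse().unwrap();
///
/// Server::builder()
///     // e.g. .add_service(...) for each generated service
///     .add_routes(Routes::default())
///     .serve(addr)
///     .await?;
/// # Ok(())
/// # }
/// ```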
#[derive(Clone)]
pub struct Server<L = Identity> {
    trace_interceptor: Option<TraceInterceptor>,
    concurrency_limit: Option<usize>,
    timeout: Option<Duration>,
    #[cfg(feature = "tls")]
    tls: Option<TlsAcceptor>,
    init_stream_window_size: Option<u32>,
    init_connection_window_size: Option<u32>,
    max_concurrent_streams: Option<u32>,
    tcp_keepalive: Option<Duration>,
    tcp_nodelay: bool,
    http2_keepalive_interval: Option<Duration>,
    http2_keepalive_timeout: Option<Duration>,
    http2_adaptive_window: Option<bool>,
    http2_max_pending_accept_reset_streams: Option<usize>,
    max_frame_size: Option<u32>,
    accept_http1: bool,
    service_builder: ServiceBuilder<L>,
}

impl Default for Server<Identity> {
    fn default() -> Self {
        Self {
            trace_interceptor: None,
            concurrency_limit: None,
            timeout: None,
            #[cfg(feature = "tls")]
            tls: None,
            init_stream_window_size: None,
            init_connection_window_size: None,
            max_concurrent_streams: None,
            tcp_keepalive: None,
            tcp_nodelay: false,
            http2_keepalive_interval: None,
            http2_keepalive_timeout: None,
            http2_adaptive_window: None,
            http2_max_pending_accept_reset_streams: None,
            max_frame_size: None,
            accept_http1: false,
            service_builder: Default::default(),
        }
    }
}

/// A stack-based `Service` router.
#[derive(Debug)]
pub struct Router<L = Identity> {
    server: Server<L>,
    routes: Routes,
}

impl<S: NamedService, T> NamedService for Either<S, T> {
    const NAME: &'static str = S::NAME;
}

impl Server {
    /// Create a new server builder that can configure a [`Server`].
    pub fn builder() -> Self {
        Server {
            tcp_nodelay: true,
            accept_http1: false,
            ..Default::default()
        }
    }
}

impl<L> Server<L> {
    /// Configure TLS for this server.
    #[cfg(feature = "tls")]
    #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
    pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
        Ok(Server {
            tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
            ..self
        })
    }

    /// Set the concurrency limit applied to inbound requests per connection.
    ///
    /// # Example
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use tower_service::Service;
    /// # let builder = Server::builder();
    /// builder.concurrency_limit_per_connection(32);
    /// ```
    #[must_use]
    pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
        Server {
            concurrency_limit: Some(limit),
            ..self
        }
    }

    /// Set a timeout for all request handlers.
    ///
    /// # Example
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use tower_service::Service;
    /// # use std::time::Duration;
    /// # let builder = Server::builder();
    /// builder.timeout(Duration::from_secs(30));
    /// ```
    #[must_use]
    pub fn timeout(self, timeout: Duration) -> Self {
        Server {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
    /// stream-level flow control.
    ///
    /// Default is 65,535
    ///
    /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
    #[must_use]
    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
        Server {
            init_stream_window_size: sz.into(),
            ..self
        }
    }

    /// Sets the max connection-level flow control for HTTP2
    ///
    /// Default is 65,535
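    ///
    /// # Example
    ///
    /// A small sketch, often tuned together with
    /// [`Server::initial_stream_window_size`] (the values are illustrative):
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # let builder = Server::builder();
    /// const MIB: u32 = 1024 * 1024;
    /// builder
    ///     .initial_stream_window_size(MIB)
    ///     .initial_connection_window_size(2 * MIB);
    /// ```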
    #[must_use]
    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
        Server {
            init_connection_window_size: sz.into(),
            ..self
        }
    }

    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
    /// connections.
    ///
    /// Default is no limit (`None`).
    ///
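    /// # Example
    ///
    /// A minimal example (the limit shown is arbitrary):
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # let builder = Server::builder();
    /// builder.max_concurrent_streams(200);
    /// ```
    ///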
    /// [spec]: https://httpwg.org/specs/rfc9113.html#n-stream-concurrency
    #[must_use]
    pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
        Server {
            max_concurrent_streams: max.into(),
            ..self
        }
    }

    /// Set whether HTTP2 Ping frames are enabled on accepted connections.
    ///
    /// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
    /// specified will be the time interval between HTTP2 Ping frames.
    /// The timeout for receiving an acknowledgement of the keepalive ping
    /// can be set with [`Server::http2_keepalive_timeout`].
    ///
    /// Default is no HTTP2 keepalive (`None`)
    ///
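    /// # Example
    ///
    /// A sketch that pairs the ping interval with
    /// [`Server::http2_keepalive_timeout`] (the durations are illustrative):
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use std::time::Duration;
    /// # let builder = Server::builder();
    /// builder
    ///     .http2_keepalive_interval(Some(Duration::from_secs(30)))
    ///     .http2_keepalive_timeout(Some(Duration::from_secs(10)));
    /// ```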
    #[must_use]
    pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
        Server {
            http2_keepalive_interval,
            ..self
        }
    }

    /// Sets a timeout for receiving an acknowledgement of the keepalive ping.
    ///
    /// If the ping is not acknowledged within the timeout, the connection will be closed.
    /// Does nothing if [`Server::http2_keepalive_interval`] is disabled.
    ///
    /// Default is 20 seconds.
    #[must_use]
    pub fn http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self {
        Server {
            http2_keepalive_timeout,
            ..self
        }
    }

    /// Sets whether to use adaptive flow control. Defaults to `false`.
    ///
    /// Enabling this will override the limits set in
    /// [`Server::initial_stream_window_size`] and
    /// [`Server::initial_connection_window_size`].
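    ///
    /// # Example
    ///
    /// A minimal example:
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # let builder = Server::builder();
    /// builder.http2_adaptive_window(Some(true));
    /// ```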
    #[must_use]
    pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
        Server {
            http2_adaptive_window: enabled,
            ..self
        }
    }

    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
    ///
    /// This defaults to h2's own default, which is 20 as of h2 v0.3.17.
    ///
    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
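    ///
    /// # Example
    ///
    /// A minimal example (the value shown simply mirrors the current h2 default):
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # let builder = Server::builder();
    /// builder.http2_max_pending_accept_reset_streams(Some(20));
    /// ```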
    #[must_use]
    pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self {
        Server {
            http2_max_pending_accept_reset_streams: max,
            ..self
        }
    }

    /// Set whether TCP keepalive messages are enabled on accepted connections.
    ///
    /// If `None` is specified, keepalive is disabled, otherwise the duration
    /// specified will be the time to remain idle before sending TCP keepalive
    /// probes.
    ///
    /// Default is no keepalive (`None`)
    ///
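    /// # Example
    ///
    /// A minimal example (the one-minute idle time is illustrative):
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use std::time::Duration;
    /// # let builder = Server::builder();
    /// builder.tcp_keepalive(Some(Duration::from_secs(60)));
    /// ```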
    #[must_use]
    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
        Server {
            tcp_keepalive,
            ..self
        }
    }

    /// Set the value of the `TCP_NODELAY` option for accepted connections. Enabled by default.
    #[must_use]
    pub fn tcp_nodelay(self, enabled: bool) -> Self {
        Server {
            tcp_nodelay: enabled,
            ..self
        }
    }

    /// Sets the maximum frame size to use for HTTP2.
    ///
    /// Passing `None` will do nothing.
    ///
    /// If not set, the default from the underlying transport is used.
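    ///
    /// # Example
    ///
    /// A minimal example (16 KiB is the smallest frame size HTTP/2 allows):
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # let builder = Server::builder();
    /// builder.max_frame_size(16 * 1024);
    /// ```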
    #[must_use]
    pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
        Server {
            max_frame_size: frame_size.into(),
            ..self
        }
    }

    /// Allow this server to accept http1 requests.
    ///
    /// Accepting http1 requests is only useful when developing `grpc-web`
    /// enabled services. If this setting is set to `true` but services are
    /// not correctly configured to handle grpc-web requests, your server may
    /// return confusing (but correct) protocol errors.
    ///
    /// Default is `false`.
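    ///
    /// # Example
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # let builder = Server::builder();
    /// // Useful when serving `grpc-web` clients, which speak HTTP/1.1.
    /// builder.accept_http1(true);
    /// ```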
    #[must_use]
    pub fn accept_http1(self, accept_http1: bool) -> Self {
        Server {
            accept_http1,
            ..self
        }
    }

    /// Intercept inbound headers and add a [`tracing::Span`] to each response future.
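    ///
    /// # Example
    ///
    /// A minimal sketch; the span name is arbitrary:
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # let builder = Server::builder();
    /// builder.trace_fn(|_| tracing::info_span!("grpc_server"));
    /// ```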
    #[must_use]
    pub fn trace_fn<F>(self, f: F) -> Self
    where
        F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
    {
        Server {
            trace_interceptor: Some(Arc::new(f)),
            ..self
        }
    }

    /// Create a router with the `S` typed service as the first service.
    ///
    /// This will clone the `Server` builder and create a router that routes
    /// requests between the added services.
    pub fn add_service<S>(&mut self, svc: S) -> Router<L>
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
        L: Clone,
    {
        Router::new(self.clone(), Routes::new(svc))
    }

    /// Create a router with the optional `S` typed service as the first service.
    ///
    /// This will clone the `Server` builder and create a router that routes
    /// requests between the added services.
    ///
    /// # Note
    /// Even when the argument given is `None` this will capture *all* requests to this service name.
    /// As a result, one cannot use this to toggle between two identically named implementations.
    pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L>
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
        L: Clone,
    {
        let routes = svc.map(Routes::new).unwrap_or_default();
        Router::new(self.clone(), routes)
    }

    /// Create a router with the given [`Routes`].
    ///
    /// This will clone the `Server` builder and create a router that routes
    /// requests between the services already added to the provided `routes`.
    pub fn add_routes(&mut self, routes: Routes) -> Router<L>
    where
        L: Clone,
    {
        Router::new(self.clone(), routes)
    }

    /// Set the [Tower] [`Layer`] all services will be wrapped in.
    ///
    /// This enables using middleware from the [Tower ecosystem][eco].
    ///
    /// # Example
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use tower_service::Service;
    /// use tower::timeout::TimeoutLayer;
    /// use std::time::Duration;
    ///
    /// # let mut builder = Server::builder();
    /// builder.layer(TimeoutLayer::new(Duration::from_secs(30)));
    /// ```
    ///
    /// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used
    /// here as an example.
    ///
    /// You can build more complex layers using [`ServiceBuilder`]. Those layers can include
    /// [interceptors]:
    ///
    /// ```
    /// # use tonic::transport::Server;
    /// # use tower_service::Service;
    /// use tower::ServiceBuilder;
    /// use std::time::Duration;
    /// use tonic::{Request, Status, service::interceptor};
    ///
    /// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
    ///     if valid_credentials(&request) {
    ///         Ok(request)
    ///     } else {
    ///         Err(Status::unauthenticated("invalid credentials"))
    ///     }
    /// }
    ///
    /// fn valid_credentials(request: &Request<()>) -> bool {
    ///     // ...
    ///     # true
    /// }
    ///
    /// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
    ///     Ok(request)
    /// }
    ///
    /// let layer = ServiceBuilder::new()
    ///     .load_shed()
    ///     .timeout(Duration::from_secs(30))
    ///     .layer(interceptor(auth_interceptor))
    ///     .layer(interceptor(some_other_interceptor))
    ///     .into_inner();
    ///
    /// Server::builder().layer(layer);
    /// ```
    ///
    /// [Tower]: https://github.com/tower-rs/tower
    /// [`Layer`]: tower::layer::Layer
    /// [eco]: https://github.com/tower-rs
    /// [`ServiceBuilder`]: tower::ServiceBuilder
    /// [interceptors]: crate::service::Interceptor
    pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> {
        Server {
            service_builder: self.service_builder.layer(new_layer),
            trace_interceptor: self.trace_interceptor,
            concurrency_limit: self.concurrency_limit,
            timeout: self.timeout,
            #[cfg(feature = "tls")]
            tls: self.tls,
            init_stream_window_size: self.init_stream_window_size,
            init_connection_window_size: self.init_connection_window_size,
            max_concurrent_streams: self.max_concurrent_streams,
            tcp_keepalive: self.tcp_keepalive,
            tcp_nodelay: self.tcp_nodelay,
            http2_keepalive_interval: self.http2_keepalive_interval,
            http2_keepalive_timeout: self.http2_keepalive_timeout,
            http2_adaptive_window: self.http2_adaptive_window,
            http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams,
            max_frame_size: self.max_frame_size,
            accept_http1: self.accept_http1,
        }
    }

    pub(crate) async fn serve_with_shutdown<S, I, F, IO, IE, ResBody>(
        self,
        svc: S,
        incoming: I,
        signal: Option<F>,
    ) -> Result<(), super::Error>
    where
        L: Layer<S>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send + 'static,
        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IO::ConnectInfo: Clone + Send + Sync + 'static,
        IE: Into<crate::Error>,
        F: Future<Output = ()>,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::Error>,
    {
        let trace_interceptor = self.trace_interceptor.clone();
        let concurrency_limit = self.concurrency_limit;
        let init_connection_window_size = self.init_connection_window_size;
        let init_stream_window_size = self.init_stream_window_size;
        let max_concurrent_streams = self.max_concurrent_streams;
        let timeout = self.timeout;
        let max_frame_size = self.max_frame_size;
        let http2_only = !self.accept_http1;

        let http2_keepalive_interval = self.http2_keepalive_interval;
        let http2_keepalive_timeout = self
            .http2_keepalive_timeout
            .unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0));
        let http2_adaptive_window = self.http2_adaptive_window;
        let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;

        // Wrap the user's service in any configured Tower layers.
        let svc = self.service_builder.service(svc);

        // Adapt the raw connection stream (performing the TLS handshake when
        // configured) into something hyper can accept connections from.
        let tcp = incoming::tcp_incoming(incoming, self);
        let incoming = accept::from_stream::<_, _, crate::Error>(tcp);

        // Per-connection service factory carrying the configured limits and tracing.
        let svc = MakeSvc {
            inner: svc,
            concurrency_limit,
            timeout,
            trace_interceptor,
            _io: PhantomData,
        };

        let server = hyper::Server::builder(incoming)
            .http2_only(http2_only)
            .http2_initial_connection_window_size(init_connection_window_size)
            .http2_initial_stream_window_size(init_stream_window_size)
            .http2_max_concurrent_streams(max_concurrent_streams)
            .http2_keep_alive_interval(http2_keepalive_interval)
            .http2_keep_alive_timeout(http2_keepalive_timeout)
            .http2_adaptive_window(http2_adaptive_window.unwrap_or_default())
            .http2_max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
            .http2_max_frame_size(max_frame_size);

        if let Some(signal) = signal {
            server
                .serve(svc)
                .with_graceful_shutdown(signal)
                .await
                .map_err(super::Error::from_source)?
        } else {
            server.serve(svc).await.map_err(super::Error::from_source)?;
        }

        Ok(())
    }
}

impl<L> Router<L> {
    pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
        Self { server, routes }
    }
}

impl<L> Router<L> {
    /// Add a new service to this router.
    pub fn add_service<S>(mut self, svc: S) -> Self
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
    {
        self.routes = self.routes.add_service(svc);
        self
    }

    /// Add a new optional service to this router.
    ///
    /// # Note
    /// Even when the argument given is `None` this will capture *all* requests to this service name.
    /// As a result, one cannot use this to toggle between two identically named implementations.
    #[allow(clippy::type_complexity)]
    pub fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
    where
        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
            + NamedService
            + Clone
            + Send
            + 'static,
        S::Future: Send + 'static,
    {
        if let Some(svc) = svc {
            self.routes = self.routes.add_service(svc);
        }
        self
    }

    /// Convert this tonic `Router` into an axum `Router`, consuming it.
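    ///
    /// # Example
    ///
    /// A small sketch (an empty [`Routes`] stands in for real services); the
    /// resulting router can be merged into a larger axum application:
    ///
    /// ```
    /// # use tonic::transport::{server::Routes, Server};
    /// let grpc = Server::builder().add_routes(Routes::default()).into_router();
    /// ```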
    pub fn into_router(self) -> axum::Router {
        self.routes.into_router()
    }

    /// Consume this [`Router`], creating a future that will execute the server
    /// on [tokio]'s default executor.
    ///
    /// [tokio]: https://docs.rs/tokio
    pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
    where
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::Error>,
    {
        let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
            .map_err(super::Error::from_source)?;
        self.server
            .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
                self.routes.prepare(),
                incoming,
                None,
            )
            .await
    }

    /// Consume this [`Router`], creating a future that will execute the server
    /// on [tokio]'s default executor, and shut down when the provided signal
    /// is received.
    ///
    /// [tokio]: https://docs.rs/tokio
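    ///
    /// # Example
    ///
    /// A sketch of graceful shutdown driven by an application-defined signal
    /// (an empty [`Routes`] stands in for real services):
    ///
    /// ```no_run
    /// # use tonic::transport::{server::Routes, Server};
    /// # async fn run() -> Result<(), tonic::transport::Error> {
    /// let addr = "[::1]:50051".parse().unwrap();
    /// let shutdown = async {
    ///     // Resolve this future (e.g. from ctrl-c or a channel) to begin shutdown.
    /// };
    ///
    /// Server::builder()
    ///     .add_routes(Routes::default())
    ///     .serve_with_shutdown(addr, shutdown)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```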
    pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
        self,
        addr: SocketAddr,
        signal: F,
    ) -> Result<(), super::Error>
    where
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::Error>,
    {
        let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive)
            .map_err(super::Error::from_source)?;
        self.server
            .serve_with_shutdown(self.routes.prepare(), incoming, Some(signal))
            .await
    }

    /// Consume this [`Router`], creating a future that will execute the server
    /// on the provided incoming stream of `AsyncRead + AsyncWrite`.
    ///
    /// This method discards any provided [`Server`] TCP configuration.
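    ///
    /// # Example
    ///
    /// A sketch using [`TcpIncoming`] as the connection stream (any `Stream` of
    /// `AsyncRead + AsyncWrite` connections works, e.g. a Unix domain socket
    /// listener; an empty [`Routes`] stands in for real services):
    ///
    /// ```no_run
    /// # use tonic::transport::{server::Routes, server::TcpIncoming, Server};
    /// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
    /// let addr = "[::1]:50051".parse()?;
    /// let incoming = TcpIncoming::new(addr, true, None)?;
    ///
    /// Server::builder()
    ///     .add_routes(Routes::default())
    ///     .serve_with_incoming(incoming)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```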
    pub async fn serve_with_incoming<I, IO, IE, ResBody>(
        self,
        incoming: I,
    ) -> Result<(), super::Error>
    where
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IO::ConnectInfo: Clone + Send + Sync + 'static,
        IE: Into<crate::Error>,
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::Error>,
    {
        self.server
            .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>(
                self.routes.prepare(),
                incoming,
                None,
            )
            .await
    }

    /// Consume this [`Router`], creating a future that will execute the server
    /// on the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to
    /// [`Router::serve_with_shutdown`], this method also takes a signal future
    /// used to gracefully shut down the server.
    ///
    /// This method discards any provided [`Server`] TCP configuration.
    pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
        self,
        incoming: I,
        signal: F,
    ) -> Result<(), super::Error>
    where
        I: Stream<Item = Result<IO, IE>>,
        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
        IO::ConnectInfo: Clone + Send + Sync + 'static,
        IE: Into<crate::Error>,
        F: Future<Output = ()>,
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::Error>,
    {
        self.server
            .serve_with_shutdown(self.routes.prepare(), incoming, Some(signal))
            .await
    }

    /// Create a tower service out of a router.
    pub fn into_service<ResBody>(self) -> L::Service
    where
        L: Layer<Routes>,
        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static,
        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send,
        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
        ResBody::Error: Into<crate::Error>,
    {
        self.server.service_builder.service(self.routes.prepare())
    }
}

impl<L> fmt::Debug for Server<L> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Builder").finish()
    }
}

struct Svc<S> {
    inner: S,
    trace_interceptor: Option<TraceInterceptor>,
}

impl<S, ResBody> Service<Request<Body>> for Svc<S>
where
    S: Service<Request<Body>, Response = Response<ResBody>>,
    S::Error: Into<crate::Error>,
    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
    ResBody::Error: Into<crate::Error>,
{
    type Response = Response<BoxHttpBody>;
    type Error = crate::Error;
    type Future = SvcFuture<S::Future>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx).map_err(Into::into)
    }

    fn call(&mut self, mut req: Request<Body>) -> Self::Future {
        let span = if let Some(trace_interceptor) = &self.trace_interceptor {
            let (parts, body) = req.into_parts();
            let bodyless_request = Request::from_parts(parts, ());

            let span = trace_interceptor(&bodyless_request);

            let (parts, _) = bodyless_request.into_parts();
            req = Request::from_parts(parts, body);

            span
        } else {
            tracing::Span::none()
        };

        SvcFuture {
            inner: self.inner.call(req),
            span,
        }
    }
}

#[pin_project]
struct SvcFuture<F> {
    #[pin]
    inner: F,
    span: tracing::Span,
}

impl<F, E, ResBody> Future for SvcFuture<F>
where
    F: Future<Output = Result<Response<ResBody>, E>>,
    E: Into<crate::Error>,
    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
    ResBody::Error: Into<crate::Error>,
{
    type Output = Result<Response<BoxHttpBody>, crate::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let _guard = this.span.enter();

        let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
        let response = response.map(|body| body.map_err(Into::into).boxed_unsync());
        Poll::Ready(Ok(response))
    }
}

impl<S> fmt::Debug for Svc<S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Svc").finish()
    }
}

struct MakeSvc<S, IO> {
    concurrency_limit: Option<usize>,
    timeout: Option<Duration>,
    inner: S,
    trace_interceptor: Option<TraceInterceptor>,
    _io: PhantomData<fn() -> IO>,
}

impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
where
    IO: Connected,
    S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Into<crate::Error> + Send,
    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
    ResBody::Error: Into<crate::Error>,
{
    type Response = BoxService;
    type Error = crate::Error;
    type Future = future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Ok(()).into()
    }

    fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
        let conn_info = io.connect_info();

        let svc = self.inner.clone();
        let concurrency_limit = self.concurrency_limit;
        let timeout = self.timeout;
        let trace_interceptor = self.trace_interceptor.clone();

        let svc = ServiceBuilder::new()
            .layer_fn(RecoverError::new)
            .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
            .layer_fn(|s| GrpcTimeout::new(s, timeout))
            .service(svc);

        let svc = ServiceBuilder::new()
            .layer(BoxService::layer())
            .map_request(move |mut request: Request<Body>| {
                match &conn_info {
                    tower::util::Either::A(inner) => {
                        request.extensions_mut().insert(inner.clone());
                    }
                    tower::util::Either::B(inner) => {
                        #[cfg(feature = "tls")]
                        {
                            request.extensions_mut().insert(inner.clone());
                            request.extensions_mut().insert(inner.get_ref().clone());
                        }

                        #[cfg(not(feature = "tls"))]
                        {
                            // just a type check to make sure we didn't forget to
                            // insert this into the extensions
                            let _: &() = inner;
                        }
                    }
                }

                request
            })
            .service(Svc {
                inner: svc,
                trace_interceptor,
            });

        future::ready(Ok(svc))
    }
}