1 //! Server implementation and builder. 2 3 mod conn; 4 mod incoming; 5 mod recover_error; 6 #[cfg(feature = "tls")] 7 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] 8 mod tls; 9 #[cfg(unix)] 10 mod unix; 11 12 pub use super::service::Routes; 13 pub use super::service::RoutesBuilder; 14 15 pub use conn::{Connected, TcpConnectInfo}; 16 #[cfg(feature = "tls")] 17 pub use tls::ServerTlsConfig; 18 19 #[cfg(feature = "tls")] 20 pub use conn::TlsConnectInfo; 21 22 #[cfg(feature = "tls")] 23 use super::service::TlsAcceptor; 24 25 #[cfg(unix)] 26 pub use unix::UdsConnectInfo; 27 28 pub use incoming::TcpIncoming; 29 30 #[cfg(feature = "tls")] 31 pub(crate) use tokio_rustls::server::TlsStream; 32 33 #[cfg(feature = "tls")] 34 use crate::transport::Error; 35 36 use self::recover_error::RecoverError; 37 use super::service::{GrpcTimeout, ServerIo}; 38 use crate::body::BoxBody; 39 use crate::server::NamedService; 40 use bytes::Bytes; 41 use http::{Request, Response}; 42 use http_body::Body as _; 43 use hyper::{server::accept, Body}; 44 use pin_project::pin_project; 45 use std::{ 46 convert::Infallible, 47 fmt, 48 future::{self, Future}, 49 marker::PhantomData, 50 net::SocketAddr, 51 pin::Pin, 52 sync::Arc, 53 task::{ready, Context, Poll}, 54 time::Duration, 55 }; 56 use tokio::io::{AsyncRead, AsyncWrite}; 57 use tokio_stream::Stream; 58 use tower::{ 59 layer::util::{Identity, Stack}, 60 layer::Layer, 61 limit::concurrency::ConcurrencyLimitLayer, 62 util::Either, 63 Service, ServiceBuilder, 64 }; 65 66 type BoxHttpBody = http_body::combinators::UnsyncBoxBody<Bytes, crate::Error>; 67 type BoxService = tower::util::BoxService<Request<Body>, Response<BoxHttpBody>, crate::Error>; 68 type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>; 69 70 const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS: u64 = 20; 71 72 /// A default batteries included `transport` server. 73 /// 74 /// This is a wrapper around [`hyper::Server`] and provides an easy builder 75 /// pattern style builder [`Server`]. This builder exposes easy configuration parameters 76 /// for providing a fully featured http2 based gRPC server. This should provide 77 /// a very good out of the box http2 server for use with tonic but is also a 78 /// reference implementation that should be a good starting point for anyone 79 /// wanting to create a more complex and/or specific implementation. 80 #[derive(Clone)] 81 pub struct Server<L = Identity> { 82 trace_interceptor: Option<TraceInterceptor>, 83 concurrency_limit: Option<usize>, 84 timeout: Option<Duration>, 85 #[cfg(feature = "tls")] 86 tls: Option<TlsAcceptor>, 87 init_stream_window_size: Option<u32>, 88 init_connection_window_size: Option<u32>, 89 max_concurrent_streams: Option<u32>, 90 tcp_keepalive: Option<Duration>, 91 tcp_nodelay: bool, 92 http2_keepalive_interval: Option<Duration>, 93 http2_keepalive_timeout: Option<Duration>, 94 http2_adaptive_window: Option<bool>, 95 http2_max_pending_accept_reset_streams: Option<usize>, 96 max_frame_size: Option<u32>, 97 accept_http1: bool, 98 service_builder: ServiceBuilder<L>, 99 } 100 101 impl Default for Server<Identity> { default() -> Self102 fn default() -> Self { 103 Self { 104 trace_interceptor: None, 105 concurrency_limit: None, 106 timeout: None, 107 #[cfg(feature = "tls")] 108 tls: None, 109 init_stream_window_size: None, 110 init_connection_window_size: None, 111 max_concurrent_streams: None, 112 tcp_keepalive: None, 113 tcp_nodelay: false, 114 http2_keepalive_interval: None, 115 http2_keepalive_timeout: None, 116 http2_adaptive_window: None, 117 http2_max_pending_accept_reset_streams: None, 118 max_frame_size: None, 119 accept_http1: false, 120 service_builder: Default::default(), 121 } 122 } 123 } 124 125 /// A stack based `Service` router. 126 #[derive(Debug)] 127 pub struct Router<L = Identity> { 128 server: Server<L>, 129 routes: Routes, 130 } 131 132 impl<S: NamedService, T> NamedService for Either<S, T> { 133 const NAME: &'static str = S::NAME; 134 } 135 136 impl Server { 137 /// Create a new server builder that can configure a [`Server`]. builder() -> Self138 pub fn builder() -> Self { 139 Server { 140 tcp_nodelay: true, 141 accept_http1: false, 142 ..Default::default() 143 } 144 } 145 } 146 147 impl<L> Server<L> { 148 /// Configure TLS for this server. 149 #[cfg(feature = "tls")] 150 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error>151 pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> { 152 Ok(Server { 153 tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?), 154 ..self 155 }) 156 } 157 158 /// Set the concurrency limit applied to on requests inbound per connection. 159 /// 160 /// # Example 161 /// 162 /// ``` 163 /// # use tonic::transport::Server; 164 /// # use tower_service::Service; 165 /// # let builder = Server::builder(); 166 /// builder.concurrency_limit_per_connection(32); 167 /// ``` 168 #[must_use] concurrency_limit_per_connection(self, limit: usize) -> Self169 pub fn concurrency_limit_per_connection(self, limit: usize) -> Self { 170 Server { 171 concurrency_limit: Some(limit), 172 ..self 173 } 174 } 175 176 /// Set a timeout on for all request handlers. 177 /// 178 /// # Example 179 /// 180 /// ``` 181 /// # use tonic::transport::Server; 182 /// # use tower_service::Service; 183 /// # use std::time::Duration; 184 /// # let builder = Server::builder(); 185 /// builder.timeout(Duration::from_secs(30)); 186 /// ``` 187 #[must_use] timeout(self, timeout: Duration) -> Self188 pub fn timeout(self, timeout: Duration) -> Self { 189 Server { 190 timeout: Some(timeout), 191 ..self 192 } 193 } 194 195 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 196 /// stream-level flow control. 197 /// 198 /// Default is 65,535 199 /// 200 /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize 201 #[must_use] initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self202 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self { 203 Server { 204 init_stream_window_size: sz.into(), 205 ..self 206 } 207 } 208 209 /// Sets the max connection-level flow control for HTTP2 210 /// 211 /// Default is 65,535 212 #[must_use] initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self213 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self { 214 Server { 215 init_connection_window_size: sz.into(), 216 ..self 217 } 218 } 219 220 /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 221 /// connections. 222 /// 223 /// Default is no limit (`None`). 224 /// 225 /// [spec]: https://httpwg.org/specs/rfc9113.html#n-stream-concurrency 226 #[must_use] max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self227 pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self { 228 Server { 229 max_concurrent_streams: max.into(), 230 ..self 231 } 232 } 233 234 /// Set whether HTTP2 Ping frames are enabled on accepted connections. 235 /// 236 /// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration 237 /// specified will be the time interval between HTTP2 Ping frames. 238 /// The timeout for receiving an acknowledgement of the keepalive ping 239 /// can be set with [`Server::http2_keepalive_timeout`]. 240 /// 241 /// Default is no HTTP2 keepalive (`None`) 242 /// 243 #[must_use] http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self244 pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self { 245 Server { 246 http2_keepalive_interval, 247 ..self 248 } 249 } 250 251 /// Sets a timeout for receiving an acknowledgement of the keepalive ping. 252 /// 253 /// If the ping is not acknowledged within the timeout, the connection will be closed. 254 /// Does nothing if http2_keep_alive_interval is disabled. 255 /// 256 /// Default is 20 seconds. 257 /// 258 #[must_use] http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self259 pub fn http2_keepalive_timeout(self, http2_keepalive_timeout: Option<Duration>) -> Self { 260 Server { 261 http2_keepalive_timeout, 262 ..self 263 } 264 } 265 266 /// Sets whether to use an adaptive flow control. Defaults to false. 267 /// Enabling this will override the limits set in http2_initial_stream_window_size and 268 /// http2_initial_connection_window_size. 269 #[must_use] http2_adaptive_window(self, enabled: Option<bool>) -> Self270 pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self { 271 Server { 272 http2_adaptive_window: enabled, 273 ..self 274 } 275 } 276 277 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent. 278 /// 279 /// This will default to whatever the default in h2 is. As of v0.3.17, it is 20. 280 /// 281 /// See <https://github.com/hyperium/hyper/issues/2877> for more information. 282 #[must_use] http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self283 pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self { 284 Server { 285 http2_max_pending_accept_reset_streams: max, 286 ..self 287 } 288 } 289 290 /// Set whether TCP keepalive messages are enabled on accepted connections. 291 /// 292 /// If `None` is specified, keepalive is disabled, otherwise the duration 293 /// specified will be the time to remain idle before sending TCP keepalive 294 /// probes. 295 /// 296 /// Default is no keepalive (`None`) 297 /// 298 #[must_use] tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self299 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self { 300 Server { 301 tcp_keepalive, 302 ..self 303 } 304 } 305 306 /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default. 307 #[must_use] tcp_nodelay(self, enabled: bool) -> Self308 pub fn tcp_nodelay(self, enabled: bool) -> Self { 309 Server { 310 tcp_nodelay: enabled, 311 ..self 312 } 313 } 314 315 /// Sets the maximum frame size to use for HTTP2. 316 /// 317 /// Passing `None` will do nothing. 318 /// 319 /// If not set, will default from underlying transport. 320 #[must_use] max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self321 pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self { 322 Server { 323 max_frame_size: frame_size.into(), 324 ..self 325 } 326 } 327 328 /// Allow this server to accept http1 requests. 329 /// 330 /// Accepting http1 requests is only useful when developing `grpc-web` 331 /// enabled services. If this setting is set to `true` but services are 332 /// not correctly configured to handle grpc-web requests, your server may 333 /// return confusing (but correct) protocol errors. 334 /// 335 /// Default is `false`. 336 #[must_use] accept_http1(self, accept_http1: bool) -> Self337 pub fn accept_http1(self, accept_http1: bool) -> Self { 338 Server { 339 accept_http1, 340 ..self 341 } 342 } 343 344 /// Intercept inbound headers and add a [`tracing::Span`] to each response future. 345 #[must_use] trace_fn<F>(self, f: F) -> Self where F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,346 pub fn trace_fn<F>(self, f: F) -> Self 347 where 348 F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static, 349 { 350 Server { 351 trace_interceptor: Some(Arc::new(f)), 352 ..self 353 } 354 } 355 356 /// Create a router with the `S` typed service as the first service. 357 /// 358 /// This will clone the `Server` builder and create a router that will 359 /// route around different services. add_service<S>(&mut self, svc: S) -> Router<L> where S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> + NamedService + Clone + Send + 'static, S::Future: Send + 'static, L: Clone,360 pub fn add_service<S>(&mut self, svc: S) -> Router<L> 361 where 362 S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> 363 + NamedService 364 + Clone 365 + Send 366 + 'static, 367 S::Future: Send + 'static, 368 L: Clone, 369 { 370 Router::new(self.clone(), Routes::new(svc)) 371 } 372 373 /// Create a router with the optional `S` typed service as the first service. 374 /// 375 /// This will clone the `Server` builder and create a router that will 376 /// route around different services. 377 /// 378 /// # Note 379 /// Even when the argument given is `None` this will capture *all* requests to this service name. 380 /// As a result, one cannot use this to toggle between two identically named implementations. add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L> where S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> + NamedService + Clone + Send + 'static, S::Future: Send + 'static, L: Clone,381 pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L> 382 where 383 S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> 384 + NamedService 385 + Clone 386 + Send 387 + 'static, 388 S::Future: Send + 'static, 389 L: Clone, 390 { 391 let routes = svc.map(Routes::new).unwrap_or_default(); 392 Router::new(self.clone(), routes) 393 } 394 395 /// Create a router with given [`Routes`]. 396 /// 397 /// This will clone the `Server` builder and create a router that will 398 /// route around different services that were already added to the provided `routes`. add_routes(&mut self, routes: Routes) -> Router<L> where L: Clone,399 pub fn add_routes(&mut self, routes: Routes) -> Router<L> 400 where 401 L: Clone, 402 { 403 Router::new(self.clone(), routes) 404 } 405 406 /// Set the [Tower] [`Layer`] all services will be wrapped in. 407 /// 408 /// This enables using middleware from the [Tower ecosystem][eco]. 409 /// 410 /// # Example 411 /// 412 /// ``` 413 /// # use tonic::transport::Server; 414 /// # use tower_service::Service; 415 /// use tower::timeout::TimeoutLayer; 416 /// use std::time::Duration; 417 /// 418 /// # let mut builder = Server::builder(); 419 /// builder.layer(TimeoutLayer::new(Duration::from_secs(30))); 420 /// ``` 421 /// 422 /// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used 423 /// here as an example. 424 /// 425 /// You can build more complex layers using [`ServiceBuilder`]. Those layers can include 426 /// [interceptors]: 427 /// 428 /// ``` 429 /// # use tonic::transport::Server; 430 /// # use tower_service::Service; 431 /// use tower::ServiceBuilder; 432 /// use std::time::Duration; 433 /// use tonic::{Request, Status, service::interceptor}; 434 /// 435 /// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> { 436 /// if valid_credentials(&request) { 437 /// Ok(request) 438 /// } else { 439 /// Err(Status::unauthenticated("invalid credentials")) 440 /// } 441 /// } 442 /// 443 /// fn valid_credentials(request: &Request<()>) -> bool { 444 /// // ... 445 /// # true 446 /// } 447 /// 448 /// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> { 449 /// Ok(request) 450 /// } 451 /// 452 /// let layer = ServiceBuilder::new() 453 /// .load_shed() 454 /// .timeout(Duration::from_secs(30)) 455 /// .layer(interceptor(auth_interceptor)) 456 /// .layer(interceptor(some_other_interceptor)) 457 /// .into_inner(); 458 /// 459 /// Server::builder().layer(layer); 460 /// ``` 461 /// 462 /// [Tower]: https://github.com/tower-rs/tower 463 /// [`Layer`]: tower::layer::Layer 464 /// [eco]: https://github.com/tower-rs 465 /// [`ServiceBuilder`]: tower::ServiceBuilder 466 /// [interceptors]: crate::service::Interceptor layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>>467 pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> { 468 Server { 469 service_builder: self.service_builder.layer(new_layer), 470 trace_interceptor: self.trace_interceptor, 471 concurrency_limit: self.concurrency_limit, 472 timeout: self.timeout, 473 #[cfg(feature = "tls")] 474 tls: self.tls, 475 init_stream_window_size: self.init_stream_window_size, 476 init_connection_window_size: self.init_connection_window_size, 477 max_concurrent_streams: self.max_concurrent_streams, 478 tcp_keepalive: self.tcp_keepalive, 479 tcp_nodelay: self.tcp_nodelay, 480 http2_keepalive_interval: self.http2_keepalive_interval, 481 http2_keepalive_timeout: self.http2_keepalive_timeout, 482 http2_adaptive_window: self.http2_adaptive_window, 483 http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams, 484 max_frame_size: self.max_frame_size, 485 accept_http1: self.accept_http1, 486 } 487 } 488 serve_with_shutdown<S, I, F, IO, IE, ResBody>( self, svc: S, incoming: I, signal: Option<F>, ) -> Result<(), super::Error> where L: Layer<S>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send + 'static, <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into<crate::Error>, F: Future<Output = ()>, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::Error>,489 pub(crate) async fn serve_with_shutdown<S, I, F, IO, IE, ResBody>( 490 self, 491 svc: S, 492 incoming: I, 493 signal: Option<F>, 494 ) -> Result<(), super::Error> 495 where 496 L: Layer<S>, 497 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, 498 <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send + 'static, 499 <<L as Layer<S>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, 500 I: Stream<Item = Result<IO, IE>>, 501 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, 502 IO::ConnectInfo: Clone + Send + Sync + 'static, 503 IE: Into<crate::Error>, 504 F: Future<Output = ()>, 505 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 506 ResBody::Error: Into<crate::Error>, 507 { 508 let trace_interceptor = self.trace_interceptor.clone(); 509 let concurrency_limit = self.concurrency_limit; 510 let init_connection_window_size = self.init_connection_window_size; 511 let init_stream_window_size = self.init_stream_window_size; 512 let max_concurrent_streams = self.max_concurrent_streams; 513 let timeout = self.timeout; 514 let max_frame_size = self.max_frame_size; 515 let http2_only = !self.accept_http1; 516 517 let http2_keepalive_interval = self.http2_keepalive_interval; 518 let http2_keepalive_timeout = self 519 .http2_keepalive_timeout 520 .unwrap_or_else(|| Duration::new(DEFAULT_HTTP2_KEEPALIVE_TIMEOUT_SECS, 0)); 521 let http2_adaptive_window = self.http2_adaptive_window; 522 let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams; 523 524 let svc = self.service_builder.service(svc); 525 526 let tcp = incoming::tcp_incoming(incoming, self); 527 let incoming = accept::from_stream::<_, _, crate::Error>(tcp); 528 529 let svc = MakeSvc { 530 inner: svc, 531 concurrency_limit, 532 timeout, 533 trace_interceptor, 534 _io: PhantomData, 535 }; 536 537 let server = hyper::Server::builder(incoming) 538 .http2_only(http2_only) 539 .http2_initial_connection_window_size(init_connection_window_size) 540 .http2_initial_stream_window_size(init_stream_window_size) 541 .http2_max_concurrent_streams(max_concurrent_streams) 542 .http2_keep_alive_interval(http2_keepalive_interval) 543 .http2_keep_alive_timeout(http2_keepalive_timeout) 544 .http2_adaptive_window(http2_adaptive_window.unwrap_or_default()) 545 .http2_max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams) 546 .http2_max_frame_size(max_frame_size); 547 548 if let Some(signal) = signal { 549 server 550 .serve(svc) 551 .with_graceful_shutdown(signal) 552 .await 553 .map_err(super::Error::from_source)? 554 } else { 555 server.serve(svc).await.map_err(super::Error::from_source)?; 556 } 557 558 Ok(()) 559 } 560 } 561 562 impl<L> Router<L> { new(server: Server<L>, routes: Routes) -> Self563 pub(crate) fn new(server: Server<L>, routes: Routes) -> Self { 564 Self { server, routes } 565 } 566 } 567 568 impl<L> Router<L> { 569 /// Add a new service to this router. add_service<S>(mut self, svc: S) -> Self where S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> + NamedService + Clone + Send + 'static, S::Future: Send + 'static,570 pub fn add_service<S>(mut self, svc: S) -> Self 571 where 572 S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> 573 + NamedService 574 + Clone 575 + Send 576 + 'static, 577 S::Future: Send + 'static, 578 { 579 self.routes = self.routes.add_service(svc); 580 self 581 } 582 583 /// Add a new optional service to this router. 584 /// 585 /// # Note 586 /// Even when the argument given is `None` this will capture *all* requests to this service name. 587 /// As a result, one cannot use this to toggle between two identically named implementations. 588 #[allow(clippy::type_complexity)] add_optional_service<S>(mut self, svc: Option<S>) -> Self where S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> + NamedService + Clone + Send + 'static, S::Future: Send + 'static,589 pub fn add_optional_service<S>(mut self, svc: Option<S>) -> Self 590 where 591 S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible> 592 + NamedService 593 + Clone 594 + Send 595 + 'static, 596 S::Future: Send + 'static, 597 { 598 if let Some(svc) = svc { 599 self.routes = self.routes.add_service(svc); 600 } 601 self 602 } 603 604 /// Convert this tonic `Router` into an axum `Router` consuming the tonic one. into_router(self) -> axum::Router605 pub fn into_router(self) -> axum::Router { 606 self.routes.into_router() 607 } 608 609 /// Consume this [`Server`] creating a future that will execute the server 610 /// on [tokio]'s default executor. 611 /// 612 /// [`Server`]: struct.Server.html 613 /// [tokio]: https://docs.rs/tokio serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error> where L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::Error>,614 pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error> 615 where 616 L: Layer<Routes>, 617 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, 618 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, 619 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, 620 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 621 ResBody::Error: Into<crate::Error>, 622 { 623 let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) 624 .map_err(super::Error::from_source)?; 625 self.server 626 .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>( 627 self.routes.prepare(), 628 incoming, 629 None, 630 ) 631 .await 632 } 633 634 /// Consume this [`Server`] creating a future that will execute the server 635 /// on [tokio]'s default executor. And shutdown when the provided signal 636 /// is received. 637 /// 638 /// [`Server`]: struct.Server.html 639 /// [tokio]: https://docs.rs/tokio serve_with_shutdown<F: Future<Output = ()>, ResBody>( self, addr: SocketAddr, signal: F, ) -> Result<(), super::Error> where L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::Error>,640 pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>( 641 self, 642 addr: SocketAddr, 643 signal: F, 644 ) -> Result<(), super::Error> 645 where 646 L: Layer<Routes>, 647 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, 648 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, 649 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, 650 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 651 ResBody::Error: Into<crate::Error>, 652 { 653 let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) 654 .map_err(super::Error::from_source)?; 655 self.server 656 .serve_with_shutdown(self.routes.prepare(), incoming, Some(signal)) 657 .await 658 } 659 660 /// Consume this [`Server`] creating a future that will execute the server 661 /// on the provided incoming stream of `AsyncRead + AsyncWrite`. 662 /// 663 /// This method discards any provided [`Server`] TCP configuration. 664 /// 665 /// [`Server`]: struct.Server.html serve_with_incoming<I, IO, IE, ResBody>( self, incoming: I, ) -> Result<(), super::Error> where I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into<crate::Error>, L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::Error>,666 pub async fn serve_with_incoming<I, IO, IE, ResBody>( 667 self, 668 incoming: I, 669 ) -> Result<(), super::Error> 670 where 671 I: Stream<Item = Result<IO, IE>>, 672 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, 673 IO::ConnectInfo: Clone + Send + Sync + 'static, 674 IE: Into<crate::Error>, 675 L: Layer<Routes>, 676 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, 677 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, 678 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, 679 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 680 ResBody::Error: Into<crate::Error>, 681 { 682 self.server 683 .serve_with_shutdown::<_, _, future::Ready<()>, _, _, ResBody>( 684 self.routes.prepare(), 685 incoming, 686 None, 687 ) 688 .await 689 } 690 691 /// Consume this [`Server`] creating a future that will execute the server 692 /// on the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to 693 /// `serve_with_shutdown` this method will also take a signal future to 694 /// gracefully shutdown the server. 695 /// 696 /// This method discards any provided [`Server`] TCP configuration. 697 /// 698 /// [`Server`]: struct.Server.html serve_with_incoming_shutdown<I, IO, IE, F, ResBody>( self, incoming: I, signal: F, ) -> Result<(), super::Error> where I: Stream<Item = Result<IO, IE>>, IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into<crate::Error>, F: Future<Output = ()>, L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::Error>,699 pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>( 700 self, 701 incoming: I, 702 signal: F, 703 ) -> Result<(), super::Error> 704 where 705 I: Stream<Item = Result<IO, IE>>, 706 IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static, 707 IO::ConnectInfo: Clone + Send + Sync + 'static, 708 IE: Into<crate::Error>, 709 F: Future<Output = ()>, 710 L: Layer<Routes>, 711 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, 712 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, 713 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, 714 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 715 ResBody::Error: Into<crate::Error>, 716 { 717 self.server 718 .serve_with_shutdown(self.routes.prepare(), incoming, Some(signal)) 719 .await 720 } 721 722 /// Create a tower service out of a router. into_service<ResBody>(self) -> L::Service where L: Layer<Routes>, L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, ResBody: http_body::Body<Data = Bytes> + Send + 'static, ResBody::Error: Into<crate::Error>,723 pub fn into_service<ResBody>(self) -> L::Service 724 where 725 L: Layer<Routes>, 726 L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, 727 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send + 'static, 728 <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error: Into<crate::Error> + Send, 729 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 730 ResBody::Error: Into<crate::Error>, 731 { 732 self.server.service_builder.service(self.routes.prepare()) 733 } 734 } 735 736 impl<L> fmt::Debug for Server<L> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result737 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 738 f.debug_struct("Builder").finish() 739 } 740 } 741 742 struct Svc<S> { 743 inner: S, 744 trace_interceptor: Option<TraceInterceptor>, 745 } 746 747 impl<S, ResBody> Service<Request<Body>> for Svc<S> 748 where 749 S: Service<Request<Body>, Response = Response<ResBody>>, 750 S::Error: Into<crate::Error>, 751 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 752 ResBody::Error: Into<crate::Error>, 753 { 754 type Response = Response<BoxHttpBody>; 755 type Error = crate::Error; 756 type Future = SvcFuture<S::Future>; 757 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>758 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 759 self.inner.poll_ready(cx).map_err(Into::into) 760 } 761 call(&mut self, mut req: Request<Body>) -> Self::Future762 fn call(&mut self, mut req: Request<Body>) -> Self::Future { 763 let span = if let Some(trace_interceptor) = &self.trace_interceptor { 764 let (parts, body) = req.into_parts(); 765 let bodyless_request = Request::from_parts(parts, ()); 766 767 let span = trace_interceptor(&bodyless_request); 768 769 let (parts, _) = bodyless_request.into_parts(); 770 req = Request::from_parts(parts, body); 771 772 span 773 } else { 774 tracing::Span::none() 775 }; 776 777 SvcFuture { 778 inner: self.inner.call(req), 779 span, 780 } 781 } 782 } 783 784 #[pin_project] 785 struct SvcFuture<F> { 786 #[pin] 787 inner: F, 788 span: tracing::Span, 789 } 790 791 impl<F, E, ResBody> Future for SvcFuture<F> 792 where 793 F: Future<Output = Result<Response<ResBody>, E>>, 794 E: Into<crate::Error>, 795 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 796 ResBody::Error: Into<crate::Error>, 797 { 798 type Output = Result<Response<BoxHttpBody>, crate::Error>; 799 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>800 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 801 let this = self.project(); 802 let _guard = this.span.enter(); 803 804 let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?; 805 let response = response.map(|body| body.map_err(Into::into).boxed_unsync()); 806 Poll::Ready(Ok(response)) 807 } 808 } 809 810 impl<S> fmt::Debug for Svc<S> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result811 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 812 f.debug_struct("Svc").finish() 813 } 814 } 815 816 struct MakeSvc<S, IO> { 817 concurrency_limit: Option<usize>, 818 timeout: Option<Duration>, 819 inner: S, 820 trace_interceptor: Option<TraceInterceptor>, 821 _io: PhantomData<fn() -> IO>, 822 } 823 824 impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO> 825 where 826 IO: Connected, 827 S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static, 828 S::Future: Send + 'static, 829 S::Error: Into<crate::Error> + Send, 830 ResBody: http_body::Body<Data = Bytes> + Send + 'static, 831 ResBody::Error: Into<crate::Error>, 832 { 833 type Response = BoxService; 834 type Error = crate::Error; 835 type Future = future::Ready<Result<Self::Response, Self::Error>>; 836 poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>837 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 838 Ok(()).into() 839 } 840 call(&mut self, io: &ServerIo<IO>) -> Self::Future841 fn call(&mut self, io: &ServerIo<IO>) -> Self::Future { 842 let conn_info = io.connect_info(); 843 844 let svc = self.inner.clone(); 845 let concurrency_limit = self.concurrency_limit; 846 let timeout = self.timeout; 847 let trace_interceptor = self.trace_interceptor.clone(); 848 849 let svc = ServiceBuilder::new() 850 .layer_fn(RecoverError::new) 851 .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new)) 852 .layer_fn(|s| GrpcTimeout::new(s, timeout)) 853 .service(svc); 854 855 let svc = ServiceBuilder::new() 856 .layer(BoxService::layer()) 857 .map_request(move |mut request: Request<Body>| { 858 match &conn_info { 859 tower::util::Either::A(inner) => { 860 request.extensions_mut().insert(inner.clone()); 861 } 862 tower::util::Either::B(inner) => { 863 #[cfg(feature = "tls")] 864 { 865 request.extensions_mut().insert(inner.clone()); 866 request.extensions_mut().insert(inner.get_ref().clone()); 867 } 868 869 #[cfg(not(feature = "tls"))] 870 { 871 // just a type check to make sure we didn't forget to 872 // insert this into the extensions 873 let _: &() = inner; 874 } 875 } 876 } 877 878 request 879 }) 880 .service(Svc { 881 inner: svc, 882 trace_interceptor, 883 }); 884 885 future::ready(Ok(svc)) 886 } 887 } 888