1 use crate::metadata::{MetadataMap, MetadataValue};
2 #[cfg(feature = "transport")]
3 use crate::transport::server::TcpConnectInfo;
4 #[cfg(feature = "tls")]
5 use crate::transport::{server::TlsConnectInfo, Certificate};
6 use crate::Extensions;
7 #[cfg(feature = "transport")]
8 use std::net::SocketAddr;
9 #[cfg(feature = "tls")]
10 use std::sync::Arc;
11 use std::time::Duration;
12 use tokio_stream::Stream;
13
14 /// A gRPC request and metadata from an RPC call.
15 #[derive(Debug)]
16 pub struct Request<T> {
17 metadata: MetadataMap,
18 message: T,
19 extensions: Extensions,
20 }
21
22 /// Trait implemented by RPC request types.
23 ///
24 /// Types implementing this trait can be used as arguments to client RPC
25 /// methods without explicitly wrapping them into `tonic::Request`s. The purpose
26 /// is to make client calls slightly more convenient to write.
27 ///
28 /// Tonic's code generation and blanket implementations handle this for you,
29 /// so it is not necessary to implement this trait directly.
30 ///
31 /// # Example
32 ///
33 /// Given the following gRPC method definition:
34 /// ```proto
35 /// rpc GetFeature(Point) returns (Feature) {}
36 /// ```
37 ///
38 /// we can call `get_feature` in two equivalent ways:
39 /// ```rust
40 /// # pub struct Point {}
41 /// # pub struct Client {}
42 /// # impl Client {
43 /// # fn get_feature(&self, r: impl tonic::IntoRequest<Point>) {}
44 /// # }
45 /// # let client = Client {};
46 /// use tonic::Request;
47 ///
48 /// client.get_feature(Point {});
49 /// client.get_feature(Request::new(Point {}));
50 /// ```
51 pub trait IntoRequest<T>: sealed::Sealed {
52 /// Wrap the input message `T` in a `tonic::Request`
into_request(self) -> Request<T>53 fn into_request(self) -> Request<T>;
54 }
55
56 /// Trait implemented by RPC streaming request types.
57 ///
58 /// Types implementing this trait can be used as arguments to client streaming
59 /// RPC methods without explicitly wrapping them into `tonic::Request`s. The
60 /// purpose is to make client calls slightly more convenient to write.
61 ///
62 /// Tonic's code generation and blanket implementations handle this for you,
63 /// so it is not necessary to implement this trait directly.
64 ///
65 /// # Example
66 ///
67 /// Given the following gRPC service method definition:
68 /// ```proto
69 /// rpc RecordRoute(stream Point) returns (RouteSummary) {}
70 /// ```
71 /// we can call `record_route` in two equivalent ways:
72 ///
73 /// ```rust
74 /// # #[derive(Clone)]
75 /// # pub struct Point {};
76 /// # pub struct Client {};
77 /// # impl Client {
78 /// # fn record_route(&self, r: impl tonic::IntoStreamingRequest<Message = Point>) {}
79 /// # }
80 /// # let client = Client {};
81 /// use tonic::Request;
82 ///
83 /// let messages = vec![Point {}, Point {}];
84 ///
85 /// client.record_route(Request::new(tokio_stream::iter(messages.clone())));
86 /// client.record_route(tokio_stream::iter(messages));
87 /// ```
88 pub trait IntoStreamingRequest: sealed::Sealed {
89 /// The RPC request stream type
90 type Stream: Stream<Item = Self::Message> + Send + 'static;
91
92 /// The RPC request type
93 type Message;
94
95 /// Wrap the stream of messages in a `tonic::Request`
into_streaming_request(self) -> Request<Self::Stream>96 fn into_streaming_request(self) -> Request<Self::Stream>;
97 }
98
99 impl<T> Request<T> {
100 /// Create a new gRPC request.
101 ///
102 /// ```rust
103 /// # use tonic::Request;
104 /// # pub struct HelloRequest {
105 /// # pub name: String,
106 /// # }
107 /// Request::new(HelloRequest {
108 /// name: "Bob".into(),
109 /// });
110 /// ```
new(message: T) -> Self111 pub fn new(message: T) -> Self {
112 Request {
113 metadata: MetadataMap::new(),
114 message,
115 extensions: Extensions::new(),
116 }
117 }
118
119 /// Get a reference to the message
get_ref(&self) -> &T120 pub fn get_ref(&self) -> &T {
121 &self.message
122 }
123
124 /// Get a mutable reference to the message
get_mut(&mut self) -> &mut T125 pub fn get_mut(&mut self) -> &mut T {
126 &mut self.message
127 }
128
129 /// Get a reference to the custom request metadata.
metadata(&self) -> &MetadataMap130 pub fn metadata(&self) -> &MetadataMap {
131 &self.metadata
132 }
133
134 /// Get a mutable reference to the request metadata.
metadata_mut(&mut self) -> &mut MetadataMap135 pub fn metadata_mut(&mut self) -> &mut MetadataMap {
136 &mut self.metadata
137 }
138
139 /// Consumes `self`, returning the message
into_inner(self) -> T140 pub fn into_inner(self) -> T {
141 self.message
142 }
143
144 /// Consumes `self` returning the parts of the request.
into_parts(self) -> (MetadataMap, Extensions, T)145 pub fn into_parts(self) -> (MetadataMap, Extensions, T) {
146 (self.metadata, self.extensions, self.message)
147 }
148
149 /// Create a new gRPC request from metadata, extensions and message.
from_parts(metadata: MetadataMap, extensions: Extensions, message: T) -> Self150 pub fn from_parts(metadata: MetadataMap, extensions: Extensions, message: T) -> Self {
151 Self {
152 metadata,
153 extensions,
154 message,
155 }
156 }
157
from_http_parts(parts: http::request::Parts, message: T) -> Self158 pub(crate) fn from_http_parts(parts: http::request::Parts, message: T) -> Self {
159 Request {
160 metadata: MetadataMap::from_headers(parts.headers),
161 message,
162 extensions: Extensions::from_http(parts.extensions),
163 }
164 }
165
166 /// Convert an HTTP request to a gRPC request
from_http(http: http::Request<T>) -> Self167 pub fn from_http(http: http::Request<T>) -> Self {
168 let (parts, message) = http.into_parts();
169 Request::from_http_parts(parts, message)
170 }
171
into_http( self, uri: http::Uri, method: http::Method, version: http::Version, sanitize_headers: SanitizeHeaders, ) -> http::Request<T>172 pub(crate) fn into_http(
173 self,
174 uri: http::Uri,
175 method: http::Method,
176 version: http::Version,
177 sanitize_headers: SanitizeHeaders,
178 ) -> http::Request<T> {
179 let mut request = http::Request::new(self.message);
180
181 *request.version_mut() = version;
182 *request.method_mut() = method;
183 *request.uri_mut() = uri;
184 *request.headers_mut() = match sanitize_headers {
185 SanitizeHeaders::Yes => self.metadata.into_sanitized_headers(),
186 SanitizeHeaders::No => self.metadata.into_headers(),
187 };
188 *request.extensions_mut() = self.extensions.into_http();
189
190 request
191 }
192
193 #[doc(hidden)]
map<F, U>(self, f: F) -> Request<U> where F: FnOnce(T) -> U,194 pub fn map<F, U>(self, f: F) -> Request<U>
195 where
196 F: FnOnce(T) -> U,
197 {
198 let message = f(self.message);
199
200 Request {
201 metadata: self.metadata,
202 message,
203 extensions: self.extensions,
204 }
205 }
206
207 /// Get the local address of this connection.
208 ///
209 /// This will return `None` if the `IO` type used
210 /// does not implement `Connected` or when using a unix domain socket.
211 /// This currently only works on the server side.
212 #[cfg(feature = "transport")]
213 #[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
local_addr(&self) -> Option<SocketAddr>214 pub fn local_addr(&self) -> Option<SocketAddr> {
215 let addr = self
216 .extensions()
217 .get::<TcpConnectInfo>()
218 .and_then(|i| i.local_addr());
219
220 #[cfg(feature = "tls")]
221 let addr = addr.or_else(|| {
222 self.extensions()
223 .get::<TlsConnectInfo<TcpConnectInfo>>()
224 .and_then(|i| i.get_ref().local_addr())
225 });
226
227 addr
228 }
229
230 /// Get the remote address of this connection.
231 ///
232 /// This will return `None` if the `IO` type used
233 /// does not implement `Connected` or when using a unix domain socket.
234 /// This currently only works on the server side.
235 #[cfg(feature = "transport")]
236 #[cfg_attr(docsrs, doc(cfg(feature = "transport")))]
remote_addr(&self) -> Option<SocketAddr>237 pub fn remote_addr(&self) -> Option<SocketAddr> {
238 let addr = self
239 .extensions()
240 .get::<TcpConnectInfo>()
241 .and_then(|i| i.remote_addr());
242
243 #[cfg(feature = "tls")]
244 let addr = addr.or_else(|| {
245 self.extensions()
246 .get::<TlsConnectInfo<TcpConnectInfo>>()
247 .and_then(|i| i.get_ref().remote_addr())
248 });
249
250 addr
251 }
252
253 /// Get the peer certificates of the connected client.
254 ///
255 /// This is used to fetch the certificates from the TLS session
256 /// and is mostly used for mTLS. This currently only returns
257 /// `Some` on the server side of the `transport` server with
258 /// TLS enabled connections.
259 #[cfg(feature = "tls")]
260 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
peer_certs(&self) -> Option<Arc<Vec<Certificate>>>261 pub fn peer_certs(&self) -> Option<Arc<Vec<Certificate>>> {
262 self.extensions()
263 .get::<TlsConnectInfo<TcpConnectInfo>>()
264 .and_then(|i| i.peer_certs())
265 }
266
267 /// Set the max duration the request is allowed to take.
268 ///
269 /// Requires the server to support the `grpc-timeout` metadata, which Tonic does.
270 ///
271 /// The duration will be formatted according to [the spec] and use the most precise unit
272 /// possible.
273 ///
274 /// Example:
275 ///
276 /// ```rust
277 /// use std::time::Duration;
278 /// use tonic::Request;
279 ///
280 /// let mut request = Request::new(());
281 ///
282 /// request.set_timeout(Duration::from_secs(30));
283 ///
284 /// let value = request.metadata().get("grpc-timeout").unwrap();
285 ///
286 /// assert_eq!(
287 /// value,
288 /// // equivalent to 30 seconds
289 /// "30000000u"
290 /// );
291 /// ```
292 ///
293 /// [the spec]: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
set_timeout(&mut self, deadline: Duration)294 pub fn set_timeout(&mut self, deadline: Duration) {
295 let value: MetadataValue<_> = duration_to_grpc_timeout(deadline).parse().unwrap();
296 self.metadata_mut()
297 .insert(crate::metadata::GRPC_TIMEOUT_HEADER, value);
298 }
299
300 /// Returns a reference to the associated extensions.
extensions(&self) -> &Extensions301 pub fn extensions(&self) -> &Extensions {
302 &self.extensions
303 }
304
305 /// Returns a mutable reference to the associated extensions.
306 ///
307 /// # Example
308 ///
309 /// Extensions can be set in interceptors:
310 ///
311 /// ```no_run
312 /// use tonic::{Request, service::interceptor};
313 ///
314 /// struct MyExtension {
315 /// some_piece_of_data: String,
316 /// }
317 ///
318 /// interceptor(|mut request: Request<()>| {
319 /// request.extensions_mut().insert(MyExtension {
320 /// some_piece_of_data: "foo".to_string(),
321 /// });
322 ///
323 /// Ok(request)
324 /// });
325 /// ```
326 ///
327 /// And picked up by RPCs:
328 ///
329 /// ```no_run
330 /// use tonic::{async_trait, Status, Request, Response};
331 /// #
332 /// # struct Output {}
333 /// # struct Input;
334 /// # struct MyService;
335 /// # struct MyExtension;
336 /// # #[async_trait]
337 /// # trait TestService {
338 /// # async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status>;
339 /// # }
340 ///
341 /// #[async_trait]
342 /// impl TestService for MyService {
343 /// async fn handler(&self, req: Request<Input>) -> Result<Response<Output>, Status> {
344 /// let value: &MyExtension = req.extensions().get::<MyExtension>().unwrap();
345 ///
346 /// Ok(Response::new(Output {}))
347 /// }
348 /// }
349 /// ```
extensions_mut(&mut self) -> &mut Extensions350 pub fn extensions_mut(&mut self) -> &mut Extensions {
351 &mut self.extensions
352 }
353 }
354
355 impl<T> IntoRequest<T> for T {
into_request(self) -> Request<Self>356 fn into_request(self) -> Request<Self> {
357 Request::new(self)
358 }
359 }
360
361 impl<T> IntoRequest<T> for Request<T> {
into_request(self) -> Request<T>362 fn into_request(self) -> Request<T> {
363 self
364 }
365 }
366
367 impl<T> IntoStreamingRequest for T
368 where
369 T: Stream + Send + 'static,
370 {
371 type Stream = T;
372 type Message = T::Item;
373
into_streaming_request(self) -> Request<Self>374 fn into_streaming_request(self) -> Request<Self> {
375 Request::new(self)
376 }
377 }
378
379 impl<T> IntoStreamingRequest for Request<T>
380 where
381 T: Stream + Send + 'static,
382 {
383 type Stream = T;
384 type Message = T::Item;
385
into_streaming_request(self) -> Self386 fn into_streaming_request(self) -> Self {
387 self
388 }
389 }
390
// Private module holding the supertrait of `IntoRequest` and
// `IntoStreamingRequest`. Because the module itself is private, code outside
// this file cannot name `Sealed`.
mod sealed {
    /// Marker supertrait with no methods.
    pub trait Sealed {}
}

// Blanket implementation: every sized type is `Sealed`.
impl<Any> sealed::Sealed for Any {}
396
/// Encodes `duration` as the value of a gRPC `grpc-timeout` header.
///
/// The encoding is a decimal integer of at most eight digits followed by a
/// one-character unit: `n` (nanoseconds), `u` (microseconds), `m`
/// (milliseconds), `S` (seconds), `M` (minutes) or `H` (hours). The most
/// precise unit whose value still fits in eight digits is used; coarser units
/// truncate toward zero.
///
/// A duration too large to be expressed even in hours (more than roughly
/// 11_415 years, e.g. `Duration::MAX`) saturates to the largest encodable
/// timeout, `99999999H`, rather than panicking.
fn duration_to_grpc_timeout(duration: Duration) -> String {
    // The gRPC spec specifies that the timeout must be at most 8 digits. So this is the largest
    // a value can be before we need to use a bigger unit.
    const MAX_VALUE: u128 = 99_999_999; // exactly 8 digits

    let secs = duration.as_secs();

    // Candidate encodings ordered from the most to the least precise unit.
    let candidates: [(u128, char); 6] = [
        (duration.as_nanos(), 'n'),
        (duration.as_micros(), 'u'),
        (duration.as_millis(), 'm'),
        (u128::from(secs), 'S'),
        (u128::from(secs / 60), 'M'),
        (u128::from(secs / (60 * 60)), 'H'),
    ];

    // Pick the most precise unit that is less than or equal to 8 digits as per the gRPC spec.
    candidates
        .iter()
        .copied()
        .find(|&(value, _)| value <= MAX_VALUE)
        .map(|(value, unit)| format!("{}{}", value, unit))
        // Nothing fits: clamp to the longest timeout the header can carry.
        .unwrap_or_else(|| format!("{}H", MAX_VALUE))
}
430
/// Whether reserved headers should be stripped when a `tonic::Request` is
/// turned into an `http::Request`.
pub(crate) enum SanitizeHeaders {
    /// Convert the metadata through its sanitizing conversion.
    Yes,
    /// Convert the metadata into headers as-is.
    No,
}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::MetadataValue;
    use http::Uri;

    /// Reserved gRPC headers must not survive a sanitizing conversion.
    #[test]
    fn reserved_headers_are_excluded() {
        let mut request = Request::new(1);

        let metadata = request.metadata_mut();
        for reserved in &MetadataMap::GRPC_RESERVED_HEADERS {
            metadata.insert(*reserved, MetadataValue::from_static("invalid"));
        }

        let converted = request.into_http(
            Uri::default(),
            http::Method::POST,
            http::Version::HTTP_2,
            SanitizeHeaders::Yes,
        );

        assert!(converted.headers().is_empty());
    }

    /// Half a second no longer fits in nanoseconds, so microseconds are used.
    #[test]
    fn duration_to_grpc_timeout_less_than_second() {
        let half_second = Duration::from_millis(500);
        let expected = format!("{}u", half_second.as_micros());

        assert_eq!(duration_to_grpc_timeout(half_second), expected);
    }

    /// Thirty seconds still fit in eight digits of microseconds.
    #[test]
    fn duration_to_grpc_timeout_more_than_second() {
        let thirty_seconds = Duration::from_secs(30);
        let expected = format!("{}u", thirty_seconds.as_micros());

        assert_eq!(duration_to_grpc_timeout(thirty_seconds), expected);
    }

    /// One hour needs milliseconds to stay within eight digits.
    #[test]
    fn duration_to_grpc_timeout_a_very_long_time() {
        let one_hour = Duration::from_secs(60 * 60);
        let expected = format!("{}m", one_hour.as_millis());

        assert_eq!(duration_to_grpc_timeout(one_hour), expected);
    }
}
483