// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::ffi::CStr;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{result, slice};

use crate::grpc_sys::{
    self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
};
use futures_util::ready;
use futures_util::{Sink, Stream};
use parking_lot::Mutex;

use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
use crate::buf::GrpcSlice;
use crate::call::{
    BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
};
use crate::codec::{DeserializeFn, SerializeFn};
use crate::cq::CompletionQueue;
use crate::error::{Error, Result};
use crate::metadata::Metadata;
use crate::server::ServerChecker;
use crate::server::{BoxHandler, RequestCallContext};
use crate::task::{BatchFuture, CallTag, Executor, Kicker};
use crate::CheckResult;
/// A time point before which an RPC or operation should finish.
#[derive(Clone, Copy)]
pub struct Deadline {
    pub(crate) spec: gpr_timespec,
}

impl Deadline {
    fn new(spec: gpr_timespec) -> Deadline {
        let realtime_spec =
            unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };

        Deadline {
            spec: realtime_spec,
        }
    }

    /// Checks if the deadline is exceeded.
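    ///
    /// A minimal usage sketch (the timing here is illustrative, not part of the API contract):
    ///
    /// ```ignore
    /// use std::time::Duration;
    ///
    /// // Build a deadline one second from now and check whether it has passed yet.
    /// let deadline = Deadline::from(Duration::from_secs(1));
    /// assert!(!deadline.exceeded());
    /// ```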
    pub fn exceeded(self) -> bool {
        unsafe {
            let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
            grpc_sys::gpr_time_cmp(now, self.spec) >= 0
        }
    }

    pub(crate) fn spec(self) -> gpr_timespec {
        self.spec
    }
}

impl From<Duration> for Deadline {
    /// Build a deadline from the given duration.
    ///
    /// The deadline will be `now + duration`.
    #[inline]
    fn from(dur: Duration) -> Deadline {
        Deadline::new(dur.into())
    }
}

/// Context for accepting a request.
pub struct RequestContext {
    ctx: *mut grpcwrap_request_call_context,
    request_call: Option<RequestCallContext>,
}

impl RequestContext {
    pub fn new(rc: RequestCallContext) -> RequestContext {
        let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };

        RequestContext {
            ctx,
            request_call: Some(rc),
        }
    }

    /// Try to accept a client side streaming request.
    ///
    /// Return error if the request is a client side unary request.
    pub fn handle_stream_req(
        self,
        cq: &CompletionQueue,
        rc: &mut RequestCallContext,
    ) -> result::Result<(), Self> {
        let checker = rc.get_checker();
        let handler = unsafe { rc.get_handler(self.method()) };
        match handler {
            Some(handler) => match handler.method_type() {
                MethodType::Unary | MethodType::ServerStreaming => Err(self),
                _ => {
                    execute(self, cq, None, handler, checker);
                    Ok(())
                }
            },
            None => {
                execute_unimplemented(self, cq.clone());
                Ok(())
            }
        }
    }

    /// Accept a client side unary request.
    ///
    /// This method should be called after `handle_stream_req`. When handling
    /// a client side unary request, the handler will only be called after the
    /// unary request is received.
    pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
        // fetch message before calling callback.
        let tag = Box::new(CallTag::unary_request(self, rc));
        let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
        let request_ctx = tag.request_ctx().unwrap().as_ptr();
        let tag_ptr = Box::into_raw(tag);
        unsafe {
            let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
            let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
            if code != grpc_call_error::GRPC_CALL_OK {
                drop(Box::from_raw(tag_ptr));
                // it should not fail.
                panic!("failed to receive message: {:?}", code);
            }
        }
    }

    pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
        self.request_call.take()
    }

    pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
        self.ctx
    }

    fn call(&self, cq: CompletionQueue) -> Call {
        unsafe {
            // It is okay to use a mutable pointer on an immutable reference, `self`,
            // because grpcwrap_request_call_context_ref_call is thread-safe.
            let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
            assert!(!call.is_null());
            Call::from_raw(call, cq)
        }
    }

    pub fn method(&self) -> &[u8] {
        let mut len = 0;
        let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };

        unsafe { slice::from_raw_parts(method as _, len) }
    }

    fn host(&self) -> &[u8] {
        let mut len = 0;
        let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };

        unsafe { slice::from_raw_parts(host as _, len) }
    }

    fn deadline(&self) -> Deadline {
        let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };

        Deadline::new(t)
    }

    fn metadata(&self) -> &Metadata {
        unsafe {
            let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
            let arr_ptr: *const Metadata = ptr as _;
            &*arr_ptr
        }
    }

    fn peer(&self) -> String {
        unsafe {
            // RequestContext always holds a reference of the call.
            let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
            let p = grpc_sys::grpc_call_get_peer(call);
            let peer = CStr::from_ptr(p)
                .to_str()
                .expect("valid UTF-8 data")
                .to_owned();
            grpc_sys::gpr_free(p as _);
            peer
        }
    }

    /// If the server binds in non-secure mode, this will return None
    #[cfg(feature = "_secure")]
    fn auth_context(&self) -> Option<crate::AuthContext> {
        unsafe {
            let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
            crate::AuthContext::from_call_ptr(call)
        }
    }
}

impl Drop for RequestContext {
    fn drop(&mut self) {
        unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
    }
}

/// A context for handling client side unary request.
pub struct UnaryRequestContext {
    request: RequestContext,
    request_call: Option<RequestCallContext>,
    batch: BatchContext,
}

impl UnaryRequestContext {
    pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
        UnaryRequestContext {
            request: ctx,
            request_call: Some(rc),
            batch: BatchContext::new(),
        }
    }

    pub fn batch_ctx(&self) -> &BatchContext {
        &self.batch
    }

    pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
        &mut self.batch
    }

    pub fn request_ctx(&self) -> &RequestContext {
        &self.request
    }

    pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
        self.request_call.take()
    }

    pub fn handle(
        self,
        rc: &mut RequestCallContext,
        cq: &CompletionQueue,
        reader: Option<MessageReader>,
    ) {
        let checker = rc.get_checker();
        let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
        if reader.is_some() {
            return execute(self.request, cq, reader, handler, checker);
        }

        let status = RpcStatus::with_message(RpcStatusCode::INTERNAL, "No payload".to_owned());
        self.request.call(cq.clone()).abort(&status)
    }
}

/// A stream for a client streaming call and a duplex streaming call.
///
/// The corresponding RPC will be canceled if the stream did not
/// finish before dropping.
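///
/// A minimal consumption sketch inside a handler; `stream` and its message type are
/// illustrative, `try_next` comes from `futures_util::TryStreamExt`, and errors are
/// unwrapped for brevity:
///
/// ```ignore
/// use futures_util::TryStreamExt;
///
/// ctx.spawn(async move {
///     // Pull requests off the stream until the client finishes sending.
///     while let Some(req) = stream.try_next().await.unwrap() {
///         // handle `req` here
///     }
/// });
/// ```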
#[must_use = "if unused the RequestStream may immediately cancel the RPC"]
pub struct RequestStream<T> {
    call: Arc<Mutex<ShareCall>>,
    base: StreamingBase,
    de: DeserializeFn<T>,
}

impl<T> RequestStream<T> {
    fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
        RequestStream {
            call,
            base: StreamingBase::new(None),
            de,
        }
    }
}

impl<T> Stream for RequestStream<T> {
    type Item = Result<T>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
        {
            let mut call = self.call.lock();
            call.check_alive()?;
        }

        let t = &mut *self;
        match ready!(t.base.poll(cx, &mut t.call, false)?) {
            None => Poll::Ready(None),
            Some(data) => Poll::Ready(Some((t.de)(data))),
        }
    }
}

impl<T> Drop for RequestStream<T> {
    /// The corresponding RPC will be canceled if the stream did not
    /// finish before dropping.
    fn drop(&mut self) {
        self.base.on_drop(&mut self.call);
    }
}

/// A helper macro used to implement server side unary sinks.
/// Generics are not used here because we don't need to expose
/// `CallHolder` or `Call` to the caller.
// TODO: Use type alias to be friendly for documentation.
macro_rules! impl_unary_sink {
    ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
        pub struct $rt {
            call: $holder,
            cq_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $rt {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                if self.cq_f.is_some() {
                    ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?);
                    self.cq_f.take();
                }

                ready!(self.call.call(|c| c.poll_finish(cx))?);
                Poll::Ready(Ok(()))
            }
        }

        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            write_flags: u32,
            ser: SerializeFn<T>,
            headers: Option<Metadata>,
            call_flags: u32,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    write_flags: 0,
                    ser,
                    headers: None,
                    call_flags: 0,
                }
            }

            #[inline]
            pub fn set_headers(&mut self, meta: Metadata) {
                self.headers = Some(meta);
            }

            #[inline]
            pub fn set_call_flags(&mut self, flags: u32) {
                // TODO: implement a server-side call flags interface similar to the client-side `CallOption`.
                self.call_flags = flags;
            }

            pub fn success(self, t: T) -> $rt {
                self.complete(RpcStatus::ok(), Some(t))
            }

            pub fn fail(self, status: RpcStatus) -> $rt {
                self.complete(status, None)
            }

            fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
                let mut data = match t {
                    Some(t) => {
                        let mut buf = GrpcSlice::default();
                        if let Err(e) = (self.ser)(&t, &mut buf) {
                            return $rt {
                                call: self.call.take().unwrap(),
                                cq_f: None,
                                err: Some(e),
                            };
                        }
                        Some(buf)
                    }
                    None => None,
                };

                let headers = &mut self.headers;
                let call_flags = self.call_flags;
                let write_flags = self.write_flags;

                let res = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, headers, call_flags, true, &mut data, write_flags)
                });

                let (cq_f, err) = match res {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                $rt {
                    call: self.call.take().unwrap(),
                    cq_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not
            /// send a response before dropping.
            fn drop(&mut self) {
                self.call
                    .as_mut()
                    .map(|call| call.call(|c| c.call.cancel()));
            }
        }
    };
}

impl_unary_sink!(
    /// A sink for unary call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
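    ///
    /// A minimal handler sketch; the service method, request and response types are
    /// illustrative, and errors are ignored for brevity:
    ///
    /// ```ignore
    /// fn say_hello(&mut self, ctx: RpcContext<'_>, _req: HelloRequest, sink: UnarySink<HelloReply>) {
    ///     let reply = HelloReply::default();
    ///     // `success` consumes the sink and returns a future that must be driven to completion.
    ///     ctx.spawn(async move {
    ///         let _ = sink.success(reply).await;
    ///     });
    /// }
    /// ```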
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    UnarySink,
    UnarySinkResult,
    ShareCall
);
impl_unary_sink!(
    /// A sink for client streaming call.
    ///
    /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
    ///
    /// [`success`]: #method.success
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ClientStreamingSink,
    ClientStreamingSinkResult,
    Arc<Mutex<ShareCall>>
);

// A macro helper to implement server side streaming sink.
macro_rules! impl_stream_sink {
    ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
        $(#[$attr])*
        pub struct $t<T> {
            call: Option<$holder>,
            base: SinkBase,
            flush_f: Option<BatchFuture>,
            status: RpcStatus,
            flushed: bool,
            closed: bool,
            ser: SerializeFn<T>,
        }

        impl<T> $t<T> {
            fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
                $t {
                    call: Some(call),
                    base: SinkBase::new(true),
                    flush_f: None,
                    status: RpcStatus::ok(),
                    flushed: false,
                    closed: false,
                    ser,
                }
            }

            pub fn set_headers(&mut self, meta: Metadata) {
                self.base.headers = meta;
            }

            /// By default, messages are always sent with their configured buffer hint. When
            /// `enhance_batch` is enabled, as many messages as possible are batched together.
            /// The rules are as follows:
            /// - All messages except the last one are sent with `buffer_hint` set to true.
            /// - The last message is also sent with `buffer_hint` set to true unless any message
            ///   was offered with its buffer hint set to false.
            ///
            /// Whether `enhance_batch` is true or false, it's recommended to follow the Sink
            /// contract and call `poll_flush` to ensure messages are handled by gRPC C Core.
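            ///
            /// A minimal sketch; `sink`, `msg1` and `msg2` are illustrative, and the `send`/`flush`
            /// combinators come from `futures_util::SinkExt`:
            ///
            /// ```ignore
            /// use futures_util::SinkExt;
            ///
            /// sink.enhance_batch(true);
            /// // Both messages keep `buffer_hint = true`, so they may be written out as one batch.
            /// sink.send((msg1, WriteFlags::default().buffer_hint(true))).await?;
            /// sink.send((msg2, WriteFlags::default().buffer_hint(true))).await?;
            /// // Flush to make sure gRPC C Core actually handles the buffered messages.
            /// sink.flush().await?;
            /// ```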
            pub fn enhance_batch(&mut self, flag: bool) {
                self.base.enhance_buffer_strategy = flag;
            }

            pub fn set_status(&mut self, status: RpcStatus) {
                assert!(self.flush_f.is_none());
                self.status = status;
            }

            pub fn fail(mut self, status: RpcStatus) -> $ft {
                assert!(self.flush_f.is_none());
                let send_metadata = self.base.send_metadata;
                let res = self.call.as_mut().unwrap().call(|c| {
                    c.call
                        .start_send_status_from_server(&status, &mut None, 0, send_metadata, &mut None, 0)
                });

                let (fail_f, err) = match res {
                    Ok(f) => (Some(f), None),
                    Err(e) => (None, Some(e)),
                };

                $ft {
                    call: self.call.take().unwrap(),
                    fail_f,
                    err,
                }
            }
        }

        impl<T> Drop for $t<T> {
            /// The corresponding RPC will be canceled if the sink did not call
            /// [`close`] or [`fail`] before dropping.
            ///
            /// [`close`]: #method.close
            /// [`fail`]: #method.fail
            fn drop(&mut self) {
                // The sink was neither closed explicitly nor consumed by `fail`.
                if !self.closed && self.call.is_some() {
                    let mut call = self.call.take().unwrap();
                    call.call(|c| c.call.cancel());
                }
            }
        }

        impl<T> Sink<(T, WriteFlags)> for $t<T> {
            type Error = Error;

            #[inline]
            fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                Pin::new(&mut self.base).poll_ready(cx)
            }

            #[inline]
            fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
                let t = &mut *self;
                t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser, 0)
            }

            #[inline]
            fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
                    return Poll::Ready(Err(Error::RemoteStopped));
                }
                let t = &mut *self;
                Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap(), 0)
            }

            fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if self.flush_f.is_none() {
                    ready!(Pin::new(&mut self.base).poll_ready(cx)?);

                    let send_metadata = self.base.send_metadata;
                    let t = &mut *self;
                    let status = &t.status;
                    let flush_f = t.call.as_mut().unwrap().call(|c| {
                        c.call
                            .start_send_status_from_server(status, &mut None, 0, send_metadata, &mut None, 0)
                    })?;
                    t.flush_f = Some(flush_f);
                }

                if !self.flushed {
                    ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
                    self.flushed = true;
                }

                ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
                self.closed = true;
                Poll::Ready(Ok(()))
            }
        }

        #[must_use = "if unused the sink failure may immediately cancel the RPC"]
        pub struct $ft {
            call: $holder,
            fail_f: Option<BatchFuture>,
            err: Option<Error>,
        }

        impl Future for $ft {
            type Output = Result<()>;

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
                if let Some(e) = self.err.take() {
                    return Poll::Ready(Err(e));
                }

                let readiness = self.call.call(|c| {
                    if c.finished {
                        return Poll::Ready(Ok(()));
                    }

                    c.poll_finish(cx).map(|r| r.map(|_| ()))
                })?;

                if let Some(ref mut f) = self.fail_f {
                    ready!(Pin::new(f).poll(cx)?);
                }

                self.fail_f.take();
                readiness.map(Ok)
            }
        }
    };
}

impl_stream_sink!(
    /// A sink for server streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
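    ///
    /// A minimal handler sketch; `sink`, `reply1` and `reply2` are illustrative, the `send`
    /// and `close` combinators come from `futures_util::SinkExt`, and errors are ignored:
    ///
    /// ```ignore
    /// use futures_util::SinkExt;
    ///
    /// ctx.spawn(async move {
    ///     let mut sink = sink;
    ///     // Stream a couple of responses, then finish the RPC with the configured status.
    ///     let _ = sink.send((reply1, WriteFlags::default())).await;
    ///     let _ = sink.send((reply2, WriteFlags::default())).await;
    ///     let _ = sink.close().await;
    /// });
    /// ```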
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    ServerStreamingSink,
    ServerStreamingSinkFailure,
    ShareCall
);
impl_stream_sink!(
    /// A sink for duplex streaming call.
    ///
    /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
    ///
    /// [`close`]: #method.close
    /// [`fail`]: #method.fail
    #[must_use = "if unused the sink may immediately cancel the RPC"]
    DuplexSink,
    DuplexSinkFailure,
    Arc<Mutex<ShareCall>>
);

/// A context for rpc handling.
pub struct RpcContext<'a> {
    ctx: RequestContext,
    executor: Executor<'a>,
    deadline: Deadline,
}

impl<'a> RpcContext<'a> {
    fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
        RpcContext {
            deadline: ctx.deadline(),
            ctx,
            executor: Executor::new(cq),
        }
    }

    fn kicker(&self) -> Kicker {
        let call = self.call();
        Kicker::from_call(call)
    }

    pub(crate) fn call(&self) -> Call {
        self.ctx.call(self.executor.cq().clone())
    }

    pub fn method(&self) -> &[u8] {
        self.ctx.method()
    }

    pub fn host(&self) -> &[u8] {
        self.ctx.host()
    }

    pub fn deadline(&self) -> Deadline {
        self.deadline
    }

    /// Get the initial metadata sent by client.
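    ///
    /// A minimal iteration sketch; the header name is illustrative, and values are raw
    /// byte slices to be interpreted as the application expects:
    ///
    /// ```ignore
    /// for (key, value) in ctx.request_headers().iter() {
    ///     if key == "x-trace-id" {
    ///         // use `value` here
    ///     }
    /// }
    /// ```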
    pub fn request_headers(&self) -> &Metadata {
        self.ctx.metadata()
    }

    pub fn peer(&self) -> String {
        self.ctx.peer()
    }

    /// Wrapper around the gRPC Core AuthContext
    ///
    /// If the server binds in non-secure mode, this will return None
    #[cfg(feature = "_secure")]
    pub fn auth_context(&self) -> Option<crate::AuthContext> {
        self.ctx.auth_context()
    }

    /// Spawn the future into the current gRPC poll thread.
    ///
    /// This can reduce a lot of context switching, but please make
    /// sure there is no heavy work in the spawned future.
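    ///
    /// A minimal sketch; `sink` and `reply` are illustrative:
    ///
    /// ```ignore
    /// ctx.spawn(async move {
    ///     // Keep this future lightweight: it runs on a gRPC poll thread.
    ///     let _ = sink.success(reply).await;
    /// });
    /// ```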
    pub fn spawn<F>(&self, f: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.executor.spawn(f, self.kicker())
    }
}

// The following four helper functions are used to create callback closures.

macro_rules! accept_call {
    ($call:expr) => {
        match $call.start_server_side() {
            Err(Error::QueueShutdown) => return,
            Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
            Ok(f) => f,
        }
    };
}

// Helper function to call a unary handler.
pub fn execute_unary<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    payload: MessageReader,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let request = match de(payload) {
        Ok(f) => f,
        Err(e) => {
            let status = RpcStatus::with_message(
                RpcStatusCode::INTERNAL,
                format!("Failed to deserialize request message: {e:?}"),
            );
            call.abort(&status);
            return;
        }
    };
    let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
    f(ctx, request, sink)
}

// Helper function to call client streaming handler.
pub fn execute_client_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));

    let req_s = RequestStream::new(call.clone(), de);
    let sink = ClientStreamingSink::new(call, ser);
    f(ctx, req_s, sink)
}

// Helper function to call server streaming handler.
pub fn execute_server_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    payload: MessageReader,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);

    let request = match de(payload) {
        Ok(t) => t,
        Err(e) => {
            let status = RpcStatus::with_message(
                RpcStatusCode::INTERNAL,
                format!("Failed to deserialize request message: {e:?}"),
            );
            call.abort(&status);
            return;
        }
    };

    let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
    f(ctx, request, sink)
}

// Helper function to call duplex streaming handler.
pub fn execute_duplex_streaming<P, Q, F>(
    ctx: RpcContext<'_>,
    ser: SerializeFn<Q>,
    de: DeserializeFn<P>,
    f: &mut F,
) where
    F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
{
    let mut call = ctx.call();
    let close_f = accept_call!(call);
    let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));

    let req_s = RequestStream::new(call.clone(), de);
    let sink = DuplexSink::new(call, ser);
    f(ctx, req_s, sink)
}

// A helper function used to handle all undefined rpc calls.
pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
    // Suppress needless-pass-by-value.
    let ctx = ctx;
    let mut call = ctx.call(cq);
    accept_call!(call);
    call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED))
}

// Helper function to call handler.
//
// Invoked after a request is ready to be handled.
fn execute(
    ctx: RequestContext,
    cq: &CompletionQueue,
    payload: Option<MessageReader>,
    f: &mut BoxHandler,
    mut checkers: Vec<Box<dyn ServerChecker>>,
) {
    let rpc_ctx = RpcContext::new(ctx, cq);

    for handler in checkers.iter_mut() {
        match handler.check(&rpc_ctx) {
            CheckResult::Continue => {}
            CheckResult::Abort(status) => {
                rpc_ctx.call().abort(&status);
                return;
            }
        }
    }

    f.handle(rpc_ctx, payload)
}