1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::future::Future;
4 
5 use crate::call::client::{
6     CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver,
7     ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver,
8 };
9 use crate::call::{Call, Method};
10 use crate::channel::Channel;
11 use crate::error::Result;
12 use crate::task::Executor;
13 use crate::task::Kicker;
14 use futures_executor::block_on;
15 
16 /// A generic client for making RPC calls.
17 #[derive(Clone)]
18 pub struct Client {
19     channel: Channel,
20     // Used to kick its completion queue.
21     kicker: Kicker,
22 }
23 
24 impl Client {
25     /// Initialize a new [`Client`].
new(channel: Channel) -> Client26     pub fn new(channel: Channel) -> Client {
27         let kicker = channel.create_kicker().unwrap();
28         Client { channel, kicker }
29     }
30 
31     /// Create a synchronized unary RPC call.
32     ///
33     /// It uses futures_executor::block_on to wait for the futures. It's recommended to use
34     /// the asynchronous version.
unary_call<Req, Resp: Unpin>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<Resp>35     pub fn unary_call<Req, Resp: Unpin>(
36         &self,
37         method: &Method<Req, Resp>,
38         req: &Req,
39         opt: CallOption,
40     ) -> Result<Resp> {
41         block_on(self.unary_call_async(method, req, opt)?)
42     }
43 
44     /// Create an asynchronized unary RPC call.
unary_call_async<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientUnaryReceiver<Resp>>45     pub fn unary_call_async<Req, Resp>(
46         &self,
47         method: &Method<Req, Resp>,
48         req: &Req,
49         opt: CallOption,
50     ) -> Result<ClientUnaryReceiver<Resp>> {
51         Call::unary_async(&self.channel, method, req, opt)
52     }
53 
54     /// Create an asynchronized client streaming call.
55     ///
56     /// Client can send a stream of requests and server responds with a single response.
client_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)>57     pub fn client_streaming<Req, Resp>(
58         &self,
59         method: &Method<Req, Resp>,
60         opt: CallOption,
61     ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> {
62         Call::client_streaming(&self.channel, method, opt)
63     }
64 
65     /// Create an asynchronized server streaming call.
66     ///
67     /// Client sends on request and server responds with a stream of responses.
server_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientSStreamReceiver<Resp>>68     pub fn server_streaming<Req, Resp>(
69         &self,
70         method: &Method<Req, Resp>,
71         req: &Req,
72         opt: CallOption,
73     ) -> Result<ClientSStreamReceiver<Resp>> {
74         Call::server_streaming(&self.channel, method, req, opt)
75     }
76 
77     /// Create an asynchronized duplex streaming call.
78     ///
79     /// Client sends a stream of requests and server responds with a stream of responses.
80     /// The response stream is completely independent and both side can be sending messages
81     /// at the same time.
duplex_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)>82     pub fn duplex_streaming<Req, Resp>(
83         &self,
84         method: &Method<Req, Resp>,
85         opt: CallOption,
86     ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> {
87         Call::duplex_streaming(&self.channel, method, opt)
88     }
89 
90     /// Spawn the future into current gRPC poll thread.
91     ///
92     /// This can reduce a lot of context switching, but please make
93     /// sure there is no heavy work in the future.
spawn<F>(&self, f: F) where F: Future<Output = ()> + Send + 'static,94     pub fn spawn<F>(&self, f: F)
95     where
96         F: Future<Output = ()> + Send + 'static,
97     {
98         let kicker = self.kicker.clone();
99         Executor::new(self.channel.cq()).spawn(f, kicker)
100     }
101 
102     /// Get the underlying channel.
103     #[inline]
channel(&self) -> &Channel104     pub fn channel(&self) -> &Channel {
105         &self.channel
106     }
107 }
108