1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 2 3 use std::future::Future; 4 5 use crate::call::client::{ 6 CallOption, ClientCStreamReceiver, ClientCStreamSender, ClientDuplexReceiver, 7 ClientDuplexSender, ClientSStreamReceiver, ClientUnaryReceiver, 8 }; 9 use crate::call::{Call, Method}; 10 use crate::channel::Channel; 11 use crate::error::Result; 12 use crate::task::Executor; 13 use crate::task::Kicker; 14 use futures_executor::block_on; 15 16 /// A generic client for making RPC calls. 17 #[derive(Clone)] 18 pub struct Client { 19 channel: Channel, 20 // Used to kick its completion queue. 21 kicker: Kicker, 22 } 23 24 impl Client { 25 /// Initialize a new [`Client`]. new(channel: Channel) -> Client26 pub fn new(channel: Channel) -> Client { 27 let kicker = channel.create_kicker().unwrap(); 28 Client { channel, kicker } 29 } 30 31 /// Create a synchronized unary RPC call. 32 /// 33 /// It uses futures_executor::block_on to wait for the futures. It's recommended to use 34 /// the asynchronous version. unary_call<Req, Resp: Unpin>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<Resp>35 pub fn unary_call<Req, Resp: Unpin>( 36 &self, 37 method: &Method<Req, Resp>, 38 req: &Req, 39 opt: CallOption, 40 ) -> Result<Resp> { 41 block_on(self.unary_call_async(method, req, opt)?) 42 } 43 44 /// Create an asynchronized unary RPC call. unary_call_async<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientUnaryReceiver<Resp>>45 pub fn unary_call_async<Req, Resp>( 46 &self, 47 method: &Method<Req, Resp>, 48 req: &Req, 49 opt: CallOption, 50 ) -> Result<ClientUnaryReceiver<Resp>> { 51 Call::unary_async(&self.channel, method, req, opt) 52 } 53 54 /// Create an asynchronized client streaming call. 55 /// 56 /// Client can send a stream of requests and server responds with a single response. client_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)>57 pub fn client_streaming<Req, Resp>( 58 &self, 59 method: &Method<Req, Resp>, 60 opt: CallOption, 61 ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> { 62 Call::client_streaming(&self.channel, method, opt) 63 } 64 65 /// Create an asynchronized server streaming call. 66 /// 67 /// Client sends on request and server responds with a stream of responses. server_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, req: &Req, opt: CallOption, ) -> Result<ClientSStreamReceiver<Resp>>68 pub fn server_streaming<Req, Resp>( 69 &self, 70 method: &Method<Req, Resp>, 71 req: &Req, 72 opt: CallOption, 73 ) -> Result<ClientSStreamReceiver<Resp>> { 74 Call::server_streaming(&self.channel, method, req, opt) 75 } 76 77 /// Create an asynchronized duplex streaming call. 78 /// 79 /// Client sends a stream of requests and server responds with a stream of responses. 80 /// The response stream is completely independent and both side can be sending messages 81 /// at the same time. duplex_streaming<Req, Resp>( &self, method: &Method<Req, Resp>, opt: CallOption, ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)>82 pub fn duplex_streaming<Req, Resp>( 83 &self, 84 method: &Method<Req, Resp>, 85 opt: CallOption, 86 ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> { 87 Call::duplex_streaming(&self.channel, method, opt) 88 } 89 90 /// Spawn the future into current gRPC poll thread. 91 /// 92 /// This can reduce a lot of context switching, but please make 93 /// sure there is no heavy work in the future. spawn<F>(&self, f: F) where F: Future<Output = ()> + Send + 'static,94 pub fn spawn<F>(&self, f: F) 95 where 96 F: Future<Output = ()> + Send + 'static, 97 { 98 let kicker = self.kicker.clone(); 99 Executor::new(self.channel.cq()).spawn(f, kicker) 100 } 101 102 /// Get the underlying channel. 103 #[inline] channel(&self) -> &Channel104 pub fn channel(&self) -> &Channel { 105 &self.channel 106 } 107 } 108