1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::fmt::{self, Debug, Formatter};
4 use std::sync::Arc;
5 
6 use super::Inner;
7 use crate::call::{BatchContext, MessageReader, RpcStatusCode};
8 use crate::error::Error;
9 use crate::metadata::UnownedMetadata;
10 
11 /// Batch job type.
12 #[derive(PartialEq, Debug)]
13 pub enum BatchType {
14     /// Finish without reading any message.
15     Finish,
16     /// Extract one message when finish.
17     Read,
18     /// Check the rpc code and then extract one message.
19     CheckRead,
20 }
21 
22 /// A promise result which stores a message reader with bundled metadata.
23 pub struct BatchResult {
24     pub message_reader: Option<MessageReader>,
25     pub initial_metadata: UnownedMetadata,
26     pub trailing_metadata: UnownedMetadata,
27 }
28 
29 impl BatchResult {
new( message_reader: Option<MessageReader>, initial_metadata: Option<UnownedMetadata>, trailing_metadata: Option<UnownedMetadata>, ) -> BatchResult30     pub fn new(
31         message_reader: Option<MessageReader>,
32         initial_metadata: Option<UnownedMetadata>,
33         trailing_metadata: Option<UnownedMetadata>,
34     ) -> BatchResult {
35         let initial_metadata = if let Some(m) = initial_metadata {
36             m
37         } else {
38             UnownedMetadata::empty()
39         };
40         let trailing_metadata = if let Some(m) = trailing_metadata {
41             m
42         } else {
43             UnownedMetadata::empty()
44         };
45         BatchResult {
46             message_reader,
47             initial_metadata,
48             trailing_metadata,
49         }
50     }
51 }
52 
53 /// A promise used to resolve batch jobs.
54 pub struct Batch {
55     ty: BatchType,
56     ctx: BatchContext,
57     inner: Arc<Inner<BatchResult>>,
58 }
59 
60 impl Batch {
new(ty: BatchType, inner: Arc<Inner<BatchResult>>) -> Batch61     pub fn new(ty: BatchType, inner: Arc<Inner<BatchResult>>) -> Batch {
62         Batch {
63             ty,
64             ctx: BatchContext::new(),
65             inner,
66         }
67     }
68 
context(&self) -> &BatchContext69     pub fn context(&self) -> &BatchContext {
70         &self.ctx
71     }
72 
read_one_msg(&mut self, success: bool)73     fn read_one_msg(&mut self, success: bool) {
74         let task = {
75             let mut guard = self.inner.lock();
76             if success {
77                 guard.set_result(Ok(BatchResult::new(self.ctx.recv_message(), None, None)))
78             } else {
79                 // rely on C core to handle the failed read (e.g. deliver approriate
80                 // statusCode on the clientside).
81                 guard.set_result(Ok(BatchResult::new(None, None, None)))
82             }
83         };
84         task.map(|t| t.wake());
85     }
86 
finish_response(&mut self, succeed: bool)87     fn finish_response(&mut self, succeed: bool) {
88         let task = {
89             let mut guard = self.inner.lock();
90             if succeed {
91                 let status = self.ctx.rpc_status();
92                 if status.code() == RpcStatusCode::OK {
93                     guard.set_result(Ok(BatchResult::new(
94                         None,
95                         Some(self.ctx.take_initial_metadata()),
96                         Some(self.ctx.take_trailing_metadata()),
97                     )))
98                 } else {
99                     guard.set_result(Err(Error::RpcFailure(status)))
100                 }
101             } else {
102                 guard.set_result(Err(Error::RemoteStopped))
103             }
104         };
105         task.map(|t| t.wake());
106     }
107 
handle_unary_response(&mut self)108     fn handle_unary_response(&mut self) {
109         let task = {
110             let mut guard = self.inner.lock();
111             let status = self.ctx.rpc_status();
112             if status.code() == RpcStatusCode::OK {
113                 guard.set_result(Ok(BatchResult::new(
114                     self.ctx.recv_message(),
115                     Some(self.ctx.take_initial_metadata()),
116                     Some(self.ctx.take_trailing_metadata()),
117                 )))
118             } else {
119                 guard.set_result(Err(Error::RpcFailure(status)))
120             }
121         };
122         task.map(|t| t.wake());
123     }
124 
resolve(mut self, success: bool)125     pub fn resolve(mut self, success: bool) {
126         match self.ty {
127             BatchType::CheckRead => {
128                 assert!(success);
129                 self.handle_unary_response();
130             }
131             BatchType::Finish => {
132                 self.finish_response(success);
133             }
134             BatchType::Read => {
135                 self.read_one_msg(success);
136             }
137         }
138     }
139 }
140 
141 impl Debug for Batch {
fmt(&self, f: &mut Formatter<'_>) -> fmt::Result142     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
143         write!(f, "Batch [{:?}]", self.ty)
144     }
145 }
146 
147 /// A promise used to resolve async action status.
148 ///
149 /// The action can only succeed or fail without extra error hint.
150 pub struct Action {
151     inner: Arc<Inner<bool>>,
152 }
153 
154 impl Action {
new(inner: Arc<Inner<bool>>) -> Action155     pub fn new(inner: Arc<Inner<bool>>) -> Action {
156         Action { inner }
157     }
158 
resolve(self, success: bool)159     pub fn resolve(self, success: bool) {
160         let task = {
161             let mut guard = self.inner.lock();
162             guard.set_result(Ok(success))
163         };
164         task.map(|t| t.wake());
165     }
166 }
167