1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. 2 3 use std::fmt::{self, Debug, Formatter}; 4 use std::sync::Arc; 5 6 use super::Inner; 7 use crate::call::{BatchContext, MessageReader, RpcStatusCode}; 8 use crate::error::Error; 9 use crate::metadata::UnownedMetadata; 10 11 /// Batch job type. 12 #[derive(PartialEq, Debug)] 13 pub enum BatchType { 14 /// Finish without reading any message. 15 Finish, 16 /// Extract one message when finish. 17 Read, 18 /// Check the rpc code and then extract one message. 19 CheckRead, 20 } 21 22 /// A promise result which stores a message reader with bundled metadata. 23 pub struct BatchResult { 24 pub message_reader: Option<MessageReader>, 25 pub initial_metadata: UnownedMetadata, 26 pub trailing_metadata: UnownedMetadata, 27 } 28 29 impl BatchResult { new( message_reader: Option<MessageReader>, initial_metadata: Option<UnownedMetadata>, trailing_metadata: Option<UnownedMetadata>, ) -> BatchResult30 pub fn new( 31 message_reader: Option<MessageReader>, 32 initial_metadata: Option<UnownedMetadata>, 33 trailing_metadata: Option<UnownedMetadata>, 34 ) -> BatchResult { 35 let initial_metadata = if let Some(m) = initial_metadata { 36 m 37 } else { 38 UnownedMetadata::empty() 39 }; 40 let trailing_metadata = if let Some(m) = trailing_metadata { 41 m 42 } else { 43 UnownedMetadata::empty() 44 }; 45 BatchResult { 46 message_reader, 47 initial_metadata, 48 trailing_metadata, 49 } 50 } 51 } 52 53 /// A promise used to resolve batch jobs. 54 pub struct Batch { 55 ty: BatchType, 56 ctx: BatchContext, 57 inner: Arc<Inner<BatchResult>>, 58 } 59 60 impl Batch { new(ty: BatchType, inner: Arc<Inner<BatchResult>>) -> Batch61 pub fn new(ty: BatchType, inner: Arc<Inner<BatchResult>>) -> Batch { 62 Batch { 63 ty, 64 ctx: BatchContext::new(), 65 inner, 66 } 67 } 68 context(&self) -> &BatchContext69 pub fn context(&self) -> &BatchContext { 70 &self.ctx 71 } 72 read_one_msg(&mut self, success: bool)73 fn read_one_msg(&mut self, success: bool) { 74 let task = { 75 let mut guard = self.inner.lock(); 76 if success { 77 guard.set_result(Ok(BatchResult::new(self.ctx.recv_message(), None, None))) 78 } else { 79 // rely on C core to handle the failed read (e.g. deliver approriate 80 // statusCode on the clientside). 81 guard.set_result(Ok(BatchResult::new(None, None, None))) 82 } 83 }; 84 task.map(|t| t.wake()); 85 } 86 finish_response(&mut self, succeed: bool)87 fn finish_response(&mut self, succeed: bool) { 88 let task = { 89 let mut guard = self.inner.lock(); 90 if succeed { 91 let status = self.ctx.rpc_status(); 92 if status.code() == RpcStatusCode::OK { 93 guard.set_result(Ok(BatchResult::new( 94 None, 95 Some(self.ctx.take_initial_metadata()), 96 Some(self.ctx.take_trailing_metadata()), 97 ))) 98 } else { 99 guard.set_result(Err(Error::RpcFailure(status))) 100 } 101 } else { 102 guard.set_result(Err(Error::RemoteStopped)) 103 } 104 }; 105 task.map(|t| t.wake()); 106 } 107 handle_unary_response(&mut self)108 fn handle_unary_response(&mut self) { 109 let task = { 110 let mut guard = self.inner.lock(); 111 let status = self.ctx.rpc_status(); 112 if status.code() == RpcStatusCode::OK { 113 guard.set_result(Ok(BatchResult::new( 114 self.ctx.recv_message(), 115 Some(self.ctx.take_initial_metadata()), 116 Some(self.ctx.take_trailing_metadata()), 117 ))) 118 } else { 119 guard.set_result(Err(Error::RpcFailure(status))) 120 } 121 }; 122 task.map(|t| t.wake()); 123 } 124 resolve(mut self, success: bool)125 pub fn resolve(mut self, success: bool) { 126 match self.ty { 127 BatchType::CheckRead => { 128 assert!(success); 129 self.handle_unary_response(); 130 } 131 BatchType::Finish => { 132 self.finish_response(success); 133 } 134 BatchType::Read => { 135 self.read_one_msg(success); 136 } 137 } 138 } 139 } 140 141 impl Debug for Batch { fmt(&self, f: &mut Formatter<'_>) -> fmt::Result142 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { 143 write!(f, "Batch [{:?}]", self.ty) 144 } 145 } 146 147 /// A promise used to resolve async action status. 148 /// 149 /// The action can only succeed or fail without extra error hint. 150 pub struct Action { 151 inner: Arc<Inner<bool>>, 152 } 153 154 impl Action { new(inner: Arc<Inner<bool>>) -> Action155 pub fn new(inner: Arc<Inner<bool>>) -> Action { 156 Action { inner } 157 } 158 resolve(self, success: bool)159 pub fn resolve(self, success: bool) { 160 let task = { 161 let mut guard = self.inner.lock(); 162 guard.set_result(Ok(success)) 163 }; 164 task.map(|t| t.wake()); 165 } 166 } 167