// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

//! gRPC C Core binds a call to a completion queue, and all readiness related to
//! that call is forwarded to the completion queue. This module utilizes the
//! mechanism and uses `Kicker` to wake up the completion queue.
//!
//! To minimize context switches, it's better to bind a future to the same
//! completion queue as its inner call. Hence the method `Executor::spawn` is provided.
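//!
//! A minimal sketch of the intended flow inside this crate (assuming a
//! `CompletionQueue` named `cq` and a `Kicker` obtained from an existing call;
//! illustrative only, not compiled as a doc test):
//!
//! ```ignore
//! let executor = Executor::new(&cq);
//! // The future is driven on the gRPC poll thread that owns `cq`.
//! executor.spawn(async move { /* drive RPC logic here */ }, kicker);
//! ```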

use std::cell::UnsafeCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_util::task::{waker_ref, ArcWake};

use super::CallTag;
use crate::call::Call;
use crate::cq::{CompletionQueue, WorkQueue};
use crate::error::{Error, Result};
use crate::grpc_sys::{self, grpc_call_error};

/// A handle to a spawned future.
/// The inner future is expected to be polled on the same thread as its cq.
type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// `Kicker` wakes up the completion queue that the inner call binds to.
pub(crate) struct Kicker {
    call: Call,
}

impl Kicker {
    pub fn from_call(call: Call) -> Kicker {
        Kicker { call }
    }

    /// Wakes up its completion queue.
    ///
    /// `tag` will be popped by `grpc_completion_queue_next` in the future.
    pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
        let _ref = self.call.cq.borrow()?;
        unsafe {
            let ptr = Box::into_raw(tag);
            let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
            if status == grpc_call_error::GRPC_CALL_OK {
                Ok(())
            } else {
                Err(Error::CallFailure(status))
            }
        }
    }
}

unsafe impl Sync for Kicker {}

impl Clone for Kicker {
    fn clone(&self) -> Kicker {
        // Bump call's reference count.
        let call = unsafe {
            grpc_sys::grpc_call_ref(self.call.call);
            self.call.call
        };
        let cq = self.call.cq.clone();
        Kicker {
            call: Call { call, cq },
        }
    }
}

/// When a future is scheduled, it becomes IDLE. When it's ready to be polled,
/// it is notified via task.wake() and marked as NOTIFIED. When the executor
/// begins to poll the future, it's marked as POLLING. When the executor finishes
/// polling, the future can either be ready or not ready. In the former case, it's
/// marked as COMPLETED; in the latter case, it's marked as IDLE again.
///
/// Note that it's possible for the future to be notified during polling, in which
/// case the executor should poll it again once the current poll finishes, unless
/// it returns ready.
const NOTIFIED: u8 = 1;
const IDLE: u8 = 2;
const POLLING: u8 = 3;
const COMPLETED: u8 = 4;
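
// For reference, a compact form of the transitions described above:
//
//   IDLE     -- task.wake()     --> NOTIFIED
//   NOTIFIED -- executor polls  --> POLLING
//   POLLING  -- Poll::Pending   --> IDLE
//   POLLING  -- Poll::Ready(()) --> COMPLETED
//   POLLING  -- task.wake()     --> NOTIFIED (polled again after the current poll)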

/// Maintains the spawned future with state, so that it can be notified and polled efficiently.
pub struct SpawnTask {
    handle: UnsafeCell<Option<SpawnHandle>>,
    state: AtomicU8,
    kicker: Kicker,
    queue: Arc<WorkQueue>,
}

/// Access to `SpawnTask` is guarded by the `state` field, which guarantees `Sync`.
///
/// `Sync` is required by `ArcWake`.
unsafe impl Sync for SpawnTask {}

impl SpawnTask {
    fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
        SpawnTask {
            handle: UnsafeCell::new(Some(s)),
            state: AtomicU8::new(IDLE),
            kicker,
            queue,
        }
    }

    /// Marks the state of this task as NOTIFIED.
    ///
    /// Returns true if the task was IDLE and needs to be scheduled.
    fn mark_notified(&self) -> bool {
        loop {
            match self.state.compare_exchange_weak(
                IDLE,
                NOTIFIED,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return true,
                Err(POLLING) => match self.state.compare_exchange_weak(
                    POLLING,
                    NOTIFIED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Err(IDLE) | Err(POLLING) => continue,
                    // If it succeeds, the executor will notice the new state and
                    // poll the future again; if it fails, the task has either been
                    // notified already or resolved. In both cases there is no need
                    // to schedule the task, hence return false.
                    _ => return false,
                },
                Err(IDLE) => continue,
                _ => return false,
            }
        }
    }
}

pub fn resolve(task: Arc<SpawnTask>, success: bool) {
    // It should never be canceled for now, so `success` is expected to be true.
    assert!(success);
    poll(task, true);
}

/// A custom Waker.
///
/// It pushes the inner future to the work_queue if it's notified on the same
/// thread as the inner cq; otherwise it wakes up the completion queue via
/// `Kicker`.
impl ArcWake for SpawnTask {
    fn wake_by_ref(task: &Arc<Self>) {
        if !task.mark_notified() {
            return;
        }

        // Polling the future immediately can lead to deadlock, so defer the
        // work instead.
        if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
            match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
                // If the queue is shut down, then the tag will be notified
                // eventually. So just skip here.
                Err(Error::QueueShutdown) => (),
                Err(e) => panic!("unexpected error when kicking completion queue: {:?}", e),
                _ => (),
            }
        }
    }
}

/// Work that should be deferred to be handled.
///
/// Sometimes a piece of work can't be done immediately as it might lead to a
/// resource conflict, deadlock for example. So it is pushed into a queue and
/// handled when the current work is done.
pub struct UnfinishedWork(Arc<SpawnTask>);

impl UnfinishedWork {
    pub fn finish(self) {
        resolve(self.0, true);
    }
}

/// Polls the future.
///
/// `woken` indicates whether the task has already been notified (its state is
/// NOTIFIED) or is being polled for the first time (its state is IDLE).
fn poll(task: Arc<SpawnTask>, woken: bool) {
    let mut init_state = if woken { NOTIFIED } else { IDLE };
    // TODO: maybe we need to break the loop to avoid starvation.
    loop {
        match task
            .state
            .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
        {
            Ok(_) => {}
            Err(COMPLETED) => return,
            Err(s) => panic!("unexpected state {}", s),
        }

        let waker = waker_ref(&task);
        let mut cx = Context::from_waker(&waker);

        // The `compare_exchange` above "lock"s state, hence it's safe to get a
        // mutable reference.
        match unsafe { &mut *task.handle.get() }
            .as_mut()
            .unwrap()
            .as_mut()
            .poll(&mut cx)
        {
            Poll::Ready(()) => {
                task.state.store(COMPLETED, Ordering::Release);
                unsafe { &mut *task.handle.get() }.take();
            }
            _ => {
                match task.state.compare_exchange(
                    POLLING,
                    IDLE,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => return,
                    Err(NOTIFIED) => {
                        init_state = NOTIFIED;
                    }
                    Err(s) => panic!("unexpected state {}", s),
                }
            }
        }
    }
}

/// An executor that drives a future in the gRPC poll thread, which
/// can reduce thread context switching.
pub(crate) struct Executor<'a> {
    cq: &'a CompletionQueue,
}

impl<'a> Executor<'a> {
    pub fn new(cq: &CompletionQueue) -> Executor<'_> {
        Executor { cq }
    }

    pub fn cq(&self) -> &CompletionQueue {
        self.cq
    }

    /// Spawns the future into the inner poll loop.
    ///
    /// If you want to track the future's completion, you may need to create a
    /// sender/receiver pair yourself.
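    ///
    /// A minimal sketch of that pattern, assuming the `futures_channel` crate is
    /// available and `do_work` is a placeholder future (illustrative only, not
    /// compiled as a doc test):
    ///
    /// ```ignore
    /// let (tx, rx) = futures_channel::oneshot::channel();
    /// executor.spawn(
    ///     async move {
    ///         do_work().await;
    ///         // Signal completion back to the caller.
    ///         let _ = tx.send(());
    ///     },
    ///     kicker,
    /// );
    /// // Await `rx` elsewhere to observe when the spawned future finishes.
    /// ```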
    pub fn spawn<F>(&self, f: F, kicker: Kicker)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let s = Box::pin(f);
        let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
        poll(notify, false)
    }
}