1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
//! gRPC C Core binds a call to a completion queue, and all the related readiness
//! events are forwarded to that completion queue. This module builds on that
//! mechanism and uses `Kicker` to wake up the completion queue.
//!
//! To minimize context switches, it's better to bind a future to the same
//! completion queue as its inner call. Hence the method `Executor::spawn` is provided.
9
10 use std::cell::UnsafeCell;
11 use std::future::Future;
12 use std::pin::Pin;
13 use std::sync::atomic::{AtomicU8, Ordering};
14 use std::sync::Arc;
15 use std::task::{Context, Poll};
16
17 use futures_util::task::{waker_ref, ArcWake};
18
19 use super::CallTag;
20 use crate::call::Call;
21 use crate::cq::{CompletionQueue, WorkQueue};
22 use crate::error::{Error, Result};
23 use crate::grpc_sys::{self, grpc_call_error};
24
/// The pinned, boxed future stored inside a `SpawnTask`.
///
/// Inner future is expected to be polled in the same thread as cq.
type SpawnHandle = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
28
/// `Kicker` wakes up the completion queue that the inner call binds to.
pub(crate) struct Kicker {
    /// The call whose completion queue gets kicked.
    call: Call,
}
33
34 impl Kicker {
from_call(call: Call) -> Kicker35 pub fn from_call(call: Call) -> Kicker {
36 Kicker { call }
37 }
38
39 /// Wakes up its completion queue.
40 ///
41 /// `tag` will be popped by `grpc_completion_queue_next` in the future.
kick(&self, tag: Box<CallTag>) -> Result<()>42 pub fn kick(&self, tag: Box<CallTag>) -> Result<()> {
43 let _ref = self.call.cq.borrow()?;
44 unsafe {
45 let ptr = Box::into_raw(tag);
46 let status = grpc_sys::grpcwrap_call_kick_completion_queue(self.call.call, ptr as _);
47 if status == grpc_call_error::GRPC_CALL_OK {
48 Ok(())
49 } else {
50 Err(Error::CallFailure(status))
51 }
52 }
53 }
54 }
55
// NOTE(review): this asserts that `&Kicker` may be shared across threads. It relies on
// the wrapped `Call` tolerating concurrent `kick`/`clone` calls — confirm against `Call`.
unsafe impl Sync for Kicker {}
57
58 impl Clone for Kicker {
clone(&self) -> Kicker59 fn clone(&self) -> Kicker {
60 // Bump call's reference count.
61 let call = unsafe {
62 grpc_sys::grpc_call_ref(self.call.call);
63 self.call.call
64 };
65 let cq = self.call.cq.clone();
66 Kicker {
67 call: Call { call, cq },
68 }
69 }
70 }
71
/// When a future is scheduled, it becomes IDLE. When it's ready to be polled,
/// it will be notified via task.wake(), and marked as NOTIFIED. When the executor
/// begins to poll the future, it's marked as POLLING. When the executor finishes
/// polling, the future can either be ready or not ready. In the former case, it's
/// marked as COMPLETED. In the latter case, it's marked as IDLE again.
///
/// Note it's possible that the future is notified during polling, in which case the
/// executor should poll it again once the current poll finishes, unless it returns ready.
const NOTIFIED: u8 = 1;
/// The future is waiting to be notified.
const IDLE: u8 = 2;
/// The executor is currently polling the future.
const POLLING: u8 = 3;
/// The future has resolved and must not be polled again.
const COMPLETED: u8 = 4;
84
/// Maintains the spawned future with state, so that it can be notified and polled efficiently.
pub struct SpawnTask {
    /// The spawned future; taken out (left as `None`) once it resolves.
    handle: UnsafeCell<Option<SpawnHandle>>,
    /// One of `NOTIFIED`, `IDLE`, `POLLING` or `COMPLETED`.
    state: AtomicU8,
    /// Used by the waker to kick the completion queue when `queue` hands the work back.
    kicker: Kicker,
    /// Work queue the waker pushes this task onto.
    queue: Arc<WorkQueue>,
}

/// `SpawnTask` access is guarded by the `state` field, which guarantees Sync.
///
/// Sync is required by `ArcWake`.
unsafe impl Sync for SpawnTask {}
97
98 impl SpawnTask {
new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask99 fn new(s: SpawnHandle, kicker: Kicker, queue: Arc<WorkQueue>) -> SpawnTask {
100 SpawnTask {
101 handle: UnsafeCell::new(Some(s)),
102 state: AtomicU8::new(IDLE),
103 kicker,
104 queue,
105 }
106 }
107
108 /// Marks the state of this task to NOTIFIED.
109 ///
110 /// Returns true means the task was IDLE, needs to be scheduled.
mark_notified(&self) -> bool111 fn mark_notified(&self) -> bool {
112 loop {
113 match self.state.compare_exchange_weak(
114 IDLE,
115 NOTIFIED,
116 Ordering::AcqRel,
117 Ordering::Acquire,
118 ) {
119 Ok(_) => return true,
120 Err(POLLING) => match self.state.compare_exchange_weak(
121 POLLING,
122 NOTIFIED,
123 Ordering::AcqRel,
124 Ordering::Acquire,
125 ) {
126 Err(IDLE) | Err(POLLING) => continue,
127 // If it succeeds, then executor will poll the future again;
128 // if it fails, then the future should be resolved. In both
129 // cases, no need to notify the future, hence return false.
130 _ => return false,
131 },
132 Err(IDLE) => continue,
133 _ => return false,
134 }
135 }
136 }
137 }
138
/// Polls `task`, treating it as a task that has been notified.
///
/// # Panics
///
/// Panics if `success` is false.
pub fn resolve(task: Arc<SpawnTask>, success: bool) {
    // Only `success == true` is expected for now; anything else is a bug.
    assert!(success);
    poll(task, true);
}
144
145 /// A custom Waker.
146 ///
147 /// It will push the inner future to work_queue if it's notified on the
148 /// same thread as inner cq.
149 impl ArcWake for SpawnTask {
wake_by_ref(task: &Arc<Self>)150 fn wake_by_ref(task: &Arc<Self>) {
151 if !task.mark_notified() {
152 return;
153 }
154
155 // It can lead to deadlock if poll the future immediately. So we need to
156 // defer the work instead.
157 if let Some(UnfinishedWork(w)) = task.queue.push_work(UnfinishedWork(task.clone())) {
158 match task.kicker.kick(Box::new(CallTag::Spawn(w))) {
159 // If the queue is shutdown, then the tag will be notified
160 // eventually. So just skip here.
161 Err(Error::QueueShutdown) => (),
162 Err(e) => panic!("unexpected error when canceling call: {:?}", e),
163 _ => (),
164 }
165 }
166 }
167 }
168
/// Work that should be deferred to be handled.
///
/// Sometimes a piece of work can't be done immediately as it might lead
/// to a resource conflict, a deadlock for example. So it is
/// pushed into a queue and handled when the current work is done.
pub struct UnfinishedWork(Arc<SpawnTask>);

impl UnfinishedWork {
    /// Consumes the deferred work and polls the wrapped task via `resolve`.
    pub fn finish(self) {
        resolve(self.0, true);
    }
}
181
182 /// Poll the future.
183 ///
184 /// `woken` indicates that if the cq is waken up by itself.
poll(task: Arc<SpawnTask>, woken: bool)185 fn poll(task: Arc<SpawnTask>, woken: bool) {
186 let mut init_state = if woken { NOTIFIED } else { IDLE };
187 // TODO: maybe we need to break the loop to avoid hunger.
188 loop {
189 match task
190 .state
191 .compare_exchange(init_state, POLLING, Ordering::AcqRel, Ordering::Acquire)
192 {
193 Ok(_) => {}
194 Err(COMPLETED) => return,
195 Err(s) => panic!("unexpected state {}", s),
196 }
197
198 let waker = waker_ref(&task);
199 let mut cx = Context::from_waker(&waker);
200
201 // L208 "lock"s state, hence it's safe to get a mutable reference.
202 match unsafe { &mut *task.handle.get() }
203 .as_mut()
204 .unwrap()
205 .as_mut()
206 .poll(&mut cx)
207 {
208 Poll::Ready(()) => {
209 task.state.store(COMPLETED, Ordering::Release);
210 unsafe { &mut *task.handle.get() }.take();
211 }
212 _ => {
213 match task.state.compare_exchange(
214 POLLING,
215 IDLE,
216 Ordering::AcqRel,
217 Ordering::Acquire,
218 ) {
219 Ok(_) => return,
220 Err(NOTIFIED) => {
221 init_state = NOTIFIED;
222 }
223 Err(s) => panic!("unexpected state {}", s),
224 }
225 }
226 }
227 }
228 }
229
/// An executor that drives a future in the gRPC poll thread, which
/// can reduce thread context switching.
pub(crate) struct Executor<'a> {
    /// Completion queue whose `worker` queue spawned tasks are attached to.
    cq: &'a CompletionQueue,
}
235
236 impl<'a> Executor<'a> {
new(cq: &CompletionQueue) -> Executor<'_>237 pub fn new(cq: &CompletionQueue) -> Executor<'_> {
238 Executor { cq }
239 }
240
cq(&self) -> &CompletionQueue241 pub fn cq(&self) -> &CompletionQueue {
242 self.cq
243 }
244
245 /// Spawn the future into inner poll loop.
246 ///
247 /// If you want to trace the future, you may need to create a sender/receiver
248 /// pair by yourself.
spawn<F>(&self, f: F, kicker: Kicker) where F: Future<Output = ()> + Send + 'static,249 pub fn spawn<F>(&self, f: F, kicker: Kicker)
250 where
251 F: Future<Output = ()> + Send + 'static,
252 {
253 let s = Box::pin(f);
254 let notify = Arc::new(SpawnTask::new(s, kicker, self.cq.worker.clone()));
255 poll(notify, false)
256 }
257 }
258