//
// Copyright 2016 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/deadline/deadline_filter.h"

#include <functional>
#include <memory>
#include <new>
#include <utility>

#include "absl/status/status.h"
#include "absl/types/optional.h"

#include <grpc/grpc.h>
#include <grpc/status.h>
#include <grpc/support/log.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/promise/arena_promise.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_init.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/transport/metadata_batch.h"

namespace grpc_core {

// A fire-and-forget class representing a pending deadline timer.
// Allocated on the call arena.
class TimerState {
 public:
  TimerState(grpc_deadline_state* deadline_state, Timestamp deadline)
      : deadline_state_(deadline_state) {
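    // Hold a ref on the call stack until the timer callback has run
    // (whether the timer fires or is cancelled), so that the callback
    // cannot outlive the call.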
    GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
    GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
    grpc_timer_init(&timer_, deadline, &closure_);
  }

  void Cancel() { grpc_timer_cancel(&timer_); }

 private:
  // The on_complete callback used when sending a cancel_error batch down the
  // filter stack. Yields the call combiner when the batch returns.
  static void YieldCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
    TimerState* self = static_cast<TimerState*>(arg);
    GRPC_CALL_COMBINER_STOP(self->deadline_state_->call_combiner,
                            "got on_complete from cancel_stream batch");
    GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
                          "DeadlineTimerState");
  }

  // This is called via the call combiner, so access to deadline_state is
  // synchronized.
  static void SendCancelOpInCallCombiner(void* arg, grpc_error_handle error) {
    TimerState* self = static_cast<TimerState*>(arg);
    grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
        GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
    batch->cancel_stream = true;
    batch->payload->cancel_stream.cancel_error = error;
    grpc_call_element* elem = self->deadline_state_->elem;
    elem->filter->start_transport_stream_op_batch(elem, batch);
  }

  // Timer callback.
  static void TimerCallback(void* arg, grpc_error_handle error) {
    TimerState* self = static_cast<TimerState*>(arg);
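    // The iomgr timer runs this closure with OkStatus() when the timer
    // fires and with CancelledError() when it was cancelled via
    // grpc_timer_cancel(), so anything other than CancelledError() means
    // the deadline has actually expired.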
    if (error != absl::CancelledError()) {
      error = grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"),
                                 StatusIntProperty::kRpcStatus,
                                 GRPC_STATUS_DEADLINE_EXCEEDED);
      self->deadline_state_->call_combiner->Cancel(error);
      GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
                        nullptr);
      GRPC_CALL_COMBINER_START(self->deadline_state_->call_combiner,
                               &self->closure_, error,
                               "deadline exceeded -- sending cancel_stream op");
    } else {
      GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
                            "DeadlineTimerState");
    }
  }

  // NOTE: This object's dtor is never called, so do not add any data
  // members that require destruction!
  // TODO(roth): We should ideally call this object's dtor somewhere,
  // but that would require adding more synchronization, because we'd
  // need to call the dtor only after both (a) the timer callback
  // finishes and (b) the filter sees the call completion and attempts
  // to cancel the timer.
  grpc_deadline_state* deadline_state_;
  grpc_timer timer_;
  grpc_closure closure_;
};

}  // namespace grpc_core

//
// grpc_deadline_state
//

// Starts the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void start_timer_if_needed(grpc_deadline_state* deadline_state,
                                  grpc_core::Timestamp deadline) {
  if (deadline == grpc_core::Timestamp::InfFuture()) return;
  GPR_ASSERT(deadline_state->timer_state == nullptr);
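  // The TimerState lives on the call arena and is deliberately never
  // destroyed (see the NOTE in TimerState).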
  deadline_state->timer_state =
      deadline_state->arena->New<grpc_core::TimerState>(deadline_state,
                                                        deadline);
}

// Cancels the deadline timer.
// This is called via the call combiner, so access to deadline_state is
// synchronized.
static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
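  // Cancelling does not free the TimerState; clearing timer_state simply
  // allows grpc_deadline_state_reset() to start a fresh timer later.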
  if (deadline_state->timer_state != nullptr) {
    deadline_state->timer_state->Cancel();
    deadline_state->timer_state = nullptr;
  }
}

// Callback run when we receive trailing metadata.
static void recv_trailing_metadata_ready(void* arg, grpc_error_handle error) {
  grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
  cancel_timer_if_needed(deadline_state);
  // Invoke the original callback.
  grpc_core::Closure::Run(DEBUG_LOCATION,
                          deadline_state->original_recv_trailing_metadata_ready,
                          error);
}

// Inject our own recv_trailing_metadata_ready callback into op.
static void inject_recv_trailing_metadata_ready(
    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
  deadline_state->original_recv_trailing_metadata_ready =
      op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
                    recv_trailing_metadata_ready, deadline_state,
                    grpc_schedule_on_exec_ctx);
  op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &deadline_state->recv_trailing_metadata_ready;
}

// Callback and associated state for starting the timer after call stack
// initialization has been completed.
struct start_timer_after_init_state {
  start_timer_after_init_state(grpc_deadline_state* deadline_state,
                               grpc_core::Timestamp deadline)
      : deadline_state(deadline_state), deadline(deadline) {}
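  // The timer is started from the dtor, i.e., at the point where the state
  // is deleted inside the call combiner (see start_timer_after_init()).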
  ~start_timer_after_init_state() {
    start_timer_if_needed(deadline_state, deadline);
  }

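  // Tracks whether we have already bounced into the call combiner.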
  bool in_call_combiner = false;
  grpc_deadline_state* deadline_state;
  grpc_core::Timestamp deadline;
  grpc_closure closure;
};
static void start_timer_after_init(void* arg, grpc_error_handle error) {
  struct start_timer_after_init_state* state =
      static_cast<struct start_timer_after_init_state*>(arg);
  grpc_deadline_state* deadline_state = state->deadline_state;
  if (!state->in_call_combiner) {
    // We are initially called without holding the call combiner, so we
    // need to bounce ourselves into it.
    state->in_call_combiner = true;
    GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
                             error, "scheduling deadline timer");
    return;
  }
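  // Now running under the call combiner: deleting the state runs its dtor,
  // which actually starts the deadline timer.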
  delete state;
  GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
                          "done scheduling deadline timer");
}

grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
                                         const grpc_call_element_args& args,
                                         grpc_core::Timestamp deadline)
    : elem(elem),
      call_stack(args.call_stack),
      call_combiner(args.call_combiner),
      arena(args.arena) {
  // Deadline will always be infinite on servers, so the timer will only be
  // set on clients with a finite deadline.
  if (deadline != grpc_core::Timestamp::InfFuture()) {
    // When the deadline passes, we indicate the failure by sending down
    // an op with cancel_error set. However, we can't send down any ops
    // until after the call stack is fully initialized. If we start the
    // timer here, we have no guarantee that the timer won't pop before
    // call stack initialization is finished. To avoid that problem, we
    // create a closure to start the timer, and we schedule that closure
    // to be run after call stack initialization is done.
    struct start_timer_after_init_state* state =
        new start_timer_after_init_state(this, deadline);
    GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
                      grpc_schedule_on_exec_ctx);
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, absl::OkStatus());
  }
}

grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }

void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
                               grpc_core::Timestamp new_deadline) {
  cancel_timer_if_needed(deadline_state);
  start_timer_if_needed(deadline_state, new_deadline);
}

void grpc_deadline_state_client_start_transport_stream_op_batch(
    grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
  if (op->cancel_stream) {
    cancel_timer_if_needed(deadline_state);
  } else {
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    if (op->recv_trailing_metadata) {
      inject_recv_trailing_metadata_ready(deadline_state, op);
    }
  }
}

//
// filter code
//

// Constructor for channel_data. Used for both client and server filters.
static grpc_error_handle deadline_init_channel_elem(
    grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
  GPR_ASSERT(!args->is_last);
  return absl::OkStatus();
}

// Destructor for channel_data. Used for both client and server filters.
static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {}

// Additional call data used only for the server filter.
struct server_call_data {
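  // Must be first, so that call_data can be cast to grpc_deadline_state*:
  // deadline_init_call_elem() constructs a grpc_deadline_state at the start
  // of call_data for both the client and server filters.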
  grpc_deadline_state deadline_state;
  // The closure for receiving initial metadata.
  grpc_closure recv_initial_metadata_ready;
  // Received initial metadata batch.
  grpc_metadata_batch* recv_initial_metadata;
  // The original recv_initial_metadata_ready closure, which we chain to
  // after our own closure is invoked.
  grpc_closure* next_recv_initial_metadata_ready;
};

// Constructor for call_data. Used for both client and server filters.
static grpc_error_handle deadline_init_call_elem(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline);
  return absl::OkStatus();
}

// Destructor for call_data. Used for both client and server filters.
static void deadline_destroy_call_elem(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* /*ignored*/) {
  grpc_deadline_state* deadline_state =
      static_cast<grpc_deadline_state*>(elem->call_data);
  deadline_state->~grpc_deadline_state();
}

// Method for starting a call op for the client filter.
static void deadline_client_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  grpc_deadline_state_client_start_transport_stream_op_batch(
      static_cast<grpc_deadline_state*>(elem->call_data), op);
  // Chain to next filter.
  grpc_call_next_op(elem, op);
}

// Callback for receiving initial metadata on the server.
static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
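  // On the server, the deadline comes from the grpc-timeout header in the
  // client's initial metadata; if the header is absent, the deadline is
  // infinite and no timer is started.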
  start_timer_if_needed(
      &calld->deadline_state,
      calld->recv_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
          .value_or(grpc_core::Timestamp::InfFuture()));
  // Invoke the next callback.
  grpc_core::Closure::Run(DEBUG_LOCATION,
                          calld->next_recv_initial_metadata_ready, error);
}

// Method for starting a call op for the server filter.
static void deadline_server_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
  server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
  if (op->cancel_stream) {
    cancel_timer_if_needed(&calld->deadline_state);
  } else {
    // If we're receiving initial metadata, we need to get the deadline
    // from the recv_initial_metadata_ready callback. So we inject our
    // own callback into that hook.
    if (op->recv_initial_metadata) {
      calld->next_recv_initial_metadata_ready =
          op->payload->recv_initial_metadata.recv_initial_metadata_ready;
      calld->recv_initial_metadata =
          op->payload->recv_initial_metadata.recv_initial_metadata;
      GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                        recv_initial_metadata_ready, elem,
                        grpc_schedule_on_exec_ctx);
      op->payload->recv_initial_metadata.recv_initial_metadata_ready =
          &calld->recv_initial_metadata_ready;
    }
    // Make sure we know when the call is complete, so that we can cancel
    // the timer.
    // Note that we trigger this on recv_trailing_metadata, even though
    // the client never sends trailing metadata, because this is the
    // hook that tells us when the call is complete on the server side.
    if (op->recv_trailing_metadata) {
      inject_recv_trailing_metadata_ready(&calld->deadline_state, op);
    }
  }
  // Chain to next filter.
  grpc_call_next_op(elem, op);
}

const grpc_channel_filter grpc_client_deadline_filter = {
    deadline_client_start_transport_stream_op_batch,
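    // make_call_promise: in the promise-based client path this filter is a
    // pass-through; no timer is started here.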
    [](grpc_channel_element*, grpc_core::CallArgs call_args,
       grpc_core::NextPromiseFactory next_promise_factory) {
      return next_promise_factory(std::move(call_args));
    },
    grpc_channel_next_op,
    sizeof(grpc_deadline_state),
    deadline_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    deadline_destroy_call_elem,
    0,  // sizeof(channel_data)
    deadline_init_channel_elem,
    grpc_channel_stack_no_post_init,
    deadline_destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};

const grpc_channel_filter grpc_server_deadline_filter = {
    deadline_server_start_transport_stream_op_batch,
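    // make_call_promise: reads the grpc-timeout header from the client's
    // initial metadata and updates the call's deadline before delegating to
    // the next filter.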
    [](grpc_channel_element*, grpc_core::CallArgs call_args,
       grpc_core::NextPromiseFactory next_promise_factory) {
      auto deadline = call_args.client_initial_metadata->get(
          grpc_core::GrpcTimeoutMetadata());
      if (deadline.has_value()) {
        grpc_core::GetContext<grpc_core::CallContext>()->UpdateDeadline(
            *deadline);
      }
      return next_promise_factory(std::move(call_args));
    },
    grpc_channel_next_op,
    sizeof(server_call_data),
    deadline_init_call_elem,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    deadline_destroy_call_elem,
    0,  // sizeof(channel_data)
    deadline_init_channel_elem,
    grpc_channel_stack_no_post_init,
    deadline_destroy_channel_elem,
    grpc_channel_next_get_info,
    "deadline",
};

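// Deadline checking is enabled by default except in minimal stacks; the
// GRPC_ARG_ENABLE_DEADLINE_CHECKS channel arg overrides the default in
// either direction.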
bool grpc_deadline_checking_enabled(
    const grpc_core::ChannelArgs& channel_args) {
  return channel_args.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
      .value_or(!channel_args.WantMinimalStack());
}

namespace grpc_core {
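// Registers the client and server deadline filters at built-in priority;
// each channel stack gets the filter only if deadline checking is enabled
// for its channel args.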
void RegisterDeadlineFilter(CoreConfiguration::Builder* builder) {
  auto register_filter = [builder](grpc_channel_stack_type type,
                                   const grpc_channel_filter* filter) {
    builder->channel_init()->RegisterStage(
        type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
        [filter](ChannelStackBuilder* builder) {
          auto args = builder->channel_args();
          if (grpc_deadline_checking_enabled(args)) {
            builder->PrependFilter(filter);
          }
          return true;
        });
  };
  register_filter(GRPC_CLIENT_DIRECT_CHANNEL, &grpc_client_deadline_filter);
  register_filter(GRPC_SERVER_CHANNEL, &grpc_server_deadline_filter);
}
}  // namespace grpc_core