1 //
2 // Copyright 2016 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/filters/deadline/deadline_filter.h"
20 
21 #include <functional>
22 #include <memory>
23 #include <new>
24 #include <utility>
25 
26 #include "absl/status/status.h"
27 #include "absl/types/optional.h"
28 
29 #include <grpc/grpc.h>
30 #include <grpc/status.h>
31 #include <grpc/support/log.h>
32 
33 #include "src/core/lib/channel/channel_args.h"
34 #include "src/core/lib/channel/channel_stack_builder.h"
35 #include "src/core/lib/config/core_configuration.h"
36 #include "src/core/lib/gprpp/debug_location.h"
37 #include "src/core/lib/gprpp/status_helper.h"
38 #include "src/core/lib/iomgr/error.h"
39 #include "src/core/lib/iomgr/exec_ctx.h"
40 #include "src/core/lib/iomgr/timer.h"
41 #include "src/core/lib/promise/arena_promise.h"
42 #include "src/core/lib/promise/context.h"
43 #include "src/core/lib/surface/call.h"
44 #include "src/core/lib/surface/channel_init.h"
45 #include "src/core/lib/surface/channel_stack_type.h"
46 #include "src/core/lib/transport/metadata_batch.h"
47 
48 namespace grpc_core {
49 
50 // A fire-and-forget class representing a pending deadline timer.
51 // Allocated on the call arena.
52 class TimerState {
53  public:
TimerState(grpc_deadline_state * deadline_state,Timestamp deadline)54   TimerState(grpc_deadline_state* deadline_state, Timestamp deadline)
55       : deadline_state_(deadline_state) {
56     GRPC_CALL_STACK_REF(deadline_state->call_stack, "DeadlineTimerState");
57     GRPC_CLOSURE_INIT(&closure_, TimerCallback, this, nullptr);
58     grpc_timer_init(&timer_, deadline, &closure_);
59   }
60 
Cancel()61   void Cancel() { grpc_timer_cancel(&timer_); }
62 
63  private:
64   // The on_complete callback used when sending a cancel_error batch down the
65   // filter stack.  Yields the call combiner when the batch returns.
YieldCallCombiner(void * arg,grpc_error_handle)66   static void YieldCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
67     TimerState* self = static_cast<TimerState*>(arg);
68     GRPC_CALL_COMBINER_STOP(self->deadline_state_->call_combiner,
69                             "got on_complete from cancel_stream batch");
70     GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
71                           "DeadlineTimerState");
72   }
73 
74   // This is called via the call combiner, so access to deadline_state is
75   // synchronized.
SendCancelOpInCallCombiner(void * arg,grpc_error_handle error)76   static void SendCancelOpInCallCombiner(void* arg, grpc_error_handle error) {
77     TimerState* self = static_cast<TimerState*>(arg);
78     grpc_transport_stream_op_batch* batch = grpc_make_transport_stream_op(
79         GRPC_CLOSURE_INIT(&self->closure_, YieldCallCombiner, self, nullptr));
80     batch->cancel_stream = true;
81     batch->payload->cancel_stream.cancel_error = error;
82     grpc_call_element* elem = self->deadline_state_->elem;
83     elem->filter->start_transport_stream_op_batch(elem, batch);
84   }
85 
86   // Timer callback.
TimerCallback(void * arg,grpc_error_handle error)87   static void TimerCallback(void* arg, grpc_error_handle error) {
88     TimerState* self = static_cast<TimerState*>(arg);
89     if (error != absl::CancelledError()) {
90       error = grpc_error_set_int(GRPC_ERROR_CREATE("Deadline Exceeded"),
91                                  StatusIntProperty::kRpcStatus,
92                                  GRPC_STATUS_DEADLINE_EXCEEDED);
93       self->deadline_state_->call_combiner->Cancel(error);
94       GRPC_CLOSURE_INIT(&self->closure_, SendCancelOpInCallCombiner, self,
95                         nullptr);
96       GRPC_CALL_COMBINER_START(self->deadline_state_->call_combiner,
97                                &self->closure_, error,
98                                "deadline exceeded -- sending cancel_stream op");
99     } else {
100       GRPC_CALL_STACK_UNREF(self->deadline_state_->call_stack,
101                             "DeadlineTimerState");
102     }
103   }
104 
105   // NOTE: This object's dtor is never called, so do not add any data
106   // members that require destruction!
107   // TODO(roth): We should ideally call this object's dtor somewhere,
108   // but that would require adding more synchronization, because we'd
109   // need to call the dtor only after both (a) the timer callback
110   // finishes and (b) the filter sees the call completion and attempts
111   // to cancel the timer.
112   grpc_deadline_state* deadline_state_;
113   grpc_timer timer_;
114   grpc_closure closure_;
115 };
116 
117 }  // namespace grpc_core
118 
119 //
120 // grpc_deadline_state
121 //
122 
123 // Starts the deadline timer.
124 // This is called via the call combiner, so access to deadline_state is
125 // synchronized.
start_timer_if_needed(grpc_deadline_state * deadline_state,grpc_core::Timestamp deadline)126 static void start_timer_if_needed(grpc_deadline_state* deadline_state,
127                                   grpc_core::Timestamp deadline) {
128   if (deadline == grpc_core::Timestamp::InfFuture()) return;
129   GPR_ASSERT(deadline_state->timer_state == nullptr);
130   deadline_state->timer_state =
131       deadline_state->arena->New<grpc_core::TimerState>(deadline_state,
132                                                         deadline);
133 }
134 
135 // Cancels the deadline timer.
136 // This is called via the call combiner, so access to deadline_state is
137 // synchronized.
cancel_timer_if_needed(grpc_deadline_state * deadline_state)138 static void cancel_timer_if_needed(grpc_deadline_state* deadline_state) {
139   if (deadline_state->timer_state != nullptr) {
140     deadline_state->timer_state->Cancel();
141     deadline_state->timer_state = nullptr;
142   }
143 }
144 
145 // Callback run when we receive trailing metadata.
recv_trailing_metadata_ready(void * arg,grpc_error_handle error)146 static void recv_trailing_metadata_ready(void* arg, grpc_error_handle error) {
147   grpc_deadline_state* deadline_state = static_cast<grpc_deadline_state*>(arg);
148   cancel_timer_if_needed(deadline_state);
149   // Invoke the original callback.
150   grpc_core::Closure::Run(DEBUG_LOCATION,
151                           deadline_state->original_recv_trailing_metadata_ready,
152                           error);
153 }
154 
155 // Inject our own recv_trailing_metadata_ready callback into op.
inject_recv_trailing_metadata_ready(grpc_deadline_state * deadline_state,grpc_transport_stream_op_batch * op)156 static void inject_recv_trailing_metadata_ready(
157     grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
158   deadline_state->original_recv_trailing_metadata_ready =
159       op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
160   GRPC_CLOSURE_INIT(&deadline_state->recv_trailing_metadata_ready,
161                     recv_trailing_metadata_ready, deadline_state,
162                     grpc_schedule_on_exec_ctx);
163   op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
164       &deadline_state->recv_trailing_metadata_ready;
165 }
166 
167 // Callback and associated state for starting the timer after call stack
168 // initialization has been completed.
169 struct start_timer_after_init_state {
start_timer_after_init_statestart_timer_after_init_state170   start_timer_after_init_state(grpc_deadline_state* deadline_state,
171                                grpc_core::Timestamp deadline)
172       : deadline_state(deadline_state), deadline(deadline) {}
~start_timer_after_init_statestart_timer_after_init_state173   ~start_timer_after_init_state() {
174     start_timer_if_needed(deadline_state, deadline);
175   }
176 
177   bool in_call_combiner = false;
178   grpc_deadline_state* deadline_state;
179   grpc_core::Timestamp deadline;
180   grpc_closure closure;
181 };
start_timer_after_init(void * arg,grpc_error_handle error)182 static void start_timer_after_init(void* arg, grpc_error_handle error) {
183   struct start_timer_after_init_state* state =
184       static_cast<struct start_timer_after_init_state*>(arg);
185   grpc_deadline_state* deadline_state = state->deadline_state;
186   if (!state->in_call_combiner) {
187     // We are initially called without holding the call combiner, so we
188     // need to bounce ourselves into it.
189     state->in_call_combiner = true;
190     GRPC_CALL_COMBINER_START(deadline_state->call_combiner, &state->closure,
191                              error, "scheduling deadline timer");
192     return;
193   }
194   delete state;
195   GRPC_CALL_COMBINER_STOP(deadline_state->call_combiner,
196                           "done scheduling deadline timer");
197 }
198 
grpc_deadline_state(grpc_call_element * elem,const grpc_call_element_args & args,grpc_core::Timestamp deadline)199 grpc_deadline_state::grpc_deadline_state(grpc_call_element* elem,
200                                          const grpc_call_element_args& args,
201                                          grpc_core::Timestamp deadline)
202     : elem(elem),
203       call_stack(args.call_stack),
204       call_combiner(args.call_combiner),
205       arena(args.arena) {
206   // Deadline will always be infinite on servers, so the timer will only be
207   // set on clients with a finite deadline.
208   if (deadline != grpc_core::Timestamp::InfFuture()) {
209     // When the deadline passes, we indicate the failure by sending down
210     // an op with cancel_error set.  However, we can't send down any ops
211     // until after the call stack is fully initialized.  If we start the
212     // timer here, we have no guarantee that the timer won't pop before
213     // call stack initialization is finished.  To avoid that problem, we
214     // create a closure to start the timer, and we schedule that closure
215     // to be run after call stack initialization is done.
216     struct start_timer_after_init_state* state =
217         new start_timer_after_init_state(this, deadline);
218     GRPC_CLOSURE_INIT(&state->closure, start_timer_after_init, state,
219                       grpc_schedule_on_exec_ctx);
220     grpc_core::ExecCtx::Run(DEBUG_LOCATION, &state->closure, absl::OkStatus());
221   }
222 }
223 
~grpc_deadline_state()224 grpc_deadline_state::~grpc_deadline_state() { cancel_timer_if_needed(this); }
225 
grpc_deadline_state_reset(grpc_deadline_state * deadline_state,grpc_core::Timestamp new_deadline)226 void grpc_deadline_state_reset(grpc_deadline_state* deadline_state,
227                                grpc_core::Timestamp new_deadline) {
228   cancel_timer_if_needed(deadline_state);
229   start_timer_if_needed(deadline_state, new_deadline);
230 }
231 
grpc_deadline_state_client_start_transport_stream_op_batch(grpc_deadline_state * deadline_state,grpc_transport_stream_op_batch * op)232 void grpc_deadline_state_client_start_transport_stream_op_batch(
233     grpc_deadline_state* deadline_state, grpc_transport_stream_op_batch* op) {
234   if (op->cancel_stream) {
235     cancel_timer_if_needed(deadline_state);
236   } else {
237     // Make sure we know when the call is complete, so that we can cancel
238     // the timer.
239     if (op->recv_trailing_metadata) {
240       inject_recv_trailing_metadata_ready(deadline_state, op);
241     }
242   }
243 }
244 
245 //
246 // filter code
247 //
248 
249 // Constructor for channel_data.  Used for both client and server filters.
deadline_init_channel_elem(grpc_channel_element *,grpc_channel_element_args * args)250 static grpc_error_handle deadline_init_channel_elem(
251     grpc_channel_element* /*elem*/, grpc_channel_element_args* args) {
252   GPR_ASSERT(!args->is_last);
253   return absl::OkStatus();
254 }
255 
256 // Destructor for channel_data.  Used for both client and server filters.
deadline_destroy_channel_elem(grpc_channel_element *)257 static void deadline_destroy_channel_elem(grpc_channel_element* /*elem*/) {}
258 
259 // Additional call data used only for the server filter.
260 struct server_call_data {
261   grpc_deadline_state deadline_state;  // Must be first.
262   // The closure for receiving initial metadata.
263   grpc_closure recv_initial_metadata_ready;
264   // Received initial metadata batch.
265   grpc_metadata_batch* recv_initial_metadata;
266   // The original recv_initial_metadata_ready closure, which we chain to
267   // after our own closure is invoked.
268   grpc_closure* next_recv_initial_metadata_ready;
269 };
270 
271 // Constructor for call_data.  Used for both client and server filters.
deadline_init_call_elem(grpc_call_element * elem,const grpc_call_element_args * args)272 static grpc_error_handle deadline_init_call_elem(
273     grpc_call_element* elem, const grpc_call_element_args* args) {
274   new (elem->call_data) grpc_deadline_state(elem, *args, args->deadline);
275   return absl::OkStatus();
276 }
277 
278 // Destructor for call_data.  Used for both client and server filters.
deadline_destroy_call_elem(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)279 static void deadline_destroy_call_elem(
280     grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
281     grpc_closure* /*ignored*/) {
282   grpc_deadline_state* deadline_state =
283       static_cast<grpc_deadline_state*>(elem->call_data);
284   deadline_state->~grpc_deadline_state();
285 }
286 
287 // Method for starting a call op for client filter.
deadline_client_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)288 static void deadline_client_start_transport_stream_op_batch(
289     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
290   grpc_deadline_state_client_start_transport_stream_op_batch(
291       static_cast<grpc_deadline_state*>(elem->call_data), op);
292   // Chain to next filter.
293   grpc_call_next_op(elem, op);
294 }
295 
296 // Callback for receiving initial metadata on the server.
recv_initial_metadata_ready(void * arg,grpc_error_handle error)297 static void recv_initial_metadata_ready(void* arg, grpc_error_handle error) {
298   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
299   server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
300   start_timer_if_needed(
301       &calld->deadline_state,
302       calld->recv_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
303           .value_or(grpc_core::Timestamp::InfFuture()));
304   // Invoke the next callback.
305   grpc_core::Closure::Run(DEBUG_LOCATION,
306                           calld->next_recv_initial_metadata_ready, error);
307 }
308 
309 // Method for starting a call op for server filter.
deadline_server_start_transport_stream_op_batch(grpc_call_element * elem,grpc_transport_stream_op_batch * op)310 static void deadline_server_start_transport_stream_op_batch(
311     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
312   server_call_data* calld = static_cast<server_call_data*>(elem->call_data);
313   if (op->cancel_stream) {
314     cancel_timer_if_needed(&calld->deadline_state);
315   } else {
316     // If we're receiving initial metadata, we need to get the deadline
317     // from the recv_initial_metadata_ready callback.  So we inject our
318     // own callback into that hook.
319     if (op->recv_initial_metadata) {
320       calld->next_recv_initial_metadata_ready =
321           op->payload->recv_initial_metadata.recv_initial_metadata_ready;
322       calld->recv_initial_metadata =
323           op->payload->recv_initial_metadata.recv_initial_metadata;
324       GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
325                         recv_initial_metadata_ready, elem,
326                         grpc_schedule_on_exec_ctx);
327       op->payload->recv_initial_metadata.recv_initial_metadata_ready =
328           &calld->recv_initial_metadata_ready;
329     }
330     // Make sure we know when the call is complete, so that we can cancel
331     // the timer.
332     // Note that we trigger this on recv_trailing_metadata, even though
333     // the client never sends trailing metadata, because this is the
334     // hook that tells us when the call is complete on the server side.
335     if (op->recv_trailing_metadata) {
336       inject_recv_trailing_metadata_ready(&calld->deadline_state, op);
337     }
338   }
339   // Chain to next filter.
340   grpc_call_next_op(elem, op);
341 }
342 
343 const grpc_channel_filter grpc_client_deadline_filter = {
344     deadline_client_start_transport_stream_op_batch,
345     [](grpc_channel_element*, grpc_core::CallArgs call_args,
__anonccf856fe0102() 346        grpc_core::NextPromiseFactory next_promise_factory) {
347       return next_promise_factory(std::move(call_args));
348     },
349     grpc_channel_next_op,
350     sizeof(grpc_deadline_state),
351     deadline_init_call_elem,
352     grpc_call_stack_ignore_set_pollset_or_pollset_set,
353     deadline_destroy_call_elem,
354     0,  // sizeof(channel_data)
355     deadline_init_channel_elem,
356     grpc_channel_stack_no_post_init,
357     deadline_destroy_channel_elem,
358     grpc_channel_next_get_info,
359     "deadline",
360 };
361 
362 const grpc_channel_filter grpc_server_deadline_filter = {
363     deadline_server_start_transport_stream_op_batch,
364     [](grpc_channel_element*, grpc_core::CallArgs call_args,
__anonccf856fe0202() 365        grpc_core::NextPromiseFactory next_promise_factory) {
366       auto deadline = call_args.client_initial_metadata->get(
367           grpc_core::GrpcTimeoutMetadata());
368       if (deadline.has_value()) {
369         grpc_core::GetContext<grpc_core::CallContext>()->UpdateDeadline(
370             *deadline);
371       }
372       return next_promise_factory(std::move(call_args));
373     },
374     grpc_channel_next_op,
375     sizeof(server_call_data),
376     deadline_init_call_elem,
377     grpc_call_stack_ignore_set_pollset_or_pollset_set,
378     deadline_destroy_call_elem,
379     0,  // sizeof(channel_data)
380     deadline_init_channel_elem,
381     grpc_channel_stack_no_post_init,
382     deadline_destroy_channel_elem,
383     grpc_channel_next_get_info,
384     "deadline",
385 };
386 
grpc_deadline_checking_enabled(const grpc_core::ChannelArgs & channel_args)387 bool grpc_deadline_checking_enabled(
388     const grpc_core::ChannelArgs& channel_args) {
389   return channel_args.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
390       .value_or(!channel_args.WantMinimalStack());
391 }
392 
393 namespace grpc_core {
RegisterDeadlineFilter(CoreConfiguration::Builder * builder)394 void RegisterDeadlineFilter(CoreConfiguration::Builder* builder) {
395   auto register_filter = [builder](grpc_channel_stack_type type,
396                                    const grpc_channel_filter* filter) {
397     builder->channel_init()->RegisterStage(
398         type, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
399         [filter](ChannelStackBuilder* builder) {
400           auto args = builder->channel_args();
401           if (grpc_deadline_checking_enabled(args)) {
402             builder->PrependFilter(filter);
403           }
404           return true;
405         });
406   };
407   register_filter(GRPC_CLIENT_DIRECT_CHANNEL, &grpc_client_deadline_filter);
408   register_filter(GRPC_SERVER_CHANNEL, &grpc_server_deadline_filter);
409 }
410 }  // namespace grpc_core
411