1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
18
19 #include <utility>
20
21 #include "absl/meta/type_traits.h"
22 #include "absl/random/distributions.h"
23
24 #include <grpc/support/log.h>
25
26 grpc_core::TraceFlag grpc_ping_trace(false, "http2_ping");
27
28 namespace grpc_core {
29
OnPing(Callback on_start,Callback on_ack)30 void Chttp2PingCallbacks::OnPing(Callback on_start, Callback on_ack) {
31 on_start_.emplace_back(std::move(on_start));
32 on_ack_.emplace_back(std::move(on_ack));
33 ping_requested_ = true;
34 }
35
OnPingAck(Callback on_ack)36 void Chttp2PingCallbacks::OnPingAck(Callback on_ack) {
37 auto it = inflight_.find(most_recent_inflight_);
38 if (it != inflight_.end()) {
39 it->second.on_ack.emplace_back(std::move(on_ack));
40 return;
41 }
42 ping_requested_ = true;
43 on_ack_.emplace_back(std::move(on_ack));
44 }
45
StartPing(absl::BitGenRef bitgen)46 uint64_t Chttp2PingCallbacks::StartPing(absl::BitGenRef bitgen) {
47 uint64_t id;
48 do {
49 id = absl::Uniform<uint64_t>(bitgen);
50 } while (inflight_.contains(id));
51 CallbackVec cbs = std::move(on_start_);
52 CallbackVec().swap(on_start_);
53 InflightPing inflight;
54 inflight.on_ack.swap(on_ack_);
55 started_new_ping_without_setting_timeout_ = true;
56 inflight_.emplace(id, std::move(inflight));
57 most_recent_inflight_ = id;
58 ping_requested_ = false;
59 for (auto& cb : cbs) {
60 cb();
61 }
62 return id;
63 }
64
AckPing(uint64_t id,grpc_event_engine::experimental::EventEngine * event_engine)65 bool Chttp2PingCallbacks::AckPing(
66 uint64_t id, grpc_event_engine::experimental::EventEngine* event_engine) {
67 auto ping = inflight_.extract(id);
68 if (ping.empty()) return false;
69 if (ping.mapped().on_timeout !=
70 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
71 event_engine->Cancel(ping.mapped().on_timeout);
72 }
73 for (auto& cb : ping.mapped().on_ack) {
74 cb();
75 }
76 return true;
77 }
78
CancelAll(grpc_event_engine::experimental::EventEngine * event_engine)79 void Chttp2PingCallbacks::CancelAll(
80 grpc_event_engine::experimental::EventEngine* event_engine) {
81 CallbackVec().swap(on_start_);
82 CallbackVec().swap(on_ack_);
83 for (auto& cbs : inflight_) {
84 CallbackVec().swap(cbs.second.on_ack);
85 if (cbs.second.on_timeout !=
86 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid) {
87 event_engine->Cancel(std::exchange(
88 cbs.second.on_timeout,
89 grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid));
90 }
91 }
92 ping_requested_ = false;
93 }
94
OnPingTimeout(Duration ping_timeout,grpc_event_engine::experimental::EventEngine * event_engine,Callback callback)95 absl::optional<uint64_t> Chttp2PingCallbacks::OnPingTimeout(
96 Duration ping_timeout,
97 grpc_event_engine::experimental::EventEngine* event_engine,
98 Callback callback) {
99 GPR_ASSERT(started_new_ping_without_setting_timeout_);
100 started_new_ping_without_setting_timeout_ = false;
101 auto it = inflight_.find(most_recent_inflight_);
102 if (it == inflight_.end()) return absl::nullopt;
103 it->second.on_timeout =
104 event_engine->RunAfter(ping_timeout, std::move(callback));
105 return most_recent_inflight_;
106 }
107
108 } // namespace grpc_core
109