1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/chaotic_good/client_transport.h"
18
19 #include <cstdint>
20 #include <cstdlib>
21 #include <memory>
22 #include <string>
23 #include <tuple>
24 #include <utility>
25
26 #include "absl/random/bit_gen_ref.h"
27 #include "absl/random/random.h"
28 #include "absl/status/statusor.h"
29
30 #include <grpc/event_engine/event_engine.h>
31 #include <grpc/slice.h>
32 #include <grpc/support/log.h>
33
34 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
35 #include "src/core/ext/transport/chaotic_good/frame.h"
36 #include "src/core/ext/transport/chaotic_good/frame_header.h"
37 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
38 #include "src/core/lib/gprpp/match.h"
39 #include "src/core/lib/gprpp/ref_counted_ptr.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/promise/activity.h"
42 #include "src/core/lib/promise/all_ok.h"
43 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
44 #include "src/core/lib/promise/loop.h"
45 #include "src/core/lib/promise/map.h"
46 #include "src/core/lib/promise/promise.h"
47 #include "src/core/lib/promise/try_join.h"
48 #include "src/core/lib/promise/try_seq.h"
49 #include "src/core/lib/resource_quota/arena.h"
50 #include "src/core/lib/resource_quota/resource_quota.h"
51 #include "src/core/lib/slice/slice.h"
52 #include "src/core/lib/slice/slice_buffer.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/transport/promise_endpoint.h"
55
56 namespace grpc_core {
57 namespace chaotic_good {
58
TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport)59 auto ChaoticGoodClientTransport::TransportWriteLoop(
60 RefCountedPtr<ChaoticGoodTransport> transport) {
61 return Loop([this, transport = std::move(transport)] {
62 return TrySeq(
63 // Get next outgoing frame.
64 outgoing_frames_.Next(),
65 // Serialize and write it out.
66 [transport = transport.get()](ClientFrame client_frame) {
67 return transport->WriteFrame(GetFrameInterface(client_frame));
68 },
69 []() -> LoopCtl<absl::Status> {
70 // The write failures will be caught in TrySeq and exit loop.
71 // Therefore, only need to return Continue() in the last lambda
72 // function.
73 return Continue();
74 });
75 });
76 }
77
LookupStream(uint32_t stream_id)78 absl::optional<CallHandler> ChaoticGoodClientTransport::LookupStream(
79 uint32_t stream_id) {
80 MutexLock lock(&mu_);
81 auto it = stream_map_.find(stream_id);
82 if (it == stream_map_.end()) {
83 return absl::nullopt;
84 }
85 return it->second;
86 }
87
PushFrameIntoCall(ServerFragmentFrame frame,CallHandler call_handler)88 auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
89 CallHandler call_handler) {
90 auto& headers = frame.headers;
91 auto push = TrySeq(
92 If(
93 headers != nullptr,
94 [call_handler, &headers]() mutable {
95 return call_handler.PushServerInitialMetadata(std::move(headers));
96 },
97 []() -> StatusFlag { return Success{}; }),
98 [call_handler, message = std::move(frame.message)]() mutable {
99 return If(
100 message.has_value(),
101 [&call_handler, &message]() mutable {
102 return call_handler.PushMessage(std::move(message->message));
103 },
104 []() -> StatusFlag { return Success{}; });
105 },
106 [call_handler, trailers = std::move(frame.trailers)]() mutable {
107 return If(
108 trailers != nullptr,
109 [&call_handler, &trailers]() mutable {
110 return call_handler.PushServerTrailingMetadata(
111 std::move(trailers));
112 },
113 []() -> StatusFlag { return Success{}; });
114 });
115 // Wrap the actual sequence with something that owns the call handler so that
116 // its lifetime extends until the push completes.
117 return [call_handler, push = std::move(push)]() mutable { return push(); };
118 }
119
TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport)120 auto ChaoticGoodClientTransport::TransportReadLoop(
121 RefCountedPtr<ChaoticGoodTransport> transport) {
122 return Loop([this, transport = std::move(transport)] {
123 return TrySeq(
124 transport->ReadFrameBytes(),
125 [](std::tuple<FrameHeader, BufferPair> frame_bytes)
126 -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
127 const auto& frame_header = std::get<0>(frame_bytes);
128 if (frame_header.type != FrameType::kFragment) {
129 return absl::InternalError(
130 absl::StrCat("Expected fragment frame, got ",
131 static_cast<int>(frame_header.type)));
132 }
133 return frame_bytes;
134 },
135 [this, transport = transport.get()](
136 std::tuple<FrameHeader, BufferPair> frame_bytes) {
137 const auto& frame_header = std::get<0>(frame_bytes);
138 auto& buffers = std::get<1>(frame_bytes);
139 absl::optional<CallHandler> call_handler =
140 LookupStream(frame_header.stream_id);
141 ServerFragmentFrame frame;
142 absl::Status deserialize_status;
143 const FrameLimits frame_limits{1024 * 1024 * 1024,
144 aligned_bytes_ - 1};
145 if (call_handler.has_value()) {
146 deserialize_status = transport->DeserializeFrame(
147 frame_header, std::move(buffers), call_handler->arena(), frame,
148 frame_limits);
149 } else {
150 // Stream not found, skip the frame.
151 auto arena = MakeScopedArena(1024, &allocator_);
152 deserialize_status =
153 transport->DeserializeFrame(frame_header, std::move(buffers),
154 arena.get(), frame, frame_limits);
155 }
156 return If(
157 deserialize_status.ok() && call_handler.has_value(),
158 [this, &frame, &call_handler]() {
159 return call_handler->SpawnWaitable(
160 "push-frame", [this, call_handler = *call_handler,
161 frame = std::move(frame)]() mutable {
162 return Map(call_handler.CancelIfFails(PushFrameIntoCall(
163 std::move(frame), call_handler)),
164 [](StatusFlag f) {
165 return StatusCast<absl::Status>(f);
166 });
167 });
168 },
169 [&deserialize_status]() {
170 // Stream not found, nothing to do.
171 return [deserialize_status =
172 std::move(deserialize_status)]() mutable {
173 return std::move(deserialize_status);
174 };
175 });
176 },
177 []() -> LoopCtl<absl::Status> { return Continue{}; });
178 });
179 }
180
OnTransportActivityDone()181 auto ChaoticGoodClientTransport::OnTransportActivityDone() {
182 return [this](absl::Status) { AbortWithError(); };
183 }
184
ChaoticGoodClientTransport(PromiseEndpoint control_endpoint,PromiseEndpoint data_endpoint,const ChannelArgs & args,std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,HPackParser hpack_parser,HPackCompressor hpack_encoder)185 ChaoticGoodClientTransport::ChaoticGoodClientTransport(
186 PromiseEndpoint control_endpoint, PromiseEndpoint data_endpoint,
187 const ChannelArgs& args,
188 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
189 HPackParser hpack_parser, HPackCompressor hpack_encoder)
190 : allocator_(args.GetObject<ResourceQuota>()
191 ->memory_quota()
192 ->CreateMemoryAllocator("chaotic-good")),
193 outgoing_frames_(4) {
194 auto transport = MakeRefCounted<ChaoticGoodTransport>(
195 std::move(control_endpoint), std::move(data_endpoint),
196 std::move(hpack_parser), std::move(hpack_encoder));
197 writer_ = MakeActivity(
198 // Continuously write next outgoing frames to promise endpoints.
199 TransportWriteLoop(transport), EventEngineWakeupScheduler(event_engine),
200 OnTransportActivityDone());
201 reader_ = MakeActivity(
202 // Continuously read next incoming frames from promise endpoints.
203 TransportReadLoop(std::move(transport)),
204 EventEngineWakeupScheduler(event_engine), OnTransportActivityDone());
205 }
206
~ChaoticGoodClientTransport()207 ChaoticGoodClientTransport::~ChaoticGoodClientTransport() {
208 if (writer_ != nullptr) {
209 writer_.reset();
210 }
211 if (reader_ != nullptr) {
212 reader_.reset();
213 }
214 }
215
AbortWithError()216 void ChaoticGoodClientTransport::AbortWithError() {
217 // Mark transport as unavailable when the endpoint write/read failed.
218 // Close all the available pipes.
219 outgoing_frames_.MarkClosed();
220 ReleasableMutexLock lock(&mu_);
221 StreamMap stream_map = std::move(stream_map_);
222 stream_map_.clear();
223 lock.Release();
224 for (const auto& pair : stream_map) {
225 auto call_handler = pair.second;
226 call_handler.SpawnInfallible("cancel", [call_handler]() mutable {
227 call_handler.Cancel(ServerMetadataFromStatus(
228 absl::UnavailableError("Transport closed.")));
229 return Empty{};
230 });
231 }
232 }
233
MakeStream(CallHandler call_handler)234 uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) {
235 ReleasableMutexLock lock(&mu_);
236 const uint32_t stream_id = next_stream_id_++;
237 stream_map_.emplace(stream_id, call_handler);
238 lock.Release();
239 call_handler.OnDone([this, stream_id]() {
240 MutexLock lock(&mu_);
241 stream_map_.erase(stream_id);
242 });
243 return stream_id;
244 }
245
CallOutboundLoop(uint32_t stream_id,CallHandler call_handler)246 auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
247 CallHandler call_handler) {
248 auto send_fragment = [stream_id,
249 outgoing_frames = outgoing_frames_.MakeSender()](
250 ClientFragmentFrame frame) mutable {
251 frame.stream_id = stream_id;
252 return Map(outgoing_frames.Send(std::move(frame)),
253 [](bool success) -> absl::Status {
254 if (!success) {
255 // Failed to send outgoing frame.
256 return absl::UnavailableError("Transport closed.");
257 }
258 return absl::OkStatus();
259 });
260 };
261 return TrySeq(
262 // Wait for initial metadata then send it out.
263 call_handler.PullClientInitialMetadata(),
264 [send_fragment](ClientMetadataHandle md) mutable {
265 if (grpc_chaotic_good_trace.enabled()) {
266 gpr_log(GPR_INFO, "CHAOTIC_GOOD: Sending initial metadata: %s",
267 md->DebugString().c_str());
268 }
269 ClientFragmentFrame frame;
270 frame.headers = std::move(md);
271 return send_fragment(std::move(frame));
272 },
273 // Continuously send client frame with client to server messages.
274 ForEach(OutgoingMessages(call_handler),
275 [send_fragment,
276 aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
277 ClientFragmentFrame frame;
278 // Construct frame header (flags, header_length and
279 // trailer_length will be added in serialization).
280 const uint32_t message_length = message->payload()->Length();
281 const uint32_t padding =
282 message_length % aligned_bytes == 0
283 ? 0
284 : aligned_bytes - message_length % aligned_bytes;
285 GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
286 frame.message = FragmentMessage(std::move(message), padding,
287 message_length);
288 return send_fragment(std::move(frame));
289 }),
290 [send_fragment]() mutable {
291 ClientFragmentFrame frame;
292 frame.end_of_stream = true;
293 return send_fragment(std::move(frame));
294 });
295 }
296
StartCall(CallHandler call_handler)297 void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
298 // At this point, the connection is set up.
299 // Start sending data frames.
300 call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable {
301 const uint32_t stream_id = MakeStream(call_handler);
302 return Map(CallOutboundLoop(stream_id, call_handler),
303 [stream_id, this](absl::Status result) {
304 if (!result.ok()) {
305 CancelFrame frame;
306 frame.stream_id = stream_id;
307 outgoing_frames_.MakeSender().UnbufferedImmediateSend(
308 std::move(frame));
309 }
310 return result;
311 });
312 });
313 }
314
PerformOp(grpc_transport_op * op)315 void ChaoticGoodClientTransport::PerformOp(grpc_transport_op* op) {
316 MutexLock lock(&mu_);
317 bool did_stuff = false;
318 if (op->start_connectivity_watch != nullptr) {
319 state_tracker_.AddWatcher(op->start_connectivity_watch_state,
320 std::move(op->start_connectivity_watch));
321 did_stuff = true;
322 }
323 if (op->stop_connectivity_watch != nullptr) {
324 state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
325 did_stuff = true;
326 }
327 if (op->set_accept_stream) {
328 Crash("set_accept_stream not supported on clients");
329 }
330 if (!did_stuff) {
331 Crash(absl::StrCat("unimplemented transport perform op: ",
332 grpc_transport_op_string(op)));
333 }
334 ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
335 }
336
337 } // namespace chaotic_good
338 } // namespace grpc_core
339