1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/ext/transport/chaotic_good/server_transport.h"
18
19 #include <memory>
20 #include <string>
21 #include <tuple>
22
23 #include "absl/random/bit_gen_ref.h"
24 #include "absl/random/random.h"
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27
28 #include <grpc/event_engine/event_engine.h>
29 #include <grpc/grpc.h>
30 #include <grpc/slice.h>
31 #include <grpc/support/log.h>
32
33 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
34 #include "src/core/ext/transport/chaotic_good/frame.h"
35 #include "src/core/ext/transport/chaotic_good/frame_header.h"
36 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/promise/activity.h"
40 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
41 #include "src/core/lib/promise/for_each.h"
42 #include "src/core/lib/promise/loop.h"
43 #include "src/core/lib/promise/switch.h"
44 #include "src/core/lib/promise/try_seq.h"
45 #include "src/core/lib/resource_quota/arena.h"
46 #include "src/core/lib/resource_quota/resource_quota.h"
47 #include "src/core/lib/slice/slice.h"
48 #include "src/core/lib/slice/slice_buffer.h"
49 #include "src/core/lib/transport/promise_endpoint.h"
50
51 namespace grpc_core {
52 namespace chaotic_good {
53
TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport)54 auto ChaoticGoodServerTransport::TransportWriteLoop(
55 RefCountedPtr<ChaoticGoodTransport> transport) {
56 return Loop([this, transport = std::move(transport)] {
57 return TrySeq(
58 // Get next outgoing frame.
59 outgoing_frames_.Next(),
60 // Serialize and write it out.
61 [transport = transport.get()](ServerFrame client_frame) {
62 return transport->WriteFrame(GetFrameInterface(client_frame));
63 },
64 []() -> LoopCtl<absl::Status> {
65 // The write failures will be caught in TrySeq and exit loop.
66 // Therefore, only need to return Continue() in the last lambda
67 // function.
68 return Continue();
69 });
70 });
71 }
72
PushFragmentIntoCall(CallInitiator call_initiator,ClientFragmentFrame frame,uint32_t stream_id)73 auto ChaoticGoodServerTransport::PushFragmentIntoCall(
74 CallInitiator call_initiator, ClientFragmentFrame frame,
75 uint32_t stream_id) {
76 auto& headers = frame.headers;
77 return TrySeq(
78 If(
79 headers != nullptr,
80 [call_initiator, &headers]() mutable {
81 return call_initiator.PushClientInitialMetadata(std::move(headers));
82 },
83 []() -> StatusFlag { return Success{}; }),
84 [call_initiator, message = std::move(frame.message)]() mutable {
85 return If(
86 message.has_value(),
87 [&call_initiator, &message]() mutable {
88 return call_initiator.PushMessage(std::move(message->message));
89 },
90 []() -> StatusFlag { return Success{}; });
91 },
92 [this, call_initiator, end_of_stream = frame.end_of_stream,
93 stream_id]() mutable -> StatusFlag {
94 if (end_of_stream) {
95 call_initiator.FinishSends();
96 // We have received end_of_stream. It is now safe to remove the call
97 // from the stream map.
98 MutexLock lock(&mu_);
99 stream_map_.erase(stream_id);
100 }
101 return Success{};
102 });
103 }
104
MaybePushFragmentIntoCall(absl::optional<CallInitiator> call_initiator,absl::Status error,ClientFragmentFrame frame,uint32_t stream_id)105 auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
106 absl::optional<CallInitiator> call_initiator, absl::Status error,
107 ClientFragmentFrame frame, uint32_t stream_id) {
108 return If(
109 call_initiator.has_value() && error.ok(),
110 [this, &call_initiator, &frame, &stream_id]() {
111 return Map(
112 call_initiator->SpawnWaitable(
113 "push-fragment",
114 [call_initiator, frame = std::move(frame), stream_id,
115 this]() mutable {
116 return call_initiator->CancelIfFails(PushFragmentIntoCall(
117 *call_initiator, std::move(frame), stream_id));
118 }),
119 [](StatusFlag status) { return StatusCast<absl::Status>(status); });
120 },
121 [&error, &frame]() {
122 // EOF frames may arrive after the call_initiator's OnDone callback
123 // has been invoked. In that case, the call_initiator would have
124 // already been removed from the stream_map and hence the EOF frame
125 // cannot be pushed into the call. No need to log such frames.
126 if (!frame.end_of_stream) {
127 gpr_log(
128 GPR_INFO,
129 "CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s",
130 error.ToString().c_str(), frame.ToString().c_str());
131 }
132 return Immediate(std::move(error));
133 });
134 }
135
SendFragment(ServerFragmentFrame frame,MpscSender<ServerFrame> outgoing_frames,CallInitiator call_initiator)136 auto ChaoticGoodServerTransport::SendFragment(
137 ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames,
138 CallInitiator call_initiator) {
139 if (grpc_chaotic_good_trace.enabled()) {
140 gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendFragment: frame=%s",
141 frame.ToString().c_str());
142 }
143 // Capture the call_initiator to ensure the underlying call spine is alive
144 // until the outgoing_frames.Send promise completes.
145 return Map(outgoing_frames.Send(std::move(frame)),
146 [call_initiator](bool success) -> absl::Status {
147 if (!success) {
148 // Failed to send outgoing frame.
149 return absl::UnavailableError("Transport closed.");
150 }
151 return absl::OkStatus();
152 });
153 }
154
SendCallBody(uint32_t stream_id,MpscSender<ServerFrame> outgoing_frames,CallInitiator call_initiator)155 auto ChaoticGoodServerTransport::SendCallBody(
156 uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
157 CallInitiator call_initiator) {
158 // Continuously send client frame with client to server
159 // messages.
160 return ForEach(
161 OutgoingMessages(call_initiator),
162 // Capture the call_initator to ensure the underlying call
163 // spine is alive until the SendFragment promise completes.
164 [stream_id, outgoing_frames, call_initiator,
165 aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
166 ServerFragmentFrame frame;
167 // Construct frame header (flags, header_length
168 // and trailer_length will be added in
169 // serialization).
170 const uint32_t message_length = message->payload()->Length();
171 const uint32_t padding =
172 message_length % aligned_bytes == 0
173 ? 0
174 : aligned_bytes - message_length % aligned_bytes;
175 GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
176 frame.message =
177 FragmentMessage(std::move(message), padding, message_length);
178 frame.stream_id = stream_id;
179 return SendFragment(std::move(frame), outgoing_frames, call_initiator);
180 });
181 }
182
SendCallInitialMetadataAndBody(uint32_t stream_id,MpscSender<ServerFrame> outgoing_frames,CallInitiator call_initiator)183 auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
184 uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
185 CallInitiator call_initiator) {
186 return TrySeq(
187 // Wait for initial metadata then send it out.
188 call_initiator.PullServerInitialMetadata(),
189 [stream_id, outgoing_frames, call_initiator,
190 this](absl::optional<ServerMetadataHandle> md) mutable {
191 if (grpc_chaotic_good_trace.enabled()) {
192 gpr_log(GPR_INFO,
193 "CHAOTIC_GOOD: SendCallInitialMetadataAndBody: md=%s",
194 md.has_value() ? (*md)->DebugString().c_str() : "null");
195 }
196 return If(
197 md.has_value(),
198 [&md, stream_id, &outgoing_frames, &call_initiator, this]() {
199 ServerFragmentFrame frame;
200 frame.headers = std::move(*md);
201 frame.stream_id = stream_id;
202 return TrySeq(
203 SendFragment(std::move(frame), outgoing_frames,
204 call_initiator),
205 SendCallBody(stream_id, outgoing_frames, call_initiator));
206 },
207 []() { return absl::OkStatus(); });
208 });
209 }
210
CallOutboundLoop(uint32_t stream_id,CallInitiator call_initiator)211 auto ChaoticGoodServerTransport::CallOutboundLoop(
212 uint32_t stream_id, CallInitiator call_initiator) {
213 auto outgoing_frames = outgoing_frames_.MakeSender();
214 return Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames,
215 call_initiator),
216 [stream_id](absl::Status main_body_result) {
217 if (grpc_chaotic_good_trace.enabled()) {
218 gpr_log(GPR_DEBUG,
219 "CHAOTIC_GOOD: CallOutboundLoop: stream_id=%d "
220 "main_body_result=%s",
221 stream_id, main_body_result.ToString().c_str());
222 }
223 return Empty{};
224 }),
225 call_initiator.PullServerTrailingMetadata(),
226 // Capture the call_initator to ensure the underlying call_spine
227 // is alive until the SendFragment promise completes.
228 [stream_id, outgoing_frames,
229 call_initiator](ServerMetadataHandle md) mutable {
230 ServerFragmentFrame frame;
231 frame.trailers = std::move(md);
232 frame.stream_id = stream_id;
233 return SendFragment(std::move(frame), outgoing_frames,
234 call_initiator);
235 });
236 }
237
DeserializeAndPushFragmentToNewCall(FrameHeader frame_header,BufferPair buffers,ChaoticGoodTransport & transport)238 auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
239 FrameHeader frame_header, BufferPair buffers,
240 ChaoticGoodTransport& transport) {
241 ClientFragmentFrame fragment_frame;
242 ScopedArenaPtr arena(acceptor_->CreateArena());
243 absl::Status status = transport.DeserializeFrame(
244 frame_header, std::move(buffers), arena.get(), fragment_frame,
245 FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
246 absl::optional<CallInitiator> call_initiator;
247 if (status.ok()) {
248 auto create_call_result =
249 acceptor_->CreateCall(*fragment_frame.headers, arena.release());
250 if (grpc_chaotic_good_trace.enabled()) {
251 gpr_log(GPR_INFO,
252 "CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: "
253 "create_call_result=%s",
254 create_call_result.ok()
255 ? "ok"
256 : create_call_result.status().ToString().c_str());
257 }
258 if (create_call_result.ok()) {
259 call_initiator.emplace(std::move(*create_call_result));
260 auto add_result = NewStream(frame_header.stream_id, *call_initiator);
261 if (add_result.ok()) {
262 call_initiator->SpawnGuarded(
263 "server-write", [this, stream_id = frame_header.stream_id,
264 call_initiator = *call_initiator]() {
265 return CallOutboundLoop(stream_id, call_initiator);
266 });
267 } else {
268 call_initiator.reset();
269 status = add_result;
270 }
271 } else {
272 status = create_call_result.status();
273 }
274 }
275 return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
276 std::move(fragment_frame),
277 frame_header.stream_id);
278 }
279
DeserializeAndPushFragmentToExistingCall(FrameHeader frame_header,BufferPair buffers,ChaoticGoodTransport & transport)280 auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
281 FrameHeader frame_header, BufferPair buffers,
282 ChaoticGoodTransport& transport) {
283 absl::optional<CallInitiator> call_initiator =
284 LookupStream(frame_header.stream_id);
285 Arena* arena = nullptr;
286 if (call_initiator.has_value()) arena = call_initiator->arena();
287 ClientFragmentFrame fragment_frame;
288 absl::Status status = transport.DeserializeFrame(
289 frame_header, std::move(buffers), arena, fragment_frame,
290 FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
291 return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
292 std::move(fragment_frame),
293 frame_header.stream_id);
294 }
295
ReadOneFrame(ChaoticGoodTransport & transport)296 auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
297 return TrySeq(
298 transport.ReadFrameBytes(),
299 [this, transport =
300 &transport](std::tuple<FrameHeader, BufferPair> frame_bytes) {
301 const auto& frame_header = std::get<0>(frame_bytes);
302 auto& buffers = std::get<1>(frame_bytes);
303 return Switch(
304 frame_header.type,
305 Case(FrameType::kSettings,
306 []() -> absl::Status {
307 return absl::InternalError("Unexpected settings frame");
308 }),
309 Case(FrameType::kFragment,
310 [this, &frame_header, &buffers, transport]() {
311 return If(
312 frame_header.flags.is_set(0),
313 [this, &frame_header, &buffers, transport]() {
314 return DeserializeAndPushFragmentToNewCall(
315 frame_header, std::move(buffers), *transport);
316 },
317 [this, &frame_header, &buffers, transport]() {
318 return DeserializeAndPushFragmentToExistingCall(
319 frame_header, std::move(buffers), *transport);
320 });
321 }),
322 Case(FrameType::kCancel,
323 [this, &frame_header]() {
324 absl::optional<CallInitiator> call_initiator =
325 ExtractStream(frame_header.stream_id);
326 return If(
327 call_initiator.has_value(),
328 [&call_initiator]() {
329 auto c = std::move(*call_initiator);
330 return c.SpawnWaitable("cancel", [c]() mutable {
331 c.Cancel();
332 return absl::OkStatus();
333 });
334 },
335 []() -> absl::Status {
336 return absl::InternalError("Unexpected cancel frame");
337 });
338 }),
339 Default([frame_header]() {
340 return absl::InternalError(
341 absl::StrCat("Unexpected frame type: ",
342 static_cast<uint8_t>(frame_header.type)));
343 }));
344 },
345 []() -> LoopCtl<absl::Status> { return Continue{}; });
346 }
347
TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport)348 auto ChaoticGoodServerTransport::TransportReadLoop(
349 RefCountedPtr<ChaoticGoodTransport> transport) {
350 return Seq(got_acceptor_.Wait(),
351 Loop([this, transport = std::move(transport)] {
352 return ReadOneFrame(*transport);
353 }));
354 }
355
OnTransportActivityDone(absl::string_view activity)356 auto ChaoticGoodServerTransport::OnTransportActivityDone(
357 absl::string_view activity) {
358 return [this, activity](absl::Status status) {
359 if (grpc_chaotic_good_trace.enabled()) {
360 gpr_log(GPR_INFO,
361 "CHAOTIC_GOOD: OnTransportActivityDone: activity=%s status=%s",
362 std::string(activity).c_str(), status.ToString().c_str());
363 }
364 AbortWithError();
365 };
366 }
367
ChaoticGoodServerTransport(const ChannelArgs & args,PromiseEndpoint control_endpoint,PromiseEndpoint data_endpoint,std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,HPackParser hpack_parser,HPackCompressor hpack_encoder)368 ChaoticGoodServerTransport::ChaoticGoodServerTransport(
369 const ChannelArgs& args, PromiseEndpoint control_endpoint,
370 PromiseEndpoint data_endpoint,
371 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
372 HPackParser hpack_parser, HPackCompressor hpack_encoder)
373 : outgoing_frames_(4),
374 allocator_(args.GetObject<ResourceQuota>()
375 ->memory_quota()
376 ->CreateMemoryAllocator("chaotic-good")) {
377 auto transport = MakeRefCounted<ChaoticGoodTransport>(
378 std::move(control_endpoint), std::move(data_endpoint),
379 std::move(hpack_parser), std::move(hpack_encoder));
380 writer_ = MakeActivity(TransportWriteLoop(transport),
381 EventEngineWakeupScheduler(event_engine),
382 OnTransportActivityDone("writer"));
383 reader_ = MakeActivity(TransportReadLoop(std::move(transport)),
384 EventEngineWakeupScheduler(event_engine),
385 OnTransportActivityDone("reader"));
386 }
387
SetAcceptor(Acceptor * acceptor)388 void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) {
389 GPR_ASSERT(acceptor_ == nullptr);
390 GPR_ASSERT(acceptor != nullptr);
391 acceptor_ = acceptor;
392 got_acceptor_.Set();
393 }
394
~ChaoticGoodServerTransport()395 ChaoticGoodServerTransport::~ChaoticGoodServerTransport() {
396 if (writer_ != nullptr) {
397 writer_.reset();
398 }
399 if (reader_ != nullptr) {
400 reader_.reset();
401 }
402 }
403
AbortWithError()404 void ChaoticGoodServerTransport::AbortWithError() {
405 // Mark transport as unavailable when the endpoint write/read failed.
406 // Close all the available pipes.
407 outgoing_frames_.MarkClosed();
408 ReleasableMutexLock lock(&mu_);
409 StreamMap stream_map = std::move(stream_map_);
410 stream_map_.clear();
411 state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN,
412 absl::UnavailableError("transport closed"),
413 "transport closed");
414 lock.Release();
415 for (const auto& pair : stream_map) {
416 auto call_initiator = pair.second;
417 call_initiator.SpawnInfallible("cancel", [call_initiator]() mutable {
418 call_initiator.Cancel();
419 return Empty{};
420 });
421 }
422 }
423
LookupStream(uint32_t stream_id)424 absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
425 uint32_t stream_id) {
426 MutexLock lock(&mu_);
427 auto it = stream_map_.find(stream_id);
428 if (it == stream_map_.end()) return absl::nullopt;
429 return it->second;
430 }
431
ExtractStream(uint32_t stream_id)432 absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
433 uint32_t stream_id) {
434 MutexLock lock(&mu_);
435 auto it = stream_map_.find(stream_id);
436 if (it == stream_map_.end()) return absl::nullopt;
437 auto r = std::move(it->second);
438 stream_map_.erase(it);
439 return std::move(r);
440 }
441
NewStream(uint32_t stream_id,CallInitiator call_initiator)442 absl::Status ChaoticGoodServerTransport::NewStream(
443 uint32_t stream_id, CallInitiator call_initiator) {
444 MutexLock lock(&mu_);
445 auto it = stream_map_.find(stream_id);
446 if (it != stream_map_.end()) {
447 return absl::InternalError("Stream already exists");
448 }
449 if (stream_id <= last_seen_new_stream_id_) {
450 return absl::InternalError("Stream id is not increasing");
451 }
452 stream_map_.emplace(stream_id, call_initiator);
453 call_initiator.OnDone([this, stream_id]() {
454 MutexLock lock(&mu_);
455 stream_map_.erase(stream_id);
456 });
457 return absl::OkStatus();
458 }
459
PerformOp(grpc_transport_op * op)460 void ChaoticGoodServerTransport::PerformOp(grpc_transport_op* op) {
461 std::vector<ActivityPtr> cancelled;
462 MutexLock lock(&mu_);
463 bool did_stuff = false;
464 if (op->start_connectivity_watch != nullptr) {
465 state_tracker_.AddWatcher(op->start_connectivity_watch_state,
466 std::move(op->start_connectivity_watch));
467 did_stuff = true;
468 }
469 if (op->stop_connectivity_watch != nullptr) {
470 state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
471 did_stuff = true;
472 }
473 if (op->set_accept_stream) {
474 if (op->set_accept_stream_fn != nullptr) {
475 Crash(absl::StrCat(
476 "set_accept_stream not supported on chaotic good transports: ",
477 grpc_transport_op_string(op)));
478 }
479 did_stuff = true;
480 }
481 if (!op->goaway_error.ok() || !op->disconnect_with_error.ok()) {
482 cancelled.push_back(std::move(writer_));
483 cancelled.push_back(std::move(reader_));
484 did_stuff = true;
485 }
486 if (!did_stuff) {
487 Crash(absl::StrCat("unimplemented transport perform op: ",
488 grpc_transport_op_string(op)));
489 }
490 ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
491 }
492
493 } // namespace chaotic_good
494 } // namespace grpc_core
495