xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/server_transport.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/chaotic_good/server_transport.h"
18 
19 #include <memory>
20 #include <string>
21 #include <tuple>
22 
23 #include "absl/random/bit_gen_ref.h"
24 #include "absl/random/random.h"
25 #include "absl/status/status.h"
26 #include "absl/status/statusor.h"
27 
28 #include <grpc/event_engine/event_engine.h>
29 #include <grpc/grpc.h>
30 #include <grpc/slice.h>
31 #include <grpc/support/log.h>
32 
33 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
34 #include "src/core/ext/transport/chaotic_good/frame.h"
35 #include "src/core/ext/transport/chaotic_good/frame_header.h"
36 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
37 #include "src/core/lib/gprpp/ref_counted_ptr.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/promise/activity.h"
40 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
41 #include "src/core/lib/promise/for_each.h"
42 #include "src/core/lib/promise/loop.h"
43 #include "src/core/lib/promise/switch.h"
44 #include "src/core/lib/promise/try_seq.h"
45 #include "src/core/lib/resource_quota/arena.h"
46 #include "src/core/lib/resource_quota/resource_quota.h"
47 #include "src/core/lib/slice/slice.h"
48 #include "src/core/lib/slice/slice_buffer.h"
49 #include "src/core/lib/transport/promise_endpoint.h"
50 
51 namespace grpc_core {
52 namespace chaotic_good {
53 
TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport)54 auto ChaoticGoodServerTransport::TransportWriteLoop(
55     RefCountedPtr<ChaoticGoodTransport> transport) {
56   return Loop([this, transport = std::move(transport)] {
57     return TrySeq(
58         // Get next outgoing frame.
59         outgoing_frames_.Next(),
60         // Serialize and write it out.
61         [transport = transport.get()](ServerFrame client_frame) {
62           return transport->WriteFrame(GetFrameInterface(client_frame));
63         },
64         []() -> LoopCtl<absl::Status> {
65           // The write failures will be caught in TrySeq and exit loop.
66           // Therefore, only need to return Continue() in the last lambda
67           // function.
68           return Continue();
69         });
70   });
71 }
72 
PushFragmentIntoCall(CallInitiator call_initiator,ClientFragmentFrame frame,uint32_t stream_id)73 auto ChaoticGoodServerTransport::PushFragmentIntoCall(
74     CallInitiator call_initiator, ClientFragmentFrame frame,
75     uint32_t stream_id) {
76   auto& headers = frame.headers;
77   return TrySeq(
78       If(
79           headers != nullptr,
80           [call_initiator, &headers]() mutable {
81             return call_initiator.PushClientInitialMetadata(std::move(headers));
82           },
83           []() -> StatusFlag { return Success{}; }),
84       [call_initiator, message = std::move(frame.message)]() mutable {
85         return If(
86             message.has_value(),
87             [&call_initiator, &message]() mutable {
88               return call_initiator.PushMessage(std::move(message->message));
89             },
90             []() -> StatusFlag { return Success{}; });
91       },
92       [this, call_initiator, end_of_stream = frame.end_of_stream,
93        stream_id]() mutable -> StatusFlag {
94         if (end_of_stream) {
95           call_initiator.FinishSends();
96           // We have received end_of_stream. It is now safe to remove the call
97           // from the stream map.
98           MutexLock lock(&mu_);
99           stream_map_.erase(stream_id);
100         }
101         return Success{};
102       });
103 }
104 
MaybePushFragmentIntoCall(absl::optional<CallInitiator> call_initiator,absl::Status error,ClientFragmentFrame frame,uint32_t stream_id)105 auto ChaoticGoodServerTransport::MaybePushFragmentIntoCall(
106     absl::optional<CallInitiator> call_initiator, absl::Status error,
107     ClientFragmentFrame frame, uint32_t stream_id) {
108   return If(
109       call_initiator.has_value() && error.ok(),
110       [this, &call_initiator, &frame, &stream_id]() {
111         return Map(
112             call_initiator->SpawnWaitable(
113                 "push-fragment",
114                 [call_initiator, frame = std::move(frame), stream_id,
115                  this]() mutable {
116                   return call_initiator->CancelIfFails(PushFragmentIntoCall(
117                       *call_initiator, std::move(frame), stream_id));
118                 }),
119             [](StatusFlag status) { return StatusCast<absl::Status>(status); });
120       },
121       [&error, &frame]() {
122         // EOF frames may arrive after the call_initiator's OnDone callback
123         // has been invoked. In that case, the call_initiator would have
124         // already been removed from the stream_map and hence the EOF frame
125         // cannot be pushed into the call. No need to log such frames.
126         if (!frame.end_of_stream) {
127           gpr_log(
128               GPR_INFO,
129               "CHAOTIC_GOOD: Cannot pass frame to stream. Error:%s Frame:%s",
130               error.ToString().c_str(), frame.ToString().c_str());
131         }
132         return Immediate(std::move(error));
133       });
134 }
135 
SendFragment(ServerFragmentFrame frame,MpscSender<ServerFrame> outgoing_frames,CallInitiator call_initiator)136 auto ChaoticGoodServerTransport::SendFragment(
137     ServerFragmentFrame frame, MpscSender<ServerFrame> outgoing_frames,
138     CallInitiator call_initiator) {
139   if (grpc_chaotic_good_trace.enabled()) {
140     gpr_log(GPR_INFO, "CHAOTIC_GOOD: SendFragment: frame=%s",
141             frame.ToString().c_str());
142   }
143   // Capture the call_initiator to ensure the underlying call spine is alive
144   // until the outgoing_frames.Send promise completes.
145   return Map(outgoing_frames.Send(std::move(frame)),
146              [call_initiator](bool success) -> absl::Status {
147                if (!success) {
148                  // Failed to send outgoing frame.
149                  return absl::UnavailableError("Transport closed.");
150                }
151                return absl::OkStatus();
152              });
153 }
154 
SendCallBody(uint32_t stream_id,MpscSender<ServerFrame> outgoing_frames,CallInitiator call_initiator)155 auto ChaoticGoodServerTransport::SendCallBody(
156     uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
157     CallInitiator call_initiator) {
158   // Continuously send client frame with client to server
159   // messages.
160   return ForEach(
161       OutgoingMessages(call_initiator),
162       // Capture the call_initator to ensure the underlying call
163       // spine is alive until the SendFragment promise completes.
164       [stream_id, outgoing_frames, call_initiator,
165        aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
166         ServerFragmentFrame frame;
167         // Construct frame header (flags, header_length
168         // and trailer_length will be added in
169         // serialization).
170         const uint32_t message_length = message->payload()->Length();
171         const uint32_t padding =
172             message_length % aligned_bytes == 0
173                 ? 0
174                 : aligned_bytes - message_length % aligned_bytes;
175         GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
176         frame.message =
177             FragmentMessage(std::move(message), padding, message_length);
178         frame.stream_id = stream_id;
179         return SendFragment(std::move(frame), outgoing_frames, call_initiator);
180       });
181 }
182 
SendCallInitialMetadataAndBody(uint32_t stream_id,MpscSender<ServerFrame> outgoing_frames,CallInitiator call_initiator)183 auto ChaoticGoodServerTransport::SendCallInitialMetadataAndBody(
184     uint32_t stream_id, MpscSender<ServerFrame> outgoing_frames,
185     CallInitiator call_initiator) {
186   return TrySeq(
187       // Wait for initial metadata then send it out.
188       call_initiator.PullServerInitialMetadata(),
189       [stream_id, outgoing_frames, call_initiator,
190        this](absl::optional<ServerMetadataHandle> md) mutable {
191         if (grpc_chaotic_good_trace.enabled()) {
192           gpr_log(GPR_INFO,
193                   "CHAOTIC_GOOD: SendCallInitialMetadataAndBody: md=%s",
194                   md.has_value() ? (*md)->DebugString().c_str() : "null");
195         }
196         return If(
197             md.has_value(),
198             [&md, stream_id, &outgoing_frames, &call_initiator, this]() {
199               ServerFragmentFrame frame;
200               frame.headers = std::move(*md);
201               frame.stream_id = stream_id;
202               return TrySeq(
203                   SendFragment(std::move(frame), outgoing_frames,
204                                call_initiator),
205                   SendCallBody(stream_id, outgoing_frames, call_initiator));
206             },
207             []() { return absl::OkStatus(); });
208       });
209 }
210 
CallOutboundLoop(uint32_t stream_id,CallInitiator call_initiator)211 auto ChaoticGoodServerTransport::CallOutboundLoop(
212     uint32_t stream_id, CallInitiator call_initiator) {
213   auto outgoing_frames = outgoing_frames_.MakeSender();
214   return Seq(Map(SendCallInitialMetadataAndBody(stream_id, outgoing_frames,
215                                                 call_initiator),
216                  [stream_id](absl::Status main_body_result) {
217                    if (grpc_chaotic_good_trace.enabled()) {
218                      gpr_log(GPR_DEBUG,
219                              "CHAOTIC_GOOD: CallOutboundLoop: stream_id=%d "
220                              "main_body_result=%s",
221                              stream_id, main_body_result.ToString().c_str());
222                    }
223                    return Empty{};
224                  }),
225              call_initiator.PullServerTrailingMetadata(),
226              // Capture the call_initator to ensure the underlying call_spine
227              // is alive until the SendFragment promise completes.
228              [stream_id, outgoing_frames,
229               call_initiator](ServerMetadataHandle md) mutable {
230                ServerFragmentFrame frame;
231                frame.trailers = std::move(md);
232                frame.stream_id = stream_id;
233                return SendFragment(std::move(frame), outgoing_frames,
234                                    call_initiator);
235              });
236 }
237 
DeserializeAndPushFragmentToNewCall(FrameHeader frame_header,BufferPair buffers,ChaoticGoodTransport & transport)238 auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToNewCall(
239     FrameHeader frame_header, BufferPair buffers,
240     ChaoticGoodTransport& transport) {
241   ClientFragmentFrame fragment_frame;
242   ScopedArenaPtr arena(acceptor_->CreateArena());
243   absl::Status status = transport.DeserializeFrame(
244       frame_header, std::move(buffers), arena.get(), fragment_frame,
245       FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
246   absl::optional<CallInitiator> call_initiator;
247   if (status.ok()) {
248     auto create_call_result =
249         acceptor_->CreateCall(*fragment_frame.headers, arena.release());
250     if (grpc_chaotic_good_trace.enabled()) {
251       gpr_log(GPR_INFO,
252               "CHAOTIC_GOOD: DeserializeAndPushFragmentToNewCall: "
253               "create_call_result=%s",
254               create_call_result.ok()
255                   ? "ok"
256                   : create_call_result.status().ToString().c_str());
257     }
258     if (create_call_result.ok()) {
259       call_initiator.emplace(std::move(*create_call_result));
260       auto add_result = NewStream(frame_header.stream_id, *call_initiator);
261       if (add_result.ok()) {
262         call_initiator->SpawnGuarded(
263             "server-write", [this, stream_id = frame_header.stream_id,
264                              call_initiator = *call_initiator]() {
265               return CallOutboundLoop(stream_id, call_initiator);
266             });
267       } else {
268         call_initiator.reset();
269         status = add_result;
270       }
271     } else {
272       status = create_call_result.status();
273     }
274   }
275   return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
276                                    std::move(fragment_frame),
277                                    frame_header.stream_id);
278 }
279 
DeserializeAndPushFragmentToExistingCall(FrameHeader frame_header,BufferPair buffers,ChaoticGoodTransport & transport)280 auto ChaoticGoodServerTransport::DeserializeAndPushFragmentToExistingCall(
281     FrameHeader frame_header, BufferPair buffers,
282     ChaoticGoodTransport& transport) {
283   absl::optional<CallInitiator> call_initiator =
284       LookupStream(frame_header.stream_id);
285   Arena* arena = nullptr;
286   if (call_initiator.has_value()) arena = call_initiator->arena();
287   ClientFragmentFrame fragment_frame;
288   absl::Status status = transport.DeserializeFrame(
289       frame_header, std::move(buffers), arena, fragment_frame,
290       FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
291   return MaybePushFragmentIntoCall(std::move(call_initiator), std::move(status),
292                                    std::move(fragment_frame),
293                                    frame_header.stream_id);
294 }
295 
ReadOneFrame(ChaoticGoodTransport & transport)296 auto ChaoticGoodServerTransport::ReadOneFrame(ChaoticGoodTransport& transport) {
297   return TrySeq(
298       transport.ReadFrameBytes(),
299       [this, transport =
300                  &transport](std::tuple<FrameHeader, BufferPair> frame_bytes) {
301         const auto& frame_header = std::get<0>(frame_bytes);
302         auto& buffers = std::get<1>(frame_bytes);
303         return Switch(
304             frame_header.type,
305             Case(FrameType::kSettings,
306                  []() -> absl::Status {
307                    return absl::InternalError("Unexpected settings frame");
308                  }),
309             Case(FrameType::kFragment,
310                  [this, &frame_header, &buffers, transport]() {
311                    return If(
312                        frame_header.flags.is_set(0),
313                        [this, &frame_header, &buffers, transport]() {
314                          return DeserializeAndPushFragmentToNewCall(
315                              frame_header, std::move(buffers), *transport);
316                        },
317                        [this, &frame_header, &buffers, transport]() {
318                          return DeserializeAndPushFragmentToExistingCall(
319                              frame_header, std::move(buffers), *transport);
320                        });
321                  }),
322             Case(FrameType::kCancel,
323                  [this, &frame_header]() {
324                    absl::optional<CallInitiator> call_initiator =
325                        ExtractStream(frame_header.stream_id);
326                    return If(
327                        call_initiator.has_value(),
328                        [&call_initiator]() {
329                          auto c = std::move(*call_initiator);
330                          return c.SpawnWaitable("cancel", [c]() mutable {
331                            c.Cancel();
332                            return absl::OkStatus();
333                          });
334                        },
335                        []() -> absl::Status {
336                          return absl::InternalError("Unexpected cancel frame");
337                        });
338                  }),
339             Default([frame_header]() {
340               return absl::InternalError(
341                   absl::StrCat("Unexpected frame type: ",
342                                static_cast<uint8_t>(frame_header.type)));
343             }));
344       },
345       []() -> LoopCtl<absl::Status> { return Continue{}; });
346 }
347 
TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport)348 auto ChaoticGoodServerTransport::TransportReadLoop(
349     RefCountedPtr<ChaoticGoodTransport> transport) {
350   return Seq(got_acceptor_.Wait(),
351              Loop([this, transport = std::move(transport)] {
352                return ReadOneFrame(*transport);
353              }));
354 }
355 
OnTransportActivityDone(absl::string_view activity)356 auto ChaoticGoodServerTransport::OnTransportActivityDone(
357     absl::string_view activity) {
358   return [this, activity](absl::Status status) {
359     if (grpc_chaotic_good_trace.enabled()) {
360       gpr_log(GPR_INFO,
361               "CHAOTIC_GOOD: OnTransportActivityDone: activity=%s status=%s",
362               std::string(activity).c_str(), status.ToString().c_str());
363     }
364     AbortWithError();
365   };
366 }
367 
ChaoticGoodServerTransport(const ChannelArgs & args,PromiseEndpoint control_endpoint,PromiseEndpoint data_endpoint,std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,HPackParser hpack_parser,HPackCompressor hpack_encoder)368 ChaoticGoodServerTransport::ChaoticGoodServerTransport(
369     const ChannelArgs& args, PromiseEndpoint control_endpoint,
370     PromiseEndpoint data_endpoint,
371     std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
372     HPackParser hpack_parser, HPackCompressor hpack_encoder)
373     : outgoing_frames_(4),
374       allocator_(args.GetObject<ResourceQuota>()
375                      ->memory_quota()
376                      ->CreateMemoryAllocator("chaotic-good")) {
377   auto transport = MakeRefCounted<ChaoticGoodTransport>(
378       std::move(control_endpoint), std::move(data_endpoint),
379       std::move(hpack_parser), std::move(hpack_encoder));
380   writer_ = MakeActivity(TransportWriteLoop(transport),
381                          EventEngineWakeupScheduler(event_engine),
382                          OnTransportActivityDone("writer"));
383   reader_ = MakeActivity(TransportReadLoop(std::move(transport)),
384                          EventEngineWakeupScheduler(event_engine),
385                          OnTransportActivityDone("reader"));
386 }
387 
SetAcceptor(Acceptor * acceptor)388 void ChaoticGoodServerTransport::SetAcceptor(Acceptor* acceptor) {
389   GPR_ASSERT(acceptor_ == nullptr);
390   GPR_ASSERT(acceptor != nullptr);
391   acceptor_ = acceptor;
392   got_acceptor_.Set();
393 }
394 
~ChaoticGoodServerTransport()395 ChaoticGoodServerTransport::~ChaoticGoodServerTransport() {
396   if (writer_ != nullptr) {
397     writer_.reset();
398   }
399   if (reader_ != nullptr) {
400     reader_.reset();
401   }
402 }
403 
AbortWithError()404 void ChaoticGoodServerTransport::AbortWithError() {
405   // Mark transport as unavailable when the endpoint write/read failed.
406   // Close all the available pipes.
407   outgoing_frames_.MarkClosed();
408   ReleasableMutexLock lock(&mu_);
409   StreamMap stream_map = std::move(stream_map_);
410   stream_map_.clear();
411   state_tracker_.SetState(GRPC_CHANNEL_SHUTDOWN,
412                           absl::UnavailableError("transport closed"),
413                           "transport closed");
414   lock.Release();
415   for (const auto& pair : stream_map) {
416     auto call_initiator = pair.second;
417     call_initiator.SpawnInfallible("cancel", [call_initiator]() mutable {
418       call_initiator.Cancel();
419       return Empty{};
420     });
421   }
422 }
423 
LookupStream(uint32_t stream_id)424 absl::optional<CallInitiator> ChaoticGoodServerTransport::LookupStream(
425     uint32_t stream_id) {
426   MutexLock lock(&mu_);
427   auto it = stream_map_.find(stream_id);
428   if (it == stream_map_.end()) return absl::nullopt;
429   return it->second;
430 }
431 
ExtractStream(uint32_t stream_id)432 absl::optional<CallInitiator> ChaoticGoodServerTransport::ExtractStream(
433     uint32_t stream_id) {
434   MutexLock lock(&mu_);
435   auto it = stream_map_.find(stream_id);
436   if (it == stream_map_.end()) return absl::nullopt;
437   auto r = std::move(it->second);
438   stream_map_.erase(it);
439   return std::move(r);
440 }
441 
NewStream(uint32_t stream_id,CallInitiator call_initiator)442 absl::Status ChaoticGoodServerTransport::NewStream(
443     uint32_t stream_id, CallInitiator call_initiator) {
444   MutexLock lock(&mu_);
445   auto it = stream_map_.find(stream_id);
446   if (it != stream_map_.end()) {
447     return absl::InternalError("Stream already exists");
448   }
449   if (stream_id <= last_seen_new_stream_id_) {
450     return absl::InternalError("Stream id is not increasing");
451   }
452   stream_map_.emplace(stream_id, call_initiator);
453   call_initiator.OnDone([this, stream_id]() {
454     MutexLock lock(&mu_);
455     stream_map_.erase(stream_id);
456   });
457   return absl::OkStatus();
458 }
459 
PerformOp(grpc_transport_op * op)460 void ChaoticGoodServerTransport::PerformOp(grpc_transport_op* op) {
461   std::vector<ActivityPtr> cancelled;
462   MutexLock lock(&mu_);
463   bool did_stuff = false;
464   if (op->start_connectivity_watch != nullptr) {
465     state_tracker_.AddWatcher(op->start_connectivity_watch_state,
466                               std::move(op->start_connectivity_watch));
467     did_stuff = true;
468   }
469   if (op->stop_connectivity_watch != nullptr) {
470     state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
471     did_stuff = true;
472   }
473   if (op->set_accept_stream) {
474     if (op->set_accept_stream_fn != nullptr) {
475       Crash(absl::StrCat(
476           "set_accept_stream not supported on chaotic good transports: ",
477           grpc_transport_op_string(op)));
478     }
479     did_stuff = true;
480   }
481   if (!op->goaway_error.ok() || !op->disconnect_with_error.ok()) {
482     cancelled.push_back(std::move(writer_));
483     cancelled.push_back(std::move(reader_));
484     did_stuff = true;
485   }
486   if (!did_stuff) {
487     Crash(absl::StrCat("unimplemented transport perform op: ",
488                        grpc_transport_op_string(op)));
489   }
490   ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
491 }
492 
493 }  // namespace chaotic_good
494 }  // namespace grpc_core
495