xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/client_transport.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/chaotic_good/client_transport.h"
18 
19 #include <cstdint>
20 #include <cstdlib>
21 #include <memory>
22 #include <string>
23 #include <tuple>
24 #include <utility>
25 
26 #include "absl/random/bit_gen_ref.h"
27 #include "absl/random/random.h"
28 #include "absl/status/statusor.h"
29 
30 #include <grpc/event_engine/event_engine.h>
31 #include <grpc/slice.h>
32 #include <grpc/support/log.h>
33 
34 #include "src/core/ext/transport/chaotic_good/chaotic_good_transport.h"
35 #include "src/core/ext/transport/chaotic_good/frame.h"
36 #include "src/core/ext/transport/chaotic_good/frame_header.h"
37 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
38 #include "src/core/lib/gprpp/match.h"
39 #include "src/core/lib/gprpp/ref_counted_ptr.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/promise/activity.h"
42 #include "src/core/lib/promise/all_ok.h"
43 #include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
44 #include "src/core/lib/promise/loop.h"
45 #include "src/core/lib/promise/map.h"
46 #include "src/core/lib/promise/promise.h"
47 #include "src/core/lib/promise/try_join.h"
48 #include "src/core/lib/promise/try_seq.h"
49 #include "src/core/lib/resource_quota/arena.h"
50 #include "src/core/lib/resource_quota/resource_quota.h"
51 #include "src/core/lib/slice/slice.h"
52 #include "src/core/lib/slice/slice_buffer.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/transport/promise_endpoint.h"
55 
56 namespace grpc_core {
57 namespace chaotic_good {
58 
TransportWriteLoop(RefCountedPtr<ChaoticGoodTransport> transport)59 auto ChaoticGoodClientTransport::TransportWriteLoop(
60     RefCountedPtr<ChaoticGoodTransport> transport) {
61   return Loop([this, transport = std::move(transport)] {
62     return TrySeq(
63         // Get next outgoing frame.
64         outgoing_frames_.Next(),
65         // Serialize and write it out.
66         [transport = transport.get()](ClientFrame client_frame) {
67           return transport->WriteFrame(GetFrameInterface(client_frame));
68         },
69         []() -> LoopCtl<absl::Status> {
70           // The write failures will be caught in TrySeq and exit loop.
71           // Therefore, only need to return Continue() in the last lambda
72           // function.
73           return Continue();
74         });
75   });
76 }
77 
LookupStream(uint32_t stream_id)78 absl::optional<CallHandler> ChaoticGoodClientTransport::LookupStream(
79     uint32_t stream_id) {
80   MutexLock lock(&mu_);
81   auto it = stream_map_.find(stream_id);
82   if (it == stream_map_.end()) {
83     return absl::nullopt;
84   }
85   return it->second;
86 }
87 
PushFrameIntoCall(ServerFragmentFrame frame,CallHandler call_handler)88 auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
89                                                    CallHandler call_handler) {
90   auto& headers = frame.headers;
91   auto push = TrySeq(
92       If(
93           headers != nullptr,
94           [call_handler, &headers]() mutable {
95             return call_handler.PushServerInitialMetadata(std::move(headers));
96           },
97           []() -> StatusFlag { return Success{}; }),
98       [call_handler, message = std::move(frame.message)]() mutable {
99         return If(
100             message.has_value(),
101             [&call_handler, &message]() mutable {
102               return call_handler.PushMessage(std::move(message->message));
103             },
104             []() -> StatusFlag { return Success{}; });
105       },
106       [call_handler, trailers = std::move(frame.trailers)]() mutable {
107         return If(
108             trailers != nullptr,
109             [&call_handler, &trailers]() mutable {
110               return call_handler.PushServerTrailingMetadata(
111                   std::move(trailers));
112             },
113             []() -> StatusFlag { return Success{}; });
114       });
115   // Wrap the actual sequence with something that owns the call handler so that
116   // its lifetime extends until the push completes.
117   return [call_handler, push = std::move(push)]() mutable { return push(); };
118 }
119 
TransportReadLoop(RefCountedPtr<ChaoticGoodTransport> transport)120 auto ChaoticGoodClientTransport::TransportReadLoop(
121     RefCountedPtr<ChaoticGoodTransport> transport) {
122   return Loop([this, transport = std::move(transport)] {
123     return TrySeq(
124         transport->ReadFrameBytes(),
125         [](std::tuple<FrameHeader, BufferPair> frame_bytes)
126             -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
127           const auto& frame_header = std::get<0>(frame_bytes);
128           if (frame_header.type != FrameType::kFragment) {
129             return absl::InternalError(
130                 absl::StrCat("Expected fragment frame, got ",
131                              static_cast<int>(frame_header.type)));
132           }
133           return frame_bytes;
134         },
135         [this, transport = transport.get()](
136             std::tuple<FrameHeader, BufferPair> frame_bytes) {
137           const auto& frame_header = std::get<0>(frame_bytes);
138           auto& buffers = std::get<1>(frame_bytes);
139           absl::optional<CallHandler> call_handler =
140               LookupStream(frame_header.stream_id);
141           ServerFragmentFrame frame;
142           absl::Status deserialize_status;
143           const FrameLimits frame_limits{1024 * 1024 * 1024,
144                                          aligned_bytes_ - 1};
145           if (call_handler.has_value()) {
146             deserialize_status = transport->DeserializeFrame(
147                 frame_header, std::move(buffers), call_handler->arena(), frame,
148                 frame_limits);
149           } else {
150             // Stream not found, skip the frame.
151             auto arena = MakeScopedArena(1024, &allocator_);
152             deserialize_status =
153                 transport->DeserializeFrame(frame_header, std::move(buffers),
154                                             arena.get(), frame, frame_limits);
155           }
156           return If(
157               deserialize_status.ok() && call_handler.has_value(),
158               [this, &frame, &call_handler]() {
159                 return call_handler->SpawnWaitable(
160                     "push-frame", [this, call_handler = *call_handler,
161                                    frame = std::move(frame)]() mutable {
162                       return Map(call_handler.CancelIfFails(PushFrameIntoCall(
163                                      std::move(frame), call_handler)),
164                                  [](StatusFlag f) {
165                                    return StatusCast<absl::Status>(f);
166                                  });
167                     });
168               },
169               [&deserialize_status]() {
170                 // Stream not found, nothing to do.
171                 return [deserialize_status =
172                             std::move(deserialize_status)]() mutable {
173                   return std::move(deserialize_status);
174                 };
175               });
176         },
177         []() -> LoopCtl<absl::Status> { return Continue{}; });
178   });
179 }
180 
OnTransportActivityDone()181 auto ChaoticGoodClientTransport::OnTransportActivityDone() {
182   return [this](absl::Status) { AbortWithError(); };
183 }
184 
ChaoticGoodClientTransport(PromiseEndpoint control_endpoint,PromiseEndpoint data_endpoint,const ChannelArgs & args,std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,HPackParser hpack_parser,HPackCompressor hpack_encoder)185 ChaoticGoodClientTransport::ChaoticGoodClientTransport(
186     PromiseEndpoint control_endpoint, PromiseEndpoint data_endpoint,
187     const ChannelArgs& args,
188     std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine,
189     HPackParser hpack_parser, HPackCompressor hpack_encoder)
190     : allocator_(args.GetObject<ResourceQuota>()
191                      ->memory_quota()
192                      ->CreateMemoryAllocator("chaotic-good")),
193       outgoing_frames_(4) {
194   auto transport = MakeRefCounted<ChaoticGoodTransport>(
195       std::move(control_endpoint), std::move(data_endpoint),
196       std::move(hpack_parser), std::move(hpack_encoder));
197   writer_ = MakeActivity(
198       // Continuously write next outgoing frames to promise endpoints.
199       TransportWriteLoop(transport), EventEngineWakeupScheduler(event_engine),
200       OnTransportActivityDone());
201   reader_ = MakeActivity(
202       // Continuously read next incoming frames from promise endpoints.
203       TransportReadLoop(std::move(transport)),
204       EventEngineWakeupScheduler(event_engine), OnTransportActivityDone());
205 }
206 
~ChaoticGoodClientTransport()207 ChaoticGoodClientTransport::~ChaoticGoodClientTransport() {
208   if (writer_ != nullptr) {
209     writer_.reset();
210   }
211   if (reader_ != nullptr) {
212     reader_.reset();
213   }
214 }
215 
AbortWithError()216 void ChaoticGoodClientTransport::AbortWithError() {
217   // Mark transport as unavailable when the endpoint write/read failed.
218   // Close all the available pipes.
219   outgoing_frames_.MarkClosed();
220   ReleasableMutexLock lock(&mu_);
221   StreamMap stream_map = std::move(stream_map_);
222   stream_map_.clear();
223   lock.Release();
224   for (const auto& pair : stream_map) {
225     auto call_handler = pair.second;
226     call_handler.SpawnInfallible("cancel", [call_handler]() mutable {
227       call_handler.Cancel(ServerMetadataFromStatus(
228           absl::UnavailableError("Transport closed.")));
229       return Empty{};
230     });
231   }
232 }
233 
MakeStream(CallHandler call_handler)234 uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) {
235   ReleasableMutexLock lock(&mu_);
236   const uint32_t stream_id = next_stream_id_++;
237   stream_map_.emplace(stream_id, call_handler);
238   lock.Release();
239   call_handler.OnDone([this, stream_id]() {
240     MutexLock lock(&mu_);
241     stream_map_.erase(stream_id);
242   });
243   return stream_id;
244 }
245 
CallOutboundLoop(uint32_t stream_id,CallHandler call_handler)246 auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
247                                                   CallHandler call_handler) {
248   auto send_fragment = [stream_id,
249                         outgoing_frames = outgoing_frames_.MakeSender()](
250                            ClientFragmentFrame frame) mutable {
251     frame.stream_id = stream_id;
252     return Map(outgoing_frames.Send(std::move(frame)),
253                [](bool success) -> absl::Status {
254                  if (!success) {
255                    // Failed to send outgoing frame.
256                    return absl::UnavailableError("Transport closed.");
257                  }
258                  return absl::OkStatus();
259                });
260   };
261   return TrySeq(
262       // Wait for initial metadata then send it out.
263       call_handler.PullClientInitialMetadata(),
264       [send_fragment](ClientMetadataHandle md) mutable {
265         if (grpc_chaotic_good_trace.enabled()) {
266           gpr_log(GPR_INFO, "CHAOTIC_GOOD: Sending initial metadata: %s",
267                   md->DebugString().c_str());
268         }
269         ClientFragmentFrame frame;
270         frame.headers = std::move(md);
271         return send_fragment(std::move(frame));
272       },
273       // Continuously send client frame with client to server messages.
274       ForEach(OutgoingMessages(call_handler),
275               [send_fragment,
276                aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
277                 ClientFragmentFrame frame;
278                 // Construct frame header (flags, header_length and
279                 // trailer_length will be added in serialization).
280                 const uint32_t message_length = message->payload()->Length();
281                 const uint32_t padding =
282                     message_length % aligned_bytes == 0
283                         ? 0
284                         : aligned_bytes - message_length % aligned_bytes;
285                 GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
286                 frame.message = FragmentMessage(std::move(message), padding,
287                                                 message_length);
288                 return send_fragment(std::move(frame));
289               }),
290       [send_fragment]() mutable {
291         ClientFragmentFrame frame;
292         frame.end_of_stream = true;
293         return send_fragment(std::move(frame));
294       });
295 }
296 
StartCall(CallHandler call_handler)297 void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
298   // At this point, the connection is set up.
299   // Start sending data frames.
300   call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable {
301     const uint32_t stream_id = MakeStream(call_handler);
302     return Map(CallOutboundLoop(stream_id, call_handler),
303                [stream_id, this](absl::Status result) {
304                  if (!result.ok()) {
305                    CancelFrame frame;
306                    frame.stream_id = stream_id;
307                    outgoing_frames_.MakeSender().UnbufferedImmediateSend(
308                        std::move(frame));
309                  }
310                  return result;
311                });
312   });
313 }
314 
PerformOp(grpc_transport_op * op)315 void ChaoticGoodClientTransport::PerformOp(grpc_transport_op* op) {
316   MutexLock lock(&mu_);
317   bool did_stuff = false;
318   if (op->start_connectivity_watch != nullptr) {
319     state_tracker_.AddWatcher(op->start_connectivity_watch_state,
320                               std::move(op->start_connectivity_watch));
321     did_stuff = true;
322   }
323   if (op->stop_connectivity_watch != nullptr) {
324     state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
325     did_stuff = true;
326   }
327   if (op->set_accept_stream) {
328     Crash("set_accept_stream not supported on clients");
329   }
330   if (!did_stuff) {
331     Crash(absl::StrCat("unimplemented transport perform op: ",
332                        grpc_transport_op_string(op)));
333   }
334   ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
335 }
336 
337 }  // namespace chaotic_good
338 }  // namespace grpc_core
339