xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chaotic_good/chaotic_good_transport.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H
16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <cstdint>
21 #include <utility>
22 
23 #include "absl/random/random.h"
24 
25 #include "src/core/ext/transport/chaotic_good/frame.h"
26 #include "src/core/ext/transport/chaotic_good/frame_header.h"
27 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
28 #include "src/core/lib/debug/trace.h"
29 #include "src/core/lib/event_engine/tcp_socket_utils.h"
30 #include "src/core/lib/promise/if.h"
31 #include "src/core/lib/promise/promise.h"
32 #include "src/core/lib/promise/try_join.h"
33 #include "src/core/lib/promise/try_seq.h"
34 #include "src/core/lib/transport/promise_endpoint.h"
35 
36 extern grpc_core::TraceFlag grpc_chaotic_good_trace;
37 
38 namespace grpc_core {
39 namespace chaotic_good {
40 
41 class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> {
42  public:
ChaoticGoodTransport(PromiseEndpoint control_endpoint,PromiseEndpoint data_endpoint,HPackParser hpack_parser,HPackCompressor hpack_encoder)43   ChaoticGoodTransport(PromiseEndpoint control_endpoint,
44                        PromiseEndpoint data_endpoint, HPackParser hpack_parser,
45                        HPackCompressor hpack_encoder)
46       : control_endpoint_(std::move(control_endpoint)),
47         data_endpoint_(std::move(data_endpoint)),
48         encoder_(std::move(hpack_encoder)),
49         parser_(std::move(hpack_parser)) {
50     // Enable RxMemoryAlignment and RPC receive coalescing after the transport
51     // setup is complete. At this point all the settings frames should have
52     // been read.
53     data_endpoint_.EnforceRxMemoryAlignmentAndCoalescing();
54   }
55 
WriteFrame(const FrameInterface & frame)56   auto WriteFrame(const FrameInterface& frame) {
57     auto buffers = frame.Serialize(&encoder_);
58     if (grpc_chaotic_good_trace.enabled()) {
59       gpr_log(GPR_INFO, "CHAOTIC_GOOD: WriteFrame to:%s %s",
60               ResolvedAddressToString(control_endpoint_.GetPeerAddress())
61                   .value_or("<<unknown peer address>>")
62                   .c_str(),
63               frame.ToString().c_str());
64     }
65     return TryJoin<absl::StatusOr>(
66         control_endpoint_.Write(std::move(buffers.control)),
67         data_endpoint_.Write(std::move(buffers.data)));
68   }
69 
70   // Read frame header and payloads for control and data portions of one frame.
71   // Resolves to StatusOr<tuple<FrameHeader, BufferPair>>.
ReadFrameBytes()72   auto ReadFrameBytes() {
73     return TrySeq(
74         control_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize),
75         [this](Slice read_buffer) {
76           auto frame_header =
77               FrameHeader::Parse(reinterpret_cast<const uint8_t*>(
78                   GRPC_SLICE_START_PTR(read_buffer.c_slice())));
79           if (grpc_chaotic_good_trace.enabled()) {
80             gpr_log(GPR_INFO, "CHAOTIC_GOOD: ReadHeader from:%s %s",
81                     ResolvedAddressToString(control_endpoint_.GetPeerAddress())
82                         .value_or("<<unknown peer address>>")
83                         .c_str(),
84                     frame_header.ok()
85                         ? frame_header->ToString().c_str()
86                         : frame_header.status().ToString().c_str());
87           }
88           // Read header and trailers from control endpoint.
89           // Read message padding and message from data endpoint.
90           return If(
91               frame_header.ok(),
92               [this, &frame_header] {
93                 const uint32_t message_padding = frame_header->message_padding;
94                 const uint32_t message_length = frame_header->message_length;
95                 return Map(
96                     TryJoin<absl::StatusOr>(
97                         control_endpoint_.Read(frame_header->GetFrameLength()),
98                         data_endpoint_.Read(message_length + message_padding)),
99                     [frame_header = *frame_header, message_padding](
100                         absl::StatusOr<std::tuple<SliceBuffer, SliceBuffer>>
101                             buffers)
102                         -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
103                       if (!buffers.ok()) return buffers.status();
104                       SliceBuffer data_read = std::move(std::get<1>(*buffers));
105                       if (message_padding > 0) {
106                         data_read.RemoveLastNBytesNoInline(message_padding);
107                       }
108                       return std::tuple<FrameHeader, BufferPair>(
109                           frame_header,
110                           BufferPair{std::move(std::get<0>(*buffers)),
111                                      std::move(data_read)});
112                     });
113               },
114               [&frame_header]() {
115                 return [status = frame_header.status()]() mutable
116                        -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
117                   return std::move(status);
118                 };
119               });
120         });
121   }
122 
DeserializeFrame(FrameHeader header,BufferPair buffers,Arena * arena,FrameInterface & frame,FrameLimits limits)123   absl::Status DeserializeFrame(FrameHeader header, BufferPair buffers,
124                                 Arena* arena, FrameInterface& frame,
125                                 FrameLimits limits) {
126     auto s = frame.Deserialize(&parser_, header, bitgen_, arena,
127                                std::move(buffers), limits);
128     if (grpc_chaotic_good_trace.enabled()) {
129       gpr_log(GPR_INFO, "CHAOTIC_GOOD: DeserializeFrame %s",
130               s.ok() ? frame.ToString().c_str() : s.ToString().c_str());
131     }
132     return s;
133   }
134 
135  private:
136   PromiseEndpoint control_endpoint_;
137   PromiseEndpoint data_endpoint_;
138   HPackCompressor encoder_;
139   HPackParser parser_;
140   absl::BitGen bitgen_;
141 };
142 
143 }  // namespace chaotic_good
144 }  // namespace grpc_core
145 
146 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H
147