1 // Copyright 2023 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H 16 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <cstdint> 21 #include <utility> 22 23 #include "absl/random/random.h" 24 25 #include "src/core/ext/transport/chaotic_good/frame.h" 26 #include "src/core/ext/transport/chaotic_good/frame_header.h" 27 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" 28 #include "src/core/lib/debug/trace.h" 29 #include "src/core/lib/event_engine/tcp_socket_utils.h" 30 #include "src/core/lib/promise/if.h" 31 #include "src/core/lib/promise/promise.h" 32 #include "src/core/lib/promise/try_join.h" 33 #include "src/core/lib/promise/try_seq.h" 34 #include "src/core/lib/transport/promise_endpoint.h" 35 36 extern grpc_core::TraceFlag grpc_chaotic_good_trace; 37 38 namespace grpc_core { 39 namespace chaotic_good { 40 41 class ChaoticGoodTransport : public RefCounted<ChaoticGoodTransport> { 42 public: ChaoticGoodTransport(PromiseEndpoint control_endpoint,PromiseEndpoint data_endpoint,HPackParser hpack_parser,HPackCompressor hpack_encoder)43 ChaoticGoodTransport(PromiseEndpoint control_endpoint, 44 PromiseEndpoint data_endpoint, HPackParser hpack_parser, 45 HPackCompressor hpack_encoder) 46 : control_endpoint_(std::move(control_endpoint)), 47 data_endpoint_(std::move(data_endpoint)), 48 encoder_(std::move(hpack_encoder)), 49 parser_(std::move(hpack_parser)) { 50 // Enable RxMemoryAlignment and RPC receive coalescing after the transport 51 // setup is complete. At this point all the settings frames should have 52 // been read. 53 data_endpoint_.EnforceRxMemoryAlignmentAndCoalescing(); 54 } 55 WriteFrame(const FrameInterface & frame)56 auto WriteFrame(const FrameInterface& frame) { 57 auto buffers = frame.Serialize(&encoder_); 58 if (grpc_chaotic_good_trace.enabled()) { 59 gpr_log(GPR_INFO, "CHAOTIC_GOOD: WriteFrame to:%s %s", 60 ResolvedAddressToString(control_endpoint_.GetPeerAddress()) 61 .value_or("<<unknown peer address>>") 62 .c_str(), 63 frame.ToString().c_str()); 64 } 65 return TryJoin<absl::StatusOr>( 66 control_endpoint_.Write(std::move(buffers.control)), 67 data_endpoint_.Write(std::move(buffers.data))); 68 } 69 70 // Read frame header and payloads for control and data portions of one frame. 71 // Resolves to StatusOr<tuple<FrameHeader, BufferPair>>. ReadFrameBytes()72 auto ReadFrameBytes() { 73 return TrySeq( 74 control_endpoint_.ReadSlice(FrameHeader::kFrameHeaderSize), 75 [this](Slice read_buffer) { 76 auto frame_header = 77 FrameHeader::Parse(reinterpret_cast<const uint8_t*>( 78 GRPC_SLICE_START_PTR(read_buffer.c_slice()))); 79 if (grpc_chaotic_good_trace.enabled()) { 80 gpr_log(GPR_INFO, "CHAOTIC_GOOD: ReadHeader from:%s %s", 81 ResolvedAddressToString(control_endpoint_.GetPeerAddress()) 82 .value_or("<<unknown peer address>>") 83 .c_str(), 84 frame_header.ok() 85 ? frame_header->ToString().c_str() 86 : frame_header.status().ToString().c_str()); 87 } 88 // Read header and trailers from control endpoint. 89 // Read message padding and message from data endpoint. 90 return If( 91 frame_header.ok(), 92 [this, &frame_header] { 93 const uint32_t message_padding = frame_header->message_padding; 94 const uint32_t message_length = frame_header->message_length; 95 return Map( 96 TryJoin<absl::StatusOr>( 97 control_endpoint_.Read(frame_header->GetFrameLength()), 98 data_endpoint_.Read(message_length + message_padding)), 99 [frame_header = *frame_header, message_padding]( 100 absl::StatusOr<std::tuple<SliceBuffer, SliceBuffer>> 101 buffers) 102 -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> { 103 if (!buffers.ok()) return buffers.status(); 104 SliceBuffer data_read = std::move(std::get<1>(*buffers)); 105 if (message_padding > 0) { 106 data_read.RemoveLastNBytesNoInline(message_padding); 107 } 108 return std::tuple<FrameHeader, BufferPair>( 109 frame_header, 110 BufferPair{std::move(std::get<0>(*buffers)), 111 std::move(data_read)}); 112 }); 113 }, 114 [&frame_header]() { 115 return [status = frame_header.status()]() mutable 116 -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> { 117 return std::move(status); 118 }; 119 }); 120 }); 121 } 122 DeserializeFrame(FrameHeader header,BufferPair buffers,Arena * arena,FrameInterface & frame,FrameLimits limits)123 absl::Status DeserializeFrame(FrameHeader header, BufferPair buffers, 124 Arena* arena, FrameInterface& frame, 125 FrameLimits limits) { 126 auto s = frame.Deserialize(&parser_, header, bitgen_, arena, 127 std::move(buffers), limits); 128 if (grpc_chaotic_good_trace.enabled()) { 129 gpr_log(GPR_INFO, "CHAOTIC_GOOD: DeserializeFrame %s", 130 s.ok() ? frame.ToString().c_str() : s.ToString().c_str()); 131 } 132 return s; 133 } 134 135 private: 136 PromiseEndpoint control_endpoint_; 137 PromiseEndpoint data_endpoint_; 138 HPackCompressor encoder_; 139 HPackParser parser_; 140 absl::BitGen bitgen_; 141 }; 142 143 } // namespace chaotic_good 144 } // namespace grpc_core 145 146 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHAOTIC_GOOD_CHAOTIC_GOOD_TRANSPORT_H 147