// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H

#include <grpc/support/port_platform.h>

#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/log.h>

#include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"

namespace grpc_core {

// Wrapper around an EventEngine endpoint that provides a promise-like API.
class PromiseEndpoint {
 public:
  PromiseEndpoint(
      std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
          endpoint,
      SliceBuffer already_received);
  PromiseEndpoint() = default;
  ~PromiseEndpoint() = default;
  /// Prevent copying of PromiseEndpoint; moving is fine.
  PromiseEndpoint(const PromiseEndpoint&) = delete;
  PromiseEndpoint& operator=(const PromiseEndpoint&) = delete;
  PromiseEndpoint(PromiseEndpoint&&) = default;
  PromiseEndpoint& operator=(PromiseEndpoint&&) = default;

  // Returns a promise that resolves to an `absl::Status` indicating the
  // result of the write operation.
  //
  // Concurrent writes are not supported, which means callers should not call
  // `Write()` before the previous write finishes. Doing so results in
  // undefined behavior.
  auto Write(SliceBuffer data) {
    // Start the write and assert that the previous write has finished.
    auto prev = write_state_->state.exchange(WriteState::kWriting,
                                             std::memory_order_relaxed);
    GPR_ASSERT(prev == WriteState::kIdle);
    bool completed;
    if (data.Length() == 0) {
      completed = true;
    } else {
      // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
      // available.
      grpc_slice_buffer_swap(write_state_->buffer.c_slice_buffer(),
                             data.c_slice_buffer());
      // If `Write()` returns true immediately, the callback will not be
      // called; in that case we clear the waker below and pick up the result
      // in the immediate-completion branch of the `If()` further down.
      write_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
      completed = endpoint_->Write(
          [write_state = write_state_](absl::Status status) {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            write_state->Complete(std::move(status));
          },
          &write_state_->buffer, nullptr /* uses default arguments */);
      if (completed) write_state_->waker = Waker();
    }
    return If(
        completed,
        [this]() {
          return [write_state = write_state_]() {
            auto prev = write_state->state.exchange(WriteState::kIdle,
                                                    std::memory_order_relaxed);
            GPR_ASSERT(prev == WriteState::kWriting);
            return absl::OkStatus();
          };
        },
        [this]() {
          return [write_state = write_state_]() -> Poll<absl::Status> {
            // If the current write isn't finished return `Pending()`,
            // otherwise return the write result.
            WriteState::State expected = WriteState::kWritten;
            if (write_state->state.compare_exchange_strong(
                    expected, WriteState::kIdle, std::memory_order_acquire,
                    std::memory_order_relaxed)) {
              // State was Written, and we changed it to Idle. We can return
              // the result.
              return std::move(write_state->result);
            }
            // State was not Written; since we're polling it must be
            // Writing. Assert that and return Pending.
            GPR_ASSERT(expected == WriteState::kWriting);
            return Pending();
          };
        });
  }
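
  // A minimal usage sketch for `Write()` (hypothetical caller code; it
  // assumes polling happens inside a promise activity, since `Write()`
  // captures the current activity's waker via `GetContext<Activity>()`):
  //
  //   auto write = endpoint.Write(std::move(data));
  //   // Poll `write` until it resolves to an `absl::Status`; only after it
  //   // resolves may `Write()` be called again.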

  // Returns a promise that resolves to a `SliceBuffer` containing
  // `num_bytes` bytes.
  //
  // Concurrent reads are not supported, which means callers should not call
  // `Read()` before the previous read finishes. Doing so results in
  // undefined behavior.
  auto Read(size_t num_bytes) {
    // Assert that the previous read has finished.
    GPR_ASSERT(!read_state_->complete.load(std::memory_order_relaxed));
    // Should not have pending reads.
    GPR_ASSERT(read_state_->pending_buffer.Count() == 0u);
    bool complete = true;
    while (read_state_->buffer.Length() < num_bytes) {
      // Set the read hint to the number of bytes still needed.
      grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
          read_args = {
              static_cast<int64_t>(num_bytes - read_state_->buffer.Length())};
      // If `Read()` returns true immediately, the callback will not be
      // called.
      read_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
      if (endpoint_->Read(
              [read_state = read_state_, num_bytes](absl::Status status) {
                ApplicationCallbackExecCtx callback_exec_ctx;
                ExecCtx exec_ctx;
                read_state->Complete(std::move(status), num_bytes);
              },
              &read_state_->pending_buffer, &read_args)) {
        read_state_->waker = Waker();
        read_state_->pending_buffer.MoveFirstNBytesIntoSliceBuffer(
            read_state_->pending_buffer.Length(), read_state_->buffer);
        GPR_DEBUG_ASSERT(read_state_->pending_buffer.Count() == 0u);
      } else {
        complete = false;
        break;
      }
    }
    return If(
        complete,
        [this, num_bytes]() {
          SliceBuffer ret;
          grpc_slice_buffer_move_first_no_inline(
              read_state_->buffer.c_slice_buffer(), num_bytes,
              ret.c_slice_buffer());
          return [ret = std::move(
                      ret)]() mutable -> Poll<absl::StatusOr<SliceBuffer>> {
            return std::move(ret);
          };
        },
        [this, num_bytes]() {
          return [read_state = read_state_,
                  num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> {
            if (!read_state->complete.load(std::memory_order_acquire)) {
              return Pending();
            }
            // If the read succeeded, return a `SliceBuffer` with `num_bytes`
            // bytes.
            if (read_state->result.ok()) {
              SliceBuffer ret;
              grpc_slice_buffer_move_first_no_inline(
                  read_state->buffer.c_slice_buffer(), num_bytes,
                  ret.c_slice_buffer());
              read_state->complete.store(false, std::memory_order_relaxed);
              return std::move(ret);
            }
            read_state->complete.store(false, std::memory_order_relaxed);
            return std::move(read_state->result);
          };
        });
  }
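
  // A usage sketch for `Read()` (hypothetical caller code, with the same
  // activity assumption as `Write()` above; `kFrameSize` is illustrative):
  //
  //   auto read = endpoint.Read(kFrameSize);
  //   // Polling yields `Pending()` until `kFrameSize` bytes have arrived,
  //   // then resolves to an `absl::StatusOr<SliceBuffer>`.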

  // Returns a promise that resolves to a `Slice` containing at least
  // `num_bytes` bytes; `num_bytes` must be less than INT64_MAX.
  //
  // Concurrent reads are not supported, which means callers should not call
  // `ReadSlice()` before the previous read finishes. Doing so results in
  // undefined behavior.
  auto ReadSlice(size_t num_bytes) {
    return Map(Read(num_bytes),
               [](absl::StatusOr<SliceBuffer> buffer) -> absl::StatusOr<Slice> {
                 if (!buffer.ok()) return buffer.status();
                 return buffer->JoinIntoSlice();
               });
  }

  // Returns a promise that resolves to a single byte of type `uint8_t`.
  auto ReadByte() {
    return Map(ReadSlice(1),
               [](absl::StatusOr<Slice> slice) -> absl::StatusOr<uint8_t> {
                 if (!slice.ok()) return slice.status();
                 return (*slice)[0];
               });
  }
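
  // A composition sketch: because these methods return promises, callers can
  // chain them with combinators such as `TrySeq` from
  // "src/core/lib/promise/try_seq.h". Hypothetical example; `HeaderLength()`
  // is an illustrative helper that maps a tag byte to a payload size:
  //
  //   TrySeq(
  //       endpoint.ReadByte(),
  //       [&endpoint](uint8_t tag) {
  //         return endpoint.Read(HeaderLength(tag));
  //       },
  //       [](SliceBuffer payload) {
  //         // ... parse `payload` ...
  //         return absl::OkStatus();
  //       });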

  // Enables RPC receive coalescing and alignment of memory holding received
  // RPCs.
  void EnforceRxMemoryAlignmentAndCoalescing() {
    auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension<
        grpc_event_engine::experimental::ChaoticGoodExtension>(
        endpoint_.get());
    if (chaotic_good_ext != nullptr) {
      chaotic_good_ext->EnforceRxMemoryAlignment();
      chaotic_good_ext->EnableRpcReceiveCoalescing();
      if (read_state_->buffer.Length() == 0) {
        return;
      }

      // Copy everything from read_state_->buffer into a single slice and
      // replace the contents of read_state_->buffer with that slice.
      grpc_slice slice = grpc_slice_malloc_large(read_state_->buffer.Length());
      GPR_ASSERT(
          reinterpret_cast<uintptr_t>(GRPC_SLICE_START_PTR(slice)) % 64 == 0);
      size_t ofs = 0;
      for (size_t i = 0; i < read_state_->buffer.Count(); i++) {
        memcpy(
            GRPC_SLICE_START_PTR(slice) + ofs,
            GRPC_SLICE_START_PTR(
                read_state_->buffer.c_slice_buffer()->slices[i]),
            GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]));
        ofs +=
            GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]);
      }

      read_state_->buffer.Clear();
      read_state_->buffer.AppendIndexed(
          grpc_event_engine::experimental::Slice(slice));
      GPR_DEBUG_ASSERT(read_state_->buffer.Length() == ofs);
    }
  }

  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const;
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const;

 private:
  std::shared_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
      endpoint_;

  struct ReadState : public RefCounted<ReadState> {
    std::atomic<bool> complete{false};
    // Read buffer used for storing successful reads given by
    // `EventEngine::Endpoint` but not yet requested by the caller.
    grpc_event_engine::experimental::SliceBuffer buffer;
    // Buffer used to accept data from `EventEngine::Endpoint`.
    // After every successful read from `EventEngine::Endpoint`, the data in
    // this buffer is appended to `buffer`.
    grpc_event_engine::experimental::SliceBuffer pending_buffer;
    // Used to store the result of `EventEngine::Endpoint::Read()`.
    absl::Status result;
    Waker waker;
    // Backing endpoint: we keep this on ReadState as reads will need to
    // repeatedly read until the target size is hit, and we don't want to
    // access the main object during this dance (indeed the main object may
    // be deleted).
    std::weak_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
        endpoint;

    void Complete(absl::Status status, size_t num_bytes_requested);
  };

  struct WriteState : public RefCounted<WriteState> {
    enum State : uint8_t {
      kIdle,     // Not writing.
      kWriting,  // Write started, but not completed.
      kWritten,  // Write completed.
    };

    std::atomic<State> state{kIdle};
    // Write buffer used for `EventEngine::Endpoint::Write()` to ensure that
    // the memory behind the buffer is not lost.
    grpc_event_engine::experimental::SliceBuffer buffer;
    // Used to store the result of `EventEngine::Endpoint::Write()`.
    absl::Status result;
    Waker waker;

    void Complete(absl::Status status) {
      result = std::move(status);
      auto w = std::move(waker);
      auto prev = state.exchange(kWritten, std::memory_order_release);
      // The previous state should be Writing. If we got anything else, we
      // have entered the callback path twice.
      GPR_ASSERT(prev == kWriting);
      w.Wakeup();
    }
  };

  RefCountedPtr<WriteState> write_state_ = MakeRefCounted<WriteState>();
  RefCountedPtr<ReadState> read_state_ = MakeRefCounted<ReadState>();
};

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H