xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/transport/promise_endpoint.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
16 #define GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stddef.h>
21 #include <stdint.h>
22 
23 #include <atomic>
24 #include <cstring>
25 #include <functional>
26 #include <memory>
27 #include <utility>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/types/optional.h"
33 
34 #include <grpc/event_engine/event_engine.h>
35 #include <grpc/event_engine/slice.h>
36 #include <grpc/event_engine/slice_buffer.h>
37 #include <grpc/slice_buffer.h>
38 #include <grpc/support/log.h>
39 
40 #include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
41 #include "src/core/lib/event_engine/query_extensions.h"
42 #include "src/core/lib/gprpp/sync.h"
43 #include "src/core/lib/iomgr/exec_ctx.h"
44 #include "src/core/lib/promise/activity.h"
45 #include "src/core/lib/promise/cancel_callback.h"
46 #include "src/core/lib/promise/if.h"
47 #include "src/core/lib/promise/map.h"
48 #include "src/core/lib/promise/poll.h"
49 #include "src/core/lib/slice/slice.h"
50 #include "src/core/lib/slice/slice_buffer.h"
51 
52 namespace grpc_core {
53 
54 // Wrapper around event engine endpoint that provides a promise like API.
55 class PromiseEndpoint {
56  public:
57   PromiseEndpoint(
58       std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
59           endpoint,
60       SliceBuffer already_received);
61   PromiseEndpoint() = default;
62   ~PromiseEndpoint() = default;
63   /// Prevent copying of PromiseEndpoint; moving is fine.
64   PromiseEndpoint(const PromiseEndpoint&) = delete;
65   PromiseEndpoint& operator=(const PromiseEndpoint&) = delete;
66   PromiseEndpoint(PromiseEndpoint&&) = default;
67   PromiseEndpoint& operator=(PromiseEndpoint&&) = default;
68 
69   // Returns a promise that resolves to a `absl::Status` indicating the result
70   // of the write operation.
71   //
72   // Concurrent writes are not supported, which means callers should not call
73   // `Write()` before the previous write finishes. Doing that results in
74   // undefined behavior.
Write(SliceBuffer data)75   auto Write(SliceBuffer data) {
76     // Start write and assert previous write finishes.
77     auto prev = write_state_->state.exchange(WriteState::kWriting,
78                                              std::memory_order_relaxed);
79     GPR_ASSERT(prev == WriteState::kIdle);
80     bool completed;
81     if (data.Length() == 0) {
82       completed = true;
83     } else {
84       // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
85       // available.
86       grpc_slice_buffer_swap(write_state_->buffer.c_slice_buffer(),
87                              data.c_slice_buffer());
88       // If `Write()` returns true immediately, the callback will not be called.
89       // We still need to call our callback to pick up the result.
90       write_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
91       completed = endpoint_->Write(
92           [write_state = write_state_](absl::Status status) {
93             ApplicationCallbackExecCtx callback_exec_ctx;
94             ExecCtx exec_ctx;
95             write_state->Complete(std::move(status));
96           },
97           &write_state_->buffer, nullptr /* uses default arguments */);
98       if (completed) write_state_->waker = Waker();
99     }
100     return If(
101         completed,
102         [this]() {
103           return [write_state = write_state_]() {
104             auto prev = write_state->state.exchange(WriteState::kIdle,
105                                                     std::memory_order_relaxed);
106             GPR_ASSERT(prev == WriteState::kWriting);
107             return absl::OkStatus();
108           };
109         },
110         [this]() {
111           return [write_state = write_state_]() -> Poll<absl::Status> {
112             // If current write isn't finished return `Pending()`, else
113             // return write result.
114             WriteState::State expected = WriteState::kWritten;
115             if (write_state->state.compare_exchange_strong(
116                     expected, WriteState::kIdle, std::memory_order_acquire,
117                     std::memory_order_relaxed)) {
118               // State was Written, and we changed it to Idle. We can return
119               // the result.
120               return std::move(write_state->result);
121             }
122             // State was not Written; since we're polling it must be
123             // Writing. Assert that and return Pending.
124             GPR_ASSERT(expected == WriteState::kWriting);
125             return Pending();
126           };
127         });
128   }
129 
130   // Returns a promise that resolves to `SliceBuffer` with
131   // `num_bytes` bytes.
132   //
133   // Concurrent reads are not supported, which means callers should not call
134   // `Read()` before the previous read finishes. Doing that results in
135   // undefined behavior.
Read(size_t num_bytes)136   auto Read(size_t num_bytes) {
137     // Assert previous read finishes.
138     GPR_ASSERT(!read_state_->complete.load(std::memory_order_relaxed));
139     // Should not have pending reads.
140     GPR_ASSERT(read_state_->pending_buffer.Count() == 0u);
141     bool complete = true;
142     while (read_state_->buffer.Length() < num_bytes) {
143       // Set read args with hinted bytes.
144       grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
145           read_args = {
146               static_cast<int64_t>(num_bytes - read_state_->buffer.Length())};
147       // If `Read()` returns true immediately, the callback will not be
148       // called.
149       read_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
150       if (endpoint_->Read(
151               [read_state = read_state_, num_bytes](absl::Status status) {
152                 ApplicationCallbackExecCtx callback_exec_ctx;
153                 ExecCtx exec_ctx;
154                 read_state->Complete(std::move(status), num_bytes);
155               },
156               &read_state_->pending_buffer, &read_args)) {
157         read_state_->waker = Waker();
158         read_state_->pending_buffer.MoveFirstNBytesIntoSliceBuffer(
159             read_state_->pending_buffer.Length(), read_state_->buffer);
160         GPR_DEBUG_ASSERT(read_state_->pending_buffer.Count() == 0u);
161       } else {
162         complete = false;
163         break;
164       }
165     }
166     return If(
167         complete,
168         [this, num_bytes]() {
169           SliceBuffer ret;
170           grpc_slice_buffer_move_first_no_inline(
171               read_state_->buffer.c_slice_buffer(), num_bytes,
172               ret.c_slice_buffer());
173           return [ret = std::move(
174                       ret)]() mutable -> Poll<absl::StatusOr<SliceBuffer>> {
175             return std::move(ret);
176           };
177         },
178         [this, num_bytes]() {
179           return [read_state = read_state_,
180                   num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> {
181             if (!read_state->complete.load(std::memory_order_acquire)) {
182               return Pending();
183             }
184             // If read succeeds, return `SliceBuffer` with `num_bytes` bytes.
185             if (read_state->result.ok()) {
186               SliceBuffer ret;
187               grpc_slice_buffer_move_first_no_inline(
188                   read_state->buffer.c_slice_buffer(), num_bytes,
189                   ret.c_slice_buffer());
190               read_state->complete.store(false, std::memory_order_relaxed);
191               return std::move(ret);
192             }
193             read_state->complete.store(false, std::memory_order_relaxed);
194             return std::move(read_state->result);
195           };
196         });
197   }
198 
199   // Returns a promise that resolves to `Slice` with at least
200   // `num_bytes` bytes which should be less than INT64_MAX bytes.
201   //
202   // Concurrent reads are not supported, which means callers should not call
203   // `ReadSlice()` before the previous read finishes. Doing that results in
204   // undefined behavior.
ReadSlice(size_t num_bytes)205   auto ReadSlice(size_t num_bytes) {
206     return Map(Read(num_bytes),
207                [](absl::StatusOr<SliceBuffer> buffer) -> absl::StatusOr<Slice> {
208                  if (!buffer.ok()) return buffer.status();
209                  return buffer->JoinIntoSlice();
210                });
211   }
212 
213   // Returns a promise that resolves to a byte with type `uint8_t`.
ReadByte()214   auto ReadByte() {
215     return Map(ReadSlice(1),
216                [](absl::StatusOr<Slice> slice) -> absl::StatusOr<uint8_t> {
217                  if (!slice.ok()) return slice.status();
218                  return (*slice)[0];
219                });
220   }
221 
222   // Enables RPC receive coalescing and alignment of memory holding received
223   // RPCs.
EnforceRxMemoryAlignmentAndCoalescing()224   void EnforceRxMemoryAlignmentAndCoalescing() {
225     auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension<
226         grpc_event_engine::experimental::ChaoticGoodExtension>(endpoint_.get());
227     if (chaotic_good_ext != nullptr) {
228       chaotic_good_ext->EnforceRxMemoryAlignment();
229       chaotic_good_ext->EnableRpcReceiveCoalescing();
230       if (read_state_->buffer.Length() == 0) {
231         return;
232       }
233 
234       // Copy everything from read_state_->buffer into a single slice and
235       // replace the contents of read_state_->buffer with that slice.
236       grpc_slice slice = grpc_slice_malloc_large(read_state_->buffer.Length());
237       GPR_ASSERT(
238           reinterpret_cast<uintptr_t>(GRPC_SLICE_START_PTR(slice)) % 64 == 0);
239       size_t ofs = 0;
240       for (size_t i = 0; i < read_state_->buffer.Count(); i++) {
241         memcpy(
242             GRPC_SLICE_START_PTR(slice) + ofs,
243             GRPC_SLICE_START_PTR(
244                 read_state_->buffer.c_slice_buffer()->slices[i]),
245             GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]));
246         ofs +=
247             GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]);
248       }
249 
250       read_state_->buffer.Clear();
251       read_state_->buffer.AppendIndexed(
252           grpc_event_engine::experimental::Slice(slice));
253       GPR_DEBUG_ASSERT(read_state_->buffer.Length() == ofs);
254     }
255   }
256 
257   const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
258   GetPeerAddress() const;
259   const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
260   GetLocalAddress() const;
261 
262  private:
263   std::shared_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
264       endpoint_;
265 
266   struct ReadState : public RefCounted<ReadState> {
267     std::atomic<bool> complete{false};
268     // Read buffer used for storing successful reads given by
269     // `EventEngine::Endpoint` but not yet requested by the caller.
270     grpc_event_engine::experimental::SliceBuffer buffer;
271     // Buffer used to accept data from `EventEngine::Endpoint`.
272     // Every time after a successful read from `EventEngine::Endpoint`, the data
273     // in this buffer should be appended to `buffer`.
274     grpc_event_engine::experimental::SliceBuffer pending_buffer;
275     // Used for store the result from `EventEngine::Endpoint::Read()`.
276     absl::Status result;
277     Waker waker;
278     // Backing endpoint: we keep this on ReadState as reads will need to
279     // repeatedly read until the target size is hit, and we don't want to access
280     // the main object during this dance (indeed the main object may be
281     // deleted).
282     std::weak_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
283         endpoint;
284 
285     void Complete(absl::Status status, size_t num_bytes_requested);
286   };
287 
288   struct WriteState : public RefCounted<WriteState> {
289     enum State : uint8_t {
290       kIdle,     // Not writing.
291       kWriting,  // Write started, but not completed.
292       kWritten,  // Write completed.
293     };
294 
295     std::atomic<State> state{kIdle};
296     // Write buffer used for `EventEngine::Endpoint::Write()` to ensure the
297     // memory behind the buffer is not lost.
298     grpc_event_engine::experimental::SliceBuffer buffer;
299     // Used for store the result from `EventEngine::Endpoint::Write()`.
300     absl::Status result;
301     Waker waker;
302 
CompleteWriteState303     void Complete(absl::Status status) {
304       result = std::move(status);
305       auto w = std::move(waker);
306       auto prev = state.exchange(kWritten, std::memory_order_release);
307       // Previous state should be Writing. If we got anything else we've entered
308       // the callback path twice.
309       GPR_ASSERT(prev == kWriting);
310       w.Wakeup();
311     }
312   };
313 
314   RefCountedPtr<WriteState> write_state_ = MakeRefCounted<WriteState>();
315   RefCountedPtr<ReadState> read_state_ = MakeRefCounted<ReadState>();
316 };
317 
318 }  // namespace grpc_core
319 
320 #endif  // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
321