// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <grpc/support/port_platform.h>

#include "src/core/ext/transport/binder/wire_format/wire_writer.h"

#ifndef GRPC_NO_BINDER

#include <utility>

#include "absl/cleanup/cleanup.h"
#include "absl/types/variant.h"

#include <grpc/support/log.h>

#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gprpp/crash.h"

#define RETURN_IF_ERROR(expr)           \
  do {                                  \
    const absl::Status status = (expr); \
    if (!status.ok()) return status;    \
  } while (0)

namespace grpc_binder {

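// A transaction fits in a single binder transaction when it carries no
// message data, or when its message data is no larger than
// `WireWriterImpl::kBlockSize`.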
bool CanBeSentInOneTransaction(const Transaction& tx) {
  return (tx.GetFlags() & kFlagMessageData) == 0 ||
         static_cast<int64_t>(tx.GetMessageData().size()) <=
             WireWriterImpl::kBlockSize;
}

// Simply forwards the call to `WireWriterImpl::RunScheduledTxInternal`.
void RunScheduledTx(void* arg, grpc_error_handle /*error*/) {
  auto* run_scheduled_tx_args =
      static_cast<WireWriterImpl::RunScheduledTxArgs*>(arg);
  run_scheduled_tx_args->writer->RunScheduledTxInternal(run_scheduled_tx_args);
}

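// Writes the initial (prefix) metadata of `tx` into `parcel`: the method ref
// (client only), the metadata count, then each key/value pair as
// length-prefixed byte arrays.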
absl::Status WriteInitialMetadata(const Transaction& tx,
                                  WritableParcel* parcel) {
  if (tx.IsClient()) {
    // Only the client sends the method ref.
    RETURN_IF_ERROR(parcel->WriteString(tx.GetMethodRef()));
  }
  RETURN_IF_ERROR(parcel->WriteInt32(tx.GetPrefixMetadata().size()));
  for (const auto& md : tx.GetPrefixMetadata()) {
    RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.first));
    RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.second));
  }
  return absl::OkStatus();
}

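// Writes the trailing (suffix) metadata of `tx` into `parcel`. Only the
// server sends trailing metadata (and, optionally, a status description); a
// client suffix is expected to be empty.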
absl::Status WriteTrailingMetadata(const Transaction& tx,
                                   WritableParcel* parcel) {
  if (tx.IsServer()) {
    if (tx.GetFlags() & kFlagStatusDescription) {
      RETURN_IF_ERROR(parcel->WriteString(tx.GetStatusDesc()));
    }
    RETURN_IF_ERROR(parcel->WriteInt32(tx.GetSuffixMetadata().size()));
    for (const auto& md : tx.GetSuffixMetadata()) {
      RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.first));
      RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.second));
    }
  } else {
    // The client suffix is currently always empty according to the wire
    // format.
    if (!tx.GetSuffixMetadata().empty()) {
      gpr_log(GPR_ERROR, "Got non-empty suffix metadata from client.");
    }
  }
  return absl::OkStatus();
}

WireWriterImpl::WireWriterImpl(std::unique_ptr<Binder> binder)
    : binder_(std::move(binder)), combiner_(grpc_combiner_create()) {}

WireWriterImpl::~WireWriterImpl() {
  GRPC_COMBINER_UNREF(combiner_, "wire_writer_impl");
  while (!pending_outgoing_tx_.empty()) {
    delete pending_outgoing_tx_.front();
    pending_outgoing_tx_.pop();
  }
}

// Flow control constants are specified at
// https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#flow-control
const int64_t WireWriterImpl::kBlockSize = 16 * 1024;
const int64_t WireWriterImpl::kFlowControlWindowSize = 128 * 1024;
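// Example: with these values, a 100 KiB message is cut into
// ceil(100 * 1024 / kBlockSize) = 7 chunks, and `TryScheduleTransaction`
// stops scheduling further chunks once the estimated un-acked traffic would
// reach `kFlowControlWindowSize` (roughly 8 blocks in flight).
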
absl::Status WireWriterImpl::MakeBinderTransaction(
    BinderTransportTxCode tx_code,
    std::function<absl::Status(WritableParcel*)> fill_parcel) {
  grpc_core::MutexLock lock(&write_mu_);
  RETURN_IF_ERROR(binder_->PrepareTransaction());
  WritableParcel* parcel = binder_->GetWritableParcel();
  RETURN_IF_ERROR(fill_parcel(parcel));
  // Only stream transactions are accounted for in the flow control spec.
  if (static_cast<int32_t>(tx_code) >= kFirstCallId) {
    int64_t parcel_size = parcel->GetDataSize();
    if (parcel_size > 2 * kBlockSize) {
      gpr_log(GPR_ERROR,
              "Unexpectedly large transaction (possibly caused by very large "
              "metadata). This might overflow the binder "
              "transaction buffer. Size: %" PRId64 " bytes",
              parcel_size);
    }
    num_outgoing_bytes_ += parcel_size;
    gpr_log(GPR_INFO, "Total outgoing bytes: %" PRId64,
            num_outgoing_bytes_.load());
  }
  GPR_ASSERT(!is_transacting_);
  is_transacting_ = true;
  absl::Status result = binder_->Transact(tx_code);
  is_transacting_ = false;
  return result;
}

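// Fast path for transactions that fit in a single binder transaction (no
// message data, or data no larger than `kBlockSize`): flags, sequence number,
// optional initial metadata, message data, and optional trailing metadata are
// written and sent in one shot.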
absl::Status WireWriterImpl::RpcCallFastPath(std::unique_ptr<Transaction> tx) {
  return MakeBinderTransaction(
      static_cast<BinderTransportTxCode>(tx->GetTxCode()),
      [this, tx = tx.get()](
          WritableParcel* parcel) ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_) {
        RETURN_IF_ERROR(parcel->WriteInt32(tx->GetFlags()));
        RETURN_IF_ERROR(parcel->WriteInt32(next_seq_num_[tx->GetTxCode()]++));
        if (tx->GetFlags() & kFlagPrefix) {
          RETURN_IF_ERROR(WriteInitialMetadata(*tx, parcel));
        }
        if (tx->GetFlags() & kFlagMessageData) {
          RETURN_IF_ERROR(
              parcel->WriteByteArrayWithLength(tx->GetMessageData()));
        }
        if (tx->GetFlags() & kFlagSuffix) {
          RETURN_IF_ERROR(WriteTrailingMetadata(*tx, parcel));
        }
        return absl::OkStatus();
      });
}

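// Writes the next chunk of `stream_tx` into `parcel`. At most `kBlockSize`
// bytes of message data are sent per call; prefix metadata is attached to the
// first chunk and suffix metadata to the last one. `*is_last_chunk` is set
// accordingly.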
absl::Status WireWriterImpl::RunStreamTx(
    RunScheduledTxArgs::StreamTx* stream_tx, WritableParcel* parcel,
    bool* is_last_chunk) {
  Transaction* tx = stream_tx->tx.get();
  // Transactions without the message data flag should go through the fast
  // path instead.
  GPR_ASSERT(tx->GetFlags() & kFlagMessageData);

  absl::string_view data = tx->GetMessageData();
  GPR_ASSERT(stream_tx->bytes_sent <= static_cast<int64_t>(data.size()));

  int flags = kFlagMessageData;

  if (stream_tx->bytes_sent == 0) {
    // This is the first chunk. Include initial metadata if there is any.
    if (tx->GetFlags() & kFlagPrefix) {
      flags |= kFlagPrefix;
    }
  }
  // Besides the message data, the binder transaction also carries the prefix
  // and suffix metadata, so the actual transaction size can exceed
  // `kBlockSize`. This is unavoidable because we cannot split the prefix or
  // trailing metadata across binder transactions. In most cases this is fine
  // because a single transaction is not required to stay strictly below
  // `kBlockSize`, as long as it does not overflow Android's binder buffer.
  int64_t size = std::min<int64_t>(WireWriterImpl::kBlockSize,
                                   data.size() - stream_tx->bytes_sent);
  if (stream_tx->bytes_sent + WireWriterImpl::kBlockSize >=
      static_cast<int64_t>(data.size())) {
    // This is the last chunk. Include trailing metadata if there is any.
    if (tx->GetFlags() & kFlagSuffix) {
      flags |= kFlagSuffix;
    }
    size = data.size() - stream_tx->bytes_sent;
    *is_last_chunk = true;
  } else {
    // More chunks of this message remain to be sent.
    flags |= kFlagMessageDataIsPartial;
    *is_last_chunk = false;
  }
  RETURN_IF_ERROR(parcel->WriteInt32(flags));
  RETURN_IF_ERROR(parcel->WriteInt32(next_seq_num_[tx->GetTxCode()]++));
  if (flags & kFlagPrefix) {
    RETURN_IF_ERROR(WriteInitialMetadata(*tx, parcel));
  }
  RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(
      data.substr(stream_tx->bytes_sent, size)));
  if (flags & kFlagSuffix) {
    RETURN_IF_ERROR(WriteTrailingMetadata(*tx, parcel));
  }
  stream_tx->bytes_sent += size;
  return absl::OkStatus();
}

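// Runs a scheduled transaction inside the combiner. ACK transactions are sent
// immediately; stream transactions either take the fast path or send one
// chunk and re-queue themselves until all data is sent.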
void WireWriterImpl::RunScheduledTxInternal(RunScheduledTxArgs* args) {
  GPR_ASSERT(args->writer == this);
  if (absl::holds_alternative<RunScheduledTxArgs::AckTx>(args->tx)) {
    int64_t num_bytes =
        absl::get<RunScheduledTxArgs::AckTx>(args->tx).num_bytes;
    absl::Status result =
        MakeBinderTransaction(BinderTransportTxCode::ACKNOWLEDGE_BYTES,
                              [num_bytes](WritableParcel* parcel) {
                                RETURN_IF_ERROR(parcel->WriteInt64(num_bytes));
                                return absl::OkStatus();
                              });
    if (!result.ok()) {
      gpr_log(GPR_ERROR, "Failed to make binder transaction %s",
              result.ToString().c_str());
    }
    delete args;
    return;
  }
  GPR_ASSERT(absl::holds_alternative<RunScheduledTxArgs::StreamTx>(args->tx));
  RunScheduledTxArgs::StreamTx* stream_tx =
      &absl::get<RunScheduledTxArgs::StreamTx>(args->tx);
  // Be conservative. Only decrement `num_non_acked_tx_in_combiner_` after the
  // data size of this transaction has been added to `num_outgoing_bytes_`, to
  // make sure we never underestimate `num_outgoing_bytes_`.
  auto decrease_combiner_tx_count = absl::MakeCleanup([this]() {
    {
      grpc_core::MutexLock lock(&flow_control_mu_);
      GPR_ASSERT(num_non_acked_tx_in_combiner_ > 0);
      num_non_acked_tx_in_combiner_--;
    }
    // A new transaction might now be ready to be scheduled.
    TryScheduleTransaction();
  });
  if (CanBeSentInOneTransaction(*stream_tx->tx.get())) {  // NOLINT
    absl::Status result = RpcCallFastPath(std::move(stream_tx->tx));
    if (!result.ok()) {
      gpr_log(GPR_ERROR, "Failed to handle non-chunked RPC call %s",
              result.ToString().c_str());
    }
    delete args;
    return;
  }
  bool is_last_chunk = true;
  absl::Status result = MakeBinderTransaction(
      static_cast<BinderTransportTxCode>(stream_tx->tx->GetTxCode()),
      [stream_tx, &is_last_chunk, this](WritableParcel* parcel)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_) {
            return RunStreamTx(stream_tx, parcel, &is_last_chunk);
          });
  if (!result.ok()) {
    gpr_log(GPR_ERROR, "Failed to make binder transaction %s",
            result.ToString().c_str());
  }
  if (!is_last_chunk) {
    // Re-queue the remaining chunks of this transaction.
    {
      grpc_core::MutexLock lock(&flow_control_mu_);
      pending_outgoing_tx_.push(args);
    }
    TryScheduleTransaction();
  } else {
    delete args;
  }
}

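// Queues `tx` for sending and asks the flow control logic to schedule it. The
// actual binder transaction happens later inside the combiner.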
absl::Status WireWriterImpl::RpcCall(std::unique_ptr<Transaction> tx) {
  // TODO(mingcl): check tx_code <= last call id
  GPR_ASSERT(tx->GetTxCode() >= kFirstCallId);
  auto args = new RunScheduledTxArgs();
  args->writer = this;
  args->tx = RunScheduledTxArgs::StreamTx();
  absl::get<RunScheduledTxArgs::StreamTx>(args->tx).tx = std::move(tx);
  absl::get<RunScheduledTxArgs::StreamTx>(args->tx).bytes_sent = 0;
  {
    grpc_core::MutexLock lock(&flow_control_mu_);
    pending_outgoing_tx_.push(args);
  }
  TryScheduleTransaction();
  return absl::OkStatus();
}

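// Sends an ACKNOWLEDGE_BYTES transaction for `num_bytes`. If a binder
// transaction is already in flight on this thread, the ack is deferred to the
// combiner instead.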
absl::Status WireWriterImpl::SendAck(int64_t num_bytes) {
  // Ensure the combiner will be run if this is not called from a top-level
  // gRPC API entrypoint.
  grpc_core::ExecCtx exec_ctx;
  gpr_log(GPR_INFO, "Ack %" PRId64 " bytes received", num_bytes);
  if (is_transacting_) {
    // `is_transacting_` is true, meaning `Binder::Transact` is being called,
    // either on this thread (the NDK may invoke our registered callback on
    // the same thread while we are asking it to send a transaction) or on
    // another thread. Schedule the ACK on the combiner instead of sending it
    // directly; liveness of the ACK is still guaranteed even if this is a
    // race with another thread.
    gpr_log(GPR_INFO,
            "Scheduling ACK transaction instead of executing it directly to "
            "avoid deadlock.");
    auto args = new RunScheduledTxArgs();
    args->writer = this;
    args->tx = RunScheduledTxArgs::AckTx();
    absl::get<RunScheduledTxArgs::AckTx>(args->tx).num_bytes = num_bytes;
    auto cl = GRPC_CLOSURE_CREATE(RunScheduledTx, args, nullptr);
    combiner_->Run(cl, absl::OkStatus());
    return absl::OkStatus();
  }
  // Otherwise, we can send the ack directly.
  absl::Status result =
      MakeBinderTransaction(BinderTransportTxCode::ACKNOWLEDGE_BYTES,
                            [num_bytes](WritableParcel* parcel) {
                              RETURN_IF_ERROR(parcel->WriteInt64(num_bytes));
                              return absl::OkStatus();
                            });
  if (!result.ok()) {
    gpr_log(GPR_ERROR, "Failed to make binder transaction %s",
            result.ToString().c_str());
  }
  return result;
}

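// Called when the other end acknowledges `num_bytes` of received traffic.
// Updates the acknowledged-bytes high-water mark and tries to schedule more
// transactions.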
void WireWriterImpl::OnAckReceived(int64_t num_bytes) {
  // Ensure the combiner will be run if this is not called from a top-level
  // gRPC API entrypoint.
  grpc_core::ExecCtx exec_ctx;
  gpr_log(GPR_INFO, "OnAckReceived %" PRId64, num_bytes);
  // Do not try to acquire `write_mu_` in this function. NDKBinder might
  // invoke the callback that notifies us about a new incoming binder
  // transaction while we are sending a transaction, i.e. `write_mu_` might
  // already be held by this thread.
  {
    grpc_core::MutexLock lock(&flow_control_mu_);
    num_acknowledged_bytes_ = std::max(num_acknowledged_bytes_, num_bytes);
    int64_t num_outgoing_bytes = num_outgoing_bytes_;
    if (num_acknowledged_bytes_ > num_outgoing_bytes) {
      gpr_log(GPR_ERROR,
              "The other end of the transport acked more bytes than we ever "
              "sent, %" PRId64 " > %" PRId64,
              num_acknowledged_bytes_, num_outgoing_bytes);
    }
  }
  TryScheduleTransaction();
}

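// Moves transactions from `pending_outgoing_tx_` into the combiner while the
// estimated un-acked traffic stays below `kFlowControlWindowSize`.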
void WireWriterImpl::TryScheduleTransaction() {
  while (true) {
    grpc_core::MutexLock lock(&flow_control_mu_);
    if (pending_outgoing_tx_.empty()) {
      // Nothing to be scheduled.
      break;
    }
    // Number of bytes we have scheduled in the combiner but that have not yet
    // been executed by the combiner. Here we assume that every binder
    // transaction will take `kBlockSize`. This should be close to the truth
    // when a large message is being cut into `kBlockSize` chunks.
    int64_t num_bytes_scheduled_in_combiner =
        num_non_acked_tx_in_combiner_ * kBlockSize;
    // An estimate of the number of bytes of traffic we will eventually send
    // to the other end, assuming all tasks in the combiner get executed and
    // we receive no new ACK from the other end of the transport.
    int64_t num_total_bytes_will_be_sent =
        num_outgoing_bytes_ + num_bytes_scheduled_in_combiner;
    // An estimate of the number of bytes of traffic that will not be
    // acknowledged, under the same assumptions.
    int64_t num_non_acked_bytes_estimation =
        num_total_bytes_will_be_sent - num_acknowledged_bytes_;
    if (num_non_acked_bytes_estimation < 0) {
      gpr_log(
          GPR_ERROR,
          "Something went wrong. `num_non_acked_bytes_estimation` should be "
          "non-negative but it is %" PRId64,
          num_non_acked_bytes_estimation);
    }
    // If we can schedule another transaction (with an estimated size of
    // `kBlockSize`) without exceeding `kFlowControlWindowSize`, schedule it.
    if ((num_non_acked_bytes_estimation + kBlockSize <
         kFlowControlWindowSize)) {
      num_non_acked_tx_in_combiner_++;
      combiner_->Run(GRPC_CLOSURE_CREATE(RunScheduledTx,
                                         pending_outgoing_tx_.front(), nullptr),
                     absl::OkStatus());
      pending_outgoing_tx_.pop();
    } else {
      // It is common to fill `kFlowControlWindowSize` completely because
      // transactions are sent at a faster rate than the other end of the
      // transport can handle, so we use the `GPR_DEBUG` log level here.
      gpr_log(GPR_DEBUG,
              "Some work cannot be scheduled yet due to slow acks from the "
              "other end of the transport. This transport might be blocked if "
              "this number doesn't go down. pending_outgoing_tx_.size() = %zu "
              "pending_outgoing_tx_.front() = %p",
              pending_outgoing_tx_.size(), pending_outgoing_tx_.front());
      break;
    }
  }
}

}  // namespace grpc_binder

#endif