1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/ext/transport/binder/wire_format/wire_writer.h"
18 
19 #ifndef GRPC_NO_BINDER
20 
21 #include <utility>
22 
23 #include "absl/cleanup/cleanup.h"
24 #include "absl/types/variant.h"
25 
26 #include <grpc/support/log.h>
27 
28 #include "src/core/lib/event_engine/default_event_engine.h"
29 #include "src/core/lib/gprpp/crash.h"
30 
// Evaluates `expr` (an absl::Status-valued expression) exactly once and, if
// the result is not OK, returns it from the enclosing function.
//
// The temporary deliberately uses an obscure name. With a plain `status`
// name, a call such as RETURN_IF_ERROR(status) would expand to
// `const absl::Status status = (status);` -- a self-initialization that reads
// an uninitialized object instead of the caller's variable.
#define RETURN_IF_ERROR(expr)                                        \
  do {                                                               \
    absl::Status grpc_binder_return_if_error_status_ = (expr);       \
    if (!grpc_binder_return_if_error_status_.ok()) {                 \
      return grpc_binder_return_if_error_status_;                    \
    }                                                                \
  } while (0)
36 
37 namespace grpc_binder {
38 
CanBeSentInOneTransaction(const Transaction & tx)39 bool CanBeSentInOneTransaction(const Transaction& tx) {
40   return (tx.GetFlags() & kFlagMessageData) == 0 ||
41          static_cast<int64_t>(tx.GetMessageData().size()) <=
42              WireWriterImpl::kBlockSize;
43 }
44 
45 // Simply forward the call to `WireWriterImpl::RunScheduledTx`.
RunScheduledTx(void * arg,grpc_error_handle)46 void RunScheduledTx(void* arg, grpc_error_handle /*error*/) {
47   auto* run_scheduled_tx_args =
48       static_cast<WireWriterImpl::RunScheduledTxArgs*>(arg);
49   run_scheduled_tx_args->writer->RunScheduledTxInternal(run_scheduled_tx_args);
50 }
51 
WriteInitialMetadata(const Transaction & tx,WritableParcel * parcel)52 absl::Status WriteInitialMetadata(const Transaction& tx,
53                                   WritableParcel* parcel) {
54   if (tx.IsClient()) {
55     // Only client sends method ref.
56     RETURN_IF_ERROR(parcel->WriteString(tx.GetMethodRef()));
57   }
58   RETURN_IF_ERROR(parcel->WriteInt32(tx.GetPrefixMetadata().size()));
59   for (const auto& md : tx.GetPrefixMetadata()) {
60     RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.first));
61     RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.second));
62   }
63   return absl::OkStatus();
64 }
65 
WriteTrailingMetadata(const Transaction & tx,WritableParcel * parcel)66 absl::Status WriteTrailingMetadata(const Transaction& tx,
67                                    WritableParcel* parcel) {
68   if (tx.IsServer()) {
69     if (tx.GetFlags() & kFlagStatusDescription) {
70       RETURN_IF_ERROR(parcel->WriteString(tx.GetStatusDesc()));
71     }
72     RETURN_IF_ERROR(parcel->WriteInt32(tx.GetSuffixMetadata().size()));
73     for (const auto& md : tx.GetSuffixMetadata()) {
74       RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.first));
75       RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(md.second));
76     }
77   } else {
78     // client suffix currently is always empty according to the wireformat
79     if (!tx.GetSuffixMetadata().empty()) {
80       gpr_log(GPR_ERROR, "Got non-empty suffix metadata from client.");
81     }
82   }
83   return absl::OkStatus();
84 }
85 
WireWriterImpl(std::unique_ptr<Binder> binder)86 WireWriterImpl::WireWriterImpl(std::unique_ptr<Binder> binder)
87     : binder_(std::move(binder)), combiner_(grpc_combiner_create()) {}
88 
~WireWriterImpl()89 WireWriterImpl::~WireWriterImpl() {
90   GRPC_COMBINER_UNREF(combiner_, "wire_writer_impl");
91   while (!pending_outgoing_tx_.empty()) {
92     delete pending_outgoing_tx_.front();
93     pending_outgoing_tx_.pop();
94   }
95 }
96 
97 // Flow control constant are specified at
98 // https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#flow-control
99 const int64_t WireWriterImpl::kBlockSize = 16 * 1024;
100 const int64_t WireWriterImpl::kFlowControlWindowSize = 128 * 1024;
101 
MakeBinderTransaction(BinderTransportTxCode tx_code,std::function<absl::Status (WritableParcel *)> fill_parcel)102 absl::Status WireWriterImpl::MakeBinderTransaction(
103     BinderTransportTxCode tx_code,
104     std::function<absl::Status(WritableParcel*)> fill_parcel) {
105   grpc_core::MutexLock lock(&write_mu_);
106   RETURN_IF_ERROR(binder_->PrepareTransaction());
107   WritableParcel* parcel = binder_->GetWritableParcel();
108   RETURN_IF_ERROR(fill_parcel(parcel));
109   // Only stream transaction is accounted in flow control spec.
110   if (static_cast<int32_t>(tx_code) >= kFirstCallId) {
111     int64_t parcel_size = parcel->GetDataSize();
112     if (parcel_size > 2 * kBlockSize) {
113       gpr_log(GPR_ERROR,
114               "Unexpected large transaction (possibly caused by a very large "
115               "metadata). This might overflow the binder "
116               "transaction buffer. Size: %" PRId64 " bytes",
117               parcel_size);
118     }
119     num_outgoing_bytes_ += parcel_size;
120     gpr_log(GPR_INFO, "Total outgoing bytes: %" PRId64,
121             num_outgoing_bytes_.load());
122   }
123   GPR_ASSERT(!is_transacting_);
124   is_transacting_ = true;
125   absl::Status result = binder_->Transact(tx_code);
126   is_transacting_ = false;
127   return result;
128 }
129 
RpcCallFastPath(std::unique_ptr<Transaction> tx)130 absl::Status WireWriterImpl::RpcCallFastPath(std::unique_ptr<Transaction> tx) {
131   return MakeBinderTransaction(
132       static_cast<BinderTransportTxCode>(tx->GetTxCode()),
133       [this, tx = tx.get()](
134           WritableParcel* parcel) ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_) {
135         RETURN_IF_ERROR(parcel->WriteInt32(tx->GetFlags()));
136         RETURN_IF_ERROR(parcel->WriteInt32(next_seq_num_[tx->GetTxCode()]++));
137         if (tx->GetFlags() & kFlagPrefix) {
138           RETURN_IF_ERROR(WriteInitialMetadata(*tx, parcel));
139         }
140         if (tx->GetFlags() & kFlagMessageData) {
141           RETURN_IF_ERROR(
142               parcel->WriteByteArrayWithLength(tx->GetMessageData()));
143         }
144         if (tx->GetFlags() & kFlagSuffix) {
145           RETURN_IF_ERROR(WriteTrailingMetadata(*tx, parcel));
146         }
147         return absl::OkStatus();
148       });
149 }
150 
RunStreamTx(RunScheduledTxArgs::StreamTx * stream_tx,WritableParcel * parcel,bool * is_last_chunk)151 absl::Status WireWriterImpl::RunStreamTx(
152     RunScheduledTxArgs::StreamTx* stream_tx, WritableParcel* parcel,
153     bool* is_last_chunk) {
154   Transaction* tx = stream_tx->tx.get();
155   // Transaction without data flag should go to fast path.
156   GPR_ASSERT(tx->GetFlags() & kFlagMessageData);
157 
158   absl::string_view data = tx->GetMessageData();
159   GPR_ASSERT(stream_tx->bytes_sent <= static_cast<int64_t>(data.size()));
160 
161   int flags = kFlagMessageData;
162 
163   if (stream_tx->bytes_sent == 0) {
164     // This is the first transaction. Include initial
165     // metadata if there's any.
166     if (tx->GetFlags() & kFlagPrefix) {
167       flags |= kFlagPrefix;
168     }
169   }
170   // There is also prefix/suffix in transaction beside the transaction data so
171   // actual transaction size will be greater than `kBlockSize`. This is
172   // unavoidable because we cannot split the prefix metadata and trailing
173   // metadata into different binder transactions. In most cases this is fine
174   // because single transaction size is not required to be strictly lower than
175   // `kBlockSize`, as long as it won't overflow Android's binder buffer.
176   int64_t size = std::min<int64_t>(WireWriterImpl::kBlockSize,
177                                    data.size() - stream_tx->bytes_sent);
178   if (stream_tx->bytes_sent + WireWriterImpl::kBlockSize >=
179       static_cast<int64_t>(data.size())) {
180     // This is the last transaction. Include trailing
181     // metadata if there's any.
182     if (tx->GetFlags() & kFlagSuffix) {
183       flags |= kFlagSuffix;
184     }
185     size = data.size() - stream_tx->bytes_sent;
186     *is_last_chunk = true;
187   } else {
188     // There are more messages to send.
189     flags |= kFlagMessageDataIsPartial;
190     *is_last_chunk = false;
191   }
192   RETURN_IF_ERROR(parcel->WriteInt32(flags));
193   RETURN_IF_ERROR(parcel->WriteInt32(next_seq_num_[tx->GetTxCode()]++));
194   if (flags & kFlagPrefix) {
195     RETURN_IF_ERROR(WriteInitialMetadata(*tx, parcel));
196   }
197   RETURN_IF_ERROR(parcel->WriteByteArrayWithLength(
198       data.substr(stream_tx->bytes_sent, size)));
199   if (flags & kFlagSuffix) {
200     RETURN_IF_ERROR(WriteTrailingMetadata(*tx, parcel));
201   }
202   stream_tx->bytes_sent += size;
203   return absl::OkStatus();
204 }
205 
RunScheduledTxInternal(RunScheduledTxArgs * args)206 void WireWriterImpl::RunScheduledTxInternal(RunScheduledTxArgs* args) {
207   GPR_ASSERT(args->writer == this);
208   if (absl::holds_alternative<RunScheduledTxArgs::AckTx>(args->tx)) {
209     int64_t num_bytes =
210         absl::get<RunScheduledTxArgs::AckTx>(args->tx).num_bytes;
211     absl::Status result =
212         MakeBinderTransaction(BinderTransportTxCode::ACKNOWLEDGE_BYTES,
213                               [num_bytes](WritableParcel* parcel) {
214                                 RETURN_IF_ERROR(parcel->WriteInt64(num_bytes));
215                                 return absl::OkStatus();
216                               });
217     if (!result.ok()) {
218       gpr_log(GPR_ERROR, "Failed to make binder transaction %s",
219               result.ToString().c_str());
220     }
221     delete args;
222     return;
223   }
224   GPR_ASSERT(absl::holds_alternative<RunScheduledTxArgs::StreamTx>(args->tx));
225   RunScheduledTxArgs::StreamTx* stream_tx =
226       &absl::get<RunScheduledTxArgs::StreamTx>(args->tx);
227   // Be reservative. Decrease CombinerTxCount after the data size of this
228   // transaction has already been added to `num_outgoing_bytes_`, to make sure
229   // we never underestimate `num_outgoing_bytes_`.
230   auto decrease_combiner_tx_count = absl::MakeCleanup([this]() {
231     {
232       grpc_core::MutexLock lock(&flow_control_mu_);
233       GPR_ASSERT(num_non_acked_tx_in_combiner_ > 0);
234       num_non_acked_tx_in_combiner_--;
235     }
236     // New transaction might be ready to be scheduled.
237     TryScheduleTransaction();
238   });
239   if (CanBeSentInOneTransaction(*stream_tx->tx.get())) {  // NOLINT
240     absl::Status result = RpcCallFastPath(std::move(stream_tx->tx));
241     if (!result.ok()) {
242       gpr_log(GPR_ERROR, "Failed to handle non-chunked RPC call %s",
243               result.ToString().c_str());
244     }
245     delete args;
246     return;
247   }
248   bool is_last_chunk = true;
249   absl::Status result = MakeBinderTransaction(
250       static_cast<BinderTransportTxCode>(stream_tx->tx->GetTxCode()),
251       [stream_tx, &is_last_chunk, this](WritableParcel* parcel)
252           ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_) {
253             return RunStreamTx(stream_tx, parcel, &is_last_chunk);
254           });
255   if (!result.ok()) {
256     gpr_log(GPR_ERROR, "Failed to make binder transaction %s",
257             result.ToString().c_str());
258   }
259   if (!is_last_chunk) {
260     {
261       grpc_core::MutexLock lock(&flow_control_mu_);
262       pending_outgoing_tx_.push(args);
263     }
264     TryScheduleTransaction();
265   } else {
266     delete args;
267   }
268 }
269 
RpcCall(std::unique_ptr<Transaction> tx)270 absl::Status WireWriterImpl::RpcCall(std::unique_ptr<Transaction> tx) {
271   // TODO(mingcl): check tx_code <= last call id
272   GPR_ASSERT(tx->GetTxCode() >= kFirstCallId);
273   auto args = new RunScheduledTxArgs();
274   args->writer = this;
275   args->tx = RunScheduledTxArgs::StreamTx();
276   absl::get<RunScheduledTxArgs::StreamTx>(args->tx).tx = std::move(tx);
277   absl::get<RunScheduledTxArgs::StreamTx>(args->tx).bytes_sent = 0;
278   {
279     grpc_core::MutexLock lock(&flow_control_mu_);
280     pending_outgoing_tx_.push(args);
281   }
282   TryScheduleTransaction();
283   return absl::OkStatus();
284 }
285 
SendAck(int64_t num_bytes)286 absl::Status WireWriterImpl::SendAck(int64_t num_bytes) {
287   // Ensure combiner will be run if this is not called from top-level gRPC API
288   // entrypoint.
289   grpc_core::ExecCtx exec_ctx;
290   gpr_log(GPR_INFO, "Ack %" PRId64 " bytes received", num_bytes);
291   if (is_transacting_) {
292     // This can happen because NDK might call our registered callback function
293     // in the same thread while we are telling it to send a transaction
294     // `is_transacting_` will be true. `Binder::Transact` is now being called on
295     // the same thread or the other thread. We are currently in the call stack
296     // of other transaction, Liveness of ACK is still guaranteed even if this is
297     // a race with another thread.
298     gpr_log(
299         GPR_INFO,
300         "Scheduling ACK transaction instead of directly execute it to avoid "
301         "deadlock.");
302     auto args = new RunScheduledTxArgs();
303     args->writer = this;
304     args->tx = RunScheduledTxArgs::AckTx();
305     absl::get<RunScheduledTxArgs::AckTx>(args->tx).num_bytes = num_bytes;
306     auto cl = GRPC_CLOSURE_CREATE(RunScheduledTx, args, nullptr);
307     combiner_->Run(cl, absl::OkStatus());
308     return absl::OkStatus();
309   }
310   // Otherwise, we can directly send ack.
311   absl::Status result =
312       MakeBinderTransaction((BinderTransportTxCode::ACKNOWLEDGE_BYTES),
313                             [num_bytes](WritableParcel* parcel) {
314                               RETURN_IF_ERROR(parcel->WriteInt64(num_bytes));
315                               return absl::OkStatus();
316                             });
317   if (!result.ok()) {
318     gpr_log(GPR_ERROR, "Failed to make binder transaction %s",
319             result.ToString().c_str());
320   }
321   return result;
322 }
323 
OnAckReceived(int64_t num_bytes)324 void WireWriterImpl::OnAckReceived(int64_t num_bytes) {
325   // Ensure combiner will be run if this is not called from top-level gRPC API
326   // entrypoint.
327   grpc_core::ExecCtx exec_ctx;
328   gpr_log(GPR_INFO, "OnAckReceived %" PRId64, num_bytes);
329   // Do not try to obtain `write_mu_` in this function. NDKBinder might invoke
330   // the callback to notify us about new incoming binder transaction when we are
331   // sending transaction. i.e. `write_mu_` might have already been acquired by
332   // this thread.
333   {
334     grpc_core::MutexLock lock(&flow_control_mu_);
335     num_acknowledged_bytes_ = std::max(num_acknowledged_bytes_, num_bytes);
336     int64_t num_outgoing_bytes = num_outgoing_bytes_;
337     if (num_acknowledged_bytes_ > num_outgoing_bytes) {
338       gpr_log(GPR_ERROR,
339               "The other end of transport acked more bytes than we ever sent, "
340               "%" PRId64 " > %" PRId64,
341               num_acknowledged_bytes_, num_outgoing_bytes);
342     }
343   }
344   TryScheduleTransaction();
345 }
346 
TryScheduleTransaction()347 void WireWriterImpl::TryScheduleTransaction() {
348   while (true) {
349     grpc_core::MutexLock lock(&flow_control_mu_);
350     if (pending_outgoing_tx_.empty()) {
351       // Nothing to be schduled.
352       break;
353     }
354     // Number of bytes we have scheduled in combiner but have not yet be
355     // executed by combiner. Here we make an assumption that every binder
356     // transaction will take `kBlockSize`. This should be close to truth when a
357     // large message is being cut to `kBlockSize` chunks.
358     int64_t num_bytes_scheduled_in_combiner =
359         num_non_acked_tx_in_combiner_ * kBlockSize;
360     // An estimation of number of bytes of traffic we will eventually send to
361     // the other end, assuming all tasks in combiner will be executed and we
362     // receive no new ACK from the other end of transport.
363     int64_t num_total_bytes_will_be_sent =
364         num_outgoing_bytes_ + num_bytes_scheduled_in_combiner;
365     // An estimation of number of bytes of traffic that will not be
366     // acknowledged, assuming all tasks in combiner will be executed and we
367     // receive no new ack message fomr the other end of transport.
368     int64_t num_non_acked_bytes_estimation =
369         num_total_bytes_will_be_sent - num_acknowledged_bytes_;
370     if (num_non_acked_bytes_estimation < 0) {
371       gpr_log(
372           GPR_ERROR,
373           "Something went wrong. `num_non_acked_bytes_estimation` should be "
374           "non-negative but it is %" PRId64,
375           num_non_acked_bytes_estimation);
376     }
377     // If we can schedule another transaction (which has size estimation of
378     // `kBlockSize`) without exceeding `kFlowControlWindowSize`, schedule it.
379     if ((num_non_acked_bytes_estimation + kBlockSize <
380          kFlowControlWindowSize)) {
381       num_non_acked_tx_in_combiner_++;
382       combiner_->Run(GRPC_CLOSURE_CREATE(RunScheduledTx,
383                                          pending_outgoing_tx_.front(), nullptr),
384                      absl::OkStatus());
385       pending_outgoing_tx_.pop();
386     } else {
387       // It is common to fill `kFlowControlWindowSize` completely because
388       // transactions are send at faster rate than the other end of transport
389       // can handle it, so here we use `GPR_DEBUG` log level.
390       gpr_log(GPR_DEBUG,
391               "Some work cannot be scheduled yet due to slow ack from the "
392               "other end of transport. This transport might be blocked if this "
393               "number don't go down. pending_outgoing_tx_.size() = %zu "
394               "pending_outgoing_tx_.front() = %p",
395               pending_outgoing_tx_.size(), pending_outgoing_tx_.front());
396       break;
397     }
398   }
399 }
400 
401 }  // namespace grpc_binder
402 
403 #endif
404