1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H 16 #define GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <queue> 21 #include <string> 22 #include <vector> 23 24 #include "absl/container/flat_hash_map.h" 25 26 #include "src/core/ext/transport/binder/wire_format/binder.h" 27 #include "src/core/ext/transport/binder/wire_format/transaction.h" 28 #include "src/core/lib/gprpp/sync.h" 29 #include "src/core/lib/iomgr/combiner.h" 30 31 namespace grpc_binder { 32 33 // Member functions are thread safe. 34 class WireWriter { 35 public: 36 virtual ~WireWriter() = default; 37 virtual absl::Status RpcCall(std::unique_ptr<Transaction> tx) = 0; 38 virtual absl::Status SendAck(int64_t num_bytes) = 0; 39 virtual void OnAckReceived(int64_t num_bytes) = 0; 40 }; 41 42 class WireWriterImpl : public WireWriter { 43 public: 44 explicit WireWriterImpl(std::unique_ptr<Binder> binder); 45 ~WireWriterImpl() override; 46 absl::Status RpcCall(std::unique_ptr<Transaction> tx) override; 47 absl::Status SendAck(int64_t num_bytes) override; 48 void OnAckReceived(int64_t num_bytes) override; 49 50 // Required to be public because we would like to call this in combiner (which 51 // cannot invoke class member function). `RunScheduledTxArgs` and 52 // `RunScheduledTxInternal` should not be used by user directly. 53 struct RunScheduledTxArgs { 54 WireWriterImpl* writer; 55 struct StreamTx { 56 std::unique_ptr<Transaction> tx; 57 // How many data in transaction's `data` field has been sent. 58 int64_t bytes_sent = 0; 59 }; 60 struct AckTx { 61 int64_t num_bytes; 62 }; 63 absl::variant<AckTx, StreamTx> tx; 64 }; 65 66 void RunScheduledTxInternal(RunScheduledTxArgs* arg); 67 68 // Split long message into chunks of size 16k. This doesn't necessarily have 69 // to be the same as the flow control acknowledgement size, but it should not 70 // exceed 128k. 71 static const int64_t kBlockSize; 72 73 // Flow control allows sending at most 128k between acknowledgements. 74 static const int64_t kFlowControlWindowSize; 75 76 private: 77 // Fast path: send data in one transaction. 78 absl::Status RpcCallFastPath(std::unique_ptr<Transaction> tx); 79 80 // This function will acquire `write_mu_` to make sure the binder is not used 81 // concurrently, so this can be called by different threads safely. 82 absl::Status MakeBinderTransaction( 83 BinderTransportTxCode tx_code, 84 std::function<absl::Status(WritableParcel*)> fill_parcel); 85 86 // Send a stream to `binder_`. Set `is_last_chunk` to `true` if the stream 87 // transaction has been sent completely. Otherwise set to `false`. 88 absl::Status RunStreamTx(RunScheduledTxArgs::StreamTx* stream_tx, 89 WritableParcel* parcel, bool* is_last_chunk) 90 ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_); 91 92 // Schdule `RunScheduledTxArgs*` in `pending_outgoing_tx_` to `combiner_`, as 93 // many as possible (under the constraint of `kFlowControlWindowSize`). 94 void TryScheduleTransaction(); 95 96 // Guards variables related to transport state. 97 grpc_core::Mutex write_mu_; 98 std::unique_ptr<Binder> binder_ ABSL_GUARDED_BY(write_mu_); 99 100 // Maps the transaction code (which identifies streams) to their next 101 // available sequence number. See 102 // https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#sequence-number 103 absl::flat_hash_map<int, int> next_seq_num_ ABSL_GUARDED_BY(write_mu_); 104 105 // Number of bytes we have already sent in stream transactions. 106 std::atomic<int64_t> num_outgoing_bytes_{0}; 107 108 // Guards variables related to flow control logic. 109 grpc_core::Mutex flow_control_mu_; 110 int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(flow_control_mu_) = 0; 111 112 // The queue takes ownership of the pointer. 113 std::queue<RunScheduledTxArgs*> pending_outgoing_tx_ 114 ABSL_GUARDED_BY(flow_control_mu_); 115 int num_non_acked_tx_in_combiner_ ABSL_GUARDED_BY(flow_control_mu_) = 0; 116 117 // Helper variable for determining if we are currently calling into 118 // `Binder::Transact`. Useful for avoiding the attempt of acquiring 119 // `write_mu_` multiple times on the same thread. 120 std::atomic_bool is_transacting_{false}; 121 122 grpc_core::Combiner* combiner_; 123 }; 124 125 } // namespace grpc_binder 126 127 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H 128