1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
16 #define GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <queue>
21 #include <string>
22 #include <vector>
23 
24 #include "absl/container/flat_hash_map.h"
25 
26 #include "src/core/ext/transport/binder/wire_format/binder.h"
27 #include "src/core/ext/transport/binder/wire_format/transaction.h"
28 #include "src/core/lib/gprpp/sync.h"
29 #include "src/core/lib/iomgr/combiner.h"
30 
31 namespace grpc_binder {
32 
33 // Member functions are thread safe.
34 class WireWriter {
35  public:
36   virtual ~WireWriter() = default;
37   virtual absl::Status RpcCall(std::unique_ptr<Transaction> tx) = 0;
38   virtual absl::Status SendAck(int64_t num_bytes) = 0;
39   virtual void OnAckReceived(int64_t num_bytes) = 0;
40 };
41 
42 class WireWriterImpl : public WireWriter {
43  public:
44   explicit WireWriterImpl(std::unique_ptr<Binder> binder);
45   ~WireWriterImpl() override;
46   absl::Status RpcCall(std::unique_ptr<Transaction> tx) override;
47   absl::Status SendAck(int64_t num_bytes) override;
48   void OnAckReceived(int64_t num_bytes) override;
49 
50   // Required to be public because we would like to call this in combiner (which
51   // cannot invoke class member function). `RunScheduledTxArgs` and
52   // `RunScheduledTxInternal` should not be used by user directly.
53   struct RunScheduledTxArgs {
54     WireWriterImpl* writer;
55     struct StreamTx {
56       std::unique_ptr<Transaction> tx;
57       // How many data in transaction's `data` field has been sent.
58       int64_t bytes_sent = 0;
59     };
60     struct AckTx {
61       int64_t num_bytes;
62     };
63     absl::variant<AckTx, StreamTx> tx;
64   };
65 
66   void RunScheduledTxInternal(RunScheduledTxArgs* arg);
67 
68   // Split long message into chunks of size 16k. This doesn't necessarily have
69   // to be the same as the flow control acknowledgement size, but it should not
70   // exceed 128k.
71   static const int64_t kBlockSize;
72 
73   // Flow control allows sending at most 128k between acknowledgements.
74   static const int64_t kFlowControlWindowSize;
75 
76  private:
77   // Fast path: send data in one transaction.
78   absl::Status RpcCallFastPath(std::unique_ptr<Transaction> tx);
79 
80   // This function will acquire `write_mu_` to make sure the binder is not used
81   // concurrently, so this can be called by different threads safely.
82   absl::Status MakeBinderTransaction(
83       BinderTransportTxCode tx_code,
84       std::function<absl::Status(WritableParcel*)> fill_parcel);
85 
86   // Send a stream to `binder_`. Set `is_last_chunk` to `true` if the stream
87   // transaction has been sent completely. Otherwise set to `false`.
88   absl::Status RunStreamTx(RunScheduledTxArgs::StreamTx* stream_tx,
89                            WritableParcel* parcel, bool* is_last_chunk)
90       ABSL_EXCLUSIVE_LOCKS_REQUIRED(write_mu_);
91 
92   // Schdule `RunScheduledTxArgs*` in `pending_outgoing_tx_` to `combiner_`, as
93   // many as possible (under the constraint of `kFlowControlWindowSize`).
94   void TryScheduleTransaction();
95 
96   // Guards variables related to transport state.
97   grpc_core::Mutex write_mu_;
98   std::unique_ptr<Binder> binder_ ABSL_GUARDED_BY(write_mu_);
99 
100   // Maps the transaction code (which identifies streams) to their next
101   // available sequence number. See
102   // https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md#sequence-number
103   absl::flat_hash_map<int, int> next_seq_num_ ABSL_GUARDED_BY(write_mu_);
104 
105   // Number of bytes we have already sent in stream transactions.
106   std::atomic<int64_t> num_outgoing_bytes_{0};
107 
108   // Guards variables related to flow control logic.
109   grpc_core::Mutex flow_control_mu_;
110   int64_t num_acknowledged_bytes_ ABSL_GUARDED_BY(flow_control_mu_) = 0;
111 
112   // The queue takes ownership of the pointer.
113   std::queue<RunScheduledTxArgs*> pending_outgoing_tx_
114       ABSL_GUARDED_BY(flow_control_mu_);
115   int num_non_acked_tx_in_combiner_ ABSL_GUARDED_BY(flow_control_mu_) = 0;
116 
117   // Helper variable for determining if we are currently calling into
118   // `Binder::Transact`. Useful for avoiding the attempt of acquiring
119   // `write_mu_` multiple times on the same thread.
120   std::atomic_bool is_transacting_{false};
121 
122   grpc_core::Combiner* combiner_;
123 };
124 
125 }  // namespace grpc_binder
126 
127 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_WIRE_FORMAT_WIRE_WRITER_H
128