1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
16 #define GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
17 
18 #include <grpc/support/port_platform.h>
19 
#include <functional>
#include <map>
#include <queue>
#include <set>
#include <string>
#include <utility>
#include <vector>
26 
27 #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h"
28 #include "src/core/lib/gprpp/sync.h"
29 
30 namespace grpc_binder {
31 
32 // Routes the data received from transport to corresponding streams
33 class TransportStreamReceiverImpl : public TransportStreamReceiver {
34  public:
35   explicit TransportStreamReceiverImpl(
36       bool is_client, std::function<void()> accept_stream_callback = nullptr)
is_client_(is_client)37       : is_client_(is_client),
38         accept_stream_callback_(accept_stream_callback) {}
39   void RegisterRecvInitialMetadata(StreamIdentifier id,
40                                    InitialMetadataCallbackType cb) override;
41   void RegisterRecvMessage(StreamIdentifier id,
42                            MessageDataCallbackType cb) override;
43   void RegisterRecvTrailingMetadata(StreamIdentifier id,
44                                     TrailingMetadataCallbackType cb) override;
45   void NotifyRecvInitialMetadata(
46       StreamIdentifier id, absl::StatusOr<Metadata> initial_metadata) override;
47   void NotifyRecvMessage(StreamIdentifier id,
48                          absl::StatusOr<std::string> message) override;
49   void NotifyRecvTrailingMetadata(StreamIdentifier id,
50                                   absl::StatusOr<Metadata> trailing_metadata,
51                                   int status) override;
52 
53   void CancelStream(StreamIdentifier id) override;
54 
55  private:
56   // Trailing metadata marks the end of one-side of the stream. Thus, after
57   // receiving trailing metadata from the other-end, we know that there will
58   // never be in-coming message data anymore, and all recv_message callbacks
59   // (as well as recv_initial_metadata callback, if there's any) registered will
60   // never be satisfied. This function cancels all such callbacks gracefully
61   // (with absl::OkStatus()) to avoid being blocked waiting for them.
62   void OnRecvTrailingMetadata(StreamIdentifier id);
63 
64   void CancelInitialMetadataCallback(StreamIdentifier id, absl::Status error);
65   void CancelMessageCallback(StreamIdentifier id, absl::Status error);
66   void CancelTrailingMetadataCallback(StreamIdentifier id, absl::Status error);
67 
68   std::map<StreamIdentifier, InitialMetadataCallbackType> initial_metadata_cbs_;
69   std::map<StreamIdentifier, MessageDataCallbackType> message_cbs_;
70   std::map<StreamIdentifier, TrailingMetadataCallbackType>
71       trailing_metadata_cbs_;
72   // TODO(waynetu): Better thread safety design. For example, use separate
73   // mutexes for different type of messages.
74   grpc_core::Mutex m_;
75   // TODO(waynetu): gRPC surface layer will not wait for the current message to
76   // be delivered before sending the next message. The following implementation
77   // is still buggy with the current implementation of wire writer if
78   // transaction issued first completes after the one issued later does. This is
79   // because we just take the first element out of the queue and assume it's the
80   // one issued first without further checking, which results in callbacks being
81   // invoked with incorrect data.
82   //
83   // This should be fixed in the wire writer level and make sure out-of-order
84   // messages will be re-ordered by it. In such case, the queueing approach will
85   // work fine. Refer to the TODO in WireWriterImpl::ProcessTransaction() at
86   // wire_reader_impl.cc for detecting and resolving out-of-order transactions.
87   //
88   // TODO(waynetu): Use absl::flat_hash_map.
89   std::map<StreamIdentifier, std::queue<absl::StatusOr<Metadata>>>
90       pending_initial_metadata_ ABSL_GUARDED_BY(m_);
91   std::map<StreamIdentifier, std::queue<absl::StatusOr<std::string>>>
92       pending_message_ ABSL_GUARDED_BY(m_);
93   std::map<StreamIdentifier,
94            std::queue<std::pair<absl::StatusOr<Metadata>, int>>>
95       pending_trailing_metadata_ ABSL_GUARDED_BY(m_);
96   // Record whether or not the recv_message callbacks of a given stream is
97   // cancelled. Although we explicitly cancel the registered recv_message() in
98   // CancelRecvMessageCallbacksDueToTrailingMetadata(), there are chances that
99   // the registration comes "after" we receive trailing metadata. Therefore,
100   // when RegisterRecvMessage() gets called, we should check whether
101   // recv_message_cancelled_ contains the corresponding stream ID, and if so,
102   // directly cancel the callback gracefully without pending it.
103   std::set<StreamIdentifier> trailing_metadata_recvd_ ABSL_GUARDED_BY(m_);
104 
105   bool is_client_;
106   // Called when receiving initial metadata to inform the server about a new
107   // stream.
108   std::function<void()> accept_stream_callback_;
109 };
110 }  // namespace grpc_binder
111 
112 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H
113