1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H 16 #define GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <functional> 21 #include <map> 22 #include <queue> 23 #include <set> 24 #include <string> 25 #include <vector> 26 27 #include "src/core/ext/transport/binder/utils/transport_stream_receiver.h" 28 #include "src/core/lib/gprpp/sync.h" 29 30 namespace grpc_binder { 31 32 // Routes the data received from transport to corresponding streams 33 class TransportStreamReceiverImpl : public TransportStreamReceiver { 34 public: 35 explicit TransportStreamReceiverImpl( 36 bool is_client, std::function<void()> accept_stream_callback = nullptr) is_client_(is_client)37 : is_client_(is_client), 38 accept_stream_callback_(accept_stream_callback) {} 39 void RegisterRecvInitialMetadata(StreamIdentifier id, 40 InitialMetadataCallbackType cb) override; 41 void RegisterRecvMessage(StreamIdentifier id, 42 MessageDataCallbackType cb) override; 43 void RegisterRecvTrailingMetadata(StreamIdentifier id, 44 TrailingMetadataCallbackType cb) override; 45 void NotifyRecvInitialMetadata( 46 StreamIdentifier id, absl::StatusOr<Metadata> initial_metadata) override; 47 void NotifyRecvMessage(StreamIdentifier id, 48 absl::StatusOr<std::string> message) override; 49 void NotifyRecvTrailingMetadata(StreamIdentifier id, 50 absl::StatusOr<Metadata> trailing_metadata, 51 int status) override; 52 53 void CancelStream(StreamIdentifier id) override; 54 55 private: 56 // Trailing metadata marks the end of one-side of the stream. Thus, after 57 // receiving trailing metadata from the other-end, we know that there will 58 // never be in-coming message data anymore, and all recv_message callbacks 59 // (as well as recv_initial_metadata callback, if there's any) registered will 60 // never be satisfied. This function cancels all such callbacks gracefully 61 // (with absl::OkStatus()) to avoid being blocked waiting for them. 62 void OnRecvTrailingMetadata(StreamIdentifier id); 63 64 void CancelInitialMetadataCallback(StreamIdentifier id, absl::Status error); 65 void CancelMessageCallback(StreamIdentifier id, absl::Status error); 66 void CancelTrailingMetadataCallback(StreamIdentifier id, absl::Status error); 67 68 std::map<StreamIdentifier, InitialMetadataCallbackType> initial_metadata_cbs_; 69 std::map<StreamIdentifier, MessageDataCallbackType> message_cbs_; 70 std::map<StreamIdentifier, TrailingMetadataCallbackType> 71 trailing_metadata_cbs_; 72 // TODO(waynetu): Better thread safety design. For example, use separate 73 // mutexes for different type of messages. 74 grpc_core::Mutex m_; 75 // TODO(waynetu): gRPC surface layer will not wait for the current message to 76 // be delivered before sending the next message. The following implementation 77 // is still buggy with the current implementation of wire writer if 78 // transaction issued first completes after the one issued later does. This is 79 // because we just take the first element out of the queue and assume it's the 80 // one issued first without further checking, which results in callbacks being 81 // invoked with incorrect data. 82 // 83 // This should be fixed in the wire writer level and make sure out-of-order 84 // messages will be re-ordered by it. In such case, the queueing approach will 85 // work fine. Refer to the TODO in WireWriterImpl::ProcessTransaction() at 86 // wire_reader_impl.cc for detecting and resolving out-of-order transactions. 87 // 88 // TODO(waynetu): Use absl::flat_hash_map. 89 std::map<StreamIdentifier, std::queue<absl::StatusOr<Metadata>>> 90 pending_initial_metadata_ ABSL_GUARDED_BY(m_); 91 std::map<StreamIdentifier, std::queue<absl::StatusOr<std::string>>> 92 pending_message_ ABSL_GUARDED_BY(m_); 93 std::map<StreamIdentifier, 94 std::queue<std::pair<absl::StatusOr<Metadata>, int>>> 95 pending_trailing_metadata_ ABSL_GUARDED_BY(m_); 96 // Record whether or not the recv_message callbacks of a given stream is 97 // cancelled. Although we explicitly cancel the registered recv_message() in 98 // CancelRecvMessageCallbacksDueToTrailingMetadata(), there are chances that 99 // the registration comes "after" we receive trailing metadata. Therefore, 100 // when RegisterRecvMessage() gets called, we should check whether 101 // recv_message_cancelled_ contains the corresponding stream ID, and if so, 102 // directly cancel the callback gracefully without pending it. 103 std::set<StreamIdentifier> trailing_metadata_recvd_ ABSL_GUARDED_BY(m_); 104 105 bool is_client_; 106 // Called when receiving initial metadata to inform the server about a new 107 // stream. 108 std::function<void()> accept_stream_callback_; 109 }; 110 } // namespace grpc_binder 111 112 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_BINDER_UTILS_TRANSPORT_STREAM_RECEIVER_IMPL_H 113