xref: /aosp_15_r20/external/cronet/ipc/ipc_message_pipe_reader.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1*6777b538SAndroid Build Coastguard Worker // Copyright 2014 The Chromium Authors
2*6777b538SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*6777b538SAndroid Build Coastguard Worker // found in the LICENSE file.
4*6777b538SAndroid Build Coastguard Worker 
5*6777b538SAndroid Build Coastguard Worker #include "ipc/ipc_message_pipe_reader.h"
6*6777b538SAndroid Build Coastguard Worker 
7*6777b538SAndroid Build Coastguard Worker #include <stdint.h>
8*6777b538SAndroid Build Coastguard Worker 
9*6777b538SAndroid Build Coastguard Worker #include <utility>
10*6777b538SAndroid Build Coastguard Worker 
11*6777b538SAndroid Build Coastguard Worker #include "base/containers/span.h"
12*6777b538SAndroid Build Coastguard Worker #include "base/functional/bind.h"
13*6777b538SAndroid Build Coastguard Worker #include "base/functional/callback_helpers.h"
14*6777b538SAndroid Build Coastguard Worker #include "base/location.h"
15*6777b538SAndroid Build Coastguard Worker #include "base/logging.h"
16*6777b538SAndroid Build Coastguard Worker #include "base/memory/raw_ref.h"
17*6777b538SAndroid Build Coastguard Worker #include "base/task/sequenced_task_runner.h"
18*6777b538SAndroid Build Coastguard Worker #include "base/task/single_thread_task_runner.h"
19*6777b538SAndroid Build Coastguard Worker #include "base/trace_event/trace_event.h"
20*6777b538SAndroid Build Coastguard Worker #include "ipc/ipc_channel_mojo.h"
21*6777b538SAndroid Build Coastguard Worker #include "mojo/public/cpp/bindings/message.h"
22*6777b538SAndroid Build Coastguard Worker #include "mojo/public/cpp/bindings/thread_safe_proxy.h"
23*6777b538SAndroid Build Coastguard Worker 
24*6777b538SAndroid Build Coastguard Worker namespace IPC {
25*6777b538SAndroid Build Coastguard Worker namespace internal {
26*6777b538SAndroid Build Coastguard Worker 
27*6777b538SAndroid Build Coastguard Worker namespace {
28*6777b538SAndroid Build Coastguard Worker 
29*6777b538SAndroid Build Coastguard Worker class ThreadSafeProxy : public mojo::ThreadSafeProxy {
30*6777b538SAndroid Build Coastguard Worker  public:
31*6777b538SAndroid Build Coastguard Worker   using Forwarder = base::RepeatingCallback<void(mojo::Message)>;
32*6777b538SAndroid Build Coastguard Worker 
ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,Forwarder forwarder,mojo::AssociatedGroupController & group_controller)33*6777b538SAndroid Build Coastguard Worker   ThreadSafeProxy(scoped_refptr<base::SequencedTaskRunner> task_runner,
34*6777b538SAndroid Build Coastguard Worker                   Forwarder forwarder,
35*6777b538SAndroid Build Coastguard Worker                   mojo::AssociatedGroupController& group_controller)
36*6777b538SAndroid Build Coastguard Worker       : task_runner_(std::move(task_runner)),
37*6777b538SAndroid Build Coastguard Worker         forwarder_(std::move(forwarder)),
38*6777b538SAndroid Build Coastguard Worker         group_controller_(group_controller) {}
39*6777b538SAndroid Build Coastguard Worker 
40*6777b538SAndroid Build Coastguard Worker   // mojo::ThreadSafeProxy:
SendMessage(mojo::Message & message)41*6777b538SAndroid Build Coastguard Worker   void SendMessage(mojo::Message& message) override {
42*6777b538SAndroid Build Coastguard Worker     message.SerializeHandles(&*group_controller_);
43*6777b538SAndroid Build Coastguard Worker     task_runner_->PostTask(FROM_HERE,
44*6777b538SAndroid Build Coastguard Worker                            base::BindOnce(forwarder_, std::move(message)));
45*6777b538SAndroid Build Coastguard Worker   }
46*6777b538SAndroid Build Coastguard Worker 
SendMessageWithResponder(mojo::Message & message,std::unique_ptr<mojo::MessageReceiver> responder)47*6777b538SAndroid Build Coastguard Worker   void SendMessageWithResponder(
48*6777b538SAndroid Build Coastguard Worker       mojo::Message& message,
49*6777b538SAndroid Build Coastguard Worker       std::unique_ptr<mojo::MessageReceiver> responder) override {
50*6777b538SAndroid Build Coastguard Worker     // We don't bother supporting this because it's not used in practice.
51*6777b538SAndroid Build Coastguard Worker     NOTREACHED();
52*6777b538SAndroid Build Coastguard Worker   }
53*6777b538SAndroid Build Coastguard Worker 
54*6777b538SAndroid Build Coastguard Worker  private:
55*6777b538SAndroid Build Coastguard Worker   ~ThreadSafeProxy() override = default;
56*6777b538SAndroid Build Coastguard Worker 
57*6777b538SAndroid Build Coastguard Worker   const scoped_refptr<base::SequencedTaskRunner> task_runner_;
58*6777b538SAndroid Build Coastguard Worker   const Forwarder forwarder_;
59*6777b538SAndroid Build Coastguard Worker   const raw_ref<mojo::AssociatedGroupController> group_controller_;
60*6777b538SAndroid Build Coastguard Worker };
61*6777b538SAndroid Build Coastguard Worker 
62*6777b538SAndroid Build Coastguard Worker }  // namespace
63*6777b538SAndroid Build Coastguard Worker 
MessagePipeReader(mojo::MessagePipeHandle pipe,mojo::PendingAssociatedRemote<mojom::Channel> sender,mojo::PendingAssociatedReceiver<mojom::Channel> receiver,scoped_refptr<base::SequencedTaskRunner> task_runner,MessagePipeReader::Delegate * delegate)64*6777b538SAndroid Build Coastguard Worker MessagePipeReader::MessagePipeReader(
65*6777b538SAndroid Build Coastguard Worker     mojo::MessagePipeHandle pipe,
66*6777b538SAndroid Build Coastguard Worker     mojo::PendingAssociatedRemote<mojom::Channel> sender,
67*6777b538SAndroid Build Coastguard Worker     mojo::PendingAssociatedReceiver<mojom::Channel> receiver,
68*6777b538SAndroid Build Coastguard Worker     scoped_refptr<base::SequencedTaskRunner> task_runner,
69*6777b538SAndroid Build Coastguard Worker     MessagePipeReader::Delegate* delegate)
70*6777b538SAndroid Build Coastguard Worker     : delegate_(delegate),
71*6777b538SAndroid Build Coastguard Worker       sender_(std::move(sender), task_runner),
72*6777b538SAndroid Build Coastguard Worker       receiver_(this, std::move(receiver), task_runner) {
73*6777b538SAndroid Build Coastguard Worker   thread_safe_sender_ =
74*6777b538SAndroid Build Coastguard Worker       std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
75*6777b538SAndroid Build Coastguard Worker           base::MakeRefCounted<ThreadSafeProxy>(
76*6777b538SAndroid Build Coastguard Worker               task_runner,
77*6777b538SAndroid Build Coastguard Worker               base::BindRepeating(&MessagePipeReader::ForwardMessage,
78*6777b538SAndroid Build Coastguard Worker                                   weak_ptr_factory_.GetWeakPtr()),
79*6777b538SAndroid Build Coastguard Worker               *sender_.internal_state()->associated_group()->GetController()));
80*6777b538SAndroid Build Coastguard Worker 
81*6777b538SAndroid Build Coastguard Worker   thread_checker_.DetachFromThread();
82*6777b538SAndroid Build Coastguard Worker }
83*6777b538SAndroid Build Coastguard Worker 
~MessagePipeReader()84*6777b538SAndroid Build Coastguard Worker MessagePipeReader::~MessagePipeReader() {
85*6777b538SAndroid Build Coastguard Worker   DCHECK(thread_checker_.CalledOnValidThread());
86*6777b538SAndroid Build Coastguard Worker   // The pipe should be closed before deletion.
87*6777b538SAndroid Build Coastguard Worker }
88*6777b538SAndroid Build Coastguard Worker 
FinishInitializationOnIOThread(base::ProcessId self_pid)89*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::FinishInitializationOnIOThread(
90*6777b538SAndroid Build Coastguard Worker     base::ProcessId self_pid) {
91*6777b538SAndroid Build Coastguard Worker   sender_.set_disconnect_handler(
92*6777b538SAndroid Build Coastguard Worker       base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
93*6777b538SAndroid Build Coastguard Worker                      MOJO_RESULT_FAILED_PRECONDITION));
94*6777b538SAndroid Build Coastguard Worker   receiver_.set_disconnect_handler(
95*6777b538SAndroid Build Coastguard Worker       base::BindOnce(&MessagePipeReader::OnPipeError, base::Unretained(this),
96*6777b538SAndroid Build Coastguard Worker                      MOJO_RESULT_FAILED_PRECONDITION));
97*6777b538SAndroid Build Coastguard Worker 
98*6777b538SAndroid Build Coastguard Worker   sender_->SetPeerPid(self_pid);
99*6777b538SAndroid Build Coastguard Worker }
100*6777b538SAndroid Build Coastguard Worker 
Close()101*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::Close() {
102*6777b538SAndroid Build Coastguard Worker   DCHECK(thread_checker_.CalledOnValidThread());
103*6777b538SAndroid Build Coastguard Worker   sender_.reset();
104*6777b538SAndroid Build Coastguard Worker   if (receiver_.is_bound())
105*6777b538SAndroid Build Coastguard Worker     receiver_.reset();
106*6777b538SAndroid Build Coastguard Worker }
107*6777b538SAndroid Build Coastguard Worker 
Send(std::unique_ptr<Message> message)108*6777b538SAndroid Build Coastguard Worker bool MessagePipeReader::Send(std::unique_ptr<Message> message) {
109*6777b538SAndroid Build Coastguard Worker   CHECK(message->IsValid());
110*6777b538SAndroid Build Coastguard Worker   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Send",
111*6777b538SAndroid Build Coastguard Worker                          message->flags(), TRACE_EVENT_FLAG_FLOW_OUT);
112*6777b538SAndroid Build Coastguard Worker   std::optional<std::vector<mojo::native::SerializedHandlePtr>> handles;
113*6777b538SAndroid Build Coastguard Worker   MojoResult result = MOJO_RESULT_OK;
114*6777b538SAndroid Build Coastguard Worker   result = ChannelMojo::ReadFromMessageAttachmentSet(message.get(), &handles);
115*6777b538SAndroid Build Coastguard Worker   if (result != MOJO_RESULT_OK)
116*6777b538SAndroid Build Coastguard Worker     return false;
117*6777b538SAndroid Build Coastguard Worker 
118*6777b538SAndroid Build Coastguard Worker   if (!sender_)
119*6777b538SAndroid Build Coastguard Worker     return false;
120*6777b538SAndroid Build Coastguard Worker 
121*6777b538SAndroid Build Coastguard Worker   base::span<const uint8_t> bytes(static_cast<const uint8_t*>(message->data()),
122*6777b538SAndroid Build Coastguard Worker                                   message->size());
123*6777b538SAndroid Build Coastguard Worker   sender_->Receive(MessageView(bytes, std::move(handles)));
124*6777b538SAndroid Build Coastguard Worker   DVLOG(4) << "Send " << message->type() << ": " << message->size();
125*6777b538SAndroid Build Coastguard Worker   return true;
126*6777b538SAndroid Build Coastguard Worker }
127*6777b538SAndroid Build Coastguard Worker 
GetRemoteInterface(mojo::GenericPendingAssociatedReceiver receiver)128*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::GetRemoteInterface(
129*6777b538SAndroid Build Coastguard Worker     mojo::GenericPendingAssociatedReceiver receiver) {
130*6777b538SAndroid Build Coastguard Worker   if (!sender_.is_bound())
131*6777b538SAndroid Build Coastguard Worker     return;
132*6777b538SAndroid Build Coastguard Worker   sender_->GetAssociatedInterface(std::move(receiver));
133*6777b538SAndroid Build Coastguard Worker }
134*6777b538SAndroid Build Coastguard Worker 
SetPeerPid(int32_t peer_pid)135*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::SetPeerPid(int32_t peer_pid) {
136*6777b538SAndroid Build Coastguard Worker   delegate_->OnPeerPidReceived(peer_pid);
137*6777b538SAndroid Build Coastguard Worker }
138*6777b538SAndroid Build Coastguard Worker 
Receive(MessageView message_view)139*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::Receive(MessageView message_view) {
140*6777b538SAndroid Build Coastguard Worker   if (message_view.bytes().empty()) {
141*6777b538SAndroid Build Coastguard Worker     delegate_->OnBrokenDataReceived();
142*6777b538SAndroid Build Coastguard Worker     return;
143*6777b538SAndroid Build Coastguard Worker   }
144*6777b538SAndroid Build Coastguard Worker   Message message(reinterpret_cast<const char*>(message_view.bytes().data()),
145*6777b538SAndroid Build Coastguard Worker                   message_view.bytes().size());
146*6777b538SAndroid Build Coastguard Worker   if (!message.IsValid()) {
147*6777b538SAndroid Build Coastguard Worker     delegate_->OnBrokenDataReceived();
148*6777b538SAndroid Build Coastguard Worker     return;
149*6777b538SAndroid Build Coastguard Worker   }
150*6777b538SAndroid Build Coastguard Worker 
151*6777b538SAndroid Build Coastguard Worker   DVLOG(4) << "Receive " << message.type() << ": " << message.size();
152*6777b538SAndroid Build Coastguard Worker   MojoResult write_result = ChannelMojo::WriteToMessageAttachmentSet(
153*6777b538SAndroid Build Coastguard Worker       message_view.TakeHandles(), &message);
154*6777b538SAndroid Build Coastguard Worker   if (write_result != MOJO_RESULT_OK) {
155*6777b538SAndroid Build Coastguard Worker     OnPipeError(write_result);
156*6777b538SAndroid Build Coastguard Worker     return;
157*6777b538SAndroid Build Coastguard Worker   }
158*6777b538SAndroid Build Coastguard Worker 
159*6777b538SAndroid Build Coastguard Worker   TRACE_EVENT_WITH_FLOW0("toplevel.flow", "MessagePipeReader::Receive",
160*6777b538SAndroid Build Coastguard Worker                          message.flags(), TRACE_EVENT_FLAG_FLOW_IN);
161*6777b538SAndroid Build Coastguard Worker   delegate_->OnMessageReceived(message);
162*6777b538SAndroid Build Coastguard Worker }
163*6777b538SAndroid Build Coastguard Worker 
GetAssociatedInterface(mojo::GenericPendingAssociatedReceiver receiver)164*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::GetAssociatedInterface(
165*6777b538SAndroid Build Coastguard Worker     mojo::GenericPendingAssociatedReceiver receiver) {
166*6777b538SAndroid Build Coastguard Worker   DCHECK(thread_checker_.CalledOnValidThread());
167*6777b538SAndroid Build Coastguard Worker   if (delegate_)
168*6777b538SAndroid Build Coastguard Worker     delegate_->OnAssociatedInterfaceRequest(std::move(receiver));
169*6777b538SAndroid Build Coastguard Worker }
170*6777b538SAndroid Build Coastguard Worker 
OnPipeError(MojoResult error)171*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::OnPipeError(MojoResult error) {
172*6777b538SAndroid Build Coastguard Worker   DCHECK(thread_checker_.CalledOnValidThread());
173*6777b538SAndroid Build Coastguard Worker 
174*6777b538SAndroid Build Coastguard Worker   Close();
175*6777b538SAndroid Build Coastguard Worker 
176*6777b538SAndroid Build Coastguard Worker   // NOTE: The delegate call below may delete |this|.
177*6777b538SAndroid Build Coastguard Worker   if (delegate_)
178*6777b538SAndroid Build Coastguard Worker     delegate_->OnPipeError();
179*6777b538SAndroid Build Coastguard Worker }
180*6777b538SAndroid Build Coastguard Worker 
ForwardMessage(mojo::Message message)181*6777b538SAndroid Build Coastguard Worker void MessagePipeReader::ForwardMessage(mojo::Message message) {
182*6777b538SAndroid Build Coastguard Worker   sender_.internal_state()->ForwardMessage(std::move(message));
183*6777b538SAndroid Build Coastguard Worker }
184*6777b538SAndroid Build Coastguard Worker 
185*6777b538SAndroid Build Coastguard Worker }  // namespace internal
186*6777b538SAndroid Build Coastguard Worker }  // namespace IPC
187