xref: /aosp_15_r20/external/cronet/ipc/ipc_channel_mojo.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_channel_mojo.h"
6 
7 #include <stddef.h>
8 #include <stdint.h>
9 
10 #include <memory>
11 #include <utility>
12 
13 #include "base/command_line.h"
14 #include "base/functional/bind.h"
15 #include "base/functional/callback_helpers.h"
16 #include "base/lazy_instance.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/raw_ref.h"
19 #include "base/process/process_handle.h"
20 #include "base/task/single_thread_task_runner.h"
21 #include "build/build_config.h"
22 #include "ipc/ipc_listener.h"
23 #include "ipc/ipc_logging.h"
24 #include "ipc/ipc_message_attachment_set.h"
25 #include "ipc/ipc_message_macros.h"
26 #include "ipc/ipc_mojo_bootstrap.h"
27 #include "ipc/ipc_mojo_handle_attachment.h"
28 #include "ipc/native_handle_type_converters.h"
29 #include "ipc/trace_ipc_message.h"
30 #include "mojo/public/cpp/bindings/associated_receiver.h"
31 #include "mojo/public/cpp/bindings/associated_remote.h"
32 #include "mojo/public/cpp/bindings/generic_pending_associated_receiver.h"
33 #include "mojo/public/cpp/bindings/pending_associated_receiver.h"
34 #include "mojo/public/cpp/bindings/thread_safe_proxy.h"
35 #include "mojo/public/cpp/system/platform_handle.h"
36 
37 namespace IPC {
38 
39 namespace {
40 
41 class MojoChannelFactory : public ChannelFactory {
42  public:
MojoChannelFactory(mojo::ScopedMessagePipeHandle handle,Channel::Mode mode,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)43   MojoChannelFactory(
44       mojo::ScopedMessagePipeHandle handle,
45       Channel::Mode mode,
46       const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
47       const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
48       : handle_(std::move(handle)),
49         mode_(mode),
50         ipc_task_runner_(ipc_task_runner),
51         proxy_task_runner_(proxy_task_runner) {}
52 
53   MojoChannelFactory(const MojoChannelFactory&) = delete;
54   MojoChannelFactory& operator=(const MojoChannelFactory&) = delete;
55 
BuildChannel(Listener * listener)56   std::unique_ptr<Channel> BuildChannel(Listener* listener) override {
57     return ChannelMojo::Create(std::move(handle_), mode_, listener,
58                                ipc_task_runner_, proxy_task_runner_);
59   }
60 
GetIPCTaskRunner()61   scoped_refptr<base::SingleThreadTaskRunner> GetIPCTaskRunner() override {
62     return ipc_task_runner_;
63   }
64 
65  private:
66   mojo::ScopedMessagePipeHandle handle_;
67   const Channel::Mode mode_;
68   scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
69   scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
70 };
71 
72 class ThreadSafeChannelProxy : public mojo::ThreadSafeProxy {
73  public:
74   using Forwarder = base::RepeatingCallback<void(mojo::Message)>;
75 
ThreadSafeChannelProxy(scoped_refptr<base::SingleThreadTaskRunner> task_runner,Forwarder forwarder,mojo::AssociatedGroupController & group_controller)76   ThreadSafeChannelProxy(
77       scoped_refptr<base::SingleThreadTaskRunner> task_runner,
78       Forwarder forwarder,
79       mojo::AssociatedGroupController& group_controller)
80       : task_runner_(std::move(task_runner)),
81         forwarder_(std::move(forwarder)),
82         group_controller_(group_controller) {}
83 
84   // mojo::ThreadSafeProxy:
SendMessage(mojo::Message & message)85   void SendMessage(mojo::Message& message) override {
86     message.SerializeHandles(&*group_controller_);
87     task_runner_->PostTask(FROM_HERE,
88                            base::BindOnce(forwarder_, std::move(message)));
89   }
90 
SendMessageWithResponder(mojo::Message & message,std::unique_ptr<mojo::MessageReceiver> responder)91   void SendMessageWithResponder(
92       mojo::Message& message,
93       std::unique_ptr<mojo::MessageReceiver> responder) override {
94     // We don't bother supporting this because it's not used in practice.
95     NOTREACHED();
96   }
97 
98  private:
99   ~ThreadSafeChannelProxy() override = default;
100 
101   const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
102   const Forwarder forwarder_;
103   const raw_ref<mojo::AssociatedGroupController, AcrossTasksDanglingUntriaged>
104       group_controller_;
105 };
106 
GetSelfPID()107 base::ProcessId GetSelfPID() {
108 #if BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS)
109   if (int global_pid = Channel::GetGlobalPid())
110     return global_pid;
111 #endif  // BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS)
112 #if BUILDFLAG(IS_NACL)
113   return -1;
114 #else
115   return base::GetCurrentProcId();
116 #endif  // BUILDFLAG(IS_NACL)
117 }
118 
119 }  // namespace
120 
121 //------------------------------------------------------------------------------
122 
123 // static
Create(mojo::ScopedMessagePipeHandle handle,Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)124 std::unique_ptr<ChannelMojo> ChannelMojo::Create(
125     mojo::ScopedMessagePipeHandle handle,
126     Mode mode,
127     Listener* listener,
128     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
129     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
130   return base::WrapUnique(new ChannelMojo(std::move(handle), mode, listener,
131                                           ipc_task_runner, proxy_task_runner));
132 }
133 
134 // static
CreateServerFactory(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)135 std::unique_ptr<ChannelFactory> ChannelMojo::CreateServerFactory(
136     mojo::ScopedMessagePipeHandle handle,
137     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
138     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
139   return std::make_unique<MojoChannelFactory>(
140       std::move(handle), Channel::MODE_SERVER, ipc_task_runner,
141       proxy_task_runner);
142 }
143 
144 // static
CreateClientFactory(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)145 std::unique_ptr<ChannelFactory> ChannelMojo::CreateClientFactory(
146     mojo::ScopedMessagePipeHandle handle,
147     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
148     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
149   return std::make_unique<MojoChannelFactory>(
150       std::move(handle), Channel::MODE_CLIENT, ipc_task_runner,
151       proxy_task_runner);
152 }
153 
ChannelMojo(mojo::ScopedMessagePipeHandle handle,Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)154 ChannelMojo::ChannelMojo(
155     mojo::ScopedMessagePipeHandle handle,
156     Mode mode,
157     Listener* listener,
158     const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
159     const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
160     : task_runner_(ipc_task_runner), pipe_(handle.get()), listener_(listener) {
161   weak_ptr_ = weak_factory_.GetWeakPtr();
162   bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, ipc_task_runner,
163                                      proxy_task_runner);
164 }
165 
ForwardMessage(mojo::Message message)166 void ChannelMojo::ForwardMessage(mojo::Message message) {
167   DCHECK(task_runner_->RunsTasksInCurrentSequence());
168   if (!message_reader_ || !message_reader_->sender().is_bound())
169     return;
170   message_reader_->sender().internal_state()->ForwardMessage(
171       std::move(message));
172 }
173 
~ChannelMojo()174 ChannelMojo::~ChannelMojo() {
175   DCHECK(task_runner_->RunsTasksInCurrentSequence());
176   Close();
177 }
178 
Connect()179 bool ChannelMojo::Connect() {
180   WillConnect();
181 
182   mojo::PendingAssociatedRemote<mojom::Channel> sender;
183   mojo::PendingAssociatedReceiver<mojom::Channel> receiver;
184   bootstrap_->Connect(&sender, &receiver);
185 
186   DCHECK(!message_reader_);
187   message_reader_ = std::make_unique<internal::MessagePipeReader>(
188       pipe_, std::move(sender), std::move(receiver), task_runner_, this);
189 
190   if (task_runner_->RunsTasksInCurrentSequence()) {
191     FinishConnectOnIOThread();
192   } else {
193     task_runner_->PostTask(
194         FROM_HERE,
195         base::BindOnce(&ChannelMojo::FinishConnectOnIOThread, weak_ptr_));
196   }
197   return true;
198 }
199 
FinishConnectOnIOThread()200 void ChannelMojo::FinishConnectOnIOThread() {
201   DCHECK(message_reader_);
202   message_reader_->FinishInitializationOnIOThread(GetSelfPID());
203   bootstrap_->StartReceiving();
204 }
205 
Pause()206 void ChannelMojo::Pause() {
207   bootstrap_->Pause();
208 }
209 
Unpause(bool flush)210 void ChannelMojo::Unpause(bool flush) {
211   bootstrap_->Unpause();
212   if (flush)
213     Flush();
214 }
215 
Flush()216 void ChannelMojo::Flush() {
217   bootstrap_->Flush();
218 }
219 
Close()220 void ChannelMojo::Close() {
221   // NOTE: The MessagePipeReader's destructor may re-enter this function. Use
222   // caution when changing this method.
223   std::unique_ptr<internal::MessagePipeReader> reader =
224       std::move(message_reader_);
225   reader.reset();
226 
227   base::AutoLock lock(associated_interface_lock_);
228   associated_interfaces_.clear();
229 }
230 
OnPipeError()231 void ChannelMojo::OnPipeError() {
232   DCHECK(task_runner_);
233   if (task_runner_->RunsTasksInCurrentSequence()) {
234     listener_->OnChannelError();
235   } else {
236     task_runner_->PostTask(
237         FROM_HERE, base::BindOnce(&ChannelMojo::OnPipeError, weak_ptr_));
238   }
239 }
240 
OnAssociatedInterfaceRequest(mojo::GenericPendingAssociatedReceiver receiver)241 void ChannelMojo::OnAssociatedInterfaceRequest(
242     mojo::GenericPendingAssociatedReceiver receiver) {
243   GenericAssociatedInterfaceFactory factory;
244   {
245     base::AutoLock locker(associated_interface_lock_);
246     auto iter = associated_interfaces_.find(*receiver.interface_name());
247     if (iter != associated_interfaces_.end())
248       factory = iter->second;
249   }
250 
251   if (!factory.is_null()) {
252     factory.Run(receiver.PassHandle());
253   } else {
254     const std::string interface_name = *receiver.interface_name();
255     listener_->OnAssociatedInterfaceRequest(interface_name,
256                                             receiver.PassHandle());
257   }
258 }
259 
Send(Message * message)260 bool ChannelMojo::Send(Message* message) {
261   DVLOG(2) << "sending message @" << message << " on channel @" << this
262            << " with type " << message->type();
263 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
264   Logging::GetInstance()->OnSendMessage(message);
265 #endif
266 
267   std::unique_ptr<Message> scoped_message = base::WrapUnique(message);
268   if (!message_reader_)
269     return false;
270 
271   // Comment copied from ipc_channel_posix.cc:
272   // We can't close the pipe here, because calling OnChannelError may destroy
273   // this object, and that would be bad if we are called from Send(). Instead,
274   // we return false and hope the caller will close the pipe. If they do not,
275   // the pipe will still be closed next time OnFileCanReadWithoutBlocking is
276   // called.
277   //
278   // With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the
279   // pipe's connection error handler will be invoked in its place.
280   return message_reader_->Send(std::move(scoped_message));
281 }
282 
283 Channel::AssociatedInterfaceSupport*
GetAssociatedInterfaceSupport()284 ChannelMojo::GetAssociatedInterfaceSupport() { return this; }
285 
286 std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
CreateThreadSafeChannel()287 ChannelMojo::CreateThreadSafeChannel() {
288   return std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
289       base::MakeRefCounted<ThreadSafeChannelProxy>(
290           task_runner_,
291           base::BindRepeating(&ChannelMojo::ForwardMessage, weak_ptr_),
292           *bootstrap_->GetAssociatedGroup()->GetController()));
293 }
294 
OnPeerPidReceived(int32_t peer_pid)295 void ChannelMojo::OnPeerPidReceived(int32_t peer_pid) {
296   listener_->OnChannelConnected(peer_pid);
297 }
298 
OnMessageReceived(const Message & message)299 void ChannelMojo::OnMessageReceived(const Message& message) {
300   const Message* message_ptr = &message;
301   TRACE_IPC_MESSAGE_SEND("ipc,toplevel", "ChannelMojo::OnMessageReceived",
302                          message_ptr);
303   listener_->OnMessageReceived(message);
304   if (message.dispatch_error())
305     listener_->OnBadMessageReceived(message);
306 }
307 
OnBrokenDataReceived()308 void ChannelMojo::OnBrokenDataReceived() {
309   listener_->OnBadMessageReceived(Message());
310 }
311 
312 // static
ReadFromMessageAttachmentSet(Message * message,std::optional<std::vector<mojo::native::SerializedHandlePtr>> * handles)313 MojoResult ChannelMojo::ReadFromMessageAttachmentSet(
314     Message* message,
315     std::optional<std::vector<mojo::native::SerializedHandlePtr>>* handles) {
316   DCHECK(!*handles);
317 
318   MojoResult result = MOJO_RESULT_OK;
319   if (!message->HasAttachments())
320     return result;
321 
322   std::vector<mojo::native::SerializedHandlePtr> output_handles;
323   MessageAttachmentSet* set = message->attachment_set();
324 
325   for (unsigned i = 0; result == MOJO_RESULT_OK && i < set->size(); ++i) {
326     auto attachment = set->GetAttachmentAt(i);
327     auto serialized_handle = mojo::native::SerializedHandle::New();
328     serialized_handle->the_handle = attachment->TakeMojoHandle();
329     serialized_handle->type =
330         mojo::ConvertTo<mojo::native::SerializedHandleType>(
331             attachment->GetType());
332     output_handles.emplace_back(std::move(serialized_handle));
333   }
334   set->CommitAllDescriptors();
335 
336   if (!output_handles.empty())
337     *handles = std::move(output_handles);
338 
339   return result;
340 }
341 
342 // static
WriteToMessageAttachmentSet(std::optional<std::vector<mojo::native::SerializedHandlePtr>> handles,Message * message)343 MojoResult ChannelMojo::WriteToMessageAttachmentSet(
344     std::optional<std::vector<mojo::native::SerializedHandlePtr>> handles,
345     Message* message) {
346   if (!handles)
347     return MOJO_RESULT_OK;
348   for (size_t i = 0; i < handles->size(); ++i) {
349     auto& handle = handles->at(i);
350     scoped_refptr<MessageAttachment> unwrapped_attachment =
351         MessageAttachment::CreateFromMojoHandle(
352             std::move(handle->the_handle),
353             mojo::ConvertTo<MessageAttachment::Type>(handle->type));
354     if (!unwrapped_attachment) {
355       DLOG(WARNING) << "Pipe failed to unwrap handles.";
356       return MOJO_RESULT_UNKNOWN;
357     }
358 
359     bool ok = message->attachment_set()->AddAttachment(
360         std::move(unwrapped_attachment));
361     DCHECK(ok);
362     if (!ok) {
363       LOG(ERROR) << "Failed to add new Mojo handle.";
364       return MOJO_RESULT_UNKNOWN;
365     }
366   }
367   return MOJO_RESULT_OK;
368 }
369 
AddGenericAssociatedInterface(const std::string & name,const GenericAssociatedInterfaceFactory & factory)370 void ChannelMojo::AddGenericAssociatedInterface(
371     const std::string& name,
372     const GenericAssociatedInterfaceFactory& factory) {
373   base::AutoLock locker(associated_interface_lock_);
374   auto result = associated_interfaces_.insert({ name, factory });
375   DCHECK(result.second);
376 }
377 
GetRemoteAssociatedInterface(mojo::GenericPendingAssociatedReceiver receiver)378 void ChannelMojo::GetRemoteAssociatedInterface(
379     mojo::GenericPendingAssociatedReceiver receiver) {
380   if (message_reader_) {
381     if (!task_runner_->RunsTasksInCurrentSequence()) {
382       message_reader_->thread_safe_sender().GetAssociatedInterface(
383           std::move(receiver));
384       return;
385     }
386     message_reader_->GetRemoteInterface(std::move(receiver));
387   } else {
388     // Attach the associated interface to a disconnected pipe, so that the
389     // associated interface pointer can be used to make calls (which are
390     // dropped).
391     mojo::AssociateWithDisconnectedPipe(receiver.PassHandle());
392   }
393 }
394 
SetUrgentMessageObserver(UrgentMessageObserver * observer)395 void ChannelMojo::SetUrgentMessageObserver(UrgentMessageObserver* observer) {
396   bootstrap_->SetUrgentMessageObserver(observer);
397 }
398 
399 }  // namespace IPC
400