1 // Copyright 2014 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/ipc_channel_mojo.h"
6
7 #include <stddef.h>
8 #include <stdint.h>
9
10 #include <memory>
11 #include <utility>
12
13 #include "base/command_line.h"
14 #include "base/functional/bind.h"
15 #include "base/functional/callback_helpers.h"
16 #include "base/lazy_instance.h"
17 #include "base/memory/ptr_util.h"
18 #include "base/memory/raw_ref.h"
19 #include "base/process/process_handle.h"
20 #include "base/task/single_thread_task_runner.h"
21 #include "build/build_config.h"
22 #include "ipc/ipc_listener.h"
23 #include "ipc/ipc_logging.h"
24 #include "ipc/ipc_message_attachment_set.h"
25 #include "ipc/ipc_message_macros.h"
26 #include "ipc/ipc_mojo_bootstrap.h"
27 #include "ipc/ipc_mojo_handle_attachment.h"
28 #include "ipc/native_handle_type_converters.h"
29 #include "ipc/trace_ipc_message.h"
30 #include "mojo/public/cpp/bindings/associated_receiver.h"
31 #include "mojo/public/cpp/bindings/associated_remote.h"
32 #include "mojo/public/cpp/bindings/generic_pending_associated_receiver.h"
33 #include "mojo/public/cpp/bindings/pending_associated_receiver.h"
34 #include "mojo/public/cpp/bindings/thread_safe_proxy.h"
35 #include "mojo/public/cpp/system/platform_handle.h"
36
37 namespace IPC {
38
39 namespace {
40
41 class MojoChannelFactory : public ChannelFactory {
42 public:
MojoChannelFactory(mojo::ScopedMessagePipeHandle handle,Channel::Mode mode,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)43 MojoChannelFactory(
44 mojo::ScopedMessagePipeHandle handle,
45 Channel::Mode mode,
46 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
47 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
48 : handle_(std::move(handle)),
49 mode_(mode),
50 ipc_task_runner_(ipc_task_runner),
51 proxy_task_runner_(proxy_task_runner) {}
52
53 MojoChannelFactory(const MojoChannelFactory&) = delete;
54 MojoChannelFactory& operator=(const MojoChannelFactory&) = delete;
55
BuildChannel(Listener * listener)56 std::unique_ptr<Channel> BuildChannel(Listener* listener) override {
57 return ChannelMojo::Create(std::move(handle_), mode_, listener,
58 ipc_task_runner_, proxy_task_runner_);
59 }
60
GetIPCTaskRunner()61 scoped_refptr<base::SingleThreadTaskRunner> GetIPCTaskRunner() override {
62 return ipc_task_runner_;
63 }
64
65 private:
66 mojo::ScopedMessagePipeHandle handle_;
67 const Channel::Mode mode_;
68 scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
69 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
70 };
71
72 class ThreadSafeChannelProxy : public mojo::ThreadSafeProxy {
73 public:
74 using Forwarder = base::RepeatingCallback<void(mojo::Message)>;
75
ThreadSafeChannelProxy(scoped_refptr<base::SingleThreadTaskRunner> task_runner,Forwarder forwarder,mojo::AssociatedGroupController & group_controller)76 ThreadSafeChannelProxy(
77 scoped_refptr<base::SingleThreadTaskRunner> task_runner,
78 Forwarder forwarder,
79 mojo::AssociatedGroupController& group_controller)
80 : task_runner_(std::move(task_runner)),
81 forwarder_(std::move(forwarder)),
82 group_controller_(group_controller) {}
83
84 // mojo::ThreadSafeProxy:
SendMessage(mojo::Message & message)85 void SendMessage(mojo::Message& message) override {
86 message.SerializeHandles(&*group_controller_);
87 task_runner_->PostTask(FROM_HERE,
88 base::BindOnce(forwarder_, std::move(message)));
89 }
90
SendMessageWithResponder(mojo::Message & message,std::unique_ptr<mojo::MessageReceiver> responder)91 void SendMessageWithResponder(
92 mojo::Message& message,
93 std::unique_ptr<mojo::MessageReceiver> responder) override {
94 // We don't bother supporting this because it's not used in practice.
95 NOTREACHED();
96 }
97
98 private:
99 ~ThreadSafeChannelProxy() override = default;
100
101 const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
102 const Forwarder forwarder_;
103 const raw_ref<mojo::AssociatedGroupController, AcrossTasksDanglingUntriaged>
104 group_controller_;
105 };
106
GetSelfPID()107 base::ProcessId GetSelfPID() {
108 #if BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS)
109 if (int global_pid = Channel::GetGlobalPid())
110 return global_pid;
111 #endif // BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS)
112 #if BUILDFLAG(IS_NACL)
113 return -1;
114 #else
115 return base::GetCurrentProcId();
116 #endif // BUILDFLAG(IS_NACL)
117 }
118
119 } // namespace
120
121 //------------------------------------------------------------------------------
122
123 // static
Create(mojo::ScopedMessagePipeHandle handle,Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)124 std::unique_ptr<ChannelMojo> ChannelMojo::Create(
125 mojo::ScopedMessagePipeHandle handle,
126 Mode mode,
127 Listener* listener,
128 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
129 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
130 return base::WrapUnique(new ChannelMojo(std::move(handle), mode, listener,
131 ipc_task_runner, proxy_task_runner));
132 }
133
134 // static
CreateServerFactory(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)135 std::unique_ptr<ChannelFactory> ChannelMojo::CreateServerFactory(
136 mojo::ScopedMessagePipeHandle handle,
137 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
138 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
139 return std::make_unique<MojoChannelFactory>(
140 std::move(handle), Channel::MODE_SERVER, ipc_task_runner,
141 proxy_task_runner);
142 }
143
144 // static
CreateClientFactory(mojo::ScopedMessagePipeHandle handle,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)145 std::unique_ptr<ChannelFactory> ChannelMojo::CreateClientFactory(
146 mojo::ScopedMessagePipeHandle handle,
147 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
148 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner) {
149 return std::make_unique<MojoChannelFactory>(
150 std::move(handle), Channel::MODE_CLIENT, ipc_task_runner,
151 proxy_task_runner);
152 }
153
ChannelMojo(mojo::ScopedMessagePipeHandle handle,Mode mode,Listener * listener,const scoped_refptr<base::SingleThreadTaskRunner> & ipc_task_runner,const scoped_refptr<base::SingleThreadTaskRunner> & proxy_task_runner)154 ChannelMojo::ChannelMojo(
155 mojo::ScopedMessagePipeHandle handle,
156 Mode mode,
157 Listener* listener,
158 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner,
159 const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner)
160 : task_runner_(ipc_task_runner), pipe_(handle.get()), listener_(listener) {
161 weak_ptr_ = weak_factory_.GetWeakPtr();
162 bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, ipc_task_runner,
163 proxy_task_runner);
164 }
165
ForwardMessage(mojo::Message message)166 void ChannelMojo::ForwardMessage(mojo::Message message) {
167 DCHECK(task_runner_->RunsTasksInCurrentSequence());
168 if (!message_reader_ || !message_reader_->sender().is_bound())
169 return;
170 message_reader_->sender().internal_state()->ForwardMessage(
171 std::move(message));
172 }
173
~ChannelMojo()174 ChannelMojo::~ChannelMojo() {
175 DCHECK(task_runner_->RunsTasksInCurrentSequence());
176 Close();
177 }
178
Connect()179 bool ChannelMojo::Connect() {
180 WillConnect();
181
182 mojo::PendingAssociatedRemote<mojom::Channel> sender;
183 mojo::PendingAssociatedReceiver<mojom::Channel> receiver;
184 bootstrap_->Connect(&sender, &receiver);
185
186 DCHECK(!message_reader_);
187 message_reader_ = std::make_unique<internal::MessagePipeReader>(
188 pipe_, std::move(sender), std::move(receiver), task_runner_, this);
189
190 if (task_runner_->RunsTasksInCurrentSequence()) {
191 FinishConnectOnIOThread();
192 } else {
193 task_runner_->PostTask(
194 FROM_HERE,
195 base::BindOnce(&ChannelMojo::FinishConnectOnIOThread, weak_ptr_));
196 }
197 return true;
198 }
199
FinishConnectOnIOThread()200 void ChannelMojo::FinishConnectOnIOThread() {
201 DCHECK(message_reader_);
202 message_reader_->FinishInitializationOnIOThread(GetSelfPID());
203 bootstrap_->StartReceiving();
204 }
205
Pause()206 void ChannelMojo::Pause() {
207 bootstrap_->Pause();
208 }
209
Unpause(bool flush)210 void ChannelMojo::Unpause(bool flush) {
211 bootstrap_->Unpause();
212 if (flush)
213 Flush();
214 }
215
Flush()216 void ChannelMojo::Flush() {
217 bootstrap_->Flush();
218 }
219
Close()220 void ChannelMojo::Close() {
221 // NOTE: The MessagePipeReader's destructor may re-enter this function. Use
222 // caution when changing this method.
223 std::unique_ptr<internal::MessagePipeReader> reader =
224 std::move(message_reader_);
225 reader.reset();
226
227 base::AutoLock lock(associated_interface_lock_);
228 associated_interfaces_.clear();
229 }
230
OnPipeError()231 void ChannelMojo::OnPipeError() {
232 DCHECK(task_runner_);
233 if (task_runner_->RunsTasksInCurrentSequence()) {
234 listener_->OnChannelError();
235 } else {
236 task_runner_->PostTask(
237 FROM_HERE, base::BindOnce(&ChannelMojo::OnPipeError, weak_ptr_));
238 }
239 }
240
OnAssociatedInterfaceRequest(mojo::GenericPendingAssociatedReceiver receiver)241 void ChannelMojo::OnAssociatedInterfaceRequest(
242 mojo::GenericPendingAssociatedReceiver receiver) {
243 GenericAssociatedInterfaceFactory factory;
244 {
245 base::AutoLock locker(associated_interface_lock_);
246 auto iter = associated_interfaces_.find(*receiver.interface_name());
247 if (iter != associated_interfaces_.end())
248 factory = iter->second;
249 }
250
251 if (!factory.is_null()) {
252 factory.Run(receiver.PassHandle());
253 } else {
254 const std::string interface_name = *receiver.interface_name();
255 listener_->OnAssociatedInterfaceRequest(interface_name,
256 receiver.PassHandle());
257 }
258 }
259
Send(Message * message)260 bool ChannelMojo::Send(Message* message) {
261 DVLOG(2) << "sending message @" << message << " on channel @" << this
262 << " with type " << message->type();
263 #if BUILDFLAG(IPC_MESSAGE_LOG_ENABLED)
264 Logging::GetInstance()->OnSendMessage(message);
265 #endif
266
267 std::unique_ptr<Message> scoped_message = base::WrapUnique(message);
268 if (!message_reader_)
269 return false;
270
271 // Comment copied from ipc_channel_posix.cc:
272 // We can't close the pipe here, because calling OnChannelError may destroy
273 // this object, and that would be bad if we are called from Send(). Instead,
274 // we return false and hope the caller will close the pipe. If they do not,
275 // the pipe will still be closed next time OnFileCanReadWithoutBlocking is
276 // called.
277 //
278 // With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the
279 // pipe's connection error handler will be invoked in its place.
280 return message_reader_->Send(std::move(scoped_message));
281 }
282
283 Channel::AssociatedInterfaceSupport*
GetAssociatedInterfaceSupport()284 ChannelMojo::GetAssociatedInterfaceSupport() { return this; }
285
286 std::unique_ptr<mojo::ThreadSafeForwarder<mojom::Channel>>
CreateThreadSafeChannel()287 ChannelMojo::CreateThreadSafeChannel() {
288 return std::make_unique<mojo::ThreadSafeForwarder<mojom::Channel>>(
289 base::MakeRefCounted<ThreadSafeChannelProxy>(
290 task_runner_,
291 base::BindRepeating(&ChannelMojo::ForwardMessage, weak_ptr_),
292 *bootstrap_->GetAssociatedGroup()->GetController()));
293 }
294
OnPeerPidReceived(int32_t peer_pid)295 void ChannelMojo::OnPeerPidReceived(int32_t peer_pid) {
296 listener_->OnChannelConnected(peer_pid);
297 }
298
OnMessageReceived(const Message & message)299 void ChannelMojo::OnMessageReceived(const Message& message) {
300 const Message* message_ptr = &message;
301 TRACE_IPC_MESSAGE_SEND("ipc,toplevel", "ChannelMojo::OnMessageReceived",
302 message_ptr);
303 listener_->OnMessageReceived(message);
304 if (message.dispatch_error())
305 listener_->OnBadMessageReceived(message);
306 }
307
OnBrokenDataReceived()308 void ChannelMojo::OnBrokenDataReceived() {
309 listener_->OnBadMessageReceived(Message());
310 }
311
312 // static
ReadFromMessageAttachmentSet(Message * message,std::optional<std::vector<mojo::native::SerializedHandlePtr>> * handles)313 MojoResult ChannelMojo::ReadFromMessageAttachmentSet(
314 Message* message,
315 std::optional<std::vector<mojo::native::SerializedHandlePtr>>* handles) {
316 DCHECK(!*handles);
317
318 MojoResult result = MOJO_RESULT_OK;
319 if (!message->HasAttachments())
320 return result;
321
322 std::vector<mojo::native::SerializedHandlePtr> output_handles;
323 MessageAttachmentSet* set = message->attachment_set();
324
325 for (unsigned i = 0; result == MOJO_RESULT_OK && i < set->size(); ++i) {
326 auto attachment = set->GetAttachmentAt(i);
327 auto serialized_handle = mojo::native::SerializedHandle::New();
328 serialized_handle->the_handle = attachment->TakeMojoHandle();
329 serialized_handle->type =
330 mojo::ConvertTo<mojo::native::SerializedHandleType>(
331 attachment->GetType());
332 output_handles.emplace_back(std::move(serialized_handle));
333 }
334 set->CommitAllDescriptors();
335
336 if (!output_handles.empty())
337 *handles = std::move(output_handles);
338
339 return result;
340 }
341
342 // static
WriteToMessageAttachmentSet(std::optional<std::vector<mojo::native::SerializedHandlePtr>> handles,Message * message)343 MojoResult ChannelMojo::WriteToMessageAttachmentSet(
344 std::optional<std::vector<mojo::native::SerializedHandlePtr>> handles,
345 Message* message) {
346 if (!handles)
347 return MOJO_RESULT_OK;
348 for (size_t i = 0; i < handles->size(); ++i) {
349 auto& handle = handles->at(i);
350 scoped_refptr<MessageAttachment> unwrapped_attachment =
351 MessageAttachment::CreateFromMojoHandle(
352 std::move(handle->the_handle),
353 mojo::ConvertTo<MessageAttachment::Type>(handle->type));
354 if (!unwrapped_attachment) {
355 DLOG(WARNING) << "Pipe failed to unwrap handles.";
356 return MOJO_RESULT_UNKNOWN;
357 }
358
359 bool ok = message->attachment_set()->AddAttachment(
360 std::move(unwrapped_attachment));
361 DCHECK(ok);
362 if (!ok) {
363 LOG(ERROR) << "Failed to add new Mojo handle.";
364 return MOJO_RESULT_UNKNOWN;
365 }
366 }
367 return MOJO_RESULT_OK;
368 }
369
AddGenericAssociatedInterface(const std::string & name,const GenericAssociatedInterfaceFactory & factory)370 void ChannelMojo::AddGenericAssociatedInterface(
371 const std::string& name,
372 const GenericAssociatedInterfaceFactory& factory) {
373 base::AutoLock locker(associated_interface_lock_);
374 auto result = associated_interfaces_.insert({ name, factory });
375 DCHECK(result.second);
376 }
377
GetRemoteAssociatedInterface(mojo::GenericPendingAssociatedReceiver receiver)378 void ChannelMojo::GetRemoteAssociatedInterface(
379 mojo::GenericPendingAssociatedReceiver receiver) {
380 if (message_reader_) {
381 if (!task_runner_->RunsTasksInCurrentSequence()) {
382 message_reader_->thread_safe_sender().GetAssociatedInterface(
383 std::move(receiver));
384 return;
385 }
386 message_reader_->GetRemoteInterface(std::move(receiver));
387 } else {
388 // Attach the associated interface to a disconnected pipe, so that the
389 // associated interface pointer can be used to make calls (which are
390 // dropped).
391 mojo::AssociateWithDisconnectedPipe(receiver.PassHandle());
392 }
393 }
394
SetUrgentMessageObserver(UrgentMessageObserver * observer)395 void ChannelMojo::SetUrgentMessageObserver(UrgentMessageObserver* observer) {
396 bootstrap_->SetUrgentMessageObserver(observer);
397 }
398
399 } // namespace IPC
400