xref: /aosp_15_r20/external/cronet/ipc/ipc_sync_channel_unittest.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "ipc/ipc_sync_channel.h"
6 
7 #include <stddef.h>
8 
9 #include <memory>
10 #include <string>
11 #include <utility>
12 #include <vector>
13 
14 #include "base/functional/bind.h"
15 #include "base/location.h"
16 #include "base/logging.h"
17 #include "base/memory/raw_ptr.h"
18 #include "base/memory/raw_ptr_exclusion.h"
19 #include "base/message_loop/message_pump_type.h"
20 #include "base/process/process_handle.h"
21 #include "base/run_loop.h"
22 #include "base/strings/string_util.h"
23 #include "base/synchronization/waitable_event.h"
24 #include "base/task/single_thread_task_runner.h"
25 #include "base/test/task_environment.h"
26 #include "base/threading/platform_thread.h"
27 #include "base/threading/thread.h"
28 #include "build/build_config.h"
29 #include "ipc/ipc_listener.h"
30 #include "ipc/ipc_message.h"
31 #include "ipc/ipc_sender.h"
32 #include "ipc/ipc_sync_message_filter.h"
33 #include "ipc/ipc_sync_message_unittest.h"
34 #include "mojo/public/cpp/system/message_pipe.h"
35 #include "testing/gtest/include/gtest/gtest.h"
36 
37 using base::WaitableEvent;
38 
39 namespace IPC {
40 namespace {
41 
42 // Base class for a "process" with listener and IPC threads.
43 class Worker : public Listener, public Sender {
44  public:
45   // Will create a channel without a name.
Worker(Channel::Mode mode,const std::string & thread_name,mojo::ScopedMessagePipeHandle channel_handle)46   Worker(Channel::Mode mode,
47          const std::string& thread_name,
48          mojo::ScopedMessagePipeHandle channel_handle)
49       : done_(
50             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
51                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
52         channel_created_(
53             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
54                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
55         channel_handle_(std::move(channel_handle)),
56         mode_(mode),
57         ipc_thread_(
58             std::make_unique<base::Thread>((thread_name + "_ipc").c_str())),
59         listener_thread_((thread_name + "_listener").c_str()),
60         overrided_thread_(nullptr),
61         shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL,
62                         base::WaitableEvent::InitialState::NOT_SIGNALED),
63         is_shutdown_(false) {}
64 
65   // Will create a named channel and use this name for the threads' name.
Worker(mojo::ScopedMessagePipeHandle channel_handle,Channel::Mode mode)66   Worker(mojo::ScopedMessagePipeHandle channel_handle, Channel::Mode mode)
67       : done_(
68             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
69                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
70         channel_created_(
71             new WaitableEvent(base::WaitableEvent::ResetPolicy::AUTOMATIC,
72                               base::WaitableEvent::InitialState::NOT_SIGNALED)),
73         channel_handle_(std::move(channel_handle)),
74         mode_(mode),
75         ipc_thread_(std::make_unique<base::Thread>("ipc thread")),
76         listener_thread_("listener thread"),
77         overrided_thread_(nullptr),
78         shutdown_event_(base::WaitableEvent::ResetPolicy::MANUAL,
79                         base::WaitableEvent::InitialState::NOT_SIGNALED),
80         is_shutdown_(false) {}
81 
82   Worker(const Worker&) = delete;
83   Worker& operator=(const Worker&) = delete;
84 
~Worker()85   ~Worker() override {
86     // Shutdown() must be called before destruction.
87     CHECK(is_shutdown_);
88   }
Send(Message * msg)89   bool Send(Message* msg) override { return channel_->Send(msg); }
WaitForChannelCreation()90   void WaitForChannelCreation() { channel_created_->Wait(); }
CloseChannel()91   void CloseChannel() {
92     DCHECK(ListenerThread()->task_runner()->BelongsToCurrentThread());
93     channel_->Close();
94   }
Start()95   void Start() {
96     StartThread(&listener_thread_, base::MessagePumpType::DEFAULT);
97     ListenerThread()->task_runner()->PostTask(
98         FROM_HERE, base::BindOnce(&Worker::OnStart, base::Unretained(this)));
99   }
Shutdown()100   void Shutdown() {
101     // The IPC thread needs to outlive SyncChannel. We can't do this in
102     // ~Worker(), since that'll reset the vtable pointer (to Worker's), which
103     // may result in a race conditions. See http://crbug.com/25841.
104     WaitableEvent listener_done(
105         base::WaitableEvent::ResetPolicy::AUTOMATIC,
106         base::WaitableEvent::InitialState::NOT_SIGNALED);
107     ListenerThread()->task_runner()->PostTask(
108         FROM_HERE, base::BindOnce(&Worker::OnListenerThreadShutdown1,
109                                   base::Unretained(this), &listener_done));
110     listener_done.Wait();
111     listener_thread_.Stop();
112     is_shutdown_ = true;
113   }
OverrideThread(base::Thread * overrided_thread)114   void OverrideThread(base::Thread* overrided_thread) {
115     DCHECK(!overrided_thread_);
116     overrided_thread_ = overrided_thread;
117   }
SendAnswerToLife(bool succeed)118   bool SendAnswerToLife(bool succeed) {
119     int answer = 0;
120     SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
121     bool result = Send(msg);
122     DCHECK_EQ(result, succeed);
123     DCHECK_EQ(answer, (succeed ? 42 : 0));
124     return result;
125   }
SendDouble(bool succeed)126   bool SendDouble(bool succeed) {
127     int answer = 0;
128     SyncMessage* msg = new SyncChannelTestMsg_Double(5, &answer);
129     bool result = Send(msg);
130     DCHECK_EQ(result, succeed);
131     DCHECK_EQ(answer, (succeed ? 10 : 0));
132     return result;
133   }
TakeChannelHandle()134   mojo::MessagePipeHandle TakeChannelHandle() {
135     DCHECK(channel_handle_.is_valid());
136     return channel_handle_.release();
137   }
mode()138   Channel::Mode mode() { return mode_; }
done_event()139   WaitableEvent* done_event() { return done_.get(); }
shutdown_event()140   WaitableEvent* shutdown_event() { return &shutdown_event_; }
ResetChannel()141   void ResetChannel() { channel_.reset(); }
142   // Derived classes need to call this when they've completed their part of
143   // the test.
Done()144   void Done() { done_->Signal(); }
145 
146  protected:
channel()147   SyncChannel* channel() { return channel_.get(); }
148   // Functions for derived classes to implement if they wish.
Run()149   virtual void Run() { }
OnAnswer(int * answer)150   virtual void OnAnswer(int* answer) { NOTREACHED(); }
OnAnswerDelay(Message * reply_msg)151   virtual void OnAnswerDelay(Message* reply_msg) {
152     // The message handler map below can only take one entry for
153     // SyncChannelTestMsg_AnswerToLife, so since some classes want
154     // the normal version while other want the delayed reply, we
155     // call the normal version if the derived class didn't override
156     // this function.
157     int answer;
158     OnAnswer(&answer);
159     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, answer);
160     Send(reply_msg);
161   }
OnDouble(int in,int * out)162   virtual void OnDouble(int in, int* out) { NOTREACHED(); }
OnDoubleDelay(int in,Message * reply_msg)163   virtual void OnDoubleDelay(int in, Message* reply_msg) {
164     int result;
165     OnDouble(in, &result);
166     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, result);
167     Send(reply_msg);
168   }
169 
OnNestedTestMsg(Message * reply_msg)170   virtual void OnNestedTestMsg(Message* reply_msg) {
171     NOTREACHED();
172   }
173 
CreateChannel()174   virtual SyncChannel* CreateChannel() {
175     std::unique_ptr<SyncChannel> channel = SyncChannel::Create(
176         TakeChannelHandle(), mode_, this, ipc_thread_->task_runner(),
177         base::SingleThreadTaskRunner::GetCurrentDefault(), true,
178         &shutdown_event_);
179     return channel.release();
180   }
181 
ListenerThread()182   base::Thread* ListenerThread() {
183     return overrided_thread_ ? overrided_thread_.get() : &listener_thread_;
184   }
185 
ipc_thread() const186   const base::Thread& ipc_thread() const { return *ipc_thread_.get(); }
187 
188  private:
189   // Called on the listener thread to create the sync channel.
OnStart()190   void OnStart() {
191     // Link ipc_thread_, listener_thread_ and channel_ altogether.
192     StartThread(ipc_thread_.get(), base::MessagePumpType::IO);
193     channel_.reset(CreateChannel());
194     channel_created_->Signal();
195     Run();
196   }
197 
OnListenerThreadShutdown1(WaitableEvent * listener_event)198   void OnListenerThreadShutdown1(WaitableEvent* listener_event) {
199     WaitableEvent ipc_event(base::WaitableEvent::ResetPolicy::AUTOMATIC,
200                             base::WaitableEvent::InitialState::NOT_SIGNALED);
201     // SyncChannel needs to be destructed on the thread that it was created on.
202     channel_.reset();
203 
204     base::RunLoop().RunUntilIdle();
205 
206     ipc_thread_->task_runner()->PostTask(
207         FROM_HERE,
208         base::BindOnce(&Worker::OnIPCThreadShutdown, base::Unretained(this),
209                        listener_event, &ipc_event));
210     ipc_event.Wait();
211     // This destructs `ipc_thread_` on the listener thread.
212     ipc_thread_.reset();
213 
214     listener_thread_.task_runner()->PostTask(
215         FROM_HERE, base::BindOnce(&Worker::OnListenerThreadShutdown2,
216                                   base::Unretained(this), listener_event));
217   }
218 
OnIPCThreadShutdown(WaitableEvent * listener_event,WaitableEvent * ipc_event)219   void OnIPCThreadShutdown(WaitableEvent* listener_event,
220                            WaitableEvent* ipc_event) {
221     base::RunLoop().RunUntilIdle();
222     ipc_event->Signal();
223   }
224 
OnListenerThreadShutdown2(WaitableEvent * listener_event)225   void OnListenerThreadShutdown2(WaitableEvent* listener_event) {
226     base::RunLoop().RunUntilIdle();
227     listener_event->Signal();
228   }
229 
OnMessageReceived(const Message & message)230   bool OnMessageReceived(const Message& message) override {
231     IPC_BEGIN_MESSAGE_MAP(Worker, message)
232      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_Double, OnDoubleDelay)
233      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_AnswerToLife,
234                                      OnAnswerDelay)
235      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelNestedTestMsg_String,
236                                      OnNestedTestMsg)
237     IPC_END_MESSAGE_MAP()
238     return true;
239   }
240 
StartThread(base::Thread * thread,base::MessagePumpType type)241   void StartThread(base::Thread* thread, base::MessagePumpType type) {
242     base::Thread::Options options;
243     options.message_pump_type = type;
244     thread->StartWithOptions(std::move(options));
245   }
246 
247   std::unique_ptr<WaitableEvent> done_;
248   std::unique_ptr<WaitableEvent> channel_created_;
249   mojo::ScopedMessagePipeHandle channel_handle_;
250   Channel::Mode mode_;
251   std::unique_ptr<SyncChannel> channel_;
252   // This thread is constructed on the main thread, Start() on
253   // `listener_thread_`, and therefore destructed/Stop()'d on the
254   // `listener_thread_` too.
255   std::unique_ptr<base::Thread> ipc_thread_;
256   base::Thread listener_thread_;
257   raw_ptr<base::Thread> overrided_thread_;
258 
259   base::WaitableEvent shutdown_event_;
260 
261   bool is_shutdown_;
262 };
263 
264 
265 // Starts the test with the given workers.  This function deletes the workers
266 // when it's done.
RunTest(std::vector<Worker * > workers)267 void RunTest(std::vector<Worker*> workers) {
268   // First we create the workers that are channel servers, or else the other
269   // workers' channel initialization might fail because the pipe isn't created..
270   for (size_t i = 0; i < workers.size(); ++i) {
271     if (workers[i]->mode() & Channel::MODE_SERVER_FLAG) {
272       workers[i]->Start();
273       workers[i]->WaitForChannelCreation();
274     }
275   }
276 
277   // now create the clients
278   for (size_t i = 0; i < workers.size(); ++i) {
279     if (workers[i]->mode() & Channel::MODE_CLIENT_FLAG)
280       workers[i]->Start();
281   }
282 
283   // wait for all the workers to finish
284   for (size_t i = 0; i < workers.size(); ++i)
285     workers[i]->done_event()->Wait();
286 
287   for (size_t i = 0; i < workers.size(); ++i) {
288     workers[i]->Shutdown();
289     delete workers[i];
290   }
291 }
292 
293 class IPCSyncChannelTest : public testing::Test {
294  private:
295   base::test::SingleThreadTaskEnvironment task_environment_;
296 };
297 
298 //------------------------------------------------------------------------------
299 
300 class SimpleServer : public Worker {
301  public:
SimpleServer(mojo::ScopedMessagePipeHandle channel_handle)302   explicit SimpleServer(mojo::ScopedMessagePipeHandle channel_handle)
303       : Worker(Channel::MODE_SERVER,
304                "simpler_server",
305                std::move(channel_handle)) {}
Run()306   void Run() override {
307     SendAnswerToLife(true);
308     Done();
309   }
310 };
311 
312 class SimpleClient : public Worker {
313  public:
SimpleClient(mojo::ScopedMessagePipeHandle channel_handle)314   explicit SimpleClient(mojo::ScopedMessagePipeHandle channel_handle)
315       : Worker(Channel::MODE_CLIENT,
316                "simple_client",
317                std::move(channel_handle)) {}
318 
OnAnswer(int * answer)319   void OnAnswer(int* answer) override {
320     *answer = 42;
321     Done();
322   }
323 };
324 
Simple()325 void Simple() {
326   std::vector<Worker*> workers;
327   mojo::MessagePipe pipe;
328   workers.push_back(new SimpleServer(std::move(pipe.handle0)));
329   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
330   RunTest(workers);
331 }
332 
333 #if BUILDFLAG(IS_ANDROID)
334 #define MAYBE_Simple DISABLED_Simple
335 #else
336 #define MAYBE_Simple Simple
337 #endif
338 // Tests basic synchronous call
TEST_F(IPCSyncChannelTest,MAYBE_Simple)339 TEST_F(IPCSyncChannelTest, MAYBE_Simple) {
340   Simple();
341 }
342 
343 //------------------------------------------------------------------------------
344 
345 // Worker classes which override how the sync channel is created to use the
346 // two-step initialization (calling the lightweight constructor and then
347 // ChannelProxy::Init separately) process.
348 class TwoStepServer : public Worker {
349  public:
TwoStepServer(bool create_pipe_now,mojo::ScopedMessagePipeHandle channel_handle)350   TwoStepServer(bool create_pipe_now,
351                 mojo::ScopedMessagePipeHandle channel_handle)
352       : Worker(Channel::MODE_SERVER,
353                "simpler_server",
354                std::move(channel_handle)),
355         create_pipe_now_(create_pipe_now) {}
356 
Run()357   void Run() override {
358     SendAnswerToLife(true);
359     Done();
360   }
361 
CreateChannel()362   SyncChannel* CreateChannel() override {
363     SyncChannel* channel =
364         SyncChannel::Create(TakeChannelHandle(), mode(), this,
365                             ipc_thread().task_runner(),
366                             base::SingleThreadTaskRunner::GetCurrentDefault(),
367                             create_pipe_now_, shutdown_event())
368             .release();
369     return channel;
370   }
371 
372   bool create_pipe_now_;
373 };
374 
375 class TwoStepClient : public Worker {
376  public:
TwoStepClient(bool create_pipe_now,mojo::ScopedMessagePipeHandle channel_handle)377   TwoStepClient(bool create_pipe_now,
378                 mojo::ScopedMessagePipeHandle channel_handle)
379       : Worker(Channel::MODE_CLIENT,
380                "simple_client",
381                std::move(channel_handle)),
382         create_pipe_now_(create_pipe_now) {}
383 
OnAnswer(int * answer)384   void OnAnswer(int* answer) override {
385     *answer = 42;
386     Done();
387   }
388 
CreateChannel()389   SyncChannel* CreateChannel() override {
390     SyncChannel* channel =
391         SyncChannel::Create(TakeChannelHandle(), mode(), this,
392                             ipc_thread().task_runner(),
393                             base::SingleThreadTaskRunner::GetCurrentDefault(),
394                             create_pipe_now_, shutdown_event())
395             .release();
396     return channel;
397   }
398 
399   bool create_pipe_now_;
400 };
401 
TwoStep(bool create_server_pipe_now,bool create_client_pipe_now)402 void TwoStep(bool create_server_pipe_now, bool create_client_pipe_now) {
403   std::vector<Worker*> workers;
404   mojo::MessagePipe pipe;
405   workers.push_back(
406       new TwoStepServer(create_server_pipe_now, std::move(pipe.handle0)));
407   workers.push_back(
408       new TwoStepClient(create_client_pipe_now, std::move(pipe.handle1)));
409   RunTest(workers);
410 }
411 
412 // Tests basic two-step initialization, where you call the lightweight
413 // constructor then Init.
TEST_F(IPCSyncChannelTest,TwoStepInitialization)414 TEST_F(IPCSyncChannelTest, TwoStepInitialization) {
415   TwoStep(false, false);
416   TwoStep(false, true);
417   TwoStep(true, false);
418   TwoStep(true, true);
419 }
420 
421 //------------------------------------------------------------------------------
422 
423 class DelayClient : public Worker {
424  public:
DelayClient(mojo::ScopedMessagePipeHandle channel_handle)425   explicit DelayClient(mojo::ScopedMessagePipeHandle channel_handle)
426       : Worker(Channel::MODE_CLIENT,
427                "delay_client",
428                std::move(channel_handle)) {}
429 
OnAnswerDelay(Message * reply_msg)430   void OnAnswerDelay(Message* reply_msg) override {
431     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
432     Send(reply_msg);
433     Done();
434   }
435 };
436 
DelayReply()437 void DelayReply() {
438   std::vector<Worker*> workers;
439   mojo::MessagePipe pipe;
440   workers.push_back(new SimpleServer(std::move(pipe.handle0)));
441   workers.push_back(new DelayClient(std::move(pipe.handle1)));
442   RunTest(workers);
443 }
444 
445 // Tests that asynchronous replies work
TEST_F(IPCSyncChannelTest,DelayReply)446 TEST_F(IPCSyncChannelTest, DelayReply) {
447   DelayReply();
448 }
449 
450 //------------------------------------------------------------------------------
451 
452 class NoHangServer : public Worker {
453  public:
NoHangServer(WaitableEvent * got_first_reply,mojo::ScopedMessagePipeHandle channel_handle)454   NoHangServer(WaitableEvent* got_first_reply,
455                mojo::ScopedMessagePipeHandle channel_handle)
456       : Worker(Channel::MODE_SERVER,
457                "no_hang_server",
458                std::move(channel_handle)),
459         got_first_reply_(got_first_reply) {}
Run()460   void Run() override {
461     SendAnswerToLife(true);
462     got_first_reply_->Signal();
463 
464     SendAnswerToLife(false);
465     Done();
466   }
467 
468   raw_ptr<WaitableEvent> got_first_reply_;
469 };
470 
471 class NoHangClient : public Worker {
472  public:
NoHangClient(WaitableEvent * got_first_reply,mojo::ScopedMessagePipeHandle channel_handle)473   NoHangClient(WaitableEvent* got_first_reply,
474                mojo::ScopedMessagePipeHandle channel_handle)
475       : Worker(Channel::MODE_CLIENT,
476                "no_hang_client",
477                std::move(channel_handle)),
478         got_first_reply_(got_first_reply) {}
479 
OnAnswerDelay(Message * reply_msg)480   void OnAnswerDelay(Message* reply_msg) override {
481     // Use the DELAY_REPLY macro so that we can force the reply to be sent
482     // before this function returns (when the channel will be reset).
483     SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
484     Send(reply_msg);
485     got_first_reply_->Wait();
486     CloseChannel();
487     Done();
488   }
489 
490   raw_ptr<WaitableEvent> got_first_reply_;
491 };
492 
NoHang()493 void NoHang() {
494   WaitableEvent got_first_reply(
495       base::WaitableEvent::ResetPolicy::AUTOMATIC,
496       base::WaitableEvent::InitialState::NOT_SIGNALED);
497   std::vector<Worker*> workers;
498   mojo::MessagePipe pipe;
499   workers.push_back(
500       new NoHangServer(&got_first_reply, std::move(pipe.handle0)));
501   workers.push_back(
502       new NoHangClient(&got_first_reply, std::move(pipe.handle1)));
503   RunTest(workers);
504 }
505 
506 // Tests that caller doesn't hang if receiver dies
TEST_F(IPCSyncChannelTest,NoHang)507 TEST_F(IPCSyncChannelTest, NoHang) {
508   NoHang();
509 }
510 
511 //------------------------------------------------------------------------------
512 
513 class UnblockServer : public Worker {
514  public:
UnblockServer(bool delete_during_send,mojo::ScopedMessagePipeHandle channel_handle)515   UnblockServer(bool delete_during_send,
516                 mojo::ScopedMessagePipeHandle channel_handle)
517       : Worker(Channel::MODE_SERVER,
518                "unblock_server",
519                std::move(channel_handle)),
520         delete_during_send_(delete_during_send) {}
Run()521   void Run() override {
522     if (delete_during_send_) {
523       // Use custom code since race conditions mean the answer may or may not be
524       // available.
525       int answer = 0;
526       SyncMessage* msg = new SyncChannelTestMsg_AnswerToLife(&answer);
527       Send(msg);
528     } else {
529       SendAnswerToLife(true);
530     }
531     Done();
532   }
533 
OnDoubleDelay(int in,Message * reply_msg)534   void OnDoubleDelay(int in, Message* reply_msg) override {
535     SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
536     Send(reply_msg);
537     if (delete_during_send_)
538       ResetChannel();
539   }
540 
541   bool delete_during_send_;
542 };
543 
544 class UnblockClient : public Worker {
545  public:
UnblockClient(mojo::ScopedMessagePipeHandle channel_handle)546   explicit UnblockClient(mojo::ScopedMessagePipeHandle channel_handle)
547       : Worker(Channel::MODE_CLIENT,
548                "unblock_client",
549                std::move(channel_handle)) {}
550 
OnAnswer(int * answer)551   void OnAnswer(int* answer) override {
552     SendDouble(true);
553     *answer = 42;
554     Done();
555   }
556 };
557 
Unblock(bool delete_during_send)558 void Unblock(bool delete_during_send) {
559   std::vector<Worker*> workers;
560   mojo::MessagePipe pipe;
561   workers.push_back(
562       new UnblockServer(delete_during_send, std::move(pipe.handle0)));
563   workers.push_back(new UnblockClient(std::move(pipe.handle1)));
564   RunTest(workers);
565 }
566 
567 // Tests that the caller unblocks to answer a sync message from the receiver.
TEST_F(IPCSyncChannelTest,Unblock)568 TEST_F(IPCSyncChannelTest, Unblock) {
569   Unblock(false);
570 }
571 
572 //------------------------------------------------------------------------------
573 
574 #if BUILDFLAG(IS_ANDROID)
575 #define MAYBE_ChannelDeleteDuringSend DISABLED_ChannelDeleteDuringSend
576 #else
577 #define MAYBE_ChannelDeleteDuringSend ChannelDeleteDuringSend
578 #endif
579 // Tests that the the SyncChannel object can be deleted during a Send.
TEST_F(IPCSyncChannelTest,MAYBE_ChannelDeleteDuringSend)580 TEST_F(IPCSyncChannelTest, MAYBE_ChannelDeleteDuringSend) {
581   Unblock(true);
582 }
583 
584 //------------------------------------------------------------------------------
585 
586 class RecursiveServer : public Worker {
587  public:
RecursiveServer(bool expected_send_result,mojo::ScopedMessagePipeHandle channel_handle)588   RecursiveServer(bool expected_send_result,
589                   mojo::ScopedMessagePipeHandle channel_handle)
590       : Worker(Channel::MODE_SERVER,
591                "recursive_server",
592                std::move(channel_handle)),
593         expected_send_result_(expected_send_result) {}
Run()594   void Run() override {
595     SendDouble(expected_send_result_);
596     Done();
597   }
598 
OnDouble(int in,int * out)599   void OnDouble(int in, int* out) override {
600     *out = in * 2;
601     SendAnswerToLife(expected_send_result_);
602   }
603 
604   bool expected_send_result_;
605 };
606 
607 class RecursiveClient : public Worker {
608  public:
RecursiveClient(bool close_channel,mojo::ScopedMessagePipeHandle channel_handle)609   RecursiveClient(bool close_channel,
610                   mojo::ScopedMessagePipeHandle channel_handle)
611       : Worker(Channel::MODE_CLIENT,
612                "recursive_client",
613                std::move(channel_handle)),
614         close_channel_(close_channel) {}
615 
OnDoubleDelay(int in,Message * reply_msg)616   void OnDoubleDelay(int in, Message* reply_msg) override {
617     SendDouble(!close_channel_);
618     if (close_channel_) {
619       delete reply_msg;
620     } else {
621       SyncChannelTestMsg_Double::WriteReplyParams(reply_msg, in * 2);
622       Send(reply_msg);
623     }
624     Done();
625   }
626 
OnAnswerDelay(Message * reply_msg)627   void OnAnswerDelay(Message* reply_msg) override {
628     if (close_channel_) {
629       delete reply_msg;
630       CloseChannel();
631     } else {
632       SyncChannelTestMsg_AnswerToLife::WriteReplyParams(reply_msg, 42);
633       Send(reply_msg);
634     }
635   }
636 
637   bool close_channel_;
638 };
639 
Recursive()640 void Recursive() {
641   std::vector<Worker*> workers;
642   mojo::MessagePipe pipe;
643   workers.push_back(new RecursiveServer(true, std::move(pipe.handle0)));
644   workers.push_back(new RecursiveClient(false, std::move(pipe.handle1)));
645   RunTest(workers);
646 }
647 
648 // Tests a server calling Send while another Send is pending.
TEST_F(IPCSyncChannelTest,Recursive)649 TEST_F(IPCSyncChannelTest, Recursive) {
650   Recursive();
651 }
652 
653 //------------------------------------------------------------------------------
654 
RecursiveNoHang()655 void RecursiveNoHang() {
656   std::vector<Worker*> workers;
657   mojo::MessagePipe pipe;
658   workers.push_back(new RecursiveServer(false, std::move(pipe.handle0)));
659   workers.push_back(new RecursiveClient(true, std::move(pipe.handle1)));
660   RunTest(workers);
661 }
662 
663 // Tests that if a caller makes a sync call during an existing sync call and
664 // the receiver dies, neither of the Send() calls hang.
TEST_F(IPCSyncChannelTest,RecursiveNoHang)665 TEST_F(IPCSyncChannelTest, RecursiveNoHang) {
666   RecursiveNoHang();
667 }
668 
669 //------------------------------------------------------------------------------
670 
671 class MultipleServer1 : public Worker {
672  public:
MultipleServer1(mojo::ScopedMessagePipeHandle channel_handle)673   explicit MultipleServer1(mojo::ScopedMessagePipeHandle channel_handle)
674       : Worker(std::move(channel_handle), Channel::MODE_SERVER) {}
675 
Run()676   void Run() override {
677     SendDouble(true);
678     Done();
679   }
680 };
681 
682 class MultipleClient1 : public Worker {
683  public:
MultipleClient1(WaitableEvent * client1_msg_received,WaitableEvent * client1_can_reply,mojo::ScopedMessagePipeHandle channel_handle)684   MultipleClient1(WaitableEvent* client1_msg_received,
685                   WaitableEvent* client1_can_reply,
686                   mojo::ScopedMessagePipeHandle channel_handle)
687       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
688         client1_msg_received_(client1_msg_received),
689         client1_can_reply_(client1_can_reply) {}
690 
OnDouble(int in,int * out)691   void OnDouble(int in, int* out) override {
692     client1_msg_received_->Signal();
693     *out = in * 2;
694     client1_can_reply_->Wait();
695     Done();
696   }
697 
698  private:
699   // This field is not a raw_ptr<> because it was filtered by the rewriter for:
700   // #overlapping
701   RAW_PTR_EXCLUSION WaitableEvent *client1_msg_received_, *client1_can_reply_;
702 };
703 
704 class MultipleServer2 : public Worker {
705  public:
MultipleServer2(mojo::ScopedMessagePipeHandle channel_handle)706   explicit MultipleServer2(mojo::ScopedMessagePipeHandle channel_handle)
707       : Worker(std::move(channel_handle), Channel::MODE_SERVER) {}
708 
OnAnswer(int * result)709   void OnAnswer(int* result) override {
710     *result = 42;
711     Done();
712   }
713 };
714 
715 class MultipleClient2 : public Worker {
716  public:
MultipleClient2(WaitableEvent * client1_msg_received,WaitableEvent * client1_can_reply,mojo::ScopedMessagePipeHandle channel_handle)717   MultipleClient2(WaitableEvent* client1_msg_received,
718                   WaitableEvent* client1_can_reply,
719                   mojo::ScopedMessagePipeHandle channel_handle)
720       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
721         client1_msg_received_(client1_msg_received),
722         client1_can_reply_(client1_can_reply) {}
723 
Run()724   void Run() override {
725     client1_msg_received_->Wait();
726     SendAnswerToLife(true);
727     client1_can_reply_->Signal();
728     Done();
729   }
730 
731  private:
732   // This field is not a raw_ptr<> because it was filtered by the rewriter for:
733   // #overlapping
734   RAW_PTR_EXCLUSION WaitableEvent *client1_msg_received_, *client1_can_reply_;
735 };
736 
Multiple()737 void Multiple() {
738   std::vector<Worker*> workers;
739 
740   // A shared worker thread so that server1 and server2 run on one thread.
741   base::Thread worker_thread("Multiple");
742   ASSERT_TRUE(worker_thread.Start());
743 
744   // Server1 sends a sync msg to client1, which blocks the reply until
745   // server2 (which runs on the same worker thread as server1) responds
746   // to a sync msg from client2.
747   WaitableEvent client1_msg_received(
748       base::WaitableEvent::ResetPolicy::AUTOMATIC,
749       base::WaitableEvent::InitialState::NOT_SIGNALED);
750   WaitableEvent client1_can_reply(
751       base::WaitableEvent::ResetPolicy::AUTOMATIC,
752       base::WaitableEvent::InitialState::NOT_SIGNALED);
753 
754   Worker* worker;
755 
756   mojo::MessagePipe pipe1, pipe2;
757   worker = new MultipleServer2(std::move(pipe2.handle0));
758   worker->OverrideThread(&worker_thread);
759   workers.push_back(worker);
760 
761   worker = new MultipleClient2(&client1_msg_received, &client1_can_reply,
762                                std::move(pipe2.handle1));
763   workers.push_back(worker);
764 
765   worker = new MultipleServer1(std::move(pipe1.handle0));
766   worker->OverrideThread(&worker_thread);
767   workers.push_back(worker);
768 
769   worker = new MultipleClient1(&client1_msg_received, &client1_can_reply,
770                                std::move(pipe1.handle1));
771   workers.push_back(worker);
772 
773   RunTest(workers);
774 }
775 
776 // Tests that multiple SyncObjects on the same listener thread can unblock each
777 // other.
TEST_F(IPCSyncChannelTest,Multiple)778 TEST_F(IPCSyncChannelTest, Multiple) {
779   Multiple();
780 }
781 
782 //------------------------------------------------------------------------------
783 
784 // This class provides server side functionality to test the case where
785 // multiple sync channels are in use on the same thread on the client.
786 class QueuedReplyServer : public Worker {
787  public:
QueuedReplyServer(base::Thread * listener_thread,mojo::ScopedMessagePipeHandle channel_handle,const std::string & reply_text)788   QueuedReplyServer(base::Thread* listener_thread,
789                     mojo::ScopedMessagePipeHandle channel_handle,
790                     const std::string& reply_text)
791       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
792         reply_text_(reply_text) {
793     Worker::OverrideThread(listener_thread);
794   }
795 
OnNestedTestMsg(Message * reply_msg)796   void OnNestedTestMsg(Message* reply_msg) override {
797     VLOG(1) << __FUNCTION__ << " Sending reply: " << reply_text_;
798     SyncChannelNestedTestMsg_String::WriteReplyParams(reply_msg, reply_text_);
799     Send(reply_msg);
800     Done();
801   }
802 
803  private:
804   std::string reply_text_;
805 };
806 
807 // The QueuedReplyClient class provides functionality to test the case where
808 // multiple sync channels are in use on the same thread.
809 class QueuedReplyClient : public Worker {
810  public:
QueuedReplyClient(base::Thread * listener_thread,mojo::ScopedMessagePipeHandle channel_handle,const std::string & expected_text)811   QueuedReplyClient(base::Thread* listener_thread,
812                     mojo::ScopedMessagePipeHandle channel_handle,
813                     const std::string& expected_text)
814       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
815         expected_text_(expected_text) {
816     Worker::OverrideThread(listener_thread);
817   }
818 
Run()819   void Run() override {
820     std::string response;
821     SyncMessage* msg = new SyncChannelNestedTestMsg_String(&response);
822     bool result = Send(msg);
823     DCHECK(result);
824     DCHECK_EQ(response, expected_text_);
825 
826     VLOG(1) << __FUNCTION__ << " Received reply: " << response;
827     Done();
828   }
829 
830  private:
831   std::string expected_text_;
832 };
833 
QueuedReply()834 void QueuedReply() {
835   std::vector<Worker*> workers;
836 
837   // A shared worker thread for servers
838   base::Thread server_worker_thread("QueuedReply_ServerListener");
839   ASSERT_TRUE(server_worker_thread.Start());
840 
841   base::Thread client_worker_thread("QueuedReply_ClientListener");
842   ASSERT_TRUE(client_worker_thread.Start());
843 
844   Worker* worker;
845 
846   mojo::MessagePipe pipe1, pipe2;
847   worker = new QueuedReplyServer(&server_worker_thread,
848                                  std::move(pipe1.handle0), "Got first message");
849   workers.push_back(worker);
850 
851   worker = new QueuedReplyServer(
852       &server_worker_thread, std::move(pipe2.handle0), "Got second message");
853   workers.push_back(worker);
854 
855   worker = new QueuedReplyClient(&client_worker_thread,
856                                  std::move(pipe1.handle1), "Got first message");
857   workers.push_back(worker);
858 
859   worker = new QueuedReplyClient(
860       &client_worker_thread, std::move(pipe2.handle1), "Got second message");
861   workers.push_back(worker);
862 
863   RunTest(workers);
864 }
865 
866 // While a blocking send is in progress, the listener thread might answer other
867 // synchronous messages.  This tests that if during the response to another
868 // message the reply to the original messages comes, it is queued up correctly
869 // and the original Send is unblocked later.
TEST_F(IPCSyncChannelTest,QueuedReply)870 TEST_F(IPCSyncChannelTest, QueuedReply) {
871   QueuedReply();
872 }
873 
874 //------------------------------------------------------------------------------
875 
876 class TestSyncMessageFilter : public SyncMessageFilter {
877  public:
TestSyncMessageFilter(base::WaitableEvent * shutdown_event,Worker * worker,scoped_refptr<base::SingleThreadTaskRunner> task_runner)878   TestSyncMessageFilter(
879       base::WaitableEvent* shutdown_event,
880       Worker* worker,
881       scoped_refptr<base::SingleThreadTaskRunner> task_runner)
882       : SyncMessageFilter(shutdown_event),
883         worker_(worker),
884         task_runner_(task_runner) {}
885 
OnFilterAdded(Channel * channel)886   void OnFilterAdded(Channel* channel) override {
887     SyncMessageFilter::OnFilterAdded(channel);
888     task_runner_->PostTask(
889         FROM_HERE,
890         base::BindOnce(&TestSyncMessageFilter::SendMessageOnHelperThread,
891                        this));
892   }
893 
SendMessageOnHelperThread()894   void SendMessageOnHelperThread() {
895     int answer = 0;
896     bool result = Send(new SyncChannelTestMsg_AnswerToLife(&answer));
897     DCHECK(result);
898     DCHECK_EQ(answer, 42);
899 
900     worker_->Done();
901   }
902 
903  private:
904   ~TestSyncMessageFilter() override = default;
905 
906   raw_ptr<Worker> worker_;
907   scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
908 };
909 
910 class SyncMessageFilterServer : public Worker {
911  public:
SyncMessageFilterServer(mojo::ScopedMessagePipeHandle channel_handle)912   explicit SyncMessageFilterServer(mojo::ScopedMessagePipeHandle channel_handle)
913       : Worker(Channel::MODE_SERVER,
914                "sync_message_filter_server",
915                std::move(channel_handle)),
916         thread_("helper_thread") {
917     base::Thread::Options options;
918     options.message_pump_type = base::MessagePumpType::DEFAULT;
919     thread_.StartWithOptions(std::move(options));
920     filter_ = new TestSyncMessageFilter(shutdown_event(), this,
921                                         thread_.task_runner());
922   }
923 
Run()924   void Run() override {
925     channel()->AddFilter(filter_.get());
926   }
927 
928   base::Thread thread_;
929   scoped_refptr<TestSyncMessageFilter> filter_;
930 };
931 
932 // This class provides functionality to test the case that a Send on the sync
933 // channel does not crash after the channel has been closed.
934 class ServerSendAfterClose : public Worker {
935  public:
ServerSendAfterClose(mojo::ScopedMessagePipeHandle channel_handle)936   explicit ServerSendAfterClose(mojo::ScopedMessagePipeHandle channel_handle)
937       : Worker(Channel::MODE_SERVER,
938                "simpler_server",
939                std::move(channel_handle)),
940         send_result_(true) {}
941 
SendDummy()942   bool SendDummy() {
943     ListenerThread()->task_runner()->PostTask(
944         FROM_HERE,
945         base::BindOnce(base::IgnoreResult(&ServerSendAfterClose::Send),
946                        base::Unretained(this), new SyncChannelTestMsg_NoArgs));
947     return true;
948   }
949 
send_result() const950   bool send_result() const {
951     return send_result_;
952   }
953 
954  private:
Run()955   void Run() override {
956     CloseChannel();
957     Done();
958   }
959 
Send(Message * msg)960   bool Send(Message* msg) override {
961     send_result_ = Worker::Send(msg);
962     Done();
963     return send_result_;
964   }
965 
966   bool send_result_;
967 };
968 
969 // Tests basic synchronous call
TEST_F(IPCSyncChannelTest,SyncMessageFilter)970 TEST_F(IPCSyncChannelTest, SyncMessageFilter) {
971   std::vector<Worker*> workers;
972   mojo::MessagePipe pipe;
973   workers.push_back(new SyncMessageFilterServer(std::move(pipe.handle0)));
974   workers.push_back(new SimpleClient(std::move(pipe.handle1)));
975   RunTest(workers);
976 }
977 
978 // Test the case when the channel is closed and a Send is attempted after that.
TEST_F(IPCSyncChannelTest,SendAfterClose)979 TEST_F(IPCSyncChannelTest, SendAfterClose) {
980   mojo::MessagePipe pipe;
981   ServerSendAfterClose server(std::move(pipe.handle0));
982   server.Start();
983 
984   server.done_event()->Wait();
985   server.done_event()->Reset();
986 
987   server.SendDummy();
988   server.done_event()->Wait();
989 
990   EXPECT_FALSE(server.send_result());
991 
992   server.Shutdown();
993 }
994 
995 //------------------------------------------------------------------------------
996 
997 class RestrictedDispatchServer : public Worker {
998  public:
RestrictedDispatchServer(WaitableEvent * sent_ping_event,WaitableEvent * wait_event,mojo::ScopedMessagePipeHandle channel_handle)999   RestrictedDispatchServer(WaitableEvent* sent_ping_event,
1000                            WaitableEvent* wait_event,
1001                            mojo::ScopedMessagePipeHandle channel_handle)
1002       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
1003         sent_ping_event_(sent_ping_event),
1004         wait_event_(wait_event) {}
1005 
OnDoPing(int ping)1006   void OnDoPing(int ping) {
1007     // Send an asynchronous message that unblocks the caller.
1008     Message* msg = new SyncChannelTestMsg_Ping(ping);
1009     msg->set_unblock(true);
1010     Send(msg);
1011     // Signal the event after the message has been sent on the channel, on the
1012     // IPC thread.
1013     ipc_thread().task_runner()->PostTask(
1014         FROM_HERE, base::BindOnce(&RestrictedDispatchServer::OnPingSent,
1015                                   base::Unretained(this)));
1016   }
1017 
OnPingTTL(int ping,int * out)1018   void OnPingTTL(int ping, int* out) {
1019     *out = ping;
1020     wait_event_->Wait();
1021   }
1022 
ListenerThread()1023   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1024 
1025  private:
OnMessageReceived(const Message & message)1026   bool OnMessageReceived(const Message& message) override {
1027     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchServer, message)
1028      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1029      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1030      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1031     IPC_END_MESSAGE_MAP()
1032     return true;
1033   }
1034 
OnPingSent()1035   void OnPingSent() {
1036     sent_ping_event_->Signal();
1037   }
1038 
OnNoArgs()1039   void OnNoArgs() { }
1040   raw_ptr<WaitableEvent> sent_ping_event_;
1041   raw_ptr<WaitableEvent> wait_event_;
1042 };
1043 
1044 class NonRestrictedDispatchServer : public Worker {
1045  public:
NonRestrictedDispatchServer(WaitableEvent * signal_event,mojo::ScopedMessagePipeHandle channel_handle)1046   NonRestrictedDispatchServer(WaitableEvent* signal_event,
1047                               mojo::ScopedMessagePipeHandle channel_handle)
1048       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
1049         signal_event_(signal_event) {}
1050 
ListenerThread()1051   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1052 
OnDoPingTTL(int ping)1053   void OnDoPingTTL(int ping) {
1054     int value = 0;
1055     Send(new SyncChannelTestMsg_PingTTL(ping, &value));
1056     signal_event_->Signal();
1057   }
1058 
1059  private:
OnMessageReceived(const Message & message)1060   bool OnMessageReceived(const Message& message) override {
1061     IPC_BEGIN_MESSAGE_MAP(NonRestrictedDispatchServer, message)
1062      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1063      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1064     IPC_END_MESSAGE_MAP()
1065     return true;
1066   }
1067 
OnNoArgs()1068   void OnNoArgs() { }
1069   raw_ptr<WaitableEvent> signal_event_;
1070 };
1071 
1072 class RestrictedDispatchClient : public Worker {
1073  public:
RestrictedDispatchClient(WaitableEvent * sent_ping_event,RestrictedDispatchServer * server,NonRestrictedDispatchServer * server2,int * success,mojo::ScopedMessagePipeHandle restricted_channel_handle,mojo::ScopedMessagePipeHandle non_restricted_channel_handle)1074   RestrictedDispatchClient(
1075       WaitableEvent* sent_ping_event,
1076       RestrictedDispatchServer* server,
1077       NonRestrictedDispatchServer* server2,
1078       int* success,
1079       mojo::ScopedMessagePipeHandle restricted_channel_handle,
1080       mojo::ScopedMessagePipeHandle non_restricted_channel_handle)
1081       : Worker(std::move(restricted_channel_handle), Channel::MODE_CLIENT),
1082         ping_(0),
1083         server_(server),
1084         server2_(server2),
1085         success_(success),
1086         sent_ping_event_(sent_ping_event),
1087         non_restricted_channel_handle_(
1088             std::move(non_restricted_channel_handle)) {}
1089 
Run()1090   void Run() override {
1091     // Incoming messages from our channel should only be dispatched when we
1092     // send a message on that same channel.
1093     channel()->SetRestrictDispatchChannelGroup(1);
1094 
1095     server_->ListenerThread()->task_runner()->PostTask(
1096         FROM_HERE, base::BindOnce(&RestrictedDispatchServer::OnDoPing,
1097                                   base::Unretained(server_), 1));
1098     sent_ping_event_->Wait();
1099     Send(new SyncChannelTestMsg_NoArgs);
1100     if (ping_ == 1)
1101       ++*success_;
1102     else
1103       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1104 
1105     non_restricted_channel_ = SyncChannel::Create(
1106         non_restricted_channel_handle_.release(), IPC::Channel::MODE_CLIENT,
1107         this, ipc_thread().task_runner(),
1108         base::SingleThreadTaskRunner::GetCurrentDefault(), true,
1109         shutdown_event());
1110 
1111     server_->ListenerThread()->task_runner()->PostTask(
1112         FROM_HERE, base::BindOnce(&RestrictedDispatchServer::OnDoPing,
1113                                   base::Unretained(server_), 2));
1114     sent_ping_event_->Wait();
1115     // Check that the incoming message is *not* dispatched when sending on the
1116     // non restricted channel.
1117     // TODO(piman): there is a possibility of a false positive race condition
1118     // here, if the message that was posted on the server-side end of the pipe
1119     // is not visible yet on the client side, but I don't know how to solve this
1120     // without hooking into the internals of SyncChannel. I haven't seen it in
1121     // practice (i.e. not setting SetRestrictDispatchToSameChannel does cause
1122     // the following to fail).
1123     non_restricted_channel_->Send(new SyncChannelTestMsg_NoArgs);
1124     if (ping_ == 1)
1125       ++*success_;
1126     else
1127       LOG(ERROR) << "Send dispatched message from restricted channel";
1128 
1129     Send(new SyncChannelTestMsg_NoArgs);
1130     if (ping_ == 2)
1131       ++*success_;
1132     else
1133       LOG(ERROR) << "Send failed to dispatch incoming message on same channel";
1134 
1135     // Check that the incoming message on the non-restricted channel is
1136     // dispatched when sending on the restricted channel.
1137     server2_->ListenerThread()->task_runner()->PostTask(
1138         FROM_HERE, base::BindOnce(&NonRestrictedDispatchServer::OnDoPingTTL,
1139                                   base::Unretained(server2_), 3));
1140     int value = 0;
1141     Send(new SyncChannelTestMsg_PingTTL(4, &value));
1142     if (ping_ == 3 && value == 4)
1143       ++*success_;
1144     else
1145       LOG(ERROR) << "Send failed to dispatch message from unrestricted channel";
1146 
1147     non_restricted_channel_->Send(new SyncChannelTestMsg_Done);
1148     non_restricted_channel_.reset();
1149     Send(new SyncChannelTestMsg_Done);
1150     Done();
1151   }
1152 
1153  private:
OnMessageReceived(const Message & message)1154   bool OnMessageReceived(const Message& message) override {
1155     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchClient, message)
1156      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Ping, OnPing)
1157      IPC_MESSAGE_HANDLER_DELAY_REPLY(SyncChannelTestMsg_PingTTL, OnPingTTL)
1158     IPC_END_MESSAGE_MAP()
1159     return true;
1160   }
1161 
OnPing(int ping)1162   void OnPing(int ping) {
1163     ping_ = ping;
1164   }
1165 
OnPingTTL(int ping,IPC::Message * reply)1166   void OnPingTTL(int ping, IPC::Message* reply) {
1167     ping_ = ping;
1168     // This message comes from the NonRestrictedDispatchServer, we have to send
1169     // the reply back manually.
1170     SyncChannelTestMsg_PingTTL::WriteReplyParams(reply, ping);
1171     non_restricted_channel_->Send(reply);
1172   }
1173 
1174   int ping_;
1175   raw_ptr<RestrictedDispatchServer, DanglingUntriaged> server_;
1176   raw_ptr<NonRestrictedDispatchServer, DanglingUntriaged> server2_;
1177   raw_ptr<int> success_;
1178   raw_ptr<WaitableEvent> sent_ping_event_;
1179   std::unique_ptr<SyncChannel> non_restricted_channel_;
1180   mojo::ScopedMessagePipeHandle non_restricted_channel_handle_;
1181 };
1182 
TEST_F(IPCSyncChannelTest,RestrictedDispatch)1183 TEST_F(IPCSyncChannelTest, RestrictedDispatch) {
1184   WaitableEvent sent_ping_event(
1185       base::WaitableEvent::ResetPolicy::AUTOMATIC,
1186       base::WaitableEvent::InitialState::NOT_SIGNALED);
1187   WaitableEvent wait_event(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1188                            base::WaitableEvent::InitialState::NOT_SIGNALED);
1189   mojo::MessagePipe restricted_pipe, non_restricted_pipe;
1190   RestrictedDispatchServer* server = new RestrictedDispatchServer(
1191       &sent_ping_event, &wait_event, std::move(restricted_pipe.handle0));
1192   NonRestrictedDispatchServer* server2 = new NonRestrictedDispatchServer(
1193       &wait_event, std::move(non_restricted_pipe.handle0));
1194 
1195   int success = 0;
1196   std::vector<Worker*> workers;
1197   workers.push_back(server);
1198   workers.push_back(server2);
1199   workers.push_back(
1200       new RestrictedDispatchClient(&sent_ping_event, server, server2, &success,
1201                                    std::move(restricted_pipe.handle1),
1202                                    std::move(non_restricted_pipe.handle1)));
1203   RunTest(workers);
1204   EXPECT_EQ(4, success);
1205 }
1206 
1207 //------------------------------------------------------------------------------
1208 
1209 // This test case inspired by crbug.com/108491
1210 // We create two servers that use the same ListenerThread but have
1211 // SetRestrictDispatchToSameChannel set to true.
1212 // We create clients, then use some specific WaitableEvent wait/signalling to
1213 // ensure that messages get dispatched in a way that causes a deadlock due to
1214 // a nested dispatch and an eligible message in a higher-level dispatch's
1215 // delayed_queue. Specifically, we start with client1 about so send an
1216 // unblocking message to server1, while the shared listener thread for the
1217 // servers server1 and server2 is about to send a non-unblocking message to
1218 // client1. At the same time, client2 will be about to send an unblocking
1219 // message to server2. Server1 will handle the client1->server1 message by
1220 // telling server2 to send a non-unblocking message to client2.
1221 // What should happen is that the send to server2 should find the pending,
1222 // same-context client2->server2 message to dispatch, causing client2 to
1223 // unblock then handle the server2->client2 message, so that the shared
1224 // servers' listener thread can then respond to the client1->server1 message.
1225 // Then client1 can handle the non-unblocking server1->client1 message.
1226 // The old code would end up in a state where the server2->client2 message is
1227 // sent, but the client2->server2 message (which is eligible for dispatch, and
1228 // which is what client2 is waiting for) is stashed in a local delayed_queue
1229 // that has server1's channel context, causing a deadlock.
1230 // WaitableEvents in the events array are used to:
1231 //   event 0: indicate to client1 that server listener is in OnDoServerTask
1232 //   event 1: indicate to client1 that client2 listener is in OnDoClient2Task
1233 //   event 2: indicate to server1 that client2 listener is in OnDoClient2Task
1234 //   event 3: indicate to client2 that server listener is in OnDoServerTask
1235 
1236 class RestrictedDispatchDeadlockServer : public Worker {
1237  public:
RestrictedDispatchDeadlockServer(int server_num,WaitableEvent * server_ready_event,WaitableEvent ** events,RestrictedDispatchDeadlockServer * peer,mojo::ScopedMessagePipeHandle channel_handle)1238   RestrictedDispatchDeadlockServer(int server_num,
1239                                    WaitableEvent* server_ready_event,
1240                                    WaitableEvent** events,
1241                                    RestrictedDispatchDeadlockServer* peer,
1242                                    mojo::ScopedMessagePipeHandle channel_handle)
1243       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
1244         server_num_(server_num),
1245         server_ready_event_(server_ready_event),
1246         events_(events),
1247         peer_(peer) {}
1248 
OnDoServerTask()1249   void OnDoServerTask() {
1250     events_[3]->Signal();
1251     events_[2]->Wait();
1252     events_[0]->Signal();
1253     SendMessageToClient();
1254   }
1255 
Run()1256   void Run() override {
1257     channel()->SetRestrictDispatchChannelGroup(1);
1258     server_ready_event_->Signal();
1259   }
1260 
ListenerThread()1261   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1262 
1263  private:
OnMessageReceived(const Message & message)1264   bool OnMessageReceived(const Message& message) override {
1265     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockServer, message)
1266      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1267      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, Done)
1268     IPC_END_MESSAGE_MAP()
1269     return true;
1270   }
1271 
OnNoArgs()1272   void OnNoArgs() {
1273     if (server_num_ == 1) {
1274       DCHECK(peer_);
1275       peer_->SendMessageToClient();
1276     }
1277   }
1278 
SendMessageToClient()1279   void SendMessageToClient() {
1280     Message* msg = new SyncChannelTestMsg_NoArgs;
1281     msg->set_unblock(false);
1282     DCHECK(!msg->should_unblock());
1283     Send(msg);
1284   }
1285 
1286   int server_num_;
1287   raw_ptr<WaitableEvent> server_ready_event_;
1288   raw_ptr<WaitableEvent*, AllowPtrArithmetic> events_;
1289   raw_ptr<RestrictedDispatchDeadlockServer, DanglingUntriaged> peer_;
1290 };
1291 
1292 class RestrictedDispatchDeadlockClient2 : public Worker {
1293  public:
RestrictedDispatchDeadlockClient2(RestrictedDispatchDeadlockServer * server,WaitableEvent * server_ready_event,WaitableEvent ** events,mojo::ScopedMessagePipeHandle channel_handle)1294   RestrictedDispatchDeadlockClient2(
1295       RestrictedDispatchDeadlockServer* server,
1296       WaitableEvent* server_ready_event,
1297       WaitableEvent** events,
1298       mojo::ScopedMessagePipeHandle channel_handle)
1299       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
1300         server_ready_event_(server_ready_event),
1301         events_(events),
1302         received_msg_(false),
1303         received_noarg_reply_(false),
1304         done_issued_(false) {}
1305 
Run()1306   void Run() override {
1307     server_ready_event_->Wait();
1308   }
1309 
OnDoClient2Task()1310   void OnDoClient2Task() {
1311     events_[3]->Wait();
1312     events_[1]->Signal();
1313     events_[2]->Signal();
1314     DCHECK(received_msg_ == false);
1315 
1316     Message* message = new SyncChannelTestMsg_NoArgs;
1317     message->set_unblock(true);
1318     Send(message);
1319     received_noarg_reply_ = true;
1320   }
1321 
ListenerThread()1322   base::Thread* ListenerThread() { return Worker::ListenerThread(); }
1323 
1324  private:
OnMessageReceived(const Message & message)1325   bool OnMessageReceived(const Message& message) override {
1326     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient2, message)
1327      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1328     IPC_END_MESSAGE_MAP()
1329     return true;
1330   }
1331 
OnNoArgs()1332   void OnNoArgs() {
1333     received_msg_ = true;
1334     PossiblyDone();
1335   }
1336 
PossiblyDone()1337   void PossiblyDone() {
1338     if (received_noarg_reply_ && received_msg_) {
1339       DCHECK(done_issued_ == false);
1340       done_issued_ = true;
1341       Send(new SyncChannelTestMsg_Done);
1342       Done();
1343     }
1344   }
1345 
1346   raw_ptr<WaitableEvent> server_ready_event_;
1347   raw_ptr<WaitableEvent*, AllowPtrArithmetic> events_;
1348   bool received_msg_;
1349   bool received_noarg_reply_;
1350   bool done_issued_;
1351 };
1352 
1353 class RestrictedDispatchDeadlockClient1 : public Worker {
1354  public:
RestrictedDispatchDeadlockClient1(RestrictedDispatchDeadlockServer * server,RestrictedDispatchDeadlockClient2 * peer,WaitableEvent * server_ready_event,WaitableEvent ** events,mojo::ScopedMessagePipeHandle channel_handle)1355   RestrictedDispatchDeadlockClient1(
1356       RestrictedDispatchDeadlockServer* server,
1357       RestrictedDispatchDeadlockClient2* peer,
1358       WaitableEvent* server_ready_event,
1359       WaitableEvent** events,
1360       mojo::ScopedMessagePipeHandle channel_handle)
1361       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
1362         server_(server),
1363         peer_(peer),
1364         server_ready_event_(server_ready_event),
1365         events_(events),
1366         received_msg_(false),
1367         received_noarg_reply_(false),
1368         done_issued_(false) {}
1369 
Run()1370   void Run() override {
1371     server_ready_event_->Wait();
1372     server_->ListenerThread()->task_runner()->PostTask(
1373         FROM_HERE,
1374         base::BindOnce(&RestrictedDispatchDeadlockServer::OnDoServerTask,
1375                        base::Unretained(server_)));
1376     peer_->ListenerThread()->task_runner()->PostTask(
1377         FROM_HERE,
1378         base::BindOnce(&RestrictedDispatchDeadlockClient2::OnDoClient2Task,
1379                        base::Unretained(peer_)));
1380     events_[0]->Wait();
1381     events_[1]->Wait();
1382     DCHECK(received_msg_ == false);
1383 
1384     Message* message = new SyncChannelTestMsg_NoArgs;
1385     message->set_unblock(true);
1386     Send(message);
1387     received_noarg_reply_ = true;
1388     PossiblyDone();
1389   }
1390 
1391  private:
OnMessageReceived(const Message & message)1392   bool OnMessageReceived(const Message& message) override {
1393     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchDeadlockClient1, message)
1394      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_NoArgs, OnNoArgs)
1395     IPC_END_MESSAGE_MAP()
1396     return true;
1397   }
1398 
OnNoArgs()1399   void OnNoArgs() {
1400     received_msg_ = true;
1401     PossiblyDone();
1402   }
1403 
PossiblyDone()1404   void PossiblyDone() {
1405     if (received_noarg_reply_ && received_msg_) {
1406       DCHECK(done_issued_ == false);
1407       done_issued_ = true;
1408       Send(new SyncChannelTestMsg_Done);
1409       Done();
1410     }
1411   }
1412 
1413   raw_ptr<RestrictedDispatchDeadlockServer, DanglingUntriaged> server_;
1414   raw_ptr<RestrictedDispatchDeadlockClient2, DanglingUntriaged> peer_;
1415   raw_ptr<WaitableEvent> server_ready_event_;
1416   raw_ptr<WaitableEvent*, AllowPtrArithmetic> events_;
1417   bool received_msg_;
1418   bool received_noarg_reply_;
1419   bool done_issued_;
1420 };
1421 
TEST_F(IPCSyncChannelTest,RestrictedDispatchDeadlock)1422 TEST_F(IPCSyncChannelTest, RestrictedDispatchDeadlock) {
1423   std::vector<Worker*> workers;
1424 
1425   // A shared worker thread so that server1 and server2 run on one thread.
1426   base::Thread worker_thread("RestrictedDispatchDeadlock");
1427   ASSERT_TRUE(worker_thread.Start());
1428 
1429   WaitableEvent server1_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1430                               base::WaitableEvent::InitialState::NOT_SIGNALED);
1431   WaitableEvent server2_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1432                               base::WaitableEvent::InitialState::NOT_SIGNALED);
1433 
1434   WaitableEvent event0(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1435                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1436   WaitableEvent event1(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1437                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1438   WaitableEvent event2(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1439                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1440   WaitableEvent event3(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1441                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1442   WaitableEvent* events[4] = {&event0, &event1, &event2, &event3};
1443 
1444   RestrictedDispatchDeadlockServer* server1;
1445   RestrictedDispatchDeadlockServer* server2;
1446   RestrictedDispatchDeadlockClient1* client1;
1447   RestrictedDispatchDeadlockClient2* client2;
1448 
1449   mojo::MessagePipe pipe1, pipe2;
1450   server2 = new RestrictedDispatchDeadlockServer(
1451       2, &server2_ready, events, nullptr, std::move(pipe2.handle0));
1452   server2->OverrideThread(&worker_thread);
1453   workers.push_back(server2);
1454 
1455   client2 = new RestrictedDispatchDeadlockClient2(
1456       server2, &server2_ready, events, std::move(pipe2.handle1));
1457   workers.push_back(client2);
1458 
1459   server1 = new RestrictedDispatchDeadlockServer(
1460       1, &server1_ready, events, server2, std::move(pipe1.handle0));
1461   server1->OverrideThread(&worker_thread);
1462   workers.push_back(server1);
1463 
1464   client1 = new RestrictedDispatchDeadlockClient1(
1465       server1, client2, &server1_ready, events, std::move(pipe1.handle1));
1466   workers.push_back(client1);
1467 
1468   RunTest(workers);
1469 }
1470 
1471 //------------------------------------------------------------------------------
1472 
1473 // This test case inspired by crbug.com/120530
1474 // We create 4 workers that pipe to each other W1->W2->W3->W4->W1 then we send a
1475 // message that recurses through 3, 4 or 5 steps to make sure, say, W1 can
1476 // re-enter when called from W4 while it's sending a message to W2.
1477 // The first worker drives the whole test so it must be treated specially.
1478 
1479 class RestrictedDispatchPipeWorker : public Worker {
1480  public:
RestrictedDispatchPipeWorker(mojo::ScopedMessagePipeHandle channel_handle1,WaitableEvent * event1,mojo::ScopedMessagePipeHandle channel_handle2,WaitableEvent * event2,int group,int * success)1481   RestrictedDispatchPipeWorker(mojo::ScopedMessagePipeHandle channel_handle1,
1482                                WaitableEvent* event1,
1483                                mojo::ScopedMessagePipeHandle channel_handle2,
1484                                WaitableEvent* event2,
1485                                int group,
1486                                int* success)
1487       : Worker(std::move(channel_handle1), Channel::MODE_SERVER),
1488         event1_(event1),
1489         event2_(event2),
1490         other_channel_handle_(std::move(channel_handle2)),
1491         group_(group),
1492         success_(success) {}
1493 
OnPingTTL(int ping,int * ret)1494   void OnPingTTL(int ping, int* ret) {
1495     *ret = 0;
1496     if (!ping)
1497       return;
1498     other_channel_->Send(new SyncChannelTestMsg_PingTTL(ping - 1, ret));
1499     ++*ret;
1500   }
1501 
OnDone()1502   void OnDone() {
1503     if (is_first())
1504       return;
1505     other_channel_->Send(new SyncChannelTestMsg_Done);
1506     other_channel_.reset();
1507     Done();
1508   }
1509 
Run()1510   void Run() override {
1511     channel()->SetRestrictDispatchChannelGroup(group_);
1512     if (is_first())
1513       event1_->Signal();
1514     event2_->Wait();
1515     other_channel_ = SyncChannel::Create(
1516         other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this,
1517         ipc_thread().task_runner(),
1518         base::SingleThreadTaskRunner::GetCurrentDefault(), true,
1519         shutdown_event());
1520     other_channel_->SetRestrictDispatchChannelGroup(group_);
1521     if (!is_first()) {
1522       event1_->Signal();
1523       return;
1524     }
1525     *success_ = 0;
1526     int value = 0;
1527     OnPingTTL(3, &value);
1528     *success_ += (value == 3);
1529     OnPingTTL(4, &value);
1530     *success_ += (value == 4);
1531     OnPingTTL(5, &value);
1532     *success_ += (value == 5);
1533     other_channel_->Send(new SyncChannelTestMsg_Done);
1534     other_channel_.reset();
1535     Done();
1536   }
1537 
is_first()1538   bool is_first() { return !!success_; }
1539 
1540  private:
OnMessageReceived(const Message & message)1541   bool OnMessageReceived(const Message& message) override {
1542     IPC_BEGIN_MESSAGE_MAP(RestrictedDispatchPipeWorker, message)
1543      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_PingTTL, OnPingTTL)
1544      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Done, OnDone)
1545     IPC_END_MESSAGE_MAP()
1546     return true;
1547   }
1548 
1549   std::unique_ptr<SyncChannel> other_channel_;
1550   raw_ptr<WaitableEvent> event1_;
1551   raw_ptr<WaitableEvent> event2_;
1552   mojo::ScopedMessagePipeHandle other_channel_handle_;
1553   int group_;
1554   raw_ptr<int> success_;
1555 };
1556 
1557 #if BUILDFLAG(IS_ANDROID)
1558 #define MAYBE_RestrictedDispatch4WayDeadlock \
1559   DISABLED_RestrictedDispatch4WayDeadlock
1560 #else
1561 #define MAYBE_RestrictedDispatch4WayDeadlock RestrictedDispatch4WayDeadlock
1562 #endif
TEST_F(IPCSyncChannelTest,MAYBE_RestrictedDispatch4WayDeadlock)1563 TEST_F(IPCSyncChannelTest, MAYBE_RestrictedDispatch4WayDeadlock) {
1564   int success = 0;
1565   std::vector<Worker*> workers;
1566   WaitableEvent event0(base::WaitableEvent::ResetPolicy::MANUAL,
1567                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1568   WaitableEvent event1(base::WaitableEvent::ResetPolicy::MANUAL,
1569                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1570   WaitableEvent event2(base::WaitableEvent::ResetPolicy::MANUAL,
1571                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1572   WaitableEvent event3(base::WaitableEvent::ResetPolicy::MANUAL,
1573                        base::WaitableEvent::InitialState::NOT_SIGNALED);
1574   mojo::MessagePipe pipe0, pipe1, pipe2, pipe3;
1575   workers.push_back(new RestrictedDispatchPipeWorker(
1576       std::move(pipe0.handle0), &event0, std::move(pipe1.handle1), &event1, 1,
1577       &success));
1578   workers.push_back(new RestrictedDispatchPipeWorker(
1579       std::move(pipe1.handle0), &event1, std::move(pipe2.handle1), &event2, 2,
1580       nullptr));
1581   workers.push_back(new RestrictedDispatchPipeWorker(
1582       std::move(pipe2.handle0), &event2, std::move(pipe3.handle1), &event3, 3,
1583       nullptr));
1584   workers.push_back(new RestrictedDispatchPipeWorker(
1585       std::move(pipe3.handle0), &event3, std::move(pipe0.handle1), &event0, 4,
1586       nullptr));
1587   RunTest(workers);
1588   EXPECT_EQ(3, success);
1589 }
1590 
1591 //------------------------------------------------------------------------------
1592 
1593 // This test case inspired by crbug.com/122443
1594 // We want to make sure a reply message with the unblock flag set correctly
1595 // behaves as a reply, not a regular message.
1596 // We have 3 workers. Server1 will send a message to Server2 (which will block),
1597 // during which it will dispatch a message comming from Client, at which point
1598 // it will send another message to Server2. While sending that second message it
1599 // will receive a reply from Server1 with the unblock flag.
1600 
1601 class ReentrantReplyServer1 : public Worker {
1602  public:
ReentrantReplyServer1(WaitableEvent * server_ready,mojo::ScopedMessagePipeHandle channel_handle1,mojo::ScopedMessagePipeHandle channel_handle2)1603   ReentrantReplyServer1(WaitableEvent* server_ready,
1604                         mojo::ScopedMessagePipeHandle channel_handle1,
1605                         mojo::ScopedMessagePipeHandle channel_handle2)
1606       : Worker(std::move(channel_handle1), Channel::MODE_SERVER),
1607         server_ready_(server_ready),
1608         other_channel_handle_(std::move(channel_handle2)) {}
1609 
Run()1610   void Run() override {
1611     server2_channel_ = SyncChannel::Create(
1612         other_channel_handle_.release(), IPC::Channel::MODE_CLIENT, this,
1613         ipc_thread().task_runner(),
1614         base::SingleThreadTaskRunner::GetCurrentDefault(), true,
1615         shutdown_event());
1616     server_ready_->Signal();
1617     Message* msg = new SyncChannelTestMsg_Reentrant1();
1618     server2_channel_->Send(msg);
1619     server2_channel_.reset();
1620     Done();
1621   }
1622 
1623  private:
OnMessageReceived(const Message & message)1624   bool OnMessageReceived(const Message& message) override {
1625     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer1, message)
1626      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant2, OnReentrant2)
1627      IPC_REPLY_HANDLER(OnReply)
1628     IPC_END_MESSAGE_MAP()
1629     return true;
1630   }
1631 
OnReentrant2()1632   void OnReentrant2() {
1633     Message* msg = new SyncChannelTestMsg_Reentrant3();
1634     server2_channel_->Send(msg);
1635   }
1636 
OnReply(const Message & message)1637   void OnReply(const Message& message) {
1638     // If we get here, the Send() will never receive the reply (thus would
1639     // hang), so abort instead.
1640     LOG(FATAL) << "Reply message was dispatched";
1641   }
1642 
1643   raw_ptr<WaitableEvent> server_ready_;
1644   std::unique_ptr<SyncChannel> server2_channel_;
1645   mojo::ScopedMessagePipeHandle other_channel_handle_;
1646 };
1647 
1648 class ReentrantReplyServer2 : public Worker {
1649  public:
ReentrantReplyServer2(mojo::ScopedMessagePipeHandle channel_handle)1650   ReentrantReplyServer2(mojo::ScopedMessagePipeHandle channel_handle)
1651       : Worker(std::move(channel_handle), Channel::MODE_SERVER),
1652         reply_(nullptr) {}
1653 
1654  private:
OnMessageReceived(const Message & message)1655   bool OnMessageReceived(const Message& message) override {
1656     IPC_BEGIN_MESSAGE_MAP(ReentrantReplyServer2, message)
1657      IPC_MESSAGE_HANDLER_DELAY_REPLY(
1658          SyncChannelTestMsg_Reentrant1, OnReentrant1)
1659      IPC_MESSAGE_HANDLER(SyncChannelTestMsg_Reentrant3, OnReentrant3)
1660     IPC_END_MESSAGE_MAP()
1661     return true;
1662   }
1663 
OnReentrant1(Message * reply)1664   void OnReentrant1(Message* reply) {
1665     DCHECK(!reply_);
1666     reply_ = reply;
1667   }
1668 
OnReentrant3()1669   void OnReentrant3() {
1670     DCHECK(reply_);
1671     Message* reply = reply_;
1672     reply_ = nullptr;
1673     reply->set_unblock(true);
1674     Send(reply);
1675     Done();
1676   }
1677 
1678   raw_ptr<Message> reply_;
1679 };
1680 
1681 class ReentrantReplyClient : public Worker {
1682  public:
ReentrantReplyClient(WaitableEvent * server_ready,mojo::ScopedMessagePipeHandle channel_handle)1683   ReentrantReplyClient(WaitableEvent* server_ready,
1684                        mojo::ScopedMessagePipeHandle channel_handle)
1685       : Worker(std::move(channel_handle), Channel::MODE_CLIENT),
1686         server_ready_(server_ready) {}
1687 
Run()1688   void Run() override {
1689     server_ready_->Wait();
1690     Send(new SyncChannelTestMsg_Reentrant2());
1691     Done();
1692   }
1693 
1694  private:
1695   raw_ptr<WaitableEvent> server_ready_;
1696 };
1697 
TEST_F(IPCSyncChannelTest,ReentrantReply)1698 TEST_F(IPCSyncChannelTest, ReentrantReply) {
1699   std::vector<Worker*> workers;
1700   WaitableEvent server_ready(base::WaitableEvent::ResetPolicy::AUTOMATIC,
1701                              base::WaitableEvent::InitialState::NOT_SIGNALED);
1702   mojo::MessagePipe pipe1, pipe2;
1703   workers.push_back(new ReentrantReplyServer2(std::move(pipe2.handle0)));
1704   workers.push_back(new ReentrantReplyServer1(
1705       &server_ready, std::move(pipe1.handle0), std::move(pipe2.handle1)));
1706   workers.push_back(
1707       new ReentrantReplyClient(&server_ready, std::move(pipe1.handle1)));
1708   RunTest(workers);
1709 }
1710 
1711 }  // namespace
1712 }  // namespace IPC
1713