xref: /aosp_15_r20/external/cronet/base/message_loop/message_pump_libevent_unittest.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "base/message_loop/message_pump_libevent.h"
6 
7 #include <fcntl.h>
8 #include <sys/socket.h>
9 #include <unistd.h>
10 
11 #include <memory>
12 #include <string_view>
13 #include <utility>
14 
15 #include "base/containers/span.h"
16 #include "base/files/file_util.h"
17 #include "base/files/scoped_file.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/logging.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/raw_ref.h"
24 #include "base/message_loop/message_pump_buildflags.h"
25 #include "base/message_loop/message_pump_type.h"
26 #include "base/posix/eintr_wrapper.h"
27 #include "base/run_loop.h"
28 #include "base/synchronization/waitable_event.h"
29 #include "base/synchronization/waitable_event_watcher.h"
30 #include "base/task/current_thread.h"
31 #include "base/task/sequenced_task_runner.h"
32 #include "base/task/single_thread_task_executor.h"
33 #include "base/task/single_thread_task_runner.h"
34 #include "base/test/gtest_util.h"
35 #include "base/test/scoped_feature_list.h"
36 #include "base/test/task_environment.h"
37 #include "base/threading/thread.h"
38 #include "build/build_config.h"
39 #include "testing/gtest/include/gtest/gtest.h"
40 #include "third_party/libevent/event.h"
41 
42 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
43 #include "base/message_loop/message_pump_epoll.h"
44 #endif
45 
46 namespace base {
47 
// Test parameter type: names the pump backend a given test instance should
// exercise. The fixture compares GetParam() against these values.
enum PumpType {
  kLibevent = 0,
  kEpoll = 1,
};
52 
53 class MessagePumpLibeventTest : public testing::Test,
54                                 public testing::WithParamInterface<PumpType> {
55  public:
receiver() const56   int receiver() const { return receiver_.get(); }
sender() const57   int sender() const { return sender_.get(); }
58 
io_runner() const59   scoped_refptr<SingleThreadTaskRunner> io_runner() const {
60     return io_thread_.task_runner();
61   }
62 
ClearNotifications()63   void ClearNotifications() {
64     int unused;
65     while (read(receiver_.get(), &unused, sizeof(unused)) == sizeof(unused)) {
66     }
67   }
68 
Notify()69   void Notify() {
70     const int data = 42;
71     PCHECK(write(sender_.get(), &data, sizeof(data)) == sizeof(data));
72   }
73 
74  protected:
MessagePumpLibeventTest()75   MessagePumpLibeventTest()
76       : task_environment_(std::make_unique<test::SingleThreadTaskEnvironment>(
77             test::SingleThreadTaskEnvironment::MainThreadType::UI)),
78         io_thread_("MessagePumpLibeventTestIOThread") {}
79   ~MessagePumpLibeventTest() override = default;
80 
SetUp()81   void SetUp() override {
82 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
83     // Select MessagePumpLibevent or MessagePumpEpoll based on the test
84     // parameter.
85     scoped_feature_list_.InitWithFeatureState(base::kMessagePumpEpoll,
86                                               GetParam() == kEpoll);
87     MessagePumpLibevent::InitializeFeatures();
88 #endif  // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
89 
90     Thread::Options options(MessagePumpType::IO, 0);
91     ASSERT_TRUE(io_thread_.StartWithOptions(std::move(options)));
92     int fds[2];
93     int rv = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
94     CHECK_EQ(rv, 0);
95     PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
96     receiver_ = base::ScopedFD(fds[0]);
97     sender_ = base::ScopedFD(fds[1]);
98   }
99 
TearDown()100   void TearDown() override {
101     // Some tests watch `receiver_` from the `io_thread_`. The `io_thread_` must
102     // thus be joined to ensure those watches are complete before closing the
103     // sockets.
104     io_thread_.Stop();
105 
106 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
107     // Reset feature state for other tests running in this process.
108     scoped_feature_list_.Reset();
109     MessagePumpLibevent::InitializeFeatures();
110 #endif  // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
111   }
112 
CreateMessagePump()113   std::unique_ptr<MessagePumpLibevent> CreateMessagePump() {
114     return std::make_unique<MessagePumpLibevent>();
115   }
116 
SimulateIOEvent(MessagePumpLibevent * pump,MessagePumpLibevent::FdWatchController * controller)117   void SimulateIOEvent(MessagePumpLibevent* pump,
118                        MessagePumpLibevent::FdWatchController* controller) {
119 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
120     if (GetParam() == kEpoll) {
121       pump->epoll_pump_->HandleEvent(0, /*can_read=*/true, /*can_write=*/true,
122                                      controller);
123       return;
124     }
125 #endif
126     pump->OnLibeventNotification(0, EV_WRITE | EV_READ, controller);
127   }
128 
129   static constexpr char null_byte_ = 0;
130   std::unique_ptr<test::SingleThreadTaskEnvironment> task_environment_;
131 
132  private:
133   Thread io_thread_;
134   base::ScopedFD receiver_;
135   base::ScopedFD sender_;
136 
137 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
138   // Features to override default feature settings.
139   base::test::ScopedFeatureList scoped_feature_list_;
140 #endif  // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
141 };
142 
143 namespace {
144 
145 // Concrete implementation of MessagePumpLibevent::FdWatcher that does
146 // nothing useful.
147 class StupidWatcher : public MessagePumpLibevent::FdWatcher {
148  public:
149   ~StupidWatcher() override = default;
150 
151   // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int fd)152   void OnFileCanReadWithoutBlocking(int fd) override {}
OnFileCanWriteWithoutBlocking(int fd)153   void OnFileCanWriteWithoutBlocking(int fd) override {}
154 };
155 
// Quit() on a pump that was never run is expected to trip a DCHECK.
TEST_P(MessagePumpLibeventTest, QuitOutsideOfRun) {
  auto pump = CreateMessagePump();
  ASSERT_DCHECK_DEATH(pump->Quit());
}
160 
161 class BaseWatcher : public MessagePumpLibevent::FdWatcher {
162  public:
163   BaseWatcher() = default;
164   ~BaseWatcher() override = default;
165 
166   // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int)167   void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
168 
OnFileCanWriteWithoutBlocking(int)169   void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
170 };
171 
172 class DeleteWatcher : public BaseWatcher {
173  public:
DeleteWatcher(std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)174   explicit DeleteWatcher(
175       std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)
176       : controller_(std::move(controller)) {}
177 
~DeleteWatcher()178   ~DeleteWatcher() override { DCHECK(!controller_); }
179 
controller()180   MessagePumpLibevent::FdWatchController* controller() {
181     return controller_.get();
182   }
183 
OnFileCanWriteWithoutBlocking(int)184   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
185     DCHECK(controller_);
186     controller_.reset();
187   }
188 
189  private:
190   std::unique_ptr<MessagePumpLibevent::FdWatchController> controller_;
191 };
192 
// Delivers a simulated event to a watcher that deletes its controller from
// within the write callback.
TEST_P(MessagePumpLibeventTest, DeleteWatcher) {
  auto owned_controller =
      std::make_unique<MessagePumpLibevent::FdWatchController>(FROM_HERE);
  DeleteWatcher delegate(std::move(owned_controller));
  auto pump = CreateMessagePump();

  pump->WatchFileDescriptor(receiver(), false,
                            MessagePumpLibevent::WATCH_READ_WRITE,
                            delegate.controller(), &delegate);
  SimulateIOEvent(pump.get(), delegate.controller());
}
202 
203 class StopWatcher : public BaseWatcher {
204  public:
StopWatcher(MessagePumpLibevent::FdWatchController * controller)205   explicit StopWatcher(MessagePumpLibevent::FdWatchController* controller)
206       : controller_(controller) {}
207 
208   ~StopWatcher() override = default;
209 
OnFileCanWriteWithoutBlocking(int)210   void OnFileCanWriteWithoutBlocking(int /* fd */) override {
211     controller_->StopWatchingFileDescriptor();
212   }
213 
214  private:
215   raw_ptr<MessagePumpLibevent::FdWatchController> controller_ = nullptr;
216 };
217 
// Delivers a simulated event to a watcher that stops watching from within the
// write callback.
TEST_P(MessagePumpLibeventTest, StopWatcher) {
  auto pump = CreateMessagePump();
  MessagePumpLibevent::FdWatchController controller(FROM_HERE);
  StopWatcher delegate(&controller);

  pump->WatchFileDescriptor(receiver(), false,
                            MessagePumpLibevent::WATCH_READ_WRITE, &controller,
                            &delegate);
  SimulateIOEvent(pump.get(), &controller);
}
227 
QuitMessageLoopAndStart(OnceClosure quit_closure)228 void QuitMessageLoopAndStart(OnceClosure quit_closure) {
229   std::move(quit_closure).Run();
230 
231   RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
232   SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
233                                                         runloop.QuitClosure());
234   runloop.Run();
235 }
236 
237 class NestedPumpWatcher : public MessagePumpLibevent::FdWatcher {
238  public:
239   NestedPumpWatcher() = default;
240   ~NestedPumpWatcher() override = default;
241 
OnFileCanReadWithoutBlocking(int)242   void OnFileCanReadWithoutBlocking(int /* fd */) override {
243     RunLoop runloop;
244     SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
245         FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
246     runloop.Run();
247   }
248 
OnFileCanWriteWithoutBlocking(int)249   void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
250 };
251 
// Delivers a simulated event to a watcher that runs nested RunLoops from its
// read callback.
TEST_P(MessagePumpLibeventTest, NestedPumpWatcher) {
  NestedPumpWatcher delegate;
  auto pump = CreateMessagePump();
  MessagePumpLibevent::FdWatchController controller(FROM_HERE);

  pump->WatchFileDescriptor(receiver(), false, MessagePumpLibevent::WATCH_READ,
                            &controller, &delegate);
  SimulateIOEvent(pump.get(), &controller);
}
260 
// Task body that must never execute: running it records a fatal gtest failure.
void FatalClosure() {
  FAIL() << "Reached fatal closure.";
}
264 
265 class QuitWatcher : public BaseWatcher {
266  public:
QuitWatcher(base::OnceClosure quit_closure)267   QuitWatcher(base::OnceClosure quit_closure)
268       : quit_closure_(std::move(quit_closure)) {}
269 
OnFileCanReadWithoutBlocking(int)270   void OnFileCanReadWithoutBlocking(int /* fd */) override {
271     // Post a fatal closure to the MessageLoop before we quit it.
272     SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
273         FROM_HERE, BindOnce(&FatalClosure));
274 
275     if (quit_closure_)
276       std::move(quit_closure_).Run();
277   }
278 
279  private:
280   base::OnceClosure quit_closure_;
281 };
282 
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)283 void WriteFDWrapper(const int fd,
284                     const char* buf,
285                     int size,
286                     WaitableEvent* event) {
287   ASSERT_TRUE(WriteFileDescriptor(fd, std::string_view(buf, size)));
288 }
289 
290 // Tests that MessagePumpLibevent quits immediately when it is quit from
291 // libevent's event_base_loop().
TEST_P(MessagePumpLibeventTest,QuitWatcher)292 TEST_P(MessagePumpLibeventTest, QuitWatcher) {
293   // Delete the old TaskEnvironment so that we can manage our own one here.
294   task_environment_.reset();
295 
296   std::unique_ptr<MessagePumpLibevent> executor_pump = CreateMessagePump();
297   MessagePumpLibevent* pump = executor_pump.get();
298   SingleThreadTaskExecutor executor(std::move(executor_pump));
299   RunLoop run_loop;
300   QuitWatcher delegate(run_loop.QuitClosure());
301   MessagePumpLibevent::FdWatchController controller(FROM_HERE);
302   WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
303                       WaitableEvent::InitialState::NOT_SIGNALED);
304   std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
305 
306   // Tell the pump to watch the `receiver_`.
307   pump->WatchFileDescriptor(receiver(), false, MessagePumpLibevent::WATCH_READ,
308                             &controller, &delegate);
309 
310   // Make the IO thread wait for |event| before writing to sender().
311   WaitableEventWatcher::EventCallback write_fd_task =
312       BindOnce(&WriteFDWrapper, sender(), &null_byte_, 1);
313   io_runner()->PostTask(
314       FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
315                           Unretained(watcher.get()), &event,
316                           std::move(write_fd_task), io_runner()));
317 
318   // Queue |event| to signal on |sequence_manager|.
319   SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
320       FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
321 
322   // Now run the MessageLoop.
323   run_loop.Run();
324 
325   // StartWatching can move |watcher| to IO thread. Release on IO thread.
326   io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
327                                             Owned(watcher.release())));
328 }
329 
330 class InnerNestedWatcher : public MessagePumpLibevent::FdWatcher {
331  public:
InnerNestedWatcher(MessagePumpLibeventTest & test,MessagePumpLibevent::FdWatchController & outer_controller,base::OnceClosure callback)332   InnerNestedWatcher(MessagePumpLibeventTest& test,
333                      MessagePumpLibevent::FdWatchController& outer_controller,
334                      base::OnceClosure callback)
335       : test_(test),
336         outer_controller_(outer_controller),
337         callback_(std::move(callback)) {
338     base::CurrentIOThread::Get().WatchFileDescriptor(
339         test_->receiver(), false, MessagePumpLibevent::WATCH_READ, &controller_,
340         this);
341   }
342   ~InnerNestedWatcher() override = default;
343 
OnFileCanReadWithoutBlocking(int)344   void OnFileCanReadWithoutBlocking(int) override {
345     // Cancelling the outer watch from within this inner event handler must be
346     // safe.
347     outer_controller_->StopWatchingFileDescriptor();
348     std::move(callback_).Run();
349   }
350 
OnFileCanWriteWithoutBlocking(int)351   void OnFileCanWriteWithoutBlocking(int) override {}
352 
353  private:
354   const raw_ref<MessagePumpLibeventTest> test_;
355   const raw_ref<MessagePumpLibevent::FdWatchController> outer_controller_;
356   base::OnceClosure callback_;
357   MessagePumpLibevent::FdWatchController controller_{FROM_HERE};
358 };
359 
360 class OuterNestedWatcher : public MessagePumpLibevent::FdWatcher {
361  public:
OuterNestedWatcher(MessagePumpLibeventTest & test,base::OnceClosure callback)362   OuterNestedWatcher(MessagePumpLibeventTest& test, base::OnceClosure callback)
363       : test_(test), callback_(std::move(callback)) {
364     base::RunLoop loop;
365     test_->io_runner()->PostTask(
366         FROM_HERE, base::BindOnce(&OuterNestedWatcher::InitOnIOThread,
367                                   base::Unretained(this), loop.QuitClosure()));
368     loop.Run();
369   }
370 
371   ~OuterNestedWatcher() override = default;
372 
OnFileCanReadWithoutBlocking(int)373   void OnFileCanReadWithoutBlocking(int) override {
374     // Ensure that another notification will wake any active FdWatcher.
375     test_->ClearNotifications();
376 
377     base::RunLoop loop;
378     std::unique_ptr<InnerNestedWatcher> inner_watcher =
379         std::make_unique<InnerNestedWatcher>(test_.get(), *controller_,
380                                              loop.QuitClosure());
381     test_->Notify();
382     loop.Run();
383 
384     // Ensure that `InnerNestedWatcher` is destroyed before
385     // `OuterNestedWatcher`.
386     inner_watcher.reset();
387     std::move(callback_).Run();
388   }
389 
OnFileCanWriteWithoutBlocking(int)390   void OnFileCanWriteWithoutBlocking(int) override {}
391 
392  private:
InitOnIOThread(base::OnceClosure ready_callback)393   void InitOnIOThread(base::OnceClosure ready_callback) {
394     controller_ =
395         std::make_unique<MessagePumpLibevent::FdWatchController>(FROM_HERE);
396     base::CurrentIOThread::Get().WatchFileDescriptor(
397         test_->receiver(), false, MessagePumpLibevent::WATCH_READ,
398         controller_.get(), this);
399     std::move(ready_callback).Run();
400   }
401 
402   const raw_ref<MessagePumpLibeventTest> test_;
403   base::OnceClosure callback_;
404   std::unique_ptr<MessagePumpLibevent::FdWatchController> controller_;
405 };
406 
// Regression test for https://crbug.com/1469529: a nested RunLoop must be able
// to stop watching a file descriptor while the outer RunLoop is still handling
// an event for that same descriptor.
TEST_P(MessagePumpLibeventTest, NestedNotification) {
  base::RunLoop run_loop;
  OuterNestedWatcher watcher(*this, run_loop.QuitClosure());
  // Trigger the outer watcher; it drives the rest of the sequence itself.
  Notify();
  run_loop.Run();
}
416 
// Parameter list for the suite: kEpoll is included only when the build enables
// the epoll message pump; otherwise the tests run with kLibevent alone.
#if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
#define TEST_PARAM_VALUES kLibevent, kEpoll
#else
#define TEST_PARAM_VALUES kLibevent
#endif

// Instantiates every TEST_P above once per value in TEST_PARAM_VALUES. The
// first macro argument (the instantiation name prefix) is left empty.
INSTANTIATE_TEST_SUITE_P(,
                         MessagePumpLibeventTest,
                         ::testing::Values(TEST_PARAM_VALUES));
426 
427 }  // namespace
428 
429 }  // namespace base
430