1 // Copyright 2012 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_libevent.h"
6
7 #include <fcntl.h>
8 #include <sys/socket.h>
9 #include <unistd.h>
10
11 #include <memory>
12 #include <string_view>
13 #include <utility>
14
15 #include "base/containers/span.h"
16 #include "base/files/file_util.h"
17 #include "base/files/scoped_file.h"
18 #include "base/functional/bind.h"
19 #include "base/functional/callback_helpers.h"
20 #include "base/logging.h"
21 #include "base/memory/ptr_util.h"
22 #include "base/memory/raw_ptr.h"
23 #include "base/memory/raw_ref.h"
24 #include "base/message_loop/message_pump_buildflags.h"
25 #include "base/message_loop/message_pump_type.h"
26 #include "base/posix/eintr_wrapper.h"
27 #include "base/run_loop.h"
28 #include "base/synchronization/waitable_event.h"
29 #include "base/synchronization/waitable_event_watcher.h"
30 #include "base/task/current_thread.h"
31 #include "base/task/sequenced_task_runner.h"
32 #include "base/task/single_thread_task_executor.h"
33 #include "base/task/single_thread_task_runner.h"
34 #include "base/test/gtest_util.h"
35 #include "base/test/scoped_feature_list.h"
36 #include "base/test/task_environment.h"
37 #include "base/threading/thread.h"
38 #include "build/build_config.h"
39 #include "testing/gtest/include/gtest/gtest.h"
40 #include "third_party/libevent/event.h"
41
42 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
43 #include "base/message_loop/message_pump_epoll.h"
44 #endif
45
46 namespace base {
47
// Test parameter for MessagePumpLibeventTest. The fixture compares the
// parameter against `kEpoll` to decide which code path to exercise.
enum PumpType {
  kLibevent = 0,
  kEpoll = 1,
};
52
53 class MessagePumpLibeventTest : public testing::Test,
54 public testing::WithParamInterface<PumpType> {
55 public:
receiver() const56 int receiver() const { return receiver_.get(); }
sender() const57 int sender() const { return sender_.get(); }
58
io_runner() const59 scoped_refptr<SingleThreadTaskRunner> io_runner() const {
60 return io_thread_.task_runner();
61 }
62
ClearNotifications()63 void ClearNotifications() {
64 int unused;
65 while (read(receiver_.get(), &unused, sizeof(unused)) == sizeof(unused)) {
66 }
67 }
68
Notify()69 void Notify() {
70 const int data = 42;
71 PCHECK(write(sender_.get(), &data, sizeof(data)) == sizeof(data));
72 }
73
74 protected:
MessagePumpLibeventTest()75 MessagePumpLibeventTest()
76 : task_environment_(std::make_unique<test::SingleThreadTaskEnvironment>(
77 test::SingleThreadTaskEnvironment::MainThreadType::UI)),
78 io_thread_("MessagePumpLibeventTestIOThread") {}
79 ~MessagePumpLibeventTest() override = default;
80
SetUp()81 void SetUp() override {
82 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
83 // Select MessagePumpLibevent or MessagePumpEpoll based on the test
84 // parameter.
85 scoped_feature_list_.InitWithFeatureState(base::kMessagePumpEpoll,
86 GetParam() == kEpoll);
87 MessagePumpLibevent::InitializeFeatures();
88 #endif // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
89
90 Thread::Options options(MessagePumpType::IO, 0);
91 ASSERT_TRUE(io_thread_.StartWithOptions(std::move(options)));
92 int fds[2];
93 int rv = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
94 CHECK_EQ(rv, 0);
95 PCHECK(fcntl(fds[0], F_SETFL, O_NONBLOCK) == 0);
96 receiver_ = base::ScopedFD(fds[0]);
97 sender_ = base::ScopedFD(fds[1]);
98 }
99
TearDown()100 void TearDown() override {
101 // Some tests watch `receiver_` from the `io_thread_`. The `io_thread_` must
102 // thus be joined to ensure those watches are complete before closing the
103 // sockets.
104 io_thread_.Stop();
105
106 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
107 // Reset feature state for other tests running in this process.
108 scoped_feature_list_.Reset();
109 MessagePumpLibevent::InitializeFeatures();
110 #endif // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
111 }
112
CreateMessagePump()113 std::unique_ptr<MessagePumpLibevent> CreateMessagePump() {
114 return std::make_unique<MessagePumpLibevent>();
115 }
116
SimulateIOEvent(MessagePumpLibevent * pump,MessagePumpLibevent::FdWatchController * controller)117 void SimulateIOEvent(MessagePumpLibevent* pump,
118 MessagePumpLibevent::FdWatchController* controller) {
119 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
120 if (GetParam() == kEpoll) {
121 pump->epoll_pump_->HandleEvent(0, /*can_read=*/true, /*can_write=*/true,
122 controller);
123 return;
124 }
125 #endif
126 pump->OnLibeventNotification(0, EV_WRITE | EV_READ, controller);
127 }
128
129 static constexpr char null_byte_ = 0;
130 std::unique_ptr<test::SingleThreadTaskEnvironment> task_environment_;
131
132 private:
133 Thread io_thread_;
134 base::ScopedFD receiver_;
135 base::ScopedFD sender_;
136
137 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
138 // Features to override default feature settings.
139 base::test::ScopedFeatureList scoped_feature_list_;
140 #endif // BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
141 };
142
143 namespace {
144
145 // Concrete implementation of MessagePumpLibevent::FdWatcher that does
146 // nothing useful.
147 class StupidWatcher : public MessagePumpLibevent::FdWatcher {
148 public:
149 ~StupidWatcher() override = default;
150
151 // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int fd)152 void OnFileCanReadWithoutBlocking(int fd) override {}
OnFileCanWriteWithoutBlocking(int fd)153 void OnFileCanWriteWithoutBlocking(int fd) override {}
154 };
155
// Calling Quit() on a pump that was never run is expected to trip a DCHECK.
TEST_P(MessagePumpLibeventTest, QuitOutsideOfRun) {
  auto pump = CreateMessagePump();
  ASSERT_DCHECK_DEATH(pump->Quit());
}
160
161 class BaseWatcher : public MessagePumpLibevent::FdWatcher {
162 public:
163 BaseWatcher() = default;
164 ~BaseWatcher() override = default;
165
166 // base:MessagePumpLibevent::FdWatcher interface
OnFileCanReadWithoutBlocking(int)167 void OnFileCanReadWithoutBlocking(int /* fd */) override { NOTREACHED(); }
168
OnFileCanWriteWithoutBlocking(int)169 void OnFileCanWriteWithoutBlocking(int /* fd */) override { NOTREACHED(); }
170 };
171
172 class DeleteWatcher : public BaseWatcher {
173 public:
DeleteWatcher(std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)174 explicit DeleteWatcher(
175 std::unique_ptr<MessagePumpLibevent::FdWatchController> controller)
176 : controller_(std::move(controller)) {}
177
~DeleteWatcher()178 ~DeleteWatcher() override { DCHECK(!controller_); }
179
controller()180 MessagePumpLibevent::FdWatchController* controller() {
181 return controller_.get();
182 }
183
OnFileCanWriteWithoutBlocking(int)184 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
185 DCHECK(controller_);
186 controller_.reset();
187 }
188
189 private:
190 std::unique_ptr<MessagePumpLibevent::FdWatchController> controller_;
191 };
192
// Exercises a delegate that deletes its controller while an IO event for that
// controller is being dispatched.
TEST_P(MessagePumpLibeventTest, DeleteWatcher) {
  DeleteWatcher delegate(
      std::make_unique<MessagePumpLibevent::FdWatchController>(FROM_HERE));
  auto pump = CreateMessagePump();

  // Capture the raw pointer up front; the delegate frees the controller during
  // the simulated event, after which this pointer is not used again.
  MessagePumpLibevent::FdWatchController* const controller =
      delegate.controller();
  pump->WatchFileDescriptor(receiver(), false,
                            MessagePumpLibevent::WATCH_READ_WRITE, controller,
                            &delegate);
  SimulateIOEvent(pump.get(), controller);
}
202
203 class StopWatcher : public BaseWatcher {
204 public:
StopWatcher(MessagePumpLibevent::FdWatchController * controller)205 explicit StopWatcher(MessagePumpLibevent::FdWatchController* controller)
206 : controller_(controller) {}
207
208 ~StopWatcher() override = default;
209
OnFileCanWriteWithoutBlocking(int)210 void OnFileCanWriteWithoutBlocking(int /* fd */) override {
211 controller_->StopWatchingFileDescriptor();
212 }
213
214 private:
215 raw_ptr<MessagePumpLibevent::FdWatchController> controller_ = nullptr;
216 };
217
// Exercises a delegate that stops watching the descriptor while an IO event
// for its controller is being dispatched.
TEST_P(MessagePumpLibeventTest, StopWatcher) {
  auto pump = CreateMessagePump();
  MessagePumpLibevent::FdWatchController controller(FROM_HERE);
  StopWatcher delegate(&controller);

  MessagePumpLibevent::FdWatchController* const controller_ptr = &controller;
  pump->WatchFileDescriptor(receiver(), false,
                            MessagePumpLibevent::WATCH_READ_WRITE,
                            controller_ptr, &delegate);
  SimulateIOEvent(pump.get(), controller_ptr);
}
227
QuitMessageLoopAndStart(OnceClosure quit_closure)228 void QuitMessageLoopAndStart(OnceClosure quit_closure) {
229 std::move(quit_closure).Run();
230
231 RunLoop runloop(RunLoop::Type::kNestableTasksAllowed);
232 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
233 runloop.QuitClosure());
234 runloop.Run();
235 }
236
237 class NestedPumpWatcher : public MessagePumpLibevent::FdWatcher {
238 public:
239 NestedPumpWatcher() = default;
240 ~NestedPumpWatcher() override = default;
241
OnFileCanReadWithoutBlocking(int)242 void OnFileCanReadWithoutBlocking(int /* fd */) override {
243 RunLoop runloop;
244 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
245 FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure()));
246 runloop.Run();
247 }
248
OnFileCanWriteWithoutBlocking(int)249 void OnFileCanWriteWithoutBlocking(int /* fd */) override {}
250 };
251
// Dispatches a simulated IO event to a delegate that runs nested RunLoops from
// its read notification.
TEST_P(MessagePumpLibeventTest, NestedPumpWatcher) {
  NestedPumpWatcher delegate;
  auto pump = CreateMessagePump();
  MessagePumpLibevent::FdWatchController controller(FROM_HERE);

  MessagePumpLibevent::FdWatchController* const controller_ptr = &controller;
  pump->WatchFileDescriptor(receiver(), false, MessagePumpLibevent::WATCH_READ,
                            controller_ptr, &delegate);
  SimulateIOEvent(pump.get(), controller_ptr);
}
260
FatalClosure()261 void FatalClosure() {
262 FAIL() << "Reached fatal closure.";
263 }
264
265 class QuitWatcher : public BaseWatcher {
266 public:
QuitWatcher(base::OnceClosure quit_closure)267 QuitWatcher(base::OnceClosure quit_closure)
268 : quit_closure_(std::move(quit_closure)) {}
269
OnFileCanReadWithoutBlocking(int)270 void OnFileCanReadWithoutBlocking(int /* fd */) override {
271 // Post a fatal closure to the MessageLoop before we quit it.
272 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
273 FROM_HERE, BindOnce(&FatalClosure));
274
275 if (quit_closure_)
276 std::move(quit_closure_).Run();
277 }
278
279 private:
280 base::OnceClosure quit_closure_;
281 };
282
WriteFDWrapper(const int fd,const char * buf,int size,WaitableEvent * event)283 void WriteFDWrapper(const int fd,
284 const char* buf,
285 int size,
286 WaitableEvent* event) {
287 ASSERT_TRUE(WriteFileDescriptor(fd, std::string_view(buf, size)));
288 }
289
290 // Tests that MessagePumpLibevent quits immediately when it is quit from
291 // libevent's event_base_loop().
TEST_P(MessagePumpLibeventTest,QuitWatcher)292 TEST_P(MessagePumpLibeventTest, QuitWatcher) {
293 // Delete the old TaskEnvironment so that we can manage our own one here.
294 task_environment_.reset();
295
296 std::unique_ptr<MessagePumpLibevent> executor_pump = CreateMessagePump();
297 MessagePumpLibevent* pump = executor_pump.get();
298 SingleThreadTaskExecutor executor(std::move(executor_pump));
299 RunLoop run_loop;
300 QuitWatcher delegate(run_loop.QuitClosure());
301 MessagePumpLibevent::FdWatchController controller(FROM_HERE);
302 WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC,
303 WaitableEvent::InitialState::NOT_SIGNALED);
304 std::unique_ptr<WaitableEventWatcher> watcher(new WaitableEventWatcher);
305
306 // Tell the pump to watch the `receiver_`.
307 pump->WatchFileDescriptor(receiver(), false, MessagePumpLibevent::WATCH_READ,
308 &controller, &delegate);
309
310 // Make the IO thread wait for |event| before writing to sender().
311 WaitableEventWatcher::EventCallback write_fd_task =
312 BindOnce(&WriteFDWrapper, sender(), &null_byte_, 1);
313 io_runner()->PostTask(
314 FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching),
315 Unretained(watcher.get()), &event,
316 std::move(write_fd_task), io_runner()));
317
318 // Queue |event| to signal on |sequence_manager|.
319 SingleThreadTaskRunner::GetCurrentDefault()->PostTask(
320 FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event)));
321
322 // Now run the MessageLoop.
323 run_loop.Run();
324
325 // StartWatching can move |watcher| to IO thread. Release on IO thread.
326 io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching,
327 Owned(watcher.release())));
328 }
329
330 class InnerNestedWatcher : public MessagePumpLibevent::FdWatcher {
331 public:
InnerNestedWatcher(MessagePumpLibeventTest & test,MessagePumpLibevent::FdWatchController & outer_controller,base::OnceClosure callback)332 InnerNestedWatcher(MessagePumpLibeventTest& test,
333 MessagePumpLibevent::FdWatchController& outer_controller,
334 base::OnceClosure callback)
335 : test_(test),
336 outer_controller_(outer_controller),
337 callback_(std::move(callback)) {
338 base::CurrentIOThread::Get().WatchFileDescriptor(
339 test_->receiver(), false, MessagePumpLibevent::WATCH_READ, &controller_,
340 this);
341 }
342 ~InnerNestedWatcher() override = default;
343
OnFileCanReadWithoutBlocking(int)344 void OnFileCanReadWithoutBlocking(int) override {
345 // Cancelling the outer watch from within this inner event handler must be
346 // safe.
347 outer_controller_->StopWatchingFileDescriptor();
348 std::move(callback_).Run();
349 }
350
OnFileCanWriteWithoutBlocking(int)351 void OnFileCanWriteWithoutBlocking(int) override {}
352
353 private:
354 const raw_ref<MessagePumpLibeventTest> test_;
355 const raw_ref<MessagePumpLibevent::FdWatchController> outer_controller_;
356 base::OnceClosure callback_;
357 MessagePumpLibevent::FdWatchController controller_{FROM_HERE};
358 };
359
360 class OuterNestedWatcher : public MessagePumpLibevent::FdWatcher {
361 public:
OuterNestedWatcher(MessagePumpLibeventTest & test,base::OnceClosure callback)362 OuterNestedWatcher(MessagePumpLibeventTest& test, base::OnceClosure callback)
363 : test_(test), callback_(std::move(callback)) {
364 base::RunLoop loop;
365 test_->io_runner()->PostTask(
366 FROM_HERE, base::BindOnce(&OuterNestedWatcher::InitOnIOThread,
367 base::Unretained(this), loop.QuitClosure()));
368 loop.Run();
369 }
370
371 ~OuterNestedWatcher() override = default;
372
OnFileCanReadWithoutBlocking(int)373 void OnFileCanReadWithoutBlocking(int) override {
374 // Ensure that another notification will wake any active FdWatcher.
375 test_->ClearNotifications();
376
377 base::RunLoop loop;
378 std::unique_ptr<InnerNestedWatcher> inner_watcher =
379 std::make_unique<InnerNestedWatcher>(test_.get(), *controller_,
380 loop.QuitClosure());
381 test_->Notify();
382 loop.Run();
383
384 // Ensure that `InnerNestedWatcher` is destroyed before
385 // `OuterNestedWatcher`.
386 inner_watcher.reset();
387 std::move(callback_).Run();
388 }
389
OnFileCanWriteWithoutBlocking(int)390 void OnFileCanWriteWithoutBlocking(int) override {}
391
392 private:
InitOnIOThread(base::OnceClosure ready_callback)393 void InitOnIOThread(base::OnceClosure ready_callback) {
394 controller_ =
395 std::make_unique<MessagePumpLibevent::FdWatchController>(FROM_HERE);
396 base::CurrentIOThread::Get().WatchFileDescriptor(
397 test_->receiver(), false, MessagePumpLibevent::WATCH_READ,
398 controller_.get(), this);
399 std::move(ready_callback).Run();
400 }
401
402 const raw_ref<MessagePumpLibeventTest> test_;
403 base::OnceClosure callback_;
404 std::unique_ptr<MessagePumpLibevent::FdWatchController> controller_;
405 };
406
// Regression test for https://crbug.com/1469529. Verifies that it's safe for
// a nested RunLoop to stop watching a file descriptor while the outer RunLoop
// is handling an event for the same descriptor.
TEST_P(MessagePumpLibeventTest, NestedNotification) {
  base::RunLoop done_loop;
  OuterNestedWatcher watcher(*this, done_loop.QuitClosure());

  // Wake the outer watcher, then wait until it reports completion.
  Notify();
  done_loop.Run();
}
416
417 #if BUILDFLAG(ENABLE_MESSAGE_PUMP_EPOLL)
418 #define TEST_PARAM_VALUES kLibevent, kEpoll
419 #else
420 #define TEST_PARAM_VALUES kLibevent
421 #endif
422
423 INSTANTIATE_TEST_SUITE_P(,
424 MessagePumpLibeventTest,
425 ::testing::Values(TEST_PARAM_VALUES));
426
427 } // namespace
428
429 } // namespace base
430