1 // Copyright 2017 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "base/message_loop/message_pump_fuchsia.h"
6
7 #include <lib/async-loop/cpp/loop.h>
8 #include <lib/async-loop/default.h>
9 #include <lib/fdio/io.h>
10 #include <lib/fdio/unsafe.h>
11 #include <lib/zx/time.h>
12 #include <zircon/errors.h>
13
14 #include "base/auto_reset.h"
15 #include "base/fuchsia/fuchsia_logging.h"
16 #include "base/logging.h"
17 #include "base/trace_event/base_tracing.h"
18
19 namespace base {
20
ZxHandleWatchController(const Location & from_here)21 MessagePumpFuchsia::ZxHandleWatchController::ZxHandleWatchController(
22 const Location& from_here)
23 : async_wait_t({}), created_from_location_(from_here) {}
24
~ZxHandleWatchController()25 MessagePumpFuchsia::ZxHandleWatchController::~ZxHandleWatchController() {
26 if (!StopWatchingZxHandle())
27 NOTREACHED();
28 }
29
WaitBegin()30 bool MessagePumpFuchsia::ZxHandleWatchController::WaitBegin() {
31 DCHECK(!handler);
32 async_wait_t::handler = &HandleSignal;
33
34 zx_status_t status =
35 async_begin_wait(weak_pump_->async_loop_->dispatcher(), this);
36 if (status != ZX_OK) {
37 ZX_DLOG(ERROR, status) << "async_begin_wait():"
38 << created_from_location_.ToString();
39 async_wait_t::handler = nullptr;
40 return false;
41 }
42
43 return true;
44 }
45
StopWatchingZxHandle()46 bool MessagePumpFuchsia::ZxHandleWatchController::StopWatchingZxHandle() {
47 if (was_stopped_) {
48 DCHECK(!*was_stopped_);
49 *was_stopped_ = true;
50
51 // |was_stopped_| points at a value stored on the stack, which will go out
52 // of scope. MessagePumpFuchsia::Run() will reset it only if the value is
53 // false. So we need to reset this pointer here as well, to make sure it's
54 // not used again.
55 was_stopped_ = nullptr;
56 }
57
58 // If the pump is gone then there is nothing to cancel.
59 if (!weak_pump_)
60 return true;
61
62 if (!is_active())
63 return true;
64
65 async_wait_t::handler = nullptr;
66
67 zx_status_t result =
68 async_cancel_wait(weak_pump_->async_loop_->dispatcher(), this);
69 ZX_DLOG_IF(ERROR, result != ZX_OK, result)
70 << "async_cancel_wait(): " << created_from_location_.ToString();
71 return result == ZX_OK;
72 }
73
74 // static
HandleSignal(async_dispatcher_t * async,async_wait_t * wait,zx_status_t status,const zx_packet_signal_t * signal)75 void MessagePumpFuchsia::ZxHandleWatchController::HandleSignal(
76 async_dispatcher_t* async,
77 async_wait_t* wait,
78 zx_status_t status,
79 const zx_packet_signal_t* signal) {
80 ZxHandleWatchController* controller =
81 static_cast<ZxHandleWatchController*>(wait);
82 DCHECK_EQ(controller->handler, &HandleSignal);
83
84 // Inform ThreadController of this native work item for tracking purposes.
85 // This call must precede the "ZxHandleSignal" trace event below as
86 // BeginWorkItem() might generate a top-level trace event for the thread if
87 // this is the first work item post-wakeup.
88 Delegate::ScopedDoWorkItem scoped_do_work_item;
89 if (controller->weak_pump_ && controller->weak_pump_->run_state_) {
90 scoped_do_work_item =
91 controller->weak_pump_->run_state_->delegate->BeginWorkItem();
92 }
93
94 TRACE_EVENT0("toplevel", "ZxHandleSignal");
95
96 if (status != ZX_OK) {
97 ZX_DLOG(WARNING, status) << "async wait failed: "
98 << controller->created_from_location_.ToString();
99 return;
100 }
101
102 controller->handler = nullptr;
103
104 // In the case of a persistent Watch, the Watch may be stopped and
105 // potentially deleted by the caller within the callback, in which case
106 // |controller| should not be accessed again, and we mustn't continue the
107 // watch. We check for this with a bool on the stack, which the Watch
108 // receives a pointer to.
109 bool was_stopped = false;
110 controller->was_stopped_ = &was_stopped;
111
112 controller->watcher_->OnZxHandleSignalled(wait->object, signal->observed);
113
114 if (was_stopped)
115 return;
116
117 controller->was_stopped_ = nullptr;
118
119 if (controller->persistent_)
120 controller->WaitBegin();
121 }
122
OnZxHandleSignalled(zx_handle_t handle,zx_signals_t signals)123 void MessagePumpFuchsia::FdWatchController::OnZxHandleSignalled(
124 zx_handle_t handle,
125 zx_signals_t signals) {
126 uint32_t events;
127 fdio_unsafe_wait_end(io_, signals, &events);
128
129 // |events| can include other spurious things, in particular, that an fd
130 // is writable, when we only asked to know when it was readable. In that
131 // case, we don't want to call both the CanWrite and CanRead callback,
132 // when the caller asked for only, for example, readable callbacks. So,
133 // mask with the events that we actually wanted to know about.
134 //
135 // Note that errors are always included.
136 const uint32_t desired_events = desired_events_ | FDIO_EVT_ERROR;
137 const uint32_t filtered_events = events & desired_events;
138 DCHECK_NE(filtered_events, 0u) << events << " & " << desired_events;
139
140 // Each |watcher_| callback we invoke may stop or delete |this|. The pump has
141 // set |was_stopped_| to point to a safe location on the calling stack, so we
142 // can use that to detect being stopped mid-callback and avoid doing further
143 // work that would touch |this|.
144 bool* was_stopped = was_stopped_;
145 if (filtered_events & FDIO_EVT_WRITABLE)
146 watcher_->OnFileCanWriteWithoutBlocking(fd_);
147 if (!*was_stopped && (filtered_events & FDIO_EVT_READABLE))
148 watcher_->OnFileCanReadWithoutBlocking(fd_);
149
150 // Don't add additional work here without checking |*was_stopped_| again.
151 }
152
FdWatchController(const Location & from_here)153 MessagePumpFuchsia::FdWatchController::FdWatchController(
154 const Location& from_here)
155 : FdWatchControllerInterface(from_here),
156 ZxHandleWatchController(from_here) {}
157
~FdWatchController()158 MessagePumpFuchsia::FdWatchController::~FdWatchController() {
159 if (!StopWatchingFileDescriptor())
160 NOTREACHED();
161 }
162
WaitBegin()163 bool MessagePumpFuchsia::FdWatchController::WaitBegin() {
164 // Refresh the |handle_| and |desired_signals_| from the fdio for the fd.
165 // Some types of fdio map read/write events to different signals depending on
166 // their current state, so we must do this every time we begin to wait.
167 fdio_unsafe_wait_begin(io_, desired_events_, &object, &trigger);
168 if (async_wait_t::object == ZX_HANDLE_INVALID) {
169 DLOG(ERROR) << "fdio_wait_begin failed: "
170 << ZxHandleWatchController::created_from_location_.ToString();
171 return false;
172 }
173
174 return MessagePumpFuchsia::ZxHandleWatchController::WaitBegin();
175 }
176
StopWatchingFileDescriptor()177 bool MessagePumpFuchsia::FdWatchController::StopWatchingFileDescriptor() {
178 bool success = StopWatchingZxHandle();
179 if (io_) {
180 fdio_unsafe_release(io_);
181 io_ = nullptr;
182 }
183 return success;
184 }
185
MessagePumpFuchsia()186 MessagePumpFuchsia::MessagePumpFuchsia()
187 : async_loop_(new async::Loop(&kAsyncLoopConfigAttachToCurrentThread)),
188 weak_factory_(this) {}
189 MessagePumpFuchsia::~MessagePumpFuchsia() = default;
190
WatchFileDescriptor(int fd,bool persistent,int mode,FdWatchController * controller,FdWatcher * delegate)191 bool MessagePumpFuchsia::WatchFileDescriptor(int fd,
192 bool persistent,
193 int mode,
194 FdWatchController* controller,
195 FdWatcher* delegate) {
196 DCHECK_GE(fd, 0);
197 DCHECK(controller);
198 DCHECK(delegate);
199
200 if (!controller->StopWatchingFileDescriptor())
201 NOTREACHED();
202
203 controller->fd_ = fd;
204 controller->watcher_ = delegate;
205
206 DCHECK(!controller->io_);
207 controller->io_ = fdio_unsafe_fd_to_io(fd);
208 if (!controller->io_) {
209 DLOG(ERROR) << "Failed to get IO for FD";
210 return false;
211 }
212
213 switch (mode) {
214 case WATCH_READ:
215 controller->desired_events_ = FDIO_EVT_READABLE;
216 break;
217 case WATCH_WRITE:
218 controller->desired_events_ = FDIO_EVT_WRITABLE;
219 break;
220 case WATCH_READ_WRITE:
221 controller->desired_events_ = FDIO_EVT_READABLE | FDIO_EVT_WRITABLE;
222 break;
223 default:
224 NOTREACHED() << "unexpected mode: " << mode;
225 return false;
226 }
227
228 // Pass dummy |handle| and |signals| values to WatchZxHandle(). The real
229 // values will be populated by FdWatchController::WaitBegin(), before actually
230 // starting the wait operation.
231 return WatchZxHandle(ZX_HANDLE_INVALID, persistent, 1, controller,
232 controller);
233 }
234
WatchZxHandle(zx_handle_t handle,bool persistent,zx_signals_t signals,ZxHandleWatchController * controller,ZxHandleWatcher * delegate)235 bool MessagePumpFuchsia::WatchZxHandle(zx_handle_t handle,
236 bool persistent,
237 zx_signals_t signals,
238 ZxHandleWatchController* controller,
239 ZxHandleWatcher* delegate) {
240 DCHECK_NE(0u, signals);
241 DCHECK(controller);
242 DCHECK(delegate);
243
244 // If the watch controller is active then WatchZxHandle() can be called only
245 // for the same handle.
246 DCHECK(handle == ZX_HANDLE_INVALID || !controller->is_active() ||
247 handle == controller->async_wait_t::object);
248
249 if (!controller->StopWatchingZxHandle())
250 NOTREACHED();
251
252 controller->async_wait_t::object = handle;
253 controller->persistent_ = persistent;
254 controller->async_wait_t::trigger = signals;
255 controller->watcher_ = delegate;
256
257 controller->weak_pump_ = weak_factory_.GetWeakPtr();
258
259 return controller->WaitBegin();
260 }
261
HandleIoEventsUntil(zx_time_t deadline)262 bool MessagePumpFuchsia::HandleIoEventsUntil(zx_time_t deadline) {
263 zx_status_t status = async_loop_->Run(zx::time(deadline), /*once=*/true);
264 switch (status) {
265 // Return true if some tasks or events were dispatched or if the dispatcher
266 // was stopped by ScheduleWork().
267 case ZX_OK:
268 return true;
269
270 case ZX_ERR_CANCELED:
271 async_loop_->ResetQuit();
272 return true;
273
274 case ZX_ERR_TIMED_OUT:
275 return false;
276
277 default:
278 ZX_DLOG(FATAL, status) << "unexpected wait status";
279 return false;
280 }
281 }
282
Run(Delegate * delegate)283 void MessagePumpFuchsia::Run(Delegate* delegate) {
284 RunState run_state(delegate);
285 AutoReset<RunState*> auto_reset_run_state(&run_state_, &run_state);
286
287 for (;;) {
288 const Delegate::NextWorkInfo next_work_info = delegate->DoWork();
289 if (run_state.should_quit) {
290 break;
291 }
292
293 const bool did_handle_io_event = HandleIoEventsUntil(/*deadline=*/0);
294 if (run_state.should_quit) {
295 break;
296 }
297
298 bool attempt_more_work =
299 next_work_info.is_immediate() || did_handle_io_event;
300 if (attempt_more_work)
301 continue;
302
303 attempt_more_work = delegate->DoIdleWork();
304 if (run_state.should_quit) {
305 break;
306 }
307
308 if (attempt_more_work)
309 continue;
310
311 delegate->BeforeWait();
312
313 zx_time_t deadline = next_work_info.delayed_run_time.is_max()
314 ? ZX_TIME_INFINITE
315 : next_work_info.delayed_run_time.ToZxTime();
316
317 HandleIoEventsUntil(deadline);
318 }
319 }
320
Quit()321 void MessagePumpFuchsia::Quit() {
322 CHECK(run_state_);
323 run_state_->should_quit = true;
324 }
325
ScheduleWork()326 void MessagePumpFuchsia::ScheduleWork() {
327 // Stop async_loop to let MessagePumpFuchsia::Run() handle message loop tasks.
328 async_loop_->Quit();
329 }
330
ScheduleDelayedWork(const Delegate::NextWorkInfo & next_work_info)331 void MessagePumpFuchsia::ScheduleDelayedWork(
332 const Delegate::NextWorkInfo& next_work_info) {
333 // Since this is always called from the same thread as Run(), there is nothing
334 // to do as the loop is already running. It will wait in Run() with the
335 // correct timeout when it's out of immediate tasks.
336 // TODO(https://crbug.com/885371): Consider removing ScheduleDelayedWork()
337 // when all pumps function this way (bit.ly/merge-message-pump-do-work).
338 }
339
340 } // namespace base
341