// Copyright 2022 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5 #include "quiche/quic/core/io/quic_poll_event_loop.h"
6
#include <algorithm>
#include <cerrno>
#include <cmath>
#include <cstdint>
#include <limits>
#include <memory>
#include <utility>
#include <vector>
10
11 #include "absl/types/span.h"
12 #include "quiche/quic/core/io/quic_event_loop.h"
13 #include "quiche/quic/core/quic_alarm.h"
14 #include "quiche/quic/core/quic_time.h"
15 #include "quiche/quic/platform/api/quic_bug_tracker.h"
16
17 namespace quic {
18
19 namespace {
20
// Integer type used by the `events` field of `pollfd` on the current platform.
using PollMask = decltype(::pollfd::events);
22
GetPollMask(QuicSocketEventMask event_mask)23 PollMask GetPollMask(QuicSocketEventMask event_mask) {
24 return ((event_mask & kSocketEventReadable) ? POLLIN : 0) |
25 ((event_mask & kSocketEventWritable) ? POLLOUT : 0) |
26 ((event_mask & kSocketEventError) ? POLLERR : 0);
27 }
28
GetEventMask(PollMask poll_mask)29 QuicSocketEventMask GetEventMask(PollMask poll_mask) {
30 return ((poll_mask & POLLIN) ? kSocketEventReadable : 0) |
31 ((poll_mask & POLLOUT) ? kSocketEventWritable : 0) |
32 ((poll_mask & POLLERR) ? kSocketEventError : 0);
33 }
34
35 } // namespace
36
QuicPollEventLoop(QuicClock * clock)37 QuicPollEventLoop::QuicPollEventLoop(QuicClock* clock) : clock_(clock) {}
38
RegisterSocket(SocketFd fd,QuicSocketEventMask events,QuicSocketEventListener * listener)39 bool QuicPollEventLoop::RegisterSocket(SocketFd fd, QuicSocketEventMask events,
40 QuicSocketEventListener* listener) {
41 auto [it, success] =
42 registrations_.insert({fd, std::make_shared<Registration>()});
43 if (!success) {
44 return false;
45 }
46 Registration& registration = *it->second;
47 registration.events = events;
48 registration.listener = listener;
49 return true;
50 }
51
UnregisterSocket(SocketFd fd)52 bool QuicPollEventLoop::UnregisterSocket(SocketFd fd) {
53 return registrations_.erase(fd);
54 }
55
RearmSocket(SocketFd fd,QuicSocketEventMask events)56 bool QuicPollEventLoop::RearmSocket(SocketFd fd, QuicSocketEventMask events) {
57 auto it = registrations_.find(fd);
58 if (it == registrations_.end()) {
59 return false;
60 }
61 it->second->events |= events;
62 return true;
63 }
64
ArtificiallyNotifyEvent(SocketFd fd,QuicSocketEventMask events)65 bool QuicPollEventLoop::ArtificiallyNotifyEvent(SocketFd fd,
66 QuicSocketEventMask events) {
67 auto it = registrations_.find(fd);
68 if (it == registrations_.end()) {
69 return false;
70 }
71 has_artificial_events_pending_ = true;
72 it->second->artificially_notify_at_next_iteration |= events;
73 return true;
74 }
75
RunEventLoopOnce(QuicTime::Delta default_timeout)76 void QuicPollEventLoop::RunEventLoopOnce(QuicTime::Delta default_timeout) {
77 const QuicTime start_time = clock_->Now();
78 ProcessAlarmsUpTo(start_time);
79
80 QuicTime::Delta timeout = ComputePollTimeout(start_time, default_timeout);
81 ProcessIoEvents(start_time, timeout);
82
83 const QuicTime end_time = clock_->Now();
84 ProcessAlarmsUpTo(end_time);
85 }
86
ComputePollTimeout(QuicTime now,QuicTime::Delta default_timeout) const87 QuicTime::Delta QuicPollEventLoop::ComputePollTimeout(
88 QuicTime now, QuicTime::Delta default_timeout) const {
89 default_timeout = std::max(default_timeout, QuicTime::Delta::Zero());
90 if (has_artificial_events_pending_) {
91 return QuicTime::Delta::Zero();
92 }
93 if (alarms_.empty()) {
94 return default_timeout;
95 }
96 QuicTime end_time = std::min(now + default_timeout, alarms_.begin()->first);
97 if (end_time < now) {
98 // We only run a single pass of processing alarm callbacks per
99 // RunEventLoopOnce() call. If an alarm schedules another alarm in the past
100 // while in the callback, this will happen.
101 return QuicTime::Delta::Zero();
102 }
103 return end_time - now;
104 }
105
PollWithRetries(absl::Span<pollfd> fds,QuicTime start_time,QuicTime::Delta timeout)106 int QuicPollEventLoop::PollWithRetries(absl::Span<pollfd> fds,
107 QuicTime start_time,
108 QuicTime::Delta timeout) {
109 const QuicTime timeout_at = start_time + timeout;
110 int poll_result;
111 for (;;) {
112 float timeout_ms = std::ceil(timeout.ToMicroseconds() / 1000.f);
113 poll_result =
114 PollSyscall(fds.data(), fds.size(), static_cast<int>(timeout_ms));
115
116 // Stop if there are events or a non-EINTR error.
117 bool done = poll_result > 0 || (poll_result < 0 && errno != EINTR);
118 if (done) {
119 break;
120 }
121 // Poll until `clock_` shows the timeout was exceeded.
122 // PollSyscall uses a system clock internally that may run faster.
123 QuicTime now = clock_->Now();
124 if (now >= timeout_at) {
125 break;
126 }
127 timeout = timeout_at - now;
128 }
129 return poll_result;
130 }
131
ProcessIoEvents(QuicTime start_time,QuicTime::Delta timeout)132 void QuicPollEventLoop::ProcessIoEvents(QuicTime start_time,
133 QuicTime::Delta timeout) {
134 // Set up the pollfd[] array.
135 const size_t registration_count = registrations_.size();
136 auto pollfds = std::make_unique<pollfd[]>(registration_count);
137 size_t i = 0;
138 for (auto& [fd, registration] : registrations_) {
139 QUICHE_CHECK_LT(
140 i, registration_count); // Crash instead of out-of-bounds access.
141 pollfds[i].fd = fd;
142 pollfds[i].events = GetPollMask(registration->events);
143 pollfds[i].revents = 0;
144 ++i;
145 }
146
147 // Actually run poll(2).
148 int poll_result =
149 PollWithRetries(absl::Span<pollfd>(pollfds.get(), registration_count),
150 start_time, timeout);
151 if (poll_result == 0 && !has_artificial_events_pending_) {
152 return;
153 }
154
155 // Prepare the list of all callbacks to be called, while resetting all events,
156 // since we're operating in the level-triggered mode.
157 std::vector<ReadyListEntry> ready_list;
158 ready_list.reserve(registration_count);
159 for (i = 0; i < registration_count; i++) {
160 DispatchIoEvent(ready_list, pollfds[i].fd, pollfds[i].revents);
161 }
162 has_artificial_events_pending_ = false;
163
164 // Actually call all of the callbacks.
165 RunReadyCallbacks(ready_list);
166 }
167
DispatchIoEvent(std::vector<ReadyListEntry> & ready_list,SocketFd fd,PollMask mask)168 void QuicPollEventLoop::DispatchIoEvent(std::vector<ReadyListEntry>& ready_list,
169 SocketFd fd, PollMask mask) {
170 auto it = registrations_.find(fd);
171 if (it == registrations_.end()) {
172 QUIC_BUG(poll returned an unregistered fd) << fd;
173 return;
174 }
175 Registration& registration = *it->second;
176
177 mask |= GetPollMask(registration.artificially_notify_at_next_iteration);
178 registration.artificially_notify_at_next_iteration = QuicSocketEventMask();
179
180 // poll() always returns certain classes of events even if not requested.
181 mask &= GetPollMask(registration.events);
182 if (!mask) {
183 return;
184 }
185
186 ready_list.push_back(ReadyListEntry{fd, it->second, GetEventMask(mask)});
187 registration.events &= ~GetEventMask(mask);
188 }
189
RunReadyCallbacks(std::vector<ReadyListEntry> & ready_list)190 void QuicPollEventLoop::RunReadyCallbacks(
191 std::vector<ReadyListEntry>& ready_list) {
192 for (ReadyListEntry& entry : ready_list) {
193 std::shared_ptr<Registration> registration = entry.registration.lock();
194 if (!registration) {
195 // The socket has been unregistered from within one of the callbacks.
196 continue;
197 }
198 registration->listener->OnSocketEvent(this, entry.fd, entry.events);
199 }
200 ready_list.clear();
201 }
202
ProcessAlarmsUpTo(QuicTime time)203 void QuicPollEventLoop::ProcessAlarmsUpTo(QuicTime time) {
204 // Determine which alarm callbacks needs to be run.
205 std::vector<std::weak_ptr<Alarm*>> alarms_to_call;
206 while (!alarms_.empty() && alarms_.begin()->first <= time) {
207 auto& [deadline, schedule_handle_weak] = *alarms_.begin();
208 alarms_to_call.push_back(std::move(schedule_handle_weak));
209 alarms_.erase(alarms_.begin());
210 }
211 // Actually run those callbacks.
212 for (std::weak_ptr<Alarm*>& schedule_handle_weak : alarms_to_call) {
213 std::shared_ptr<Alarm*> schedule_handle = schedule_handle_weak.lock();
214 if (!schedule_handle) {
215 // The alarm has been cancelled and might not even exist anymore.
216 continue;
217 }
218 (*schedule_handle)->DoFire();
219 }
220 // Clean up all of the alarms in the front that have been cancelled.
221 while (!alarms_.empty()) {
222 if (alarms_.begin()->second.expired()) {
223 alarms_.erase(alarms_.begin());
224 } else {
225 break;
226 }
227 }
228 }
229
CreateAlarm(QuicAlarm::Delegate * delegate)230 QuicAlarm* QuicPollEventLoop::AlarmFactory::CreateAlarm(
231 QuicAlarm::Delegate* delegate) {
232 return new Alarm(loop_, QuicArenaScopedPtr<QuicAlarm::Delegate>(delegate));
233 }
234
CreateAlarm(QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,QuicConnectionArena * arena)235 QuicArenaScopedPtr<QuicAlarm> QuicPollEventLoop::AlarmFactory::CreateAlarm(
236 QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,
237 QuicConnectionArena* arena) {
238 if (arena != nullptr) {
239 return arena->New<Alarm>(loop_, std::move(delegate));
240 }
241 return QuicArenaScopedPtr<QuicAlarm>(new Alarm(loop_, std::move(delegate)));
242 }
243
Alarm(QuicPollEventLoop * loop,QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)244 QuicPollEventLoop::Alarm::Alarm(
245 QuicPollEventLoop* loop, QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)
246 : QuicAlarm(std::move(delegate)), loop_(loop) {}
247
SetImpl()248 void QuicPollEventLoop::Alarm::SetImpl() {
249 current_schedule_handle_ = std::make_shared<Alarm*>(this);
250 loop_->alarms_.insert({deadline(), current_schedule_handle_});
251 }
252
CancelImpl()253 void QuicPollEventLoop::Alarm::CancelImpl() {
254 current_schedule_handle_.reset();
255 }
256
CreateAlarmFactory()257 std::unique_ptr<QuicAlarmFactory> QuicPollEventLoop::CreateAlarmFactory() {
258 return std::make_unique<AlarmFactory>(this);
259 }
260
PollSyscall(pollfd * fds,size_t nfds,int timeout)261 int QuicPollEventLoop::PollSyscall(pollfd* fds, size_t nfds, int timeout) {
262 #if defined(_WIN32)
263 return WSAPoll(fds, nfds, timeout);
264 #else
265 return ::poll(fds, nfds, timeout);
266 #endif // defined(_WIN32)
267 }
268
269 } // namespace quic
270