xref: /aosp_15_r20/external/cronet/net/third_party/quiche/src/quiche/quic/core/io/quic_poll_event_loop.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2022 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "quiche/quic/core/io/quic_poll_event_loop.h"
6 
7 #include <algorithm>
8 #include <cerrno>
9 #include <memory>
10 
11 #include "absl/types/span.h"
12 #include "quiche/quic/core/io/quic_event_loop.h"
13 #include "quiche/quic/core/quic_alarm.h"
14 #include "quiche/quic/core/quic_time.h"
15 #include "quiche/quic/platform/api/quic_bug_tracker.h"
16 
17 namespace quic {
18 
19 namespace {
20 
21 using PollMask = decltype(::pollfd().events);
22 
GetPollMask(QuicSocketEventMask event_mask)23 PollMask GetPollMask(QuicSocketEventMask event_mask) {
24   return ((event_mask & kSocketEventReadable) ? POLLIN : 0) |
25          ((event_mask & kSocketEventWritable) ? POLLOUT : 0) |
26          ((event_mask & kSocketEventError) ? POLLERR : 0);
27 }
28 
GetEventMask(PollMask poll_mask)29 QuicSocketEventMask GetEventMask(PollMask poll_mask) {
30   return ((poll_mask & POLLIN) ? kSocketEventReadable : 0) |
31          ((poll_mask & POLLOUT) ? kSocketEventWritable : 0) |
32          ((poll_mask & POLLERR) ? kSocketEventError : 0);
33 }
34 
35 }  // namespace
36 
QuicPollEventLoop(QuicClock * clock)37 QuicPollEventLoop::QuicPollEventLoop(QuicClock* clock) : clock_(clock) {}
38 
RegisterSocket(SocketFd fd,QuicSocketEventMask events,QuicSocketEventListener * listener)39 bool QuicPollEventLoop::RegisterSocket(SocketFd fd, QuicSocketEventMask events,
40                                        QuicSocketEventListener* listener) {
41   auto [it, success] =
42       registrations_.insert({fd, std::make_shared<Registration>()});
43   if (!success) {
44     return false;
45   }
46   Registration& registration = *it->second;
47   registration.events = events;
48   registration.listener = listener;
49   return true;
50 }
51 
UnregisterSocket(SocketFd fd)52 bool QuicPollEventLoop::UnregisterSocket(SocketFd fd) {
53   return registrations_.erase(fd);
54 }
55 
RearmSocket(SocketFd fd,QuicSocketEventMask events)56 bool QuicPollEventLoop::RearmSocket(SocketFd fd, QuicSocketEventMask events) {
57   auto it = registrations_.find(fd);
58   if (it == registrations_.end()) {
59     return false;
60   }
61   it->second->events |= events;
62   return true;
63 }
64 
ArtificiallyNotifyEvent(SocketFd fd,QuicSocketEventMask events)65 bool QuicPollEventLoop::ArtificiallyNotifyEvent(SocketFd fd,
66                                                 QuicSocketEventMask events) {
67   auto it = registrations_.find(fd);
68   if (it == registrations_.end()) {
69     return false;
70   }
71   has_artificial_events_pending_ = true;
72   it->second->artificially_notify_at_next_iteration |= events;
73   return true;
74 }
75 
RunEventLoopOnce(QuicTime::Delta default_timeout)76 void QuicPollEventLoop::RunEventLoopOnce(QuicTime::Delta default_timeout) {
77   const QuicTime start_time = clock_->Now();
78   ProcessAlarmsUpTo(start_time);
79 
80   QuicTime::Delta timeout = ComputePollTimeout(start_time, default_timeout);
81   ProcessIoEvents(start_time, timeout);
82 
83   const QuicTime end_time = clock_->Now();
84   ProcessAlarmsUpTo(end_time);
85 }
86 
ComputePollTimeout(QuicTime now,QuicTime::Delta default_timeout) const87 QuicTime::Delta QuicPollEventLoop::ComputePollTimeout(
88     QuicTime now, QuicTime::Delta default_timeout) const {
89   default_timeout = std::max(default_timeout, QuicTime::Delta::Zero());
90   if (has_artificial_events_pending_) {
91     return QuicTime::Delta::Zero();
92   }
93   if (alarms_.empty()) {
94     return default_timeout;
95   }
96   QuicTime end_time = std::min(now + default_timeout, alarms_.begin()->first);
97   if (end_time < now) {
98     // We only run a single pass of processing alarm callbacks per
99     // RunEventLoopOnce() call.  If an alarm schedules another alarm in the past
100     // while in the callback, this will happen.
101     return QuicTime::Delta::Zero();
102   }
103   return end_time - now;
104 }
105 
PollWithRetries(absl::Span<pollfd> fds,QuicTime start_time,QuicTime::Delta timeout)106 int QuicPollEventLoop::PollWithRetries(absl::Span<pollfd> fds,
107                                        QuicTime start_time,
108                                        QuicTime::Delta timeout) {
109   const QuicTime timeout_at = start_time + timeout;
110   int poll_result;
111   for (;;) {
112     float timeout_ms = std::ceil(timeout.ToMicroseconds() / 1000.f);
113     poll_result =
114         PollSyscall(fds.data(), fds.size(), static_cast<int>(timeout_ms));
115 
116     // Stop if there are events or a non-EINTR error.
117     bool done = poll_result > 0 || (poll_result < 0 && errno != EINTR);
118     if (done) {
119       break;
120     }
121     // Poll until `clock_` shows the timeout was exceeded.
122     // PollSyscall uses a system clock internally that may run faster.
123     QuicTime now = clock_->Now();
124     if (now >= timeout_at) {
125       break;
126     }
127     timeout = timeout_at - now;
128   }
129   return poll_result;
130 }
131 
ProcessIoEvents(QuicTime start_time,QuicTime::Delta timeout)132 void QuicPollEventLoop::ProcessIoEvents(QuicTime start_time,
133                                         QuicTime::Delta timeout) {
134   // Set up the pollfd[] array.
135   const size_t registration_count = registrations_.size();
136   auto pollfds = std::make_unique<pollfd[]>(registration_count);
137   size_t i = 0;
138   for (auto& [fd, registration] : registrations_) {
139     QUICHE_CHECK_LT(
140         i, registration_count);  // Crash instead of out-of-bounds access.
141     pollfds[i].fd = fd;
142     pollfds[i].events = GetPollMask(registration->events);
143     pollfds[i].revents = 0;
144     ++i;
145   }
146 
147   // Actually run poll(2).
148   int poll_result =
149       PollWithRetries(absl::Span<pollfd>(pollfds.get(), registration_count),
150                       start_time, timeout);
151   if (poll_result == 0 && !has_artificial_events_pending_) {
152     return;
153   }
154 
155   // Prepare the list of all callbacks to be called, while resetting all events,
156   // since we're operating in the level-triggered mode.
157   std::vector<ReadyListEntry> ready_list;
158   ready_list.reserve(registration_count);
159   for (i = 0; i < registration_count; i++) {
160     DispatchIoEvent(ready_list, pollfds[i].fd, pollfds[i].revents);
161   }
162   has_artificial_events_pending_ = false;
163 
164   // Actually call all of the callbacks.
165   RunReadyCallbacks(ready_list);
166 }
167 
DispatchIoEvent(std::vector<ReadyListEntry> & ready_list,SocketFd fd,PollMask mask)168 void QuicPollEventLoop::DispatchIoEvent(std::vector<ReadyListEntry>& ready_list,
169                                         SocketFd fd, PollMask mask) {
170   auto it = registrations_.find(fd);
171   if (it == registrations_.end()) {
172     QUIC_BUG(poll returned an unregistered fd) << fd;
173     return;
174   }
175   Registration& registration = *it->second;
176 
177   mask |= GetPollMask(registration.artificially_notify_at_next_iteration);
178   registration.artificially_notify_at_next_iteration = QuicSocketEventMask();
179 
180   // poll() always returns certain classes of events even if not requested.
181   mask &= GetPollMask(registration.events);
182   if (!mask) {
183     return;
184   }
185 
186   ready_list.push_back(ReadyListEntry{fd, it->second, GetEventMask(mask)});
187   registration.events &= ~GetEventMask(mask);
188 }
189 
RunReadyCallbacks(std::vector<ReadyListEntry> & ready_list)190 void QuicPollEventLoop::RunReadyCallbacks(
191     std::vector<ReadyListEntry>& ready_list) {
192   for (ReadyListEntry& entry : ready_list) {
193     std::shared_ptr<Registration> registration = entry.registration.lock();
194     if (!registration) {
195       // The socket has been unregistered from within one of the callbacks.
196       continue;
197     }
198     registration->listener->OnSocketEvent(this, entry.fd, entry.events);
199   }
200   ready_list.clear();
201 }
202 
ProcessAlarmsUpTo(QuicTime time)203 void QuicPollEventLoop::ProcessAlarmsUpTo(QuicTime time) {
204   // Determine which alarm callbacks needs to be run.
205   std::vector<std::weak_ptr<Alarm*>> alarms_to_call;
206   while (!alarms_.empty() && alarms_.begin()->first <= time) {
207     auto& [deadline, schedule_handle_weak] = *alarms_.begin();
208     alarms_to_call.push_back(std::move(schedule_handle_weak));
209     alarms_.erase(alarms_.begin());
210   }
211   // Actually run those callbacks.
212   for (std::weak_ptr<Alarm*>& schedule_handle_weak : alarms_to_call) {
213     std::shared_ptr<Alarm*> schedule_handle = schedule_handle_weak.lock();
214     if (!schedule_handle) {
215       // The alarm has been cancelled and might not even exist anymore.
216       continue;
217     }
218     (*schedule_handle)->DoFire();
219   }
220   // Clean up all of the alarms in the front that have been cancelled.
221   while (!alarms_.empty()) {
222     if (alarms_.begin()->second.expired()) {
223       alarms_.erase(alarms_.begin());
224     } else {
225       break;
226     }
227   }
228 }
229 
CreateAlarm(QuicAlarm::Delegate * delegate)230 QuicAlarm* QuicPollEventLoop::AlarmFactory::CreateAlarm(
231     QuicAlarm::Delegate* delegate) {
232   return new Alarm(loop_, QuicArenaScopedPtr<QuicAlarm::Delegate>(delegate));
233 }
234 
CreateAlarm(QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,QuicConnectionArena * arena)235 QuicArenaScopedPtr<QuicAlarm> QuicPollEventLoop::AlarmFactory::CreateAlarm(
236     QuicArenaScopedPtr<QuicAlarm::Delegate> delegate,
237     QuicConnectionArena* arena) {
238   if (arena != nullptr) {
239     return arena->New<Alarm>(loop_, std::move(delegate));
240   }
241   return QuicArenaScopedPtr<QuicAlarm>(new Alarm(loop_, std::move(delegate)));
242 }
243 
Alarm(QuicPollEventLoop * loop,QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)244 QuicPollEventLoop::Alarm::Alarm(
245     QuicPollEventLoop* loop, QuicArenaScopedPtr<QuicAlarm::Delegate> delegate)
246     : QuicAlarm(std::move(delegate)), loop_(loop) {}
247 
SetImpl()248 void QuicPollEventLoop::Alarm::SetImpl() {
249   current_schedule_handle_ = std::make_shared<Alarm*>(this);
250   loop_->alarms_.insert({deadline(), current_schedule_handle_});
251 }
252 
CancelImpl()253 void QuicPollEventLoop::Alarm::CancelImpl() {
254   current_schedule_handle_.reset();
255 }
256 
CreateAlarmFactory()257 std::unique_ptr<QuicAlarmFactory> QuicPollEventLoop::CreateAlarmFactory() {
258   return std::make_unique<AlarmFactory>(this);
259 }
260 
PollSyscall(pollfd * fds,size_t nfds,int timeout)261 int QuicPollEventLoop::PollSyscall(pollfd* fds, size_t nfds, int timeout) {
262 #if defined(_WIN32)
263   return WSAPoll(fds, nfds, timeout);
264 #else
265   return ::poll(fds, nfds, timeout);
266 #endif  // defined(_WIN32)
267 }
268 
269 }  // namespace quic
270