/*
 * Copyright (C) 2023 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/traced_relay/socket_relay_handler.h"

// NOTE(review): the system header names were missing from the reviewed copy of
// this file; the list below is reconstructed from the names used in this
// translation unit. Confirm against the repository before landing.
#include <fcntl.h>
#include <sys/poll.h>

#include <algorithm>
#include <cerrno>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#include "perfetto/base/logging.h"
#include "perfetto/base/platform_handle.h"
#include "perfetto/ext/base/thread_checker.h"
#include "perfetto/ext/base/utils.h"
#include "perfetto/ext/base/watchdog.h"

namespace perfetto {
namespace {

// Use the default watchdog timeout for task runners.
static constexpr int kWatchdogTimeoutMs = 30000;
// Timeout of the poll() call.
static constexpr int kPollTimeoutMs = 30000;

}  // namespace

FdPoller::Watcher::~Watcher() = default;

// Creates a poller that reports fd events to |watcher|. The internal
// notification fd is registered for reads so that Notify() can wake up Poll().
FdPoller::FdPoller(Watcher* watcher) : watcher_(watcher) {
  WatchForRead(notify_fd_.fd());

  // This is done last in the ctor because WatchForRead() asserts using
  // |thread_checker_|.
  PERFETTO_DETACH_FROM_THREAD(thread_checker_);
}

// Waits (up to |kPollTimeoutMs|) for events on the watched fds and dispatches
// them to the watcher. Returns without dispatching on timeout or on failure of
// poll(); the caller is expected to call Poll() again.
void FdPoller::Poll() {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  int num_fds = PERFETTO_EINTR(poll(
      poll_fds_.data(), static_cast<nfds_t>(poll_fds_.size()), kPollTimeoutMs));
  if (num_fds < 0) {
    // |revents| is not meaningful after a failed poll(): don't dispatch
    // anything. EAGAIN is expected and silently retried by the caller.
    if (!base::IsAgain(errno))
      PERFETTO_DPLOG("poll() failed");
    return;  // Poll again.
  }
  if (num_fds == 0)
    return;  // Timed out: no fd has pending events.
  PERFETTO_DCHECK(num_fds <= static_cast<int>(poll_fds_.size()));

  // Make a copy of |poll_fds_| so it's safe to watch and unwatch while
  // notifying the watcher.
  const auto poll_fds(poll_fds_);

  for (const auto& event : poll_fds) {
    if (!event.revents)  // This event isn't active.
      continue;

    // Check whether the poller needs to break the polling loop for updates.
    if (event.fd == notify_fd_.fd()) {
      notify_fd_.Clear();
      continue;
    }

    // Notify the callers on fd events. Only one callback is issued per fd per
    // Poll() call, with writable taking precedence over readable.
    if (event.revents & POLLOUT) {
      watcher_->OnFdWritable(event.fd);
    } else if (event.revents & POLLIN) {
      watcher_->OnFdReadable(event.fd);
    } else {
      // Log the events returned by poll() (|revents|), not the requested mask.
      PERFETTO_DLOG("poll() returns events %d on fd %d", event.revents,
                    event.fd);
    }  // Other events like POLLHUP or POLLERR are ignored.
  }
}

// Wakes up a pending Poll() by signalling the notification fd.
void FdPoller::Notify() {
  // Can be called from any thread.
  notify_fd_.Notify();
}

// Returns an iterator to the entry of |fd| in |poll_fds_|, or
// |poll_fds_.end()| if |fd| isn't being watched.
std::vector<pollfd>::iterator FdPoller::FindPollEvent(
    base::PlatformHandle fd) {
  PERFETTO_DCHECK_THREAD(thread_checker_);
  return std::find_if(poll_fds_.begin(), poll_fds_.end(),
                      [fd](const pollfd& item) { return fd == item.fd; });
}

// Adds |events| to the set of events polled for |fd|, registering |fd| if it
// isn't watched yet.
void FdPoller::WatchFd(base::PlatformHandle fd, WatchEvents events) {
  auto it = FindPollEvent(fd);
  if (it == poll_fds_.end()) {
    poll_fds_.push_back({fd, events, 0});
  } else {
    it->events |= events;
  }
}

// Removes |events| from the set of events polled for |fd|. The entry of |fd|
// stays registered (possibly with an empty event mask); |fd| must be watched.
void FdPoller::UnwatchFd(base::PlatformHandle fd, WatchEvents events) {
  auto it = FindPollEvent(fd);
  PERFETTO_CHECK(it != poll_fds_.end());
  it->events &= ~events;
}

// Stops polling |fd| altogether. |fd| must be watched.
void FdPoller::RemoveWatch(base::PlatformHandle fd) {
  auto it = FindPollEvent(fd);
  PERFETTO_CHECK(it != poll_fds_.end());
  poll_fds_.erase(it);
}

// Starts the IO thread, which runs Run() until the handler is destroyed.
SocketRelayHandler::SocketRelayHandler() : fd_poller_(this) {
  PERFETTO_DETACH_FROM_THREAD(io_thread_checker_);

  io_thread_ = std::thread([this]() { this->Run(); });
}

// Asks the IO thread to leave its loop and waits for it to finish.
SocketRelayHandler::~SocketRelayHandler() {
  // |exited_| is only ever touched on the IO thread: post the update as a task.
  RunOnIOThread([this]() { this->exited_ = true; });
  io_thread_.join();
}

// Takes ownership of |socket_pair| and starts relaying between its two
// sockets. The registration itself is performed on the IO thread.
void SocketRelayHandler::AddSocketPair(
    std::unique_ptr<SocketPair> socket_pair) {
  RunOnIOThread([this, socket_pair = std::move(socket_pair)]() mutable {
    PERFETTO_DCHECK_THREAD(io_thread_checker_);

    // Read everything needed from |socket_pair| before it is moved from.
    base::PlatformHandle fd1 = socket_pair->first.sock.fd();
    base::PlatformHandle fd2 = socket_pair->second.sock.fd();
    auto* ptr = socket_pair.get();
    socket_pairs_.emplace_back(std::move(socket_pair));

    fd_poller_.WatchForRead(fd1);
    fd_poller_.WatchForRead(fd2);

    socket_pairs_by_fd_[fd1] = ptr;
    socket_pairs_by_fd_[fd2] = ptr;
  });
}

// Body of the IO thread: alternates between polling for fd events and running
// the tasks posted through RunOnIOThread(), until |exited_| is set.
void SocketRelayHandler::Run() {
  PERFETTO_DCHECK_THREAD(io_thread_checker_);

  while (!exited_) {
    fd_poller_.Poll();

    // Crash if the posted tasks take longer than the watchdog timeout.
    auto handle = base::Watchdog::GetInstance()->CreateFatalTimer(
        kWatchdogTimeoutMs, base::WatchdogCrashReason::kTaskRunnerHung);

    // Grab the queued tasks under the lock and run them without holding it.
    // swap() (rather than move-assignment) guarantees that |pending_tasks_| is
    // left empty and usable for subsequent RunOnIOThread() calls.
    decltype(pending_tasks_) pending_tasks;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_tasks.swap(pending_tasks_);
    }

    while (!pending_tasks.empty()) {
      auto task = std::move(pending_tasks.front());
      pending_tasks.pop_front();
      task();
    }
  }
}

// Called by the poller when |fd| has data to read. Drains the socket of |fd|
// into its buffer and arms the write watch on the peer socket so the buffered
// data gets forwarded. The socket pair is removed on EOF or on a read error.
void SocketRelayHandler::OnFdReadable(base::PlatformHandle fd) {
  PERFETTO_DCHECK_THREAD(io_thread_checker_);

  auto socket_pair = GetSocketPair(fd);
  if (!socket_pair)
    return;  // Already removed.

  auto [fd_sock, peer_sock] = *socket_pair;
  // Buffer some bytes.
  auto peer_fd = peer_sock.sock.fd();
  while (fd_sock.available_bytes() > 0) {
    auto rsize =
        fd_sock.sock.Receive(fd_sock.buffer(), fd_sock.available_bytes());
    if (rsize > 0) {
      fd_sock.EnqueueData(static_cast<size_t>(rsize));
      continue;
    }

    if (rsize == 0 || (rsize == -1 && !base::IsAgain(errno))) {
      // TODO(chinglinyu): flush the remaining data to |peer_sock|.
      RemoveSocketPair(fd_sock, peer_sock);
      return;
    }

    // Nothing more to read for now (EAGAIN). If there is any buffered data
    // that needs to be sent to |peer_sock|, arm the write watcher.
    if (fd_sock.data_size() > 0) {
      fd_poller_.WatchForWrite(peer_fd);
    }
    return;
  }

  // We are not bufferable: need to turn off POLLIN to avoid spinning.
  fd_poller_.UnwatchForRead(fd);
  PERFETTO_DCHECK(fd_sock.data_size() > 0);
  // Watching for POLLOUT will cause an OnFdWritable() event of
  // |peer_sock|.
  fd_poller_.WatchForWrite(peer_fd);
}

// Called by the poller when |fd| can be written to. Forwards the data buffered
// from the peer socket to the socket of |fd|. The socket pair is removed on a
// write error.
void SocketRelayHandler::OnFdWritable(base::PlatformHandle fd) {
  PERFETTO_DCHECK_THREAD(io_thread_checker_);

  auto socket_pair = GetSocketPair(fd);
  if (!socket_pair)
    return;  // Already removed.

  auto [fd_sock, peer_sock] = *socket_pair;
  // |fd_sock| can be written to without blocking. Now we can transfer from the
  // buffer in |peer_sock|.
  while (peer_sock.data_size() > 0) {
    auto wsize = fd_sock.sock.Send(peer_sock.data(), peer_sock.data_size());
    if (wsize > 0) {
      peer_sock.DequeueData(static_cast<size_t>(wsize));
      continue;
    }

    if (wsize == -1 && !base::IsAgain(errno)) {
      RemoveSocketPair(fd_sock, peer_sock);
    }
    // errno == EAGAIN and we still have data to send: continue watching for
    // write.
    return;
  }

  // We don't have buffered data to send. Disable watching for write.
  fd_poller_.UnwatchForWrite(fd);
  // The peer's buffer has room again: resume reading from the peer.
  auto peer_fd = peer_sock.sock.fd();
  if (peer_sock.available_bytes())
    fd_poller_.WatchForRead(peer_fd);
}

// Looks up the socket pair that |fd| belongs to. Returns (socket of |fd|, its
// peer) as a tuple of references, or std::nullopt if |fd| is not registered.
std::optional<std::tuple<SocketWithBuffer&, SocketWithBuffer&>>
SocketRelayHandler::GetSocketPair(base::PlatformHandle fd) {
  PERFETTO_DCHECK_THREAD(io_thread_checker_);

  auto* socket_pair = socket_pairs_by_fd_.Find(fd);
  if (!socket_pair)
    return std::nullopt;

  PERFETTO_DCHECK(fd == (*socket_pair)->first.sock.fd() ||
                  fd == (*socket_pair)->second.sock.fd());

  if (fd == (*socket_pair)->first.sock.fd())
    return std::tie((*socket_pair)->first, (*socket_pair)->second);

  return std::tie((*socket_pair)->second, (*socket_pair)->first);
}

// Stops watching both sockets of a pair and destroys the pair. |sock1| and
// |sock2| must belong to the same registered pair; both references are
// invalidated when this function returns.
void SocketRelayHandler::RemoveSocketPair(SocketWithBuffer& sock1,
                                          SocketWithBuffer& sock2) {
  PERFETTO_DCHECK_THREAD(io_thread_checker_);

  // Read the fds upfront: |sock1| and |sock2| are owned by the pair erased
  // from |socket_pairs_| below.
  auto fd1 = sock1.sock.fd();
  auto fd2 = sock2.sock.fd();
  fd_poller_.RemoveWatch(fd1);
  fd_poller_.RemoveWatch(fd2);

  auto* ptr1 = socket_pairs_by_fd_.Find(fd1);
  auto* ptr2 = socket_pairs_by_fd_.Find(fd2);
  PERFETTO_DCHECK(ptr1 && ptr2);
  PERFETTO_DCHECK(*ptr1 == *ptr2);

  auto* socket_pair_ptr = *ptr1;

  socket_pairs_by_fd_.Erase(fd1);
  socket_pairs_by_fd_.Erase(fd2);

  socket_pairs_.erase(
      std::remove_if(
          socket_pairs_.begin(), socket_pairs_.end(),
          [socket_pair_ptr](const std::unique_ptr<SocketPair>& item) {
            return item.get() == socket_pair_ptr;
          }),
      socket_pairs_.end());
}

}  // namespace perfetto