1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_channel/epoll_channel.h"
16
17 #include <fcntl.h>
18 #include <unistd.h>
19
20 #include "pw_log/log.h"
21 #include "pw_status/try.h"
22
23 namespace pw::channel {
24
Register()25 void EpollChannel::Register() {
26 if (fcntl(channel_fd_, F_SETFL, O_NONBLOCK) != 0) {
27 PW_LOG_ERROR("Failed to make channel file descriptor nonblocking: %s",
28 std::strerror(errno));
29 set_closed();
30 return;
31 }
32
33 if (!dispatcher_->native()
34 .NativeRegisterFileDescriptor(channel_fd_,
35 async2::backend::NativeDispatcher::
36 FileDescriptorType::kReadWrite)
37 .ok()) {
38 set_closed();
39 return;
40 }
41
42 ready_to_write_ = true;
43 }
44
DoPendRead(async2::Context & cx)45 async2::Poll<Result<multibuf::MultiBuf>> EpollChannel::DoPendRead(
46 async2::Context& cx) {
47 write_alloc_future_.SetDesiredSizes(
48 kMinimumReadSize, kDesiredReadSize, pw::multibuf::kNeedsContiguous);
49 async2::Poll<std::optional<multibuf::MultiBuf>> maybe_multibuf =
50 write_alloc_future_.Pend(cx);
51 if (maybe_multibuf.IsPending()) {
52 return async2::Pending();
53 }
54
55 if (!maybe_multibuf->has_value()) {
56 PW_LOG_ERROR("Failed to allocate multibuf for reading");
57 return Status::ResourceExhausted();
58 }
59
60 multibuf::MultiBuf buf = std::move(**maybe_multibuf);
61 multibuf::Chunk& chunk = *buf.Chunks().begin();
62
63 int bytes_read = read(channel_fd_, chunk.data(), chunk.size());
64 if (bytes_read >= 0) {
65 buf.Truncate(bytes_read);
66 return async2::Ready(std::move(buf));
67 }
68
69 if (errno == EAGAIN) {
70 // EAGAIN on a non-blocking read indicates that there is no data available.
71 // Put the task to sleep until the dispatcher is notified that the file
72 // descriptor is active.
73 PW_ASYNC_STORE_WAKER(
74 cx,
75 cx.dispatcher().native().NativeAddReadWakerForFileDescriptor(
76 channel_fd_),
77 "EpollChannel is waiting on a file descriptor read");
78 return async2::Pending();
79 }
80
81 return Status::Internal();
82 }
83
DoPendReadyToWrite(async2::Context & cx)84 async2::Poll<Status> EpollChannel::DoPendReadyToWrite(async2::Context& cx) {
85 if (ready_to_write_) {
86 return OkStatus();
87 }
88 // The previous write operation failed. Block the task until the dispatcher
89 // receives a notification for the channel's file descriptor.
90 ready_to_write_ = true;
91 PW_ASYNC_STORE_WAKER(
92 cx,
93 cx.dispatcher().native().NativeAddWriteWakerForFileDescriptor(
94 channel_fd_),
95 "EpollChannel is waiting on a file descriptor write");
96 return async2::Pending();
97 }
98
DoStageWrite(multibuf::MultiBuf && data)99 Status EpollChannel::DoStageWrite(multibuf::MultiBuf&& data) {
100 for (multibuf::Chunk& chunk : data.Chunks()) {
101 if (write(channel_fd_, chunk.data(), chunk.size()) < 0) {
102 if (errno == EAGAIN || errno == EWOULDBLOCK) {
103 // The file descriptor is not currently available. The next call to
104 // `PendReadyToWrite` will put the task to sleep until it is writable
105 // again.
106 ready_to_write_ = false;
107 return Status::Unavailable();
108 }
109
110 PW_LOG_ERROR("Epoll channel write failed: %s", std::strerror(errno));
111 return Status::Internal();
112 }
113 }
114
115 return OkStatus();
116 }
117
Cleanup()118 void EpollChannel::Cleanup() {
119 if (is_read_or_write_open()) {
120 dispatcher_->native()
121 .NativeUnregisterFileDescriptor(channel_fd_)
122 .IgnoreError();
123 set_closed();
124 }
125 close(channel_fd_);
126 }
127
128 } // namespace pw::channel
129