1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_channel/epoll_channel.h"
16
17 #include <fcntl.h>
18 #include <sys/stat.h>
19 #include <sys/types.h>
20 #include <unistd.h>
21
22 #include "gtest/gtest.h"
23 #include "pw_assert/check.h"
24 #include "pw_async2/dispatcher.h"
25 #include "pw_bytes/array.h"
26 #include "pw_bytes/suffix.h"
27 #include "pw_channel/channel.h"
28 #include "pw_multibuf/simple_allocator_for_test.h"
29 #include "pw_status/status.h"
30 #include "pw_thread/sleep.h"
31 #include "pw_thread/thread.h"
32 #include "pw_thread_stl/options.h"
33
34 namespace {
35
36 using namespace std::chrono_literals;
37
38 using ::pw::async2::Context;
39 using ::pw::async2::Dispatcher;
40 using ::pw::async2::Pending;
41 using ::pw::async2::Poll;
42 using ::pw::async2::Ready;
43 using ::pw::async2::Task;
44 using ::pw::channel::ByteReader;
45 using ::pw::channel::ByteWriter;
46 using ::pw::channel::EpollChannel;
47 using ::pw::multibuf::MultiBuf;
48 using ::pw::multibuf::test::SimpleAllocatorForTest;
49
50 template <typename ChannelKind>
51 class ReaderTask : public Task {
52 public:
ReaderTask(ChannelKind & channel,int num_reads)53 ReaderTask(ChannelKind& channel, int num_reads)
54 : channel_(channel), num_reads_(num_reads) {}
55
56 int poll_count = 0;
57 int read_count = 0;
58 int bytes_read = 0;
59 pw::Status read_status = pw::Status::Unknown();
60
61 private:
DoPend(Context & cx)62 Poll<> DoPend(Context& cx) final {
63 ++poll_count;
64 while (read_count < num_reads_) {
65 auto result = channel_.PendRead(cx);
66 if (result.IsPending()) {
67 return Pending();
68 }
69 read_status = result->status();
70 if (!result->ok()) {
71 // We hit an error-- call it quits.
72 return Ready();
73 }
74 ++read_count;
75 bytes_read += (**result).size();
76
77 (**result).Release();
78 }
79
80 return Ready();
81 }
82
83 ChannelKind& channel_;
84 int num_reads_;
85 };
86
87 template <typename ChannelKind>
88 class CloseTask : public Task {
89 public:
CloseTask(ChannelKind & channel)90 CloseTask(ChannelKind& channel) : channel_(channel) {}
91
92 pw::Status close_status = pw::Status::Unknown();
93
94 private:
DoPend(Context & cx)95 Poll<> DoPend(Context& cx) final {
96 auto result = channel_.PendClose(cx);
97 if (result.IsPending()) {
98 return Pending();
99 }
100
101 close_status = *result;
102 return Ready();
103 }
104
105 ChannelKind& channel_;
106 };
107
108 class EpollChannelTest : public ::testing::Test {
109 protected:
EpollChannelTest()110 EpollChannelTest() {
111 int pipefd[2];
112 PW_CHECK_INT_NE(pipe(pipefd), -1);
113 read_fd_ = pipefd[0];
114 write_fd_ = pipefd[1];
115 }
116
~EpollChannelTest()117 ~EpollChannelTest() override {
118 close(read_fd_);
119 close(write_fd_);
120 }
121
122 int read_fd_;
123 int write_fd_;
124 };
125
126 template <typename Func>
127 class FunctionThread : public pw::thread::ThreadCore {
128 public:
FunctionThread(Func && func)129 explicit FunctionThread(Func&& func) : func_(std::move(func)) {}
130
131 private:
Run()132 void Run() override { func_(); }
133
134 Func func_;
135 };
136
// Verifies that a pending read completes when another thread writes to the
// pipe while the dispatcher is waiting for work, and that the channel can be
// closed afterwards.
TEST_F(EpollChannelTest, Read_ValidData_Succeeds) {
  // Payload written by the helper thread. The size excludes the terminating
  // NUL so that only the visible characters are sent.
  static constexpr char kMessage[] = "hello world";
  static constexpr int kMessageSize = sizeof(kMessage) - 1;

  SimpleAllocatorForTest alloc;
  Dispatcher dispatcher;

  EpollChannel channel(read_fd_, dispatcher, alloc);
  ASSERT_TRUE(channel.is_read_open());
  ASSERT_TRUE(channel.is_write_open());

  ReaderTask<ByteReader> read_task(channel.channel(), 1);
  dispatcher.Post(read_task);

  // Nothing has been written yet, so the first poll must leave the task
  // pending without reading anything.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(read_task.poll_count, 1);
  EXPECT_EQ(read_task.read_count, 0);
  EXPECT_EQ(read_task.bytes_read, 0);

  FunctionThread delayed_write([this]() {
    pw::this_thread::sleep_for(500ms);
    PW_CHECK_INT_EQ(write(write_fd_, kMessage, kMessageSize), kMessageSize);
  });

  pw::Thread work_thread(pw::thread::stl::Options(), delayed_write);

  // Run the dispatcher while the write is still outstanding and only then join
  // the helper thread. Joining first would make the write complete before the
  // dispatcher runs, so the test would never exercise waking a waiting
  // dispatcher. This ordering matches PendReadyToWrite_BlocksWhenUnavailable.
  dispatcher.RunToCompletion();
  work_thread.join();

  EXPECT_EQ(read_task.read_status, pw::OkStatus());
  EXPECT_EQ(read_task.poll_count, 2);
  EXPECT_EQ(read_task.read_count, 1);
  EXPECT_EQ(read_task.bytes_read, kMessageSize);

  CloseTask close_task(channel);
  dispatcher.Post(close_task);
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
  EXPECT_EQ(close_task.close_status, pw::OkStatus());
}
173
// Verifies that reading from a channel that has already been closed completes
// immediately with FAILED_PRECONDITION.
TEST_F(EpollChannelTest, Read_Closed_ReturnsFailedPrecondition) {
  SimpleAllocatorForTest allocator;
  Dispatcher dispatcher;

  EpollChannel channel(read_fd_, dispatcher, allocator);
  ASSERT_TRUE(channel.is_read_open());
  ASSERT_TRUE(channel.is_write_open());

  // Step 1: close the channel and confirm the close itself succeeds.
  CloseTask close_task(channel);
  dispatcher.Post(close_task);
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
  EXPECT_EQ(close_task.close_status, pw::OkStatus());

  // Step 2: a read attempted after the close must finish without stalling and
  // report the error.
  ReaderTask<ByteReader> read_task(channel.channel(), 1);
  dispatcher.Post(read_task);

  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
  EXPECT_EQ(read_task.read_status, pw::Status::FailedPrecondition());
}
193
194 template <typename ChannelKind>
195 class WriterTask : public Task {
196 public:
WriterTask(ChannelKind & channel,int num_writes,pw::ConstByteSpan data_to_write)197 WriterTask(ChannelKind& channel,
198 int num_writes,
199 pw::ConstByteSpan data_to_write)
200 : max_writes(num_writes),
201 channel_(channel),
202 data_to_write_(data_to_write) {}
203
204 int poll_count = 0;
205 int write_pending_count = 0;
206 int write_count = 0;
207 int max_writes = 0;
208 pw::Status last_write_status = pw::Status::Unknown();
209
210 private:
DoPend(Context & cx)211 Poll<> DoPend(Context& cx) final {
212 ++poll_count;
213
214 while (write_count < max_writes) {
215 auto result = channel_.PendReadyToWrite(cx);
216 if (result.IsPending()) {
217 ++write_pending_count;
218 return Pending();
219 }
220 last_write_status = *result;
221 if (!result->ok()) {
222 // We hit an error-- call it quits.
223 return Ready();
224 }
225 ++write_count;
226
227 Poll<std::optional<MultiBuf>> multibuf_result =
228 channel_.PendAllocateWriteBuffer(cx, data_to_write_.size());
229 PW_CHECK(multibuf_result.IsReady());
230 PW_CHECK(multibuf_result->has_value());
231 MultiBuf& multibuf = **multibuf_result;
232 std::copy(data_to_write_.begin(), data_to_write_.end(), multibuf.begin());
233
234 last_write_status = channel_.StageWrite(std::move(multibuf));
235
236 Poll<pw::Status> write_status = channel_.PendWrite(cx);
237 if (write_status.IsPending()) {
238 return Pending();
239 }
240
241 PW_CHECK_OK(*write_status);
242 }
243
244 return Ready();
245 }
246
247 ChannelKind& channel_;
248 pw::ConstByteSpan data_to_write_;
249 };
250
// Verifies that data written through the channel arrives unmodified at the
// read end of the pipe, and that the channel can be closed afterwards.
TEST_F(EpollChannelTest, Write_ValidData_Succeeds) {
  SimpleAllocatorForTest allocator;
  Dispatcher dispatcher;

  EpollChannel channel(write_fd_, dispatcher, allocator);
  ASSERT_TRUE(channel.is_read_open());
  ASSERT_TRUE(channel.is_write_open());

  // Payload: 32 bytes, each set to 0x3f.
  constexpr auto kData = pw::bytes::Initialized<32>(0x3f);

  WriterTask<ByteWriter> write_task(channel.channel(), 1, kData);
  dispatcher.Post(write_task);
  dispatcher.RunToCompletion();
  EXPECT_EQ(write_task.last_write_status, pw::OkStatus());

  // Read the bytes back directly from the pipe. The buffer is larger than the
  // payload, so a single read() is expected to return exactly the payload.
  std::array<std::byte, 64> buffer;
  EXPECT_EQ(read(read_fd_, buffer.data(), buffer.size()),
            static_cast<int>(kData.size()));
  EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);

  CloseTask close_task(channel);
  dispatcher.Post(close_task);
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
  EXPECT_EQ(close_task.close_status, pw::OkStatus());
}
276
// Verifies that writing an empty payload through the channel reports OK and
// that the channel can be closed afterwards.
TEST_F(EpollChannelTest, Write_EmptyData_Succeeds) {
  SimpleAllocatorForTest allocator;
  Dispatcher dispatcher;

  EpollChannel channel(write_fd_, dispatcher, allocator);
  ASSERT_TRUE(channel.is_read_open());
  ASSERT_TRUE(channel.is_write_open());

  // `{}` produces an empty span, i.e. a zero-length write.
  WriterTask<ByteWriter> write_task(channel.channel(), 1, {});
  dispatcher.Post(write_task);
  dispatcher.RunToCompletion();
  EXPECT_EQ(write_task.last_write_status, pw::OkStatus());

  CloseTask close_task(channel);
  dispatcher.Post(close_task);
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
  EXPECT_EQ(close_task.close_status, pw::OkStatus());
}
296
// Verifies that attempting to write to a channel that has already been closed
// reports FAILED_PRECONDITION.
TEST_F(EpollChannelTest, Write_Closed_ReturnsFailedPrecondition) {
  SimpleAllocatorForTest allocator;
  Dispatcher dispatcher;

  EpollChannel channel(write_fd_, dispatcher, allocator);
  ASSERT_TRUE(channel.is_read_open());
  ASSERT_TRUE(channel.is_write_open());

  // Step 1: close the channel and confirm the close itself succeeds.
  CloseTask close_task(channel);
  dispatcher.Post(close_task);
  EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
  EXPECT_EQ(close_task.close_status, pw::OkStatus());

  // Step 2: a write attempted after the close must surface the error through
  // the writer task's recorded status.
  WriterTask<ByteWriter> write_task(channel.channel(), 1, {});
  dispatcher.Post(write_task);
  dispatcher.RunToCompletion();

  EXPECT_EQ(write_task.last_write_status, pw::Status::FailedPrecondition());
}
316
// Verifies that destroying an EpollChannel closes the file descriptor it was
// constructed with: a subsequent write() on that descriptor must fail with
// EBADF.
TEST_F(EpollChannelTest, Destructor_ClosesFileDescriptor) {
  SimpleAllocatorForTest alloc;
  Dispatcher dispatcher;

  {
    EpollChannel channel(write_fd_, dispatcher, alloc);
    ASSERT_TRUE(channel.is_read_open());
    ASSERT_TRUE(channel.is_write_open());
  }  // `channel` is destroyed here.

  const char kArbitraryByte = 'b';

  // Capture errno immediately after the call. Reading it only after an
  // intervening EXPECT_EQ would risk the assertion machinery overwriting it.
  errno = 0;
  const ssize_t write_result = write(write_fd_, &kArbitraryByte, 1);
  const int write_errno = errno;

  EXPECT_EQ(write_result, -1);
  EXPECT_EQ(write_errno, EBADF);
}
331
// Verifies that a writer stalls once the pipe stops accepting data and resumes
// after another thread drains the pipe.
TEST_F(EpollChannelTest, PendReadyToWrite_BlocksWhenUnavailable) {
  SimpleAllocatorForTest alloc;
  Dispatcher dispatcher;
  EpollChannel channel(write_fd_, dispatcher, alloc);
  ASSERT_TRUE(channel.is_read_open());
  ASSERT_TRUE(channel.is_write_open());

  // Each write uses a payload as large as the test allocator's data area.
  constexpr auto kChunkSize = decltype(alloc)::data_size_bytes();
  constexpr auto kData = pw::bytes::Initialized<kChunkSize>('c');

  // The write limit is set to some high number so the task fills the pipe.
  WriterTask<ByteWriter> write_task(
      channel.channel(), 100, pw::ConstByteSpan(kData));
  dispatcher.Post(write_task);

  // Try to write a bunch of data, eventually filling the pipe and blocking the
  // task.
  EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
  EXPECT_EQ(write_task.poll_count, 1);
  EXPECT_EQ(write_task.write_pending_count, 1);
  EXPECT_EQ(write_task.last_write_status, pw::Status::Unavailable());

  // Remember how many writes were attempted so the helper thread performs the
  // same number of reads.
  const int writes_to_drain = write_task.write_count;

  // End the task on the next successful write.
  write_task.max_writes = write_task.write_count + 1;

  // Drain the pipe to make it writable again after a delay.
  FunctionThread delayed_read([this, writes_to_drain]() {
    pw::this_thread::sleep_for(500ms);

    for (int i = 0; i < writes_to_drain; ++i) {
      std::array<std::byte, kChunkSize> buffer;
      PW_CHECK_INT_GT(read(read_fd_, buffer.data(), buffer.size()), 0);
    }
  });

  pw::Thread work_thread(pw::thread::stl::Options(), delayed_read);

  // The dispatcher runs while the helper thread is still draining; the thread
  // is joined only afterwards.
  dispatcher.RunToCompletion();
  work_thread.join();

  EXPECT_EQ(write_task.poll_count, 2);
  EXPECT_EQ(write_task.last_write_status, pw::OkStatus());
}
376
377 } // namespace
378