xref: /aosp_15_r20/external/pigweed/pw_channel/epoll_channel_test.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_channel/epoll_channel.h"
16 
17 #include <fcntl.h>
18 #include <sys/stat.h>
19 #include <sys/types.h>
20 #include <unistd.h>
21 
22 #include "gtest/gtest.h"
23 #include "pw_assert/check.h"
24 #include "pw_async2/dispatcher.h"
25 #include "pw_bytes/array.h"
26 #include "pw_bytes/suffix.h"
27 #include "pw_channel/channel.h"
28 #include "pw_multibuf/simple_allocator_for_test.h"
29 #include "pw_status/status.h"
30 #include "pw_thread/sleep.h"
31 #include "pw_thread/thread.h"
32 #include "pw_thread_stl/options.h"
33 
34 namespace {
35 
36 using namespace std::chrono_literals;
37 
38 using ::pw::async2::Context;
39 using ::pw::async2::Dispatcher;
40 using ::pw::async2::Pending;
41 using ::pw::async2::Poll;
42 using ::pw::async2::Ready;
43 using ::pw::async2::Task;
44 using ::pw::channel::ByteReader;
45 using ::pw::channel::ByteWriter;
46 using ::pw::channel::EpollChannel;
47 using ::pw::multibuf::MultiBuf;
48 using ::pw::multibuf::test::SimpleAllocatorForTest;
49 
50 template <typename ChannelKind>
51 class ReaderTask : public Task {
52  public:
ReaderTask(ChannelKind & channel,int num_reads)53   ReaderTask(ChannelKind& channel, int num_reads)
54       : channel_(channel), num_reads_(num_reads) {}
55 
56   int poll_count = 0;
57   int read_count = 0;
58   int bytes_read = 0;
59   pw::Status read_status = pw::Status::Unknown();
60 
61  private:
DoPend(Context & cx)62   Poll<> DoPend(Context& cx) final {
63     ++poll_count;
64     while (read_count < num_reads_) {
65       auto result = channel_.PendRead(cx);
66       if (result.IsPending()) {
67         return Pending();
68       }
69       read_status = result->status();
70       if (!result->ok()) {
71         // We hit an error-- call it quits.
72         return Ready();
73       }
74       ++read_count;
75       bytes_read += (**result).size();
76 
77       (**result).Release();
78     }
79 
80     return Ready();
81   }
82 
83   ChannelKind& channel_;
84   int num_reads_;
85 };
86 
87 template <typename ChannelKind>
88 class CloseTask : public Task {
89  public:
CloseTask(ChannelKind & channel)90   CloseTask(ChannelKind& channel) : channel_(channel) {}
91 
92   pw::Status close_status = pw::Status::Unknown();
93 
94  private:
DoPend(Context & cx)95   Poll<> DoPend(Context& cx) final {
96     auto result = channel_.PendClose(cx);
97     if (result.IsPending()) {
98       return Pending();
99     }
100 
101     close_status = *result;
102     return Ready();
103   }
104 
105   ChannelKind& channel_;
106 };
107 
108 class EpollChannelTest : public ::testing::Test {
109  protected:
EpollChannelTest()110   EpollChannelTest() {
111     int pipefd[2];
112     PW_CHECK_INT_NE(pipe(pipefd), -1);
113     read_fd_ = pipefd[0];
114     write_fd_ = pipefd[1];
115   }
116 
~EpollChannelTest()117   ~EpollChannelTest() override {
118     close(read_fd_);
119     close(write_fd_);
120   }
121 
122   int read_fd_;
123   int write_fd_;
124 };
125 
126 template <typename Func>
127 class FunctionThread : public pw::thread::ThreadCore {
128  public:
FunctionThread(Func && func)129   explicit FunctionThread(Func&& func) : func_(std::move(func)) {}
130 
131  private:
Run()132   void Run() override { func_(); }
133 
134   Func func_;
135 };
136 
TEST_F(EpollChannelTest,Read_ValidData_Succeeds)137 TEST_F(EpollChannelTest, Read_ValidData_Succeeds) {
138   SimpleAllocatorForTest alloc;
139   Dispatcher dispatcher;
140 
141   EpollChannel channel(read_fd_, dispatcher, alloc);
142   ASSERT_TRUE(channel.is_read_open());
143   ASSERT_TRUE(channel.is_write_open());
144 
145   ReaderTask<ByteReader> read_task(channel.channel(), 1);
146   dispatcher.Post(read_task);
147 
148   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
149   EXPECT_EQ(read_task.poll_count, 1);
150   EXPECT_EQ(read_task.read_count, 0);
151   EXPECT_EQ(read_task.bytes_read, 0);
152 
153   FunctionThread delayed_write([this]() {
154     pw::this_thread::sleep_for(500ms);
155     const char* data = "hello world";
156     PW_CHECK_INT_EQ(write(write_fd_, data, 11), 11);
157   });
158 
159   pw::Thread work_thread(pw::thread::stl::Options(), delayed_write);
160   work_thread.join();
161 
162   dispatcher.RunToCompletion();
163   EXPECT_EQ(read_task.read_status, pw::OkStatus());
164   EXPECT_EQ(read_task.poll_count, 2);
165   EXPECT_EQ(read_task.read_count, 1);
166   EXPECT_EQ(read_task.bytes_read, 11);
167 
168   CloseTask close_task(channel);
169   dispatcher.Post(close_task);
170   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
171   EXPECT_EQ(close_task.close_status, pw::OkStatus());
172 }
173 
TEST_F(EpollChannelTest,Read_Closed_ReturnsFailedPrecondition)174 TEST_F(EpollChannelTest, Read_Closed_ReturnsFailedPrecondition) {
175   SimpleAllocatorForTest alloc;
176   Dispatcher dispatcher;
177 
178   EpollChannel channel(read_fd_, dispatcher, alloc);
179   ASSERT_TRUE(channel.is_read_open());
180   ASSERT_TRUE(channel.is_write_open());
181 
182   CloseTask close_task(channel);
183   dispatcher.Post(close_task);
184   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
185   EXPECT_EQ(close_task.close_status, pw::OkStatus());
186 
187   ReaderTask<ByteReader> read_task(channel.channel(), 1);
188   dispatcher.Post(read_task);
189 
190   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
191   EXPECT_EQ(read_task.read_status, pw::Status::FailedPrecondition());
192 }
193 
194 template <typename ChannelKind>
195 class WriterTask : public Task {
196  public:
WriterTask(ChannelKind & channel,int num_writes,pw::ConstByteSpan data_to_write)197   WriterTask(ChannelKind& channel,
198              int num_writes,
199              pw::ConstByteSpan data_to_write)
200       : max_writes(num_writes),
201         channel_(channel),
202         data_to_write_(data_to_write) {}
203 
204   int poll_count = 0;
205   int write_pending_count = 0;
206   int write_count = 0;
207   int max_writes = 0;
208   pw::Status last_write_status = pw::Status::Unknown();
209 
210  private:
DoPend(Context & cx)211   Poll<> DoPend(Context& cx) final {
212     ++poll_count;
213 
214     while (write_count < max_writes) {
215       auto result = channel_.PendReadyToWrite(cx);
216       if (result.IsPending()) {
217         ++write_pending_count;
218         return Pending();
219       }
220       last_write_status = *result;
221       if (!result->ok()) {
222         // We hit an error-- call it quits.
223         return Ready();
224       }
225       ++write_count;
226 
227       Poll<std::optional<MultiBuf>> multibuf_result =
228           channel_.PendAllocateWriteBuffer(cx, data_to_write_.size());
229       PW_CHECK(multibuf_result.IsReady());
230       PW_CHECK(multibuf_result->has_value());
231       MultiBuf& multibuf = **multibuf_result;
232       std::copy(data_to_write_.begin(), data_to_write_.end(), multibuf.begin());
233 
234       last_write_status = channel_.StageWrite(std::move(multibuf));
235 
236       Poll<pw::Status> write_status = channel_.PendWrite(cx);
237       if (write_status.IsPending()) {
238         return Pending();
239       }
240 
241       PW_CHECK_OK(*write_status);
242     }
243 
244     return Ready();
245   }
246 
247   ChannelKind& channel_;
248   pw::ConstByteSpan data_to_write_;
249 };
250 
TEST_F(EpollChannelTest,Write_ValidData_Succeeds)251 TEST_F(EpollChannelTest, Write_ValidData_Succeeds) {
252   SimpleAllocatorForTest alloc;
253   Dispatcher dispatcher;
254 
255   EpollChannel channel(write_fd_, dispatcher, alloc);
256   ASSERT_TRUE(channel.is_read_open());
257   ASSERT_TRUE(channel.is_write_open());
258 
259   constexpr auto kData = pw::bytes::Initialized<32>(0x3f);
260   WriterTask<ByteWriter> write_task(channel.channel(), 1, kData);
261   dispatcher.Post(write_task);
262 
263   dispatcher.RunToCompletion();
264   EXPECT_EQ(write_task.last_write_status, pw::OkStatus());
265 
266   std::array<std::byte, 64> buffer;
267   EXPECT_EQ(read(read_fd_, buffer.data(), buffer.size()),
268             static_cast<int>(kData.size()));
269   EXPECT_EQ(std::memcmp(buffer.data(), kData.data(), kData.size()), 0);
270 
271   CloseTask close_task(channel);
272   dispatcher.Post(close_task);
273   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
274   EXPECT_EQ(close_task.close_status, pw::OkStatus());
275 }
276 
TEST_F(EpollChannelTest,Write_EmptyData_Succeeds)277 TEST_F(EpollChannelTest, Write_EmptyData_Succeeds) {
278   SimpleAllocatorForTest alloc;
279   Dispatcher dispatcher;
280 
281   EpollChannel channel(write_fd_, dispatcher, alloc);
282   ASSERT_TRUE(channel.is_read_open());
283   ASSERT_TRUE(channel.is_write_open());
284 
285   WriterTask<ByteWriter> write_task(channel.channel(), 1, {});
286   dispatcher.Post(write_task);
287 
288   dispatcher.RunToCompletion();
289   EXPECT_EQ(write_task.last_write_status, pw::OkStatus());
290 
291   CloseTask close_task(channel);
292   dispatcher.Post(close_task);
293   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
294   EXPECT_EQ(close_task.close_status, pw::OkStatus());
295 }
296 
TEST_F(EpollChannelTest,Write_Closed_ReturnsFailedPrecondition)297 TEST_F(EpollChannelTest, Write_Closed_ReturnsFailedPrecondition) {
298   SimpleAllocatorForTest alloc;
299   Dispatcher dispatcher;
300 
301   EpollChannel channel(write_fd_, dispatcher, alloc);
302   ASSERT_TRUE(channel.is_read_open());
303   ASSERT_TRUE(channel.is_write_open());
304 
305   CloseTask close_task(channel);
306   dispatcher.Post(close_task);
307   EXPECT_EQ(dispatcher.RunUntilStalled(), Ready());
308   EXPECT_EQ(close_task.close_status, pw::OkStatus());
309 
310   WriterTask<ByteWriter> write_task(channel.channel(), 1, {});
311   dispatcher.Post(write_task);
312 
313   dispatcher.RunToCompletion();
314   EXPECT_EQ(write_task.last_write_status, pw::Status::FailedPrecondition());
315 }
316 
TEST_F(EpollChannelTest,Destructor_ClosesFileDescriptor)317 TEST_F(EpollChannelTest, Destructor_ClosesFileDescriptor) {
318   SimpleAllocatorForTest alloc;
319   Dispatcher dispatcher;
320 
321   {
322     EpollChannel channel(write_fd_, dispatcher, alloc);
323     ASSERT_TRUE(channel.is_read_open());
324     ASSERT_TRUE(channel.is_write_open());
325   }
326 
327   const char kArbitraryByte = 'b';
328   EXPECT_EQ(write(write_fd_, &kArbitraryByte, 1), -1);
329   EXPECT_EQ(errno, EBADF);
330 }
331 
TEST_F(EpollChannelTest,PendReadyToWrite_BlocksWhenUnavailable)332 TEST_F(EpollChannelTest, PendReadyToWrite_BlocksWhenUnavailable) {
333   SimpleAllocatorForTest alloc;
334   Dispatcher dispatcher;
335   EpollChannel channel(write_fd_, dispatcher, alloc);
336   ASSERT_TRUE(channel.is_read_open());
337   ASSERT_TRUE(channel.is_write_open());
338 
339   constexpr auto kData =
340       pw::bytes::Initialized<decltype(alloc)::data_size_bytes()>('c');
341   WriterTask<ByteWriter> write_task(
342       channel.channel(),
343       100,  // Max writes set to some high number so the task fills the pipe.
344       pw::ConstByteSpan(kData));
345   dispatcher.Post(write_task);
346 
347   // Try to write a bunch of data, eventually filling the pipe and blocking the
348   // task.
349   EXPECT_EQ(dispatcher.RunUntilStalled(), Pending());
350   EXPECT_EQ(write_task.poll_count, 1);
351   EXPECT_EQ(write_task.write_pending_count, 1);
352   EXPECT_EQ(write_task.last_write_status, pw::Status::Unavailable());
353 
354   const int writes_to_drain = write_task.write_count;
355 
356   // End the task on the next successful write.
357   write_task.max_writes = write_task.write_count + 1;
358 
359   // Drain the pipe to make it writable again after a delay.
360   FunctionThread delayed_read([this, writes_to_drain]() {
361     pw::this_thread::sleep_for(500ms);
362     for (int i = 0; i < writes_to_drain; ++i) {
363       std::array<std::byte, decltype(alloc)::data_size_bytes()> buffer;
364       PW_CHECK_INT_GT(read(read_fd_, buffer.data(), buffer.size()), 0);
365     }
366   });
367 
368   pw::Thread work_thread(pw::thread::stl::Options(), delayed_read);
369 
370   dispatcher.RunToCompletion();
371   work_thread.join();
372 
373   EXPECT_EQ(write_task.poll_count, 2);
374   EXPECT_EQ(write_task.last_write_status, pw::OkStatus());
375 }
376 
377 }  // namespace
378