1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include <memory>
18 #include <utility>
19 
20 #include "absl/strings/str_cat.h"
21 #include "absl/strings/string_view.h"
22 
23 #include "src/core/lib/gprpp/crash.h"  // IWYU pragma: keep
24 #include "src/core/lib/iomgr/port.h"
25 
26 #ifdef GRPC_POSIX_WAKEUP_FD
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <unistd.h>
30 
31 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
32 #endif
33 
34 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
35 #include "src/core/lib/gprpp/strerror.h"
36 
37 namespace grpc_event_engine {
38 namespace experimental {
39 
40 #ifdef GRPC_POSIX_WAKEUP_FD
41 
42 namespace {
43 
SetSocketNonBlocking(int fd)44 absl::Status SetSocketNonBlocking(int fd) {
45   int oldflags = fcntl(fd, F_GETFL, 0);
46   if (oldflags < 0) {
47     return absl::Status(absl::StatusCode::kInternal,
48                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
49   }
50 
51   oldflags |= O_NONBLOCK;
52 
53   if (fcntl(fd, F_SETFL, oldflags) != 0) {
54     return absl::Status(absl::StatusCode::kInternal,
55                         absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
56   }
57 
58   return absl::OkStatus();
59 }
60 }  // namespace
61 
Init()62 absl::Status PipeWakeupFd::Init() {
63   int pipefd[2];
64   int r = pipe(pipefd);
65   if (0 != r) {
66     return absl::Status(absl::StatusCode::kInternal,
67                         absl::StrCat("pipe: ", grpc_core::StrError(errno)));
68   }
69   auto status = SetSocketNonBlocking(pipefd[0]);
70   if (!status.ok()) return status;
71   status = SetSocketNonBlocking(pipefd[1]);
72   if (!status.ok()) return status;
73   SetWakeupFds(pipefd[0], pipefd[1]);
74   return absl::OkStatus();
75 }
76 
ConsumeWakeup()77 absl::Status PipeWakeupFd::ConsumeWakeup() {
78   char buf[128];
79   ssize_t r;
80 
81   for (;;) {
82     r = read(ReadFd(), buf, sizeof(buf));
83     if (r > 0) continue;
84     if (r == 0) return absl::OkStatus();
85     switch (errno) {
86       case EAGAIN:
87         return absl::OkStatus();
88       case EINTR:
89         continue;
90       default:
91         return absl::Status(absl::StatusCode::kInternal,
92                             absl::StrCat("read: ", grpc_core::StrError(errno)));
93     }
94   }
95 }
96 
Wakeup()97 absl::Status PipeWakeupFd::Wakeup() {
98   char c = 0;
99   while (write(WriteFd(), &c, 1) != 1 && errno == EINTR) {
100   }
101   return absl::OkStatus();
102 }
103 
~PipeWakeupFd()104 PipeWakeupFd::~PipeWakeupFd() {
105   if (ReadFd() != 0) {
106     close(ReadFd());
107   }
108   if (WriteFd() != 0) {
109     close(WriteFd());
110   }
111 }
112 
IsSupported()113 bool PipeWakeupFd::IsSupported() {
114   PipeWakeupFd pipe_wakeup_fd;
115   return pipe_wakeup_fd.Init().ok();
116 }
117 
CreatePipeWakeupFd()118 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
119   static bool kIsPipeWakeupFdSupported = PipeWakeupFd::IsSupported();
120   if (kIsPipeWakeupFdSupported) {
121     auto pipe_wakeup_fd = std::make_unique<PipeWakeupFd>();
122     auto status = pipe_wakeup_fd->Init();
123     if (status.ok()) {
124       return std::unique_ptr<WakeupFd>(std::move(pipe_wakeup_fd));
125     }
126     return status;
127   }
128   return absl::NotFoundError("Pipe wakeup fd is not supported");
129 }
130 
131 #else  //  GRPC_POSIX_WAKEUP_FD
132 
133 absl::Status PipeWakeupFd::Init() { grpc_core::Crash("unimplemented"); }
134 
135 absl::Status PipeWakeupFd::ConsumeWakeup() {
136   grpc_core::Crash("unimplemented");
137 }
138 
139 absl::Status PipeWakeupFd::Wakeup() { grpc_core::Crash("unimplemented"); }
140 
141 bool PipeWakeupFd::IsSupported() { return false; }
142 
143 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
144   return absl::NotFoundError("Pipe wakeup fd is not supported");
145 }
146 
147 #endif  //  GRPC_POSIX_WAKEUP_FD
148 
149 }  // namespace experimental
150 }  // namespace grpc_event_engine
151