1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include <memory>
18 #include <utility>
19
20 #include "absl/strings/str_cat.h"
21 #include "absl/strings/string_view.h"
22
23 #include "src/core/lib/gprpp/crash.h" // IWYU pragma: keep
24 #include "src/core/lib/iomgr/port.h"
25
26 #ifdef GRPC_POSIX_WAKEUP_FD
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <unistd.h>
30
31 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
32 #endif
33
34 #include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
35 #include "src/core/lib/gprpp/strerror.h"
36
37 namespace grpc_event_engine {
38 namespace experimental {
39
40 #ifdef GRPC_POSIX_WAKEUP_FD
41
42 namespace {
43
SetSocketNonBlocking(int fd)44 absl::Status SetSocketNonBlocking(int fd) {
45 int oldflags = fcntl(fd, F_GETFL, 0);
46 if (oldflags < 0) {
47 return absl::Status(absl::StatusCode::kInternal,
48 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
49 }
50
51 oldflags |= O_NONBLOCK;
52
53 if (fcntl(fd, F_SETFL, oldflags) != 0) {
54 return absl::Status(absl::StatusCode::kInternal,
55 absl::StrCat("fcntl: ", grpc_core::StrError(errno)));
56 }
57
58 return absl::OkStatus();
59 }
60 } // namespace
61
Init()62 absl::Status PipeWakeupFd::Init() {
63 int pipefd[2];
64 int r = pipe(pipefd);
65 if (0 != r) {
66 return absl::Status(absl::StatusCode::kInternal,
67 absl::StrCat("pipe: ", grpc_core::StrError(errno)));
68 }
69 auto status = SetSocketNonBlocking(pipefd[0]);
70 if (!status.ok()) return status;
71 status = SetSocketNonBlocking(pipefd[1]);
72 if (!status.ok()) return status;
73 SetWakeupFds(pipefd[0], pipefd[1]);
74 return absl::OkStatus();
75 }
76
ConsumeWakeup()77 absl::Status PipeWakeupFd::ConsumeWakeup() {
78 char buf[128];
79 ssize_t r;
80
81 for (;;) {
82 r = read(ReadFd(), buf, sizeof(buf));
83 if (r > 0) continue;
84 if (r == 0) return absl::OkStatus();
85 switch (errno) {
86 case EAGAIN:
87 return absl::OkStatus();
88 case EINTR:
89 continue;
90 default:
91 return absl::Status(absl::StatusCode::kInternal,
92 absl::StrCat("read: ", grpc_core::StrError(errno)));
93 }
94 }
95 }
96
Wakeup()97 absl::Status PipeWakeupFd::Wakeup() {
98 char c = 0;
99 while (write(WriteFd(), &c, 1) != 1 && errno == EINTR) {
100 }
101 return absl::OkStatus();
102 }
103
~PipeWakeupFd()104 PipeWakeupFd::~PipeWakeupFd() {
105 if (ReadFd() != 0) {
106 close(ReadFd());
107 }
108 if (WriteFd() != 0) {
109 close(WriteFd());
110 }
111 }
112
IsSupported()113 bool PipeWakeupFd::IsSupported() {
114 PipeWakeupFd pipe_wakeup_fd;
115 return pipe_wakeup_fd.Init().ok();
116 }
117
CreatePipeWakeupFd()118 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
119 static bool kIsPipeWakeupFdSupported = PipeWakeupFd::IsSupported();
120 if (kIsPipeWakeupFdSupported) {
121 auto pipe_wakeup_fd = std::make_unique<PipeWakeupFd>();
122 auto status = pipe_wakeup_fd->Init();
123 if (status.ok()) {
124 return std::unique_ptr<WakeupFd>(std::move(pipe_wakeup_fd));
125 }
126 return status;
127 }
128 return absl::NotFoundError("Pipe wakeup fd is not supported");
129 }
130
131 #else // GRPC_POSIX_WAKEUP_FD
132
133 absl::Status PipeWakeupFd::Init() { grpc_core::Crash("unimplemented"); }
134
135 absl::Status PipeWakeupFd::ConsumeWakeup() {
136 grpc_core::Crash("unimplemented");
137 }
138
139 absl::Status PipeWakeupFd::Wakeup() { grpc_core::Crash("unimplemented"); }
140
141 bool PipeWakeupFd::IsSupported() { return false; }
142
143 absl::StatusOr<std::unique_ptr<WakeupFd>> PipeWakeupFd::CreatePipeWakeupFd() {
144 return absl::NotFoundError("Pipe wakeup fd is not supported");
145 }
146
147 #endif // GRPC_POSIX_WAKEUP_FD
148
149 } // namespace experimental
150 } // namespace grpc_event_engine
151