1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 // Posix implementation for gpr threads.
20
21 #include <grpc/support/port_platform.h>
22
23 #include <string>
24
25 #include <grpc/support/time.h>
26
27 #ifdef GPR_POSIX_SYNC
28
29 #include <pthread.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
33
34 #include <grpc/support/log.h>
35 #include <grpc/support/sync.h>
36 #include <grpc/support/thd_id.h>
37
38 #include "src/core/lib/gpr/useful.h"
39 #include "src/core/lib/gprpp/crash.h"
40 #include "src/core/lib/gprpp/fork.h"
41 #include "src/core/lib/gprpp/strerror.h"
42 #include "src/core/lib/gprpp/thd.h"
43
44 namespace grpc_core {
45 namespace {
46 class ThreadInternalsPosix;
47
48 struct thd_arg {
49 ThreadInternalsPosix* thread;
50 void (*body)(void* arg); // body of a thread
51 void* arg; // argument to a thread
52 const char* name; // name of thread. Can be nullptr.
53 bool joinable;
54 bool tracked;
55 };
56
RoundUpToPageSize(size_t size)57 size_t RoundUpToPageSize(size_t size) {
58 // TODO(yunjiaw): Change this variable (page_size) to a function-level static
59 // when possible
60 size_t page_size = static_cast<size_t>(sysconf(_SC_PAGESIZE));
61 return (size + page_size - 1) & ~(page_size - 1);
62 }
63
64 // Returns the minimum valid stack size that can be passed to
65 // pthread_attr_setstacksize.
MinValidStackSize(size_t request_size)66 size_t MinValidStackSize(size_t request_size) {
67 size_t min_stacksize = sysconf(_SC_THREAD_STACK_MIN);
68 if (request_size < min_stacksize) {
69 request_size = min_stacksize;
70 }
71
72 // On some systems, pthread_attr_setstacksize() can fail if stacksize is
73 // not a multiple of the system page size.
74 return RoundUpToPageSize(request_size);
75 }
76
77 class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
78 public:
ThreadInternalsPosix(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Thread::Options & options)79 ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
80 void* arg, bool* success, const Thread::Options& options)
81 : started_(false) {
82 gpr_mu_init(&mu_);
83 gpr_cv_init(&ready_);
84 pthread_attr_t attr;
85 // don't use gpr_malloc as we may cause an infinite recursion with
86 // the profiling code
87 thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
88 GPR_ASSERT(info != nullptr);
89 info->thread = this;
90 info->body = thd_body;
91 info->arg = arg;
92 info->name = thd_name;
93 info->joinable = options.joinable();
94 info->tracked = options.tracked();
95 if (options.tracked()) {
96 Fork::IncThreadCount();
97 }
98
99 GPR_ASSERT(pthread_attr_init(&attr) == 0);
100 if (options.joinable()) {
101 GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
102 0);
103 } else {
104 GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
105 0);
106 }
107
108 if (options.stack_size() != 0) {
109 size_t stack_size = MinValidStackSize(options.stack_size());
110 GPR_ASSERT(pthread_attr_setstacksize(&attr, stack_size) == 0);
111 }
112
113 int pthread_create_err = pthread_create(
114 &pthread_id_, &attr,
115 [](void* v) -> void* {
116 thd_arg arg = *static_cast<thd_arg*>(v);
117 free(v);
118 if (arg.name != nullptr) {
119 #if GPR_APPLE_PTHREAD_NAME
120 // Apple supports 64 characters, and will
121 // truncate if it's longer.
122 pthread_setname_np(arg.name);
123 #elif GPR_LINUX_PTHREAD_NAME
124 // Linux supports 16 characters max, and will
125 // error if it's longer.
126 char buf[16];
127 size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
128 strncpy(buf, arg.name, buf_len);
129 buf[buf_len] = '\0';
130 pthread_setname_np(pthread_self(), buf);
131 #endif // GPR_APPLE_PTHREAD_NAME
132 }
133
134 gpr_mu_lock(&arg.thread->mu_);
135 while (!arg.thread->started_) {
136 gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
137 gpr_inf_future(GPR_CLOCK_MONOTONIC));
138 }
139 gpr_mu_unlock(&arg.thread->mu_);
140
141 if (!arg.joinable) {
142 delete arg.thread;
143 }
144
145 (*arg.body)(arg.arg);
146 if (arg.tracked) {
147 Fork::DecThreadCount();
148 }
149 return nullptr;
150 },
151 info);
152 *success = (pthread_create_err == 0);
153
154 GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
155
156 if (!(*success)) {
157 gpr_log(GPR_ERROR, "pthread_create failed: %s",
158 StrError(pthread_create_err).c_str());
159 // don't use gpr_free, as this was allocated using malloc (see above)
160 free(info);
161 if (options.tracked()) {
162 Fork::DecThreadCount();
163 }
164 }
165 }
166
~ThreadInternalsPosix()167 ~ThreadInternalsPosix() override {
168 gpr_mu_destroy(&mu_);
169 gpr_cv_destroy(&ready_);
170 }
171
Start()172 void Start() override {
173 gpr_mu_lock(&mu_);
174 started_ = true;
175 gpr_cv_signal(&ready_);
176 gpr_mu_unlock(&mu_);
177 }
178
Join()179 void Join() override {
180 int pthread_join_err = pthread_join(pthread_id_, nullptr);
181 if (pthread_join_err != 0) {
182 Crash("pthread_join failed: " + StrError(pthread_join_err));
183 }
184 }
185
186 private:
187 gpr_mu mu_;
188 gpr_cv ready_;
189 bool started_;
190 pthread_t pthread_id_;
191 };
192
193 } // namespace
194
Thread(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Options & options)195 Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
196 bool* success, const Options& options)
197 : options_(options) {
198 bool outcome = false;
199 impl_ = new ThreadInternalsPosix(thd_name, thd_body, arg, &outcome, options);
200 if (outcome) {
201 state_ = ALIVE;
202 } else {
203 state_ = FAILED;
204 delete impl_;
205 impl_ = nullptr;
206 }
207
208 if (success != nullptr) {
209 *success = outcome;
210 }
211 }
212 } // namespace grpc_core
213
214 // The following is in the external namespace as it is exposed as C89 API
gpr_thd_currentid(void)215 gpr_thd_id gpr_thd_currentid(void) {
216 // Use C-style casting because Linux and OSX have different definitions
217 // of pthread_t so that a single C++ cast doesn't handle it.
218 // NOLINTNEXTLINE(google-readability-casting)
219 return (gpr_thd_id)pthread_self();
220 }
221
222 #endif // GPR_POSIX_SYNC
223