1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 // Posix implementation for gpr threads.
20 
21 #include <grpc/support/port_platform.h>
22 
23 #include <string>
24 
25 #include <grpc/support/time.h>
26 
27 #ifdef GPR_POSIX_SYNC
28 
29 #include <pthread.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <unistd.h>
33 
34 #include <grpc/support/log.h>
35 #include <grpc/support/sync.h>
36 #include <grpc/support/thd_id.h>
37 
38 #include "src/core/lib/gpr/useful.h"
39 #include "src/core/lib/gprpp/crash.h"
40 #include "src/core/lib/gprpp/fork.h"
41 #include "src/core/lib/gprpp/strerror.h"
42 #include "src/core/lib/gprpp/thd.h"
43 
44 namespace grpc_core {
45 namespace {
class ThreadInternalsPosix;

// Start-up parameters handed from the creating thread to the new pthread.
// An instance is heap-allocated by the creator and copied, then freed, by the
// new thread as its first action.
struct thd_arg {
  // Owning thread object; the new thread waits on it for the start signal and
  // deletes it itself when the thread is not joinable.
  ThreadInternalsPosix* thread;
  // Entry point executed by the new thread.
  void (*body)(void*);
  // Opaque argument forwarded to body.
  void* arg;
  // Thread name; nullptr means "do not name the thread".
  const char* name;
  // Copied from Thread::Options::joinable().
  bool joinable;
  // Copied from Thread::Options::tracked(); when set, the new thread calls
  // Fork::DecThreadCount() after body returns.
  bool tracked;
};
56 
// Rounds size up to the next multiple of the system page size, as reported by
// sysconf(_SC_PAGESIZE). A size that is already a multiple is returned as is.
// The mask arithmetic relies on the page size being a power of two.
size_t RoundUpToPageSize(size_t size) {
  // TODO(yunjiaw): Turn the page size lookup into a function-level static
  // when possible, so sysconf is not queried on every call.
  const size_t page_mask = static_cast<size_t>(sysconf(_SC_PAGESIZE)) - 1;
  return (size + page_mask) & ~page_mask;
}
63 
64 // Returns the minimum valid stack size that can be passed to
65 // pthread_attr_setstacksize.
MinValidStackSize(size_t request_size)66 size_t MinValidStackSize(size_t request_size) {
67   size_t min_stacksize = sysconf(_SC_THREAD_STACK_MIN);
68   if (request_size < min_stacksize) {
69     request_size = min_stacksize;
70   }
71 
72   // On some systems, pthread_attr_setstacksize() can fail if stacksize is
73   // not a multiple of the system page size.
74   return RoundUpToPageSize(request_size);
75 }
76 
77 class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
78  public:
ThreadInternalsPosix(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Thread::Options & options)79   ThreadInternalsPosix(const char* thd_name, void (*thd_body)(void* arg),
80                        void* arg, bool* success, const Thread::Options& options)
81       : started_(false) {
82     gpr_mu_init(&mu_);
83     gpr_cv_init(&ready_);
84     pthread_attr_t attr;
85     // don't use gpr_malloc as we may cause an infinite recursion with
86     // the profiling code
87     thd_arg* info = static_cast<thd_arg*>(malloc(sizeof(*info)));
88     GPR_ASSERT(info != nullptr);
89     info->thread = this;
90     info->body = thd_body;
91     info->arg = arg;
92     info->name = thd_name;
93     info->joinable = options.joinable();
94     info->tracked = options.tracked();
95     if (options.tracked()) {
96       Fork::IncThreadCount();
97     }
98 
99     GPR_ASSERT(pthread_attr_init(&attr) == 0);
100     if (options.joinable()) {
101       GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE) ==
102                  0);
103     } else {
104       GPR_ASSERT(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ==
105                  0);
106     }
107 
108     if (options.stack_size() != 0) {
109       size_t stack_size = MinValidStackSize(options.stack_size());
110       GPR_ASSERT(pthread_attr_setstacksize(&attr, stack_size) == 0);
111     }
112 
113     int pthread_create_err = pthread_create(
114         &pthread_id_, &attr,
115         [](void* v) -> void* {
116           thd_arg arg = *static_cast<thd_arg*>(v);
117           free(v);
118           if (arg.name != nullptr) {
119 #if GPR_APPLE_PTHREAD_NAME
120             // Apple supports 64 characters, and will
121             // truncate if it's longer.
122             pthread_setname_np(arg.name);
123 #elif GPR_LINUX_PTHREAD_NAME
124             // Linux supports 16 characters max, and will
125             // error if it's longer.
126             char buf[16];
127             size_t buf_len = GPR_ARRAY_SIZE(buf) - 1;
128             strncpy(buf, arg.name, buf_len);
129             buf[buf_len] = '\0';
130             pthread_setname_np(pthread_self(), buf);
131 #endif  // GPR_APPLE_PTHREAD_NAME
132           }
133 
134           gpr_mu_lock(&arg.thread->mu_);
135           while (!arg.thread->started_) {
136             gpr_cv_wait(&arg.thread->ready_, &arg.thread->mu_,
137                         gpr_inf_future(GPR_CLOCK_MONOTONIC));
138           }
139           gpr_mu_unlock(&arg.thread->mu_);
140 
141           if (!arg.joinable) {
142             delete arg.thread;
143           }
144 
145           (*arg.body)(arg.arg);
146           if (arg.tracked) {
147             Fork::DecThreadCount();
148           }
149           return nullptr;
150         },
151         info);
152     *success = (pthread_create_err == 0);
153 
154     GPR_ASSERT(pthread_attr_destroy(&attr) == 0);
155 
156     if (!(*success)) {
157       gpr_log(GPR_ERROR, "pthread_create failed: %s",
158               StrError(pthread_create_err).c_str());
159       // don't use gpr_free, as this was allocated using malloc (see above)
160       free(info);
161       if (options.tracked()) {
162         Fork::DecThreadCount();
163       }
164     }
165   }
166 
~ThreadInternalsPosix()167   ~ThreadInternalsPosix() override {
168     gpr_mu_destroy(&mu_);
169     gpr_cv_destroy(&ready_);
170   }
171 
Start()172   void Start() override {
173     gpr_mu_lock(&mu_);
174     started_ = true;
175     gpr_cv_signal(&ready_);
176     gpr_mu_unlock(&mu_);
177   }
178 
Join()179   void Join() override {
180     int pthread_join_err = pthread_join(pthread_id_, nullptr);
181     if (pthread_join_err != 0) {
182       Crash("pthread_join failed: " + StrError(pthread_join_err));
183     }
184   }
185 
186  private:
187   gpr_mu mu_;
188   gpr_cv ready_;
189   bool started_;
190   pthread_t pthread_id_;
191 };
192 
193 }  // namespace
194 
Thread(const char * thd_name,void (* thd_body)(void * arg),void * arg,bool * success,const Options & options)195 Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
196                bool* success, const Options& options)
197     : options_(options) {
198   bool outcome = false;
199   impl_ = new ThreadInternalsPosix(thd_name, thd_body, arg, &outcome, options);
200   if (outcome) {
201     state_ = ALIVE;
202   } else {
203     state_ = FAILED;
204     delete impl_;
205     impl_ = nullptr;
206   }
207 
208   if (success != nullptr) {
209     *success = outcome;
210   }
211 }
212 }  // namespace grpc_core
213 
214 // The following is in the external namespace as it is exposed as C89 API
gpr_thd_currentid(void)215 gpr_thd_id gpr_thd_currentid(void) {
216   // Use C-style casting because Linux and OSX have different definitions
217   // of pthread_t so that a single C++ cast doesn't handle it.
218   // NOLINTNEXTLINE(google-readability-casting)
219   return (gpr_thd_id)pthread_self();
220 }
221 
222 #endif  // GPR_POSIX_SYNC
223