1 // 2 // 3 // Copyright 2015 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPCPP_SERVER_H 20 #define GRPCPP_SERVER_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <list> 25 #include <memory> 26 #include <vector> 27 28 #include <grpc/compression.h> 29 #include <grpc/support/atm.h> 30 #include <grpcpp/channel.h> 31 #include <grpcpp/completion_queue.h> 32 #include <grpcpp/health_check_service_interface.h> 33 #include <grpcpp/impl/call.h> 34 #include <grpcpp/impl/grpc_library.h> 35 #include <grpcpp/impl/rpc_service_method.h> 36 #include <grpcpp/security/server_credentials.h> 37 #include <grpcpp/server_interface.h> 38 #include <grpcpp/support/channel_arguments.h> 39 #include <grpcpp/support/client_interceptor.h> 40 #include <grpcpp/support/config.h> 41 #include <grpcpp/support/status.h> 42 43 struct grpc_server; 44 45 namespace grpc { 46 class AsyncGenericService; 47 class ServerContext; 48 class ServerInitializer; 49 50 namespace internal { 51 class ExternalConnectionAcceptorImpl; 52 } // namespace internal 53 54 /// Represents a gRPC server. 55 /// 56 /// Use a \a grpc::ServerBuilder to create, configure, and start 57 /// \a Server instances. 58 class Server : public ServerInterface, private internal::GrpcLibrary { 59 public: 60 ~Server() ABSL_LOCKS_EXCLUDED(mu_) override; 61 62 /// Block until the server shuts down. 63 /// 64 /// \warning The server must be either shutting down or some other thread must 65 /// call \a Shutdown for this function to ever return. 66 void Wait() ABSL_LOCKS_EXCLUDED(mu_) override; 67 68 /// Global callbacks are a set of hooks that are called when server 69 /// events occur. \a SetGlobalCallbacks method is used to register 70 /// the hooks with gRPC. Note that 71 /// the \a GlobalCallbacks instance will be shared among all 72 /// \a Server instances in an application and can be set exactly 73 /// once per application. 74 class GlobalCallbacks { 75 public: ~GlobalCallbacks()76 virtual ~GlobalCallbacks() {} 77 /// Called before server is created. UpdateArguments(ChannelArguments *)78 virtual void UpdateArguments(ChannelArguments* /*args*/) {} 79 /// Called before application callback for each synchronous server request 80 virtual void PreSynchronousRequest(ServerContext* context) = 0; 81 /// Called after application callback for each synchronous server request 82 virtual void PostSynchronousRequest(ServerContext* context) = 0; 83 /// Called before server is started. PreServerStart(Server *)84 virtual void PreServerStart(Server* /*server*/) {} 85 /// Called after a server port is added. AddPort(Server *,const std::string &,ServerCredentials *,int)86 virtual void AddPort(Server* /*server*/, const std::string& /*addr*/, 87 ServerCredentials* /*creds*/, int /*port*/) {} 88 }; 89 /// Set the global callback object. Can only be called once per application. 90 /// Does not take ownership of callbacks, and expects the pointed to object 91 /// to be alive until all server objects in the process have been destroyed. 92 /// The same \a GlobalCallbacks object will be used throughout the 93 /// application and is shared among all \a Server objects. 94 static void SetGlobalCallbacks(GlobalCallbacks* callbacks); 95 96 /// Returns a \em raw pointer to the underlying \a grpc_server instance. 97 /// EXPERIMENTAL: for internal/test use only 98 grpc_server* c_server(); 99 100 /// Returns the health check service. GetHealthCheckService()101 HealthCheckServiceInterface* GetHealthCheckService() const { 102 return health_check_service_.get(); 103 } 104 105 /// Establish a channel for in-process communication 106 std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args); 107 108 /// NOTE: class experimental_type is not part of the public API of this class. 109 /// TODO(yashykt): Integrate into public API when this is no longer 110 /// experimental. 111 class experimental_type { 112 public: experimental_type(Server * server)113 explicit experimental_type(Server* server) : server_(server) {} 114 115 /// Establish a channel for in-process communication with client 116 /// interceptors 117 std::shared_ptr<Channel> InProcessChannelWithInterceptors( 118 const ChannelArguments& args, 119 std::vector< 120 std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> 121 interceptor_creators); 122 123 private: 124 Server* server_; 125 }; 126 127 /// NOTE: The function experimental() is not stable public API. It is a view 128 /// to the experimental components of this class. It may be changed or removed 129 /// at any time. experimental()130 experimental_type experimental() { return experimental_type(this); } 131 132 protected: 133 /// Register a service. This call does not take ownership of the service. 134 /// The service must exist for the lifetime of the Server instance. 135 bool RegisterService(const std::string* addr, Service* service) override; 136 137 /// Try binding the server to the given \a addr endpoint 138 /// (port, and optionally including IP address to bind to). 139 /// 140 /// It can be invoked multiple times. Should be used before 141 /// starting the server. 142 /// 143 /// \param addr The address to try to bind to the server (eg, localhost:1234, 144 /// 192.168.1.1:31416, [::1]:27182, etc.). 145 /// \param creds The credentials associated with the server. 146 /// 147 /// \return bound port number on success, 0 on failure. 148 /// 149 /// \warning It is an error to call this method on an already started server. 150 int AddListeningPort(const std::string& addr, 151 ServerCredentials* creds) override; 152 153 /// NOTE: This is *NOT* a public API. The server constructors are supposed to 154 /// be used by \a ServerBuilder class only. The constructor will be made 155 /// 'private' very soon. 156 /// 157 /// Server constructors. To be used by \a ServerBuilder only. 158 /// 159 /// \param args The channel args 160 /// 161 /// \param sync_server_cqs The completion queues to use if the server is a 162 /// synchronous server (or a hybrid server). The server polls for new RPCs on 163 /// these queues 164 /// 165 /// \param min_pollers The minimum number of polling threads per server 166 /// completion queue (in param sync_server_cqs) to use for listening to 167 /// incoming requests (used only in case of sync server) 168 /// 169 /// \param max_pollers The maximum number of polling threads per server 170 /// completion queue (in param sync_server_cqs) to use for listening to 171 /// incoming requests (used only in case of sync server) 172 /// 173 /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on 174 /// server completion queues passed via sync_server_cqs param. 175 Server(ChannelArguments* args, 176 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 177 sync_server_cqs, 178 int min_pollers, int max_pollers, int sync_cq_timeout_msec, 179 std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>> 180 acceptors, 181 grpc_server_config_fetcher* server_config_fetcher = nullptr, 182 grpc_resource_quota* server_rq = nullptr, 183 std::vector< 184 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> 185 interceptor_creators = std::vector<std::unique_ptr< 186 experimental::ServerInterceptorFactoryInterface>>(), 187 experimental::ServerMetricRecorder* server_metric_recorder = nullptr); 188 189 /// Start the server. 190 /// 191 /// \param cqs Completion queues for handling asynchronous services. The 192 /// caller is required to keep all completion queues live until the server is 193 /// destroyed. 194 /// \param num_cqs How many completion queues does \a cqs hold. 195 void Start(ServerCompletionQueue** cqs, size_t num_cqs) override; 196 server()197 grpc_server* server() override { return server_; } 198 199 /// NOTE: This method is not part of the public API for this class. set_health_check_service(std::unique_ptr<HealthCheckServiceInterface> service)200 void set_health_check_service( 201 std::unique_ptr<HealthCheckServiceInterface> service) { 202 health_check_service_ = std::move(service); 203 } 204 context_allocator()205 ContextAllocator* context_allocator() { return context_allocator_.get(); } 206 207 /// NOTE: This method is not part of the public API for this class. health_check_service_disabled()208 bool health_check_service_disabled() const { 209 return health_check_service_disabled_; 210 } 211 212 private: 213 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()214 interceptor_creators() override { 215 return &interceptor_creators_; 216 } 217 218 friend class AsyncGenericService; 219 friend class ServerBuilder; 220 friend class ServerInitializer; 221 222 class SyncRequest; 223 class CallbackRequestBase; 224 template <class ServerContextType> 225 class CallbackRequest; 226 class UnimplementedAsyncRequest; 227 class UnimplementedAsyncResponse; 228 229 /// SyncRequestThreadManager is an implementation of ThreadManager. This class 230 /// is responsible for polling for incoming RPCs and calling the RPC handlers. 231 /// This is only used in case of a Sync server (i.e a server exposing a sync 232 /// interface) 233 class SyncRequestThreadManager; 234 235 /// Register a generic service. This call does not take ownership of the 236 /// service. The service must exist for the lifetime of the Server instance. 237 void RegisterAsyncGenericService(AsyncGenericService* service) override; 238 239 /// Register a callback-based generic service. This call does not take 240 /// ownership of theservice. The service must exist for the lifetime of the 241 /// Server instance. 242 void RegisterCallbackGenericService(CallbackGenericService* service) override; 243 RegisterContextAllocator(std::unique_ptr<ContextAllocator> context_allocator)244 void RegisterContextAllocator( 245 std::unique_ptr<ContextAllocator> context_allocator) { 246 context_allocator_ = std::move(context_allocator); 247 } 248 249 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 250 internal::Call* call) override; 251 252 void ShutdownInternal(gpr_timespec deadline) 253 ABSL_LOCKS_EXCLUDED(mu_) override; 254 max_receive_message_size()255 int max_receive_message_size() const override { 256 return max_receive_message_size_; 257 } 258 call_metric_recording_enabled()259 bool call_metric_recording_enabled() const override { 260 return call_metric_recording_enabled_; 261 } 262 server_metric_recorder()263 experimental::ServerMetricRecorder* server_metric_recorder() const override { 264 return server_metric_recorder_; 265 } 266 267 CompletionQueue* CallbackCQ() ABSL_LOCKS_EXCLUDED(mu_) override; 268 269 ServerInitializer* initializer(); 270 271 // Functions to manage the server shutdown ref count. Things that increase 272 // the ref count are the running state of the server (take a ref at start and 273 // drop it at shutdown) and each running callback RPC. 274 void Ref(); 275 void UnrefWithPossibleNotify() ABSL_LOCKS_EXCLUDED(mu_); 276 void UnrefAndWaitLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 277 278 std::vector<std::shared_ptr<internal::ExternalConnectionAcceptorImpl>> 279 acceptors_; 280 281 // A vector of interceptor factory objects. 282 // This should be destroyed after health_check_service_ and this requirement 283 // is satisfied by declaring interceptor_creators_ before 284 // health_check_service_. (C++ mandates that member objects be destroyed in 285 // the reverse order of initialization.) 286 std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> 287 interceptor_creators_; 288 289 int max_receive_message_size_; 290 291 /// The following completion queues are ONLY used in case of Sync API 292 /// i.e. if the server has any services with sync methods. The server uses 293 /// these completion queues to poll for new RPCs 294 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>> 295 sync_server_cqs_; 296 297 /// List of \a ThreadManager instances (one for each cq in 298 /// the \a sync_server_cqs) 299 std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_; 300 301 // Server status 302 internal::Mutex mu_; 303 bool started_; 304 bool shutdown_ ABSL_GUARDED_BY(mu_); 305 bool shutdown_notified_ 306 ABSL_GUARDED_BY(mu_); // Was notify called on the shutdown_cv_ 307 internal::CondVar shutdown_done_cv_; 308 bool shutdown_done_ ABSL_GUARDED_BY(mu_) = false; 309 std::atomic_int shutdown_refs_outstanding_{1}; 310 311 internal::CondVar shutdown_cv_; 312 313 std::shared_ptr<GlobalCallbacks> global_callbacks_; 314 315 std::vector<std::string> services_; 316 bool has_async_generic_service_ = false; 317 bool has_callback_generic_service_ = false; 318 bool has_callback_methods_ = false; 319 320 // Pointer to the wrapped grpc_server. 321 grpc_server* server_; 322 323 std::unique_ptr<ServerInitializer> server_initializer_; 324 325 std::unique_ptr<ContextAllocator> context_allocator_; 326 327 std::unique_ptr<HealthCheckServiceInterface> health_check_service_; 328 bool health_check_service_disabled_; 329 330 // When appropriate, use a default callback generic service to handle 331 // unimplemented methods 332 std::unique_ptr<CallbackGenericService> unimplemented_service_; 333 334 // A special handler for resource exhausted in sync case 335 std::unique_ptr<internal::MethodHandler> resource_exhausted_handler_; 336 337 // Handler for callback generic service, if any 338 std::unique_ptr<internal::MethodHandler> generic_handler_; 339 340 // callback_cq_ references the callbackable completion queue associated 341 // with this server (if any). It is set on the first call to CallbackCQ(). 342 // It is _not owned_ by the server; ownership belongs with its internal 343 // shutdown callback tag (invoked when the CQ is fully shutdown). 344 std::atomic<CompletionQueue*> callback_cq_{nullptr}; 345 346 // List of CQs passed in by user that must be Shutdown only after Server is 347 // Shutdown. Even though this is only used with NDEBUG, instantiate it in all 348 // cases since otherwise the size will be inconsistent. 349 std::vector<CompletionQueue*> cq_list_; 350 351 // Whetner per-call load reporting is enabled. 352 bool call_metric_recording_enabled_ = false; 353 354 // Interface to read or update server-wide metrics. Optional. 355 experimental::ServerMetricRecorder* server_metric_recorder_ = nullptr; 356 }; 357 358 } // namespace grpc 359 360 #endif // GRPCPP_SERVER_H 361