1 // 2 // 3 // Copyright 2017 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 20 #define GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 21 22 #include <grpc/grpc.h> 23 #include <grpc/support/atm.h> 24 #include <grpc/support/log.h> 25 #include <grpcpp/channel.h> 26 #include <grpcpp/create_channel.h> 27 #include <grpcpp/security/credentials.h> 28 #include <grpcpp/security/server_credentials.h> 29 #include <grpcpp/server.h> 30 #include <grpcpp/server_builder.h> 31 32 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" 33 #include "src/core/lib/channel/channel_args.h" 34 #include "src/core/lib/config/core_configuration.h" 35 #include "src/core/lib/gprpp/crash.h" 36 #include "src/core/lib/iomgr/endpoint.h" 37 #include "src/core/lib/iomgr/endpoint_pair.h" 38 #include "src/core/lib/iomgr/exec_ctx.h" 39 #include "src/core/lib/iomgr/tcp_posix.h" 40 #include "src/core/lib/surface/channel.h" 41 #include "src/core/lib/surface/channel_create.h" 42 #include "src/core/lib/surface/completion_queue.h" 43 #include "src/core/lib/surface/server.h" 44 #include "src/cpp/client/create_channel_internal.h" 45 #include "test/core/util/port.h" 46 #include "test/core/util/test_config.h" 47 #include "test/cpp/microbenchmarks/helpers.h" 48 49 namespace grpc { 50 namespace testing { 51 52 class FixtureConfiguration { 53 public: ~FixtureConfiguration()54 virtual ~FixtureConfiguration() {} ApplyCommonChannelArguments(ChannelArguments * c)55 virtual void ApplyCommonChannelArguments(ChannelArguments* c) const { 56 c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX); 57 c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX); 58 c->SetInt(GRPC_ARG_ENABLE_RETRIES, 0); 59 c->SetResourceQuota(ResourceQuota()); 60 } 61 ApplyCommonServerBuilderConfig(ServerBuilder * b)62 virtual void ApplyCommonServerBuilderConfig(ServerBuilder* b) const { 63 b->SetMaxReceiveMessageSize(INT_MAX); 64 b->SetMaxSendMessageSize(INT_MAX); 65 } 66 }; 67 68 class BaseFixture { 69 public: 70 virtual ~BaseFixture() = default; 71 }; 72 73 class FullstackFixture : public BaseFixture { 74 public: FullstackFixture(Service * service,const FixtureConfiguration & config,const std::string & address)75 FullstackFixture(Service* service, const FixtureConfiguration& config, 76 const std::string& address) { 77 ServerBuilder b; 78 if (address.length() > 0) { 79 b.AddListeningPort(address, InsecureServerCredentials()); 80 } 81 cq_ = b.AddCompletionQueue(true); 82 b.RegisterService(service); 83 config.ApplyCommonServerBuilderConfig(&b); 84 server_ = b.BuildAndStart(); 85 ChannelArguments args; 86 config.ApplyCommonChannelArguments(&args); 87 if (address.length() > 0) { 88 channel_ = grpc::CreateCustomChannel(address, 89 InsecureChannelCredentials(), args); 90 } else { 91 channel_ = server_->InProcessChannel(args); 92 } 93 } 94 ~FullstackFixture()95 ~FullstackFixture() override { 96 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); 97 cq_->Shutdown(); 98 void* tag; 99 bool ok; 100 while (cq_->Next(&tag, &ok)) { 101 } 102 } 103 cq()104 ServerCompletionQueue* cq() { return cq_.get(); } channel()105 std::shared_ptr<Channel> channel() { return channel_; } 106 107 private: 108 std::unique_ptr<Server> server_; 109 std::unique_ptr<ServerCompletionQueue> cq_; 110 std::shared_ptr<Channel> channel_; 111 }; 112 113 class TCP : public FullstackFixture { 114 public: 115 explicit TCP(Service* service, 116 const FixtureConfiguration& fixture_configuration = 117 FixtureConfiguration()) FullstackFixture(service,fixture_configuration,MakeAddress (& port_))118 : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {} 119 ~TCP()120 ~TCP() override { grpc_recycle_unused_port(port_); } 121 122 private: 123 int port_; 124 MakeAddress(int * port)125 static std::string MakeAddress(int* port) { 126 *port = grpc_pick_unused_port_or_die(); 127 std::stringstream addr; 128 addr << "localhost:" << *port; 129 return addr.str(); 130 } 131 }; 132 133 class UDS : public FullstackFixture { 134 public: 135 explicit UDS(Service* service, 136 const FixtureConfiguration& fixture_configuration = 137 FixtureConfiguration()) FullstackFixture(service,fixture_configuration,MakeAddress (& port_))138 : FullstackFixture(service, fixture_configuration, MakeAddress(&port_)) {} 139 ~UDS()140 ~UDS() override { grpc_recycle_unused_port(port_); } 141 142 private: 143 int port_; 144 MakeAddress(int * port)145 static std::string MakeAddress(int* port) { 146 *port = grpc_pick_unused_port_or_die(); // just for a unique id - not a 147 // real port 148 std::stringstream addr; 149 addr << "unix:/tmp/bm_fullstack." << *port; 150 return addr.str(); 151 } 152 }; 153 154 class InProcess : public FullstackFixture { 155 public: 156 explicit InProcess(Service* service, 157 const FixtureConfiguration& fixture_configuration = 158 FixtureConfiguration()) 159 : FullstackFixture(service, fixture_configuration, "") {} ~InProcess()160 ~InProcess() override {} 161 }; 162 163 class EndpointPairFixture : public BaseFixture { 164 public: EndpointPairFixture(Service * service,grpc_endpoint_pair endpoints,const FixtureConfiguration & fixture_configuration)165 EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints, 166 const FixtureConfiguration& fixture_configuration) 167 : endpoint_pair_(endpoints) { 168 ServerBuilder b; 169 cq_ = b.AddCompletionQueue(true); 170 b.RegisterService(service); 171 fixture_configuration.ApplyCommonServerBuilderConfig(&b); 172 server_ = b.BuildAndStart(); 173 grpc_core::ExecCtx exec_ctx; 174 // add server endpoint to server_ 175 // 176 { 177 grpc_core::Server* core_server = 178 grpc_core::Server::FromC(server_->c_server()); 179 grpc_core::ChannelArgs server_args = core_server->channel_args(); 180 server_transport_ = grpc_create_chttp2_transport( 181 server_args, endpoints.server, false /* is_client */); 182 for (grpc_pollset* pollset : core_server->pollsets()) { 183 grpc_endpoint_add_to_pollset(endpoints.server, pollset); 184 } 185 186 GPR_ASSERT(GRPC_LOG_IF_ERROR( 187 "SetupTransport", 188 core_server->SetupTransport(server_transport_, nullptr, server_args, 189 nullptr))); 190 grpc_chttp2_transport_start_reading(server_transport_, nullptr, nullptr, 191 nullptr); 192 } 193 194 // create channel 195 { 196 grpc_core::ChannelArgs c_args; 197 { 198 ChannelArguments args; 199 args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority"); 200 fixture_configuration.ApplyCommonChannelArguments(&args); 201 // precondition 202 grpc_channel_args tmp_args; 203 args.SetChannelArgs(&tmp_args); 204 c_args = grpc_core::CoreConfiguration::Get() 205 .channel_args_preconditioning() 206 .PreconditionChannelArgs(&tmp_args); 207 } 208 client_transport_ = 209 grpc_create_chttp2_transport(c_args, endpoints.client, true); 210 GPR_ASSERT(client_transport_); 211 grpc_channel* channel = 212 grpc_core::ChannelCreate("target", c_args, GRPC_CLIENT_DIRECT_CHANNEL, 213 client_transport_) 214 ->release() 215 ->c_ptr(); 216 grpc_chttp2_transport_start_reading(client_transport_, nullptr, nullptr, 217 nullptr); 218 219 channel_ = grpc::CreateChannelInternal( 220 "", channel, 221 std::vector<std::unique_ptr< 222 experimental::ClientInterceptorFactoryInterface>>()); 223 } 224 } 225 ~EndpointPairFixture()226 ~EndpointPairFixture() override { 227 server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); 228 cq_->Shutdown(); 229 void* tag; 230 bool ok; 231 while (cq_->Next(&tag, &ok)) { 232 } 233 } 234 cq()235 ServerCompletionQueue* cq() { return cq_.get(); } channel()236 std::shared_ptr<Channel> channel() { return channel_; } 237 238 protected: 239 grpc_endpoint_pair endpoint_pair_; 240 grpc_core::Transport* client_transport_; 241 grpc_core::Transport* server_transport_; 242 243 private: 244 std::unique_ptr<Server> server_; 245 std::unique_ptr<ServerCompletionQueue> cq_; 246 std::shared_ptr<Channel> channel_; 247 }; 248 249 class SockPair : public EndpointPairFixture { 250 public: 251 explicit SockPair(Service* service, 252 const FixtureConfiguration& fixture_configuration = 253 FixtureConfiguration()) 254 : EndpointPairFixture(service, 255 grpc_iomgr_create_endpoint_pair("test", nullptr), 256 fixture_configuration) {} 257 }; 258 259 //////////////////////////////////////////////////////////////////////////////// 260 // Minimal stack fixtures 261 262 class MinStackConfiguration : public FixtureConfiguration { ApplyCommonChannelArguments(ChannelArguments * a)263 void ApplyCommonChannelArguments(ChannelArguments* a) const override { 264 a->SetInt(GRPC_ARG_MINIMAL_STACK, 1); 265 FixtureConfiguration::ApplyCommonChannelArguments(a); 266 } 267 ApplyCommonServerBuilderConfig(ServerBuilder * b)268 void ApplyCommonServerBuilderConfig(ServerBuilder* b) const override { 269 b->AddChannelArgument(GRPC_ARG_MINIMAL_STACK, 1); 270 FixtureConfiguration::ApplyCommonServerBuilderConfig(b); 271 } 272 }; 273 274 template <class Base> 275 class MinStackize : public Base { 276 public: MinStackize(Service * service)277 explicit MinStackize(Service* service) 278 : Base(service, MinStackConfiguration()) {} 279 }; 280 281 typedef MinStackize<TCP> MinTCP; 282 typedef MinStackize<UDS> MinUDS; 283 typedef MinStackize<InProcess> MinInProcess; 284 typedef MinStackize<SockPair> MinSockPair; 285 286 } // namespace testing 287 } // namespace grpc 288 289 #endif // GRPC_TEST_CPP_MICROBENCHMARKS_FULLSTACK_FIXTURES_H 290