1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <iostream>
20 #include <memory>
21 #include <string>
22 #include <vector>
23
24 #include <grpcpp/grpcpp.h>
25 #include <grpcpp/support/server_interceptor.h>
26
27 #ifdef BAZEL_BUILD
28 #include "examples/protos/keyvaluestore.grpc.pb.h"
29 #else
30 #include "keyvaluestore.grpc.pb.h"
31 #endif
32
33 using grpc::CallbackServerContext;
34 using grpc::Server;
35 using grpc::ServerBidiReactor;
36 using grpc::ServerBuilder;
37 using grpc::Status;
38 using grpc::experimental::InterceptionHookPoints;
39 using grpc::experimental::Interceptor;
40 using grpc::experimental::InterceptorBatchMethods;
41 using grpc::experimental::ServerInterceptorFactoryInterface;
42 using grpc::experimental::ServerRpcInfo;
43 using keyvaluestore::KeyValueStore;
44 using keyvaluestore::Request;
45 using keyvaluestore::Response;
46
47 // This is a simple interceptor that logs whenever it gets a request, which on
48 // the server side happens when initial metadata is received.
49 class LoggingInterceptor : public Interceptor {
50 public:
Intercept(InterceptorBatchMethods * methods)51 void Intercept(InterceptorBatchMethods* methods) override {
52 if (methods->QueryInterceptionHookPoint(
53 InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
54 std::cout << "Got a new streaming RPC" << std::endl;
55 }
56 methods->Proceed();
57 }
58 };
59
60 class LoggingInterceptorFactory : public ServerInterceptorFactoryInterface {
61 public:
CreateServerInterceptor(ServerRpcInfo * info)62 Interceptor* CreateServerInterceptor(ServerRpcInfo* info) override {
63 return new LoggingInterceptor();
64 }
65 };
66
67 struct kv_pair {
68 const char* key;
69 const char* value;
70 };
71
72 static const kv_pair kvs_map[] = {
73 {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"},
74 {"key4", "value4"}, {"key5", "value5"},
75 };
76
get_value_from_map(const char * key)77 const char* get_value_from_map(const char* key) {
78 for (size_t i = 0; i < sizeof(kvs_map) / sizeof(kv_pair); ++i) {
79 if (strcmp(key, kvs_map[i].key) == 0) {
80 return kvs_map[i].value;
81 }
82 }
83 return "";
84 }
85
86 // Logic behind the server's behavior.
87 class KeyValueStoreServiceImpl final : public KeyValueStore::CallbackService {
GetValues(CallbackServerContext * context)88 ServerBidiReactor<Request, Response>* GetValues(
89 CallbackServerContext* context) override {
90 class Reactor : public ServerBidiReactor<Request, Response> {
91 public:
92 explicit Reactor() { StartRead(&request_); }
93
94 void OnReadDone(bool ok) override {
95 if (!ok) {
96 return Finish(grpc::Status::CANCELLED);
97 }
98 response_.set_value(get_value_from_map(request_.key().c_str()));
99 StartWrite(&response_);
100 }
101
102 void OnWriteDone(bool ok) override {
103 if (!ok) {
104 return Finish(grpc::Status::CANCELLED);
105 }
106 StartRead(&request_);
107 }
108
109 void OnDone() override { delete this; }
110
111 private:
112 Request request_;
113 Response response_;
114 };
115
116 return new Reactor();
117 }
118 };
119
RunServer()120 void RunServer() {
121 std::string server_address("0.0.0.0:50051");
122 KeyValueStoreServiceImpl service;
123
124 ServerBuilder builder;
125 // Listen on the given address without any authentication mechanism.
126 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
127 // Register "service" as the instance through which we'll communicate with
128 // clients. In this case, it corresponds to an *synchronous* service.
129 builder.RegisterService(&service);
130 std::vector<std::unique_ptr<ServerInterceptorFactoryInterface>> creators;
131 creators.push_back(std::unique_ptr<ServerInterceptorFactoryInterface>(
132 new LoggingInterceptorFactory()));
133 builder.experimental().SetInterceptorCreators(std::move(creators));
134 // Finally assemble the server.
135 std::unique_ptr<Server> server(builder.BuildAndStart());
136 std::cout << "Server listening on " << server_address << std::endl;
137
138 // Wait for the server to shutdown. Note that some other thread must be
139 // responsible for shutting down the server for this call to ever return.
140 server->Wait();
141 }
142
main(int argc,char ** argv)143 int main(int argc, char** argv) {
144 RunServer();
145
146 return 0;
147 }
148