1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #include <map> 20 21 #include <grpcpp/support/client_interceptor.h> 22 23 #ifdef BAZEL_BUILD 24 #include "examples/protos/keyvaluestore.grpc.pb.h" 25 #else 26 #include "keyvaluestore.grpc.pb.h" 27 #endif 28 29 // This is a naive implementation of a cache. A new cache is for each call. For 30 // each new key request, the key is first searched in the map and if found, the 31 // interceptor fills in the return value without making a request to the server. 32 // Only if the key is not found in the cache do we make a request. 33 class CachingInterceptor : public grpc::experimental::Interceptor { 34 public: CachingInterceptor(grpc::experimental::ClientRpcInfo * info)35 CachingInterceptor(grpc::experimental::ClientRpcInfo* info) {} 36 Intercept(::grpc::experimental::InterceptorBatchMethods * methods)37 void Intercept( 38 ::grpc::experimental::InterceptorBatchMethods* methods) override { 39 bool hijack = false; 40 if (methods->QueryInterceptionHookPoint( 41 grpc::experimental::InterceptionHookPoints:: 42 PRE_SEND_INITIAL_METADATA)) { 43 // Hijack all calls 44 hijack = true; 45 // Create a stream on which this interceptor can make requests 46 stub_ = keyvaluestore::KeyValueStore::NewStub( 47 methods->GetInterceptedChannel()); 48 stream_ = stub_->GetValues(&context_); 49 } 50 if (methods->QueryInterceptionHookPoint( 51 grpc::experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) { 52 // We know that clients perform a Read and a Write in a loop, so we don't 53 // need to maintain a list of the responses. 54 std::string requested_key; 55 const keyvaluestore::Request* req_msg = 56 static_cast<const keyvaluestore::Request*>(methods->GetSendMessage()); 57 if (req_msg != nullptr) { 58 requested_key = req_msg->key(); 59 } else { 60 // The non-serialized form would not be available in certain scenarios, 61 // so add a fallback 62 keyvaluestore::Request req_msg; 63 auto* buffer = methods->GetSerializedSendMessage(); 64 auto copied_buffer = *buffer; 65 GPR_ASSERT( 66 grpc::SerializationTraits<keyvaluestore::Request>::Deserialize( 67 &copied_buffer, &req_msg) 68 .ok()); 69 requested_key = req_msg.key(); 70 } 71 72 // Check if the key is present in the map 73 auto search = cached_map_.find(requested_key); 74 if (search != cached_map_.end()) { 75 std::cout << requested_key << " found in map" << std::endl; 76 response_ = search->second; 77 } else { 78 std::cout << requested_key << " not found in cache" << std::endl; 79 // Key was not found in the cache, so make a request 80 keyvaluestore::Request req; 81 req.set_key(requested_key); 82 stream_->Write(req); 83 keyvaluestore::Response resp; 84 stream_->Read(&resp); 85 response_ = resp.value(); 86 // Insert the pair in the cache for future requests 87 cached_map_.insert({requested_key, response_}); 88 } 89 } 90 if (methods->QueryInterceptionHookPoint( 91 grpc::experimental::InterceptionHookPoints::PRE_SEND_CLOSE)) { 92 stream_->WritesDone(); 93 } 94 if (methods->QueryInterceptionHookPoint( 95 grpc::experimental::InterceptionHookPoints::PRE_RECV_MESSAGE)) { 96 keyvaluestore::Response* resp = 97 static_cast<keyvaluestore::Response*>(methods->GetRecvMessage()); 98 resp->set_value(response_); 99 } 100 if (methods->QueryInterceptionHookPoint( 101 grpc::experimental::InterceptionHookPoints::PRE_RECV_STATUS)) { 102 auto* status = methods->GetRecvStatus(); 103 *status = grpc::Status::OK; 104 } 105 // One of Hijack or Proceed always needs to be called to make progress. 106 if (hijack) { 107 // Hijack is called only once when PRE_SEND_INITIAL_METADATA is present in 108 // the hook points 109 methods->Hijack(); 110 } else { 111 // Proceed is an indicator that the interceptor is done intercepting the 112 // batch. 113 methods->Proceed(); 114 } 115 } 116 117 private: 118 grpc::ClientContext context_; 119 std::unique_ptr<keyvaluestore::KeyValueStore::Stub> stub_; 120 std::unique_ptr< 121 grpc::ClientReaderWriter<keyvaluestore::Request, keyvaluestore::Response>> 122 stream_; 123 std::map<std::string, std::string> cached_map_; 124 std::string response_; 125 }; 126 127 class CachingInterceptorFactory 128 : public grpc::experimental::ClientInterceptorFactoryInterface { 129 public: CreateClientInterceptor(grpc::experimental::ClientRpcInfo * info)130 grpc::experimental::Interceptor* CreateClientInterceptor( 131 grpc::experimental::ClientRpcInfo* info) override { 132 return new CachingInterceptor(info); 133 } 134 }; 135