xref: /aosp_15_r20/tools/netsim/src/backend/grpc_client.cc (revision cf78ab8cffb8fc9207af348f23af247fb04370a6)
1*cf78ab8cSAndroid Build Coastguard Worker // Copyright 2022 The Android Open Source Project
2*cf78ab8cSAndroid Build Coastguard Worker //
3*cf78ab8cSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
4*cf78ab8cSAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
5*cf78ab8cSAndroid Build Coastguard Worker // You may obtain a copy of the License at
6*cf78ab8cSAndroid Build Coastguard Worker //
7*cf78ab8cSAndroid Build Coastguard Worker //      http://www.apache.org/licenses/LICENSE-2.0
8*cf78ab8cSAndroid Build Coastguard Worker //
9*cf78ab8cSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*cf78ab8cSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
11*cf78ab8cSAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*cf78ab8cSAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
13*cf78ab8cSAndroid Build Coastguard Worker // limitations under the License.
14*cf78ab8cSAndroid Build Coastguard Worker 
15*cf78ab8cSAndroid Build Coastguard Worker #include "backend/grpc_client.h"
16*cf78ab8cSAndroid Build Coastguard Worker 
17*cf78ab8cSAndroid Build Coastguard Worker #include <google/protobuf/util/json_util.h>
18*cf78ab8cSAndroid Build Coastguard Worker 
19*cf78ab8cSAndroid Build Coastguard Worker #include <cstdint>
20*cf78ab8cSAndroid Build Coastguard Worker #include <memory>
21*cf78ab8cSAndroid Build Coastguard Worker #include <mutex>
22*cf78ab8cSAndroid Build Coastguard Worker #include <string>
23*cf78ab8cSAndroid Build Coastguard Worker #include <unordered_map>
24*cf78ab8cSAndroid Build Coastguard Worker #include <utility>
25*cf78ab8cSAndroid Build Coastguard Worker 
26*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/channel.h"
27*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/create_channel.h"
28*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/security/credentials.h"
29*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/server_context.h"
30*cf78ab8cSAndroid Build Coastguard Worker #include "netsim/packet_streamer.grpc.pb.h"
31*cf78ab8cSAndroid Build Coastguard Worker #include "netsim/packet_streamer.pb.h"
32*cf78ab8cSAndroid Build Coastguard Worker #include "rust/cxx.h"
33*cf78ab8cSAndroid Build Coastguard Worker #include "util/log.h"
34*cf78ab8cSAndroid Build Coastguard Worker 
35*cf78ab8cSAndroid Build Coastguard Worker // Backend Packet Streamer Client
36*cf78ab8cSAndroid Build Coastguard Worker 
37*cf78ab8cSAndroid Build Coastguard Worker namespace netsim {
38*cf78ab8cSAndroid Build Coastguard Worker namespace backend {
39*cf78ab8cSAndroid Build Coastguard Worker namespace client {
40*cf78ab8cSAndroid Build Coastguard Worker 
41*cf78ab8cSAndroid Build Coastguard Worker const std::chrono::duration kConnectionDeadline = std::chrono::seconds(5);
42*cf78ab8cSAndroid Build Coastguard Worker 
43*cf78ab8cSAndroid Build Coastguard Worker using Stream = ::grpc::ClientReaderWriter<netsim::packet::PacketRequest,
44*cf78ab8cSAndroid Build Coastguard Worker                                           netsim::packet::PacketResponse>;
45*cf78ab8cSAndroid Build Coastguard Worker 
46*cf78ab8cSAndroid Build Coastguard Worker std::mutex mutex_;
47*cf78ab8cSAndroid Build Coastguard Worker uint32_t stream_id_max_ = 0;
48*cf78ab8cSAndroid Build Coastguard Worker 
49*cf78ab8cSAndroid Build Coastguard Worker // Active StreamPacket calls
50*cf78ab8cSAndroid Build Coastguard Worker std::unordered_map<uint32_t, std::unique_ptr<Stream>> streams_;
51*cf78ab8cSAndroid Build Coastguard Worker std::unordered_map<uint32_t, grpc::ClientContext> contexts_;
52*cf78ab8cSAndroid Build Coastguard Worker 
53*cf78ab8cSAndroid Build Coastguard Worker // Single connection to a server with multiple StreamPackets calls
54*cf78ab8cSAndroid Build Coastguard Worker std::string server_;
55*cf78ab8cSAndroid Build Coastguard Worker std::shared_ptr<grpc::Channel> channel_;
56*cf78ab8cSAndroid Build Coastguard Worker grpc::ClientContext context_;
57*cf78ab8cSAndroid Build Coastguard Worker std::unique_ptr<netsim::packet::PacketStreamer::Stub> stub_;
58*cf78ab8cSAndroid Build Coastguard Worker 
59*cf78ab8cSAndroid Build Coastguard Worker // Call the StreamPackets RPC on server.
60*cf78ab8cSAndroid Build Coastguard Worker //
61*cf78ab8cSAndroid Build Coastguard Worker // This function allows multiple StreamPacket calls at once but only one
62*cf78ab8cSAndroid Build Coastguard Worker // connection to a server. If the server isn't already connected a new
63*cf78ab8cSAndroid Build Coastguard Worker // connection is created.
64*cf78ab8cSAndroid Build Coastguard Worker 
StreamPackets(const rust::String & server_rust)65*cf78ab8cSAndroid Build Coastguard Worker uint32_t StreamPackets(const rust::String &server_rust) {
66*cf78ab8cSAndroid Build Coastguard Worker   std::unique_lock<std::mutex> lock(mutex_);
67*cf78ab8cSAndroid Build Coastguard Worker   auto server = std::string(server_rust);
68*cf78ab8cSAndroid Build Coastguard Worker   if (server_.empty()) {
69*cf78ab8cSAndroid Build Coastguard Worker     server_ = server;
70*cf78ab8cSAndroid Build Coastguard Worker     channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
71*cf78ab8cSAndroid Build Coastguard Worker     auto deadline = std::chrono::system_clock::now() + kConnectionDeadline;
72*cf78ab8cSAndroid Build Coastguard Worker     if (!channel_->WaitForConnected(deadline)) {
73*cf78ab8cSAndroid Build Coastguard Worker       BtsLog("Failed to create packet streamer client to %s", server_.c_str());
74*cf78ab8cSAndroid Build Coastguard Worker       return -1;
75*cf78ab8cSAndroid Build Coastguard Worker     }
76*cf78ab8cSAndroid Build Coastguard Worker     stub_ = netsim::packet::PacketStreamer::NewStub(channel_);
77*cf78ab8cSAndroid Build Coastguard Worker   } else if (server_ != server) {
78*cf78ab8cSAndroid Build Coastguard Worker     BtsLog("grpc_client: multiple servers not supported");
79*cf78ab8cSAndroid Build Coastguard Worker     return -1;
80*cf78ab8cSAndroid Build Coastguard Worker   }
81*cf78ab8cSAndroid Build Coastguard Worker   // Each active gRPC call needs its own context
82*cf78ab8cSAndroid Build Coastguard Worker   auto stream = stub_->StreamPackets(&contexts_[++stream_id_max_]);
83*cf78ab8cSAndroid Build Coastguard Worker   streams_[stream_id_max_] = std::move(stream);
84*cf78ab8cSAndroid Build Coastguard Worker   BtsLog("Created packet streamer client to %s", server_.c_str());
85*cf78ab8cSAndroid Build Coastguard Worker   return stream_id_max_;
86*cf78ab8cSAndroid Build Coastguard Worker }
87*cf78ab8cSAndroid Build Coastguard Worker 
88*cf78ab8cSAndroid Build Coastguard Worker /// Loop reading packets on the stream identified by stream_id and call the
89*cf78ab8cSAndroid Build Coastguard Worker //  ReadCallback function with the PacketResponse byte proto.
90*cf78ab8cSAndroid Build Coastguard Worker 
ReadPacketResponseLoop(uint32_t stream_id,ReadCallback read_fn)91*cf78ab8cSAndroid Build Coastguard Worker bool ReadPacketResponseLoop(uint32_t stream_id, ReadCallback read_fn) {
92*cf78ab8cSAndroid Build Coastguard Worker   netsim::packet::PacketResponse response;
93*cf78ab8cSAndroid Build Coastguard Worker   while (true) {
94*cf78ab8cSAndroid Build Coastguard Worker     {
95*cf78ab8cSAndroid Build Coastguard Worker       std::unique_lock<std::mutex> lock(mutex_);
96*cf78ab8cSAndroid Build Coastguard Worker       if (streams_.find(stream_id) == streams_.end()) {
97*cf78ab8cSAndroid Build Coastguard Worker         BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
98*cf78ab8cSAndroid Build Coastguard Worker         return false;
99*cf78ab8cSAndroid Build Coastguard Worker       }
100*cf78ab8cSAndroid Build Coastguard Worker     }
101*cf78ab8cSAndroid Build Coastguard Worker     // TODO: fix locking here
102*cf78ab8cSAndroid Build Coastguard Worker     if (!streams_[stream_id]->Read(&response)) {
103*cf78ab8cSAndroid Build Coastguard Worker       BtsLogWarn("grpc_client: reading stopped stream_id %d", stream_id);
104*cf78ab8cSAndroid Build Coastguard Worker       return false;
105*cf78ab8cSAndroid Build Coastguard Worker     }
106*cf78ab8cSAndroid Build Coastguard Worker     std::vector<unsigned char> proto_bytes(response.ByteSizeLong());
107*cf78ab8cSAndroid Build Coastguard Worker     response.SerializeToArray(proto_bytes.data(), proto_bytes.size());
108*cf78ab8cSAndroid Build Coastguard Worker     rust::Slice<const uint8_t> slice{proto_bytes.data(), proto_bytes.size()};
109*cf78ab8cSAndroid Build Coastguard Worker     (*read_fn)(stream_id, slice);
110*cf78ab8cSAndroid Build Coastguard Worker   }
111*cf78ab8cSAndroid Build Coastguard Worker }
112*cf78ab8cSAndroid Build Coastguard Worker 
113*cf78ab8cSAndroid Build Coastguard Worker // Write a packet to the stream identified by stream_id
114*cf78ab8cSAndroid Build Coastguard Worker 
WritePacketRequest(uint32_t stream_id,const rust::Slice<::std::uint8_t const> proto_bytes)115*cf78ab8cSAndroid Build Coastguard Worker bool WritePacketRequest(uint32_t stream_id,
116*cf78ab8cSAndroid Build Coastguard Worker                         const rust::Slice<::std::uint8_t const> proto_bytes) {
117*cf78ab8cSAndroid Build Coastguard Worker   netsim::packet::PacketRequest request;
118*cf78ab8cSAndroid Build Coastguard Worker   if (!request.ParseFromArray(proto_bytes.data(), proto_bytes.size())) {
119*cf78ab8cSAndroid Build Coastguard Worker     BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
120*cf78ab8cSAndroid Build Coastguard Worker     return false;
121*cf78ab8cSAndroid Build Coastguard Worker   };
122*cf78ab8cSAndroid Build Coastguard Worker 
123*cf78ab8cSAndroid Build Coastguard Worker   std::unique_lock<std::mutex> lock(mutex_);
124*cf78ab8cSAndroid Build Coastguard Worker   if (streams_.find(stream_id) == streams_.end()) {
125*cf78ab8cSAndroid Build Coastguard Worker     BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
126*cf78ab8cSAndroid Build Coastguard Worker     return false;
127*cf78ab8cSAndroid Build Coastguard Worker   }
128*cf78ab8cSAndroid Build Coastguard Worker   if (!streams_[stream_id]->Write(request)) {
129*cf78ab8cSAndroid Build Coastguard Worker     BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
130*cf78ab8cSAndroid Build Coastguard Worker     return false;
131*cf78ab8cSAndroid Build Coastguard Worker   }
132*cf78ab8cSAndroid Build Coastguard Worker   return true;
133*cf78ab8cSAndroid Build Coastguard Worker };
134*cf78ab8cSAndroid Build Coastguard Worker 
135*cf78ab8cSAndroid Build Coastguard Worker }  // namespace client
136*cf78ab8cSAndroid Build Coastguard Worker }  // namespace backend
137*cf78ab8cSAndroid Build Coastguard Worker }  // namespace netsim
138