1*cf78ab8cSAndroid Build Coastguard Worker // Copyright 2022 The Android Open Source Project
2*cf78ab8cSAndroid Build Coastguard Worker //
3*cf78ab8cSAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
4*cf78ab8cSAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
5*cf78ab8cSAndroid Build Coastguard Worker // You may obtain a copy of the License at
6*cf78ab8cSAndroid Build Coastguard Worker //
7*cf78ab8cSAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
8*cf78ab8cSAndroid Build Coastguard Worker //
9*cf78ab8cSAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*cf78ab8cSAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
11*cf78ab8cSAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*cf78ab8cSAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
13*cf78ab8cSAndroid Build Coastguard Worker // limitations under the License.
14*cf78ab8cSAndroid Build Coastguard Worker
15*cf78ab8cSAndroid Build Coastguard Worker #include "backend/grpc_client.h"
16*cf78ab8cSAndroid Build Coastguard Worker
17*cf78ab8cSAndroid Build Coastguard Worker #include <google/protobuf/util/json_util.h>
18*cf78ab8cSAndroid Build Coastguard Worker
19*cf78ab8cSAndroid Build Coastguard Worker #include <cstdint>
20*cf78ab8cSAndroid Build Coastguard Worker #include <memory>
21*cf78ab8cSAndroid Build Coastguard Worker #include <mutex>
22*cf78ab8cSAndroid Build Coastguard Worker #include <string>
23*cf78ab8cSAndroid Build Coastguard Worker #include <unordered_map>
24*cf78ab8cSAndroid Build Coastguard Worker #include <utility>
25*cf78ab8cSAndroid Build Coastguard Worker
26*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/channel.h"
27*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/create_channel.h"
28*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/security/credentials.h"
29*cf78ab8cSAndroid Build Coastguard Worker #include "grpcpp/server_context.h"
30*cf78ab8cSAndroid Build Coastguard Worker #include "netsim/packet_streamer.grpc.pb.h"
31*cf78ab8cSAndroid Build Coastguard Worker #include "netsim/packet_streamer.pb.h"
32*cf78ab8cSAndroid Build Coastguard Worker #include "rust/cxx.h"
33*cf78ab8cSAndroid Build Coastguard Worker #include "util/log.h"
34*cf78ab8cSAndroid Build Coastguard Worker
35*cf78ab8cSAndroid Build Coastguard Worker // Backend Packet Streamer Client
36*cf78ab8cSAndroid Build Coastguard Worker
37*cf78ab8cSAndroid Build Coastguard Worker namespace netsim {
38*cf78ab8cSAndroid Build Coastguard Worker namespace backend {
39*cf78ab8cSAndroid Build Coastguard Worker namespace client {
40*cf78ab8cSAndroid Build Coastguard Worker
41*cf78ab8cSAndroid Build Coastguard Worker const std::chrono::duration kConnectionDeadline = std::chrono::seconds(5);
42*cf78ab8cSAndroid Build Coastguard Worker
43*cf78ab8cSAndroid Build Coastguard Worker using Stream = ::grpc::ClientReaderWriter<netsim::packet::PacketRequest,
44*cf78ab8cSAndroid Build Coastguard Worker netsim::packet::PacketResponse>;
45*cf78ab8cSAndroid Build Coastguard Worker
46*cf78ab8cSAndroid Build Coastguard Worker std::mutex mutex_;
47*cf78ab8cSAndroid Build Coastguard Worker uint32_t stream_id_max_ = 0;
48*cf78ab8cSAndroid Build Coastguard Worker
49*cf78ab8cSAndroid Build Coastguard Worker // Active StreamPacket calls
50*cf78ab8cSAndroid Build Coastguard Worker std::unordered_map<uint32_t, std::unique_ptr<Stream>> streams_;
51*cf78ab8cSAndroid Build Coastguard Worker std::unordered_map<uint32_t, grpc::ClientContext> contexts_;
52*cf78ab8cSAndroid Build Coastguard Worker
53*cf78ab8cSAndroid Build Coastguard Worker // Single connection to a server with multiple StreamPackets calls
54*cf78ab8cSAndroid Build Coastguard Worker std::string server_;
55*cf78ab8cSAndroid Build Coastguard Worker std::shared_ptr<grpc::Channel> channel_;
56*cf78ab8cSAndroid Build Coastguard Worker grpc::ClientContext context_;
57*cf78ab8cSAndroid Build Coastguard Worker std::unique_ptr<netsim::packet::PacketStreamer::Stub> stub_;
58*cf78ab8cSAndroid Build Coastguard Worker
59*cf78ab8cSAndroid Build Coastguard Worker // Call the StreamPackets RPC on server.
60*cf78ab8cSAndroid Build Coastguard Worker //
61*cf78ab8cSAndroid Build Coastguard Worker // This function allows multiple StreamPacket calls at once but only one
62*cf78ab8cSAndroid Build Coastguard Worker // connection to a server. If the server isn't already connected a new
63*cf78ab8cSAndroid Build Coastguard Worker // connection is created.
64*cf78ab8cSAndroid Build Coastguard Worker
StreamPackets(const rust::String & server_rust)65*cf78ab8cSAndroid Build Coastguard Worker uint32_t StreamPackets(const rust::String &server_rust) {
66*cf78ab8cSAndroid Build Coastguard Worker std::unique_lock<std::mutex> lock(mutex_);
67*cf78ab8cSAndroid Build Coastguard Worker auto server = std::string(server_rust);
68*cf78ab8cSAndroid Build Coastguard Worker if (server_.empty()) {
69*cf78ab8cSAndroid Build Coastguard Worker server_ = server;
70*cf78ab8cSAndroid Build Coastguard Worker channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
71*cf78ab8cSAndroid Build Coastguard Worker auto deadline = std::chrono::system_clock::now() + kConnectionDeadline;
72*cf78ab8cSAndroid Build Coastguard Worker if (!channel_->WaitForConnected(deadline)) {
73*cf78ab8cSAndroid Build Coastguard Worker BtsLog("Failed to create packet streamer client to %s", server_.c_str());
74*cf78ab8cSAndroid Build Coastguard Worker return -1;
75*cf78ab8cSAndroid Build Coastguard Worker }
76*cf78ab8cSAndroid Build Coastguard Worker stub_ = netsim::packet::PacketStreamer::NewStub(channel_);
77*cf78ab8cSAndroid Build Coastguard Worker } else if (server_ != server) {
78*cf78ab8cSAndroid Build Coastguard Worker BtsLog("grpc_client: multiple servers not supported");
79*cf78ab8cSAndroid Build Coastguard Worker return -1;
80*cf78ab8cSAndroid Build Coastguard Worker }
81*cf78ab8cSAndroid Build Coastguard Worker // Each active gRPC call needs its own context
82*cf78ab8cSAndroid Build Coastguard Worker auto stream = stub_->StreamPackets(&contexts_[++stream_id_max_]);
83*cf78ab8cSAndroid Build Coastguard Worker streams_[stream_id_max_] = std::move(stream);
84*cf78ab8cSAndroid Build Coastguard Worker BtsLog("Created packet streamer client to %s", server_.c_str());
85*cf78ab8cSAndroid Build Coastguard Worker return stream_id_max_;
86*cf78ab8cSAndroid Build Coastguard Worker }
87*cf78ab8cSAndroid Build Coastguard Worker
88*cf78ab8cSAndroid Build Coastguard Worker /// Loop reading packets on the stream identified by stream_id and call the
89*cf78ab8cSAndroid Build Coastguard Worker // ReadCallback function with the PacketResponse byte proto.
90*cf78ab8cSAndroid Build Coastguard Worker
ReadPacketResponseLoop(uint32_t stream_id,ReadCallback read_fn)91*cf78ab8cSAndroid Build Coastguard Worker bool ReadPacketResponseLoop(uint32_t stream_id, ReadCallback read_fn) {
92*cf78ab8cSAndroid Build Coastguard Worker netsim::packet::PacketResponse response;
93*cf78ab8cSAndroid Build Coastguard Worker while (true) {
94*cf78ab8cSAndroid Build Coastguard Worker {
95*cf78ab8cSAndroid Build Coastguard Worker std::unique_lock<std::mutex> lock(mutex_);
96*cf78ab8cSAndroid Build Coastguard Worker if (streams_.find(stream_id) == streams_.end()) {
97*cf78ab8cSAndroid Build Coastguard Worker BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
98*cf78ab8cSAndroid Build Coastguard Worker return false;
99*cf78ab8cSAndroid Build Coastguard Worker }
100*cf78ab8cSAndroid Build Coastguard Worker }
101*cf78ab8cSAndroid Build Coastguard Worker // TODO: fix locking here
102*cf78ab8cSAndroid Build Coastguard Worker if (!streams_[stream_id]->Read(&response)) {
103*cf78ab8cSAndroid Build Coastguard Worker BtsLogWarn("grpc_client: reading stopped stream_id %d", stream_id);
104*cf78ab8cSAndroid Build Coastguard Worker return false;
105*cf78ab8cSAndroid Build Coastguard Worker }
106*cf78ab8cSAndroid Build Coastguard Worker std::vector<unsigned char> proto_bytes(response.ByteSizeLong());
107*cf78ab8cSAndroid Build Coastguard Worker response.SerializeToArray(proto_bytes.data(), proto_bytes.size());
108*cf78ab8cSAndroid Build Coastguard Worker rust::Slice<const uint8_t> slice{proto_bytes.data(), proto_bytes.size()};
109*cf78ab8cSAndroid Build Coastguard Worker (*read_fn)(stream_id, slice);
110*cf78ab8cSAndroid Build Coastguard Worker }
111*cf78ab8cSAndroid Build Coastguard Worker }
112*cf78ab8cSAndroid Build Coastguard Worker
113*cf78ab8cSAndroid Build Coastguard Worker // Write a packet to the stream identified by stream_id
114*cf78ab8cSAndroid Build Coastguard Worker
WritePacketRequest(uint32_t stream_id,const rust::Slice<::std::uint8_t const> proto_bytes)115*cf78ab8cSAndroid Build Coastguard Worker bool WritePacketRequest(uint32_t stream_id,
116*cf78ab8cSAndroid Build Coastguard Worker const rust::Slice<::std::uint8_t const> proto_bytes) {
117*cf78ab8cSAndroid Build Coastguard Worker netsim::packet::PacketRequest request;
118*cf78ab8cSAndroid Build Coastguard Worker if (!request.ParseFromArray(proto_bytes.data(), proto_bytes.size())) {
119*cf78ab8cSAndroid Build Coastguard Worker BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
120*cf78ab8cSAndroid Build Coastguard Worker return false;
121*cf78ab8cSAndroid Build Coastguard Worker };
122*cf78ab8cSAndroid Build Coastguard Worker
123*cf78ab8cSAndroid Build Coastguard Worker std::unique_lock<std::mutex> lock(mutex_);
124*cf78ab8cSAndroid Build Coastguard Worker if (streams_.find(stream_id) == streams_.end()) {
125*cf78ab8cSAndroid Build Coastguard Worker BtsLogWarn("grpc_client: no stream for stream_id %d", stream_id);
126*cf78ab8cSAndroid Build Coastguard Worker return false;
127*cf78ab8cSAndroid Build Coastguard Worker }
128*cf78ab8cSAndroid Build Coastguard Worker if (!streams_[stream_id]->Write(request)) {
129*cf78ab8cSAndroid Build Coastguard Worker BtsLogWarn("grpc_client: write failed stream_id %d", stream_id);
130*cf78ab8cSAndroid Build Coastguard Worker return false;
131*cf78ab8cSAndroid Build Coastguard Worker }
132*cf78ab8cSAndroid Build Coastguard Worker return true;
133*cf78ab8cSAndroid Build Coastguard Worker };
134*cf78ab8cSAndroid Build Coastguard Worker
135*cf78ab8cSAndroid Build Coastguard Worker } // namespace client
136*cf78ab8cSAndroid Build Coastguard Worker } // namespace backend
137*cf78ab8cSAndroid Build Coastguard Worker } // namespace netsim
138