xref: /aosp_15_r20/external/federated-compute/fcp/client/fake_server.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1*14675a02SAndroid Build Coastguard Worker // Copyright 2021 Google LLC
2*14675a02SAndroid Build Coastguard Worker //
3*14675a02SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
4*14675a02SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
5*14675a02SAndroid Build Coastguard Worker // You may obtain a copy of the License at
6*14675a02SAndroid Build Coastguard Worker //
7*14675a02SAndroid Build Coastguard Worker //      http://www.apache.org/licenses/LICENSE-2.0
8*14675a02SAndroid Build Coastguard Worker //
9*14675a02SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*14675a02SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
11*14675a02SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*14675a02SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
13*14675a02SAndroid Build Coastguard Worker // limitations under the License.
14*14675a02SAndroid Build Coastguard Worker 
15*14675a02SAndroid Build Coastguard Worker #include "fcp/client/fake_server.h"
16*14675a02SAndroid Build Coastguard Worker 
17*14675a02SAndroid Build Coastguard Worker #include <string>
18*14675a02SAndroid Build Coastguard Worker #include <utility>
19*14675a02SAndroid Build Coastguard Worker 
20*14675a02SAndroid Build Coastguard Worker #include "gtest/gtest.h"
21*14675a02SAndroid Build Coastguard Worker #include "absl/status/status.h"
22*14675a02SAndroid Build Coastguard Worker #include "fcp/base/monitoring.h"
23*14675a02SAndroid Build Coastguard Worker #include "fcp/base/status_converters.h"
24*14675a02SAndroid Build Coastguard Worker #include "fcp/client/grpc_bidi_stream.h"
25*14675a02SAndroid Build Coastguard Worker #include "fcp/protocol/grpc_chunked_bidi_stream.h"
26*14675a02SAndroid Build Coastguard Worker #include "fcp/protos/federated_api.pb.h"
27*14675a02SAndroid Build Coastguard Worker 
28*14675a02SAndroid Build Coastguard Worker namespace fcp {
29*14675a02SAndroid Build Coastguard Worker namespace client {
30*14675a02SAndroid Build Coastguard Worker namespace test {
31*14675a02SAndroid Build Coastguard Worker 
32*14675a02SAndroid Build Coastguard Worker using fcp::base::ToGrpcStatus;
33*14675a02SAndroid Build Coastguard Worker using fcp::client::GrpcChunkedBidiStream;
34*14675a02SAndroid Build Coastguard Worker using google::internal::federatedml::v2::ClientStreamMessage;
35*14675a02SAndroid Build Coastguard Worker using google::internal::federatedml::v2::RetryWindow;
36*14675a02SAndroid Build Coastguard Worker using google::internal::federatedml::v2::ServerStreamMessage;
37*14675a02SAndroid Build Coastguard Worker 
GetRetryWindow(const std::string & token,int64_t min,int64_t max)38*14675a02SAndroid Build Coastguard Worker static RetryWindow GetRetryWindow(const std::string& token, int64_t min,
39*14675a02SAndroid Build Coastguard Worker                                   int64_t max) {
40*14675a02SAndroid Build Coastguard Worker   RetryWindow retry_window;
41*14675a02SAndroid Build Coastguard Worker   retry_window.mutable_delay_min()->set_seconds(min);
42*14675a02SAndroid Build Coastguard Worker   retry_window.mutable_delay_max()->set_seconds(max);
43*14675a02SAndroid Build Coastguard Worker   *retry_window.mutable_retry_token() = token;
44*14675a02SAndroid Build Coastguard Worker   return retry_window;
45*14675a02SAndroid Build Coastguard Worker }
46*14675a02SAndroid Build Coastguard Worker 
Session(grpc::ServerContext * context,grpc::ServerReaderWriter<ServerStreamMessage,ClientStreamMessage> * stream)47*14675a02SAndroid Build Coastguard Worker grpc::Status FakeServer::Session(
48*14675a02SAndroid Build Coastguard Worker     grpc::ServerContext* context,
49*14675a02SAndroid Build Coastguard Worker     grpc::ServerReaderWriter<ServerStreamMessage, ClientStreamMessage>*
50*14675a02SAndroid Build Coastguard Worker         stream) {
51*14675a02SAndroid Build Coastguard Worker   GrpcChunkedBidiStream<ServerStreamMessage, ClientStreamMessage>
52*14675a02SAndroid Build Coastguard Worker       chunked_bidi_stream(
53*14675a02SAndroid Build Coastguard Worker           stream, stream,
54*14675a02SAndroid Build Coastguard Worker           {chunk_size_for_upload_, max_pending_chunks_, compression_level_});
55*14675a02SAndroid Build Coastguard Worker   ClientStreamMessage request;
56*14675a02SAndroid Build Coastguard Worker   ServerStreamMessage response;
57*14675a02SAndroid Build Coastguard Worker   FCP_LOG(INFO) << "Server session started";
58*14675a02SAndroid Build Coastguard Worker   absl::Status status;
59*14675a02SAndroid Build Coastguard Worker   while ((status = chunked_bidi_stream.Receive(&request)).ok()) {
60*14675a02SAndroid Build Coastguard Worker     FCP_LOG(INFO) << "Request is: " << request.DebugString();
61*14675a02SAndroid Build Coastguard Worker     for (const auto& [key, value] : context->client_metadata()) {
62*14675a02SAndroid Build Coastguard Worker       client_metadata_.insert(
63*14675a02SAndroid Build Coastguard Worker           std::make_pair(std::string(key.data(), key.size()),
64*14675a02SAndroid Build Coastguard Worker                          std::string(value.data(), value.size())));
65*14675a02SAndroid Build Coastguard Worker     }
66*14675a02SAndroid Build Coastguard Worker     if (request.eligibility_eval_checkin_request()
67*14675a02SAndroid Build Coastguard Worker             .protocol_options_request()
68*14675a02SAndroid Build Coastguard Worker             .should_ack_checkin() ||
69*14675a02SAndroid Build Coastguard Worker         request.checkin_request()
70*14675a02SAndroid Build Coastguard Worker             .protocol_options_request()
71*14675a02SAndroid Build Coastguard Worker             .should_ack_checkin()) {
72*14675a02SAndroid Build Coastguard Worker       ServerStreamMessage checkin_request_ack_msg;
73*14675a02SAndroid Build Coastguard Worker       auto checkin_request_ack =
74*14675a02SAndroid Build Coastguard Worker           checkin_request_ack_msg.mutable_checkin_request_ack();
75*14675a02SAndroid Build Coastguard Worker       *checkin_request_ack->mutable_retry_window_if_accepted() =
76*14675a02SAndroid Build Coastguard Worker           GetRetryWindow("A", 111L, 222L);
77*14675a02SAndroid Build Coastguard Worker       *checkin_request_ack->mutable_retry_window_if_rejected() =
78*14675a02SAndroid Build Coastguard Worker           GetRetryWindow("R", 333L, 444L);
79*14675a02SAndroid Build Coastguard Worker       if (!chunked_bidi_stream.Send(&checkin_request_ack_msg).ok()) {
80*14675a02SAndroid Build Coastguard Worker         FCP_LOG(INFO) << "Server returning status " << status;
81*14675a02SAndroid Build Coastguard Worker         return ToGrpcStatus(status);
82*14675a02SAndroid Build Coastguard Worker       }
83*14675a02SAndroid Build Coastguard Worker     }
84*14675a02SAndroid Build Coastguard Worker     if (request.has_eligibility_eval_checkin_request() ||
85*14675a02SAndroid Build Coastguard Worker         request.has_checkin_request()) {
86*14675a02SAndroid Build Coastguard Worker       auto protocol_options_response =
87*14675a02SAndroid Build Coastguard Worker           request.has_eligibility_eval_checkin_request()
88*14675a02SAndroid Build Coastguard Worker               ? response.mutable_eligibility_eval_checkin_response()
89*14675a02SAndroid Build Coastguard Worker                     ->mutable_protocol_options_response()
90*14675a02SAndroid Build Coastguard Worker               : response.mutable_checkin_response()
91*14675a02SAndroid Build Coastguard Worker                     ->mutable_protocol_options_response();
92*14675a02SAndroid Build Coastguard Worker       protocol_options_response->set_compression_level(compression_level_);
93*14675a02SAndroid Build Coastguard Worker       protocol_options_response->set_chunk_size_for_upload(
94*14675a02SAndroid Build Coastguard Worker           chunk_size_for_upload_);
95*14675a02SAndroid Build Coastguard Worker       protocol_options_response->set_max_pending_chunks(max_pending_chunks_);
96*14675a02SAndroid Build Coastguard Worker     }
97*14675a02SAndroid Build Coastguard Worker     if (!(status = Handle(request, &response, &chunked_bidi_stream)).ok()) {
98*14675a02SAndroid Build Coastguard Worker       FCP_LOG(INFO) << "Server returning status " << status;
99*14675a02SAndroid Build Coastguard Worker       return ToGrpcStatus(status);
100*14675a02SAndroid Build Coastguard Worker     }
101*14675a02SAndroid Build Coastguard Worker   }
102*14675a02SAndroid Build Coastguard Worker   session_done_.Notify();
103*14675a02SAndroid Build Coastguard Worker   FCP_LOG(INFO) << "Server returning status " << status;
104*14675a02SAndroid Build Coastguard Worker   return ToGrpcStatus(status);
105*14675a02SAndroid Build Coastguard Worker }
106*14675a02SAndroid Build Coastguard Worker 
GetClientMetadata() const107*14675a02SAndroid Build Coastguard Worker std::multimap<std::string, std::string> FakeServer::GetClientMetadata() const {
108*14675a02SAndroid Build Coastguard Worker   return client_metadata_;
109*14675a02SAndroid Build Coastguard Worker }
110*14675a02SAndroid Build Coastguard Worker 
WaitForSessionDone()111*14675a02SAndroid Build Coastguard Worker void FakeServer::WaitForSessionDone() { session_done_.WaitForNotification(); }
112*14675a02SAndroid Build Coastguard Worker 
Handle(const ClientStreamMessage & request,ServerStreamMessage * first_reply,GrpcChunkedBidiStream<ServerStreamMessage,ClientStreamMessage> * stream)113*14675a02SAndroid Build Coastguard Worker absl::Status FakeServer::Handle(
114*14675a02SAndroid Build Coastguard Worker     const ClientStreamMessage& request, ServerStreamMessage* first_reply,
115*14675a02SAndroid Build Coastguard Worker     GrpcChunkedBidiStream<ServerStreamMessage, ClientStreamMessage>* stream) {
116*14675a02SAndroid Build Coastguard Worker   return stream->Send(first_reply);
117*14675a02SAndroid Build Coastguard Worker }
118*14675a02SAndroid Build Coastguard Worker 
119*14675a02SAndroid Build Coastguard Worker }  // namespace test
120*14675a02SAndroid Build Coastguard Worker }  // namespace client
121*14675a02SAndroid Build Coastguard Worker }  // namespace fcp
122