xref: /aosp_15_r20/external/federated-compute/fcp/client/fake_server.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 // Copyright 2021 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "fcp/client/fake_server.h"
16 
17 #include <string>
18 #include <utility>
19 
20 #include "gtest/gtest.h"
21 #include "absl/status/status.h"
22 #include "fcp/base/monitoring.h"
23 #include "fcp/base/status_converters.h"
24 #include "fcp/client/grpc_bidi_stream.h"
25 #include "fcp/protocol/grpc_chunked_bidi_stream.h"
26 #include "fcp/protos/federated_api.pb.h"
27 
28 namespace fcp {
29 namespace client {
30 namespace test {
31 
32 using fcp::base::ToGrpcStatus;
33 using fcp::client::GrpcChunkedBidiStream;
34 using google::internal::federatedml::v2::ClientStreamMessage;
35 using google::internal::federatedml::v2::RetryWindow;
36 using google::internal::federatedml::v2::ServerStreamMessage;
37 
GetRetryWindow(const std::string & token,int64_t min,int64_t max)38 static RetryWindow GetRetryWindow(const std::string& token, int64_t min,
39                                   int64_t max) {
40   RetryWindow retry_window;
41   retry_window.mutable_delay_min()->set_seconds(min);
42   retry_window.mutable_delay_max()->set_seconds(max);
43   *retry_window.mutable_retry_token() = token;
44   return retry_window;
45 }
46 
Session(grpc::ServerContext * context,grpc::ServerReaderWriter<ServerStreamMessage,ClientStreamMessage> * stream)47 grpc::Status FakeServer::Session(
48     grpc::ServerContext* context,
49     grpc::ServerReaderWriter<ServerStreamMessage, ClientStreamMessage>*
50         stream) {
51   GrpcChunkedBidiStream<ServerStreamMessage, ClientStreamMessage>
52       chunked_bidi_stream(
53           stream, stream,
54           {chunk_size_for_upload_, max_pending_chunks_, compression_level_});
55   ClientStreamMessage request;
56   ServerStreamMessage response;
57   FCP_LOG(INFO) << "Server session started";
58   absl::Status status;
59   while ((status = chunked_bidi_stream.Receive(&request)).ok()) {
60     FCP_LOG(INFO) << "Request is: " << request.DebugString();
61     for (const auto& [key, value] : context->client_metadata()) {
62       client_metadata_.insert(
63           std::make_pair(std::string(key.data(), key.size()),
64                          std::string(value.data(), value.size())));
65     }
66     if (request.eligibility_eval_checkin_request()
67             .protocol_options_request()
68             .should_ack_checkin() ||
69         request.checkin_request()
70             .protocol_options_request()
71             .should_ack_checkin()) {
72       ServerStreamMessage checkin_request_ack_msg;
73       auto checkin_request_ack =
74           checkin_request_ack_msg.mutable_checkin_request_ack();
75       *checkin_request_ack->mutable_retry_window_if_accepted() =
76           GetRetryWindow("A", 111L, 222L);
77       *checkin_request_ack->mutable_retry_window_if_rejected() =
78           GetRetryWindow("R", 333L, 444L);
79       if (!chunked_bidi_stream.Send(&checkin_request_ack_msg).ok()) {
80         FCP_LOG(INFO) << "Server returning status " << status;
81         return ToGrpcStatus(status);
82       }
83     }
84     if (request.has_eligibility_eval_checkin_request() ||
85         request.has_checkin_request()) {
86       auto protocol_options_response =
87           request.has_eligibility_eval_checkin_request()
88               ? response.mutable_eligibility_eval_checkin_response()
89                     ->mutable_protocol_options_response()
90               : response.mutable_checkin_response()
91                     ->mutable_protocol_options_response();
92       protocol_options_response->set_compression_level(compression_level_);
93       protocol_options_response->set_chunk_size_for_upload(
94           chunk_size_for_upload_);
95       protocol_options_response->set_max_pending_chunks(max_pending_chunks_);
96     }
97     if (!(status = Handle(request, &response, &chunked_bidi_stream)).ok()) {
98       FCP_LOG(INFO) << "Server returning status " << status;
99       return ToGrpcStatus(status);
100     }
101   }
102   session_done_.Notify();
103   FCP_LOG(INFO) << "Server returning status " << status;
104   return ToGrpcStatus(status);
105 }
106 
GetClientMetadata() const107 std::multimap<std::string, std::string> FakeServer::GetClientMetadata() const {
108   return client_metadata_;
109 }
110 
WaitForSessionDone()111 void FakeServer::WaitForSessionDone() { session_done_.WaitForNotification(); }
112 
Handle(const ClientStreamMessage & request,ServerStreamMessage * first_reply,GrpcChunkedBidiStream<ServerStreamMessage,ClientStreamMessage> * stream)113 absl::Status FakeServer::Handle(
114     const ClientStreamMessage& request, ServerStreamMessage* first_reply,
115     GrpcChunkedBidiStream<ServerStreamMessage, ClientStreamMessage>* stream) {
116   return stream->Send(first_reply);
117 }
118 
119 }  // namespace test
120 }  // namespace client
121 }  // namespace fcp
122