1 // Copyright 2021 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "fcp/client/fake_server.h"
16
17 #include <string>
18 #include <utility>
19
20 #include "gtest/gtest.h"
21 #include "absl/status/status.h"
22 #include "fcp/base/monitoring.h"
23 #include "fcp/base/status_converters.h"
24 #include "fcp/client/grpc_bidi_stream.h"
25 #include "fcp/protocol/grpc_chunked_bidi_stream.h"
26 #include "fcp/protos/federated_api.pb.h"
27
28 namespace fcp {
29 namespace client {
30 namespace test {
31
32 using fcp::base::ToGrpcStatus;
33 using fcp::client::GrpcChunkedBidiStream;
34 using google::internal::federatedml::v2::ClientStreamMessage;
35 using google::internal::federatedml::v2::RetryWindow;
36 using google::internal::federatedml::v2::ServerStreamMessage;
37
GetRetryWindow(const std::string & token,int64_t min,int64_t max)38 static RetryWindow GetRetryWindow(const std::string& token, int64_t min,
39 int64_t max) {
40 RetryWindow retry_window;
41 retry_window.mutable_delay_min()->set_seconds(min);
42 retry_window.mutable_delay_max()->set_seconds(max);
43 *retry_window.mutable_retry_token() = token;
44 return retry_window;
45 }
46
Session(grpc::ServerContext * context,grpc::ServerReaderWriter<ServerStreamMessage,ClientStreamMessage> * stream)47 grpc::Status FakeServer::Session(
48 grpc::ServerContext* context,
49 grpc::ServerReaderWriter<ServerStreamMessage, ClientStreamMessage>*
50 stream) {
51 GrpcChunkedBidiStream<ServerStreamMessage, ClientStreamMessage>
52 chunked_bidi_stream(
53 stream, stream,
54 {chunk_size_for_upload_, max_pending_chunks_, compression_level_});
55 ClientStreamMessage request;
56 ServerStreamMessage response;
57 FCP_LOG(INFO) << "Server session started";
58 absl::Status status;
59 while ((status = chunked_bidi_stream.Receive(&request)).ok()) {
60 FCP_LOG(INFO) << "Request is: " << request.DebugString();
61 for (const auto& [key, value] : context->client_metadata()) {
62 client_metadata_.insert(
63 std::make_pair(std::string(key.data(), key.size()),
64 std::string(value.data(), value.size())));
65 }
66 if (request.eligibility_eval_checkin_request()
67 .protocol_options_request()
68 .should_ack_checkin() ||
69 request.checkin_request()
70 .protocol_options_request()
71 .should_ack_checkin()) {
72 ServerStreamMessage checkin_request_ack_msg;
73 auto checkin_request_ack =
74 checkin_request_ack_msg.mutable_checkin_request_ack();
75 *checkin_request_ack->mutable_retry_window_if_accepted() =
76 GetRetryWindow("A", 111L, 222L);
77 *checkin_request_ack->mutable_retry_window_if_rejected() =
78 GetRetryWindow("R", 333L, 444L);
79 if (!chunked_bidi_stream.Send(&checkin_request_ack_msg).ok()) {
80 FCP_LOG(INFO) << "Server returning status " << status;
81 return ToGrpcStatus(status);
82 }
83 }
84 if (request.has_eligibility_eval_checkin_request() ||
85 request.has_checkin_request()) {
86 auto protocol_options_response =
87 request.has_eligibility_eval_checkin_request()
88 ? response.mutable_eligibility_eval_checkin_response()
89 ->mutable_protocol_options_response()
90 : response.mutable_checkin_response()
91 ->mutable_protocol_options_response();
92 protocol_options_response->set_compression_level(compression_level_);
93 protocol_options_response->set_chunk_size_for_upload(
94 chunk_size_for_upload_);
95 protocol_options_response->set_max_pending_chunks(max_pending_chunks_);
96 }
97 if (!(status = Handle(request, &response, &chunked_bidi_stream)).ok()) {
98 FCP_LOG(INFO) << "Server returning status " << status;
99 return ToGrpcStatus(status);
100 }
101 }
102 session_done_.Notify();
103 FCP_LOG(INFO) << "Server returning status " << status;
104 return ToGrpcStatus(status);
105 }
106
GetClientMetadata() const107 std::multimap<std::string, std::string> FakeServer::GetClientMetadata() const {
108 return client_metadata_;
109 }
110
WaitForSessionDone()111 void FakeServer::WaitForSessionDone() { session_done_.WaitForNotification(); }
112
Handle(const ClientStreamMessage & request,ServerStreamMessage * first_reply,GrpcChunkedBidiStream<ServerStreamMessage,ClientStreamMessage> * stream)113 absl::Status FakeServer::Handle(
114 const ClientStreamMessage& request, ServerStreamMessage* first_reply,
115 GrpcChunkedBidiStream<ServerStreamMessage, ClientStreamMessage>* stream) {
116 return stream->Send(first_reply);
117 }
118
119 } // namespace test
120 } // namespace client
121 } // namespace fcp
122