xref: /aosp_15_r20/external/federated-compute/fcp/client/grpc_bidi_stream_test.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2019 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "fcp/client/grpc_bidi_stream.h"
18 
19 #include <memory>
20 #include <string>
21 
22 #include "gmock/gmock.h"
23 #include "gtest/gtest.h"
24 #include "absl/status/status.h"
25 #include "absl/strings/str_cat.h"
26 #include "absl/time/clock.h"
27 #include "absl/time/time.h"
28 #include "fcp/base/monitoring.h"
29 #include "fcp/base/scheduler.h"
30 #include "fcp/client/fake_server.h"
31 #include "fcp/client/test_helpers.h"
32 #include "fcp/testing/testing.h"
33 #include "grpcpp/server_builder.h"
34 
35 namespace fcp {
36 namespace client {
37 namespace test {
38 namespace {
39 
40 using google::internal::federatedml::v2::ClientStreamMessage;
41 using google::internal::federatedml::v2::ServerStreamMessage;
42 using ::testing::Contains;
43 using ::testing::Not;
44 using ::testing::Pair;
45 
46 class GrpcBidiStreamTest : public testing::Test {
47  protected:
SetUp()48   void SetUp() override { BuildAndStartServer(); }
49 
TearDown()50   void TearDown() override {
51     server_->Shutdown();
52     server_->Wait();
53   }
54 
CreateClient(const std::string & population_name="")55   void CreateClient(const std::string& population_name = "") {
56     client_stream_ = std::make_unique<GrpcBidiStream>(
57         absl::StrCat("dns:///localhost", ":", port_), "none", population_name,
58         /* grpc_channel_deadline_seconds=*/600);
59     FCP_LOG(INFO) << "Client created." << std::endl;
60   }
61 
62   std::unique_ptr<GrpcBidiStream> client_stream_;
63   FakeServer server_impl_;
64 
65  private:
BuildAndStartServer()66   void BuildAndStartServer() {
67     grpc::ServerBuilder builder;
68     builder.AddListeningPort("dns:///localhost:0",
69                              grpc::InsecureServerCredentials(), &port_);
70     builder.RegisterService(&server_impl_);
71     server_ = builder.BuildAndStart();
72   }
73   // Variables that must be in scope for the lifetime of a test but are not
74   // used by test code.
75   int port_ = 0;
76   std::unique_ptr<grpc::Server> server_;
77 };
78 
TEST_F(GrpcBidiStreamTest,ClientContainsPopulationMetadata)79 TEST_F(GrpcBidiStreamTest, ClientContainsPopulationMetadata) {
80   CreateClient("population_name");
81   ClientStreamMessage request;
82   request.mutable_checkin_request();
83   EXPECT_THAT(client_stream_->Send(&request), IsOk());
84   ServerStreamMessage reply;
85   EXPECT_THAT(client_stream_->Receive(&reply), IsOk());
86   EXPECT_TRUE(reply.has_checkin_response()) << reply.DebugString();
87   EXPECT_THAT(server_impl_.GetClientMetadata(),
88               Contains(Pair(GrpcBidiStream::kApiKeyHeader, "none")));
89   EXPECT_THAT(
90       server_impl_.GetClientMetadata(),
91       Contains(Pair(GrpcBidiStream::kPopulationNameHeader, "population_name")));
92   client_stream_->Close();
93   server_impl_.WaitForSessionDone();
94 }
95 
TEST_F(GrpcBidiStreamTest,CancellationDuringBlockingOp)96 TEST_F(GrpcBidiStreamTest, CancellationDuringBlockingOp) {
97   CreateClient();
98   auto pool = CreateThreadPoolScheduler(1);
99   pool->Schedule([this]() {
100     sleep(1);
101     client_stream_->Close();
102   });
103   ServerStreamMessage reply;
104   auto start = absl::Now();
105   // Will block indefinitely, as the default FakeServer requires a request
106   // before sending a response.
107   EXPECT_THAT(client_stream_->Receive(&reply),
108               IsCode(absl::StatusCode::kCancelled));
109   EXPECT_GE(absl::Now() - start, absl::Seconds(1));
110 
111   server_impl_.WaitForSessionDone();
112 
113   // Idempotency check:
114   client_stream_->Close();
115   EXPECT_THAT(client_stream_->Receive(&reply), Not(IsOk()));
116   pool->WaitUntilIdle();
117 }
118 
TEST_F(GrpcBidiStreamTest,CancellationBeforeSend)119 TEST_F(GrpcBidiStreamTest, CancellationBeforeSend) {
120   CreateClient();
121   absl::Status status;
122   client_stream_->Close();
123   server_impl_.WaitForSessionDone();
124   ClientStreamMessage request;
125   request.mutable_checkin_request();
126   EXPECT_THAT(client_stream_->Send(&request),
127               IsCode(absl::StatusCode::kCancelled));
128 }
129 
TEST_F(GrpcBidiStreamTest,CancellationBeforeReceive)130 TEST_F(GrpcBidiStreamTest, CancellationBeforeReceive) {
131   CreateClient();
132   ClientStreamMessage request;
133   request.mutable_checkin_request();
134   EXPECT_THAT(client_stream_->Send(&request), IsOk());
135   client_stream_->Close();
136   server_impl_.WaitForSessionDone();
137   ServerStreamMessage reply;
138   EXPECT_THAT(client_stream_->Receive(&reply),
139               IsCode(absl::StatusCode::kCancelled));
140   // Idempotency check:
141   EXPECT_THAT(client_stream_->Receive(&reply),
142               IsCode(absl::StatusCode::kCancelled));
143 }
144 
TEST_F(GrpcBidiStreamTest,CancellationWithoutBlockingOp)145 TEST_F(GrpcBidiStreamTest, CancellationWithoutBlockingOp) {
146   CreateClient();
147   ClientStreamMessage request;
148   request.mutable_checkin_request();
149   EXPECT_THAT(client_stream_->Send(&request), IsOk());
150   ServerStreamMessage reply;
151   EXPECT_THAT(client_stream_->Receive(&reply), IsOk());
152   EXPECT_TRUE(reply.has_checkin_response()) << reply.DebugString();
153   EXPECT_THAT(server_impl_.GetClientMetadata(),
154               Contains(Pair(GrpcBidiStream::kApiKeyHeader, "none")));
155   EXPECT_THAT(server_impl_.GetClientMetadata(),
156               Contains(Pair(GrpcBidiStream::kPopulationNameHeader, "")));
157 
158   client_stream_->Close();
159   server_impl_.WaitForSessionDone();
160 }
161 
162 }  // namespace
163 }  // namespace test
164 }  // namespace client
165 }  // namespace fcp
166