1 /*
2 * Copyright 2019 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fcp/client/grpc_bidi_stream.h"
18
19 #include <memory>
20 #include <string>
21
22 #include "gmock/gmock.h"
23 #include "gtest/gtest.h"
24 #include "absl/status/status.h"
25 #include "absl/strings/str_cat.h"
26 #include "absl/time/clock.h"
27 #include "absl/time/time.h"
28 #include "fcp/base/monitoring.h"
29 #include "fcp/base/scheduler.h"
30 #include "fcp/client/fake_server.h"
31 #include "fcp/client/test_helpers.h"
32 #include "fcp/testing/testing.h"
33 #include "grpcpp/server_builder.h"
34
35 namespace fcp {
36 namespace client {
37 namespace test {
38 namespace {
39
40 using google::internal::federatedml::v2::ClientStreamMessage;
41 using google::internal::federatedml::v2::ServerStreamMessage;
42 using ::testing::Contains;
43 using ::testing::Not;
44 using ::testing::Pair;
45
46 class GrpcBidiStreamTest : public testing::Test {
47 protected:
SetUp()48 void SetUp() override { BuildAndStartServer(); }
49
TearDown()50 void TearDown() override {
51 server_->Shutdown();
52 server_->Wait();
53 }
54
CreateClient(const std::string & population_name="")55 void CreateClient(const std::string& population_name = "") {
56 client_stream_ = std::make_unique<GrpcBidiStream>(
57 absl::StrCat("dns:///localhost", ":", port_), "none", population_name,
58 /* grpc_channel_deadline_seconds=*/600);
59 FCP_LOG(INFO) << "Client created." << std::endl;
60 }
61
62 std::unique_ptr<GrpcBidiStream> client_stream_;
63 FakeServer server_impl_;
64
65 private:
BuildAndStartServer()66 void BuildAndStartServer() {
67 grpc::ServerBuilder builder;
68 builder.AddListeningPort("dns:///localhost:0",
69 grpc::InsecureServerCredentials(), &port_);
70 builder.RegisterService(&server_impl_);
71 server_ = builder.BuildAndStart();
72 }
73 // Variables that must be in scope for the lifetime of a test but are not
74 // used by test code.
75 int port_ = 0;
76 std::unique_ptr<grpc::Server> server_;
77 };
78
// Checks that the API key and the population name given at client creation
// show up in the client metadata recorded by the server.
TEST_F(GrpcBidiStreamTest, ClientContainsPopulationMetadata) {
  CreateClient("population_name");

  // Exchange one check-in request/response pair so that the server has seen
  // the client's metadata.
  ClientStreamMessage checkin;
  checkin.mutable_checkin_request();
  EXPECT_THAT(client_stream_->Send(&checkin), IsOk());

  ServerStreamMessage response;
  EXPECT_THAT(client_stream_->Receive(&response), IsOk());
  EXPECT_TRUE(response.has_checkin_response()) << response.DebugString();

  // Both headers must have been received with the expected values.
  EXPECT_THAT(server_impl_.GetClientMetadata(),
              Contains(Pair(GrpcBidiStream::kApiKeyHeader, "none")));
  EXPECT_THAT(server_impl_.GetClientMetadata(),
              Contains(Pair(GrpcBidiStream::kPopulationNameHeader,
                            "population_name")));

  // End the session cleanly before the fixture shuts the server down.
  client_stream_->Close();
  server_impl_.WaitForSessionDone();
}
95
// Checks that Close() called from another thread unblocks a pending Receive()
// with a kCancelled status, and that calling Close() again afterwards is
// harmless.
TEST_F(GrpcBidiStreamTest, CancellationDuringBlockingOp) {
  CreateClient();
  auto pool = CreateThreadPoolScheduler(1);
  // Take the start time before scheduling the closer task. The pool thread
  // may begin its delay as soon as it is scheduled, so measuring afterwards
  // could let Receive() return slightly less than one second after `start`.
  const absl::Time start = absl::Now();
  pool->Schedule([this]() {
    absl::SleepFor(absl::Seconds(1));
    client_stream_->Close();
  });
  ServerStreamMessage reply;
  // Will block until the stream is closed, as the default FakeServer requires
  // a request before sending a response.
  EXPECT_THAT(client_stream_->Receive(&reply),
              IsCode(absl::StatusCode::kCancelled));
  EXPECT_GE(absl::Now() - start, absl::Seconds(1));

  server_impl_.WaitForSessionDone();

  // Idempotency check:
  client_stream_->Close();
  EXPECT_THAT(client_stream_->Receive(&reply), Not(IsOk()));
  // Make sure the scheduled task has finished before the test returns.
  pool->WaitUntilIdle();
}
118
// Checks that Send() on a stream that was already closed fails with
// kCancelled.
TEST_F(GrpcBidiStreamTest, CancellationBeforeSend) {
  CreateClient();
  client_stream_->Close();
  server_impl_.WaitForSessionDone();
  ClientStreamMessage request;
  request.mutable_checkin_request();
  EXPECT_THAT(client_stream_->Send(&request),
              IsCode(absl::StatusCode::kCancelled));
}
129
// Checks that Receive() fails with kCancelled when the stream is closed after
// a successful Send(), and that it keeps failing the same way when retried.
TEST_F(GrpcBidiStreamTest, CancellationBeforeReceive) {
  CreateClient();

  // A request goes out successfully while the stream is still open.
  ClientStreamMessage checkin;
  checkin.mutable_checkin_request();
  EXPECT_THAT(client_stream_->Send(&checkin), IsOk());

  // Close the stream and let the server-side session finish.
  client_stream_->Close();
  server_impl_.WaitForSessionDone();

  ServerStreamMessage response;
  EXPECT_THAT(client_stream_->Receive(&response),
              IsCode(absl::StatusCode::kCancelled));

  // Idempotency check: a second attempt reports the same status.
  EXPECT_THAT(client_stream_->Receive(&response),
              IsCode(absl::StatusCode::kCancelled));
}
144
// Checks that a stream can be closed when no operation is in flight: a full
// request/response exchange completes first, then the stream is closed.
TEST_F(GrpcBidiStreamTest, CancellationWithoutBlockingOp) {
  CreateClient();

  // Complete one check-in exchange.
  ClientStreamMessage checkin;
  checkin.mutable_checkin_request();
  EXPECT_THAT(client_stream_->Send(&checkin), IsOk());

  ServerStreamMessage response;
  EXPECT_THAT(client_stream_->Receive(&response), IsOk());
  EXPECT_TRUE(response.has_checkin_response()) << response.DebugString();

  // The client was created with the default (empty) population name.
  EXPECT_THAT(server_impl_.GetClientMetadata(),
              Contains(Pair(GrpcBidiStream::kApiKeyHeader, "none")));
  EXPECT_THAT(server_impl_.GetClientMetadata(),
              Contains(Pair(GrpcBidiStream::kPopulationNameHeader, "")));

  // Nothing is pending at this point, so Close() has no call to interrupt.
  client_stream_->Close();
  server_impl_.WaitForSessionDone();
}
161
162 } // namespace
163 } // namespace test
164 } // namespace client
165 } // namespace fcp
166