1 /*
2 * Copyright 2022 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fcp/aggregation/protocol/simple_aggregation/simple_aggregation_protocol.h"
18
19 #include <cstddef>
20 #include <cstdint>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 #include <vector>
25
26 #include "absl/memory/memory.h"
27 #include "absl/status/status.h"
28 #include "absl/strings/str_format.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/synchronization/mutex.h"
31 #include "fcp/aggregation/core/tensor.h"
32 #include "fcp/aggregation/core/tensor_aggregator.h"
33 #include "fcp/aggregation/core/tensor_aggregator_factory.h"
34 #include "fcp/aggregation/core/tensor_aggregator_registry.h"
35 #include "fcp/aggregation/protocol/aggregation_protocol_messages.pb.h"
36 #include "fcp/aggregation/protocol/checkpoint_builder.h"
37 #include "fcp/aggregation/protocol/checkpoint_parser.h"
38 #include "fcp/aggregation/tensorflow/converters.h"
39 #include "fcp/base/monitoring.h"
40 #include "fcp/protos/plan.pb.h"
41
42 namespace fcp::aggregation {
43
44 // Creates an INVALID_ARGUMENT error with the provided error message.
ServerAggregationConfigArgumentError(const Configuration::ServerAggregationConfig & aggregation_config,absl::string_view error_message)45 absl::Status ServerAggregationConfigArgumentError(
46 const Configuration::ServerAggregationConfig& aggregation_config,
47 absl::string_view error_message) {
48 return absl::InvalidArgumentError(
49 absl::StrFormat("ServerAggregationConfig: %s\n:%s", error_message,
50 aggregation_config.DebugString()));
51 }
52
53 // Creates an aggregation intrinsic based on the intrinsic configuration.
54 absl::StatusOr<SimpleAggregationProtocol::Intrinsic>
CreateIntrinsic(const Configuration::ServerAggregationConfig & aggregation_config)55 SimpleAggregationProtocol::CreateIntrinsic(
56 const Configuration::ServerAggregationConfig& aggregation_config) {
57 // Resolve the intrinsic_uri to the registered TensorAggregatorFactory.
58 FCP_ASSIGN_OR_RETURN(
59 const TensorAggregatorFactory* factory,
60 GetAggregatorFactory(aggregation_config.intrinsic_uri()));
61
62 // Convert the input tensor specification.
63 FCP_ASSIGN_OR_RETURN(
64 TensorSpec input_spec,
65 tensorflow::ConvertTensorSpec(
66 aggregation_config.intrinsic_args(0).input_tensor()));
67
68 // Convert the output tensor specification.
69 FCP_ASSIGN_OR_RETURN(
70 TensorSpec output_spec,
71 tensorflow::ConvertTensorSpec(aggregation_config.output_tensors(0)));
72
73 // TODO(team): currently the input and output data type and shape are
74 // expected to be the same.
75 if (input_spec.dtype() != output_spec.dtype() ||
76 input_spec.shape() != output_spec.shape()) {
77 return ServerAggregationConfigArgumentError(
78 aggregation_config, "Input and output tensors have mismatched specs.");
79 }
80
81 // Use the factory to create the TensorAggregator instance.
82 FCP_ASSIGN_OR_RETURN(std::unique_ptr<TensorAggregator> aggregator,
83 factory->Create(input_spec.dtype(), input_spec.shape()));
84
85 return Intrinsic{std::move(input_spec), std::move(output_spec),
86 std::move(aggregator)};
87 }
88
ValidateConfig(const Configuration & configuration)89 absl::Status SimpleAggregationProtocol::ValidateConfig(
90 const Configuration& configuration) {
91 for (const Configuration::ServerAggregationConfig& aggregation_config :
92 configuration.aggregation_configs()) {
93 // TODO(team): Add support for other intrinsics after MVP launch.
94 if (!GetAggregatorFactory(aggregation_config.intrinsic_uri()).ok()) {
95 return ServerAggregationConfigArgumentError(
96 aggregation_config,
97 absl::StrFormat("%s is not a supported intrinsic_uri.",
98 aggregation_config.intrinsic_uri()));
99 }
100
101 // TODO(team): Support multiple intrinsic args.
102 if (aggregation_config.intrinsic_args_size() != 1) {
103 return ServerAggregationConfigArgumentError(
104 aggregation_config, "Exactly one intrinsic argument is expected.");
105 }
106
107 if (aggregation_config.output_tensors_size() != 1) {
108 return ServerAggregationConfigArgumentError(
109 aggregation_config, "Exactly one output tensor is expected.");
110 }
111
112 if (!aggregation_config.intrinsic_args(0).has_input_tensor()) {
113 return ServerAggregationConfigArgumentError(
114 aggregation_config, "Intrinsic arguments must be input tensors.");
115 }
116 }
117 return absl::OkStatus();
118 }
119
120 absl::StatusOr<std::unique_ptr<SimpleAggregationProtocol>>
Create(const Configuration & configuration,AggregationProtocol::Callback * callback,const CheckpointParserFactory * checkpoint_parser_factory,const CheckpointBuilderFactory * checkpoint_builder_factory,ResourceResolver * resource_resolver)121 SimpleAggregationProtocol::Create(
122 const Configuration& configuration, AggregationProtocol::Callback* callback,
123 const CheckpointParserFactory* checkpoint_parser_factory,
124 const CheckpointBuilderFactory* checkpoint_builder_factory,
125 ResourceResolver* resource_resolver) {
126 FCP_CHECK(callback != nullptr);
127 FCP_CHECK(checkpoint_parser_factory != nullptr);
128 FCP_CHECK(checkpoint_builder_factory != nullptr);
129 FCP_CHECK(resource_resolver != nullptr);
130 FCP_RETURN_IF_ERROR(ValidateConfig(configuration));
131
132 std::vector<Intrinsic> intrinsics;
133 for (const Configuration::ServerAggregationConfig& aggregation_config :
134 configuration.aggregation_configs()) {
135 FCP_ASSIGN_OR_RETURN(Intrinsic intrinsic,
136 CreateIntrinsic(aggregation_config));
137 intrinsics.emplace_back(std::move(intrinsic));
138 }
139
140 return absl::WrapUnique(new SimpleAggregationProtocol(
141 std::move(intrinsics), callback, checkpoint_parser_factory,
142 checkpoint_builder_factory, resource_resolver));
143 }
144
SimpleAggregationProtocol(std::vector<Intrinsic> intrinsics,AggregationProtocol::Callback * callback,const CheckpointParserFactory * checkpoint_parser_factory,const CheckpointBuilderFactory * checkpoint_builder_factory,ResourceResolver * resource_resolver)145 SimpleAggregationProtocol::SimpleAggregationProtocol(
146 std::vector<Intrinsic> intrinsics, AggregationProtocol::Callback* callback,
147 const CheckpointParserFactory* checkpoint_parser_factory,
148 const CheckpointBuilderFactory* checkpoint_builder_factory,
149 ResourceResolver* resource_resolver)
150 : protocol_state_(PROTOCOL_CREATED),
151 intrinsics_(std::move(intrinsics)),
152 callback_(callback),
153 checkpoint_parser_factory_(checkpoint_parser_factory),
154 checkpoint_builder_factory_(checkpoint_builder_factory),
155 resource_resolver_(resource_resolver) {}
156
ProtocolStateDebugString(ProtocolState state)157 absl::string_view SimpleAggregationProtocol::ProtocolStateDebugString(
158 ProtocolState state) {
159 switch (state) {
160 case PROTOCOL_CREATED:
161 return "PROTOCOL_CREATED";
162 case PROTOCOL_STARTED:
163 return "PROTOCOL_STARTED";
164 case PROTOCOL_COMPLETED:
165 return "PROTOCOL_COMPLETED";
166 case PROTOCOL_ABORTED:
167 return "PROTOCOL_ABORTED";
168 }
169 }
170
ClientStateDebugString(ClientState state)171 absl::string_view SimpleAggregationProtocol::ClientStateDebugString(
172 ClientState state) {
173 switch (state) {
174 case CLIENT_PENDING:
175 return "CLIENT_PENDING";
176 case CLIENT_RECEIVED_INPUT_AND_PENDING:
177 return "CLIENT_RECEIVED_INPUT_AND_PENDING";
178 case CLIENT_COMPLETED:
179 return "CLIENT_COMPLETED";
180 case CLIENT_FAILED:
181 return "CLIENT_FAILED";
182 case CLIENT_ABORTED:
183 return "CLIENT_ABORTED";
184 case CLIENT_DISCARDED:
185 return "CLIENT_DISCARDED";
186 }
187 }
188
CheckProtocolState(ProtocolState state) const189 absl::Status SimpleAggregationProtocol::CheckProtocolState(
190 ProtocolState state) const {
191 if (protocol_state_ != state) {
192 return absl::FailedPreconditionError(
193 absl::StrFormat("The current protocol state is %s, expected %s.",
194 ProtocolStateDebugString(protocol_state_),
195 ProtocolStateDebugString(state)));
196 }
197 return absl::OkStatus();
198 }
199
SetProtocolState(ProtocolState state)200 void SimpleAggregationProtocol::SetProtocolState(ProtocolState state) {
201 FCP_CHECK(
202 (protocol_state_ == PROTOCOL_CREATED && state == PROTOCOL_STARTED) ||
203 (protocol_state_ == PROTOCOL_STARTED &&
204 (state == PROTOCOL_COMPLETED || state == PROTOCOL_ABORTED)))
205 << "Invalid protocol state transition from "
206 << ProtocolStateDebugString(protocol_state_) << " to "
207 << ProtocolStateDebugString(state) << ".";
208 protocol_state_ = state;
209 }
210
211 absl::StatusOr<SimpleAggregationProtocol::ClientState>
GetClientState(int64_t client_id) const212 SimpleAggregationProtocol::GetClientState(int64_t client_id) const {
213 if (client_id < 0 || client_id >= client_states_.size()) {
214 return absl::InvalidArgumentError(
215 absl::StrFormat("client_id %ld is outside the valid range", client_id));
216 }
217 return client_states_[client_id];
218 }
219
SetClientState(int64_t client_id,ClientState to_state)220 void SimpleAggregationProtocol::SetClientState(int64_t client_id,
221 ClientState to_state) {
222 FCP_CHECK(client_id >= 0 && client_id < client_states_.size());
223 ClientState from_state = client_states_[client_id];
224 FCP_CHECK(from_state != to_state);
225 if (from_state == CLIENT_RECEIVED_INPUT_AND_PENDING) {
226 num_clients_received_and_pending_--;
227 } else if (from_state == CLIENT_COMPLETED) {
228 FCP_CHECK(to_state == CLIENT_DISCARDED)
229 << "Client state can't be changed from CLIENT_COMPLETED to "
230 << ClientStateDebugString(to_state);
231 num_clients_aggregated_--;
232 } else {
233 FCP_CHECK(from_state == CLIENT_PENDING)
234 << "Client state can't be changed from "
235 << ClientStateDebugString(from_state);
236 }
237 client_states_[client_id] = to_state;
238 switch (to_state) {
239 case CLIENT_PENDING:
240 FCP_LOG(FATAL) << "Client state can't be changed to CLIENT_PENDING";
241 break;
242 case CLIENT_RECEIVED_INPUT_AND_PENDING:
243 num_clients_received_and_pending_++;
244 break;
245 case CLIENT_COMPLETED:
246 num_clients_aggregated_++;
247 break;
248 case CLIENT_FAILED:
249 num_clients_failed_++;
250 break;
251 case CLIENT_ABORTED:
252 num_clients_aborted_++;
253 break;
254 case CLIENT_DISCARDED:
255 num_clients_discarded_++;
256 break;
257 }
258 }
259
260 absl::StatusOr<SimpleAggregationProtocol::TensorMap>
ParseCheckpoint(absl::Cord report) const261 SimpleAggregationProtocol::ParseCheckpoint(absl::Cord report) const {
262 FCP_ASSIGN_OR_RETURN(std::unique_ptr<CheckpointParser> parser,
263 checkpoint_parser_factory_->Create(report));
264 TensorMap tensor_map;
265 for (const auto& intrinsic : intrinsics_) {
266 // TODO(team): Support multiple input tensors.
267 FCP_ASSIGN_OR_RETURN(Tensor tensor,
268 parser->GetTensor(intrinsic.input.name()));
269 if (tensor.dtype() != intrinsic.input.dtype() ||
270 tensor.shape() != intrinsic.input.shape()) {
271 // TODO(team): Detailed diagnostics including the expected vs
272 // actual data types and shapes.
273 return absl::InvalidArgumentError("Input tensor spec mismatch.");
274 }
275 tensor_map.emplace(intrinsic.input.name(), std::move(tensor));
276 }
277
278 return tensor_map;
279 }
280
AggregateClientInput(SimpleAggregationProtocol::TensorMap tensor_map)281 absl::Status SimpleAggregationProtocol::AggregateClientInput(
282 SimpleAggregationProtocol::TensorMap tensor_map) {
283 absl::MutexLock lock(&aggregation_mu_);
284 if (!aggregation_finished_) {
285 for (const auto& intrinsic : intrinsics_) {
286 // TODO(team): Support multiple input tensors.
287 const auto& it = tensor_map.find(intrinsic.input.name());
288 FCP_CHECK(it != tensor_map.end());
289 FCP_CHECK(intrinsic.aggregator != nullptr)
290 << "CreateReport() has already been called.";
291 FCP_RETURN_IF_ERROR(intrinsic.aggregator->Accumulate(it->second));
292 }
293 }
294 return absl::OkStatus();
295 }
296
CreateReport()297 absl::StatusOr<absl::Cord> SimpleAggregationProtocol::CreateReport() {
298 absl::MutexLock lock(&aggregation_mu_);
299 for (auto& intrinsic : intrinsics_) {
300 FCP_CHECK(intrinsic.aggregator != nullptr)
301 << "CreateReport() has already been called.";
302 if (!intrinsic.aggregator->CanReport()) {
303 return absl::FailedPreconditionError(
304 "The aggregation can't be completed due to failed preconditions.");
305 }
306 }
307
308 // Build the resulting checkpoint.
309 std::unique_ptr<CheckpointBuilder> checkpoint_builder =
310 checkpoint_builder_factory_->Create();
311 for (auto& intrinsic : intrinsics_) {
312 FCP_ASSIGN_OR_RETURN(OutputTensorList output_tensors,
313 std::move(*intrinsic.aggregator).Report());
314 // TODO(team): Support multiple output tensors per intrinsic.
315 FCP_CHECK(output_tensors.size() == 1);
316 const Tensor& tensor = output_tensors[0];
317 FCP_CHECK(tensor.dtype() == intrinsic.output.dtype());
318 FCP_CHECK(tensor.shape() == intrinsic.output.shape());
319 FCP_RETURN_IF_ERROR(
320 checkpoint_builder->Add(intrinsic.output.name(), tensor));
321 }
322 aggregation_finished_ = true;
323 return checkpoint_builder->Build();
324 }
325
Start(int64_t num_clients)326 absl::Status SimpleAggregationProtocol::Start(int64_t num_clients) {
327 if (num_clients < 0) {
328 return absl::InvalidArgumentError("Number of clients cannot be negative.");
329 }
330 {
331 absl::MutexLock lock(&state_mu_);
332 FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_CREATED));
333 SetProtocolState(PROTOCOL_STARTED);
334 FCP_CHECK(client_states_.empty());
335 client_states_.resize(num_clients, CLIENT_PENDING);
336 }
337 if (num_clients > 0) {
338 AcceptanceMessage acceptance_message;
339 callback_->OnAcceptClients(0, num_clients, acceptance_message);
340 }
341 return absl::OkStatus();
342 }
343
AddClients(int64_t num_clients)344 absl::Status SimpleAggregationProtocol::AddClients(int64_t num_clients) {
345 int64_t start_index;
346 {
347 absl::MutexLock lock(&state_mu_);
348 FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_STARTED));
349 if (num_clients <= 0) {
350 return absl::InvalidArgumentError("Non-zero number of clients required");
351 }
352 start_index = client_states_.size();
353 client_states_.resize(start_index + num_clients, CLIENT_PENDING);
354 }
355 AcceptanceMessage acceptance_message;
356 callback_->OnAcceptClients(start_index, num_clients, acceptance_message);
357 return absl::OkStatus();
358 }
359
ReceiveClientMessage(int64_t client_id,const ClientMessage & message)360 absl::Status SimpleAggregationProtocol::ReceiveClientMessage(
361 int64_t client_id, const ClientMessage& message) {
362 if (!message.has_simple_aggregation() ||
363 !message.simple_aggregation().has_input()) {
364 return absl::InvalidArgumentError("Unexpected message");
365 }
366
367 if (!message.simple_aggregation().input().has_inline_bytes() &&
368 !message.simple_aggregation().input().has_uri()) {
369 return absl::InvalidArgumentError(
370 "Only inline_bytes or uri type of input is supported");
371 }
372
373 // Verify the state.
374 {
375 absl::MutexLock lock(&state_mu_);
376 if (protocol_state_ == PROTOCOL_CREATED) {
377 return absl::FailedPreconditionError("The protocol hasn't been started");
378 }
379 FCP_ASSIGN_OR_RETURN(auto client_state, GetClientState(client_id));
380 if (client_state != CLIENT_PENDING) {
381 // TODO(team): Decide whether the logging level should be INFO or
382 // WARNING, or perhaps it should depend on the client state (e.g. WARNING
383 // for COMPLETED and INFO for other states).
384 FCP_LOG(INFO) << "ReceiveClientMessage: client " << client_id
385 << " message ignored, the state is already "
386 << ClientStateDebugString(client_state);
387 return absl::OkStatus();
388 }
389 SetClientState(client_id, CLIENT_RECEIVED_INPUT_AND_PENDING);
390 }
391
392 absl::Status client_completion_status = absl::OkStatus();
393 ClientState client_completion_state = CLIENT_COMPLETED;
394
395 absl::Cord report;
396 if (message.simple_aggregation().input().has_inline_bytes()) {
397 // Parse the client input concurrently with other protocol calls.
398 report =
399 absl::Cord(message.simple_aggregation().input().inline_bytes());
400 } else {
401 absl::StatusOr<absl::Cord> report_or_status =
402 resource_resolver_->RetrieveResource(
403 client_id, message.simple_aggregation().input().uri());
404 if (!report_or_status.ok()) {
405 client_completion_status = report_or_status.status();
406 client_completion_state = CLIENT_FAILED;
407 FCP_LOG(WARNING) << "Report with resource uri "
408 << message.simple_aggregation().input().uri()
409 << " for client " << client_id << "is missing. "
410 << client_completion_status.ToString();
411 } else {
412 report = std::move(report_or_status.value());
413 }
414 }
415
416 if (client_completion_state != CLIENT_FAILED) {
417 absl::StatusOr<TensorMap> tensor_map_or_status =
418 ParseCheckpoint(std::move(report));
419 if (!tensor_map_or_status.ok()) {
420 client_completion_status = tensor_map_or_status.status();
421 client_completion_state = CLIENT_FAILED;
422 FCP_LOG(WARNING) << "Client " << client_id << " input can't be parsed: "
423 << client_completion_status.ToString();
424 } else {
425 // Aggregate the client input which would block on aggregation_mu_ if
426 // there are any concurrent AggregateClientInput calls.
427 client_completion_status =
428 AggregateClientInput(std::move(tensor_map_or_status).value());
429 if (!client_completion_status.ok()) {
430 client_completion_state = CLIENT_DISCARDED;
431 FCP_LOG(INFO) << "Client " << client_id << " input is discarded: "
432 << client_completion_status.ToString();
433 }
434 }
435 }
436
437 // Update the state post aggregation.
438 {
439 absl::MutexLock lock(&state_mu_);
440 // Change the client state only if the current state is still
441 // CLIENT_RECEIVED_INPUT_AND_PENDING, meaning that the client wasn't already
442 // closed by a concurrent Complete or Abort call.
443 if (client_states_[client_id] == CLIENT_RECEIVED_INPUT_AND_PENDING) {
444 SetClientState(client_id, client_completion_state);
445 callback_->OnCloseClient(client_id, client_completion_status);
446 }
447 }
448 return absl::OkStatus();
449 }
450
CloseClient(int64_t client_id,absl::Status client_status)451 absl::Status SimpleAggregationProtocol::CloseClient(
452 int64_t client_id, absl::Status client_status) {
453 {
454 absl::MutexLock lock(&state_mu_);
455 if (protocol_state_ == PROTOCOL_CREATED) {
456 return absl::FailedPreconditionError("The protocol hasn't been started");
457 }
458 FCP_ASSIGN_OR_RETURN(auto client_state, GetClientState(client_id));
459 // Close the client only if the client is currently pending.
460 if (client_state == CLIENT_PENDING) {
461 FCP_LOG(INFO) << "Closing client " << client_id << " with the status "
462 << client_status.ToString();
463 SetClientState(client_id,
464 client_status.ok() ? CLIENT_DISCARDED : CLIENT_FAILED);
465 }
466 }
467
468 return absl::OkStatus();
469 }
470
Complete()471 absl::Status SimpleAggregationProtocol::Complete() {
472 absl::Cord result;
473 std::vector<int64_t> client_ids_to_close;
474 {
475 absl::MutexLock lock(&state_mu_);
476 FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_STARTED));
477 FCP_ASSIGN_OR_RETURN(result, CreateReport());
478 SetProtocolState(PROTOCOL_COMPLETED);
479 for (int64_t client_id = 0; client_id < client_states_.size();
480 client_id++) {
481 switch (client_states_[client_id]) {
482 case CLIENT_PENDING:
483 SetClientState(client_id, CLIENT_ABORTED);
484 client_ids_to_close.push_back(client_id);
485 break;
486 case CLIENT_RECEIVED_INPUT_AND_PENDING:
487 SetClientState(client_id, CLIENT_DISCARDED);
488 client_ids_to_close.push_back(client_id);
489 break;
490 default:
491 break;
492 }
493 }
494 }
495 for (int64_t client_id : client_ids_to_close) {
496 callback_->OnCloseClient(
497 client_id, absl::AbortedError("The protocol has completed before the "
498 "client input has been aggregated."));
499 }
500 callback_->OnComplete(std::move(result));
501 return absl::OkStatus();
502 }
503
Abort()504 absl::Status SimpleAggregationProtocol::Abort() {
505 std::vector<int64_t> client_ids_to_close;
506 {
507 absl::MutexLock lock(&state_mu_);
508 FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_STARTED));
509 aggregation_finished_ = true;
510 SetProtocolState(PROTOCOL_ABORTED);
511 for (int64_t client_id = 0; client_id < client_states_.size();
512 client_id++) {
513 switch (client_states_[client_id]) {
514 case CLIENT_PENDING:
515 SetClientState(client_id, CLIENT_ABORTED);
516 client_ids_to_close.push_back(client_id);
517 break;
518 case CLIENT_RECEIVED_INPUT_AND_PENDING:
519 SetClientState(client_id, CLIENT_DISCARDED);
520 client_ids_to_close.push_back(client_id);
521 break;
522 case CLIENT_COMPLETED:
523 SetClientState(client_id, CLIENT_DISCARDED);
524 break;
525 default:
526 break;
527 }
528 }
529 }
530
531 for (int64_t client_id : client_ids_to_close) {
532 callback_->OnCloseClient(
533 client_id, absl::AbortedError("The protocol has aborted before the "
534 "client input has been aggregated."));
535 }
536 return absl::OkStatus();
537 }
538
GetStatus()539 StatusMessage SimpleAggregationProtocol::GetStatus() {
540 absl::MutexLock lock(&state_mu_);
541 int64_t num_clients_completed = num_clients_received_and_pending_ +
542 num_clients_aggregated_ +
543 num_clients_discarded_;
544 StatusMessage message;
545 message.set_num_clients_completed(num_clients_completed);
546 message.set_num_clients_failed(num_clients_failed_);
547 message.set_num_clients_pending(client_states_.size() -
548 num_clients_completed - num_clients_failed_ -
549 num_clients_aborted_);
550 message.set_num_inputs_aggregated_and_included(num_clients_aggregated_);
551 message.set_num_inputs_aggregated_and_pending(
552 num_clients_received_and_pending_);
553 message.set_num_clients_aborted(num_clients_aborted_);
554 message.set_num_inputs_discarded(num_clients_discarded_);
555 return message;
556 }
557
558 } // namespace fcp::aggregation
559