1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "fcp/aggregation/protocol/simple_aggregation/simple_aggregation_protocol.h"
18 
19 #include <cstddef>
20 #include <cstdint>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/memory/memory.h"
27 #include "absl/status/status.h"
28 #include "absl/strings/str_format.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/synchronization/mutex.h"
31 #include "fcp/aggregation/core/tensor.h"
32 #include "fcp/aggregation/core/tensor_aggregator.h"
33 #include "fcp/aggregation/core/tensor_aggregator_factory.h"
34 #include "fcp/aggregation/core/tensor_aggregator_registry.h"
35 #include "fcp/aggregation/protocol/aggregation_protocol_messages.pb.h"
36 #include "fcp/aggregation/protocol/checkpoint_builder.h"
37 #include "fcp/aggregation/protocol/checkpoint_parser.h"
38 #include "fcp/aggregation/tensorflow/converters.h"
39 #include "fcp/base/monitoring.h"
40 #include "fcp/protos/plan.pb.h"
41 
42 namespace fcp::aggregation {
43 
44 // Creates an INVALID_ARGUMENT error with the provided error message.
ServerAggregationConfigArgumentError(const Configuration::ServerAggregationConfig & aggregation_config,absl::string_view error_message)45 absl::Status ServerAggregationConfigArgumentError(
46     const Configuration::ServerAggregationConfig& aggregation_config,
47     absl::string_view error_message) {
48   return absl::InvalidArgumentError(
49       absl::StrFormat("ServerAggregationConfig: %s\n:%s", error_message,
50                       aggregation_config.DebugString()));
51 }
52 
53 // Creates an aggregation intrinsic based on the intrinsic configuration.
54 absl::StatusOr<SimpleAggregationProtocol::Intrinsic>
CreateIntrinsic(const Configuration::ServerAggregationConfig & aggregation_config)55 SimpleAggregationProtocol::CreateIntrinsic(
56     const Configuration::ServerAggregationConfig& aggregation_config) {
57   // Resolve the intrinsic_uri to the registered TensorAggregatorFactory.
58   FCP_ASSIGN_OR_RETURN(
59       const TensorAggregatorFactory* factory,
60       GetAggregatorFactory(aggregation_config.intrinsic_uri()));
61 
62   // Convert the input tensor specification.
63   FCP_ASSIGN_OR_RETURN(
64       TensorSpec input_spec,
65       tensorflow::ConvertTensorSpec(
66           aggregation_config.intrinsic_args(0).input_tensor()));
67 
68   // Convert the output tensor specification.
69   FCP_ASSIGN_OR_RETURN(
70       TensorSpec output_spec,
71       tensorflow::ConvertTensorSpec(aggregation_config.output_tensors(0)));
72 
73   // TODO(team): currently the input and output data type and shape are
74   // expected to be the same.
75   if (input_spec.dtype() != output_spec.dtype() ||
76       input_spec.shape() != output_spec.shape()) {
77     return ServerAggregationConfigArgumentError(
78         aggregation_config, "Input and output tensors have mismatched specs.");
79   }
80 
81   // Use the factory to create the TensorAggregator instance.
82   FCP_ASSIGN_OR_RETURN(std::unique_ptr<TensorAggregator> aggregator,
83                        factory->Create(input_spec.dtype(), input_spec.shape()));
84 
85   return Intrinsic{std::move(input_spec), std::move(output_spec),
86                    std::move(aggregator)};
87 }
88 
ValidateConfig(const Configuration & configuration)89 absl::Status SimpleAggregationProtocol::ValidateConfig(
90     const Configuration& configuration) {
91   for (const Configuration::ServerAggregationConfig& aggregation_config :
92        configuration.aggregation_configs()) {
93     // TODO(team): Add support for other intrinsics after MVP launch.
94     if (!GetAggregatorFactory(aggregation_config.intrinsic_uri()).ok()) {
95       return ServerAggregationConfigArgumentError(
96           aggregation_config,
97           absl::StrFormat("%s is not a supported intrinsic_uri.",
98                           aggregation_config.intrinsic_uri()));
99     }
100 
101     // TODO(team): Support multiple intrinsic args.
102     if (aggregation_config.intrinsic_args_size() != 1) {
103       return ServerAggregationConfigArgumentError(
104           aggregation_config, "Exactly one intrinsic argument is expected.");
105     }
106 
107     if (aggregation_config.output_tensors_size() != 1) {
108       return ServerAggregationConfigArgumentError(
109           aggregation_config, "Exactly one output tensor is expected.");
110     }
111 
112     if (!aggregation_config.intrinsic_args(0).has_input_tensor()) {
113       return ServerAggregationConfigArgumentError(
114           aggregation_config, "Intrinsic arguments must be input tensors.");
115     }
116   }
117   return absl::OkStatus();
118 }
119 
120 absl::StatusOr<std::unique_ptr<SimpleAggregationProtocol>>
Create(const Configuration & configuration,AggregationProtocol::Callback * callback,const CheckpointParserFactory * checkpoint_parser_factory,const CheckpointBuilderFactory * checkpoint_builder_factory,ResourceResolver * resource_resolver)121 SimpleAggregationProtocol::Create(
122     const Configuration& configuration, AggregationProtocol::Callback* callback,
123     const CheckpointParserFactory* checkpoint_parser_factory,
124     const CheckpointBuilderFactory* checkpoint_builder_factory,
125     ResourceResolver* resource_resolver) {
126   FCP_CHECK(callback != nullptr);
127   FCP_CHECK(checkpoint_parser_factory != nullptr);
128   FCP_CHECK(checkpoint_builder_factory != nullptr);
129   FCP_CHECK(resource_resolver != nullptr);
130   FCP_RETURN_IF_ERROR(ValidateConfig(configuration));
131 
132   std::vector<Intrinsic> intrinsics;
133   for (const Configuration::ServerAggregationConfig& aggregation_config :
134        configuration.aggregation_configs()) {
135     FCP_ASSIGN_OR_RETURN(Intrinsic intrinsic,
136                          CreateIntrinsic(aggregation_config));
137     intrinsics.emplace_back(std::move(intrinsic));
138   }
139 
140   return absl::WrapUnique(new SimpleAggregationProtocol(
141       std::move(intrinsics), callback, checkpoint_parser_factory,
142       checkpoint_builder_factory, resource_resolver));
143 }
144 
SimpleAggregationProtocol(std::vector<Intrinsic> intrinsics,AggregationProtocol::Callback * callback,const CheckpointParserFactory * checkpoint_parser_factory,const CheckpointBuilderFactory * checkpoint_builder_factory,ResourceResolver * resource_resolver)145 SimpleAggregationProtocol::SimpleAggregationProtocol(
146     std::vector<Intrinsic> intrinsics, AggregationProtocol::Callback* callback,
147     const CheckpointParserFactory* checkpoint_parser_factory,
148     const CheckpointBuilderFactory* checkpoint_builder_factory,
149     ResourceResolver* resource_resolver)
150     : protocol_state_(PROTOCOL_CREATED),
151       intrinsics_(std::move(intrinsics)),
152       callback_(callback),
153       checkpoint_parser_factory_(checkpoint_parser_factory),
154       checkpoint_builder_factory_(checkpoint_builder_factory),
155       resource_resolver_(resource_resolver) {}
156 
ProtocolStateDebugString(ProtocolState state)157 absl::string_view SimpleAggregationProtocol::ProtocolStateDebugString(
158     ProtocolState state) {
159   switch (state) {
160     case PROTOCOL_CREATED:
161       return "PROTOCOL_CREATED";
162     case PROTOCOL_STARTED:
163       return "PROTOCOL_STARTED";
164     case PROTOCOL_COMPLETED:
165       return "PROTOCOL_COMPLETED";
166     case PROTOCOL_ABORTED:
167       return "PROTOCOL_ABORTED";
168   }
169 }
170 
ClientStateDebugString(ClientState state)171 absl::string_view SimpleAggregationProtocol::ClientStateDebugString(
172     ClientState state) {
173   switch (state) {
174     case CLIENT_PENDING:
175       return "CLIENT_PENDING";
176     case CLIENT_RECEIVED_INPUT_AND_PENDING:
177       return "CLIENT_RECEIVED_INPUT_AND_PENDING";
178     case CLIENT_COMPLETED:
179       return "CLIENT_COMPLETED";
180     case CLIENT_FAILED:
181       return "CLIENT_FAILED";
182     case CLIENT_ABORTED:
183       return "CLIENT_ABORTED";
184     case CLIENT_DISCARDED:
185       return "CLIENT_DISCARDED";
186   }
187 }
188 
CheckProtocolState(ProtocolState state) const189 absl::Status SimpleAggregationProtocol::CheckProtocolState(
190     ProtocolState state) const {
191   if (protocol_state_ != state) {
192     return absl::FailedPreconditionError(
193         absl::StrFormat("The current protocol state is %s, expected %s.",
194                         ProtocolStateDebugString(protocol_state_),
195                         ProtocolStateDebugString(state)));
196   }
197   return absl::OkStatus();
198 }
199 
SetProtocolState(ProtocolState state)200 void SimpleAggregationProtocol::SetProtocolState(ProtocolState state) {
201   FCP_CHECK(
202       (protocol_state_ == PROTOCOL_CREATED && state == PROTOCOL_STARTED) ||
203       (protocol_state_ == PROTOCOL_STARTED &&
204        (state == PROTOCOL_COMPLETED || state == PROTOCOL_ABORTED)))
205       << "Invalid protocol state transition from "
206       << ProtocolStateDebugString(protocol_state_) << " to "
207       << ProtocolStateDebugString(state) << ".";
208   protocol_state_ = state;
209 }
210 
211 absl::StatusOr<SimpleAggregationProtocol::ClientState>
GetClientState(int64_t client_id) const212 SimpleAggregationProtocol::GetClientState(int64_t client_id) const {
213   if (client_id < 0 || client_id >= client_states_.size()) {
214     return absl::InvalidArgumentError(
215         absl::StrFormat("client_id %ld is outside the valid range", client_id));
216   }
217   return client_states_[client_id];
218 }
219 
SetClientState(int64_t client_id,ClientState to_state)220 void SimpleAggregationProtocol::SetClientState(int64_t client_id,
221                                                ClientState to_state) {
222   FCP_CHECK(client_id >= 0 && client_id < client_states_.size());
223   ClientState from_state = client_states_[client_id];
224   FCP_CHECK(from_state != to_state);
225   if (from_state == CLIENT_RECEIVED_INPUT_AND_PENDING) {
226     num_clients_received_and_pending_--;
227   } else if (from_state == CLIENT_COMPLETED) {
228     FCP_CHECK(to_state == CLIENT_DISCARDED)
229         << "Client state can't be changed from CLIENT_COMPLETED to "
230         << ClientStateDebugString(to_state);
231     num_clients_aggregated_--;
232   } else {
233     FCP_CHECK(from_state == CLIENT_PENDING)
234         << "Client state can't be changed from "
235         << ClientStateDebugString(from_state);
236   }
237   client_states_[client_id] = to_state;
238   switch (to_state) {
239     case CLIENT_PENDING:
240       FCP_LOG(FATAL) << "Client state can't be changed to CLIENT_PENDING";
241       break;
242     case CLIENT_RECEIVED_INPUT_AND_PENDING:
243       num_clients_received_and_pending_++;
244       break;
245     case CLIENT_COMPLETED:
246       num_clients_aggregated_++;
247       break;
248     case CLIENT_FAILED:
249       num_clients_failed_++;
250       break;
251     case CLIENT_ABORTED:
252       num_clients_aborted_++;
253       break;
254     case CLIENT_DISCARDED:
255       num_clients_discarded_++;
256       break;
257   }
258 }
259 
260 absl::StatusOr<SimpleAggregationProtocol::TensorMap>
ParseCheckpoint(absl::Cord report) const261 SimpleAggregationProtocol::ParseCheckpoint(absl::Cord report) const {
262   FCP_ASSIGN_OR_RETURN(std::unique_ptr<CheckpointParser> parser,
263                        checkpoint_parser_factory_->Create(report));
264   TensorMap tensor_map;
265   for (const auto& intrinsic : intrinsics_) {
266     // TODO(team): Support multiple input tensors.
267     FCP_ASSIGN_OR_RETURN(Tensor tensor,
268                          parser->GetTensor(intrinsic.input.name()));
269     if (tensor.dtype() != intrinsic.input.dtype() ||
270         tensor.shape() != intrinsic.input.shape()) {
271       // TODO(team): Detailed diagnostics including the expected vs
272       // actual data types and shapes.
273       return absl::InvalidArgumentError("Input tensor spec mismatch.");
274     }
275     tensor_map.emplace(intrinsic.input.name(), std::move(tensor));
276   }
277 
278   return tensor_map;
279 }
280 
AggregateClientInput(SimpleAggregationProtocol::TensorMap tensor_map)281 absl::Status SimpleAggregationProtocol::AggregateClientInput(
282     SimpleAggregationProtocol::TensorMap tensor_map) {
283   absl::MutexLock lock(&aggregation_mu_);
284   if (!aggregation_finished_) {
285     for (const auto& intrinsic : intrinsics_) {
286       // TODO(team): Support multiple input tensors.
287       const auto& it = tensor_map.find(intrinsic.input.name());
288       FCP_CHECK(it != tensor_map.end());
289       FCP_CHECK(intrinsic.aggregator != nullptr)
290           << "CreateReport() has already been called.";
291       FCP_RETURN_IF_ERROR(intrinsic.aggregator->Accumulate(it->second));
292     }
293   }
294   return absl::OkStatus();
295 }
296 
CreateReport()297 absl::StatusOr<absl::Cord> SimpleAggregationProtocol::CreateReport() {
298   absl::MutexLock lock(&aggregation_mu_);
299   for (auto& intrinsic : intrinsics_) {
300     FCP_CHECK(intrinsic.aggregator != nullptr)
301         << "CreateReport() has already been called.";
302     if (!intrinsic.aggregator->CanReport()) {
303       return absl::FailedPreconditionError(
304           "The aggregation can't be completed due to failed preconditions.");
305     }
306   }
307 
308   // Build the resulting checkpoint.
309   std::unique_ptr<CheckpointBuilder> checkpoint_builder =
310       checkpoint_builder_factory_->Create();
311   for (auto& intrinsic : intrinsics_) {
312     FCP_ASSIGN_OR_RETURN(OutputTensorList output_tensors,
313                          std::move(*intrinsic.aggregator).Report());
314     // TODO(team): Support multiple output tensors per intrinsic.
315     FCP_CHECK(output_tensors.size() == 1);
316     const Tensor& tensor = output_tensors[0];
317     FCP_CHECK(tensor.dtype() == intrinsic.output.dtype());
318     FCP_CHECK(tensor.shape() == intrinsic.output.shape());
319     FCP_RETURN_IF_ERROR(
320         checkpoint_builder->Add(intrinsic.output.name(), tensor));
321   }
322   aggregation_finished_ = true;
323   return checkpoint_builder->Build();
324 }
325 
Start(int64_t num_clients)326 absl::Status SimpleAggregationProtocol::Start(int64_t num_clients) {
327   if (num_clients < 0) {
328     return absl::InvalidArgumentError("Number of clients cannot be negative.");
329   }
330   {
331     absl::MutexLock lock(&state_mu_);
332     FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_CREATED));
333     SetProtocolState(PROTOCOL_STARTED);
334     FCP_CHECK(client_states_.empty());
335     client_states_.resize(num_clients, CLIENT_PENDING);
336   }
337   if (num_clients > 0) {
338     AcceptanceMessage acceptance_message;
339     callback_->OnAcceptClients(0, num_clients, acceptance_message);
340   }
341   return absl::OkStatus();
342 }
343 
AddClients(int64_t num_clients)344 absl::Status SimpleAggregationProtocol::AddClients(int64_t num_clients) {
345   int64_t start_index;
346   {
347     absl::MutexLock lock(&state_mu_);
348     FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_STARTED));
349     if (num_clients <= 0) {
350       return absl::InvalidArgumentError("Non-zero number of clients required");
351     }
352     start_index = client_states_.size();
353     client_states_.resize(start_index + num_clients, CLIENT_PENDING);
354   }
355   AcceptanceMessage acceptance_message;
356   callback_->OnAcceptClients(start_index, num_clients, acceptance_message);
357   return absl::OkStatus();
358 }
359 
ReceiveClientMessage(int64_t client_id,const ClientMessage & message)360 absl::Status SimpleAggregationProtocol::ReceiveClientMessage(
361     int64_t client_id, const ClientMessage& message) {
362   if (!message.has_simple_aggregation() ||
363       !message.simple_aggregation().has_input()) {
364     return absl::InvalidArgumentError("Unexpected message");
365   }
366 
367   if (!message.simple_aggregation().input().has_inline_bytes() &&
368       !message.simple_aggregation().input().has_uri()) {
369     return absl::InvalidArgumentError(
370         "Only inline_bytes or uri type of input is supported");
371   }
372 
373   // Verify the state.
374   {
375     absl::MutexLock lock(&state_mu_);
376     if (protocol_state_ == PROTOCOL_CREATED) {
377       return absl::FailedPreconditionError("The protocol hasn't been started");
378     }
379     FCP_ASSIGN_OR_RETURN(auto client_state, GetClientState(client_id));
380     if (client_state != CLIENT_PENDING) {
381       // TODO(team): Decide whether the logging level should be INFO or
382       // WARNING, or perhaps it should depend on the client state (e.g. WARNING
383       // for COMPLETED and INFO for other states).
384       FCP_LOG(INFO) << "ReceiveClientMessage: client " << client_id
385                     << " message ignored, the state is already "
386                     << ClientStateDebugString(client_state);
387       return absl::OkStatus();
388     }
389     SetClientState(client_id, CLIENT_RECEIVED_INPUT_AND_PENDING);
390   }
391 
392   absl::Status client_completion_status = absl::OkStatus();
393   ClientState client_completion_state = CLIENT_COMPLETED;
394 
395   absl::Cord report;
396   if (message.simple_aggregation().input().has_inline_bytes()) {
397     // Parse the client input concurrently with other protocol calls.
398     report =
399         absl::Cord(message.simple_aggregation().input().inline_bytes());
400   } else {
401     absl::StatusOr<absl::Cord> report_or_status =
402         resource_resolver_->RetrieveResource(
403             client_id, message.simple_aggregation().input().uri());
404     if (!report_or_status.ok()) {
405       client_completion_status = report_or_status.status();
406       client_completion_state = CLIENT_FAILED;
407       FCP_LOG(WARNING) << "Report with resource uri "
408                        << message.simple_aggregation().input().uri()
409                        << " for client " << client_id << "is missing. "
410                        << client_completion_status.ToString();
411     } else {
412       report = std::move(report_or_status.value());
413     }
414   }
415 
416   if (client_completion_state != CLIENT_FAILED) {
417     absl::StatusOr<TensorMap> tensor_map_or_status =
418         ParseCheckpoint(std::move(report));
419     if (!tensor_map_or_status.ok()) {
420       client_completion_status = tensor_map_or_status.status();
421       client_completion_state = CLIENT_FAILED;
422       FCP_LOG(WARNING) << "Client " << client_id << " input can't be parsed: "
423                        << client_completion_status.ToString();
424     } else {
425       // Aggregate the client input which would block on aggregation_mu_ if
426       // there are any concurrent AggregateClientInput calls.
427       client_completion_status =
428           AggregateClientInput(std::move(tensor_map_or_status).value());
429       if (!client_completion_status.ok()) {
430         client_completion_state = CLIENT_DISCARDED;
431         FCP_LOG(INFO) << "Client " << client_id << " input is discarded: "
432                       << client_completion_status.ToString();
433       }
434     }
435   }
436 
437   // Update the state post aggregation.
438   {
439     absl::MutexLock lock(&state_mu_);
440     // Change the client state only if the current state is still
441     // CLIENT_RECEIVED_INPUT_AND_PENDING, meaning that the client wasn't already
442     // closed by a concurrent Complete or Abort call.
443     if (client_states_[client_id] == CLIENT_RECEIVED_INPUT_AND_PENDING) {
444       SetClientState(client_id, client_completion_state);
445       callback_->OnCloseClient(client_id, client_completion_status);
446     }
447   }
448   return absl::OkStatus();
449 }
450 
CloseClient(int64_t client_id,absl::Status client_status)451 absl::Status SimpleAggregationProtocol::CloseClient(
452     int64_t client_id, absl::Status client_status) {
453   {
454     absl::MutexLock lock(&state_mu_);
455     if (protocol_state_ == PROTOCOL_CREATED) {
456       return absl::FailedPreconditionError("The protocol hasn't been started");
457     }
458     FCP_ASSIGN_OR_RETURN(auto client_state, GetClientState(client_id));
459     // Close the client only if the client is currently pending.
460     if (client_state == CLIENT_PENDING) {
461       FCP_LOG(INFO) << "Closing client " << client_id << " with the status "
462                     << client_status.ToString();
463       SetClientState(client_id,
464                      client_status.ok() ? CLIENT_DISCARDED : CLIENT_FAILED);
465     }
466   }
467 
468   return absl::OkStatus();
469 }
470 
Complete()471 absl::Status SimpleAggregationProtocol::Complete() {
472   absl::Cord result;
473   std::vector<int64_t> client_ids_to_close;
474   {
475     absl::MutexLock lock(&state_mu_);
476     FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_STARTED));
477     FCP_ASSIGN_OR_RETURN(result, CreateReport());
478     SetProtocolState(PROTOCOL_COMPLETED);
479     for (int64_t client_id = 0; client_id < client_states_.size();
480          client_id++) {
481       switch (client_states_[client_id]) {
482         case CLIENT_PENDING:
483           SetClientState(client_id, CLIENT_ABORTED);
484           client_ids_to_close.push_back(client_id);
485           break;
486         case CLIENT_RECEIVED_INPUT_AND_PENDING:
487           SetClientState(client_id, CLIENT_DISCARDED);
488           client_ids_to_close.push_back(client_id);
489           break;
490         default:
491           break;
492       }
493     }
494   }
495   for (int64_t client_id : client_ids_to_close) {
496     callback_->OnCloseClient(
497         client_id, absl::AbortedError("The protocol has completed before the "
498                                       "client input has been aggregated."));
499   }
500   callback_->OnComplete(std::move(result));
501   return absl::OkStatus();
502 }
503 
Abort()504 absl::Status SimpleAggregationProtocol::Abort() {
505   std::vector<int64_t> client_ids_to_close;
506   {
507     absl::MutexLock lock(&state_mu_);
508     FCP_RETURN_IF_ERROR(CheckProtocolState(PROTOCOL_STARTED));
509     aggregation_finished_ = true;
510     SetProtocolState(PROTOCOL_ABORTED);
511     for (int64_t client_id = 0; client_id < client_states_.size();
512          client_id++) {
513       switch (client_states_[client_id]) {
514         case CLIENT_PENDING:
515           SetClientState(client_id, CLIENT_ABORTED);
516           client_ids_to_close.push_back(client_id);
517           break;
518         case CLIENT_RECEIVED_INPUT_AND_PENDING:
519           SetClientState(client_id, CLIENT_DISCARDED);
520           client_ids_to_close.push_back(client_id);
521           break;
522         case CLIENT_COMPLETED:
523           SetClientState(client_id, CLIENT_DISCARDED);
524           break;
525         default:
526           break;
527       }
528     }
529   }
530 
531   for (int64_t client_id : client_ids_to_close) {
532     callback_->OnCloseClient(
533         client_id, absl::AbortedError("The protocol has aborted before the "
534                                       "client input has been aggregated."));
535   }
536   return absl::OkStatus();
537 }
538 
GetStatus()539 StatusMessage SimpleAggregationProtocol::GetStatus() {
540   absl::MutexLock lock(&state_mu_);
541   int64_t num_clients_completed = num_clients_received_and_pending_ +
542                                   num_clients_aggregated_ +
543                                   num_clients_discarded_;
544   StatusMessage message;
545   message.set_num_clients_completed(num_clients_completed);
546   message.set_num_clients_failed(num_clients_failed_);
547   message.set_num_clients_pending(client_states_.size() -
548                                   num_clients_completed - num_clients_failed_ -
549                                   num_clients_aborted_);
550   message.set_num_inputs_aggregated_and_included(num_clients_aggregated_);
551   message.set_num_inputs_aggregated_and_pending(
552       num_clients_received_and_pending_);
553   message.set_num_clients_aborted(num_clients_aborted_);
554   message.set_num_inputs_discarded(num_clients_discarded_);
555   return message;
556 }
557 
558 }  // namespace fcp::aggregation
559