xref: /aosp_15_r20/external/federated-compute/fcp/aggregation/protocol/aggregation_protocol.h (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2022 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef FCP_AGGREGATION_PROTOCOL_AGGREGATION_PROTOCOL_H_
18 #define FCP_AGGREGATION_PROTOCOL_AGGREGATION_PROTOCOL_H_
19 
20 #include "absl/status/status.h"
21 #include "absl/strings/cord.h"
22 #include "fcp/aggregation/protocol/aggregation_protocol_messages.pb.h"
23 
24 namespace fcp::aggregation {
25 
26 // Describes an abstract aggregation protocol interface between a networking
27 // layer (e.g. a service that handles receiving and sending messages with the
28 // client devices) and an implementation of an aggregation algorithm.
29 //
30 // The design of the AggregationProtocol follows a Bridge Pattern
31 // (https://en.wikipedia.org/wiki/Bridge_pattern) in that it is meant to
32 // decouple an abstraction of the layers above and below the AggregationProtocol
33 // from the implementation.
34 //
35 // In this interface the receiving and sending of contributing inputs or
36 // messages is abstracted from the actual mechanism for sending and receiving
37 // data over the network and from the actual aggregation mechanism.
38 //
39 // Client identification: the real client identities are hidden from the
40 // protocol implementations. Instead each client is identified by a client_id
41 // number in a range [0, num_clients) where num_clients is the number of clients
42 // the protocol started with or the extended number of clients, which is the
43 // sum of the starting num_clients and num_clients passed to each subsequent
44 // AddClients call.
45 //
46 // Thread safety: for any given client identified by a unique client_id, the
47 // protocol methods are expected to be called sequentially. But there are no
48 // assumptions about concurrent calls made for different clients. Specific
49 // implementations of AggregationProtocol are expected to handle concurrent
50 // calls. The caller side of the protocol isn't expected to queue messages.
51 class AggregationProtocol {
52  public:
53   AggregationProtocol() = default;
54   virtual ~AggregationProtocol() = default;
55 
56   // Instructs the protocol to start with the specified number of clients.
57   //
58   // Depending on the protocol implementation, the starting number of clients
59   // may be zero.  This method is guaranteed to be the first method called on
60   // the protocol.
61   //
62   // The OnAcceptClients callback is expected in response to this method.
63   virtual absl::Status Start(int64_t num_clients) = 0;
64 
65   // Adds an additional batch of clients to the protocol.
66   //
67   // Depending on the protocol implementation, adding clients may not be allowed
68   // and this method might return an error Status.
69   //
70   // The OnAcceptClients callback is expected in response to this method.
71   virtual absl::Status AddClients(int64_t num_clients) = 0;
72 
73   // Handles a message from a given client.
74   //
75   // Depending on the specific protocol implementation there may be multiple
76   // messages exchanged with each client.
77   //
78   // This method should return an error status only if there is an unrecoverable
79   // error which must result in aborting the protocol.  Any client specific
80   // error, like an invalid message, should result in closing the protocol with
81   // that specific client only, but this method should still return OK status.
82   virtual absl::Status ReceiveClientMessage(int64_t client_id,
83                                             const ClientMessage& message) = 0;
84 
85   // Notifies the protocol about a communication with a given client being
86   // closed, either normally or abnormally.
87   //
88   // The client_status indicates whether the client connection was closed
89   // normally.
90   //
91   // No further calls or callbacks specific to the given client are expected
92   // after this method.
93   virtual absl::Status CloseClient(int64_t client_id,
94                                    absl::Status client_status) = 0;
95 
96   // Forces the protocol to complete.
97   //
98   // Once the protocol has completed successfully, the OnComplete callback will
99   // be invoked and provide the aggregation result.  If the protocol cannot be
100   // completed in its current state, this method should return an error status.
101   // It is also possible for the completion to fail eventually due to finishing
102   // some asynchronous work, in which case the OnAbort callback will be invoked.
103   //
104   // No further protocol method calls except Abort and GetStatus are expected
105   // after this method.
106   virtual absl::Status Complete() = 0;
107 
108   // Forces the protocol to abort.
109   //
110   // No further protocol method calls except GetStatus are expected after this
111   // method.
112   virtual absl::Status Abort() = 0;
113 
114   // Called periodically to receive the protocol status.
115   //
116   // This method can still be called after the protocol has been completed or
117   // aborted.
118   virtual StatusMessage GetStatus() = 0;
119 
120   // Callback interface whose methods are implemented by the protocol host.
121   class Callback {
122    public:
123     Callback() = default;
124     virtual ~Callback() = default;
125 
126     // Called in response to either the Start or AddClients method being
127     // called and provides protocol parameters to be broadcast to all newly
128     // joined clients.
129     virtual void OnAcceptClients(int64_t start_client_id, int64_t num_clients,
130                                  const AcceptanceMessage& message) = 0;
131 
132     // Called by the protocol to deliver a message to a given client.
133     //
134     // Depending on the specific protocol implementation there may be multiple
135     // messages exchanged with each client, but not all protocols need to
136     // send messages to clients.
137     virtual void OnSendServerMessage(int64_t client_id,
138                                      const ServerMessage& message) = 0;
139 
140     // Called by the protocol to force communication with a client to be closed,
141     // for example due to a client specific error or due to the protocol getting
142     // into a state where no further input for that client is needed.
143     //
144     // No further calls or callbacks specific to the given client are expected
145     // after this method.
146     virtual void OnCloseClient(int64_t client_id,
147                                absl::Status diagnostic_status) = 0;
148 
149     // Indicates successful completion of the aggregation protocol, contains
150     // the result of the aggregation.
151     //
152     // The format of the result blob is unspecified and can be different for
153     // each specific aggregation protocol implementation.  Completing the
154     // protocol should close communications with all remaining clients.
155     virtual void OnComplete(absl::Cord result) = 0;
156 
157     // Called by the protocol to indicate that the protocol has been aborted
158     // for internal reasons (e.g. the number of remaining clients dropping
159     // too low).
160     //
161     // Aborting the protocol should close communications with all remaining
162     // clients.
163     virtual void OnAbort(absl::Status diagnostic_status) = 0;
164   };
165 };
166 
167 }  // namespace fcp::aggregation
168 
169 #endif  // FCP_AGGREGATION_PROTOCOL_AGGREGATION_PROTOCOL_H_
170