1 /* 2 * Copyright 2022 Google LLC 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef FCP_AGGREGATION_PROTOCOL_AGGREGATION_PROTOCOL_H_ 18 #define FCP_AGGREGATION_PROTOCOL_AGGREGATION_PROTOCOL_H_ 19 20 #include "absl/status/status.h" 21 #include "absl/strings/cord.h" 22 #include "fcp/aggregation/protocol/aggregation_protocol_messages.pb.h" 23 24 namespace fcp::aggregation { 25 26 // Describes a abstract aggregation protocol interface between a networking 27 // layer (e.g. a service that handles receiving and sending messages with the 28 // client devices) and an implementation of an aggregation algorithm. 29 // 30 // The design of the AggregationProtocol follows a Bridge Pattern 31 // (https://en.wikipedia.org/wiki/Bridge_pattern) in that it is meant to 32 // decouple an abstraction of the layers above and below the AggregationProtocol 33 // from the implementation. 34 // 35 // In this interface the receiving and sending contributing inputs or 36 // messages is abstracted from the actual mechanism for sending and receiving 37 // data over the network and from the actual aggregation mechanism. 38 // 39 // Client identification: the real client identities are hidden from the 40 // protocol implementations. Instead each client is identified by a client_id 41 // number in a range [0, num_clients) where num_clients is the number of clients 42 // the protocol started with or the extended number of clients, which is the 43 // sum of the starting num_clients and num_clients passed to each subsequent 44 // AddClients call. 45 // 46 // Thread safety: for any given client identified by a unique client_id, the 47 // protocol methods are expected to be called sequentially. But there are no 48 // assumptions about concurrent calls made for different clients. Specific 49 // implementations of AggregationProtocol are expected to handle concurrent 50 // calls. The caller side of the protocol isn't expected to queue messages. 51 class AggregationProtocol { 52 public: 53 AggregationProtocol() = default; 54 virtual ~AggregationProtocol() = default; 55 56 // Instructs the protocol to start with the specified number of clients. 57 // 58 // Depending on the protocol implementation, the starting number of clients 59 // may be zero. This method is guaranteed to be the first method called on 60 // the protocol. 61 // 62 // AcceptClients callback is expected in response to this method. 63 virtual absl::Status Start(int64_t num_clients) = 0; 64 65 // Adds an additional batch of clients to the protocol. 66 // 67 // Depending on the protocol implementation, adding clients may not be allowed 68 // and this method might return an error Status. 69 // 70 // AcceptClients callback is expected in response to this method. 71 virtual absl::Status AddClients(int64_t num_clients) = 0; 72 73 // Handles a message from a given client. 74 // 75 // Depending on the specific protocol implementation there may be multiple 76 // messages exchanged with each clients. 77 // 78 // This method should return an error status only if there is an unrecoverable 79 // error which must result in aborting the protocol. Any client specific 80 // error, like an invalid message, should result in closing the protocol with 81 // that specific client only, but this method should still return OK status. 82 virtual absl::Status ReceiveClientMessage(int64_t client_id, 83 const ClientMessage& message) = 0; 84 85 // Notifies the protocol about a communication with a given client being 86 // closed, either normally or abnormally. 87 // 88 // The client_status indicates whether the client connection was closed 89 // normally. 90 // 91 // No further calls or callbacks specific to the given client are expected 92 // after this method. 93 virtual absl::Status CloseClient(int64_t client_id, 94 absl::Status client_status) = 0; 95 96 // Forces the protocol to complete. 97 // 98 // Once the protocol has completed successfully, the Complete callback will 99 // be invoked and provide the aggregation result. If the protocol cannot be 100 // completed in its current state, this method should return an error status. 101 // It is also possible for the completion to fail eventually due to finishing 102 // some asynchronous work, in which case the Abort callback will be invoked. 103 // 104 // No further protocol method calls except Abort and GetStatus are expected 105 // after this method. 106 virtual absl::Status Complete() = 0; 107 108 // Forces the protocol to Abort. 109 // 110 // No further protocol method calls except GetStatus are expected after this 111 // method. 112 virtual absl::Status Abort() = 0; 113 114 // Called periodically to receive the protocol status. 115 // 116 // This method can still be called after the protocol has been completed or 117 // aborted. 118 virtual StatusMessage GetStatus() = 0; 119 120 // Callback interface which methods are implemented by the protocol host. 121 class Callback { 122 public: 123 Callback() = default; 124 virtual ~Callback() = default; 125 126 // Called in response to either StartProtocol or AddClients methods being 127 // called and provides protocol parameters to be broadcasted to all newly 128 // joined clients. 129 virtual void OnAcceptClients(int64_t start_client_id, int64_t num_clients, 130 const AcceptanceMessage& message) = 0; 131 132 // Called by the protocol to deliver a message to a given client. 133 // 134 // Depending on the specific protocol implementation there may be multiple 135 // messages exchanged with each clients, but not all protocols need to 136 // send messages to clients. 137 virtual void OnSendServerMessage(int64_t client_id, 138 const ServerMessage& message) = 0; 139 140 // Called by the protocol to force communication with a client to be closed, 141 // for example due to a client specific error or due to the protocol getting 142 // into a state where no further input for that client is needed. 143 // 144 // No further calls or callbacks specific to the given client are expected 145 // after this method. 146 virtual void OnCloseClient(int64_t client_id, 147 absl::Status diagnostic_status) = 0; 148 149 // Indicates successful completion of the aggregation protocol, contains 150 // the result of the aggregation. 151 // 152 // The format of the result blob is unspecified and can be different for 153 // each specific aggregation protocol implementation. Completing the 154 // protocol should close communications with all remaining clients. 155 virtual void OnComplete(absl::Cord result) = 0; 156 157 // Called by the protocol to indicate that the protocol has been aborted 158 // for internal reasons (e.g. the number of remaining clients dropping 159 // too low). 160 // 161 // Aborting the protocol should close communications with all remaining 162 // clients. 163 virtual void OnAbort(absl::Status diagnostic_status) = 0; 164 }; 165 }; 166 167 } // namespace fcp::aggregation 168 169 #endif // FCP_AGGREGATION_PROTOCOL_AGGREGATION_PROTOCOL_H_ 170