xref: /aosp_15_r20/external/pigweed/pw_grpc/send_queue.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14*61c4878aSAndroid Build Coastguard Worker 
15*61c4878aSAndroid Build Coastguard Worker #include "pw_grpc/send_queue.h"
16*61c4878aSAndroid Build Coastguard Worker 
17*61c4878aSAndroid Build Coastguard Worker namespace pw::grpc {
18*61c4878aSAndroid Build Coastguard Worker 
19*61c4878aSAndroid Build Coastguard Worker std::optional<std::reference_wrapper<SendQueue::SendRequest>>
NextSendRequest()20*61c4878aSAndroid Build Coastguard Worker SendQueue::NextSendRequest() {
21*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(send_mutex_);
22*61c4878aSAndroid Build Coastguard Worker   if (send_requests_.empty()) {
23*61c4878aSAndroid Build Coastguard Worker     return std::nullopt;
24*61c4878aSAndroid Build Coastguard Worker   }
25*61c4878aSAndroid Build Coastguard Worker   auto& front = send_requests_.front();
26*61c4878aSAndroid Build Coastguard Worker   send_requests_.pop_front();
27*61c4878aSAndroid Build Coastguard Worker   return front;
28*61c4878aSAndroid Build Coastguard Worker }
29*61c4878aSAndroid Build Coastguard Worker 
ProcessSendQueue(async::Context &,Status status)30*61c4878aSAndroid Build Coastguard Worker void SendQueue::ProcessSendQueue(async::Context&, Status status) {
31*61c4878aSAndroid Build Coastguard Worker   if (!status.ok()) {
32*61c4878aSAndroid Build Coastguard Worker     return;
33*61c4878aSAndroid Build Coastguard Worker   }
34*61c4878aSAndroid Build Coastguard Worker 
35*61c4878aSAndroid Build Coastguard Worker   auto request = NextSendRequest();
36*61c4878aSAndroid Build Coastguard Worker   while (request.has_value()) {
37*61c4878aSAndroid Build Coastguard Worker     for (auto message : request->get().messages) {
38*61c4878aSAndroid Build Coastguard Worker       request->get().status.Update(socket_.Write(message));
39*61c4878aSAndroid Build Coastguard Worker     }
40*61c4878aSAndroid Build Coastguard Worker     request->get().notify.release();
41*61c4878aSAndroid Build Coastguard Worker     request = NextSendRequest();
42*61c4878aSAndroid Build Coastguard Worker   }
43*61c4878aSAndroid Build Coastguard Worker }
44*61c4878aSAndroid Build Coastguard Worker 
QueueSendRequest(SendRequest & request)45*61c4878aSAndroid Build Coastguard Worker void SendQueue::QueueSendRequest(SendRequest& request) {
46*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(send_mutex_);
47*61c4878aSAndroid Build Coastguard Worker   send_requests_.push_back(request);
48*61c4878aSAndroid Build Coastguard Worker   send_dispatcher_.Cancel(send_task_);
49*61c4878aSAndroid Build Coastguard Worker   send_dispatcher_.Post(send_task_);
50*61c4878aSAndroid Build Coastguard Worker }
51*61c4878aSAndroid Build Coastguard Worker 
CancelSendRequest(SendRequest & request)52*61c4878aSAndroid Build Coastguard Worker void SendQueue::CancelSendRequest(SendRequest& request) {
53*61c4878aSAndroid Build Coastguard Worker   std::lock_guard lock(send_mutex_);
54*61c4878aSAndroid Build Coastguard Worker   send_requests_.remove(request);
55*61c4878aSAndroid Build Coastguard Worker }
56*61c4878aSAndroid Build Coastguard Worker 
SendBytes(ConstByteSpan message)57*61c4878aSAndroid Build Coastguard Worker Status SendQueue::SendBytes(ConstByteSpan message) {
58*61c4878aSAndroid Build Coastguard Worker   std::array<ConstByteSpan, 1> messages = {message};
59*61c4878aSAndroid Build Coastguard Worker   return SendBytesVector(messages);
60*61c4878aSAndroid Build Coastguard Worker }
61*61c4878aSAndroid Build Coastguard Worker 
SendBytesVector(span<ConstByteSpan> messages)62*61c4878aSAndroid Build Coastguard Worker Status SendQueue::SendBytesVector(span<ConstByteSpan> messages) {
63*61c4878aSAndroid Build Coastguard Worker   SendRequest request(messages);
64*61c4878aSAndroid Build Coastguard Worker   QueueSendRequest(request);
65*61c4878aSAndroid Build Coastguard Worker   // TODO: b/345088816 - Add timeout error support to this blocking call.
66*61c4878aSAndroid Build Coastguard Worker   request.notify.acquire();
67*61c4878aSAndroid Build Coastguard Worker   return request.status;
68*61c4878aSAndroid Build Coastguard Worker }
69*61c4878aSAndroid Build Coastguard Worker 
70*61c4878aSAndroid Build Coastguard Worker }  // namespace pw::grpc
71