// Copyright 2024 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.
14*61c4878aSAndroid Build Coastguard Worker 15*61c4878aSAndroid Build Coastguard Worker #include "pw_grpc/send_queue.h" 16*61c4878aSAndroid Build Coastguard Worker 17*61c4878aSAndroid Build Coastguard Worker namespace pw::grpc { 18*61c4878aSAndroid Build Coastguard Worker 19*61c4878aSAndroid Build Coastguard Worker std::optional<std::reference_wrapper<SendQueue::SendRequest>> NextSendRequest()20*61c4878aSAndroid Build Coastguard WorkerSendQueue::NextSendRequest() { 21*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(send_mutex_); 22*61c4878aSAndroid Build Coastguard Worker if (send_requests_.empty()) { 23*61c4878aSAndroid Build Coastguard Worker return std::nullopt; 24*61c4878aSAndroid Build Coastguard Worker } 25*61c4878aSAndroid Build Coastguard Worker auto& front = send_requests_.front(); 26*61c4878aSAndroid Build Coastguard Worker send_requests_.pop_front(); 27*61c4878aSAndroid Build Coastguard Worker return front; 28*61c4878aSAndroid Build Coastguard Worker } 29*61c4878aSAndroid Build Coastguard Worker ProcessSendQueue(async::Context &,Status status)30*61c4878aSAndroid Build Coastguard Workervoid SendQueue::ProcessSendQueue(async::Context&, Status status) { 31*61c4878aSAndroid Build Coastguard Worker if (!status.ok()) { 32*61c4878aSAndroid Build Coastguard Worker return; 33*61c4878aSAndroid Build Coastguard Worker } 34*61c4878aSAndroid Build Coastguard Worker 35*61c4878aSAndroid Build Coastguard Worker auto request = NextSendRequest(); 36*61c4878aSAndroid Build Coastguard Worker while (request.has_value()) { 37*61c4878aSAndroid Build Coastguard Worker for (auto message : request->get().messages) { 38*61c4878aSAndroid Build Coastguard Worker request->get().status.Update(socket_.Write(message)); 39*61c4878aSAndroid Build Coastguard Worker } 40*61c4878aSAndroid Build Coastguard Worker request->get().notify.release(); 41*61c4878aSAndroid Build Coastguard Worker request = NextSendRequest(); 42*61c4878aSAndroid Build Coastguard Worker } 
43*61c4878aSAndroid Build Coastguard Worker } 44*61c4878aSAndroid Build Coastguard Worker QueueSendRequest(SendRequest & request)45*61c4878aSAndroid Build Coastguard Workervoid SendQueue::QueueSendRequest(SendRequest& request) { 46*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(send_mutex_); 47*61c4878aSAndroid Build Coastguard Worker send_requests_.push_back(request); 48*61c4878aSAndroid Build Coastguard Worker send_dispatcher_.Cancel(send_task_); 49*61c4878aSAndroid Build Coastguard Worker send_dispatcher_.Post(send_task_); 50*61c4878aSAndroid Build Coastguard Worker } 51*61c4878aSAndroid Build Coastguard Worker CancelSendRequest(SendRequest & request)52*61c4878aSAndroid Build Coastguard Workervoid SendQueue::CancelSendRequest(SendRequest& request) { 53*61c4878aSAndroid Build Coastguard Worker std::lock_guard lock(send_mutex_); 54*61c4878aSAndroid Build Coastguard Worker send_requests_.remove(request); 55*61c4878aSAndroid Build Coastguard Worker } 56*61c4878aSAndroid Build Coastguard Worker SendBytes(ConstByteSpan message)57*61c4878aSAndroid Build Coastguard WorkerStatus SendQueue::SendBytes(ConstByteSpan message) { 58*61c4878aSAndroid Build Coastguard Worker std::array<ConstByteSpan, 1> messages = {message}; 59*61c4878aSAndroid Build Coastguard Worker return SendBytesVector(messages); 60*61c4878aSAndroid Build Coastguard Worker } 61*61c4878aSAndroid Build Coastguard Worker SendBytesVector(span<ConstByteSpan> messages)62*61c4878aSAndroid Build Coastguard WorkerStatus SendQueue::SendBytesVector(span<ConstByteSpan> messages) { 63*61c4878aSAndroid Build Coastguard Worker SendRequest request(messages); 64*61c4878aSAndroid Build Coastguard Worker QueueSendRequest(request); 65*61c4878aSAndroid Build Coastguard Worker // TODO: b/345088816 - Add timeout error support to this blocking call. 
66*61c4878aSAndroid Build Coastguard Worker request.notify.acquire(); 67*61c4878aSAndroid Build Coastguard Worker return request.status; 68*61c4878aSAndroid Build Coastguard Worker } 69*61c4878aSAndroid Build Coastguard Worker 70*61c4878aSAndroid Build Coastguard Worker } // namespace pw::grpc 71