// Copyright 2024 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include "pw_grpc/send_queue.h" namespace pw::grpc { std::optional> SendQueue::NextSendRequest() { std::lock_guard lock(send_mutex_); if (send_requests_.empty()) { return std::nullopt; } auto& front = send_requests_.front(); send_requests_.pop_front(); return front; } void SendQueue::ProcessSendQueue(async::Context&, Status status) { if (!status.ok()) { return; } auto request = NextSendRequest(); while (request.has_value()) { for (auto message : request->get().messages) { request->get().status.Update(socket_.Write(message)); } request->get().notify.release(); request = NextSendRequest(); } } void SendQueue::QueueSendRequest(SendRequest& request) { std::lock_guard lock(send_mutex_); send_requests_.push_back(request); send_dispatcher_.Cancel(send_task_); send_dispatcher_.Post(send_task_); } void SendQueue::CancelSendRequest(SendRequest& request) { std::lock_guard lock(send_mutex_); send_requests_.remove(request); } Status SendQueue::SendBytes(ConstByteSpan message) { std::array messages = {message}; return SendBytesVector(messages); } Status SendQueue::SendBytesVector(span messages) { SendRequest request(messages); QueueSendRequest(request); // TODO: b/345088816 - Add timeout error support to this blocking call. request.notify.acquire(); return request.status; } } // namespace pw::grpc