xref: /aosp_15_r20/external/pigweed/pw_rpc/endpoint.cc (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 // clang-format off
16 #include "pw_rpc/internal/log_config.h"  // PW_LOG_* macros must be first.
17 
18 #include "pw_rpc/internal/endpoint.h"
19 // clang-format on
20 
21 #include "pw_log/log.h"
22 #include "pw_rpc/internal/lock.h"
23 
24 #if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_BUSY_LOOP
25 
26 static_assert(
27     PW_RPC_USE_GLOBAL_MUTEX == 0,
28     "The RPC global mutex is enabled, but no pw_rpc yield mode is selected! "
29     "Because the global mutex is in use, pw_rpc may be used from multiple "
30     "threads. This could result in thread starvation. To fix this, set "
31     "PW_RPC_YIELD to PW_RPC_YIELD_MODE_SLEEP and add a dependency on "
32     "pw_thread:sleep.");
33 
34 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
35 
36 #include <chrono>
37 
38 #if !__has_include("pw_thread/sleep.h")
39 
40 static_assert(false,
41               "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_SLEEP "
42               "(pw::this_thread::sleep_for()), but no backend is set for "
43               "pw_thread:sleep. Set a pw_thread:sleep backend or use a "
44               "different PW_RPC_YIELD_MODE setting.");
45 
46 #endif  // !__has_include("pw_thread/sleep.h")
47 
48 #include "pw_thread/sleep.h"
49 
50 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
51 
52 #if !__has_include("pw_thread/yield.h")
53 
54 static_assert(false,
55               "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_YIELD "
56               "(pw::this_thread::yield()), but no backend is set for "
57               "pw_thread:yield. Set a pw_thread:yield backend or use a "
58               "different PW_RPC_YIELD_MODE setting.");
59 
60 #endif  // !__has_include("pw_thread/yield.h")
61 
62 #include "pw_thread/yield.h"
63 
64 #else
65 
66 static_assert(
67     false,
68     "PW_RPC_YIELD_MODE macro must be set to PW_RPC_YIELD_MODE_BUSY_LOOP, "
69     "PW_RPC_YIELD_MODE_SLEEP (pw::this_thread::sleep_for()), or "
70     "PW_RPC_YIELD_MODE_YIELD (pw::this_thread::yield())");
71 
72 #endif  // PW_RPC_YIELD_MODE
73 
74 namespace pw::rpc::internal {
75 
YieldRpcLock()76 void YieldRpcLock() {
77   rpc_lock().unlock();
78 #if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
79   static constexpr chrono::SystemClock::duration kSleepDuration =
80       PW_RPC_YIELD_SLEEP_DURATION;
81   this_thread::sleep_for(kSleepDuration);
82 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
83   this_thread::yield();
84 #endif  // PW_RPC_YIELD_MODE
85   rpc_lock().lock();
86 }
87 
ProcessPacket(span<const std::byte> data,Packet::Destination destination)88 Result<Packet> Endpoint::ProcessPacket(span<const std::byte> data,
89                                        Packet::Destination destination) {
90   Result<Packet> result = Packet::FromBuffer(data);
91 
92   if (!result.ok()) {
93     PW_LOG_WARN("Failed to decode pw_rpc packet");
94     return Status::DataLoss();
95   }
96 
97   Packet& packet = *result;
98 
99   if (packet.channel_id() == Channel::kUnassignedChannelId ||
100       packet.service_id() == 0 || packet.method_id() == 0) {
101     PW_LOG_WARN("Received malformed pw_rpc packet");
102     return Status::DataLoss();
103   }
104 
105   if (packet.destination() != destination) {
106     return Status::InvalidArgument();
107   }
108 
109   return result;
110 }
111 
RegisterCall(Call & new_call)112 void Endpoint::RegisterCall(Call& new_call) {
113   // Mark any exisitng duplicate calls as cancelled.
114   auto [before_call, call] = FindIteratorsForCall(new_call);
115   if (call != calls_.end()) {
116     CloseCallAndMarkForCleanup(before_call, call, Status::Cancelled());
117   }
118 
119   // Register the new call.
120   calls_.push_front(new_call);
121 }
122 
123 std::tuple<IntrusiveList<Call>::iterator, IntrusiveList<Call>::iterator>
FindIteratorsForCall(uint32_t channel_id,uint32_t service_id,uint32_t method_id,uint32_t call_id)124 Endpoint::FindIteratorsForCall(uint32_t channel_id,
125                                uint32_t service_id,
126                                uint32_t method_id,
127                                uint32_t call_id) {
128   auto previous = calls_.before_begin();
129   auto call = calls_.begin();
130 
131   while (call != calls_.end()) {
132     if (channel_id == call->channel_id_locked() &&
133         service_id == call->service_id() && method_id == call->method_id()) {
134       if (call_id == call->id() || call_id == kOpenCallId ||
135           call_id == kLegacyOpenCallId) {
136         break;
137       }
138       if (call->id() == kOpenCallId || call->id() == kLegacyOpenCallId) {
139         // Calls with ID of `kOpenCallId` were unrequested, and
140         // are updated to have the call ID of the first matching request.
141         //
142         // kLegacyOpenCallId is used for compatibility with old servers
143         // which do not specify a Call ID but expect to be able to send
144         // unrequested responses.
145         call->set_id(call_id);
146         break;
147       }
148     }
149     previous = call;
150     ++call;
151   }
152 
153   return {previous, call};
154 }
155 
CloseChannel(uint32_t channel_id)156 Status Endpoint::CloseChannel(uint32_t channel_id) {
157   rpc_lock().lock();
158 
159   Channel* channel = channels_.Get(channel_id);
160   if (channel == nullptr) {
161     rpc_lock().unlock();
162     return Status::NotFound();
163   }
164   static_cast<internal::ChannelBase*>(channel)->Close();
165 
166   // Close pending calls on the channel that's going away.
167   AbortCalls(AbortIdType::kChannel, channel_id);
168 
169   CleanUpCalls();
170 
171   return OkStatus();
172 }
173 
AbortCalls(AbortIdType type,uint32_t id)174 void Endpoint::AbortCalls(AbortIdType type, uint32_t id) {
175   auto previous = calls_.before_begin();
176   auto current = calls_.begin();
177 
178   while (current != calls_.end()) {
179     if (id == (type == AbortIdType::kChannel ? current->channel_id_locked()
180                                              : current->service_id())) {
181       current =
182           CloseCallAndMarkForCleanup(previous, current, Status::Aborted());
183     } else {
184       previous = current;
185       ++current;
186     }
187   }
188 }
189 
CleanUpCalls()190 void Endpoint::CleanUpCalls() {
191   if (to_cleanup_.empty()) {
192     rpc_lock().unlock();
193     return;
194   }
195 
196   // Drain the to_cleanup_ list. This while loop is structured to avoid
197   // unnecessarily acquiring the lock after popping the last call.
198   while (true) {
199     Call& call = to_cleanup_.front();
200     to_cleanup_.pop_front();
201 
202     const bool done = to_cleanup_.empty();
203 
204     call.CleanUpFromEndpoint();
205 
206     if (done) {
207       return;
208     }
209 
210     rpc_lock().lock();
211   }
212 }
213 
RemoveAllCalls()214 void Endpoint::RemoveAllCalls() {
215   RpcLockGuard lock;
216 
217   // Close all calls without invoking on_error callbacks, since the calls should
218   // have been closed before the Endpoint was deleted.
219   while (!calls_.empty()) {
220     calls_.front().CloseFromDeletedEndpoint();
221     calls_.pop_front();
222   }
223   while (!to_cleanup_.empty()) {
224     to_cleanup_.front().CloseFromDeletedEndpoint();
225     to_cleanup_.pop_front();
226   }
227 }
228 
229 }  // namespace pw::rpc::internal
230