// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

// clang-format off
#include "pw_rpc/internal/log_config.h"  // PW_LOG_* macros must be first.

#include "pw_rpc/internal/endpoint.h"
// clang-format on

#include "pw_log/log.h"
#include "pw_rpc/internal/lock.h"

#if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_BUSY_LOOP

static_assert(
    PW_RPC_USE_GLOBAL_MUTEX == 0,
    "The RPC global mutex is enabled, but no pw_rpc yield mode is selected! "
    "Because the global mutex is in use, pw_rpc may be used from multiple "
    "threads. This could result in thread starvation. To fix this, set "
31 "PW_RPC_YIELD to PW_RPC_YIELD_MODE_SLEEP and add a dependency on "
32 "pw_thread:sleep.");
33
34 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
35
36 #include <chrono>
37
38 #if !__has_include("pw_thread/sleep.h")
39
40 static_assert(false,
41 "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_SLEEP "
42 "(pw::this_thread::sleep_for()), but no backend is set for "
43 "pw_thread:sleep. Set a pw_thread:sleep backend or use a "
44 "different PW_RPC_YIELD_MODE setting.");
45
46 #endif // !__has_include("pw_thread/sleep.h")
47
48 #include "pw_thread/sleep.h"
49
50 #elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
51
52 #if !__has_include("pw_thread/yield.h")
53
54 static_assert(false,
55 "PW_RPC_YIELD_MODE is PW_RPC_YIELD_MODE_YIELD "
56 "(pw::this_thread::yield()), but no backend is set for "
57 "pw_thread:yield. Set a pw_thread:yield backend or use a "
58 "different PW_RPC_YIELD_MODE setting.");
59
60 #endif // !__has_include("pw_thread/yield.h")
61
62 #include "pw_thread/yield.h"
63
64 #else
65
66 static_assert(
67 false,
68 "PW_RPC_YIELD_MODE macro must be set to PW_RPC_YIELD_MODE_BUSY_LOOP, "
69 "PW_RPC_YIELD_MODE_SLEEP (pw::this_thread::sleep_for()), or "
70 "PW_RPC_YIELD_MODE_YIELD (pw::this_thread::yield())");
71
72 #endif // PW_RPC_YIELD_MODE
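
// Illustrative configuration sketch: the yield mode is chosen at build time
// by defining PW_RPC_YIELD_MODE before this file is compiled. With the sleep
// mode, PW_RPC_YIELD_SLEEP_DURATION controls the sleep interval; the duration
// shown here is an assumed example value, not a stated default.
//
//   #define PW_RPC_YIELD_MODE PW_RPC_YIELD_MODE_SLEEP
//   #define PW_RPC_YIELD_SLEEP_DURATION std::chrono::milliseconds(1)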

namespace pw::rpc::internal {

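// Releases rpc_lock() briefly so that other threads contending for it can
// make progress, then reacquires it. What happens between the unlock and the
// relock is determined by the configured PW_RPC_YIELD_MODE.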
void YieldRpcLock() {
  rpc_lock().unlock();
#if PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_SLEEP
  static constexpr chrono::SystemClock::duration kSleepDuration =
      PW_RPC_YIELD_SLEEP_DURATION;
  this_thread::sleep_for(kSleepDuration);
#elif PW_RPC_YIELD_MODE == PW_RPC_YIELD_MODE_YIELD
  this_thread::yield();
#endif  // PW_RPC_YIELD_MODE
  rpc_lock().lock();
}

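// Decodes `data` into a Packet and validates it. Decode failures and packets
// with an unassigned channel ID or unset service or method IDs are reported
// as DATA_LOSS; a packet addressed to the wrong side (client vs. server) is
// reported as INVALID_ARGUMENT.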
Result<Packet> Endpoint::ProcessPacket(span<const std::byte> data,
                                       Packet::Destination destination) {
  Result<Packet> result = Packet::FromBuffer(data);

  if (!result.ok()) {
    PW_LOG_WARN("Failed to decode pw_rpc packet");
    return Status::DataLoss();
  }

  Packet& packet = *result;

  if (packet.channel_id() == Channel::kUnassignedChannelId ||
      packet.service_id() == 0 || packet.method_id() == 0) {
    PW_LOG_WARN("Received malformed pw_rpc packet");
    return Status::DataLoss();
  }

  if (packet.destination() != destination) {
    return Status::InvalidArgument();
  }

  return result;
}
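
// Usage sketch (hypothetical call site; `endpoint` and `frame` are assumed
// names, and Packet::Destination::kServer is assumed to match packet.h):
//
//   Result<Packet> result =
//       endpoint.ProcessPacket(frame, Packet::Destination::kServer);
//   if (!result.ok()) {
//     return result.status();  // DATA_LOSS or INVALID_ARGUMENT, per above.
//   }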

void Endpoint::RegisterCall(Call& new_call) {
  // Mark any existing duplicate calls as cancelled.
  auto [before_call, call] = FindIteratorsForCall(new_call);
  if (call != calls_.end()) {
    CloseCallAndMarkForCleanup(before_call, call, Status::Cancelled());
  }

  // Register the new call.
  calls_.push_front(new_call);
}

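// Returns an iterator to the first call matching the given IDs, preceded by
// the iterator just before it, so the caller can unlink the call from the
// singly linked calls_ list in O(1). If no call matches, the second iterator
// is calls_.end().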
std::tuple<IntrusiveList<Call>::iterator, IntrusiveList<Call>::iterator>
Endpoint::FindIteratorsForCall(uint32_t channel_id,
                               uint32_t service_id,
                               uint32_t method_id,
                               uint32_t call_id) {
  auto previous = calls_.before_begin();
  auto call = calls_.begin();

  while (call != calls_.end()) {
    if (channel_id == call->channel_id_locked() &&
        service_id == call->service_id() && method_id == call->method_id()) {
      if (call_id == call->id() || call_id == kOpenCallId ||
          call_id == kLegacyOpenCallId) {
        break;
      }
      if (call->id() == kOpenCallId || call->id() == kLegacyOpenCallId) {
        // Calls with an ID of `kOpenCallId` were unrequested, and are updated
        // to have the call ID of the first matching request.
        //
        // `kLegacyOpenCallId` is used for compatibility with old servers that
        // do not specify a call ID but expect to be able to send unrequested
        // responses.
        call->set_id(call_id);
        break;
      }
    }
    previous = call;
    ++call;
  }

  return {previous, call};
}

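// Closes the channel with the given ID and aborts every call that was active
// on it. Returns NOT_FOUND if no channel with that ID is registered.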
Status Endpoint::CloseChannel(uint32_t channel_id) {
  rpc_lock().lock();

  Channel* channel = channels_.Get(channel_id);
  if (channel == nullptr) {
    rpc_lock().unlock();
    return Status::NotFound();
  }
  static_cast<internal::ChannelBase*>(channel)->Close();

  // Close pending calls on the channel that's going away.
  AbortCalls(AbortIdType::kChannel, channel_id);

  CleanUpCalls();

  return OkStatus();
}

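// Closes every call whose channel ID or service ID (selected by `type`)
// matches `id`, marking each closed call for deferred cleanup with an
// ABORTED status.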
void Endpoint::AbortCalls(AbortIdType type, uint32_t id) {
  auto previous = calls_.before_begin();
  auto current = calls_.begin();

  while (current != calls_.end()) {
    if (id == (type == AbortIdType::kChannel ? current->channel_id_locked()
                                             : current->service_id())) {
      current =
          CloseCallAndMarkForCleanup(previous, current, Status::Aborted());
    } else {
      previous = current;
      ++current;
    }
  }
}

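// Runs deferred cleanup for calls queued in to_cleanup_. Entered with
// rpc_lock() held; the lock is no longer held once this function returns.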
void Endpoint::CleanUpCalls() {
  if (to_cleanup_.empty()) {
    rpc_lock().unlock();
    return;
  }

  // Drain the to_cleanup_ list. This while loop is structured to avoid
  // unnecessarily acquiring the lock after popping the last call.
  while (true) {
    Call& call = to_cleanup_.front();
    to_cleanup_.pop_front();

    const bool done = to_cleanup_.empty();

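    // CleanUpFromEndpoint() is expected to release rpc_lock() while it runs
    // user-visible cleanup, which is why the loop reacquires the lock before
    // the next iteration (inferred from the locking pattern in this loop).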
    call.CleanUpFromEndpoint();

    if (done) {
      return;
    }

    rpc_lock().lock();
  }
}

void Endpoint::RemoveAllCalls() {
  RpcLockGuard lock;

  // Close all calls without invoking on_error callbacks, since the calls
  // should have been closed before the Endpoint was deleted.
  while (!calls_.empty()) {
    calls_.front().CloseFromDeletedEndpoint();
    calls_.pop_front();
  }
  while (!to_cleanup_.empty()) {
    to_cleanup_.front().CloseFromDeletedEndpoint();
    to_cleanup_.pop_front();
  }
}

}  // namespace pw::rpc::internal