xref: /aosp_15_r20/external/pigweed/pw_rpc/fuzz/public/pw_rpc/fuzz/engine.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <atomic>
17 #include <cstdarg>
18 #include <cstddef>
19 #include <cstdint>
20 #include <thread>
21 #include <variant>
22 
23 #include "pw_containers/vector.h"
24 #include "pw_random/xor_shift.h"
25 #include "pw_rpc/benchmark.h"
26 #include "pw_rpc/benchmark.raw_rpc.pb.h"
27 #include "pw_rpc/fuzz/alarm_timer.h"
28 #include "pw_sync/condition_variable.h"
29 #include "pw_sync/lock_annotations.h"
30 #include "pw_sync/mutex.h"
31 #include "pw_sync/timed_mutex.h"
32 
33 namespace pw::rpc::fuzz {
34 
35 /// Describes an action a fuzzing thread can perform on a call.
36 struct Action {
37   enum Op : uint8_t {
38     /// No-op.
39     kSkip,
40 
41     /// Waits for the call indicated by `target` to complete.
42     kWait,
43 
44     /// Makes a new unary request using the call indicated by `target`. The data
45     /// written is derived from `value`.
46     kWriteUnary,
47 
48     /// Writes to a stream request using the call indicated by `target`, or
49     /// makes
50     /// a new one if not currently a stream call.  The data written is derived
51     /// from `value`.
52     kWriteStream,
53 
54     /// Closes the stream if the call indicated by `target` is a stream call.
55     kCloseClientStream,
56 
57     /// Cancels the call indicated by `target`.
58     kCancel,
59 
60     /// Abandons the call indicated by `target`.
61     kAbandon,
62 
63     /// Swaps the call indicated by `target` with a call indicated by `value`.
64     kSwap,
65 
66     /// Sets the call indicated by `target` to an initial, unset state.
67     kDestroy,
68   };
69 
70   constexpr Action() = default;
71   Action(uint32_t encoded);
72   Action(Op op, size_t target, uint16_t value);
73   Action(Op op, size_t target, char val, size_t len);
74   ~Action() = default;
75 
set_thread_idAction76   void set_thread_id(size_t thread_id_) {
77     thread_id = thread_id_;
78     callback_id = std::numeric_limits<size_t>::max();
79   }
80 
set_callback_idAction81   void set_callback_id(size_t callback_id_) {
82     thread_id = 0;
83     callback_id = callback_id_;
84   }
85 
86   // For a write action's value, returns the character value to be written.
87   static char DecodeWriteValue(uint16_t value);
88 
89   // For a write action's value, returns the number of characters to be written.
90   static size_t DecodeWriteLength(uint16_t value);
91 
92   /// Returns a value that represents the fields of an action. Constructing an
93   /// `Action` with this value will produce the same fields.
94   uint32_t Encode() const;
95 
96   /// Records details of the action being performed if verbose logging is
97   /// enabled.
98   void Log(bool verbose, size_t num_actions, const char* fmt, ...) const;
99 
100   /// Records an encountered when trying to log an action.
101   void LogFailure(bool verbose, size_t num_actions, Status status) const;
102 
103   Op op = kSkip;
104   size_t target = 0;
105   uint16_t value = 0;
106 
107   size_t thread_id = 0;
108   size_t callback_id = std::numeric_limits<size_t>::max();
109 };
110 
111 /// Wraps an RPC call that may be either a `RawUnaryReceiver` or
112 /// `RawClientReaderWriter`. Allows applying `Action`s to each possible
113 /// type of call.
114 class FuzzyCall {
115  public:
116   using Variant =
117       std::variant<std::monostate, RawUnaryReceiver, RawClientReaderWriter>;
118 
FuzzyCall(size_t index)119   explicit FuzzyCall(size_t index) : index_(index), id_(index) {}
120   ~FuzzyCall() = default;
121 
id()122   size_t id() {
123     std::lock_guard lock(mutex_);
124     return id_;
125   }
126 
pending()127   bool pending() {
128     std::lock_guard lock(mutex_);
129     return pending_;
130   }
131 
132   /// Applies the given visitor to the call variant. If the action taken by the
133   /// visitor is expected to complete the call, it will notify any threads
134   /// waiting for the call to complete. This version of the method does not
135   /// return the result of the visiting the variant.
136   template <typename Visitor,
137             typename std::enable_if_t<
138                 std::is_same_v<typename Visitor::result_type, void>,
139                 int> = 0>
140   typename Visitor::result_type Visit(Visitor visitor, bool completes = true) {
141     {
142       std::lock_guard lock(mutex_);
143       std::visit(std::move(visitor), call_);
144     }
145     if (completes && pending_.exchange(false)) {
146       cv_.notify_all();
147     }
148   }
149 
150   /// Applies the given visitor to the call variant. If the action taken by the
151   /// visitor is expected to complete the call, it will notify any threads
152   /// waiting for the call to complete. This version of the method returns the
153   /// result of the visiting the variant.
154   template <typename Visitor,
155             typename std::enable_if_t<
156                 !std::is_same_v<typename Visitor::result_type, void>,
157                 int> = 0>
158   typename Visitor::result_type Visit(Visitor visitor, bool completes = true) {
159     typename Visitor::result_type result;
160     {
161       std::lock_guard lock(mutex_);
162       result = std::visit(std::move(visitor), call_);
163     }
164     if (completes && pending_.exchange(false)) {
165       cv_.notify_all();
166     }
167     return result;
168   }
169 
170   // Records the number of bytes written as part of a request. If `append` is
171   // true, treats the write as a continuation of a streaming request.
172   void RecordWrite(size_t num, bool append = false);
173 
174   /// Waits to be notified that a callback has been invoked.
175   void Await() PW_LOCKS_EXCLUDED(mutex_);
176 
177   /// Completes the call, notifying any waiters.
178   void Notify() PW_LOCKS_EXCLUDED(mutex_);
179 
180   /// Exchanges the call represented by this object with another.
181   void Swap(FuzzyCall& other);
182 
183   /// Resets the call wrapped by this object with a new one. Destorys the
184   /// previous call.
185   void Reset(Variant call = Variant()) PW_LOCKS_EXCLUDED(mutex_);
186 
187   // Reports the state of this object.
188   void Log() PW_LOCKS_EXCLUDED(mutex_);
189 
190  private:
191   /// This represents the index in the engine's list of calls. It is used to
192   /// ensure a consistent order of locking multiple calls.
193   const size_t index_;
194 
195   sync::TimedMutex mutex_;
196   sync::ConditionVariable cv_;
197 
198   /// An identifier that can be used find this object, e.g. by a callback, even
199   /// when it has been swapped with another call.
200   size_t id_ PW_GUARDED_BY(mutex_);
201 
202   /// Holds the actual pw::rpc::Call object, when present.
203   Variant call_ PW_GUARDED_BY(mutex_);
204 
205   /// Set when a request is sent, and cleared when a callback is invoked.
206   std::atomic_bool pending_ = false;
207 
208   /// Bytes sent in the last unary request or stream write.
209   size_t last_write_ PW_GUARDED_BY(mutex_) = 0;
210 
211   /// Total bytes sent using this call object.
212   size_t total_written_ PW_GUARDED_BY(mutex_) = 0;
213 };
214 
215 /// The main RPC fuzzing engine.
216 ///
217 /// This class takes or generates a sequence of actions, and dsitributes them to
218 /// a number of threads that can perform them using an RPC client. Passing the
219 /// same seed to the engine at construction will allow it to generate the same
220 /// sequence of actions.
221 class Fuzzer {
222  public:
223   /// Number of fuzzing threads. The first thread counted is the RPC dispatch
224   /// thread.
225   static constexpr size_t kNumThreads = 4;
226 
227   /// Maximum number of actions that a single thread will try to perform before
228   /// exiting.
229   static constexpr size_t kMaxActionsPerThread = 255;
230 
231   /// The number of call objects available to be used for fuzzing.
232   static constexpr size_t kMaxConcurrentCalls = 8;
233 
234   /// The mxiumum number of individual fuzzing actions that the fuzzing threads
235   /// can perform. The `+ 1` is to allow the inclusion of a special `0` action
236   /// to separate each thread's actions when concatenated into a single list.
237   static constexpr size_t kMaxActions =
238       kNumThreads * (kMaxActionsPerThread + 1);
239 
240   explicit Fuzzer(Client& client, uint32_t channel_id);
241 
242   /// The fuzzer engine should remain pinned in memory since it is referenced by
243   /// the `CallbackContext`s.
244   Fuzzer(const Fuzzer&) = delete;
245   Fuzzer(Fuzzer&&) = delete;
246   Fuzzer& operator=(const Fuzzer&) = delete;
247   Fuzzer& operator=(Fuzzer&&) = delete;
248 
set_verbose(bool verbose)249   void set_verbose(bool verbose) { verbose_ = verbose; }
250 
251   /// Sets the timeout and starts the timer.
set_timeout(chrono::SystemClock::duration timeout)252   void set_timeout(chrono::SystemClock::duration timeout) {
253     timer_.Start(timeout);
254   }
255 
256   /// Generates encoded actions from the RNG and `Run`s them.
257   void Run(uint64_t seed, size_t num_actions);
258 
259   /// Splits the provided `actions` between the fuzzing threads and runs them to
260   /// completion.
261   void Run(const Vector<uint32_t>& actions);
262 
263  private:
264   /// Information passed to the RPC callbacks, including the index of the
265   /// associated call and a pointer to the fuzzer object.
266   struct CallbackContext {
267     size_t id;
268     Fuzzer* fuzzer;
269   };
270 
271   /// Restarts the alarm timer, delaying it from detecting a timeout. This is
272   /// called whenever actions complete and indicates progress is still being
273   /// made.
274   void ResetTimerLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
275 
276   /// Decodes the `encoded` action and performs it. The `thread_id` is used for
277   /// verbose diagnostics. When invoked from `PerformCallback` the `callback_id`
278   /// will be set to the index of the associated call. This allows avoiding
279   /// specific, prohibited actions, e.g. destroying a call from its own
280   /// callback.
281   void Perform(const Action& action) PW_LOCKS_EXCLUDED(mutex_);
282 
283   /// Returns the call with the matching `id`.
FindCall(size_t id)284   FuzzyCall& FindCall(size_t id) PW_LOCKS_EXCLUDED(mutex_) {
285     std::lock_guard lock(mutex_);
286     return FindCallLocked(id);
287   }
288 
FindCallLocked(size_t id)289   FuzzyCall& FindCallLocked(size_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
290     return fuzzy_calls_[indices_[id]];
291   }
292 
293   /// Returns a pointer to callback context for the given call index.
GetContext(size_t callback_id)294   CallbackContext* GetContext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_) {
295     std::lock_guard lock(mutex_);
296     return &contexts_[callback_id];
297   }
298 
299   /// Callback for stream write made by the call with the given `callback_id`.
300   void OnNext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_);
301 
302   /// Callback for completed request for the call with the given `callback_id`.
303   void OnCompleted(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_);
304 
305   /// Callback for an error for the call with the given `callback_id`.
306   void OnError(size_t callback_id, Status status) PW_LOCKS_EXCLUDED(mutex_);
307 
308   bool verbose_ = false;
309   pw_rpc::raw::Benchmark::Client client_;
310   BenchmarkService service_;
311 
312   /// Alarm thread that detects when no workers have made recent progress.
313   AlarmTimer timer_;
314 
315   sync::Mutex mutex_;
316 
317   /// Worker threads. The first thread is the RPC response dispatcher.
318   Vector<std::thread, kNumThreads> threads_;
319 
320   /// RPC call objects.
321   Vector<FuzzyCall, kMaxConcurrentCalls> fuzzy_calls_;
322 
323   /// Maps each call's IDs to its index. Since calls may be move before their
324   /// callbacks are invoked, this list can be used to find the original call.
325   Vector<size_t, kMaxConcurrentCalls> indices_ PW_GUARDED_BY(mutex_);
326 
327   /// Context objects used to reference the engine and call.
328   Vector<CallbackContext, kMaxConcurrentCalls> contexts_ PW_GUARDED_BY(mutex_);
329 
330   /// Set of actions performed as callbacks from other calls.
331   Vector<uint32_t, kMaxActionsPerThread> callback_actions_
332       PW_GUARDED_BY(mutex_);
333   Vector<uint32_t>::iterator callback_iterator_ PW_GUARDED_BY(mutex_);
334 
335   /// Total actions performed by all workers.
336   std::atomic<size_t> num_actions_ = 0;
337 };
338 
339 }  // namespace pw::rpc::fuzz
340