1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <atomic> 17 #include <cstdarg> 18 #include <cstddef> 19 #include <cstdint> 20 #include <thread> 21 #include <variant> 22 23 #include "pw_containers/vector.h" 24 #include "pw_random/xor_shift.h" 25 #include "pw_rpc/benchmark.h" 26 #include "pw_rpc/benchmark.raw_rpc.pb.h" 27 #include "pw_rpc/fuzz/alarm_timer.h" 28 #include "pw_sync/condition_variable.h" 29 #include "pw_sync/lock_annotations.h" 30 #include "pw_sync/mutex.h" 31 #include "pw_sync/timed_mutex.h" 32 33 namespace pw::rpc::fuzz { 34 35 /// Describes an action a fuzzing thread can perform on a call. 36 struct Action { 37 enum Op : uint8_t { 38 /// No-op. 39 kSkip, 40 41 /// Waits for the call indicated by `target` to complete. 42 kWait, 43 44 /// Makes a new unary request using the call indicated by `target`. The data 45 /// written is derived from `value`. 46 kWriteUnary, 47 48 /// Writes to a stream request using the call indicated by `target`, or 49 /// makes 50 /// a new one if not currently a stream call. The data written is derived 51 /// from `value`. 52 kWriteStream, 53 54 /// Closes the stream if the call indicated by `target` is a stream call. 55 kCloseClientStream, 56 57 /// Cancels the call indicated by `target`. 58 kCancel, 59 60 /// Abandons the call indicated by `target`. 61 kAbandon, 62 63 /// Swaps the call indicated by `target` with a call indicated by `value`. 64 kSwap, 65 66 /// Sets the call indicated by `target` to an initial, unset state. 67 kDestroy, 68 }; 69 70 constexpr Action() = default; 71 Action(uint32_t encoded); 72 Action(Op op, size_t target, uint16_t value); 73 Action(Op op, size_t target, char val, size_t len); 74 ~Action() = default; 75 set_thread_idAction76 void set_thread_id(size_t thread_id_) { 77 thread_id = thread_id_; 78 callback_id = std::numeric_limits<size_t>::max(); 79 } 80 set_callback_idAction81 void set_callback_id(size_t callback_id_) { 82 thread_id = 0; 83 callback_id = callback_id_; 84 } 85 86 // For a write action's value, returns the character value to be written. 87 static char DecodeWriteValue(uint16_t value); 88 89 // For a write action's value, returns the number of characters to be written. 90 static size_t DecodeWriteLength(uint16_t value); 91 92 /// Returns a value that represents the fields of an action. Constructing an 93 /// `Action` with this value will produce the same fields. 94 uint32_t Encode() const; 95 96 /// Records details of the action being performed if verbose logging is 97 /// enabled. 98 void Log(bool verbose, size_t num_actions, const char* fmt, ...) const; 99 100 /// Records an encountered when trying to log an action. 101 void LogFailure(bool verbose, size_t num_actions, Status status) const; 102 103 Op op = kSkip; 104 size_t target = 0; 105 uint16_t value = 0; 106 107 size_t thread_id = 0; 108 size_t callback_id = std::numeric_limits<size_t>::max(); 109 }; 110 111 /// Wraps an RPC call that may be either a `RawUnaryReceiver` or 112 /// `RawClientReaderWriter`. Allows applying `Action`s to each possible 113 /// type of call. 114 class FuzzyCall { 115 public: 116 using Variant = 117 std::variant<std::monostate, RawUnaryReceiver, RawClientReaderWriter>; 118 FuzzyCall(size_t index)119 explicit FuzzyCall(size_t index) : index_(index), id_(index) {} 120 ~FuzzyCall() = default; 121 id()122 size_t id() { 123 std::lock_guard lock(mutex_); 124 return id_; 125 } 126 pending()127 bool pending() { 128 std::lock_guard lock(mutex_); 129 return pending_; 130 } 131 132 /// Applies the given visitor to the call variant. If the action taken by the 133 /// visitor is expected to complete the call, it will notify any threads 134 /// waiting for the call to complete. This version of the method does not 135 /// return the result of the visiting the variant. 136 template <typename Visitor, 137 typename std::enable_if_t< 138 std::is_same_v<typename Visitor::result_type, void>, 139 int> = 0> 140 typename Visitor::result_type Visit(Visitor visitor, bool completes = true) { 141 { 142 std::lock_guard lock(mutex_); 143 std::visit(std::move(visitor), call_); 144 } 145 if (completes && pending_.exchange(false)) { 146 cv_.notify_all(); 147 } 148 } 149 150 /// Applies the given visitor to the call variant. If the action taken by the 151 /// visitor is expected to complete the call, it will notify any threads 152 /// waiting for the call to complete. This version of the method returns the 153 /// result of the visiting the variant. 154 template <typename Visitor, 155 typename std::enable_if_t< 156 !std::is_same_v<typename Visitor::result_type, void>, 157 int> = 0> 158 typename Visitor::result_type Visit(Visitor visitor, bool completes = true) { 159 typename Visitor::result_type result; 160 { 161 std::lock_guard lock(mutex_); 162 result = std::visit(std::move(visitor), call_); 163 } 164 if (completes && pending_.exchange(false)) { 165 cv_.notify_all(); 166 } 167 return result; 168 } 169 170 // Records the number of bytes written as part of a request. If `append` is 171 // true, treats the write as a continuation of a streaming request. 172 void RecordWrite(size_t num, bool append = false); 173 174 /// Waits to be notified that a callback has been invoked. 175 void Await() PW_LOCKS_EXCLUDED(mutex_); 176 177 /// Completes the call, notifying any waiters. 178 void Notify() PW_LOCKS_EXCLUDED(mutex_); 179 180 /// Exchanges the call represented by this object with another. 181 void Swap(FuzzyCall& other); 182 183 /// Resets the call wrapped by this object with a new one. Destorys the 184 /// previous call. 185 void Reset(Variant call = Variant()) PW_LOCKS_EXCLUDED(mutex_); 186 187 // Reports the state of this object. 188 void Log() PW_LOCKS_EXCLUDED(mutex_); 189 190 private: 191 /// This represents the index in the engine's list of calls. It is used to 192 /// ensure a consistent order of locking multiple calls. 193 const size_t index_; 194 195 sync::TimedMutex mutex_; 196 sync::ConditionVariable cv_; 197 198 /// An identifier that can be used find this object, e.g. by a callback, even 199 /// when it has been swapped with another call. 200 size_t id_ PW_GUARDED_BY(mutex_); 201 202 /// Holds the actual pw::rpc::Call object, when present. 203 Variant call_ PW_GUARDED_BY(mutex_); 204 205 /// Set when a request is sent, and cleared when a callback is invoked. 206 std::atomic_bool pending_ = false; 207 208 /// Bytes sent in the last unary request or stream write. 209 size_t last_write_ PW_GUARDED_BY(mutex_) = 0; 210 211 /// Total bytes sent using this call object. 212 size_t total_written_ PW_GUARDED_BY(mutex_) = 0; 213 }; 214 215 /// The main RPC fuzzing engine. 216 /// 217 /// This class takes or generates a sequence of actions, and dsitributes them to 218 /// a number of threads that can perform them using an RPC client. Passing the 219 /// same seed to the engine at construction will allow it to generate the same 220 /// sequence of actions. 221 class Fuzzer { 222 public: 223 /// Number of fuzzing threads. The first thread counted is the RPC dispatch 224 /// thread. 225 static constexpr size_t kNumThreads = 4; 226 227 /// Maximum number of actions that a single thread will try to perform before 228 /// exiting. 229 static constexpr size_t kMaxActionsPerThread = 255; 230 231 /// The number of call objects available to be used for fuzzing. 232 static constexpr size_t kMaxConcurrentCalls = 8; 233 234 /// The mxiumum number of individual fuzzing actions that the fuzzing threads 235 /// can perform. The `+ 1` is to allow the inclusion of a special `0` action 236 /// to separate each thread's actions when concatenated into a single list. 237 static constexpr size_t kMaxActions = 238 kNumThreads * (kMaxActionsPerThread + 1); 239 240 explicit Fuzzer(Client& client, uint32_t channel_id); 241 242 /// The fuzzer engine should remain pinned in memory since it is referenced by 243 /// the `CallbackContext`s. 244 Fuzzer(const Fuzzer&) = delete; 245 Fuzzer(Fuzzer&&) = delete; 246 Fuzzer& operator=(const Fuzzer&) = delete; 247 Fuzzer& operator=(Fuzzer&&) = delete; 248 set_verbose(bool verbose)249 void set_verbose(bool verbose) { verbose_ = verbose; } 250 251 /// Sets the timeout and starts the timer. set_timeout(chrono::SystemClock::duration timeout)252 void set_timeout(chrono::SystemClock::duration timeout) { 253 timer_.Start(timeout); 254 } 255 256 /// Generates encoded actions from the RNG and `Run`s them. 257 void Run(uint64_t seed, size_t num_actions); 258 259 /// Splits the provided `actions` between the fuzzing threads and runs them to 260 /// completion. 261 void Run(const Vector<uint32_t>& actions); 262 263 private: 264 /// Information passed to the RPC callbacks, including the index of the 265 /// associated call and a pointer to the fuzzer object. 266 struct CallbackContext { 267 size_t id; 268 Fuzzer* fuzzer; 269 }; 270 271 /// Restarts the alarm timer, delaying it from detecting a timeout. This is 272 /// called whenever actions complete and indicates progress is still being 273 /// made. 274 void ResetTimerLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 275 276 /// Decodes the `encoded` action and performs it. The `thread_id` is used for 277 /// verbose diagnostics. When invoked from `PerformCallback` the `callback_id` 278 /// will be set to the index of the associated call. This allows avoiding 279 /// specific, prohibited actions, e.g. destroying a call from its own 280 /// callback. 281 void Perform(const Action& action) PW_LOCKS_EXCLUDED(mutex_); 282 283 /// Returns the call with the matching `id`. FindCall(size_t id)284 FuzzyCall& FindCall(size_t id) PW_LOCKS_EXCLUDED(mutex_) { 285 std::lock_guard lock(mutex_); 286 return FindCallLocked(id); 287 } 288 FindCallLocked(size_t id)289 FuzzyCall& FindCallLocked(size_t id) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_) { 290 return fuzzy_calls_[indices_[id]]; 291 } 292 293 /// Returns a pointer to callback context for the given call index. GetContext(size_t callback_id)294 CallbackContext* GetContext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_) { 295 std::lock_guard lock(mutex_); 296 return &contexts_[callback_id]; 297 } 298 299 /// Callback for stream write made by the call with the given `callback_id`. 300 void OnNext(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_); 301 302 /// Callback for completed request for the call with the given `callback_id`. 303 void OnCompleted(size_t callback_id) PW_LOCKS_EXCLUDED(mutex_); 304 305 /// Callback for an error for the call with the given `callback_id`. 306 void OnError(size_t callback_id, Status status) PW_LOCKS_EXCLUDED(mutex_); 307 308 bool verbose_ = false; 309 pw_rpc::raw::Benchmark::Client client_; 310 BenchmarkService service_; 311 312 /// Alarm thread that detects when no workers have made recent progress. 313 AlarmTimer timer_; 314 315 sync::Mutex mutex_; 316 317 /// Worker threads. The first thread is the RPC response dispatcher. 318 Vector<std::thread, kNumThreads> threads_; 319 320 /// RPC call objects. 321 Vector<FuzzyCall, kMaxConcurrentCalls> fuzzy_calls_; 322 323 /// Maps each call's IDs to its index. Since calls may be move before their 324 /// callbacks are invoked, this list can be used to find the original call. 325 Vector<size_t, kMaxConcurrentCalls> indices_ PW_GUARDED_BY(mutex_); 326 327 /// Context objects used to reference the engine and call. 328 Vector<CallbackContext, kMaxConcurrentCalls> contexts_ PW_GUARDED_BY(mutex_); 329 330 /// Set of actions performed as callbacks from other calls. 331 Vector<uint32_t, kMaxActionsPerThread> callback_actions_ 332 PW_GUARDED_BY(mutex_); 333 Vector<uint32_t>::iterator callback_iterator_ PW_GUARDED_BY(mutex_); 334 335 /// Total actions performed by all workers. 336 std::atomic<size_t> num_actions_ = 0; 337 }; 338 339 } // namespace pw::rpc::fuzz 340