1 /*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <stdarg.h>
18
19 #include <chrono>
20 #include <list>
21 #include <map>
22 #include <random>
23 #include <regex>
24 #include <string>
25 #include <thread>
26 #include <vector>
27
28 #include "perfetto/base/build_config.h"
29 #include "perfetto/base/compiler.h"
30 #include "perfetto/base/time.h"
31 #include "perfetto/ext/base/ctrl_c_handler.h"
32 #include "perfetto/ext/base/file_utils.h"
33 #include "perfetto/ext/base/scoped_file.h"
34 #include "perfetto/ext/base/subprocess.h"
35 #include "perfetto/ext/base/temp_file.h"
36 #include "perfetto/ext/base/utils.h"
37 #include "perfetto/protozero/proto_utils.h"
38 #include "perfetto/tracing.h"
39 #include "perfetto/tracing/core/forward_decls.h"
40 #include "perfetto/tracing/core/trace_config.h"
41 #include "src/base/test/utils.h"
42
43 #include "protos/perfetto/config/stress_test_config.gen.h"
44 #include "protos/perfetto/trace/test_event.pbzero.h"
45 #include "protos/perfetto/trace/trace_packet.pbzero.h"
46
47 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
48 #include <Windows.h>
49 #else
50 #include <signal.h>
51 #endif
52
53 // Generated by gen_configs_blob.py. It defines the kStressTestConfigs array,
54 // which contains a proto-encoded StressTestConfig message for each .cfg file
55 // listed in /test/stress_test/configs/BUILD.gn.
56 #include "test/stress_test/configs/stress_test_config_blobs.h"
57
58 namespace perfetto {
59 namespace {
60
61 // TODO(primiano): We need a base::File to get around the awkwardness of
62 // files on Windows being a mix of int and HANDLE (and open() vs CreateFile())
63 // in our codebase.
OpenLogFile(const std::string & path)64 base::ScopedPlatformHandle OpenLogFile(const std::string& path) {
65 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
66 return base::ScopedPlatformHandle(::CreateFileA(
67 path.c_str(), GENERIC_READ | GENERIC_WRITE,
68 FILE_SHARE_DELETE | FILE_SHARE_READ, nullptr, CREATE_ALWAYS, 0, nullptr));
69 #else
70 return base::OpenFile(path, O_RDWR | O_CREAT | O_TRUNC, 0644);
71 #endif
72 }
73
74 using StressTestConfig = protos::gen::StressTestConfig;
75
76 struct SigHandlerCtx {
77 std::atomic<bool> aborted{};
78 std::vector<base::PlatformProcessId> pids_to_kill;
79 };
80 SigHandlerCtx* g_sig;
81
82 struct TestResult {
83 const char* cfg_name = nullptr;
84 StressTestConfig cfg;
85 uint32_t run_time_ms = 0;
86 uint32_t trace_size_kb = 0;
87 uint32_t num_packets = 0;
88 uint32_t num_threads = 0;
89 uint32_t num_errors = 0;
90 base::Subprocess::ResourceUsage svc_rusage;
91 base::Subprocess::ResourceUsage prod_rusage;
92 };
93
94 struct ParsedTraceStats {
95 struct WriterThread {
96 uint64_t packets_seen = 0;
97 bool last_seen = false;
98 uint32_t last_seq = 0;
99 uint64_t seq_errors = 0;
100 uint64_t counter_errors = 0;
101 std::minstd_rand0 rnd_engine;
102 };
103
104 // One for each trusted_packet_sequence_id.
105 std::map<uint32_t, WriterThread> threads;
106 };
107
108 class TestHarness {
109 public:
110 TestHarness();
111 void RunConfig(const char* cfg_name, const StressTestConfig&, bool verbose);
test_results() const112 const std::list<TestResult>& test_results() const { return test_results_; }
113
114 private:
115 void ReadbackTrace(const std::string&, ParsedTraceStats*);
116 void ParseTracePacket(const uint8_t*, size_t, ParsedTraceStats* ctx);
117 void AddFailure(const char* fmt, ...) PERFETTO_PRINTF_FORMAT(2, 3);
118
119 std::vector<std::string> env_;
120 std::list<TestResult> test_results_;
121 std::string results_dir_;
122 base::ScopedFile error_log_;
123 };
124
TestHarness()125 TestHarness::TestHarness() {
126 results_dir_ = base::GetSysTempDir() + "/perfetto-stress-test";
127 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
128 base::ignore_result(system(("rmdir \"" + results_dir_ + "\" /s /q").c_str()));
129 #else
130 base::ignore_result(system(("rm -r -- \"" + results_dir_ + "\"").c_str()));
131 #endif
132 PERFETTO_CHECK(base::Mkdir(results_dir_));
133 PERFETTO_LOG("Saving test results in %s", results_dir_.c_str());
134 }
135
AddFailure(const char * fmt,...)136 void TestHarness::AddFailure(const char* fmt, ...) {
137 ++test_results_.back().num_errors;
138
139 char log_msg[512];
140 va_list args;
141 va_start(args, fmt);
142 int res = vsnprintf(log_msg, sizeof(log_msg), fmt, args);
143 va_end(args);
144
145 PERFETTO_ELOG("FAIL: %s", log_msg);
146
147 if (res > 0 && static_cast<size_t>(res) < sizeof(log_msg) - 2) {
148 log_msg[res++] = '\n';
149 log_msg[res++] = '\0';
150 }
151
152 base::WriteAll(*error_log_, log_msg, static_cast<size_t>(res));
153 }
154
RunConfig(const char * cfg_name,const StressTestConfig & cfg,bool verbose)155 void TestHarness::RunConfig(const char* cfg_name,
156 const StressTestConfig& cfg,
157 bool verbose) {
158 test_results_.emplace_back();
159 TestResult& test_result = test_results_.back();
160 test_result.cfg_name = cfg_name;
161 test_result.cfg = cfg;
162 g_sig->pids_to_kill.clear();
163
164 auto result_dir = results_dir_ + "/" + cfg_name;
165 PERFETTO_CHECK(base::Mkdir(result_dir));
166 error_log_ = base::OpenFile(result_dir + "/errors.log",
167 O_RDWR | O_CREAT | O_TRUNC, 0644);
168
169 PERFETTO_ILOG("Starting \"%s\" - %s", cfg_name, result_dir.c_str());
170
171 env_.emplace_back("PERFETTO_PRODUCER_SOCK_NAME=" + result_dir +
172 "/producer.sock");
173 env_.emplace_back("PERFETTO_CONSUMER_SOCK_NAME=" + result_dir +
174 "/consumer.sock");
175 std::string bin_dir = base::GetCurExecutableDir();
176
177 // Start the service.
178 base::Subprocess traced({bin_dir + "/traced"});
179 traced.args.env = env_;
180 if (!verbose) {
181 traced.args.out_fd = OpenLogFile(result_dir + "/traced.log");
182 traced.args.stderr_mode = traced.args.stdout_mode =
183 base::Subprocess::OutputMode::kFd;
184 }
185 traced.Start();
186 g_sig->pids_to_kill.emplace_back(traced.pid());
187 std::this_thread::sleep_for(std::chrono::milliseconds(100));
188 PERFETTO_CHECK(traced.Poll() == base::Subprocess::kRunning);
189
190 // Start the producer processes.
191 std::list<base::Subprocess> producers;
192 for (uint32_t i = 0; i < cfg.num_processes(); ++i) {
193 producers.emplace_back(base::Subprocess({bin_dir + "/stress_producer"}));
194 auto& producer = producers.back();
195 producer.args.input = cfg.SerializeAsString();
196 if (!verbose) {
197 producer.args.out_fd =
198 OpenLogFile(result_dir + "/producer." + std::to_string(i) + ".log");
199 producer.args.stderr_mode = producer.args.stdout_mode =
200 base::Subprocess::OutputMode::kFd;
201 }
202 producer.args.env = env_;
203 producer.Start();
204 g_sig->pids_to_kill.emplace_back(producer.pid());
205 }
206 std::this_thread::sleep_for(std::chrono::milliseconds(100));
207 for (auto& producer : producers)
208 PERFETTO_CHECK(producer.Poll() == base::Subprocess::kRunning);
209
210 auto trace_file_path = result_dir + "/trace";
211 base::Subprocess consumer(
212 {bin_dir + "/perfetto", "-c", "-", "-o", trace_file_path.c_str()});
213 consumer.args.env = env_;
214 consumer.args.input = cfg.trace_config().SerializeAsString();
215 if (!verbose) {
216 consumer.args.out_fd = OpenLogFile(result_dir + "/perfetto.log");
217 consumer.args.stderr_mode = consumer.args.stdout_mode =
218 base::Subprocess::OutputMode::kFd;
219 }
220 remove(trace_file_path.c_str());
221 consumer.Start();
222 int64_t t_start = base::GetBootTimeNs().count();
223 g_sig->pids_to_kill.emplace_back(consumer.pid());
224
225 std::this_thread::sleep_for(std::chrono::milliseconds(100));
226 PERFETTO_CHECK(consumer.Poll() == base::Subprocess::kRunning);
227
228 if (!consumer.Wait(
229 static_cast<int>(cfg.trace_config().duration_ms() + 30000))) {
230 AddFailure("Consumer didn't quit in time");
231 consumer.KillAndWaitForTermination();
232 }
233
234 int64_t t_end = base::GetBootTimeNs().count();
235
236 for (auto& producer : producers) {
237 producer.KillAndWaitForTermination();
238 test_result.prod_rusage = producer.posix_rusage(); // Only keep last one
239 }
240 producers.clear();
241 traced.KillAndWaitForTermination();
242
243 test_result.svc_rusage = traced.posix_rusage();
244 test_result.run_time_ms = static_cast<uint32_t>((t_end - t_start) / 1000000);
245
246 // Verify
247 // TODO(primiano): read back the TraceStats and check them as well.
248 ParsedTraceStats ctx;
249 ReadbackTrace(trace_file_path, &ctx);
250 auto exp_thd = cfg.num_processes() * cfg.num_threads();
251 if (ctx.threads.size() != exp_thd) {
252 AddFailure("Trace threads mismatch. Expected %u threads, got %zu", exp_thd,
253 ctx.threads.size());
254 }
255 for (const auto& it : ctx.threads) {
256 uint32_t seq_id = it.first;
257 const auto& thd = it.second;
258 if (!thd.last_seen) {
259 AddFailure("Last packet not seen for sequence %u", seq_id);
260 }
261 if (thd.seq_errors > 0) {
262 AddFailure("Sequence %u had %" PRIu64 " packets out of sync", seq_id,
263 thd.seq_errors);
264 }
265 if (thd.counter_errors > 0) {
266 AddFailure("Sequence %u had %" PRIu64 " packets counter errors", seq_id,
267 thd.counter_errors);
268 }
269 }
270
271 error_log_.reset();
272 PERFETTO_ILOG("Completed \"%s\"", cfg_name);
273 }
274
ReadbackTrace(const std::string & trace_file_path,ParsedTraceStats * ctx)275 void TestHarness::ReadbackTrace(const std::string& trace_file_path,
276 ParsedTraceStats* ctx) {
277 TestResult& test_result = test_results_.back();
278 using namespace protozero::proto_utils;
279 auto fd = base::OpenFile(trace_file_path.c_str(), O_RDONLY);
280 if (!fd)
281 return AddFailure("Trace file does not exist");
282 const off_t file_size = lseek(*fd, 0, SEEK_END);
283 lseek(*fd, 0, SEEK_SET);
284 if (file_size <= 0)
285 return AddFailure("Trace file is empty");
286
287 test_result.trace_size_kb = static_cast<uint32_t>(file_size / 1000);
288 std::string trace_data;
289 PERFETTO_CHECK(base::ReadFileDescriptor(*fd, &trace_data));
290 const auto* const start = reinterpret_cast<const uint8_t*>(trace_data.data());
291 const uint8_t* const end = start + file_size;
292
293 constexpr uint8_t kTracePacketTag = MakeTagLengthDelimited(1);
294
295 for (auto* ptr = start; (end - ptr) > 2;) {
296 const uint8_t* tokenizer_start = ptr;
297 if (*(ptr++) != kTracePacketTag) {
298 return AddFailure("Tokenizer failure at offset %zd", ptr - start);
299 }
300 uint64_t packet_size = 0;
301 ptr = ParseVarInt(ptr, end, &packet_size);
302 const uint8_t* const packet_start = ptr;
303 ptr += packet_size;
304 if ((ptr - tokenizer_start) < 2 || ptr > end)
305 return AddFailure("Got invalid packet size %" PRIu64 " at offset %zd",
306 packet_size,
307 static_cast<ssize_t>(packet_start - start));
308 ParseTracePacket(packet_start, static_cast<size_t>(packet_size), ctx);
309 }
310 test_result.num_threads = static_cast<uint32_t>(ctx->threads.size());
311 }
312
ParseTracePacket(const uint8_t * start,size_t size,ParsedTraceStats * ctx)313 void TestHarness::ParseTracePacket(const uint8_t* start,
314 size_t size,
315 ParsedTraceStats* ctx) {
316 TestResult& test_result = test_results_.back();
317 protos::pbzero::TracePacket::Decoder packet(start, size);
318 if (!packet.has_for_testing())
319 return;
320
321 ++test_result.num_packets;
322 const uint32_t seq_id = packet.trusted_packet_sequence_id();
323
324 protos::pbzero::TestEvent::Decoder te(packet.for_testing());
325 auto t_it = ctx->threads.find(seq_id);
326 bool is_first_packet = false;
327 if (t_it == ctx->threads.end()) {
328 is_first_packet = true;
329 t_it = ctx->threads.emplace(seq_id, ParsedTraceStats::WriterThread()).first;
330 }
331 ParsedTraceStats::WriterThread& thd = t_it->second;
332
333 ++thd.packets_seen;
334 if (te.is_last()) {
335 if (thd.last_seen) {
336 return AddFailure(
337 "last_seen=true happened more than once for sequence %u", seq_id);
338 } else {
339 thd.last_seen = true;
340 }
341 }
342 if (is_first_packet) {
343 thd.rnd_engine = std::minstd_rand0(te.seq_value());
344 } else {
345 const uint32_t expected = static_cast<uint32_t>(thd.rnd_engine());
346 if (te.seq_value() != expected) {
347 thd.rnd_engine = std::minstd_rand0(te.seq_value()); // Resync the engine.
348 ++thd.seq_errors;
349 return AddFailure(
350 "TestEvent seq mismatch for sequence %u. Expected %u got %u", seq_id,
351 expected, te.seq_value());
352 }
353 if (te.counter() != thd.packets_seen) {
354 return AddFailure(
355 "TestEvent counter mismatch for sequence %u. Expected %" PRIu64
356 " got %" PRIu64,
357 seq_id, thd.packets_seen, te.counter());
358 }
359 }
360
361 if (!te.has_payload()) {
362 return AddFailure("TestEvent %u for sequence %u has no payload",
363 te.seq_value(), seq_id);
364 }
365
366 // Check the validity of the payload. The payload might be nested. If that is
367 // the case, we need to check all levels.
368 protozero::ConstBytes payload_bounds = te.payload();
369 for (uint32_t depth = 0, last_depth = 0;; depth++) {
370 if (depth > 100) {
371 return AddFailure("Unexpectedly deep depth for event %u, sequence %u",
372 te.seq_value(), seq_id);
373 }
374 protos::pbzero::TestEvent::TestPayload::Decoder payload(payload_bounds);
375 const uint32_t rem_depth = payload.remaining_nesting_depth();
376
377 // The payload is a repeated field and must have exactly two instances.
378 // The writer splits it always in two halves of identical size.
379 int num_payload_pieces = 0;
380 size_t last_size = 0;
381 for (auto it = payload.str(); it; ++it, ++num_payload_pieces) {
382 protozero::ConstChars payload_str = *it;
383 last_size = last_size ? last_size : payload_str.size;
384 if (payload_str.size != last_size) {
385 return AddFailure(
386 "Asymmetrical payload at depth %u, event id %u, sequence %u. "
387 "%zu != %zu",
388 depth, te.seq_value(), seq_id, last_size, payload_str.size);
389 }
390 // Check that the payload content matches the expected sequence.
391 for (size_t i = 0; i < payload_str.size; i++) {
392 char exp = static_cast<char>(33 + ((te.seq_value() + i) % 64));
393 if (payload_str.data[i] != exp) {
394 return AddFailure(
395 "Payload mismatch at %zu, depth %u, event id %u, sequence %u. "
396 "Expected: 0x%x, Actual: 0x%x",
397 i, depth, te.seq_value(), seq_id, exp, payload_str.data[i]);
398 }
399 }
400 }
401 if (num_payload_pieces != 2) {
402 return AddFailure(
403 "Broken payload at depth %u, event id %u, sequence %u. "
404 "Expecting 2 repeated str fields, got %d",
405 depth, te.seq_value(), seq_id, num_payload_pieces);
406 }
407
408 if (depth > 0 && rem_depth != last_depth - 1) {
409 return AddFailure(
410 "Unexpected nesting level (expected: %u, actual: %u) at depth %u, "
411 "event id %u, sequence %u",
412 rem_depth, last_depth - 1, depth, te.seq_value(), seq_id);
413 }
414
415 last_depth = rem_depth;
416 if (rem_depth == 0)
417 break;
418 if (payload.has_nested()) {
419 payload_bounds = *payload.nested();
420 } else {
421 payload_bounds = {nullptr, 0};
422 }
423 }
424 }
425
CtrlCHandler()426 void CtrlCHandler() {
427 g_sig->aborted.store(true);
428 for (auto it = g_sig->pids_to_kill.rbegin(); it != g_sig->pids_to_kill.rend();
429 it++) {
430 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
431 base::ScopedPlatformHandle proc_handle(
432 ::OpenProcess(PROCESS_TERMINATE, false, static_cast<DWORD>(*it)));
433 ::TerminateProcess(*proc_handle, STATUS_CONTROL_C_EXIT);
434 #else
435 kill(*it, SIGKILL);
436 #endif
437 }
438 }
439
StressTestMain(int argc,char ** argv)440 void StressTestMain(int argc, char** argv) {
441 TestHarness th;
442 std::regex filter;
443 bool has_filter = false;
444
445 bool verbose = false;
446 for (int i = 1; i < argc; ++i) {
447 if (!strcmp(argv[i], "-v")) {
448 verbose = true;
449 } else {
450 filter = std::regex(argv[i], std::regex::ECMAScript | std::regex::icase);
451 has_filter = true;
452 }
453 }
454
455 g_sig = new SigHandlerCtx();
456 base::InstallCtrlCHandler(&CtrlCHandler);
457
458 for (size_t i = 0; i < base::ArraySize(kStressTestConfigs) && !g_sig->aborted;
459 ++i) {
460 const auto& cfg_blob = kStressTestConfigs[i];
461 StressTestConfig cfg;
462 std::cmatch ignored;
463 if (has_filter && !std::regex_search(cfg_blob.name, ignored, filter)) {
464 continue;
465 }
466 PERFETTO_CHECK(cfg.ParseFromArray(cfg_blob.data, cfg_blob.size));
467 th.RunConfig(cfg_blob.name, cfg, verbose);
468 }
469
470 for (const auto& tres : th.test_results()) {
471 const auto& cfg = tres.cfg;
472 printf("===============================================================\n");
473 printf("Config: %s\n", tres.cfg_name);
474 printf("===============================================================\n");
475 printf("%-20s %-10s %-10s\n", "Metric", "Expected", "Actual");
476 printf("%-20s %-10s %-10s\n", "------", "--------", "------");
477 printf("%-20s %-10d %-10d\n", "#Errors", 0, tres.num_errors);
478 printf("%-20s %-10d %-10d \n", "Duration [ms]",
479 cfg.trace_config().duration_ms(), tres.run_time_ms);
480
481 uint32_t exp_threads = cfg.num_processes() * cfg.num_threads();
482 printf("%-20s %-10u %-10u\n", "Num threads", exp_threads, tres.num_threads);
483
484 double dur_s = cfg.trace_config().duration_ms() / 1e3;
485 double exp_per_thread = cfg.steady_state_timings().rate_mean() * dur_s;
486 if (cfg.burst_period_ms()) {
487 double burst_rate = 1.0 * cfg.burst_duration_ms() / cfg.burst_period_ms();
488 exp_per_thread *= 1.0 - burst_rate;
489 exp_per_thread += burst_rate * cfg.burst_timings().rate_mean() * dur_s;
490 }
491 if (cfg.max_events())
492 exp_per_thread = std::min(exp_per_thread, 1.0 * cfg.max_events());
493 double exp_packets = std::round(exp_per_thread * exp_threads);
494 printf("%-20s %-10.0f %-10d\n", "Num packets", exp_packets,
495 tres.num_packets);
496
497 double exp_size_kb = exp_packets * (cfg.nesting() + 1) *
498 (cfg.steady_state_timings().payload_mean() + 40) /
499 1000;
500 printf("%-20s ~%-9.0f %-10d\n", "Trace size [KB]", exp_size_kb,
501 tres.trace_size_kb);
502
503 double exp_rss_mb = cfg.trace_config().buffers()[0].size_kb() / 1000;
504 printf("%-20s (max) %-4.0f %-10d\n", "Svc RSS [MB]", exp_rss_mb,
505 tres.svc_rusage.max_rss_kb / 1000);
506 printf("%-20s %-10s %-10d\n", "Svc CPU [ms]", "---",
507 tres.svc_rusage.cpu_time_ms());
508 printf("%-20s %-10s %d / %d\n", "Svc #ctxswitch", "---",
509 tres.svc_rusage.invol_ctx_switch, tres.svc_rusage.vol_ctx_switch);
510
511 printf("%-20s %-10s %-10d\n", "Prod RSS [MB]", "---",
512 tres.prod_rusage.max_rss_kb / 1000);
513 printf("%-20s %-10s %-10d\n", "Prod CPU [ms]", "---",
514 tres.prod_rusage.cpu_time_ms());
515 printf("%-20s %-10s %d / %d\n", "Prod #ctxswitch", "---",
516 tres.prod_rusage.invol_ctx_switch, tres.prod_rusage.vol_ctx_switch);
517 printf("\n");
518 }
519 }
520
521 } // namespace
522 } // namespace perfetto
523
main(int argc,char ** argv)524 int main(int argc, char** argv) {
525 perfetto::StressTestMain(argc, argv);
526 }
527