1 /*
2 * Copyright 2021 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "fcp/client/opstats/opstats_logger_impl.h"
18
19 #include <filesystem>
20 #include <string>
21 #include <utility>
22
23 #include "google/protobuf/util/time_util.h"
24 #include "gmock/gmock.h"
25 #include "gtest/gtest.h"
26 #include "fcp/base/monitoring.h"
27 #include "fcp/client/diag_codes.pb.h"
28 #include "fcp/client/histogram_counters.pb.h"
29 #include "fcp/client/opstats/pds_backed_opstats_db.h"
30 #include "fcp/client/test_helpers.h"
31 #include "fcp/protos/opstats.pb.h"
32 #include "fcp/testing/testing.h"
33
34 namespace fcp {
35 namespace client {
36 namespace opstats {
37 namespace {
38
39 using google::internal::federatedml::v2::RetryWindow;
40 using google::protobuf::Timestamp;
41 using google::protobuf::util::TimeUtil;
42 using testing::Ge;
43 using testing::Return;
44 using testing::StrictMock;
45
// Fixed session, population and task names used by the tests below, both when
// constructing loggers and when building the protos expected in the db.
constexpr char kSessionName[] = "SESSION";
constexpr char kPopulationName[] = "POPULATION";
constexpr char kTaskName[] = "TASK";
49
50 class OpStatsLoggerImplTest : public testing::Test {
51 protected:
SetUp()52 void SetUp() override {
53 ON_CALL(mock_flags_, enable_opstats()).WillByDefault(Return(true));
54 ON_CALL(mock_flags_, opstats_ttl_days()).WillByDefault(Return(1));
55 ON_CALL(mock_flags_, opstats_db_size_limit_bytes())
56 .WillByDefault(Return(1 * 1024 * 1024));
57 base_dir_ = testing::TempDir();
58 }
59
TearDown()60 void TearDown() override {
61 auto db = PdsBackedOpStatsDb::Create(
62 base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
63 mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
64 ASSERT_OK(db);
65 EXPECT_CALL(
66 mock_log_manager_,
67 LogToLongHistogram(OPSTATS_DB_SIZE_BYTES, /*execution_index=*/0,
68 /*epoch_index=*/0, engine::DataSourceType::DATASET,
69 /*value=*/0));
70 EXPECT_CALL(
71 mock_log_manager_,
72 LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
73 /*execution_index=*/0, /*epoch_index=*/0,
74 engine::DataSourceType::DATASET, /*value=*/0));
75 EXPECT_THAT((*db)->Transform([](OpStatsSequence& data) { data.Clear(); }),
76 IsOk());
77 }
78
CreateOpStatsLoggerImpl(const std::string & session_name,const std::string & population_name)79 std::unique_ptr<OpStatsLogger> CreateOpStatsLoggerImpl(
80 const std::string& session_name, const std::string& population_name) {
81 auto db = PdsBackedOpStatsDb::Create(
82 base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
83 mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
84 FCP_CHECK(db.ok());
85 return std::make_unique<OpStatsLoggerImpl>(std::move(*db),
86 &mock_log_manager_, &mock_flags_,
87 session_name, population_name);
88 }
89
90 // Checks that the expected and actual protos are equivalent, ignoring the
91 // timestamps in the actual proto, which must also be increasing.
CheckEqualProtosAndIncreasingTimestamps(const Timestamp & start_time,const OpStatsSequence & expected,OpStatsSequence actual)92 void CheckEqualProtosAndIncreasingTimestamps(const Timestamp& start_time,
93 const OpStatsSequence& expected,
94 OpStatsSequence actual) {
95 auto previous_timestamp = start_time;
96 for (auto& opstats : *actual.mutable_opstats()) {
97 for (auto& event : *opstats.mutable_events()) {
98 EXPECT_GE(event.timestamp(), previous_timestamp);
99 previous_timestamp = event.timestamp();
100 // Remove the timestamp
101 event.clear_timestamp();
102 }
103 }
104 actual.clear_earliest_trustworthy_time();
105 EXPECT_THAT(actual, EqualsProto(expected));
106 }
107
ExpectOpstatsEnabledEvents(int num_opstats_loggers)108 void ExpectOpstatsEnabledEvents(int num_opstats_loggers) {
109 ExpectOpstatsEnabledEvents(num_opstats_loggers, num_opstats_loggers);
110 }
111
ExpectOpstatsEnabledEvents(int num_opstats_loggers,int num_opstats_commits)112 void ExpectOpstatsEnabledEvents(int num_opstats_loggers,
113 int num_opstats_commits) {
114 EXPECT_CALL(mock_log_manager_,
115 LogDiag(DebugDiagCode::TRAINING_OPSTATS_ENABLED))
116 .Times(num_opstats_loggers);
117 // Logged when the class is initialized.
118 EXPECT_CALL(mock_log_manager_,
119 LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_EXPECTED))
120 .Times(num_opstats_loggers);
121 EXPECT_CALL(mock_log_manager_,
122 LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_ATTEMPTED))
123 .Times(num_opstats_commits);
124 EXPECT_CALL(
125 mock_log_manager_,
126 LogToLongHistogram(TRAINING_OPSTATS_COMMIT_LATENCY,
127 /*execution_index=*/0, /*epoch_index=*/0,
128 engine::DataSourceType::DATASET, /*value=*/Ge(0)))
129 .Times(num_opstats_commits);
130 EXPECT_CALL(
131 mock_log_manager_,
132 LogToLongHistogram(OPSTATS_DB_SIZE_BYTES, /*execution_index=*/0,
133 /*epoch_index=*/0, engine::DataSourceType::DATASET,
134 /*value=*/Ge(0)))
135 .Times(num_opstats_commits);
136 EXPECT_CALL(
137 mock_log_manager_,
138 LogToLongHistogram(OPSTATS_DB_NUM_ENTRIES, /*execution_index=*/0,
139 /*epoch_index=*/0, engine::DataSourceType::DATASET,
140 /*value=*/Ge(0)))
141 .Times(num_opstats_commits);
142 }
143
CreateRetryWindow(const std::string & retry_token,int64_t delay_min_seconds,int64_t delay_max_seconds)144 RetryWindow CreateRetryWindow(const std::string& retry_token,
145 int64_t delay_min_seconds,
146 int64_t delay_max_seconds) {
147 RetryWindow retry_window;
148 retry_window.set_retry_token(retry_token);
149 retry_window.mutable_delay_min()->set_seconds(delay_min_seconds);
150 retry_window.mutable_delay_max()->set_seconds(delay_max_seconds);
151 return retry_window;
152 }
153
154 std::string base_dir_;
155 MockFlags mock_flags_;
156 StrictMock<MockLogManager> mock_log_manager_;
157 };
158
// Three loggers (full names, no population, no session) each record a
// CHECKIN_ACCEPTED event together with the task name; the db is expected to
// hold one entry per logger, in creation order.
TEST_F(OpStatsLoggerImplTest, SetTaskName) {
  const Timestamp test_start = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/3);

  // Creates a logger, records the event with the task name, and destroys the
  // logger again.
  const auto run_logger = [this](const std::string& session_name,
                                 const std::string& population_name) {
    auto logger = CreateOpStatsLoggerImpl(session_name, population_name);
    logger->AddEventAndSetTaskName(
        kTaskName, OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
    logger.reset();
  };
  run_logger(kSessionName, kPopulationName);
  run_logger(kSessionName, /*population_name=*/"");
  run_logger(/*session_name=*/"", kPopulationName);

  auto reader = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(reader);
  auto stored = (*reader)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  // First run: session and population both set.
  auto full_run = expected.add_opstats();
  full_run->set_session_name(kSessionName);
  full_run->set_population_name(kPopulationName);
  full_run->set_task_name(kTaskName);
  full_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  // Second run: no population name.
  auto no_population_run = expected.add_opstats();
  no_population_run->set_session_name(kSessionName);
  no_population_run->set_task_name(kTaskName);
  no_population_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  // Third run: no session name.
  auto no_session_run = expected.add_opstats();
  no_session_run->set_population_name(kPopulationName);
  no_session_run->set_task_name(kTaskName);
  no_session_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);

  CheckEqualProtosAndIncreasingTimestamps(test_start, expected, *stored);
}
214
// Overwrites the db file with non-proto bytes between two logger runs and
// expects only the run made after the overwrite to be present afterwards.
TEST_F(OpStatsLoggerImplTest, NewRunAfterCorruption) {
  const Timestamp test_start = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/2);

  auto first_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  first_logger->AddEventAndSetTaskName(
      kTaskName, OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  first_logger.reset();

  // Replace the db file's contents with bytes that are not a valid proto.
  {
    const std::filesystem::path corrupt_path =
        std::filesystem::path(base_dir_) / PdsBackedOpStatsDb::kParentDir /
        PdsBackedOpStatsDb::kDbFileName;
    protostore::FileStorage storage;
    std::unique_ptr<protostore::OutputStream> out =
        storage.OpenForWrite(corrupt_path).value();
    EXPECT_THAT(out->Append("not a proto"), IsOk());
    EXPECT_THAT(out->Close(), IsOk());
  }

  // A read failure is expected to be reported exactly once from here on.
  EXPECT_CALL(mock_log_manager_, LogDiag(ProdDiagCode::OPSTATS_READ_FAILED));
  auto second_logger =
      CreateOpStatsLoggerImpl(kSessionName, /*population_name=*/"");
  second_logger->AddEventAndSetTaskName(
      kTaskName, OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  second_logger.reset();

  auto reader = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(reader);
  auto stored = (*reader)->Read();
  ASSERT_OK(stored);

  // Only the second run should be represented in the db.
  OpStatsSequence expected;
  auto surviving_run = expected.add_opstats();
  surviving_run->set_session_name(kSessionName);
  surviving_run->set_task_name(kTaskName);
  surviving_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  CheckEqualProtosAndIncreasingTimestamps(test_start, expected, *stored);
}
261
// Two loggers add events via AddEvent(); the db is expected to hold one entry
// per logger, each with its events in the order they were added.
TEST_F(OpStatsLoggerImplTest, AddEvent) {
  const Timestamp test_start = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/2);

  // First run: a single event.
  auto first_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  first_logger->AddEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  first_logger.reset();

  // Second run: no population name, two events.
  auto second_logger =
      CreateOpStatsLoggerImpl(kSessionName, /*population_name=*/"");
  second_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  second_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  second_logger.reset();

  auto reader = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(reader);
  auto stored = (*reader)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  // Entry for the first run.
  auto first_run = expected.add_opstats();
  first_run->set_session_name(kSessionName);
  first_run->set_population_name(kPopulationName);
  first_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  // Entry for the second run.
  auto second_run = expected.add_opstats();
  second_run->set_session_name(kSessionName);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);

  CheckEqualProtosAndIncreasingTimestamps(test_start, expected, *stored);
}
303
// With the TTL flag forced to 0, two loggers run back to back; only the
// second run is expected to remain in the db, with both of its events.
TEST_F(OpStatsLoggerImplTest, AddEventAfterTtl) {
  auto start_time = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/2);

  // Set the ttl to 0 so that previous data will be wiped out each time the
  // logger tries to commit new data.
  EXPECT_CALL(mock_flags_, opstats_ttl_days()).WillRepeatedly(Return(0));
  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  opstats_logger->AddEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  opstats_logger.reset();

  auto opstats_logger_no_population =
      CreateOpStatsLoggerImpl(kSessionName,
                              /*population_name=*/"");
  opstats_logger_no_population->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  opstats_logger_no_population->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  opstats_logger_no_population.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  // Fail the test cleanly instead of dereferencing a non-OK result below.
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  // Expect the db to contain only data associated with the second run. The
  // second run should be complete, however.
  OpStatsSequence expected;
  auto new_opstats = expected.add_opstats();
  new_opstats->set_session_name(kSessionName);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *data);
}
342
// Records two error events with different messages on one logger. The stored
// entry is expected to contain both events but only the first error message.
TEST_F(OpStatsLoggerImplTest, AddEventWithErrorMessage) {
  auto start_time = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  opstats_logger->AddEventWithErrorMessage(
      OperationalStats::Event::EVENT_KIND_ERROR_IO, "first error");
  opstats_logger->AddEventWithErrorMessage(
      OperationalStats::Event::EVENT_KIND_ERROR_TENSORFLOW, "second error");
  opstats_logger.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  // Fail the test cleanly instead of dereferencing a non-OK result below.
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  OpStatsSequence expected;
  auto new_opstats = expected.add_opstats();
  new_opstats->set_session_name(kSessionName);
  new_opstats->set_population_name(kPopulationName);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ERROR_IO);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ERROR_TENSORFLOW);
  // The message passed with the second event is not expected to replace the
  // first one.
  new_opstats->set_error_message("first error");

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *data);
}
372
// Updates dataset stats for two collection URIs, one of them twice. The stored
// entry is expected to hold, per URI, the sum of the counts and byte sizes
// passed in.
TEST_F(OpStatsLoggerImplTest, UpdateDatasetStats) {
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  const std::string kCollectionUri = "app:/collection_uri";
  const std::string kCollectionUriOther = "app:/collection_uri_other";
  opstats_logger->UpdateDatasetStats(kCollectionUri,
                                     /*additional_example_count=*/100,
                                     /*additional_example_size_bytes=*/1000);
  opstats_logger->UpdateDatasetStats(kCollectionUriOther,
                                     /*additional_example_count=*/200,
                                     /*additional_example_size_bytes=*/2000);
  opstats_logger->UpdateDatasetStats(kCollectionUri,
                                     /*additional_example_count=*/300,
                                     /*additional_example_size_bytes=*/3000);
  opstats_logger.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  // Fail the test cleanly instead of dereferencing a non-OK result below.
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  OpStatsSequence expected;
  auto new_opstats = expected.add_opstats();
  new_opstats->set_session_name(kSessionName);
  new_opstats->set_population_name(kPopulationName);
  // kCollectionUri was updated twice: 100 + 300 examples, 1000 + 3000 bytes.
  OperationalStats::DatasetStats dataset_stats;
  dataset_stats.set_num_examples_read(400);
  dataset_stats.set_num_bytes_read(4000);
  (*new_opstats->mutable_dataset_stats())[kCollectionUri] =
      std::move(dataset_stats);
  // kCollectionUriOther was updated once.
  OperationalStats::DatasetStats dataset_stats_other;
  dataset_stats_other.set_num_examples_read(200);
  dataset_stats_other.set_num_bytes_read(2000);
  (*new_opstats->mutable_dataset_stats())[kCollectionUriOther] =
      std::move(dataset_stats_other);

  // The earliest trustworthy time is not part of the comparison.
  (*data).clear_earliest_trustworthy_time();
  EXPECT_THAT(*data, EqualsProto(expected));
}
414
// Calls SetNetworkStats() twice on one logger and expects the stored entry to
// carry the values of the second call only.
TEST_F(OpStatsLoggerImplTest, SetNetworkStats) {
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  auto logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  logger->SetNetworkStats({.bytes_downloaded = 102,
                           .bytes_uploaded = 103,
                           .network_duration = absl::Milliseconds(104)});
  logger->SetNetworkStats({.bytes_downloaded = 202,
                           .bytes_uploaded = 203,
                           .network_duration = absl::Milliseconds(204)});
  logger.reset();

  auto reader = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(reader);
  auto stored = (*reader)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  auto expected_run = expected.add_opstats();
  expected_run->set_population_name(kPopulationName);
  expected_run->set_session_name(kSessionName);
  // The bytes_downloaded/bytes_uploaded fields should not be set anymore; the
  // chunking-layer fields are expected to be populated instead.
  expected_run->set_chunking_layer_bytes_uploaded(203);
  expected_run->set_chunking_layer_bytes_downloaded(202);
  // The new network_duration field should be set now.
  expected_run->mutable_network_duration()->set_nanos(
      static_cast<int32_t>(absl::ToInt64Nanoseconds(absl::Milliseconds(204))));

  // The earliest trustworthy time is not part of the comparison.
  stored->clear_earliest_trustworthy_time();
  EXPECT_THAT(*stored, EqualsProto(expected));
}
450
// Sets two retry windows in a row. The stored entry is expected to contain
// the delays of the second window and an empty retry token.
TEST_F(OpStatsLoggerImplTest, SetRetryWindow) {
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  auto logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  logger->SetRetryWindow(CreateRetryWindow("retry_token", 100, 200));
  logger->SetRetryWindow(CreateRetryWindow("retry_token", 300, 400));
  logger.reset();

  auto reader = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(reader);
  auto stored = (*reader)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  auto expected_run = expected.add_opstats();
  expected_run->set_population_name(kPopulationName);
  expected_run->set_session_name(kSessionName);
  *expected_run->mutable_retry_window() =
      CreateRetryWindow(/*retry_token=*/"", 300, 400);

  // The earliest trustworthy time is not part of the comparison.
  stored->clear_earliest_trustworthy_time();
  EXPECT_THAT(*stored, EqualsProto(expected));
}
476
// Interleaves AddEvent() with explicit CommitToStorage() calls on the second
// logger and expects that run to end up as a single entry holding all three
// of its events.
TEST_F(OpStatsLoggerImplTest, AddEventCommitAddMoreEvents) {
  const Timestamp test_start = TimeUtil::GetCurrentTime();
  // Four commits for two loggers: two explicit CommitToStorage() calls plus,
  // presumably, one commit when each logger is destroyed.
  ExpectOpstatsEnabledEvents(
      /*num_opstats_loggers=*/2, /*num_opstats_commits=*/4);

  auto first_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  first_logger->AddEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  first_logger.reset();

  auto second_logger =
      CreateOpStatsLoggerImpl(kSessionName, /*population_name=*/"");
  second_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  ASSERT_OK(second_logger->CommitToStorage());
  second_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  ASSERT_OK(second_logger->CommitToStorage());
  second_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);
  second_logger.reset();

  auto reader = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(reader);
  auto stored = (*reader)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  // Entry for the first run.
  auto first_run = expected.add_opstats();
  first_run->set_session_name(kSessionName);
  first_run->set_population_name(kPopulationName);
  first_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  // Entry for the second run.
  auto second_run = expected.add_opstats();
  second_run->set_session_name(kSessionName);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);

  CheckEqualProtosAndIncreasingTimestamps(test_start, expected, *stored);
}
525
// Drives a single logger whose db was created with a zero TTL through several
// commits and expects the complete run to be stored at the end.
TEST_F(OpStatsLoggerImplTest, MisconfiguredTtlMultipleCommit) {
  const Timestamp test_start = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1,
                             /*num_opstats_commits=*/3);

  // Build the logger by hand so that its db uses a zero TTL, independent of
  // the TTL flag.
  auto zero_ttl_db = PdsBackedOpStatsDb::Create(
                         base_dir_, absl::ZeroDuration(), mock_log_manager_,
                         mock_flags_.opstats_db_size_limit_bytes())
                         .value();
  auto logger = std::make_unique<OpStatsLoggerImpl>(
      std::move(zero_ttl_db), &mock_log_manager_, &mock_flags_, kSessionName,
      kPopulationName);

  logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  ASSERT_OK(logger->CommitToStorage());
  logger->AddEvent(OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  ASSERT_OK(logger->CommitToStorage());
  logger->AddEvent(OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);
  logger.reset();

  // Read back through a db created with the TTL taken from the flags.
  auto reader = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(reader);
  auto stored = (*reader)->Read();
  ASSERT_OK(stored);

  // Despite the zero TTL used for the intermediate commits, the stored entry
  // is expected to hold the entire history of the run, i.e. all three events.
  OpStatsSequence expected;
  auto expected_run = expected.add_opstats();
  expected_run->set_session_name(kSessionName);
  expected_run->set_population_name(kPopulationName);
  expected_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  expected_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  expected_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);

  CheckEqualProtosAndIncreasingTimestamps(test_start, expected, *stored);
}
570
571 } // anonymous namespace
572 } // namespace opstats
573 } // namespace client
574 } // namespace fcp
575