xref: /aosp_15_r20/external/federated-compute/fcp/client/opstats/opstats_logger_impl_test.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2021 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "fcp/client/opstats/opstats_logger_impl.h"
18 
19 #include <filesystem>
20 #include <string>
21 #include <utility>
22 
23 #include "google/protobuf/util/time_util.h"
24 #include "gmock/gmock.h"
25 #include "gtest/gtest.h"
26 #include "fcp/base/monitoring.h"
27 #include "fcp/client/diag_codes.pb.h"
28 #include "fcp/client/histogram_counters.pb.h"
29 #include "fcp/client/opstats/pds_backed_opstats_db.h"
30 #include "fcp/client/test_helpers.h"
31 #include "fcp/protos/opstats.pb.h"
32 #include "fcp/testing/testing.h"
33 
34 namespace fcp {
35 namespace client {
36 namespace opstats {
37 namespace {
38 
39 using google::internal::federatedml::v2::RetryWindow;
40 using google::protobuf::Timestamp;
41 using google::protobuf::util::TimeUtil;
42 using testing::Ge;
43 using testing::Return;
44 using testing::StrictMock;
45 
// Identifiers stamped onto the OperationalStats entries created by the tests
// in this file. Tests pass an empty string instead when they want the
// corresponding field left unset.
constexpr char kSessionName[] = "SESSION";
constexpr char kPopulationName[] = "POPULATION";
constexpr char kTaskName[] = "TASK";
49 
50 class OpStatsLoggerImplTest : public testing::Test {
51  protected:
SetUp()52   void SetUp() override {
53     ON_CALL(mock_flags_, enable_opstats()).WillByDefault(Return(true));
54     ON_CALL(mock_flags_, opstats_ttl_days()).WillByDefault(Return(1));
55     ON_CALL(mock_flags_, opstats_db_size_limit_bytes())
56         .WillByDefault(Return(1 * 1024 * 1024));
57     base_dir_ = testing::TempDir();
58   }
59 
TearDown()60   void TearDown() override {
61     auto db = PdsBackedOpStatsDb::Create(
62         base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
63         mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
64     ASSERT_OK(db);
65     EXPECT_CALL(
66         mock_log_manager_,
67         LogToLongHistogram(OPSTATS_DB_SIZE_BYTES, /*execution_index=*/0,
68                            /*epoch_index=*/0, engine::DataSourceType::DATASET,
69                            /*value=*/0));
70     EXPECT_CALL(
71         mock_log_manager_,
72         LogToLongHistogram(HistogramCounters::OPSTATS_DB_NUM_ENTRIES,
73                            /*execution_index=*/0, /*epoch_index=*/0,
74                            engine::DataSourceType::DATASET, /*value=*/0));
75     EXPECT_THAT((*db)->Transform([](OpStatsSequence& data) { data.Clear(); }),
76                 IsOk());
77   }
78 
CreateOpStatsLoggerImpl(const std::string & session_name,const std::string & population_name)79   std::unique_ptr<OpStatsLogger> CreateOpStatsLoggerImpl(
80       const std::string& session_name, const std::string& population_name) {
81     auto db = PdsBackedOpStatsDb::Create(
82         base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
83         mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
84     FCP_CHECK(db.ok());
85     return std::make_unique<OpStatsLoggerImpl>(std::move(*db),
86                                                &mock_log_manager_, &mock_flags_,
87                                                session_name, population_name);
88   }
89 
90   // Checks that the expected and actual protos are equivalent, ignoring the
91   // timestamps in the actual proto, which must also be increasing.
CheckEqualProtosAndIncreasingTimestamps(const Timestamp & start_time,const OpStatsSequence & expected,OpStatsSequence actual)92   void CheckEqualProtosAndIncreasingTimestamps(const Timestamp& start_time,
93                                                const OpStatsSequence& expected,
94                                                OpStatsSequence actual) {
95     auto previous_timestamp = start_time;
96     for (auto& opstats : *actual.mutable_opstats()) {
97       for (auto& event : *opstats.mutable_events()) {
98         EXPECT_GE(event.timestamp(), previous_timestamp);
99         previous_timestamp = event.timestamp();
100         // Remove the timestamp
101         event.clear_timestamp();
102       }
103     }
104     actual.clear_earliest_trustworthy_time();
105     EXPECT_THAT(actual, EqualsProto(expected));
106   }
107 
ExpectOpstatsEnabledEvents(int num_opstats_loggers)108   void ExpectOpstatsEnabledEvents(int num_opstats_loggers) {
109     ExpectOpstatsEnabledEvents(num_opstats_loggers, num_opstats_loggers);
110   }
111 
ExpectOpstatsEnabledEvents(int num_opstats_loggers,int num_opstats_commits)112   void ExpectOpstatsEnabledEvents(int num_opstats_loggers,
113                                   int num_opstats_commits) {
114     EXPECT_CALL(mock_log_manager_,
115                 LogDiag(DebugDiagCode::TRAINING_OPSTATS_ENABLED))
116         .Times(num_opstats_loggers);
117     // Logged when the class is initialized.
118     EXPECT_CALL(mock_log_manager_,
119                 LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_EXPECTED))
120         .Times(num_opstats_loggers);
121     EXPECT_CALL(mock_log_manager_,
122                 LogDiag(ProdDiagCode::OPSTATS_DB_COMMIT_ATTEMPTED))
123         .Times(num_opstats_commits);
124     EXPECT_CALL(
125         mock_log_manager_,
126         LogToLongHistogram(TRAINING_OPSTATS_COMMIT_LATENCY,
127                            /*execution_index=*/0, /*epoch_index=*/0,
128                            engine::DataSourceType::DATASET, /*value=*/Ge(0)))
129         .Times(num_opstats_commits);
130     EXPECT_CALL(
131         mock_log_manager_,
132         LogToLongHistogram(OPSTATS_DB_SIZE_BYTES, /*execution_index=*/0,
133                            /*epoch_index=*/0, engine::DataSourceType::DATASET,
134                            /*value=*/Ge(0)))
135         .Times(num_opstats_commits);
136     EXPECT_CALL(
137         mock_log_manager_,
138         LogToLongHistogram(OPSTATS_DB_NUM_ENTRIES, /*execution_index=*/0,
139                            /*epoch_index=*/0, engine::DataSourceType::DATASET,
140                            /*value=*/Ge(0)))
141         .Times(num_opstats_commits);
142   }
143 
CreateRetryWindow(const std::string & retry_token,int64_t delay_min_seconds,int64_t delay_max_seconds)144   RetryWindow CreateRetryWindow(const std::string& retry_token,
145                                 int64_t delay_min_seconds,
146                                 int64_t delay_max_seconds) {
147     RetryWindow retry_window;
148     retry_window.set_retry_token(retry_token);
149     retry_window.mutable_delay_min()->set_seconds(delay_min_seconds);
150     retry_window.mutable_delay_max()->set_seconds(delay_max_seconds);
151     return retry_window;
152   }
153 
154   std::string base_dir_;
155   MockFlags mock_flags_;
156   StrictMock<MockLogManager> mock_log_manager_;
157 };
158 
// Runs three loggers back to back (full names, no population, no session),
// each recording a CHECKIN_ACCEPTED event together with the task name, and
// checks that the db ends up with one entry per logger in creation order.
TEST_F(OpStatsLoggerImplTest, SetTaskName) {
  const Timestamp start_time = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/3);

  // Session/population name pairs for the three runs; an empty string means
  // the field is left unset in the expected entry.
  struct RunNames {
    std::string session_name;
    std::string population_name;
  };
  const RunNames runs[] = {
      {kSessionName, kPopulationName},
      {kSessionName, /*population_name=*/""},
      {/*session_name=*/"", kPopulationName},
  };

  for (const RunNames& run : runs) {
    // The logger is destroyed at the end of each iteration, before the next
    // one is created.
    auto logger = CreateOpStatsLoggerImpl(run.session_name, run.population_name);
    logger->AddEventAndSetTaskName(
        kTaskName, OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  }

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(db);
  auto stored = (*db)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  for (const RunNames& run : runs) {
    OperationalStats* entry = expected.add_opstats();
    if (!run.session_name.empty()) {
      entry->set_session_name(run.session_name);
    }
    if (!run.population_name.empty()) {
      entry->set_population_name(run.population_name);
    }
    entry->set_task_name(kTaskName);
    entry->add_events()->set_event_type(
        OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  }

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *stored);
}
214 
// Writes one run, overwrites the db file with bytes that are not a valid
// proto, then writes a second run. The failed read must be reported via
// OPSTATS_READ_FAILED and the db must afterwards contain only the second run.
TEST_F(OpStatsLoggerImplTest, NewRunAfterCorruption) {
  auto start_time = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/2);

  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  opstats_logger->AddEventAndSetTaskName(
      kTaskName, OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  opstats_logger.reset();

  // Make the db file corrupt
  {
    std::filesystem::path db_path(base_dir_);
    db_path /= PdsBackedOpStatsDb::kParentDir;
    db_path /= PdsBackedOpStatsDb::kDbFileName;
    protostore::FileStorage file_storage;
    auto ostream = file_storage.OpenForWrite(db_path);
    // Fail this test cleanly if the file cannot be opened, rather than
    // crashing the whole test binary by unwrapping an error result.
    ASSERT_OK(ostream);
    EXPECT_THAT((*ostream)->Append("not a proto"), IsOk());
    EXPECT_THAT((*ostream)->Close(), IsOk());
  }

  EXPECT_CALL(mock_log_manager_, LogDiag(ProdDiagCode::OPSTATS_READ_FAILED));
  auto opstats_logger_no_population =
      CreateOpStatsLoggerImpl(kSessionName,
                              /*population_name=*/"");
  opstats_logger_no_population->AddEventAndSetTaskName(
      kTaskName, OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);

  opstats_logger_no_population.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  // Expect only the second run to be represented in the db.
  OpStatsSequence expected;
  auto new_opstats = expected.add_opstats();
  new_opstats->set_session_name(kSessionName);
  new_opstats->set_task_name(kTaskName);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED);
  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *data);
}
261 
// Two consecutive loggers add plain events (one event in the first run, two
// in the second); the db must hold one entry per run with the events in the
// order they were added.
TEST_F(OpStatsLoggerImplTest, AddEvent) {
  const Timestamp start_time = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/2);

  // First run: session and population set. The logger goes away at the end of
  // the scope, before the second logger is created.
  {
    auto first_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
    first_logger->AddEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  }

  // Second run: no population name.
  {
    auto second_logger =
        CreateOpStatsLoggerImpl(kSessionName, /*population_name=*/"");
    second_logger->AddEvent(
        OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
    second_logger->AddEvent(
        OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  }

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(db);
  auto stored = (*db)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;

  OperationalStats* first_run = expected.add_opstats();
  first_run->set_session_name(kSessionName);
  first_run->set_population_name(kPopulationName);
  first_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);

  OperationalStats* second_run = expected.add_opstats();
  second_run->set_session_name(kSessionName);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *stored);
}
303 
// With the TTL flag forced to 0 days, data from the first run must be gone by
// the time the second run has been committed, while the second run itself
// must be stored completely.
TEST_F(OpStatsLoggerImplTest, AddEventAfterTtl) {
  auto start_time = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/2);

  // Set the ttl to 0 so that previous data will be wiped out each time the
  // logger tries to commit new data.
  EXPECT_CALL(mock_flags_, opstats_ttl_days()).WillRepeatedly(Return(0));
  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  opstats_logger->AddEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  opstats_logger.reset();

  auto opstats_logger_no_population =
      CreateOpStatsLoggerImpl(kSessionName,
                              /*population_name=*/"");
  opstats_logger_no_population->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  opstats_logger_no_population->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  opstats_logger_no_population.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  // Check the creation result before dereferencing it, like the other tests.
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  // Expect the db to contain only data associated with the second run. The
  // second run should be complete, however.
  OpStatsSequence expected;
  auto new_opstats = expected.add_opstats();
  new_opstats->set_session_name(kSessionName);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *data);
}
342 
// Adds two error events with different messages in a single run. Both events
// must be stored, but the entry's error message is expected to stay at the
// first message that was logged.
TEST_F(OpStatsLoggerImplTest, AddEventWithErrorMessage) {
  auto start_time = TimeUtil::GetCurrentTime();
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  opstats_logger->AddEventWithErrorMessage(
      OperationalStats::Event::EVENT_KIND_ERROR_IO, "first error");
  opstats_logger->AddEventWithErrorMessage(
      OperationalStats::Event::EVENT_KIND_ERROR_TENSORFLOW, "second error");
  opstats_logger.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  // Check the creation result before dereferencing it, like the other tests.
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  OpStatsSequence expected;
  auto new_opstats = expected.add_opstats();
  new_opstats->set_session_name(kSessionName);
  new_opstats->set_population_name(kPopulationName);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ERROR_IO);
  new_opstats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ERROR_TENSORFLOW);
  // Only the first of the two messages is expected in the stored entry.
  new_opstats->set_error_message("first error");

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *data);
}
372 
// Reports dataset stats three times across two collection URIs and checks
// that the stored per-URI stats are the sums of the reported increments.
TEST_F(OpStatsLoggerImplTest, UpdateDatasetStats) {
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  const std::string kCollectionUri = "app:/collection_uri";
  const std::string kCollectionUriOther = "app:/collection_uri_other";
  opstats_logger->UpdateDatasetStats(kCollectionUri,
                                     /*additional_example_count=*/100,
                                     /*additional_example_size_bytes=*/1000);
  opstats_logger->UpdateDatasetStats(kCollectionUriOther,
                                     /*additional_example_count=*/200,
                                     /*additional_example_size_bytes=*/2000);
  opstats_logger->UpdateDatasetStats(kCollectionUri,
                                     /*additional_example_count=*/300,
                                     /*additional_example_size_bytes=*/3000);
  opstats_logger.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  // Check the creation result before dereferencing it, like the other tests.
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  OpStatsSequence expected;
  auto new_opstats = expected.add_opstats();
  new_opstats->set_session_name(kSessionName);
  new_opstats->set_population_name(kPopulationName);
  // kCollectionUri was reported twice: 100 + 300 examples, 1000 + 3000 bytes.
  OperationalStats::DatasetStats dataset_stats;
  dataset_stats.set_num_examples_read(400);
  dataset_stats.set_num_bytes_read(4000);
  (*new_opstats->mutable_dataset_stats())[kCollectionUri] =
      std::move(dataset_stats);
  // kCollectionUriOther was reported once.
  OperationalStats::DatasetStats dataset_stats_other;
  dataset_stats_other.set_num_examples_read(200);
  dataset_stats_other.set_num_bytes_read(2000);
  (*new_opstats->mutable_dataset_stats())[kCollectionUriOther] =
      std::move(dataset_stats_other);

  // earliest_trustworthy_time is not part of the expectation.
  data->clear_earliest_trustworthy_time();
  EXPECT_THAT(*data, EqualsProto(expected));
}
414 
// Sets network stats twice in one run and checks that the stored entry carries
// the values from the second call only.
TEST_F(OpStatsLoggerImplTest, SetNetworkStats) {
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  {
    auto logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
    logger->SetNetworkStats({.bytes_downloaded = 102,
                             .bytes_uploaded = 103,
                             .network_duration = absl::Milliseconds(104)});
    logger->SetNetworkStats({.bytes_downloaded = 202,
                             .bytes_uploaded = 203,
                             .network_duration = absl::Milliseconds(204)});
    // The logger is destroyed here, before the db is reopened below.
  }

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(db);
  auto stored = (*db)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  OperationalStats* entry = expected.add_opstats();
  entry->set_population_name(kPopulationName);
  entry->set_session_name(kSessionName);
  // The bytes_downloaded/bytes_uploaded fields should not be set anymore
  entry->set_chunking_layer_bytes_uploaded(203);
  entry->set_chunking_layer_bytes_downloaded(202);
  // The new network_duration field should be set now.
  const int64_t duration_nanos =
      absl::ToInt64Nanoseconds(absl::Milliseconds(204));
  entry->mutable_network_duration()->set_nanos(
      static_cast<int32_t>(duration_nanos));

  // earliest_trustworthy_time is not part of the expectation.
  stored->clear_earliest_trustworthy_time();
  EXPECT_THAT(*stored, EqualsProto(expected));
}
450 
// Sets the retry window twice in one run; the stored entry is expected to hold
// the delays of the second window, with an empty retry token.
TEST_F(OpStatsLoggerImplTest, SetRetryWindow) {
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1);

  {
    auto logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
    logger->SetRetryWindow(CreateRetryWindow("retry_token", 100, 200));
    logger->SetRetryWindow(CreateRetryWindow("retry_token", 300, 400));
    // The logger is destroyed here, before the db is reopened below.
  }

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(db);
  auto stored = (*db)->Read();
  ASSERT_OK(stored);

  OpStatsSequence expected;
  OperationalStats* entry = expected.add_opstats();
  entry->set_population_name(kPopulationName);
  entry->set_session_name(kSessionName);
  // Same delays as the last window that was set, but without the token.
  *entry->mutable_retry_window() =
      CreateRetryWindow(/*retry_token=*/"", 300, 400);

  // earliest_trustworthy_time is not part of the expectation.
  stored->clear_earliest_trustworthy_time();
  EXPECT_THAT(*stored, EqualsProto(expected));
}
476 
// Interleaves AddEvent() with explicit CommitToStorage() calls in the second
// run and checks that the run still ends up as a single db entry containing
// all of its events, after the untouched entry of the first run.
TEST_F(OpStatsLoggerImplTest, AddEventCommitAddMoreEvents) {
  auto start_time = TimeUtil::GetCurrentTime();
  // Four commits in total: two explicit CommitToStorage() calls below, plus
  // (presumably) one when each of the two loggers is destroyed.
  ExpectOpstatsEnabledEvents(
      /*num_opstats_loggers=*/2, /*num_opstats_commits=*/4);

  auto opstats_logger = CreateOpStatsLoggerImpl(kSessionName, kPopulationName);
  opstats_logger->AddEvent(OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  opstats_logger.reset();

  auto opstats_logger_no_population =
      CreateOpStatsLoggerImpl(kSessionName,
                              /*population_name=*/"");
  opstats_logger_no_population->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  ASSERT_OK(opstats_logger_no_population->CommitToStorage());
  opstats_logger_no_population->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  ASSERT_OK(opstats_logger_no_population->CommitToStorage());
  opstats_logger_no_population->AddEvent(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);
  opstats_logger_no_population.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  OpStatsSequence expected;
  // Add the first run
  auto first_run = expected.add_opstats();
  first_run->set_session_name(kSessionName);
  first_run->set_population_name(kPopulationName);
  first_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED);
  // Add the second run
  auto second_run = expected.add_opstats();
  second_run->set_session_name(kSessionName);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  second_run->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *data);
}
525 
// Drives a single logger on top of a db that was opened with a zero TTL and
// commits several times during the run. The db must still end up with one
// entry containing every event of the run.
TEST_F(OpStatsLoggerImplTest, MisconfiguredTtlMultipleCommit) {
  auto start_time = TimeUtil::GetCurrentTime();
  // Three commits: two explicit CommitToStorage() calls below, plus
  // (presumably) one when the logger is destroyed.
  ExpectOpstatsEnabledEvents(/*num_opstats_loggers=*/1,
                             /*num_opstats_commits=*/3);
  auto db_zero_ttl = PdsBackedOpStatsDb::Create(
      base_dir_, absl::ZeroDuration(), mock_log_manager_,
      mock_flags_.opstats_db_size_limit_bytes());
  // Fail this test cleanly if the db cannot be created, rather than crashing
  // the whole test binary by unwrapping an error result.
  ASSERT_OK(db_zero_ttl);
  auto opstats_logger = std::make_unique<OpStatsLoggerImpl>(
      std::move(*db_zero_ttl), &mock_log_manager_, &mock_flags_, kSessionName,
      kPopulationName);

  opstats_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  ASSERT_OK(opstats_logger->CommitToStorage());
  opstats_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  ASSERT_OK(opstats_logger->CommitToStorage());
  opstats_logger->AddEvent(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);
  opstats_logger.reset();

  auto db = PdsBackedOpStatsDb::Create(
      base_dir_, mock_flags_.opstats_ttl_days() * absl::Hours(24),
      mock_log_manager_, mock_flags_.opstats_db_size_limit_bytes());
  ASSERT_OK(db);
  auto data = (*db)->Read();
  ASSERT_OK(data);

  // Even if the zero TTL caused data from the earlier commits of this run to
  // be dropped in the middle of the run (NOTE(review): presumably on each
  // commit -- confirm against PdsBackedOpStatsDb), it should be ok because the
  // entire history of the run is committed at the end.
  OpStatsSequence expected;
  auto expected_stats = expected.add_opstats();
  expected_stats->set_population_name(kPopulationName);
  expected_stats->set_session_name(kSessionName);
  expected_stats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED);
  expected_stats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED);
  expected_stats->add_events()->set_event_type(
      OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED);

  CheckEqualProtosAndIncreasingTimestamps(start_time, expected, *data);
}
570 
571 }  // anonymous namespace
572 }  // namespace opstats
573 }  // namespace client
574 }  // namespace fcp
575