/* * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "fcp/client/phase_logger_impl.h" #include #include "absl/time/time.h" #include "fcp/base/monitoring.h" namespace fcp { namespace client { namespace { constexpr absl::string_view kInitializationErrorPrefix = "Error during initialization: "; constexpr absl::string_view kEligibilityCheckinErrorPrefix = "Error during eligibility check-in: "; constexpr absl::string_view kEligibilityComputationErrorPrefix = "Error during eligibility eval computation: "; constexpr absl::string_view kCheckinErrorPrefix = "Error during check-in: "; constexpr absl::string_view kComputationErrorPrefix = "Error during computation: "; constexpr absl::string_view kResultUploadErrorPrefix = "Error reporting results: "; constexpr absl::string_view kFailureUploadErrorPrefix = "Error reporting computation failure: "; } // anonymous namespace using ::fcp::client::opstats::OperationalStats; using ::google::internal::federatedml::v2::RetryWindow; void PhaseLoggerImpl::UpdateRetryWindowAndNetworkStats( const RetryWindow& retry_window, const NetworkStats& network_stats) { opstats_logger_->SetRetryWindow(retry_window); // Update the network stats. opstats_logger_->SetNetworkStats(network_stats); } void PhaseLoggerImpl::SetModelIdentifier(absl::string_view model_identifier) { event_publisher_->SetModelIdentifier(std::string(model_identifier)); log_manager_->SetModelIdentifier(std::string(model_identifier)); } void PhaseLoggerImpl::LogTaskNotStarted(absl::string_view error_message) { event_publisher_->PublishTaskNotStarted(error_message); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_TRAIN_NOT_STARTED, std::string(error_message)); } void PhaseLoggerImpl::LogNonfatalInitializationError( absl::Status error_status) { std::string error_message = GetErrorMessage( error_status, kInitializationErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishNonfatalInitializationError(error_message); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_INITIALIZATION_ERROR_FATAL, error_message); } void PhaseLoggerImpl::LogFatalInitializationError(absl::Status error_status) { std::string error_message = GetErrorMessage( error_status, kInitializationErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishFatalInitializationError(error_message); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_INITIALIZATION_ERROR_NONFATAL, error_message); } void PhaseLoggerImpl::LogEligibilityEvalCheckinStarted() { event_publisher_->PublishEligibilityEvalCheckin(); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_STARTED); } void PhaseLoggerImpl::LogEligibilityEvalCheckinIOError( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_checkin) { std::string error_message = GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishEligibilityEvalCheckinIoError( error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_ERROR_IO, error_message); LogEligibilityEvalCheckinLatency(time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalCheckinClientInterrupted( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_checkin) { std::string error_message = GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishEligibilityEvalCheckinClientInterrupted( error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event:: EVENT_KIND_ELIGIBILITY_CHECKIN_CLIENT_INTERRUPTED, error_message); LogEligibilityEvalCheckinLatency(time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalCheckinServerAborted( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_checkin) { std::string error_message = GetErrorMessage(error_status, kEligibilityCheckinErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishEligibilityEvalCheckinServerAborted( error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_CHECKIN_SERVER_ABORTED, error_message); LogEligibilityEvalCheckinLatency(time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalNotConfigured( const NetworkStats& network_stats, absl::Time time_before_checkin) { event_publisher_->PublishEligibilityEvalNotConfigured( network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_DISABLED); LogEligibilityEvalCheckinLatency(time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalCheckinTurnedAway( const NetworkStats& network_stats, absl::Time time_before_checkin) { event_publisher_->PublishEligibilityEvalRejected( network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_REJECTED); LogEligibilityEvalCheckinLatency(time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalCheckinInvalidPayloadError( absl::string_view error_message, const NetworkStats& network_stats, absl::Time time_before_checkin) { log_manager_->LogDiag( ProdDiagCode:: BACKGROUND_TRAINING_ELIGIBILITY_EVAL_FAILED_CANNOT_PARSE_PLAN); event_publisher_->PublishEligibilityEvalCheckinErrorInvalidPayload( error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event:: EVENT_KIND_ELIGIBILITY_CHECKIN_ERROR_INVALID_PAYLOAD, std::string(error_message)); LogEligibilityEvalCheckinLatency(time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalCheckinPlanUriReceived( const NetworkStats& network_stats, absl::Time time_before_checkin) { opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_PLAN_URI_RECEIVED); event_publisher_->PublishEligibilityEvalPlanUriReceived( network_stats, absl::Now() - time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalCheckinCompleted( const NetworkStats& network_stats, absl::Time time_before_checkin, absl::Time time_before_plan_download) { opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_ENABLED); absl::Time before_time = time_before_plan_download; event_publisher_->PublishEligibilityEvalPlanReceived( network_stats, absl::Now() - before_time); // The 'EligibilityEvalCheckinLatency' should cover the whole period from // eligibility eval checkin to completion (and not just the period from EET // plan URIs being received to completion). LogEligibilityEvalCheckinLatency(time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalComputationStarted() { event_publisher_->PublishEligibilityEvalComputationStarted(); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_COMPUTATION_STARTED); } void PhaseLoggerImpl::LogEligibilityEvalComputationInvalidArgument( absl::Status error_status, const ExampleStats& example_stats, absl::Time run_plan_start_time) { std::string error_message = GetErrorMessage(error_status, kEligibilityComputationErrorPrefix, /* keep_error_message= */ true); log_manager_->LogDiag( ProdDiagCode::BACKGROUND_TRAINING_FAILED_PLAN_FAILS_SANITY_CHECK); event_publisher_->PublishEligibilityEvalComputationInvalidArgument( error_message, example_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event:: EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_INVALID_ARGUMENT, error_message); } void PhaseLoggerImpl::LogEligibilityEvalComputationExampleIteratorError( absl::Status error_status, const ExampleStats& example_stats, absl::Time run_plan_start_time) { std::string error_message = GetErrorMessage(error_status, kEligibilityComputationErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishEligibilityEvalComputationExampleIteratorError( error_message, example_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event:: EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_EXAMPLE_ITERATOR, error_message); } void PhaseLoggerImpl::LogEligibilityEvalComputationTensorflowError( absl::Status error_status, const ExampleStats& example_stats, absl::Time run_plan_start_time, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kEligibilityComputationErrorPrefix, log_tensorflow_error_messages_); event_publisher_->PublishEligibilityEvalComputationTensorflowError( error_message, example_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event:: EVENT_KIND_ELIGIBILITY_COMPUTATION_ERROR_TENSORFLOW, error_message); LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time); } void PhaseLoggerImpl::LogEligibilityEvalComputationInterrupted( absl::Status error_status, const ExampleStats& example_stats, absl::Time run_plan_start_time, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kEligibilityComputationErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishEligibilityEvalComputationInterrupted( error_message, example_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event:: EVENT_KIND_ELIGIBILITY_COMPUTATION_CLIENT_INTERRUPTED, error_message); LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time); } void PhaseLoggerImpl::LogEligibilityEvalComputationCompleted( const ExampleStats& example_stats, absl::Time run_plan_start_time, absl::Time reference_time) { event_publisher_->PublishEligibilityEvalComputationCompleted( example_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_ELIGIBILITY_COMPUTATION_FINISHED); log_manager_->LogToLongHistogram( HistogramCounters::TRAINING_OVERALL_EXAMPLE_SIZE, example_stats.example_size_bytes); log_manager_->LogToLongHistogram( HistogramCounters::TRAINING_OVERALL_EXAMPLE_COUNT, example_stats.example_count); LogEligibilityEvalComputationLatency(run_plan_start_time, reference_time); } void PhaseLoggerImpl::LogCheckinStarted() { // Log that we are about to check in with the server. event_publisher_->PublishCheckin(); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_CHECKIN_STARTED); } void PhaseLoggerImpl::LogCheckinIOError(absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_checkin, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishCheckinIoError(error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_CHECKIN_ERROR_IO, error_message); LogCheckinLatency(time_before_checkin, reference_time); } void PhaseLoggerImpl::LogCheckinClientInterrupted( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_checkin, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishCheckinClientInterrupted( error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_CHECKIN_CLIENT_INTERRUPTED, error_message); LogCheckinLatency(time_before_checkin, reference_time); } void PhaseLoggerImpl::LogCheckinServerAborted(absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_checkin, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kCheckinErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishCheckinServerAborted( error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_CHECKIN_SERVER_ABORTED, error_message); LogCheckinLatency(time_before_checkin, reference_time); } void PhaseLoggerImpl::LogCheckinTurnedAway(const NetworkStats& network_stats, absl::Time time_before_checkin, absl::Time reference_time) { event_publisher_->PublishRejected(network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_CHECKIN_REJECTED); LogCheckinLatency(time_before_checkin, reference_time); } void PhaseLoggerImpl::LogCheckinInvalidPayload( absl::string_view error_message, const NetworkStats& network_stats, absl::Time time_before_checkin, absl::Time reference_time) { log_manager_->LogDiag( ProdDiagCode::BACKGROUND_TRAINING_FAILED_CANNOT_PARSE_PLAN); event_publisher_->PublishCheckinInvalidPayload( error_message, network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_CHECKIN_ERROR_INVALID_PAYLOAD, std::string(error_message)); LogCheckinLatency(time_before_checkin, reference_time); } void PhaseLoggerImpl::LogCheckinPlanUriReceived( absl::string_view task_name, const NetworkStats& network_stats, absl::Time time_before_checkin) { event_publisher_->PublishCheckinPlanUriReceived( network_stats, absl::Now() - time_before_checkin); opstats_logger_->AddEventAndSetTaskName( std::string(task_name), OperationalStats::Event::EVENT_KIND_CHECKIN_PLAN_URI_RECEIVED); } void PhaseLoggerImpl::LogCheckinCompleted(absl::string_view task_name, const NetworkStats& network_stats, absl::Time time_before_checkin, absl::Time time_before_plan_download, absl::Time reference_time) { absl::Duration duration = absl::Now() - time_before_plan_download; event_publisher_->PublishCheckinFinishedV2(network_stats, duration); // We already have set the task name when LogCheckinPlanUriReceived was // called, so we only have to add the event. opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_CHECKIN_ACCEPTED); // The 'EligibilityEvalCheckinLatency' should cover the whole period from // eligibility eval checkin to completion (and not just the period from EET // plan URIs being received to completion). LogCheckinLatency(time_before_checkin, reference_time); } void PhaseLoggerImpl::LogComputationStarted() { event_publisher_->PublishComputationStarted(); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_COMPUTATION_STARTED); } void PhaseLoggerImpl::LogComputationInvalidArgument( absl::Status error_status, const ExampleStats& example_stats, const NetworkStats& network_stats, absl::Time run_plan_start_time) { std::string error_message = GetErrorMessage(error_status, kComputationErrorPrefix, /* keep_error_message= */ true); log_manager_->LogDiag( ProdDiagCode::BACKGROUND_TRAINING_FAILED_PLAN_FAILS_SANITY_CHECK); event_publisher_->PublishComputationInvalidArgument( error_message, example_stats, network_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_INVALID_ARGUMENT, error_message); } void PhaseLoggerImpl::LogComputationIOError(absl::Status error_status, const ExampleStats& example_stats, const NetworkStats& network_stats, absl::Time run_plan_start_time) { std::string error_message = GetErrorMessage(error_status, kComputationErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishComputationIOError( error_message, example_stats, network_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_IO, error_message); } void PhaseLoggerImpl::LogComputationExampleIteratorError( absl::Status error_status, const ExampleStats& example_stats, const NetworkStats& network_stats, absl::Time run_plan_start_time) { std::string error_message = GetErrorMessage( error_status, kComputationErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishComputationExampleIteratorError( error_message, example_stats, network_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_EXAMPLE_ITERATOR, error_message); } void PhaseLoggerImpl::LogComputationTensorflowError( absl::Status error_status, const ExampleStats& example_stats, const NetworkStats& network_stats, absl::Time run_plan_start_time, absl::Time reference_time) { std::string error_message = GetErrorMessage( error_status, kComputationErrorPrefix, log_tensorflow_error_messages_); event_publisher_->PublishComputationTensorflowError( error_message, example_stats, network_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_COMPUTATION_ERROR_TENSORFLOW, error_message); LogComputationLatency(run_plan_start_time, reference_time); } void PhaseLoggerImpl::LogComputationInterrupted( absl::Status error_status, const ExampleStats& example_stats, const NetworkStats& network_stats, absl::Time run_plan_start_time, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kComputationErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishComputationInterrupted( error_message, example_stats, network_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_COMPUTATION_CLIENT_INTERRUPTED, error_message); LogComputationLatency(run_plan_start_time, reference_time); } void PhaseLoggerImpl::LogComputationCompleted(const ExampleStats& example_stats, const NetworkStats& network_stats, absl::Time run_plan_start_time, absl::Time reference_time) { event_publisher_->PublishComputationCompleted( example_stats, network_stats, absl::Now() - run_plan_start_time); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_COMPUTATION_FINISHED); log_manager_->LogToLongHistogram( HistogramCounters::TRAINING_OVERALL_EXAMPLE_SIZE, example_stats.example_size_bytes); log_manager_->LogToLongHistogram( HistogramCounters::TRAINING_OVERALL_EXAMPLE_COUNT, example_stats.example_count); LogComputationLatency(run_plan_start_time, reference_time); } absl::Status PhaseLoggerImpl::LogResultUploadStarted() { opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_STARTED); // Commit the run data accumulated thus far to Opstats and fail if // something goes wrong. FCP_RETURN_IF_ERROR(opstats_logger_->CommitToStorage()); event_publisher_->PublishResultUploadStarted(); return absl::OkStatus(); } void PhaseLoggerImpl::LogResultUploadIOError( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_result_upload, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kResultUploadErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishResultUploadIOError( error_message, network_stats, absl::Now() - time_before_result_upload); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_ERROR_IO, error_message); LogReportLatency(time_before_result_upload, reference_time); } void PhaseLoggerImpl::LogResultUploadClientInterrupted( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_result_upload, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kResultUploadErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishResultUploadClientInterrupted( error_message, network_stats, absl::Now() - time_before_result_upload); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_CLIENT_INTERRUPTED, error_message); LogReportLatency(time_before_result_upload, reference_time); } void PhaseLoggerImpl::LogResultUploadServerAborted( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_result_upload, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kResultUploadErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishResultUploadServerAborted( error_message, network_stats, absl::Now() - time_before_result_upload); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_SERVER_ABORTED, error_message); LogReportLatency(time_before_result_upload, reference_time); } void PhaseLoggerImpl::LogResultUploadCompleted( const NetworkStats& network_stats, absl::Time time_before_result_upload, absl::Time reference_time) { event_publisher_->PublishResultUploadCompleted( network_stats, absl::Now() - time_before_result_upload); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_RESULT_UPLOAD_FINISHED); LogReportLatency(time_before_result_upload, reference_time); } absl::Status PhaseLoggerImpl::LogFailureUploadStarted() { opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_STARTED); // Commit the run data accumulated thus far to Opstats and fail if // something goes wrong. FCP_RETURN_IF_ERROR(opstats_logger_->CommitToStorage()); event_publisher_->PublishFailureUploadStarted(); return absl::OkStatus(); } void PhaseLoggerImpl::LogFailureUploadIOError( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_failure_upload, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kFailureUploadErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishFailureUploadIOError( error_message, network_stats, absl::Now() - time_before_failure_upload); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_ERROR_IO, error_message); LogReportLatency(time_before_failure_upload, reference_time); } void PhaseLoggerImpl::LogFailureUploadClientInterrupted( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_failure_upload, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kFailureUploadErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishFailureUploadClientInterrupted( error_message, network_stats, absl::Now() - time_before_failure_upload); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_CLIENT_INTERRUPTED, error_message); LogReportLatency(time_before_failure_upload, reference_time); } void PhaseLoggerImpl::LogFailureUploadServerAborted( absl::Status error_status, const NetworkStats& network_stats, absl::Time time_before_failure_upload, absl::Time reference_time) { std::string error_message = GetErrorMessage(error_status, kFailureUploadErrorPrefix, /* keep_error_message= */ true); event_publisher_->PublishFailureUploadServerAborted( error_message, network_stats, absl::Now() - time_before_failure_upload); opstats_logger_->AddEventWithErrorMessage( OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_SERVER_ABORTED, error_message); LogReportLatency(time_before_failure_upload, reference_time); } void PhaseLoggerImpl::LogFailureUploadCompleted( const NetworkStats& network_stats, absl::Time time_before_failure_upload, absl::Time reference_time) { event_publisher_->PublishFailureUploadCompleted( network_stats, absl::Now() - time_before_failure_upload); opstats_logger_->AddEvent( OperationalStats::Event::EVENT_KIND_FAILURE_UPLOAD_FINISHED); LogReportLatency(time_before_failure_upload, reference_time); } void PhaseLoggerImpl::LogTimeSince(HistogramCounters histogram_counter, absl::Time reference_time) { absl::Duration duration = absl::Now() - reference_time; log_manager_->LogToLongHistogram(histogram_counter, absl::ToInt64Milliseconds(duration)); } void PhaseLoggerImpl::LogEligibilityEvalCheckinLatency( absl::Time time_before_checkin) { LogTimeSince(HistogramCounters::TRAINING_FL_ELIGIBILITY_EVAL_CHECKIN_LATENCY, time_before_checkin); } void PhaseLoggerImpl::LogEligibilityEvalComputationLatency( absl::Time run_plan_start_time, absl::Time reference_time) { LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_LATENCY, run_plan_start_time); LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_END_TIME, reference_time); } void PhaseLoggerImpl::LogCheckinLatency(absl::Time time_before_checkin, absl::Time reference_time) { LogTimeSince(HistogramCounters::TRAINING_FL_CHECKIN_LATENCY, time_before_checkin); LogTimeSince(HistogramCounters::TRAINING_FL_CHECKIN_END_TIME, reference_time); } void PhaseLoggerImpl::LogComputationLatency(absl::Time run_plan_start_time, absl::Time reference_time) { LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_LATENCY, run_plan_start_time); LogTimeSince(HistogramCounters::TRAINING_RUN_PHASE_END_TIME, reference_time); } void PhaseLoggerImpl::LogReportLatency(absl::Time time_before_report, absl::Time reference_time) { LogTimeSince(HistogramCounters::TRAINING_FL_REPORT_RESULTS_LATENCY, time_before_report); LogTimeSince(HistogramCounters::TRAINING_FL_REPORT_RESULTS_END_TIME, reference_time); } std::string PhaseLoggerImpl::GetErrorMessage(absl::Status error_status, absl::string_view error_prefix, bool keep_error_message) { return absl::StrCat(error_prefix, "code: ", error_status.code(), ", error: ", keep_error_message ? error_status.message() : ""); } } // namespace client } // namespace fcp