xref: /aosp_15_r20/external/federated-compute/fcp/client/interruptible_runner.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1*14675a02SAndroid Build Coastguard Worker /*
2*14675a02SAndroid Build Coastguard Worker  * Copyright 2020 Google LLC
3*14675a02SAndroid Build Coastguard Worker  *
4*14675a02SAndroid Build Coastguard Worker  * Licensed under the Apache License, Version 2.0 (the "License");
5*14675a02SAndroid Build Coastguard Worker  * you may not use this file except in compliance with the License.
6*14675a02SAndroid Build Coastguard Worker  * You may obtain a copy of the License at
7*14675a02SAndroid Build Coastguard Worker  *
8*14675a02SAndroid Build Coastguard Worker  *      http://www.apache.org/licenses/LICENSE-2.0
9*14675a02SAndroid Build Coastguard Worker  *
10*14675a02SAndroid Build Coastguard Worker  * Unless required by applicable law or agreed to in writing, software
11*14675a02SAndroid Build Coastguard Worker  * distributed under the License is distributed on an "AS IS" BASIS,
12*14675a02SAndroid Build Coastguard Worker  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*14675a02SAndroid Build Coastguard Worker  * See the License for the specific language governing permissions and
14*14675a02SAndroid Build Coastguard Worker  * limitations under the License.
15*14675a02SAndroid Build Coastguard Worker  */
#include "fcp/client/interruptible_runner.h"

#include <cstdlib>
#include <functional>
#include <optional>
#include <utility>

#include "absl/status/status.h"
22*14675a02SAndroid Build Coastguard Worker 
23*14675a02SAndroid Build Coastguard Worker namespace fcp {
24*14675a02SAndroid Build Coastguard Worker namespace client {
25*14675a02SAndroid Build Coastguard Worker 
Run(std::function<absl::Status ()> f,std::function<void ()> abort_function)26*14675a02SAndroid Build Coastguard Worker absl::Status InterruptibleRunner::Run(std::function<absl::Status()> f,
27*14675a02SAndroid Build Coastguard Worker                                       std::function<void()> abort_function) {
28*14675a02SAndroid Build Coastguard Worker   // Check before even making the call.
29*14675a02SAndroid Build Coastguard Worker   if (should_abort_()) {
30*14675a02SAndroid Build Coastguard Worker     return absl::CancelledError("cancelled before posting callable");
31*14675a02SAndroid Build Coastguard Worker   }
32*14675a02SAndroid Build Coastguard Worker   fcp::thread::Future<absl::Status> run_future =
33*14675a02SAndroid Build Coastguard Worker       fcp::thread::ScheduleFuture<absl::Status>(thread_pool_.get(), f);
34*14675a02SAndroid Build Coastguard Worker   return WaitUntilDone(std::move(run_future), abort_function);
35*14675a02SAndroid Build Coastguard Worker }
36*14675a02SAndroid Build Coastguard Worker 
WaitUntilDone(fcp::thread::Future<absl::Status> && run_future,std::function<void ()> abort_function)37*14675a02SAndroid Build Coastguard Worker absl::Status InterruptibleRunner::WaitUntilDone(
38*14675a02SAndroid Build Coastguard Worker     fcp::thread::Future<absl::Status>&& run_future,
39*14675a02SAndroid Build Coastguard Worker     std::function<void()> abort_function) {
40*14675a02SAndroid Build Coastguard Worker   // Wait until call is done, checking periodically whether we need to abort.
41*14675a02SAndroid Build Coastguard Worker   while (true) {
42*14675a02SAndroid Build Coastguard Worker     if (run_future.Wait(timing_config_.polling_period)) {
43*14675a02SAndroid Build Coastguard Worker       std::optional<absl::Status> future_result = std::move(run_future).Take();
44*14675a02SAndroid Build Coastguard Worker       // std::nullopt indicates the underlying promise was abandoned. To my
45*14675a02SAndroid Build Coastguard Worker       // best knowledge this always indicates a programming error and hence
46*14675a02SAndroid Build Coastguard Worker       // should result in a crash.
47*14675a02SAndroid Build Coastguard Worker       FCP_CHECK(future_result != std::nullopt);
48*14675a02SAndroid Build Coastguard Worker       return future_result.value();
49*14675a02SAndroid Build Coastguard Worker     }
50*14675a02SAndroid Build Coastguard Worker 
51*14675a02SAndroid Build Coastguard Worker     if (should_abort_()) {
52*14675a02SAndroid Build Coastguard Worker       return Abort(std::move(run_future), abort_function);
53*14675a02SAndroid Build Coastguard Worker     }
54*14675a02SAndroid Build Coastguard Worker   }
55*14675a02SAndroid Build Coastguard Worker }
56*14675a02SAndroid Build Coastguard Worker 
Abort(fcp::thread::Future<absl::Status> run_future,std::function<void ()> abort_function)57*14675a02SAndroid Build Coastguard Worker absl::Status InterruptibleRunner::Abort(
58*14675a02SAndroid Build Coastguard Worker     fcp::thread::Future<absl::Status> run_future,
59*14675a02SAndroid Build Coastguard Worker     std::function<void()> abort_function) {
60*14675a02SAndroid Build Coastguard Worker   FCP_LOG(WARNING) << "Aborting run.";
61*14675a02SAndroid Build Coastguard Worker 
62*14675a02SAndroid Build Coastguard Worker   // Attempt to abort the ongoing call.
63*14675a02SAndroid Build Coastguard Worker   abort_function();
64*14675a02SAndroid Build Coastguard Worker 
65*14675a02SAndroid Build Coastguard Worker   // Wait for at most the graceful shutdown period.
66*14675a02SAndroid Build Coastguard Worker   if (run_future.Wait(timing_config_.graceful_shutdown_period)) {
67*14675a02SAndroid Build Coastguard Worker     log_manager_->LogDiag(diagnostics_config_.interrupted);
68*14675a02SAndroid Build Coastguard Worker     FCP_CHECK(std::move(run_future).Take() != std::nullopt);
69*14675a02SAndroid Build Coastguard Worker     return absl::CancelledError("cancelled after graceful wait");
70*14675a02SAndroid Build Coastguard Worker   }
71*14675a02SAndroid Build Coastguard Worker 
72*14675a02SAndroid Build Coastguard Worker   // Runnable failed to abort during the graceful shutdown period. Wait for
73*14675a02SAndroid Build Coastguard Worker   // (possibly much) longer, because there's nothing much being
74*14675a02SAndroid Build Coastguard Worker   // gained by returning with TF still running, but resources leak.
75*14675a02SAndroid Build Coastguard Worker   log_manager_->LogDiag(diagnostics_config_.interrupt_timeout);
76*14675a02SAndroid Build Coastguard Worker   if (run_future.Wait(timing_config_.extended_shutdown_period)) {
77*14675a02SAndroid Build Coastguard Worker     log_manager_->LogDiag(diagnostics_config_.interrupted_extended);
78*14675a02SAndroid Build Coastguard Worker     FCP_CHECK(std::move(run_future).Take() != std::nullopt);
79*14675a02SAndroid Build Coastguard Worker     return absl::CancelledError("cancelled after extended wait");
80*14675a02SAndroid Build Coastguard Worker   }
81*14675a02SAndroid Build Coastguard Worker 
82*14675a02SAndroid Build Coastguard Worker   // If even waiting for the long period didn't help, exit this process.
83*14675a02SAndroid Build Coastguard Worker   // This is the worst case that will unfortunately happen - we hope the
84*14675a02SAndroid Build Coastguard Worker   // logs above and below make it to a logging backend, allowing to narrow
85*14675a02SAndroid Build Coastguard Worker   // the root cause down to particular models or builds; and the exit(0) should
86*14675a02SAndroid Build Coastguard Worker   // avoid raising a crash dialog when training is running in a background
87*14675a02SAndroid Build Coastguard Worker   // process. Nevertheless the goal should be to never reach this point.
88*14675a02SAndroid Build Coastguard Worker 
89*14675a02SAndroid Build Coastguard Worker   log_manager_->LogDiag(diagnostics_config_.interrupt_timeout_extended);
90*14675a02SAndroid Build Coastguard Worker   exit(0);
91*14675a02SAndroid Build Coastguard Worker }
92*14675a02SAndroid Build Coastguard Worker 
93*14675a02SAndroid Build Coastguard Worker }  // namespace client
94*14675a02SAndroid Build Coastguard Worker }  // namespace fcp
95