xref: /aosp_15_r20/external/federated-compute/fcp/client/interruptible_runner.cc (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2020 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "fcp/client/interruptible_runner.h"
17 
#include <cstdlib>
#include <functional>
#include <optional>
#include <utility>
20 
21 #include "absl/status/status.h"
22 
23 namespace fcp {
24 namespace client {
25 
Run(std::function<absl::Status ()> f,std::function<void ()> abort_function)26 absl::Status InterruptibleRunner::Run(std::function<absl::Status()> f,
27                                       std::function<void()> abort_function) {
28   // Check before even making the call.
29   if (should_abort_()) {
30     return absl::CancelledError("cancelled before posting callable");
31   }
32   fcp::thread::Future<absl::Status> run_future =
33       fcp::thread::ScheduleFuture<absl::Status>(thread_pool_.get(), f);
34   return WaitUntilDone(std::move(run_future), abort_function);
35 }
36 
WaitUntilDone(fcp::thread::Future<absl::Status> && run_future,std::function<void ()> abort_function)37 absl::Status InterruptibleRunner::WaitUntilDone(
38     fcp::thread::Future<absl::Status>&& run_future,
39     std::function<void()> abort_function) {
40   // Wait until call is done, checking periodically whether we need to abort.
41   while (true) {
42     if (run_future.Wait(timing_config_.polling_period)) {
43       std::optional<absl::Status> future_result = std::move(run_future).Take();
44       // std::nullopt indicates the underlying promise was abandoned. To my
45       // best knowledge this always indicates a programming error and hence
46       // should result in a crash.
47       FCP_CHECK(future_result != std::nullopt);
48       return future_result.value();
49     }
50 
51     if (should_abort_()) {
52       return Abort(std::move(run_future), abort_function);
53     }
54   }
55 }
56 
Abort(fcp::thread::Future<absl::Status> run_future,std::function<void ()> abort_function)57 absl::Status InterruptibleRunner::Abort(
58     fcp::thread::Future<absl::Status> run_future,
59     std::function<void()> abort_function) {
60   FCP_LOG(WARNING) << "Aborting run.";
61 
62   // Attempt to abort the ongoing call.
63   abort_function();
64 
65   // Wait for at most the graceful shutdown period.
66   if (run_future.Wait(timing_config_.graceful_shutdown_period)) {
67     log_manager_->LogDiag(diagnostics_config_.interrupted);
68     FCP_CHECK(std::move(run_future).Take() != std::nullopt);
69     return absl::CancelledError("cancelled after graceful wait");
70   }
71 
72   // Runnable failed to abort during the graceful shutdown period. Wait for
73   // (possibly much) longer, because there's nothing much being
74   // gained by returning with TF still running, but resources leak.
75   log_manager_->LogDiag(diagnostics_config_.interrupt_timeout);
76   if (run_future.Wait(timing_config_.extended_shutdown_period)) {
77     log_manager_->LogDiag(diagnostics_config_.interrupted_extended);
78     FCP_CHECK(std::move(run_future).Take() != std::nullopt);
79     return absl::CancelledError("cancelled after extended wait");
80   }
81 
82   // If even waiting for the long period didn't help, exit this process.
83   // This is the worst case that will unfortunately happen - we hope the
84   // logs above and below make it to a logging backend, allowing to narrow
85   // the root cause down to particular models or builds; and the exit(0) should
86   // avoid raising a crash dialog when training is running in a background
87   // process. Nevertheless the goal should be to never reach this point.
88 
89   log_manager_->LogDiag(diagnostics_config_.interrupt_timeout_extended);
90   exit(0);
91 }
92 
93 }  // namespace client
94 }  // namespace fcp
95