1 /*
2 * Copyright 2020 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "fcp/client/interruptible_runner.h"
17
#include <cstdlib>
#include <functional>
#include <optional>
#include <utility>

#include "absl/status/status.h"
22
23 namespace fcp {
24 namespace client {
25
Run(std::function<absl::Status ()> f,std::function<void ()> abort_function)26 absl::Status InterruptibleRunner::Run(std::function<absl::Status()> f,
27 std::function<void()> abort_function) {
28 // Check before even making the call.
29 if (should_abort_()) {
30 return absl::CancelledError("cancelled before posting callable");
31 }
32 fcp::thread::Future<absl::Status> run_future =
33 fcp::thread::ScheduleFuture<absl::Status>(thread_pool_.get(), f);
34 return WaitUntilDone(std::move(run_future), abort_function);
35 }
36
WaitUntilDone(fcp::thread::Future<absl::Status> && run_future,std::function<void ()> abort_function)37 absl::Status InterruptibleRunner::WaitUntilDone(
38 fcp::thread::Future<absl::Status>&& run_future,
39 std::function<void()> abort_function) {
40 // Wait until call is done, checking periodically whether we need to abort.
41 while (true) {
42 if (run_future.Wait(timing_config_.polling_period)) {
43 std::optional<absl::Status> future_result = std::move(run_future).Take();
44 // std::nullopt indicates the underlying promise was abandoned. To my
45 // best knowledge this always indicates a programming error and hence
46 // should result in a crash.
47 FCP_CHECK(future_result != std::nullopt);
48 return future_result.value();
49 }
50
51 if (should_abort_()) {
52 return Abort(std::move(run_future), abort_function);
53 }
54 }
55 }
56
Abort(fcp::thread::Future<absl::Status> run_future,std::function<void ()> abort_function)57 absl::Status InterruptibleRunner::Abort(
58 fcp::thread::Future<absl::Status> run_future,
59 std::function<void()> abort_function) {
60 FCP_LOG(WARNING) << "Aborting run.";
61
62 // Attempt to abort the ongoing call.
63 abort_function();
64
65 // Wait for at most the graceful shutdown period.
66 if (run_future.Wait(timing_config_.graceful_shutdown_period)) {
67 log_manager_->LogDiag(diagnostics_config_.interrupted);
68 FCP_CHECK(std::move(run_future).Take() != std::nullopt);
69 return absl::CancelledError("cancelled after graceful wait");
70 }
71
72 // Runnable failed to abort during the graceful shutdown period. Wait for
73 // (possibly much) longer, because there's nothing much being
74 // gained by returning with TF still running, but resources leak.
75 log_manager_->LogDiag(diagnostics_config_.interrupt_timeout);
76 if (run_future.Wait(timing_config_.extended_shutdown_period)) {
77 log_manager_->LogDiag(diagnostics_config_.interrupted_extended);
78 FCP_CHECK(std::move(run_future).Take() != std::nullopt);
79 return absl::CancelledError("cancelled after extended wait");
80 }
81
82 // If even waiting for the long period didn't help, exit this process.
83 // This is the worst case that will unfortunately happen - we hope the
84 // logs above and below make it to a logging backend, allowing to narrow
85 // the root cause down to particular models or builds; and the exit(0) should
86 // avoid raising a crash dialog when training is running in a background
87 // process. Nevertheless the goal should be to never reach this point.
88
89 log_manager_->LogDiag(diagnostics_config_.interrupt_timeout_extended);
90 exit(0);
91 }
92
93 } // namespace client
94 } // namespace fcp
95