xref: /aosp_15_r20/external/tensorflow/tensorflow/compiler/xla/service/cpu/runtime_fork_join.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/compiler/xla/service/cpu/runtime_fork_join.h"
17 
18 #define EIGEN_USE_THREADS
19 
20 #include "absl/base/dynamic_annotations.h"
21 #include "absl/strings/str_format.h"
22 #include "absl/strings/str_join.h"
23 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
24 #include "tensorflow/compiler/xla/executable_run_options.h"
25 #include "tensorflow/compiler/xla/service/custom_call_status_internal.h"
26 #include "tensorflow/core/platform/blocking_counter.h"
27 #include "tensorflow/core/platform/logging.h"
28 
// Signature of the per-partition compute function invoked by
// __xla_cpu_runtime_ParallelForkJoin. Arguments, in call order: result
// pointer, run options, params, buffer table, status, the bounds of the
// partition to process, and profile counters.
using ComputeFunctionType = void (*)(void* result_ptr,
                                     const void* run_options_ptr,
                                     const void** params, void** buffer_table,
                                     void* status, int64_t* partition,
                                     uint64_t* prof_counters);
31 
32 // Dispatches 'num_partitions - 1' calls to 'function_ptr' in parallel.
33 // Calls 'function_ptr' for first partition inline.
34 // Uses blocking counter to synchronize threads after parallel calls complete.
35 //
36 // The 'partitions' array has a total number of elements equal to
37 // 'num_partitions * num_partitioned_dims * 2' (the '2' is necessary to specify
38 // dimension start and limit indices).
39 //
40 // The 'partitions' array layout stores array elements in memory with dimension
41 // start limit as the most-minor dimension, followed by dimension, then
42 // partition.
43 //
44 // EX: Layout of 'partitions' array with 'num_partitions = 2', and
45 //     'num_partitioned_dims = 3'
46 //
47 //   [partition0_dim0_start]
48 //   [partition0_dim0_limit]
49 //   [partition0_dim1_start]
50 //   [partition0_dim1_limit]
51 //   [partition0_dim2_start]
52 //   [partition0_dim2_limit]
53 //   [partition1_dim0_start]
54 //   [partition1_dim0_limit]
55 //   [partition1_dim1_start]
56 //   [partition1_dim1_limit]
57 //   [partition1_dim2_start]
58 //   [partition1_dim2_limit]
59 //
__xla_cpu_runtime_ParallelForkJoin(void * result_ptr,const void * run_options_ptr,const void ** params,void ** buffer_table,void * status,uint64_t * prof_counters,int32_t num_partitions,int64_t * partitions,int32_t num_partitioned_dims,void * function_ptr)60 ABSL_ATTRIBUTE_NO_SANITIZE_MEMORY void __xla_cpu_runtime_ParallelForkJoin(
61     void* result_ptr, const void* run_options_ptr, const void** params,
62     void** buffer_table, void* status, uint64_t* prof_counters,
63     int32_t num_partitions, int64_t* partitions, int32_t num_partitioned_dims,
64     void* function_ptr) {
65   VLOG(2) << "ParallelForkJoin ENTRY"
66           << " num_partitions: " << num_partitions
67           << " num_partitioned_dims: " << num_partitioned_dims;
68   CHECK_EQ(params, nullptr);
69   CHECK_GT(num_partitions, 1);
70   CHECK_GT(num_partitioned_dims, 0);
71   CHECK_NE(function_ptr, nullptr);
72   CHECK_NE(partitions, nullptr);
73   const xla::ExecutableRunOptions* run_options =
74       static_cast<const xla::ExecutableRunOptions*>(run_options_ptr);
75   CHECK_NE(run_options, nullptr);
76   CHECK_NE(run_options->intra_op_thread_pool(), nullptr);
77 
78   ComputeFunctionType function =
79       reinterpret_cast<ComputeFunctionType>(function_ptr);
80   // Compute partition stride in 'partitions' array.
81   const int64_t stride = 2 * num_partitioned_dims;
82 
83   std::vector<XlaCustomCallStatus> statuses(num_partitions);
84 
85   // Dispatch 'num_partitions - 1' compute functions to run in parallel.
86   tensorflow::BlockingCounter bc(num_partitions - 1);
87   for (int32_t i = 1; i < num_partitions; ++i) {
88     const int64_t offset = i * stride;
89     run_options->intra_op_thread_pool()->enqueueNoNotification(
90         [i, function, result_ptr, run_options_ptr, buffer_table, prof_counters,
91          partitions, offset, &bc, &statuses]() {
92           function(result_ptr, run_options_ptr, nullptr, buffer_table,
93                    &statuses[i], &partitions[offset], prof_counters);
94           bc.DecrementCount();
95           VLOG(3) << "ParallelForkJoin partition " << i << " done.";
96         });
97   }
98 
99   // Call first compute function inline.
100   function(result_ptr, run_options_ptr, params, buffer_table, &statuses[0],
101            &partitions[0], prof_counters);
102   VLOG(3) << "ParallelForkJoin partition 0 done.";
103   bc.Wait();
104 
105   // Collect all error messages (if any).
106   std::vector<std::pair<int32_t, absl::string_view>> error_messages;
107   for (int32_t i = 0; i < num_partitions; ++i) {
108     std::optional<absl::string_view> msg =
109         xla::CustomCallStatusGetMessage(&statuses[i]);
110     if (msg) {
111       error_messages.emplace_back(i, *msg);
112     }
113   }
114 
115   if (!error_messages.empty()) {
116     // Join all error messages into a single string to serve as the message for
117     // the returned status.
118     std::string error_message = absl::StrJoin(
119         error_messages, "\n",
120         [](std::string* out, std::pair<int32_t, absl::string_view> p) {
121           int32_t idx = p.first;
122           absl::string_view msg = p.second;
123           absl::StrAppend(out,
124                           absl::StrFormat("Partition %d error: %s", idx, msg));
125         });
126     XlaCustomCallStatusSetFailure(
127         reinterpret_cast<XlaCustomCallStatus*>(status), error_message.data(),
128         error_message.length());
129   }
130   VLOG(2) << "ParallelForkJoin EXIT";
131 }
132