/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/runtime_fork_join.h"

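// Defining EIGEN_USE_THREADS before including the Eigen Tensor header below
// makes Eigen::ThreadPoolDevice available, which is the type behind
// run_options->intra_op_thread_pool() used later in this file.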
#define EIGEN_USE_THREADS

#include "absl/base/dynamic_annotations.h"
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/compiler/xla/executable_run_options.h"
#include "tensorflow/compiler/xla/service/custom_call_status_internal.h"
#include "tensorflow/core/platform/blocking_counter.h"
#include "tensorflow/core/platform/logging.h"

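// Signature of the JIT-compiled computation invoked below. The argument
// order, as used at the call sites in this file, is: result buffer, run
// options, parameters, buffer table, custom-call status, partition
// start/limit indices, and profile counters.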
using ComputeFunctionType = void (*)(void*, const void*, const void**, void**,
                                     void*, int64_t*, uint64_t*);

// Dispatches 'num_partitions - 1' calls to 'function_ptr' in parallel.
// Calls 'function_ptr' for the first partition inline.
// Uses a blocking counter to synchronize threads after the parallel calls
// complete.
//
// The 'partitions' array has a total number of elements equal to
// 'num_partitions * num_partitioned_dims * 2' (the '2' is necessary to specify
// dimension start and limit indices).
//
// The 'partitions' array stores its elements in memory with the dimension
// start/limit pair as the most-minor dimension, followed by the dimension,
// then the partition.
//
// EX: Layout of 'partitions' array with 'num_partitions = 2', and
// 'num_partitioned_dims = 3'
//
//   [partition0_dim0_start]
//   [partition0_dim0_limit]
//   [partition0_dim1_start]
//   [partition0_dim1_limit]
//   [partition0_dim2_start]
//   [partition0_dim2_limit]
//   [partition1_dim0_start]
//   [partition1_dim0_limit]
//   [partition1_dim1_start]
//   [partition1_dim1_limit]
//   [partition1_dim2_start]
//   [partition1_dim2_limit]
//
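// In general, with 'stride = 2 * num_partitioned_dims', the start and limit
// indices of partition 'i' along dimension 'd' are stored at:
//
//   partitions[i * stride + 2 * d]      // dim 'd' start index
//   partitions[i * stride + 2 * d + 1]  // dim 'd' limit index
//
// which matches the 'offset' computed for each partition in the dispatch
// loop below.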
ABSL_ATTRIBUTE_NO_SANITIZE_MEMORY void __xla_cpu_runtime_ParallelForkJoin(
    void* result_ptr, const void* run_options_ptr, const void** params,
    void** buffer_table, void* status, uint64_t* prof_counters,
    int32_t num_partitions, int64_t* partitions, int32_t num_partitioned_dims,
    void* function_ptr) {
  VLOG(2) << "ParallelForkJoin ENTRY"
          << " num_partitions: " << num_partitions
          << " num_partitioned_dims: " << num_partitioned_dims;
  CHECK_EQ(params, nullptr);
  CHECK_GT(num_partitions, 1);
  CHECK_GT(num_partitioned_dims, 0);
  CHECK_NE(function_ptr, nullptr);
  CHECK_NE(partitions, nullptr);
  const xla::ExecutableRunOptions* run_options =
      static_cast<const xla::ExecutableRunOptions*>(run_options_ptr);
  CHECK_NE(run_options, nullptr);
  CHECK_NE(run_options->intra_op_thread_pool(), nullptr);

  ComputeFunctionType function =
      reinterpret_cast<ComputeFunctionType>(function_ptr);
  // Compute partition stride in 'partitions' array.
  const int64_t stride = 2 * num_partitioned_dims;

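  // One XlaCustomCallStatus slot per partition: each compute function writes
  // any error into its own slot, and the slots are merged into 'status' after
  // the join below.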
  std::vector<XlaCustomCallStatus> statuses(num_partitions);

  // Dispatch 'num_partitions - 1' compute functions to run in parallel.
  tensorflow::BlockingCounter bc(num_partitions - 1);
  for (int32_t i = 1; i < num_partitions; ++i) {
    const int64_t offset = i * stride;
    run_options->intra_op_thread_pool()->enqueueNoNotification(
        [i, function, result_ptr, run_options_ptr, buffer_table, prof_counters,
         partitions, offset, &bc, &statuses]() {
          function(result_ptr, run_options_ptr, nullptr, buffer_table,
                   &statuses[i], &partitions[offset], prof_counters);
          bc.DecrementCount();
          VLOG(3) << "ParallelForkJoin partition " << i << " done.";
        });
  }

  // Call first compute function inline.
  function(result_ptr, run_options_ptr, params, buffer_table, &statuses[0],
           &partitions[0], prof_counters);
  VLOG(3) << "ParallelForkJoin partition 0 done.";
  bc.Wait();

  // Collect all error messages (if any).
  std::vector<std::pair<int32_t, absl::string_view>> error_messages;
  for (int32_t i = 0; i < num_partitions; ++i) {
    std::optional<absl::string_view> msg =
        xla::CustomCallStatusGetMessage(&statuses[i]);
    if (msg) {
      error_messages.emplace_back(i, *msg);
    }
  }

  if (!error_messages.empty()) {
    // Join all error messages into a single string to serve as the message for
    // the returned status.
    std::string error_message = absl::StrJoin(
        error_messages, "\n",
        [](std::string* out, std::pair<int32_t, absl::string_view> p) {
          int32_t idx = p.first;
          absl::string_view msg = p.second;
          absl::StrAppend(out,
                          absl::StrFormat("Partition %d error: %s", idx, msg));
        });
    XlaCustomCallStatusSetFailure(
        reinterpret_cast<XlaCustomCallStatus*>(status), error_message.data(),
        error_message.length());
  }
  VLOG(2) << "ParallelForkJoin EXIT";
}