/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/python/lib/core/py_func.h"

#include <Python.h>

// clang-format off
// Must be included first.
#include "tensorflow/python/lib/core/numpy.h"
// clang-format on

#include <array>

#include "numpy/arrayobject.h"
#include "tensorflow/c/eager/c_api.h"
#include "tensorflow/c/eager/tfe_context_internal.h"
#include "tensorflow/c/eager/tfe_tensorhandle_internal.h"
#include "tensorflow/c/tf_status_helper.h"
#include "tensorflow/core/common_runtime/eager/context.h"
#include "tensorflow/core/common_runtime/eager/tensor_handle.h"
#include "tensorflow/core/framework/allocation_description.pb.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/python/eager/pywrap_tfe.h"
#include "tensorflow/python/lib/core/ndarray_tensor.h"
#include "tensorflow/python/lib/core/ndarray_tensor_bridge.h"
#include "tensorflow/python/lib/core/py_util.h"
#include "tensorflow/python/lib/core/safe_ptr.h"

namespace tensorflow {
namespace {

static mutex mu(LINKER_INITIALIZED);
static PyObject* py_trampoline TF_GUARDED_BY(mu) = nullptr;

// Returns the py_trampoline that is used to pass control to the
// python runtime.
PyObject* GetPyTrampoline() {
  mutex_lock l(mu);
  return py_trampoline;
}

// A call to the registered python function.
struct PyCall {
  // Passed to python runtime to call the python function registered
  // with this "token".
  string token;

  // The device on which Tensors are stored; only used for EagerPyFunc.
  Device* device = nullptr;

  // True if the call is associated with an EagerPyFunc.
  bool eager = false;

  // True if the call is running under eager async mode.
  bool eager_async = false;

  // Inputs and outputs of this function invocation.
  std::vector<Tensor> ins;
  std::vector<Tensor> out;
};

bool IsCPUDevice(const Device* d) {
  return d == nullptr || d->tensorflow_accelerator_device_info() == nullptr;
}

// Given the 'call', prepares the token and inputs as a python tuple
// that is appropriate for calling the trampoline.
Status MakeArgTuple(const PyCall* call, TFE_Context* ctx, PyObject** tuple) {
  int64_t n = call->ins.size();
  PyObject* lst = PyList_New(n);
  CHECK(lst);
  // TFE_TensorHandle assumes that CPU is identified by nullptr.
  //
  // Set device name to be empty if the device is CPU.
  const char* device_name = nullptr;

  if (call->device != nullptr && !IsCPUDevice(call->device))
    device_name = call->device->name().c_str();

  for (int64_t i = 0; i < n; ++i) {
    PyObject* arg = nullptr;
    if (call->eager) {
      Tensor t = call->ins[i];
      arg = EagerTensorFromHandle(tensorflow::wrap(
          tensorflow::unwrap(ctx)->CreateLocalHandleFromTFTensor(t,
                                                                 device_name)));
      if (arg == nullptr) {
        Py_DECREF(lst);
        return errors::Internal("Unable to procure EagerTensor from Tensor.");
      }
    } else {
      Status s = TensorToNdarray(call->ins[i], &arg);
      if (!s.ok()) {
        Py_DECREF(lst);
        return s;
      }
      arg = PyArray_Return(reinterpret_cast<PyArrayObject*>(arg));
    }
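    // PyList_SetItem steals the reference to `arg`, so no Py_DECREF of `arg`
    // is needed after this point.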
    PyList_SetItem(lst, i, arg);
  }
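  // The "N" format passes `lst` into the tuple without incrementing its
  // reference count, so ownership of `lst` transfers to `*tuple`.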
  *tuple = Py_BuildValue("(ssN)", call->token.c_str(), device_name, lst);
  CHECK(*tuple);
  return OkStatus();
}

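// Returns true iff `obj` is a zero-dimensional, single-element NumPy array
// whose sole item is Python None; this is how a py_func body that returned
// None shows up in the non-eager path.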
bool IsSingleNone(PyObject* obj) {
  if (!PyArray_Check(obj)) {
    return false;
  }
  PyArrayObject* array_obj = reinterpret_cast<PyArrayObject*>(obj);
  if (PyArray_NDIM(array_obj) != 0 || PyArray_SIZE(array_obj) != 1) {
    return false;
  }
  std::array<npy_intp, 0> indices;
  char* item_ptr =
      static_cast<char*>(PyArray_GetPtr(array_obj, indices.data()));
  PyObject* item = PyArray_GETITEM(array_obj, item_ptr);
  CHECK(item);
  return item == Py_None;
}

// Retrieves a Tensor from `eager_tensor` and stores it in `output_tensor`.
// Validates that `output_tensor` is backed by memory in `expected_device`
// (which is assumed to be a local device, one on which the kernel was
// executed.)
//
// It may be nice to copy the tensor to the right device instead of failing if
// it isn't already there. This is left as a future exercise. The required
// device-copying logic is implemented in Python at the moment.
tensorflow::Status ExtractTensorFromEagerTensor(const PyObject* eager_tensor,
                                                TFE_Context* ctx,
                                                const Device* expected_device,
                                                const Tensor** output_tensor) {
  tensorflow::TensorHandle* handle = down_cast<tensorflow::TensorHandle*>(
      tensorflow::unwrap(ctx)->TFTensorHandleFromInterface(
          tensorflow::unwrap(EagerTensor_Handle(eager_tensor))));

  Device* actual_device = handle->device();
  TF_RETURN_IF_ERROR(handle->Tensor(output_tensor));
  // actual_device may be nullptr, which implies local CPU.
  if (expected_device == actual_device) return OkStatus();
  const string& expected_device_name = expected_device->attributes().name();
  if (actual_device == nullptr) {
    if (!IsCPUDevice(expected_device)) {
      return errors::Internal(
          "Expected the py_func to return a Tensor backed by memory in ",
          expected_device_name,
          ", but is actually backed by local host memory. This is a bug.");
    }
    return OkStatus();
  }
  // NOTE(ebrevdo): Here we could try comparing "actual_device_name"
  // (actual_device->attributes()->name()) to expected_device_name and ensure
  // they're the same. However, this comparison fails if we create a ClusterDef
  // on localhost, mainly because the Device created by Eager code doesn't match
  // the device created by a session. In this case, expected_device_name may
  // contain "worker" but the Eager device name contains "localhost". Since we
  // can't easily access the true underlying device of "worker" here, we are not
  // able to perform a proper comparison. Furthermore, we can't check
  // IsCPUDevice(actual_device) because the kernel's device may indeed be a
  // GPU device (the python interpreter doesn't use it, however).
  return OkStatus();
}

// Calls the registered py function through the trampoline.
Status DoCallPyFunc(PyCall* call, bool* out_log_on_error) {
  *out_log_on_error = true;
  PyObject* trampoline = GetPyTrampoline();
  if (trampoline == nullptr) {
    return errors::InvalidArgument(
        "Missing py trampoline. Most likely, it is a link error.");
  }

  // Prepare the argument.
  PyObject* args = nullptr;
  std::unique_ptr<EagerExecutor> new_executor = nullptr;
  EagerExecutor* old_executor = nullptr;
  if (call->eager) {
    // See FuncRegistry._ctx.
    TFE_Context* ctx = reinterpret_cast<TFE_Context*>(PyCapsule_GetPointer(
        PyObject_GetAttrString(trampoline, "_ctx"), nullptr));
    CHECK_NE(ctx, nullptr);
    TF_RETURN_IF_ERROR(MakeArgTuple(call, ctx, &args));
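    // Swap in a fresh executor (asynchronous if `eager_async` is set) for the
    // duration of the Python call; the previous executor is restored below
    // once all pending nodes have drained.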
    new_executor.reset(new EagerExecutor(call->eager_async));
    old_executor = &(tensorflow::unwrap(ctx)->Executor());
    tensorflow::unwrap(ctx)->SetExecutorForThread(new_executor.get());
  } else {
    TF_RETURN_IF_ERROR(MakeArgTuple(call, nullptr, &args));
  }
  CHECK(args);

  // Invokes the trampoline.
  PyObject* result = PyEval_CallObject(trampoline, args);
  Py_DECREF(args);
  Status s = OkStatus();
  if (result == nullptr) {
    if (PyErr_Occurred()) {
      if (PyErr_ExceptionMatches(PyExc_ValueError) ||
          PyErr_ExceptionMatches(PyExc_TypeError)) {
        s = errors::InvalidArgument(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_StopIteration)) {
        *out_log_on_error = false;
        s = errors::OutOfRange(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
        s = errors::ResourceExhausted(PyExceptionFetch());
      } else if (PyErr_ExceptionMatches(PyExc_NotImplementedError)) {
        s = errors::Unimplemented(PyExceptionFetch());
      } else {
        // TODO(ebrevdo): Check if exception is an OpError and use the
        // OpError.error_code property to map it back in the Status.
        s = errors::Unknown(PyExceptionFetch());
      }
    } else {
      s = errors::Internal("Failed to run py callback ", call->token,
                           ": see error log.");
    }
  }

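  // `ctx` above was scoped to the eager branch, so the context capsule is
  // re-read here before restoring the original executor.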
  TFE_Context* ctx = reinterpret_cast<TFE_Context*>(PyCapsule_GetPointer(
      PyObject_GetAttrString(trampoline, "_ctx"), /*name=*/nullptr));
  if (new_executor != nullptr) {
    s.Update(new_executor->WaitForAllPendingNodes());
    tensorflow::unwrap(ctx)->SetExecutorForThread(old_executor);
  }

  TF_RETURN_IF_ERROR(s);

  // Process the return values and convert them to TF Tensors.
  if (PyList_Check(result)) {
    // `result` is a Python list; if this operation is an `EagerPyFunc`, then
    // every item in the list must be an `EagerTensor`; otherwise, every element
    // must be a NumPy array.
    call->out.clear();
    for (int i = 0; i < PyList_Size(result); ++i) {
      Tensor t;
      if (call->eager) {
        const PyObject* item = PyList_GetItem(result, i);
        if (EagerTensor_CheckExact(item)) {
          const Tensor* tensor = nullptr;
          s = ExtractTensorFromEagerTensor(item, ctx, call->device, &tensor);
          if (s.ok()) t = *tensor;
        } else {
          s = errors::FailedPrecondition(
              "Expected EagerTensor, found PyObject of type: ",
              Py_TYPE(item)->tp_name);
        }
      } else {
        s = NdarrayToTensor(PyList_GetItem(result, i), &t);
      }

      if (!s.ok()) {
        break;
      }
      call->out.push_back(t);
    }
  } else if (EagerTensor_CheckExact(result) || result == Py_None) {
    // result is an `EagerTensor` or `None`.
    DCHECK(call->eager);
    if (result != Py_None) {
      const Tensor* t = nullptr;
      s = ExtractTensorFromEagerTensor(result, ctx, call->device, &t);
      if (s.ok()) call->out.push_back(*t);
    }
  } else if (PyArray_Check(result)) {
    // `result` is a NumPy array.
    DCHECK(!call->eager);
    if (!IsSingleNone(result)) {
      Tensor t;
      s = NdarrayToTensor(result, &t);
      if (s.ok()) {
        call->out.push_back(t);
      }
    }
  } else {
    s = errors::Internal("Unexpected PyObject was returned: ",
                         Py_TYPE(result)->tp_name);
  }
  Py_DECREF(result);
  return s;
}

}  // end namespace

void InitializePyTrampoline(PyObject* trampoline) {
  mutex_lock l(mu);
  if (py_trampoline == nullptr) {
    py_trampoline = trampoline;
    Py_INCREF(py_trampoline);
  } else {
    LOG(WARNING) << "InitializePyTrampoline should only be called once";
  }
}

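// Kernel that backs PyFunc, PyFuncStateless, and EagerPyFunc: it packages the
// op's inputs, acquires the GIL, invokes the registered Python trampoline
// under the stored token, and copies the returned values into the op's
// outputs.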
class PyFuncOp : public OpKernel {
 public:
  explicit PyFuncOp(OpKernelConstruction* ctx) : OpKernel(ctx) {
    OP_REQUIRES_OK(ctx, ctx->GetAttr("token", &token_));
    eager_ = type_string() == "EagerPyFunc";
    if (eager_) {
      OP_REQUIRES_OK(ctx, ctx->GetAttr("is_async", &eager_async_));
    }
  }

  bool IsExpensive() override { return true; }

  void Compute(OpKernelContext* ctx) override {
    PyCall call;
    call.token = token_;
    call.eager = eager_;
    if (call.eager) {
      // Eager's C API uses `Device`, whereas `OpKernelContext` stores a
      // `DeviceBase`; attempt to downcast.
      call.device = dynamic_cast<Device*>(ctx->device());
      if (call.device == nullptr) {
        ctx->CtxFailureWithWarning(errors::Internal(
            "Unrecognized device class: ", ctx->device()->name()));
        return;
      }
      call.eager_async = eager_async_;
    }

    VLOG(1) << "PyFuncOp of token " << call.token << " is called.";

    for (int i = 0; i < ctx->num_inputs(); ++i) {
      call.ins.push_back(ctx->input(i));
    }

    // NOTE(mrry): There is a potential time-of-check-to-time-of-use race here,
    // because it is possible that `Py_Finalize()` could be called in another
    // thread between this check and the call to `PyGILState_Ensure()`, which
    // will abort the process if `Py_Finalize()` has been called. A more robust
    // solution would be welcome, but it is not obvious how to make this work
    // using the current Python C API.
    OP_REQUIRES(ctx, Py_IsInitialized(),
                errors::FailedPrecondition(
                    "Python interpreter state is not initialized. "
                    "The process may be terminated."));

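    // Acquire the GIL for the duration of the Python call; it is released
    // below whether or not the call succeeds.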
    PyGILState_STATE py_threadstate;
    py_threadstate = PyGILState_Ensure();
    bool log_on_error;
    Status s = DoCallPyFunc(&call, &log_on_error);
    // Sometimes py_funcs can be called without a session and leak memory. This
    // ensures we clear the decref cache so this doesn't happen.
    ClearDecrefCache();
    PyGILState_Release(py_threadstate);

    // Ensures that GIL is released even when !s.ok().
    if (!s.ok()) {
      if (log_on_error) {
        ctx->CtxFailureWithWarning(s);
      } else {
        ctx->CtxFailure(s);
      }
      return;
    }

    OP_REQUIRES(ctx, static_cast<int32>(call.out.size()) == ctx->num_outputs(),
                errors::InvalidArgument(token_, " returns ", call.out.size(),
                                        " values, but expects to see ",
                                        ctx->num_outputs(), " values."));
    for (size_t i = 0; i < call.out.size(); ++i) {
      const auto& t = call.out[i];
      OP_REQUIRES(
          ctx, t.dtype() == output_type(i),
          errors::InvalidArgument(i, "-th value returned by ", token_, " is ",
                                  DataTypeString(t.dtype()), ", but expects ",
                                  DataTypeString(output_type(i))));
      ctx->set_output(i, t);
    }
  }

 private:
  string token_;

  // True if and only if this op should execute the python function eagerly,
  // i.e., if and only if the op type is "EagerPyFunc".
  bool eager_;

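  // True if the eager executor created for each call should run
  // asynchronously; only meaningful when eager_ is true.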
  bool eager_async_;

  TF_DISALLOW_COPY_AND_ASSIGN(PyFuncOp);
};

REGISTER_KERNEL_BUILDER(Name("PyFunc").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("PyFuncStateless").Device(DEVICE_CPU), PyFuncOp);
REGISTER_KERNEL_BUILDER(Name("EagerPyFunc").Device(DEVICE_CPU), PyFuncOp);
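// These kernels back TensorFlow's Python-level script ops (along the lines of
// tf.compat.v1.py_func, tf.numpy_function, and tf.py_function, maintained in
// tensorflow/python/ops/script_ops.py), which register the trampoline above
// and dispatch calls through the token-based function registry.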

DataType gpu_types[] = {
    // No strings and int32s, no ref types and no resource/variant types.
    DT_FLOAT,      DT_DOUBLE,   DT_UINT8,  DT_INT16,   DT_INT8,
    DT_COMPLEX64,  DT_INT64,    DT_BOOL,   DT_QINT8,   DT_QUINT8,
    DT_QINT32,     DT_BFLOAT16, DT_QINT16, DT_QUINT16, DT_UINT16,
    DT_COMPLEX128, DT_HALF,     DT_UINT32, DT_UINT64,
};

REGISTER_KERNEL_BUILDER(Name("EagerPyFunc")
                            .Device(DEVICE_DEFAULT)
                            .TypeConstraint("Tin", gpu_types)
                            .TypeConstraint("Tout", gpu_types),
                        PyFuncOp);

}  // end namespace tensorflow