1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include <memory>
17 #include <string>
18 #include <vector>
19
20 #include "pybind11/pybind11.h"
21 #include "pybind11/stl.h"
22 #include "tensorflow/core/lib/core/error_codes.pb.h"
23 #include "tensorflow/core/lib/core/errors.h"
24 #include "tensorflow/core/lib/core/status.h"
25 #include "tensorflow/core/lib/io/buffered_inputstream.h"
26 #include "tensorflow/core/lib/io/random_inputstream.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/file_system.h"
30 #include "tensorflow/core/platform/stringpiece.h"
31 #include "tensorflow/core/platform/tstring.h"
32 #include "tensorflow/python/lib/core/pybind11_absl.h"
33 #include "tensorflow/python/lib/core/pybind11_status.h"
34
35 namespace tensorflow {
36 struct PyTransactionToken {
37 TransactionToken* token_;
38 };
39
TokenFromPyToken(PyTransactionToken * t)40 inline TransactionToken* TokenFromPyToken(PyTransactionToken* t) {
41 return (t ? t->token_ : nullptr);
42 }
43 } // namespace tensorflow
44
45 namespace {
46 namespace py = pybind11;
47
PYBIND11_MODULE(_pywrap_file_io,m)48 PYBIND11_MODULE(_pywrap_file_io, m) {
49 using tensorflow::PyTransactionToken;
50 using tensorflow::TransactionToken;
51 py::class_<PyTransactionToken>(m, "TransactionToken")
52 .def("__repr__", [](const PyTransactionToken* t) {
53 if (t->token_) {
54 return std::string(t->token_->owner->DecodeTransaction(t->token_));
55 }
56 return std::string("Invalid token!");
57 });
58
59 m.def(
60 "FileExists",
61 [](const std::string& filename, PyTransactionToken* token) {
62 tensorflow::Status status;
63 {
64 py::gil_scoped_release release;
65 status = tensorflow::Env::Default()->FileExists(filename);
66 }
67 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
68 },
69 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
70 m.def(
71 "DeleteFile",
72 [](const std::string& filename, PyTransactionToken* token) {
73 py::gil_scoped_release release;
74 tensorflow::Status status =
75 tensorflow::Env::Default()->DeleteFile(filename);
76 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
77 },
78 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
79 m.def(
80 "ReadFileToString",
81 [](const std::string& filename, PyTransactionToken* token) {
82 std::string data;
83 py::gil_scoped_release release;
84 const auto status =
85 ReadFileToString(tensorflow::Env::Default(), filename, &data);
86 pybind11::gil_scoped_acquire acquire;
87 tensorflow::MaybeRaiseRegisteredFromStatus(status);
88 return py::bytes(data);
89 },
90 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
91 m.def(
92 "WriteStringToFile",
93 [](const std::string& filename, tensorflow::StringPiece data,
94 PyTransactionToken* token) {
95 py::gil_scoped_release release;
96 const auto status =
97 WriteStringToFile(tensorflow::Env::Default(), filename, data);
98 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
99 },
100 py::arg("filename"), py::arg("data"),
101 py::arg("token") = (PyTransactionToken*)nullptr);
102 m.def(
103 "GetChildren",
104 [](const std::string& dirname, PyTransactionToken* token) {
105 std::vector<std::string> results;
106 py::gil_scoped_release release;
107 const auto status =
108 tensorflow::Env::Default()->GetChildren(dirname, &results);
109 pybind11::gil_scoped_acquire acquire;
110 tensorflow::MaybeRaiseRegisteredFromStatus(status);
111 return results;
112 },
113 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
114 m.def(
115 "GetMatchingFiles",
116 [](const std::string& pattern, PyTransactionToken* token) {
117 std::vector<std::string> results;
118 py::gil_scoped_release release;
119 const auto status =
120 tensorflow::Env::Default()->GetMatchingPaths(pattern, &results);
121 pybind11::gil_scoped_acquire acquire;
122 tensorflow::MaybeRaiseRegisteredFromStatus(status);
123 return results;
124 },
125 py::arg("pattern"), py::arg("token") = (PyTransactionToken*)nullptr);
126 m.def(
127 "CreateDir",
128 [](const std::string& dirname, PyTransactionToken* token) {
129 py::gil_scoped_release release;
130 const auto status = tensorflow::Env::Default()->CreateDir(dirname);
131 if (tensorflow::errors::IsAlreadyExists(status)) {
132 return;
133 }
134 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
135 },
136 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
137 m.def(
138 "RecursivelyCreateDir",
139 [](const std::string& dirname, PyTransactionToken* token) {
140 py::gil_scoped_release release;
141 const auto status =
142 tensorflow::Env::Default()->RecursivelyCreateDir(dirname);
143 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
144 },
145 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
146 m.def(
147 "CopyFile",
148 [](const std::string& src, const std::string& target, bool overwrite,
149 PyTransactionToken* token) {
150 py::gil_scoped_release release;
151 auto* env = tensorflow::Env::Default();
152 tensorflow::Status status;
153 if (!overwrite && env->FileExists(target).ok()) {
154 status = tensorflow::errors::AlreadyExists("file already exists");
155 } else {
156 status = env->CopyFile(src, target);
157 }
158 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
159 },
160 py::arg("src"), py::arg("target"), py::arg("overwrite"),
161 py::arg("token") = (PyTransactionToken*)nullptr);
162 m.def(
163 "RenameFile",
164 [](const std::string& src, const std::string& target, bool overwrite,
165 PyTransactionToken* token) {
166 py::gil_scoped_release release;
167 auto* env = tensorflow::Env::Default();
168 tensorflow::Status status;
169 if (!overwrite && env->FileExists(target).ok()) {
170 status = tensorflow::errors::AlreadyExists("file already exists");
171 } else {
172 status = env->RenameFile(src, target);
173 }
174 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
175 },
176 py::arg("src"), py::arg("target"), py::arg("overwrite"),
177 py::arg("token") = (PyTransactionToken*)nullptr);
178 m.def(
179 "DeleteRecursively",
180 [](const std::string& dirname, PyTransactionToken* token) {
181 py::gil_scoped_release release;
182 int64_t undeleted_files;
183 int64_t undeleted_dirs;
184 auto status = tensorflow::Env::Default()->DeleteRecursively(
185 dirname, &undeleted_files, &undeleted_dirs);
186 if (status.ok() && (undeleted_files > 0 || undeleted_dirs > 0)) {
187 status = tensorflow::errors::PermissionDenied(
188 "could not fully delete dir");
189 }
190 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
191 },
192 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
193 m.def(
194 "IsDirectory",
195 [](const std::string& dirname, PyTransactionToken* token) {
196 py::gil_scoped_release release;
197 const auto status = tensorflow::Env::Default()->IsDirectory(dirname);
198 // FAILED_PRECONDITION response means path exists but isn't a dir.
199 if (tensorflow::errors::IsFailedPrecondition(status)) {
200 return false;
201 }
202
203 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
204 return true;
205 },
206 py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
207 m.def("HasAtomicMove", [](const std::string& path) {
208 py::gil_scoped_release release;
209 bool has_atomic_move;
210 const auto status =
211 tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move);
212 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
213 return has_atomic_move;
214 });
215
216 py::class_<tensorflow::FileStatistics>(m, "FileStatistics")
217 .def_readonly("length", &tensorflow::FileStatistics::length)
218 .def_readonly("mtime_nsec", &tensorflow::FileStatistics::mtime_nsec)
219 .def_readonly("is_directory", &tensorflow::FileStatistics::is_directory);
220
221 m.def(
222 "Stat",
223 [](const std::string& filename, PyTransactionToken* token) {
224 py::gil_scoped_release release;
225 std::unique_ptr<tensorflow::FileStatistics> self(
226 new tensorflow::FileStatistics);
227 const auto status =
228 tensorflow::Env::Default()->Stat(filename, self.get());
229 py::gil_scoped_acquire acquire;
230 tensorflow::MaybeRaiseRegisteredFromStatus(status);
231 return self.release();
232 },
233 py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
234
235 m.def("GetRegisteredSchemes", []() {
236 std::vector<std::string> results;
237 py::gil_scoped_release release;
238 const auto status =
239 tensorflow::Env::Default()->GetRegisteredFileSystemSchemes(&results);
240 pybind11::gil_scoped_acquire acquire;
241 tensorflow::MaybeRaiseRegisteredFromStatus(status);
242 return results;
243 });
244
245 using tensorflow::WritableFile;
246 py::class_<WritableFile>(m, "WritableFile")
247 .def(py::init([](const std::string& filename, const std::string& mode,
248 PyTransactionToken* token) {
249 py::gil_scoped_release release;
250 auto* env = tensorflow::Env::Default();
251 std::unique_ptr<WritableFile> self;
252 const auto status = mode.find('a') == std::string::npos
253 ? env->NewWritableFile(filename, &self)
254 : env->NewAppendableFile(filename, &self);
255 py::gil_scoped_acquire acquire;
256 tensorflow::MaybeRaiseRegisteredFromStatus(status);
257 return self.release();
258 }),
259 py::arg("filename"), py::arg("mode"),
260 py::arg("token") = (PyTransactionToken*)nullptr)
261 .def("append",
262 [](WritableFile* self, tensorflow::StringPiece data) {
263 const auto status = self->Append(data);
264 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
265 })
266 // TODO(slebedev): Make WritableFile::Tell const and change self
267 // to be a reference.
268 .def("tell",
269 [](WritableFile* self) {
270 int64_t pos = -1;
271 py::gil_scoped_release release;
272 const auto status = self->Tell(&pos);
273 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
274 return pos;
275 })
276 .def("flush",
277 [](WritableFile* self) {
278 py::gil_scoped_release release;
279 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Flush());
280 })
281 .def("close", [](WritableFile* self) {
282 py::gil_scoped_release release;
283 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Close());
284 });
285
286 using tensorflow::io::BufferedInputStream;
287 py::class_<BufferedInputStream>(m, "BufferedInputStream")
288 .def(py::init([](const std::string& filename, size_t buffer_size,
289 PyTransactionToken* token) {
290 py::gil_scoped_release release;
291 std::unique_ptr<tensorflow::RandomAccessFile> file;
292 const auto status =
293 tensorflow::Env::Default()->NewRandomAccessFile(filename,
294 &file);
295 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
296 std::unique_ptr<tensorflow::io::RandomAccessInputStream>
297 input_stream(new tensorflow::io::RandomAccessInputStream(
298 file.release(),
299 /*owns_file=*/true));
300 py::gil_scoped_acquire acquire;
301 return new BufferedInputStream(input_stream.release(), buffer_size,
302 /*owns_input_stream=*/true);
303 }),
304 py::arg("filename"), py::arg("buffer_size"),
305 py::arg("token") = (PyTransactionToken*)nullptr)
306 .def("read",
307 [](BufferedInputStream* self, int64_t bytes_to_read) {
308 py::gil_scoped_release release;
309 tensorflow::tstring result;
310 const auto status = self->ReadNBytes(bytes_to_read, &result);
311 if (!status.ok() && !tensorflow::errors::IsOutOfRange(status)) {
312 result.clear();
313 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
314 }
315 py::gil_scoped_acquire acquire;
316 return py::bytes(result);
317 })
318 .def("readline",
319 [](BufferedInputStream* self) {
320 py::gil_scoped_release release;
321 auto output = self->ReadLineAsString();
322 py::gil_scoped_acquire acquire;
323 return py::bytes(output);
324 })
325 .def("seek",
326 [](BufferedInputStream* self, int64_t pos) {
327 py::gil_scoped_release release;
328 tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Seek(pos));
329 })
330 .def("tell", [](BufferedInputStream* self) {
331 py::gil_scoped_release release;
332 return self->Tell();
333 });
334 }
335 } // namespace
336