xref: /aosp_15_r20/external/tensorflow/tensorflow/python/lib/io/file_io_wrapper.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include <memory>
17 #include <string>
18 #include <vector>
19 
20 #include "pybind11/pybind11.h"
21 #include "pybind11/stl.h"
22 #include "tensorflow/core/lib/core/error_codes.pb.h"
23 #include "tensorflow/core/lib/core/errors.h"
24 #include "tensorflow/core/lib/core/status.h"
25 #include "tensorflow/core/lib/io/buffered_inputstream.h"
26 #include "tensorflow/core/lib/io/random_inputstream.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/file_statistics.h"
29 #include "tensorflow/core/platform/file_system.h"
30 #include "tensorflow/core/platform/stringpiece.h"
31 #include "tensorflow/core/platform/tstring.h"
32 #include "tensorflow/python/lib/core/pybind11_absl.h"
33 #include "tensorflow/python/lib/core/pybind11_status.h"
34 
35 namespace tensorflow {
36 struct PyTransactionToken {
37   TransactionToken* token_;
38 };
39 
TokenFromPyToken(PyTransactionToken * t)40 inline TransactionToken* TokenFromPyToken(PyTransactionToken* t) {
41   return (t ? t->token_ : nullptr);
42 }
43 }  // namespace tensorflow
44 
45 namespace {
46 namespace py = pybind11;
47 
PYBIND11_MODULE(_pywrap_file_io,m)48 PYBIND11_MODULE(_pywrap_file_io, m) {
49   using tensorflow::PyTransactionToken;
50   using tensorflow::TransactionToken;
51   py::class_<PyTransactionToken>(m, "TransactionToken")
52       .def("__repr__", [](const PyTransactionToken* t) {
53         if (t->token_) {
54           return std::string(t->token_->owner->DecodeTransaction(t->token_));
55         }
56         return std::string("Invalid token!");
57       });
58 
59   m.def(
60       "FileExists",
61       [](const std::string& filename, PyTransactionToken* token) {
62         tensorflow::Status status;
63         {
64           py::gil_scoped_release release;
65           status = tensorflow::Env::Default()->FileExists(filename);
66         }
67         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
68       },
69       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
70   m.def(
71       "DeleteFile",
72       [](const std::string& filename, PyTransactionToken* token) {
73         py::gil_scoped_release release;
74         tensorflow::Status status =
75             tensorflow::Env::Default()->DeleteFile(filename);
76         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
77       },
78       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
79   m.def(
80       "ReadFileToString",
81       [](const std::string& filename, PyTransactionToken* token) {
82         std::string data;
83         py::gil_scoped_release release;
84         const auto status =
85             ReadFileToString(tensorflow::Env::Default(), filename, &data);
86         pybind11::gil_scoped_acquire acquire;
87         tensorflow::MaybeRaiseRegisteredFromStatus(status);
88         return py::bytes(data);
89       },
90       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
91   m.def(
92       "WriteStringToFile",
93       [](const std::string& filename, tensorflow::StringPiece data,
94          PyTransactionToken* token) {
95         py::gil_scoped_release release;
96         const auto status =
97             WriteStringToFile(tensorflow::Env::Default(), filename, data);
98         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
99       },
100       py::arg("filename"), py::arg("data"),
101       py::arg("token") = (PyTransactionToken*)nullptr);
102   m.def(
103       "GetChildren",
104       [](const std::string& dirname, PyTransactionToken* token) {
105         std::vector<std::string> results;
106         py::gil_scoped_release release;
107         const auto status =
108             tensorflow::Env::Default()->GetChildren(dirname, &results);
109         pybind11::gil_scoped_acquire acquire;
110         tensorflow::MaybeRaiseRegisteredFromStatus(status);
111         return results;
112       },
113       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
114   m.def(
115       "GetMatchingFiles",
116       [](const std::string& pattern, PyTransactionToken* token) {
117         std::vector<std::string> results;
118         py::gil_scoped_release release;
119         const auto status =
120             tensorflow::Env::Default()->GetMatchingPaths(pattern, &results);
121         pybind11::gil_scoped_acquire acquire;
122         tensorflow::MaybeRaiseRegisteredFromStatus(status);
123         return results;
124       },
125       py::arg("pattern"), py::arg("token") = (PyTransactionToken*)nullptr);
126   m.def(
127       "CreateDir",
128       [](const std::string& dirname, PyTransactionToken* token) {
129         py::gil_scoped_release release;
130         const auto status = tensorflow::Env::Default()->CreateDir(dirname);
131         if (tensorflow::errors::IsAlreadyExists(status)) {
132           return;
133         }
134         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
135       },
136       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
137   m.def(
138       "RecursivelyCreateDir",
139       [](const std::string& dirname, PyTransactionToken* token) {
140         py::gil_scoped_release release;
141         const auto status =
142             tensorflow::Env::Default()->RecursivelyCreateDir(dirname);
143         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
144       },
145       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
146   m.def(
147       "CopyFile",
148       [](const std::string& src, const std::string& target, bool overwrite,
149          PyTransactionToken* token) {
150         py::gil_scoped_release release;
151         auto* env = tensorflow::Env::Default();
152         tensorflow::Status status;
153         if (!overwrite && env->FileExists(target).ok()) {
154           status = tensorflow::errors::AlreadyExists("file already exists");
155         } else {
156           status = env->CopyFile(src, target);
157         }
158         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
159       },
160       py::arg("src"), py::arg("target"), py::arg("overwrite"),
161       py::arg("token") = (PyTransactionToken*)nullptr);
162   m.def(
163       "RenameFile",
164       [](const std::string& src, const std::string& target, bool overwrite,
165          PyTransactionToken* token) {
166         py::gil_scoped_release release;
167         auto* env = tensorflow::Env::Default();
168         tensorflow::Status status;
169         if (!overwrite && env->FileExists(target).ok()) {
170           status = tensorflow::errors::AlreadyExists("file already exists");
171         } else {
172           status = env->RenameFile(src, target);
173         }
174         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
175       },
176       py::arg("src"), py::arg("target"), py::arg("overwrite"),
177       py::arg("token") = (PyTransactionToken*)nullptr);
178   m.def(
179       "DeleteRecursively",
180       [](const std::string& dirname, PyTransactionToken* token) {
181         py::gil_scoped_release release;
182         int64_t undeleted_files;
183         int64_t undeleted_dirs;
184         auto status = tensorflow::Env::Default()->DeleteRecursively(
185             dirname, &undeleted_files, &undeleted_dirs);
186         if (status.ok() && (undeleted_files > 0 || undeleted_dirs > 0)) {
187           status = tensorflow::errors::PermissionDenied(
188               "could not fully delete dir");
189         }
190         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
191       },
192       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
193   m.def(
194       "IsDirectory",
195       [](const std::string& dirname, PyTransactionToken* token) {
196         py::gil_scoped_release release;
197         const auto status = tensorflow::Env::Default()->IsDirectory(dirname);
198         // FAILED_PRECONDITION response means path exists but isn't a dir.
199         if (tensorflow::errors::IsFailedPrecondition(status)) {
200           return false;
201         }
202 
203         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
204         return true;
205       },
206       py::arg("dirname"), py::arg("token") = (PyTransactionToken*)nullptr);
207   m.def("HasAtomicMove", [](const std::string& path) {
208     py::gil_scoped_release release;
209     bool has_atomic_move;
210     const auto status =
211         tensorflow::Env::Default()->HasAtomicMove(path, &has_atomic_move);
212     tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
213     return has_atomic_move;
214   });
215 
216   py::class_<tensorflow::FileStatistics>(m, "FileStatistics")
217       .def_readonly("length", &tensorflow::FileStatistics::length)
218       .def_readonly("mtime_nsec", &tensorflow::FileStatistics::mtime_nsec)
219       .def_readonly("is_directory", &tensorflow::FileStatistics::is_directory);
220 
221   m.def(
222       "Stat",
223       [](const std::string& filename, PyTransactionToken* token) {
224         py::gil_scoped_release release;
225         std::unique_ptr<tensorflow::FileStatistics> self(
226             new tensorflow::FileStatistics);
227         const auto status =
228             tensorflow::Env::Default()->Stat(filename, self.get());
229         py::gil_scoped_acquire acquire;
230         tensorflow::MaybeRaiseRegisteredFromStatus(status);
231         return self.release();
232       },
233       py::arg("filename"), py::arg("token") = (PyTransactionToken*)nullptr);
234 
235   m.def("GetRegisteredSchemes", []() {
236     std::vector<std::string> results;
237     py::gil_scoped_release release;
238     const auto status =
239         tensorflow::Env::Default()->GetRegisteredFileSystemSchemes(&results);
240     pybind11::gil_scoped_acquire acquire;
241     tensorflow::MaybeRaiseRegisteredFromStatus(status);
242     return results;
243   });
244 
245   using tensorflow::WritableFile;
246   py::class_<WritableFile>(m, "WritableFile")
247       .def(py::init([](const std::string& filename, const std::string& mode,
248                        PyTransactionToken* token) {
249              py::gil_scoped_release release;
250              auto* env = tensorflow::Env::Default();
251              std::unique_ptr<WritableFile> self;
252              const auto status = mode.find('a') == std::string::npos
253                                      ? env->NewWritableFile(filename, &self)
254                                      : env->NewAppendableFile(filename, &self);
255              py::gil_scoped_acquire acquire;
256              tensorflow::MaybeRaiseRegisteredFromStatus(status);
257              return self.release();
258            }),
259            py::arg("filename"), py::arg("mode"),
260            py::arg("token") = (PyTransactionToken*)nullptr)
261       .def("append",
262            [](WritableFile* self, tensorflow::StringPiece data) {
263              const auto status = self->Append(data);
264              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
265            })
266       // TODO(slebedev): Make WritableFile::Tell const and change self
267       // to be a reference.
268       .def("tell",
269            [](WritableFile* self) {
270              int64_t pos = -1;
271              py::gil_scoped_release release;
272              const auto status = self->Tell(&pos);
273              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
274              return pos;
275            })
276       .def("flush",
277            [](WritableFile* self) {
278              py::gil_scoped_release release;
279              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Flush());
280            })
281       .def("close", [](WritableFile* self) {
282         py::gil_scoped_release release;
283         tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Close());
284       });
285 
286   using tensorflow::io::BufferedInputStream;
287   py::class_<BufferedInputStream>(m, "BufferedInputStream")
288       .def(py::init([](const std::string& filename, size_t buffer_size,
289                        PyTransactionToken* token) {
290              py::gil_scoped_release release;
291              std::unique_ptr<tensorflow::RandomAccessFile> file;
292              const auto status =
293                  tensorflow::Env::Default()->NewRandomAccessFile(filename,
294                                                                  &file);
295              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
296              std::unique_ptr<tensorflow::io::RandomAccessInputStream>
297                  input_stream(new tensorflow::io::RandomAccessInputStream(
298                      file.release(),
299                      /*owns_file=*/true));
300              py::gil_scoped_acquire acquire;
301              return new BufferedInputStream(input_stream.release(), buffer_size,
302                                             /*owns_input_stream=*/true);
303            }),
304            py::arg("filename"), py::arg("buffer_size"),
305            py::arg("token") = (PyTransactionToken*)nullptr)
306       .def("read",
307            [](BufferedInputStream* self, int64_t bytes_to_read) {
308              py::gil_scoped_release release;
309              tensorflow::tstring result;
310              const auto status = self->ReadNBytes(bytes_to_read, &result);
311              if (!status.ok() && !tensorflow::errors::IsOutOfRange(status)) {
312                result.clear();
313                tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(status);
314              }
315              py::gil_scoped_acquire acquire;
316              return py::bytes(result);
317            })
318       .def("readline",
319            [](BufferedInputStream* self) {
320              py::gil_scoped_release release;
321              auto output = self->ReadLineAsString();
322              py::gil_scoped_acquire acquire;
323              return py::bytes(output);
324            })
325       .def("seek",
326            [](BufferedInputStream* self, int64_t pos) {
327              py::gil_scoped_release release;
328              tensorflow::MaybeRaiseRegisteredFromStatusWithGIL(self->Seek(pos));
329            })
330       .def("tell", [](BufferedInputStream* self) {
331         py::gil_scoped_release release;
332         return self->Tell();
333       });
334 }
335 }  // namespace
336