#include <torch/csrc/python_headers.h>
#include <system_error>
#include <vector>

#include <ATen/ops/from_blob.h>
#include <c10/core/CPUAllocator.h>
#include <torch/csrc/THP.h>
#include <torch/csrc/serialization.h>

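// doPartialRead/doPartialWrite are specialized below for two kinds of "file":
// a raw POSIX file descriptor (int) and a Python file-like object
// (PyObject*). doRead/doWrite are written against this common interface so
// the rest of the serialization code does not care which one it is handed.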
template <class io>
Py_ssize_t doPartialRead(io fildes, void* buf, size_t nbytes);

template <class io>
Py_ssize_t doPartialWrite(io fildes, void* buf, size_t nbytes);

static Py_ssize_t doPartialPythonReadBuffered(
    PyObject* fildes,
    void* buf,
    size_t nbytes);
static Py_ssize_t doPartialPythonReadInto(
    PyObject* fildes,
    void* buf,
    size_t nbytes);
static Py_ssize_t doPartialPythonWrite(
    PyObject* fildes,
    void* buf,
    size_t nbytes);

template <>
Py_ssize_t doPartialRead<int>(int fildes, void* buf, size_t nbytes) {
  return read(fildes, buf, nbytes);
}

template <>
Py_ssize_t doPartialRead<PyObject*>(
    PyObject* fildes,
    void* buf,
    size_t nbytes) {
  // Try to use fildes.readinto() instead of fildes.read()
  // because it is more memory efficient.
  // TODO: Stop calling PyObject_HasAttrString() in a loop on our read loop
  auto has_readinto = PyObject_HasAttrString(fildes, "readinto") == 1;
  if (has_readinto) {
    return doPartialPythonReadInto(fildes, buf, nbytes);
  }
  return doPartialPythonReadBuffered(fildes, buf, nbytes);
}

template <>
Py_ssize_t doPartialWrite<int>(int fildes, void* buf, size_t nbytes) {
  return write(fildes, buf, nbytes);
}

template <>
Py_ssize_t doPartialWrite<PyObject*>(
    PyObject* fildes,
    void* buf,
    size_t nbytes) {
  return doPartialPythonWrite(fildes, buf, nbytes);
}

static inline bool isUnsupportedOperation() {
  THPObjectPtr io(PyImport_ImportModule("io"));
  if (!io)
    throw python_error();
  THPObjectPtr exception(PyObject_GetAttrString(io, "UnsupportedOperation"));
  if (!exception)
    throw python_error();
  return PyErr_ExceptionMatches(exception.get());
}

// Call Python fildes.read(nbytes) and copy it to buf.
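// A short read (fewer than nbytes bytes) is fine here: doRead() below keeps
// calling the partial read functions until it has everything it asked for.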
static inline Py_ssize_t doPartialPythonReadBuffered(
    PyObject* fildes,
    void* buf,
    size_t raw_nbytes) {
  // If we request a large amount of data, f.read() will internally try to
  // allocate a buffer of that size. This is counterproductive, because
  // it's not the buffer we ultimately want to write the data into. Read
  // less than that and avoid allocating too much extra memory.
  // TODO: Maybe 260 KB is a bit small...
  const size_t nbytes = std::min<size_t>(raw_nbytes, 262144u); // 2^18 (~260 KB)

  THPObjectPtr r(PyObject_CallMethod(fildes, "read", "i", nbytes));
  if (!r)
    throw python_error();

  auto size = PyBytes_GET_SIZE(r.get());
  const void* py_buf = PyBytes_AsString(r.get());

  // we read EOF
  if (size == 0) {
    return 0;
  }

  // Slurp it into the buffer we actually want
  memcpy(buf, py_buf, size);

  return size;
}

// Either does fildes.readinto(buf) or fildes.write(buf)
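// The memoryview below is created directly over buf, so readinto()/write()
// operate on the caller's buffer without allocating an intermediate bytes
// object.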
static inline Py_ssize_t doPartialPythonIO(
    PyObject* fildes,
    void* buf,
    size_t nbytes,
    bool is_read) {
  auto rw_flag = is_read ? PyBUF_WRITE : PyBUF_READ;
  THPObjectPtr memview(PyMemoryView_FromMemory(
      reinterpret_cast<char*>(buf), static_cast<Py_ssize_t>(nbytes), rw_flag));
  if (!memview)
    throw python_error();

  std::string method = "write";
  if (is_read) {
    method = "readinto";
  }
  THPObjectPtr r(
      PyObject_CallMethod(fildes, method.c_str(), "O", memview.get()));
  if (r) {
    return PyLong_AsSsize_t(r.get());
  }

  // fildes.readinto can raise UnsupportedOperation, so fall back to
  // fildes.read.
  if (is_read && isUnsupportedOperation()) {
    PyErr_Clear();
    return doPartialPythonReadBuffered(fildes, buf, nbytes);
  }
  throw python_error();
}

// Call Python fildes.readinto(buf)
static Py_ssize_t doPartialPythonReadInto(
    PyObject* fildes,
    void* buf,
    size_t nbytes) {
  return doPartialPythonIO(fildes, buf, nbytes, /* is_read */ true);
}

// Call Python fildes.write(buf)
static Py_ssize_t doPartialPythonWrite(
    PyObject* fildes,
    void* buf,
    size_t nbytes) {
  return doPartialPythonIO(fildes, buf, nbytes, /* is_read */ false);
}

// Requires that we read EXACTLY nbytes; fails if we don't.
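// The loop retries short reads, restarts after EINTR, and treats r == 0 as
// EOF; if EOF is hit before nbytes bytes arrive, the check at the end reports
// how many bytes are still missing.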
template <typename io>
void doRead(io fildes, void* raw_buf, size_t nbytes) {
  char* buf = static_cast<char*>(raw_buf);
  while (nbytes > 0) {
    errno = 0; // doPartialRead may not set errno
    // we read in 1GB blocks to avoid bugs on Mac OS X Lion
    // see https://github.com/pytorch/pytorch/issues/1031 for more details
    Py_ssize_t r =
        doPartialRead(fildes, buf, std::min<size_t>(nbytes, 1073741824));
    if (r < 0) {
      int err = errno;
      TORCH_INTERNAL_ASSERT(
          err != 0, "read(): impossible! r < 0, but no errno was set");
      TORCH_INTERNAL_ASSERT(
          err != EAGAIN,
          "read(): non-blocking fd ",
          fildes,
          " read EAGAIN; cowardly refusing to spin-wait");
      if (err == EINTR) {
        continue;
      } else {
        AT_ERROR("read(): fd ", fildes, " failed with ", strerror(err));
      }
    } else if (r == 0) {
      break;
    }
    buf += r;
    // This is guaranteed by POSIX, but I just want to be double-sure
    // to not underflow a signed integer.
    AT_ASSERT(static_cast<size_t>(r) <= nbytes);
    nbytes -= r;
  }
  if (nbytes != 0) {
    AT_ERROR(
        "unexpected EOF, expected ",
        nbytes,
        " more bytes. The file might be corrupted.");
  }
}

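// Writes EXACTLY nbytes; like doRead, it retries after EINTR and issues the
// partial writes in chunks of at most 1GB.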
template <typename io>
void doWrite(io fildes, void* raw_buf, size_t nbytes) {
  char* buf = static_cast<char*>(raw_buf);
  while (nbytes > 0) {
    errno = 0; // doPartialWrite may not set errno
    // we write in 1GB blocks to avoid bugs on Mac OS X Lion
    // see https://github.com/pytorch/pytorch/issues/1031 for more details
    Py_ssize_t r =
        doPartialWrite(fildes, buf, std::min<size_t>(nbytes, 1073741824));
    if (r < 0) {
      int err = errno;
      TORCH_INTERNAL_ASSERT(
          err != 0, "write(): impossible! r < 0, but no errno was set");
      TORCH_INTERNAL_ASSERT(
          err != EAGAIN,
          "write(): non-blocking fd ",
          fildes,
          " got EAGAIN; cowardly refusing to spin-wait");
      if (err == EINTR) {
        continue;
      } else {
        AT_ERROR("write(): fd ", fildes, " failed with ", strerror(err));
      }
    }
    buf += r;
    AT_ASSERT(static_cast<size_t>(r) <= nbytes);
    nbytes -= r;
  }
}

// save_size is necessary since the old eager format saved storages as
// [size + data], but the v1.5 eager format removes this since size is saved in
// the filesize.
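// Layout produced here: an optional little-endian int64 element count (when
// save_size is true), followed by the raw storage bytes, byte-swapped to
// little-endian when element_size > 1 on a big-endian host.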
template <class io>
void THPStorage_writeFileRaw(
    c10::StorageImpl* self,
    io fd,
    bool save_size,
    uint64_t element_size) {
  c10::DeviceGuard guard(self->device());
  uint8_t* data{};
  at::Tensor cpu_tensor;
  size_t size_bytes = self->nbytes();
  size_t numel = size_bytes / element_size;
  if (self->device_type() == at::kCPU) {
    // We are using a mutable pointer here because we're ultimately
    // calling into a Python API that requires that, even though it
    // won't mutate the data.
    data = static_cast<uint8_t*>(self->mutable_data());
  } else {
    // Use tensor.to() to implement the D2H copy for all non-CPU devices.
    auto device_tensor = at::from_blob(
        self->mutable_data(),
        {static_cast<int64_t>(size_bytes)},
        {1},
        nullptr,
        at::device(self->device()).dtype(c10::kByte),
        {self->device()});
    cpu_tensor = device_tensor.to(at::kCPU);
    data = (uint8_t*)cpu_tensor.data_ptr();
  }
  if (save_size) {
    if (torch::utils::THP_nativeByteOrder() ==
        torch::utils::THPByteOrder::THP_LITTLE_ENDIAN)
      doWrite(fd, &numel, sizeof(int64_t));
    else {
      int64_t nsize{}; // convert big endian cpu to little endian storage
      torch::utils::THP_encodeInt64Buffer(
          (uint8_t*)&nsize,
          (const int64_t*)&numel,
          torch::utils::THPByteOrder::THP_LITTLE_ENDIAN,
          1);
      doWrite(fd, &nsize, sizeof(int64_t));
    }
  }
  // fast track for bytes and little endian
  if (element_size == 1 ||
      torch::utils::THP_nativeByteOrder() ==
          torch::utils::THPByteOrder::THP_LITTLE_ENDIAN) {
    doWrite(fd, data, size_bytes);
  } else {
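    // Big-endian host with multi-byte elements: convert to little-endian in
    // chunks of at most 5000 elements to bound the size of the temporary
    // buffer.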
    size_t buffer_size = std::min(numel, (size_t)5000);
    std::vector<uint8_t> le_buffer;
    le_buffer.resize(buffer_size * element_size);
    for (size_t i = 0; i < numel; i += buffer_size) {
      size_t to_convert = std::min(numel - i, buffer_size);
      if (element_size == 2) {
        torch::utils::THP_encodeInt16Buffer(
            le_buffer.data(),
            (const int16_t*)data + i,
            torch::utils::THPByteOrder::THP_LITTLE_ENDIAN,
            to_convert);
      } else if (element_size == 4) {
        torch::utils::THP_encodeInt32Buffer(
            le_buffer.data(),
            (const int32_t*)data + i,
            torch::utils::THPByteOrder::THP_LITTLE_ENDIAN,
            to_convert);
      } else if (element_size == 8) {
        torch::utils::THP_encodeInt64Buffer(
            le_buffer.data(),
            (const int64_t*)data + i,
            torch::utils::THPByteOrder::THP_LITTLE_ENDIAN,
            to_convert);
      }
      doWrite(fd, le_buffer.data(), to_convert * element_size);
    }
  }
}

template void THPStorage_writeFileRaw<int>(
    c10::StorageImpl* self,
    int fd,
    bool save_size,
    uint64_t element_size);
template void THPStorage_writeFileRaw<PyObject*>(
    c10::StorageImpl* self,
    PyObject* fd,
    bool save_size,
    uint64_t element_size);

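// Reads back a storage written by THPStorage_writeFileRaw with
// save_size == true: first the little-endian int64 element count, then the
// data. If `storage` is undefined, a fresh CPU storage of the right size is
// allocated; otherwise its byte size must match what the file declares.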
template <class io>
c10::intrusive_ptr<c10::StorageImpl> THPStorage_readFileRaw(
    io file,
    c10::intrusive_ptr<c10::StorageImpl> storage,
    uint64_t element_size) {
  c10::OptionalDeviceGuard guard;
  if (storage.defined()) {
    guard.reset_device(storage->device());
  }
  int64_t size{};
  doRead(file, &size, sizeof(int64_t));
  if (torch::utils::THP_nativeByteOrder() ==
      torch::utils::THPByteOrder::THP_BIG_ENDIAN) {
    int64_t tsize = size; // convert little endian storage to big endian cpu
    torch::utils::THP_decodeInt64Buffer(&size, (const uint8_t*)&tsize, true, 1);
  }
  size_t nbytes = element_size * size;
  if (!storage.defined()) {
    storage = c10::make_intrusive<at::StorageImpl>(
        c10::StorageImpl::use_byte_size_t(),
        nbytes,
        c10::GetDefaultCPUAllocator(),
        /*resizable=*/true);
  } else {
    size_t _storage_nbytes = storage->nbytes();
    TORCH_CHECK(
        _storage_nbytes == nbytes,
        "storage has wrong byte size: expected ",
        nbytes,
        " got ",
        _storage_nbytes);
  }

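  // For non-CPU storages the bytes are staged in a temporary CPU buffer
  // (cpu_data) and copied to the device once the whole read is done.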
  // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
  std::unique_ptr<char[]> cpu_data;

  uint8_t* data{};
  if (storage->device_type() == at::kCPU) {
    data = static_cast<uint8_t*>(storage->mutable_data());
  } else {
    // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
    cpu_data = std::unique_ptr<char[]>(new char[nbytes]);
    data = (uint8_t*)cpu_data.get();
  }

  // fast track for bytes and little endian
  if (element_size == 1 ||
      torch::utils::THP_nativeByteOrder() ==
          torch::utils::THPByteOrder::THP_LITTLE_ENDIAN) {
    doRead(file, data, storage->nbytes());
  } else {
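    // Big-endian host with multi-byte elements: the file is little-endian, so
    // decode it back to native byte order in chunks of at most 5000 elements.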
    int64_t buffer_size = std::min(size, (int64_t)5000);
    // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
    std::unique_ptr<uint8_t[]> le_buffer(
        new uint8_t[buffer_size * element_size]);

    for (int64_t i = 0; i < size; i += buffer_size) {
      size_t to_convert = std::min(size - i, buffer_size);
      doRead(file, le_buffer.get(), element_size * to_convert);

      // NOLINTNEXTLINE(bugprone-branch-clone)
      if (element_size == 2) {
        torch::utils::THP_decodeInt16Buffer(
            (int16_t*)data + i, le_buffer.get(), true, to_convert);
      } else if (element_size == 4) {
        torch::utils::THP_decodeInt32Buffer(
            (int32_t*)data + i, le_buffer.get(), true, to_convert);
      } else if (element_size == 8) {
        torch::utils::THP_decodeInt64Buffer(
            (int64_t*)data + i, le_buffer.get(), true, to_convert);
      }
    }
  }

  if (storage->device_type() != at::kCPU) {
    // Use tensor.copy_() to implement the H2D copy for all non-CPU devices.
    auto cpu_tensor = at::from_blob(
        (void*)data,
        {static_cast<int64_t>(nbytes)},
        at::device(at::kCPU).dtype(c10::kByte));
    auto device_tensor = at::from_blob(
        storage->mutable_data(),
        {static_cast<int64_t>(nbytes)},
        {1},
        nullptr,
        at::device(storage->device()).dtype(c10::kByte),
        {storage->device()});
    device_tensor.copy_(cpu_tensor);
  }
  return storage;
}

template c10::intrusive_ptr<c10::StorageImpl> THPStorage_readFileRaw<int>(
    int fd,
    c10::intrusive_ptr<c10::StorageImpl> storage,
    uint64_t element_size);
template c10::intrusive_ptr<c10::StorageImpl> THPStorage_readFileRaw<PyObject*>(
    PyObject* fd,
    c10::intrusive_ptr<c10::StorageImpl> storage,
    uint64_t element_size);