#include <torch/csrc/python_headers.h>
#ifdef _MSC_VER
#include <c10/util/win32-headers.h>
#endif
#include <structmember.h>

#include <c10/core/CPUAllocator.h>
#include <libshm.h>
#include <torch/csrc/CudaIPCTypes.h>
#include <torch/csrc/Device.h>
#include <torch/csrc/DynamicTypes.h>
#include <torch/csrc/THP.h>
#include <torch/csrc/autograd/utils/wrap_outputs.h>
#include <torch/csrc/copy_utils.h>

#include <c10/util/intrusive_ptr.h>
#include <fmt/format.h>

#include <torch/csrc/Storage.h>
#include <torch/csrc/StorageSharing.h>

#ifdef USE_CUDA
#include <c10/cuda/CUDAGuard.h>
#include <cuda.h>
#include <cuda_runtime.h>
#endif

#include <ATen/MapAllocator.h>
#include <ATen/StorageUtils.h>
#include <torch/csrc/utils/python_numbers.h>
#include <atomic>
#include <string>
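// Python bindings for sharing storages across processes. CPU storages can be
// moved into shared memory either through the libshm manager (filename mode,
// via THManagedMapAllocator) or through shared-memory file descriptors
// (fd mode, via at::MapAllocator); CUDA storages are shared through cudaIpc*
// handles. These functions back the `_share_*` / `_new_shared_*` methods
// registered in THPStorage_sharingMethods at the bottom of this file and are
// typically driven from torch.multiprocessing's reducers.

// _shared_decref / _shared_incref adjust the libshm manager's reference count
// for a CPU storage that lives in manager-backed shared memory; they are
// no-ops for storages that are not backed by THManagedMapAllocator.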
static PyObject* THPStorage_sharedDecref(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  c10::DeviceType device_type = storage.device_type();
  if (device_type == at::kCPU) {
    THManagedMapAllocator* ctx =
        THManagedMapAllocator::fromDataPtr(storage.data_ptr());
    if (ctx) {
      ctx->decref();
    }
  }
  Py_INCREF(self);
  return self;
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_sharedIncref(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  c10::DeviceType device_type = storage.device_type();
  if (device_type == at::kCPU) {
    THManagedMapAllocator* ctx =
        THManagedMapAllocator::fromDataPtr(storage.data_ptr());
    if (ctx) {
      ctx->incref();
    }
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
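// _new_using_filename_cpu(size): allocate a fresh CPU storage of `size` bytes
// backed by a newly created shared-memory segment registered with the libshm
// manager.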
static PyObject* THPStorage_pyNewFilenameStorage(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
  long long size = 0;
  if (!PyArg_ParseTuple(args, "L", &size)) {
    return nullptr;
  }
  if (size < 0) {
    return nullptr;
  }

  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
  std::string handle = at::NewProcessWideShmHandle();
  return THPStorage_NewWithStorage(
      THPStorageClass,
      c10::make_intrusive<at::StorageImpl>(
          c10::StorageImpl::use_byte_size_t(),
          size,
          THManagedMapAllocator::makeDataPtr(
              "", handle.c_str(), flags, static_cast<size_t>(size)),
          /*allocator=*/nullptr,
          /*resizable=*/false),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}
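// _share_filename_cpu_(): ensure the storage lives in manager-backed shared
// memory (copying it there if necessary) and return a
// (manager_handle, storage_handle, size) tuple that another process can pass
// to _new_shared_filename_cpu.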
static PyObject* THPStorage_shareFilename(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  TORCH_CHECK(
      storage.device_type() == at::kCPU,
      "_share_filename_: only available on CPU");
  THManagedMapAllocator* ctx =
      THManagedMapAllocator::fromDataPtr(storage.data_ptr());
  // Storage is already in shared memory, just return a handle
  if (ctx) {
    // done
  } else {
    // TODO: retry on collision
    // TODO: free GIL - but remember to reacquire it when an exception is thrown
    int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
    std::string handle = at::NewProcessWideShmHandle();
    // Create a new storage in shared memory
    at::Storage new_storage(c10::make_intrusive<at::StorageImpl>(
        c10::StorageImpl::use_byte_size_t(),
        storage.nbytes(),
        THManagedMapAllocator::makeDataPtr(
            "", handle.c_str(), flags, storage.nbytes()),
        /*allocator=*/nullptr,
        /*resizable=*/false));

    {
      // Copying into shared memory can be slow, so release the GIL
      pybind11::gil_scoped_release no_gil;
      // Copy data from old storage into the new one
      at::storage_copy(new_storage, storage);
    }

    // Replace the old data_ptr and allocator with the new ones
    storage.set_data_ptr(std::move(new_storage.mutable_data_ptr()));
    storage.unsafeGetStorageImpl()->set_allocator(new_storage.allocator());

    ctx = THManagedMapAllocator::fromDataPtr(storage.data_ptr());
    AT_ASSERT(ctx);
  }

  THPObjectPtr manager_handle(PyBytes_FromString(ctx->manager_handle()));
  if (!manager_handle)
    return nullptr;
  THPObjectPtr storage_handle(PyBytes_FromString(ctx->filename()));
  if (!storage_handle)
    return nullptr;
  THPObjectPtr size(THPUtils_packUInt64(storage.nbytes()));
  if (!size)
    return nullptr;

  THPObjectPtr tuple(PyTuple_New(3));
  if (!tuple)
    return nullptr;
  PyTuple_SET_ITEM(tuple.get(), 0, manager_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 1, storage_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 2, size.release());
  return tuple.release();
  END_HANDLE_TH_ERRORS
}
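// _new_shared_filename_cpu(manager_handle, storage_handle, size): attach to an
// existing manager-backed shared-memory segment created by another process.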
static PyObject* THPStorage_newSharedFilename(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 3, "tuple of 3 items expected");
  PyObject* _manager_handle = PyTuple_GET_ITEM(args, 0);
  PyObject* _object_handle = PyTuple_GET_ITEM(args, 1);
  PyObject* _size = PyTuple_GET_ITEM(args, 2);
  if (!PyBytes_Check(_manager_handle) || !PyBytes_Check(_object_handle) ||
      !THPUtils_checkLong(_size)) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in file system mode",
        1,
        "a handle (string/bytes) and storage size (int)");
    return nullptr;
  }
  const char* manager_handle = PyBytes_AS_STRING(_manager_handle);
  const char* object_handle = PyBytes_AS_STRING(_object_handle);
  uint64_t size = THPUtils_unpackUInt64(_size);
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
  return THPStorage_NewWithStorage(
      THPStorageClass,
      c10::make_intrusive<at::StorageImpl>(
          c10::StorageImpl::use_byte_size_t(),
          size,
          THManagedMapAllocator::makeDataPtr(
              manager_handle, object_handle, flags, size),
          /*allocator=*/nullptr,
          /*resizable=*/false),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}
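// _new_using_fd_cpu(size): allocate a fresh CPU storage of `size` bytes backed
// by a shared-memory file descriptor (see at::new_shm_fd_storage).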
static PyObject* THPStorage_pyNewFdStorage(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
  long long size = 0;
  if (!PyArg_ParseTuple(args, "L", &size)) {
    return nullptr;
  }
  if (size < 0) {
    return nullptr;
  }
  return THPStorage_NewWithStorage(
      THPStorageClass,
      at::new_shm_fd_storage(size),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}
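// _share_fd_cpu_(): ensure the storage is backed by a shared-memory file
// descriptor (copying it there if necessary) and return a (fd, size) tuple.
// The descriptor is expected to be transferred to the consumer process (e.g.
// by torch.multiprocessing's fd-passing machinery) and handed to
// _new_shared_fd_cpu.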
static PyObject* THPStorage_shareFd(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  TORCH_CHECK(
      storage.device_type() == at::kCPU, "_share_fd_: only available on CPU");
  at::MapAllocator* ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
  // Storage is already in shared memory, just return a handle
  if (ctx) {
    // done
  } else {
    at::Storage new_storage(at::new_shm_fd_storage(storage.nbytes()));
    {
      // Copying into shared memory can be slow, so release the GIL
      pybind11::gil_scoped_release no_gil;
      // Copy data from old storage into the new one
      at::storage_copy(new_storage, storage);
    }

    // Replace the old data_ptr and allocator with the new ones
    storage.set_data_ptr(std::move(new_storage.mutable_data_ptr()));
    storage.unsafeGetStorageImpl()->set_allocator(new_storage.allocator());

    ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
    AT_ASSERT(ctx);
  }

  THPObjectPtr storage_handle(THPUtils_packInt32(ctx->fd()));
  if (!storage_handle)
    return nullptr;
  THPObjectPtr size(THPUtils_packUInt64(storage.nbytes()));
  if (!size)
    return nullptr;

  THPObjectPtr tuple(PyTuple_New(2));
  if (!tuple)
    return nullptr;
  PyTuple_SET_ITEM(tuple.get(), 0, storage_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 1, size.release());
  return tuple.release();
  END_HANDLE_TH_ERRORS
}
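// _new_shared_fd_cpu(fd, size): map an already-received shared-memory file
// descriptor as a CPU storage. The descriptor is dup()ed, so the caller keeps
// ownership of the fd it passed in.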
static PyObject* THPStorage_newSharedFd(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
  PyObject* _tmp_fd = PyTuple_GET_ITEM(args, 0);
  PyObject* _size = PyTuple_GET_ITEM(args, 1);
  if (!THPUtils_checkLong(_tmp_fd) || !THPUtils_checkLong(_size)) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in file descriptor mode",
        1,
        "a file descriptor (int) and storage size (int)");
    return nullptr;
  }
  int tmp_fd = (int)THPUtils_unpackLong(_tmp_fd);
  int64_t size = THPUtils_unpackLong(_size);
  int fd = dup(tmp_fd);
  if (fd == -1) {
    THPUtils_setError("could not duplicate a shared memory file descriptor");
    return nullptr;
  }

  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE |
      at::ALLOCATOR_MAPPED_KEEPFD | at::ALLOCATOR_MAPPED_FROMFD;
  return THPStorage_NewWithStorage(
      THPStorageClass,
      c10::make_intrusive<at::StorageImpl>(
          c10::StorageImpl::use_byte_size_t(),
          size,
          at::MapAllocator::makeDataPtr(
              at::WITH_FD, "", fd, flags, size, nullptr),
          /*allocator=*/nullptr,
          /*resizable=*/false),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}
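// _share_cuda_(): producer side of CUDA IPC sharing. Exports the storage's
// memory block through the caching allocator's IPC handle, puts the data
// pointer behind a ref-counted "sent data" context, and returns an 8-tuple
// (device, handle, size_bytes, offset_bytes, ref_counter_handle,
//  ref_counter_offset, event_handle, event_sync_required) that is consumed by
// _new_shared_cuda. The event handle is only consumed by the receiver when
// event_sync_required is true.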
static PyObject* THPStorage_shareCuda(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
#ifdef USE_CUDA
  const auto& storage = THPStorage_Unpack(self);
  TORCH_CHECK(
      storage.device_type() == at::kCUDA,
      "_share_cuda_: only available on CUDA");
  c10::StorageImpl* storage_impl = storage.unsafeGetStorageImpl();

  if (storage_impl->received_cuda()) {
    AT_ERROR(
        "Attempted to send CUDA tensor received from another process; this is not currently supported. Consider cloning before sending.");
  }

  at::DeviceGuard device_guard(storage.device());
  THPObjectPtr tuple(PyTuple_New(8));
  THPObjectPtr device(THPUtils_packInt32(storage.device().index()));
  THPObjectPtr _handle(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr size_bytes(THPUtils_packUInt64(storage.nbytes()));
  THPObjectPtr _offset_bytes(THPUtils_packInt32(0));
  THPObjectPtr _ref_counter(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr _ref_counter_offset(THPUtils_packInt32(0));
  THPObjectPtr _event_handle(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr _event_sync_required(Py_None);
  Py_INCREF(Py_None);
  if (storage.data()) {
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    auto shandle =
        c10::cuda::CUDACachingAllocator::shareIpcHandle(storage.mutable_data());
    _handle = PyBytes_FromStringAndSize(
        shandle.handle.c_str(), (Py_ssize_t)shandle.handle.size());
    _offset_bytes = PyLong_FromSsize_t((Py_ssize_t)shandle.offset);

    // Put Storage Data behind new ref counting context
    // See Note [CUDA IPC Refcounting implementation explained]
    at::DataPtr sent_data_ptr = torch::GetNewRefCountedSentData(
        storage.mutable_data(), storage.device());
    auto old_data_ptr = storage.set_data_ptr(std::move(sent_data_ptr));
    auto sent_data =
        static_cast<torch::CudaIPCSentData*>(storage.data_ptr().get_context());
    sent_data->set_original_ptr(std::move(old_data_ptr));
    _ref_counter = PyBytes_FromString((sent_data->handle()).c_str());
    _ref_counter_offset = THPUtils_packUInt64(sent_data->offset());

    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    cudaIpcEventHandle_t ipc_event_handle;

    if (sent_data->event_sync_required_) {
      C10_CUDA_CHECK(
          cudaIpcGetEventHandle(&ipc_event_handle, sent_data->event_));
    }

    _event_handle = PyBytes_FromStringAndSize(
        (char*)&ipc_event_handle, CUDA_IPC_HANDLE_SIZE);
    _event_sync_required = PyBool_FromLong(sent_data->event_sync_required_);
  }

  if (!tuple || !device || !_handle || !size_bytes || !_offset_bytes ||
      !_event_handle) {
    return nullptr;
  }
  PyTuple_SET_ITEM(tuple.get(), 0, device.release());
  // cudaIpcMemHandle_t(of basePtr)
  PyTuple_SET_ITEM(tuple.get(), 1, _handle.release());
  // Size(in bytes) of the real storage, note this is not the size of basePtr
  // memory block.
  PyTuple_SET_ITEM(tuple.get(), 2, size_bytes.release());
  // Offset(in bytes) of the real storage in the basePtr memory block.
  // NB: this offset MUST be in bytes instead of numel, since we use
  // (storage_handle, offset)
  // as key in shared_cache(multiprocessing/reduction.py).
  // Offset in numel cannot uniquely represent a storage.
  PyTuple_SET_ITEM(tuple.get(), 3, _offset_bytes.release());
  PyTuple_SET_ITEM(tuple.get(), 4, _ref_counter.release());
  PyTuple_SET_ITEM(tuple.get(), 5, _ref_counter_offset.release());
  PyTuple_SET_ITEM(tuple.get(), 6, _event_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 7, _event_sync_required.release());
  return tuple.release();
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}
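// _release_ipc_counter_cuda(ref_counter_handle, ref_counter_offset):
// decrement one slot in the producer process's CUDA IPC ref-counter file.
// Deletion is best effort and failures are swallowed (see the try/catch
// below).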
static PyObject* THPStorage_releaseIPCCounter(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
#ifdef USE_CUDA
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 0);
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 1);
  if (!(PyBytes_Check(_ref_counter) &&
        THPUtils_checkLong(_ref_counter_offset))) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_release_ipc_counter in CUDA mode",
        1,
        "(bytes _ref_counter, int _ref_counter_offset)");
    return nullptr;
  }
  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
  ptrdiff_t ref_counter_offset =
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);
  // We don't want to break existing code, so resource deletion is done on a
  // best-effort basis. An exception is expected if the producer process
  // terminated before the consumer released the data.
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
  try {
    auto sptr = at::RefcountedMapAllocator::makeDataPtr(
        ref_counter_handle.c_str(),
        flags,
        sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
        nullptr);
    *(static_cast<int64_t*>(sptr.get()) + ref_counter_offset) -= 1;
  } catch (c10::Error& err) {
    // Already warned inside of the producer process
  }
  Py_RETURN_NONE;
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}

#ifdef USE_CUDA
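// Helper: view a Python bytes object holding a cudaIpc*Handle_t as a
// std::string. Returns "" (with a Python error set) if the object cannot be
// read or does not have the expected handle size.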
static std::string THPStorage_bytesAsHandleString(PyObject* handle) {
  HANDLE_TH_ERRORS
  char* buffer = nullptr;
  Py_ssize_t handle_size = 0;
  if (PyBytes_AsStringAndSize(handle, &buffer, &handle_size) == -1) {
    TORCH_CHECK(false, "incorrect handle");
  }
  TORCH_CHECK(handle_size == CUDA_IPC_HANDLE_SIZE, "incorrect handle size");
  return std::string(buffer, handle_size);
  END_HANDLE_TH_ERRORS_RET("")
}
#endif
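// _new_shared_cuda(...): consumer side of CUDA IPC sharing. Re-opens the
// producer's memory handle, optionally waits on the producer's IPC event, and
// builds a storage whose deleter synchronizes the current stream and
// decrements the producer's ref-counter file entry when the storage dies.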
static PyObject* THPStorage_newSharedCuda(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
#ifdef USE_CUDA
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 8, "tuple of 8 items expected");
  PyObject* _device = PyTuple_GET_ITEM(args, 0);
  PyObject* _handle = PyTuple_GET_ITEM(args, 1);
  PyObject* _size_bytes = PyTuple_GET_ITEM(args, 2);
  PyObject* _offset_bytes = PyTuple_GET_ITEM(args, 3);
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 4);
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 5);
  PyObject* _event_handle = PyTuple_GET_ITEM(args, 6);
  PyObject* _event_sync_required = PyTuple_GET_ITEM(args, 7);
  if (!(THPUtils_checkLong(_device) && THPUtils_checkLong(_size_bytes) &&
        PyBytes_Check(_handle) && PyBytes_Check(_ref_counter) &&
        PyBytes_Check(_event_handle) && THPUtils_checkLong(_offset_bytes) &&
        THPUtils_checkLong(_ref_counter_offset) &&
        PyBool_Check(_event_sync_required))) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in CUDA mode",
        1,
        "(int device, bytes handle, int storage_size_bytes, int storage_offset_bytes, bytes _ref_counter, int _ref_counter_offset, bytes event_handle, bool event_sync_required)");
    return nullptr;
  }

  size_t storage_size =
      (size_t)THPUtils_unpackLong(_size_bytes) / sizeof(uint8_t);
  ptrdiff_t storage_offset_bytes =
      (ptrdiff_t)THPUtils_unpackLong(_offset_bytes);

  const auto device = c10::checked_convert<c10::DeviceIndex>(
      THPUtils_unpackLong(_device), "c10::DeviceIndex");
  at::cuda::CUDAGuard device_guard(device);

  if (PyObject_IsTrue(_event_sync_required)) {
    // Ensure that the producer has prepared all of the tensor's data
    std::string s_ipc_event_handle =
        THPStorage_bytesAsHandleString(_event_handle);
    if (s_ipc_event_handle.empty()) {
      return nullptr;
    }
    auto ipc_event_handle = reinterpret_cast<const cudaIpcEventHandle_t*>(
        s_ipc_event_handle.c_str());
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    cudaEvent_t event;
    cudaIpcOpenEventHandle(&event, *ipc_event_handle);
    C10_CUDA_CHECK(
        cudaStreamWaitEvent(c10::cuda::getCurrentCUDAStream(device), event, 0));
  }

  std::string s_handle = THPStorage_bytesAsHandleString(_handle);
  if (s_handle.empty()) {
    return nullptr;
  }
  std::shared_ptr<void> basePtr =
      c10::cuda::CUDACachingAllocator::getIpcDevPtr(s_handle);

  // Offset the basePtr to reconstruct the real storage
  // devPtr = basePtr + storage_offset
  void* devPtr = basePtr.get();
  devPtr = (char*)devPtr + storage_offset_bytes;

  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
  ptrdiff_t ref_counter_offset =
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);

  struct IpcDeleterContext {
    std::string ref_counter_handle;
    ptrdiff_t ref_counter_offset{};
    c10::DeviceIndex device{-1};
    torch::CudaIPCReceivedData received_data;
  };

  auto ctx = std::make_unique<IpcDeleterContext>();
  ctx->ref_counter_handle = std::move(ref_counter_handle);
  ctx->ref_counter_offset = ref_counter_offset;
  ctx->device = device;
  ctx->received_data.shared_ptr_ = std::move(basePtr);

  auto cur_device = at::cuda::current_device();
  c10::DataPtr data_ptr(
      devPtr,
      ctx.release(),
      +[](void* ctx_) {
        std::unique_ptr<IpcDeleterContext> ctx(
            static_cast<IpcDeleterContext*>(ctx_));
        ctx->received_data.shared_ptr_.reset();

        // Sync the default stream to make sure all operations related to the
        // storage are finished (otherwise another process may reuse the
        // memory and corrupt data).

        // Ideally all shared memory reference counting could be replaced by
        // sending an untriggered CUDA event from the producer to the consumer
        // and using that event as the criterion for memory release. However,
        // CUDA (as of 10.1) does not support the creation of untriggered
        // events, and the performance impact of having thousands of shared
        // events is unknown.

        // TODO: Instead of cudaStreamSynchronize it is possible to add a
        // stream callback and release the counter inside of it (need to check
        // the performance impact).
        at::cuda::stream_synchronize(
            c10::cuda::getCurrentCUDAStream(ctx->device));

        // We don't want to break existing code, so resource deletion is done
        // on a best-effort basis. An exception is expected if the producer
        // process terminated before the consumer released the data.
        int flags =
            at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
        try {
          auto sptr = at::RefcountedMapAllocator::makeDataPtr(
              ctx->ref_counter_handle.c_str(),
              flags,
              sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
              nullptr);
          *(static_cast<int64_t*>(sptr.get()) + ctx->ref_counter_offset) -= 1;
        } catch (c10::Error& err) {
          // Already warned inside of the producer process
        }
      },
      at::Device(at::DeviceType::CUDA, cur_device));

  auto base = c10::make_intrusive<at::StorageImpl>(
      c10::StorageImpl::use_byte_size_t(),
      storage_size,
      std::move(data_ptr),
      /*allocator=*/nullptr,
      /*resizable=*/false);

  base->set_resizable(false);
  base->set_received_cuda(true);

  return THPStorage_NewWithStorage(
      THPStorageClass,
      std::move(base),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}
// Returns an object that holds a "weak" pointer to the c10::StorageImpl. This
// pointer keeps the c10::StorageImpl struct live, but does not retain the data
// pointer.
//
// NB: This does NOT preserve object identity when you call it multiple times
static PyObject* THPStorage_weakRef(PyObject* self, PyObject* args) {
  HANDLE_TH_ERRORS
  c10::StorageImpl* storage = THPStorage_Unpack(self).unsafeGetStorageImpl();
  return PyLong_FromVoidPtr(c10::raw::intrusive_ptr::make_weak(storage));
  END_HANDLE_TH_ERRORS
}
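// _new_with_weak_ptr(ptr): attempt to lock a weak reference produced by
// _weak_ref; returns a storage if the StorageImpl is still alive, else None.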
PyObject* THPStorage_newWithWeakPtr(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(
      THPUtils_checkLong(arg), "_new_with_weak_ptr(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  if (auto* storage = c10::raw::weak_intrusive_ptr::lock(weak_storage)) {
    return THPStorage_Wrap(
        c10::intrusive_ptr<c10::StorageImpl>::reclaim(storage));
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
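// _free_weak_ref(ptr): release a weak reference produced by _weak_ref.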
PyObject* THPStorage_freeWeakRef(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  if (arg == Py_None) {
    Py_RETURN_NONE;
  }
  TORCH_CHECK(
      THPUtils_checkLong(arg), "_free_weak_ref(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  c10::raw::weak_intrusive_ptr::decref(weak_storage);

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
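// _expired(ptr): true if the StorageImpl behind a weak reference has no
// remaining strong references.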
PyObject* THPStorage_expired(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(THPUtils_checkLong(arg), "_expired(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  return PyBool_FromLong(
      c10::raw::weak_intrusive_ptr::use_count(weak_storage) == 0);
  END_HANDLE_TH_ERRORS
}
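// _get_shared_fd(): return the file descriptor backing a CPU storage that was
// shared in fd mode; raises if the storage is not fd-backed.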
PyObject* THPStorage_sharedFd(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  at::MapAllocator* ctx = nullptr;
  const auto& storage = THPStorage_Unpack(self);
  if (storage.device_type() == at::kCPU) {
    ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
  }

  TORCH_CHECK(ctx, "couldn't retrieve a shared file descriptor");
  return THPUtils_packInt32(ctx->fd());
  END_HANDLE_TH_ERRORS
}
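// is_shared(): CUDA storages are always reported as shared; CPU storages are
// shared iff they are backed by a MapAllocator or THManagedMapAllocator.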
PyObject* THPStorage_isShared(PyObject* self, PyObject* noargs) {
  const auto& storage = THPStorage_Unpack(self);
  if (storage.device_type() == at::kCUDA) {
    Py_RETURN_TRUE;
  }
  if (at::MapAllocator::fromDataPtr(storage.data_ptr()) ||
      THManagedMapAllocator::fromDataPtr(storage.data_ptr())) {
    Py_RETURN_TRUE;
  } else {
    Py_RETURN_FALSE;
  }
}
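// Method table exposing the functions above on the Python storage class.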
// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
static PyMethodDef THPStorage_sharingMethods[] = {
    {"_new_with_weak_ptr",
     THPStorage_newWithWeakPtr,
     METH_O | METH_CLASS,
     nullptr},
    {"_share_cuda_", THPStorage_shareCuda, METH_NOARGS, nullptr},
    {"_new_shared_cuda",
     THPStorage_newSharedCuda,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_release_ipc_counter_cuda",
     THPStorage_releaseIPCCounter,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_share_fd_cpu_", THPStorage_shareFd, METH_NOARGS, nullptr},
    {"_new_shared_fd_cpu",
     THPStorage_newSharedFd,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_new_using_fd_cpu",
     THPStorage_pyNewFdStorage,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_share_filename_cpu_", THPStorage_shareFilename, METH_NOARGS, nullptr},
    {"_new_shared_filename_cpu",
     THPStorage_newSharedFilename,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_new_using_filename_cpu",
     THPStorage_pyNewFilenameStorage,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_weak_ref", THPStorage_weakRef, METH_NOARGS, nullptr},
    {"_free_weak_ref", THPStorage_freeWeakRef, METH_O | METH_STATIC, nullptr},
    {"_expired", THPStorage_expired, METH_O | METH_STATIC, nullptr},
    {"_shared_decref", THPStorage_sharedDecref, METH_NOARGS, nullptr},
    {"_shared_incref", THPStorage_sharedIncref, METH_NOARGS, nullptr},
    {"_get_shared_fd", THPStorage_sharedFd, METH_NOARGS, nullptr},
    {"is_shared", THPStorage_isShared, METH_NOARGS, nullptr},
    {nullptr}};

PyMethodDef* THPStorage_getSharingMethods() {
  return THPStorage_sharingMethods;
}