#include <torch/csrc/python_headers.h>
#ifdef _MSC_VER
#include <c10/util/win32-headers.h>
#endif
#include <structmember.h>

#include <c10/core/CPUAllocator.h>
#include <libshm.h>
#include <torch/csrc/CudaIPCTypes.h>
#include <torch/csrc/Device.h>
#include <torch/csrc/DynamicTypes.h>
#include <torch/csrc/THP.h>
#include <torch/csrc/autograd/utils/wrap_outputs.h>
#include <torch/csrc/copy_utils.h>

#include <c10/util/intrusive_ptr.h>
#include <fmt/format.h>

#include <torch/csrc/Storage.h>
#include <torch/csrc/StorageSharing.h>

#ifdef USE_CUDA
#include <c10/cuda/CUDAGuard.h>
#include <cuda.h>
#include <cuda_runtime.h>
#endif

#include <ATen/MapAllocator.h>
#include <ATen/StorageUtils.h>
#include <torch/csrc/utils/python_numbers.h>
#include <atomic>
#include <string>

static PyObject* THPStorage_sharedDecref(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  c10::DeviceType device_type = storage.device_type();
  if (device_type == at::kCPU) {
    THManagedMapAllocator* ctx =
        THManagedMapAllocator::fromDataPtr(storage.data_ptr());
    if (ctx) {
      ctx->decref();
    }
  }
  Py_INCREF(self);
  return self;
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_sharedIncref(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  c10::DeviceType device_type = storage.device_type();
  if (device_type == at::kCPU) {
    THManagedMapAllocator* ctx =
        THManagedMapAllocator::fromDataPtr(storage.data_ptr());
    if (ctx) {
      ctx->incref();
    }
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}
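
// The two bindings above adjust the reference count that libshm's
// THManagedMapAllocator keeps for a manager-backed, file-mapped CPU storage;
// for storages on other devices, or CPU storages without such a context, they
// are no-ops. They surface in Python as _shared_decref/_shared_incref (see the
// method table at the bottom of this file). A minimal Python-side sketch; the
// pairing shown is an assumed usage pattern, not something this file enforces:
//
//   s = torch.UntypedStorage(128)
//   s._share_filename_cpu_()   # move the data into managed shared memory
//   s._shared_incref()         # take an extra manager-side reference
//   s._shared_decref()         # release it again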

static PyObject* THPStorage_pyNewFilenameStorage(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
  long long size = 0;
  if (!PyArg_ParseTuple(args, "L", &size)) {
    return nullptr;
  }
  if (size < 0) {
    return nullptr;
  }

  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
  std::string handle = at::NewProcessWideShmHandle();
  return THPStorage_NewWithStorage(
      THPStorageClass,
      c10::make_intrusive<at::StorageImpl>(
          c10::StorageImpl::use_byte_size_t(),
          size,
          THManagedMapAllocator::makeDataPtr(
              "", handle.c_str(), flags, static_cast<size_t>(size)),
          /*allocator=*/nullptr,
          /*resizable=*/false),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}
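
// _new_using_filename_cpu (bound at the bottom of this file) allocates a
// brand-new CPU storage that lives in shared memory from the start:
// NewProcessWideShmHandle() picks a fresh, process-unique object name, and the
// SHAREDMEM | EXCLUSIVE flags ask the allocator to create that segment rather
// than attach to an existing one (the empty first argument is the manager
// handle; leaving it empty for a fresh allocation is libshm behaviour and is
// described here only as an assumption). A minimal Python-side sketch:
//
//   nbytes = 1024
//   s = torch.UntypedStorage._new_using_filename_cpu(nbytes)
//   assert s.nbytes() == nbytes and s.is_shared()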

static PyObject* THPStorage_shareFilename(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  TORCH_CHECK(
      storage.device_type() == at::kCPU,
      "_share_filename_: only available on CPU");
  THManagedMapAllocator* ctx =
      THManagedMapAllocator::fromDataPtr(storage.data_ptr());
  // Storage is already in shared memory, just return a handle
  if (ctx) {
    // done
  } else {
    // TODO: retry on collision
    // TODO: free GIL - but remember to reacquire it when an exception is thrown
    int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
    std::string handle = at::NewProcessWideShmHandle();
    // Create a new storage in shared memory
    at::Storage new_storage(c10::make_intrusive<at::StorageImpl>(
        c10::StorageImpl::use_byte_size_t(),
        storage.nbytes(),
        THManagedMapAllocator::makeDataPtr(
            "", handle.c_str(), flags, storage.nbytes()),
        /*allocator=*/nullptr,
        /*resizable=*/false));

    {
      // Copying into shared memory can be slow, so release the GIL
      pybind11::gil_scoped_release no_gil;
      // Copy data from old storage into the new one
      at::storage_copy(new_storage, storage);
    }

    // Replace the old data_ptr and allocator with the new ones
    storage.set_data_ptr(std::move(new_storage.mutable_data_ptr()));
    storage.unsafeGetStorageImpl()->set_allocator(new_storage.allocator());

    ctx = THManagedMapAllocator::fromDataPtr(storage.data_ptr());
    AT_ASSERT(ctx);
  }

  THPObjectPtr manager_handle(PyBytes_FromString(ctx->manager_handle()));
  if (!manager_handle)
    return nullptr;
  THPObjectPtr storage_handle(PyBytes_FromString(ctx->filename()));
  if (!storage_handle)
    return nullptr;
  THPObjectPtr size(THPUtils_packUInt64(storage.nbytes()));
  if (!size)
    return nullptr;

  THPObjectPtr tuple(PyTuple_New(3));
  if (!tuple)
    return nullptr;
  PyTuple_SET_ITEM(tuple.get(), 0, manager_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 1, storage_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 2, size.release());
  return tuple.release();
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_newSharedFilename(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 3, "tuple of 3 items expected");
  PyObject* _manager_handle = PyTuple_GET_ITEM(args, 0);
  PyObject* _object_handle = PyTuple_GET_ITEM(args, 1);
  PyObject* _size = PyTuple_GET_ITEM(args, 2);
  if (!PyBytes_Check(_manager_handle) || !PyBytes_Check(_object_handle) ||
      !THPUtils_checkLong(_size)) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in file system mode",
        1,
        "a handle (string/bytes) and storage size (int)");
    return nullptr;
  }
  const char* manager_handle = PyBytes_AS_STRING(_manager_handle);
  const char* object_handle = PyBytes_AS_STRING(_object_handle);
  uint64_t size = THPUtils_unpackUInt64(_size);
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
  return THPStorage_NewWithStorage(
      THPStorageClass,
      c10::make_intrusive<at::StorageImpl>(
          c10::StorageImpl::use_byte_size_t(),
          size,
          THManagedMapAllocator::makeDataPtr(
              manager_handle, object_handle, flags, size),
          /*allocator=*/nullptr,
          /*resizable=*/false),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}
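
// The two functions above form the producer/consumer pair for "file system"
// sharing: _share_filename_cpu_ moves the storage into a manager-backed shared
// memory segment (copying once if it is not there yet) and returns
// (manager_handle, storage_handle, nbytes), while _new_shared_filename_cpu
// maps that same segment in another process using the NOCREATE flag. A hedged
// sketch of the round trip, using the Python names registered below (the
// cross-process hand-off of the tuple, e.g. via torch.multiprocessing, is
// assumed rather than shown):
//
//   # producer
//   manager, name, nbytes = s._share_filename_cpu_()
//   # consumer (after receiving the three values)
//   t = torch.UntypedStorage._new_shared_filename_cpu(manager, name, nbytes)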

static PyObject* THPStorage_pyNewFdStorage(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
  long long size = 0;
  if (!PyArg_ParseTuple(args, "L", &size)) {
    return nullptr;
  }
  if (size < 0) {
    return nullptr;
  }
  return THPStorage_NewWithStorage(
      THPStorageClass,
      at::new_shm_fd_storage(size),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_shareFd(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  const auto& storage = THPStorage_Unpack(self);
  TORCH_CHECK(
      storage.device_type() == at::kCPU, "_share_fd_: only available on CPU");
  at::MapAllocator* ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
  // Storage is already in shared memory, just return a handle
  if (ctx) {
    // done
  } else {
    at::Storage new_storage(at::new_shm_fd_storage(storage.nbytes()));
    {
      // Copying into shared memory can be slow, so release the GIL
      pybind11::gil_scoped_release no_gil;
      // Copy data from old storage into the new one
      at::storage_copy(new_storage, storage);
    }

    // Replace the old data_ptr and allocator with the new ones
    storage.set_data_ptr(std::move(new_storage.mutable_data_ptr()));
    storage.unsafeGetStorageImpl()->set_allocator(new_storage.allocator());

    ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
    AT_ASSERT(ctx);
  }

  THPObjectPtr storage_handle(THPUtils_packInt32(ctx->fd()));
  if (!storage_handle)
    return nullptr;
  THPObjectPtr size(THPUtils_packUInt64(storage.nbytes()));
  if (!size)
    return nullptr;

  THPObjectPtr tuple(PyTuple_New(2));
  if (!tuple)
    return nullptr;
  PyTuple_SET_ITEM(tuple.get(), 0, storage_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 1, size.release());
  return tuple.release();
  END_HANDLE_TH_ERRORS
}

static PyObject* THPStorage_newSharedFd(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
  PyObject* _tmp_fd = PyTuple_GET_ITEM(args, 0);
  PyObject* _size = PyTuple_GET_ITEM(args, 1);
  if (!THPUtils_checkLong(_tmp_fd) || !THPUtils_checkLong(_size)) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in file descriptor mode",
        1,
        "a file descriptor (int) and storage size (int)");
    return nullptr;
  }
  int tmp_fd = (int)THPUtils_unpackLong(_tmp_fd);
  int64_t size = THPUtils_unpackLong(_size);
  int fd = dup(tmp_fd);
  if (fd == -1) {
    THPUtils_setError("could not duplicate a shared memory file descriptor");
    return nullptr;
  }

  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE |
      at::ALLOCATOR_MAPPED_KEEPFD | at::ALLOCATOR_MAPPED_FROMFD;
  return THPStorage_NewWithStorage(
      THPStorageClass,
      c10::make_intrusive<at::StorageImpl>(
          c10::StorageImpl::use_byte_size_t(),
          size,
          at::MapAllocator::makeDataPtr(
              at::WITH_FD, "", fd, flags, size, nullptr),
          /*allocator=*/nullptr,
          /*resizable=*/false),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
  END_HANDLE_TH_ERRORS
}
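
// File-descriptor sharing works the same way, but the handle is an open fd to
// a shared-memory file instead of a named segment: _share_fd_cpu_ returns
// (fd, nbytes), and _new_shared_fd_cpu dup()s the fd it is given and maps it
// with FROMFD | KEEPFD | NOCREATE. The fd itself must reach the other process
// through some fd-passing mechanism (for example SCM_RIGHTS over a Unix
// socket, which is what Python's multiprocessing uses on Linux; that transport
// lives outside this file and is mentioned only as an assumption). A hedged
// Python-side sketch of the same-process round trip:
//
//   fd, nbytes = s._share_fd_cpu_()
//   t = torch.UntypedStorage._new_shared_fd_cpu(fd, nbytes)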

static PyObject* THPStorage_shareCuda(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
#ifdef USE_CUDA
  const auto& storage = THPStorage_Unpack(self);
  TORCH_CHECK(
      storage.device_type() == at::kCUDA,
      "_share_cuda_: only available on CUDA");
  c10::StorageImpl* storage_impl = storage.unsafeGetStorageImpl();

  if (storage_impl->received_cuda()) {
    AT_ERROR(
        "Attempted to send CUDA tensor received from another process; this is not currently supported. Consider cloning before sending.");
  }

  at::DeviceGuard device_guard(storage.device());
  THPObjectPtr tuple(PyTuple_New(8));
  THPObjectPtr device(THPUtils_packInt32(storage.device().index()));
  THPObjectPtr _handle(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr size_bytes(THPUtils_packUInt64(storage.nbytes()));
  THPObjectPtr _offset_bytes(THPUtils_packInt32(0));
  THPObjectPtr _ref_counter(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr _ref_counter_offset(THPUtils_packInt32(0));
  THPObjectPtr _event_handle(Py_None);
  Py_INCREF(Py_None);
  THPObjectPtr _event_sync_required(Py_None);
  Py_INCREF(Py_None);
  if (storage.data()) {
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    auto shandle =
        c10::cuda::CUDACachingAllocator::shareIpcHandle(storage.mutable_data());
    _handle = PyBytes_FromStringAndSize(
        shandle.handle.c_str(), (Py_ssize_t)shandle.handle.size());
    _offset_bytes = PyLong_FromSsize_t((Py_ssize_t)shandle.offset);

    // Put the storage data behind a new ref-counting context
    // See Note [CUDA IPC Refcounting implementation explained]
    at::DataPtr sent_data_ptr = torch::GetNewRefCountedSentData(
        storage.mutable_data(), storage.device());
    auto old_data_ptr = storage.set_data_ptr(std::move(sent_data_ptr));
    auto sent_data =
        static_cast<torch::CudaIPCSentData*>(storage.data_ptr().get_context());
    sent_data->set_original_ptr(std::move(old_data_ptr));
    _ref_counter = PyBytes_FromString((sent_data->handle()).c_str());
    _ref_counter_offset = THPUtils_packUInt64(sent_data->offset());

    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    cudaIpcEventHandle_t ipc_event_handle;

    if (sent_data->event_sync_required_) {
      C10_CUDA_CHECK(
          cudaIpcGetEventHandle(&ipc_event_handle, sent_data->event_));
    }

    _event_handle = PyBytes_FromStringAndSize(
        (char*)&ipc_event_handle, CUDA_IPC_HANDLE_SIZE);
    _event_sync_required = PyBool_FromLong(sent_data->event_sync_required_);
  }

  if (!tuple || !device || !_handle || !size_bytes || !_offset_bytes ||
      !_event_handle) {
    return nullptr;
  }
  PyTuple_SET_ITEM(tuple.get(), 0, device.release());
  // cudaIpcMemHandle_t (of basePtr)
  PyTuple_SET_ITEM(tuple.get(), 1, _handle.release());
  // Size (in bytes) of the real storage; note this is not the size of the
  // basePtr memory block.
  PyTuple_SET_ITEM(tuple.get(), 2, size_bytes.release());
  // Offset (in bytes) of the real storage in the basePtr memory block.
  // NB: this offset MUST be in bytes instead of numel, since we use
  // (storage_handle, offset) as the key in shared_cache
  // (multiprocessing/reduction.py); an offset in numel cannot uniquely
  // represent a storage.
  PyTuple_SET_ITEM(tuple.get(), 3, _offset_bytes.release());
  PyTuple_SET_ITEM(tuple.get(), 4, _ref_counter.release());
  PyTuple_SET_ITEM(tuple.get(), 5, _ref_counter_offset.release());
  PyTuple_SET_ITEM(tuple.get(), 6, _event_handle.release());
  PyTuple_SET_ITEM(tuple.get(), 7, _event_sync_required.release());
  return tuple.release();
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}
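
// For reference, the 8-tuple produced by _share_cuda_ above is, in order:
//
//   (device_index,            # int
//    mem_handle,              # bytes, CUDA IPC handle of the base allocation
//    storage_size_bytes,      # int
//    storage_offset_bytes,    # int, offset of this storage in the base block
//    ref_counter_handle,      # bytes, name of the ref-counter shm file
//    ref_counter_offset,      # int, slot of this storage in that file
//    event_handle,            # bytes, CUDA IPC event handle
//    event_sync_required)     # bool
//
// THPStorage_newSharedCuda below consumes exactly this layout; the Python
// multiprocessing glue is assumed to forward the tuple unchanged.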

static PyObject* THPStorage_releaseIPCCounter(
    PyObject* _unused,
    PyObject* args) {
  HANDLE_TH_ERRORS
#ifdef USE_CUDA
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 0);
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 1);
  if (!(PyBytes_Check(_ref_counter) &&
        THPUtils_checkLong(_ref_counter_offset))) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_release_ipc_counter in CUDA mode",
        1,
        "(bytes _ref_counter, int _ref_counter_offset)");
    return nullptr;
  }
  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
  ptrdiff_t ref_counter_offset =
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);
  // We don't want to break existing code, so resource deletion is on a
  // best-effort basis. An exception is expected if the producer process
  // terminated before the consumer released the data.
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
  try {
    auto sptr = at::RefcountedMapAllocator::makeDataPtr(
        ref_counter_handle.c_str(),
        flags,
        sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
        nullptr);
    *(static_cast<int64_t*>(sptr.get()) + ref_counter_offset) -= 1;
  } catch (c10::Error& err) {
    // Already warned inside the producer process
  }
  Py_RETURN_NONE;
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}
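
// The ref-counter file referenced above is a RefcountedMapAllocator-backed
// shared-memory file laid out as CUDA_IPC_REF_COUNTER_FILE_SIZE int64_t slots;
// _ref_counter_offset indexes one slot, and releasing a consumer simply
// decrements that slot in place, exactly as the deleter installed by
// THPStorage_newSharedCuda below does. A hedged sketch of the cleanup call
// (presumably issued by the CUDA IPC machinery rather than by user code):
//
//   torch.UntypedStorage._release_ipc_counter_cuda(ref_counter, offset)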

#ifdef USE_CUDA
static std::string THPStorage_bytesAsHandleString(PyObject* handle) {
  HANDLE_TH_ERRORS
  char* buffer = nullptr;
  Py_ssize_t handle_size = 0;
  if (PyBytes_AsStringAndSize(handle, &buffer, &handle_size) == -1) {
    TORCH_CHECK(handle_size == CUDA_IPC_HANDLE_SIZE, "incorrect handle");
  }
  return std::string(buffer, handle_size);
  END_HANDLE_TH_ERRORS_RET("")
}
#endif

static PyObject* THPStorage_newSharedCuda(PyObject* _unused, PyObject* args) {
  HANDLE_TH_ERRORS
#ifdef USE_CUDA
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 8, "tuple of 8 items expected");
  PyObject* _device = PyTuple_GET_ITEM(args, 0);
  PyObject* _handle = PyTuple_GET_ITEM(args, 1);
  PyObject* _size_bytes = PyTuple_GET_ITEM(args, 2);
  PyObject* _offset_bytes = PyTuple_GET_ITEM(args, 3);
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 4);
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 5);
  PyObject* _event_handle = PyTuple_GET_ITEM(args, 6);
  PyObject* _event_sync_required = PyTuple_GET_ITEM(args, 7);
  if (!(THPUtils_checkLong(_device) && THPUtils_checkLong(_size_bytes) &&
        PyBytes_Check(_handle) && PyBytes_Check(_ref_counter) &&
        PyBytes_Check(_event_handle) && THPUtils_checkLong(_offset_bytes) &&
        THPUtils_checkLong(_ref_counter_offset) &&
        PyBool_Check(_event_sync_required))) {
    THPUtils_invalidArguments(
        args,
        nullptr,
        "_new_shared in CUDA mode",
        1,
        "(int device, bytes handle, int storage_size_bytes, int storage_offset_bytes, bytes _ref_counter, int _ref_counter_offset, bytes event_handle, bool event_sync_required)");
    return nullptr;
  }

  size_t storage_size =
      (size_t)THPUtils_unpackLong(_size_bytes) / sizeof(uint8_t);
  ptrdiff_t storage_offset_bytes =
      (ptrdiff_t)THPUtils_unpackLong(_offset_bytes);

  const auto device = c10::checked_convert<c10::DeviceIndex>(
      THPUtils_unpackLong(_device), "c10::DeviceIndex");
  at::cuda::CUDAGuard device_guard(device);

  if (PyObject_IsTrue(_event_sync_required)) {
    // Ensure that the producer has prepared all of the tensor's data
    std::string s_ipc_event_handle =
        THPStorage_bytesAsHandleString(_event_handle);
    if (s_ipc_event_handle.empty()) {
      return nullptr;
    }
    auto ipc_event_handle = reinterpret_cast<const cudaIpcEventHandle_t*>(
        s_ipc_event_handle.c_str());
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
    cudaEvent_t event;
    cudaIpcOpenEventHandle(&event, *ipc_event_handle);
    C10_CUDA_CHECK(
        cudaStreamWaitEvent(c10::cuda::getCurrentCUDAStream(device), event, 0));
  }

  std::string s_handle = THPStorage_bytesAsHandleString(_handle);
  if (s_handle.empty()) {
    return nullptr;
  }
  std::shared_ptr<void> basePtr =
      c10::cuda::CUDACachingAllocator::getIpcDevPtr(s_handle);

  // Offset the basePtr to reconstruct the real storage
  // devPtr = basePtr + storage_offset
  void* devPtr = basePtr.get();
  devPtr = (char*)devPtr + storage_offset_bytes;

  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
  ptrdiff_t ref_counter_offset =
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);

  struct IpcDeleterContext {
    std::string ref_counter_handle;
    ptrdiff_t ref_counter_offset{};
    c10::DeviceIndex device{-1};
    torch::CudaIPCReceivedData received_data;
  };

  auto ctx = std::make_unique<IpcDeleterContext>();
  ctx->ref_counter_handle = std::move(ref_counter_handle);
  ctx->ref_counter_offset = ref_counter_offset;
  ctx->device = device;
  ctx->received_data.shared_ptr_ = std::move(basePtr);

  auto cur_device = at::cuda::current_device();
  c10::DataPtr data_ptr(
      devPtr,
      ctx.release(),
      +[](void* ctx_) {
        std::unique_ptr<IpcDeleterContext> ctx(
            static_cast<IpcDeleterContext*>(ctx_));
        ctx->received_data.shared_ptr_.reset();

        // Sync the current stream on the storage's device to make sure all
        // operations related to the storage are finished (otherwise another
        // process may reuse the memory and corrupt data).

        // Ideally, all shared-memory reference counting could be replaced by
        // sending an untriggered CUDA event from the producer to the consumer
        // and using that event as the criterion for memory release. However,
        // CUDA (as of 10.1) does not support the creation of untriggered
        // events, and the performance impact of having thousands of shared
        // events is unknown.

        // TODO: Instead of cudaStreamSynchronize it is possible to add a
        // stream callback and release the counter inside of it (need to check
        // the performance impact).
        at::cuda::stream_synchronize(
            c10::cuda::getCurrentCUDAStream(ctx->device));

        // We don't want to break existing code, so resource deletion is on a
        // best-effort basis. An exception is expected if the producer process
        // terminated before the consumer released the data.
        int flags =
            at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
        try {
          auto sptr = at::RefcountedMapAllocator::makeDataPtr(
              ctx->ref_counter_handle.c_str(),
              flags,
              sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
              nullptr);
          *(static_cast<int64_t*>(sptr.get()) + ctx->ref_counter_offset) -= 1;
        } catch (c10::Error& err) {
          // Already warned inside the producer process
        }
      },
      at::Device(at::DeviceType::CUDA, cur_device));

  auto base = c10::make_intrusive<at::StorageImpl>(
      c10::StorageImpl::use_byte_size_t(),
      storage_size,
      std::move(data_ptr),
      /*allocator=*/nullptr,
      /*resizable=*/false);

  base->set_resizable(false);
  base->set_received_cuda(true);

  return THPStorage_NewWithStorage(
      THPStorageClass,
      std::move(base),
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
#else
  TORCH_CHECK(false, "CUDA is not available");
#endif
  END_HANDLE_TH_ERRORS
}
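
// Lifetime summary for the storage built above: the returned StorageImpl owns
// a DataPtr whose deleter (the lambda passed to c10::DataPtr) first drops the
// shared_ptr obtained from getIpcDevPtr, then synchronizes the current stream
// on the recorded device index, and finally decrements this storage's slot in
// the producer's ref-counter file, which (as an assumption about the producer
// side) lets the producer eventually return the base allocation to its caching
// allocator. A hedged Python-side sketch of the consumer call, where
// shared_cuda_tuple is the 8-tuple produced by _share_cuda_:
//
//   t = torch.UntypedStorage._new_shared_cuda(*shared_cuda_tuple)
//   assert t.is_shared()   # CUDA storages always report True, see is_shared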

// Returns an object that holds a "weak" pointer to the c10::StorageImpl.  This
// pointer keeps the c10::StorageImpl struct live, but does not retain the data
// pointer.
//
// NB: This does NOT preserve object identity when you call it multiple times
static PyObject* THPStorage_weakRef(PyObject* self, PyObject* args) {
  HANDLE_TH_ERRORS
  c10::StorageImpl* storage = THPStorage_Unpack(self).unsafeGetStorageImpl();
  return PyLong_FromVoidPtr(c10::raw::intrusive_ptr::make_weak(storage));
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_newWithWeakPtr(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(
      THPUtils_checkLong(arg), "_new_with_weak_ptr(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  if (auto* storage = c10::raw::weak_intrusive_ptr::lock(weak_storage)) {
    return THPStorage_Wrap(
        c10::intrusive_ptr<c10::StorageImpl>::reclaim(storage));
  }
  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_freeWeakRef(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  if (arg == Py_None) {
    Py_RETURN_NONE;
  }
  TORCH_CHECK(
      THPUtils_checkLong(arg), "_free_weak_ref(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  c10::raw::weak_intrusive_ptr::decref(weak_storage);

  Py_RETURN_NONE;
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_expired(PyObject* _unused, PyObject* arg) {
  HANDLE_TH_ERRORS
  TORCH_CHECK(THPUtils_checkLong(arg), "_expired(): arg must be an 'int'");
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
  return PyBool_FromLong(
      c10::raw::weak_intrusive_ptr::use_count(weak_storage) == 0);
  END_HANDLE_TH_ERRORS
}
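
// The four functions above expose a raw weak-reference protocol over
// c10::StorageImpl: _weak_ref hands out the weak pointer as a plain integer,
// _new_with_weak_ptr tries to lock it back into a live storage, _expired
// checks whether the strong count has dropped to zero, and _free_weak_ref
// releases the weak count again. Each _weak_ref call should eventually be
// paired with _free_weak_ref, and the integer is only meaningful inside the
// process that produced it. A hedged Python-side sketch:
//
//   r = s._weak_ref()
//   assert not torch.UntypedStorage._expired(r)
//   alive = torch.UntypedStorage._new_with_weak_ptr(r)   # storage or None
//   torch.UntypedStorage._free_weak_ref(r)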

PyObject* THPStorage_sharedFd(PyObject* self, PyObject* noargs) {
  HANDLE_TH_ERRORS
  THPStorage_assertNotNull(self);
  at::MapAllocator* ctx = nullptr;
  const auto& storage = THPStorage_Unpack(self);
  if (storage.device_type() == at::kCPU) {
    ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
  }

  TORCH_CHECK(ctx, "couldn't retrieve a shared file descriptor");
  return THPUtils_packInt32(ctx->fd());
  END_HANDLE_TH_ERRORS
}

PyObject* THPStorage_isShared(PyObject* self, PyObject* noargs) {
  const auto& storage = THPStorage_Unpack(self);
  if (storage.device_type() == at::kCUDA) {
    Py_RETURN_TRUE;
  }
  if (at::MapAllocator::fromDataPtr(storage.data_ptr()) ||
      THManagedMapAllocator::fromDataPtr(storage.data_ptr())) {
    Py_RETURN_TRUE;
  } else {
    Py_RETURN_FALSE;
  }
}

// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
static PyMethodDef THPStorage_sharingMethods[] = {
    {"_new_with_weak_ptr",
     THPStorage_newWithWeakPtr,
     METH_O | METH_CLASS,
     nullptr},
    {"_share_cuda_", THPStorage_shareCuda, METH_NOARGS, nullptr},
    {"_new_shared_cuda",
     THPStorage_newSharedCuda,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_release_ipc_counter_cuda",
     THPStorage_releaseIPCCounter,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_share_fd_cpu_", THPStorage_shareFd, METH_NOARGS, nullptr},
    {"_new_shared_fd_cpu",
     THPStorage_newSharedFd,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_new_using_fd_cpu",
     THPStorage_pyNewFdStorage,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_share_filename_cpu_", THPStorage_shareFilename, METH_NOARGS, nullptr},
    {"_new_shared_filename_cpu",
     THPStorage_newSharedFilename,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_new_using_filename_cpu",
     THPStorage_pyNewFilenameStorage,
     METH_VARARGS | METH_STATIC,
     nullptr},
    {"_weak_ref", THPStorage_weakRef, METH_NOARGS, nullptr},
    {"_free_weak_ref", THPStorage_freeWeakRef, METH_O | METH_STATIC, nullptr},
    {"_expired", THPStorage_expired, METH_O | METH_STATIC, nullptr},
    {"_shared_decref", THPStorage_sharedDecref, METH_NOARGS, nullptr},
    {"_shared_incref", THPStorage_sharedIncref, METH_NOARGS, nullptr},
    {"_get_shared_fd", THPStorage_sharedFd, METH_NOARGS, nullptr},
    {"is_shared", THPStorage_isShared, METH_NOARGS, nullptr},
    {nullptr}};

PyMethodDef* THPStorage_getSharingMethods() {
  return THPStorage_sharingMethods;
}