xref: /aosp_15_r20/external/pytorch/docs/source/rpc/rref.rst (revision da0073e96a02ea20f0ac840b70461e3646d07c45)
:orphan:

.. _remote-reference-protocol:

Remote Reference Protocol
=========================

This note describes the design details of the Remote Reference protocol and
walks through message flows in different scenarios. Make sure you are familiar
with the :ref:`distributed-rpc-framework` before proceeding.

Background
^^^^^^^^^^

RRef stands for Remote REFerence. It is a reference to an object which is
located on a local or remote worker, and it transparently handles reference
counting under the hood. Conceptually, it can be considered a distributed
shared pointer. Applications can create an RRef by calling
:meth:`~torch.distributed.rpc.remote`. Each RRef is owned by the callee worker
of the :meth:`~torch.distributed.rpc.remote` call (i.e., the owner) and can be
used by multiple users. The owner stores the real data and keeps track of the
global reference count. Every RRef can be uniquely identified by a global
``RRefId``, which is assigned at the time of creation on the caller of the
:meth:`~torch.distributed.rpc.remote` call.

On the owner worker, there is only one ``OwnerRRef`` instance, which contains
the real data, while on user workers, there can be as many ``UserRRefs`` as
necessary, and a ``UserRRef`` does not hold the data. All usage on the owner
retrieves the unique ``OwnerRRef`` instance using the globally unique
``RRefId``. A ``UserRRef`` is created when it is used as an argument or return
value in an :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` invocation, and the owner is notified
accordingly to update the reference count. An ``OwnerRRef`` and its data are
deleted when there are no ``UserRRef`` instances globally and there are no
references to the ``OwnerRRef`` on the owner either.
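The deletion rule above can be sketched as a small model. This is an
illustrative sketch only, not the real (C++) implementation; the class and
method names (``OwnerRRef``, ``add_fork``, ``can_delete``) are assumptions made
for the example:

```python
class OwnerRRef:
    """Hypothetical sketch of owner-side bookkeeping for one RRef."""

    def __init__(self, rref_id, data):
        self.rref_id = rref_id
        self.data = data
        self.known_forks = set()     # ForkIds of UserRRefs the owner knows of
        self.owner_holds_ref = True  # local (owner-side) reference

    def add_fork(self, fork_id):
        self.known_forks.add(fork_id)

    def del_fork(self, fork_id):
        self.known_forks.discard(fork_id)

    def release_local_ref(self):
        self.owner_holds_ref = False

    def can_delete(self):
        # The data is reclaimed only when no UserRRef exists globally AND
        # the owner itself no longer references the OwnerRRef.
        return not self.known_forks and not self.owner_holds_ref

owner = OwnerRRef(rref_id=100, data="real data lives here")
owner.add_fork(1)
owner.release_local_ref()
assert not owner.can_delete()   # UserRRef {100, 1} is still alive
owner.del_fork(1)
assert owner.can_delete()       # now safe to reclaim the data
```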


Assumptions
^^^^^^^^^^^

The RRef protocol is designed with the following assumptions.

- **Transient Network Failures**: The RRef design handles transient
  network failures by retrying messages. It cannot handle node crashes or
  permanent network partitions. When those incidents occur, the application
  should take down all workers, revert to the previous checkpoint, and resume
  training.
- **Non-idempotent UDFs**: We assume the user functions (UDFs) provided to
  :meth:`~torch.distributed.rpc.rpc_sync`,
  :meth:`~torch.distributed.rpc.rpc_async`, or
  :meth:`~torch.distributed.rpc.remote` are not idempotent and therefore
  cannot be retried. However, internal RRef control messages are idempotent
  and are retried upon message failure.
- **Out-of-Order Message Delivery**: We do not assume any message delivery
  order between any pair of nodes, because both sender and receiver use
  multiple threads. There is no guarantee on which message will be processed
  first.
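The idempotence assumption for control messages is what makes retries safe: a
duplicated fork or delete message leaves the owner's state unchanged. A minimal
sketch of that property (illustrative names only; this is not the real
implementation):

```python
class Owner:
    """Toy owner that handles idempotent RRef control messages."""

    def __init__(self):
        self.forks = set()

    def handle_add_fork(self, fork_id):
        self.forks.add(fork_id)      # set.add is naturally idempotent

    def handle_del_fork(self, fork_id):
        self.forks.discard(fork_id)  # discard tolerates duplicates

owner = Owner()
for _ in range(2):                   # simulated retry after a timeout
    owner.handle_add_fork(1)
assert owner.forks == {1}            # applied once, not twice
for _ in range(2):
    owner.handle_del_fork(1)
assert owner.forks == set()
```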


RRef Lifetime
^^^^^^^^^^^^^

The goal of the protocol is to delete an ``OwnerRRef`` at an appropriate time.
The right time to delete an ``OwnerRRef`` is when there are no living
``UserRRef`` instances and user code is not holding references to the
``OwnerRRef`` either. The tricky part is to determine if there are any living
``UserRRef`` instances.

Design Reasoning
----------------

A user can get a ``UserRRef`` in three situations:

1) Receiving a ``UserRRef`` from the owner.
2) Receiving a ``UserRRef`` from another user.
3) Creating a new ``UserRRef`` owned by another worker.


Case 1 is the simplest: the owner passes its RRef to a user by calling
:meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` with its RRef as an argument. In this
case a new ``UserRRef`` will be created on the user. As the owner is the
caller, it can easily update its local reference count on the ``OwnerRRef``.

The only requirement is that any ``UserRRef`` must notify the owner upon
destruction. Hence, we need the first guarantee:

**G1. The owner will be notified when any UserRRef is deleted.**

As messages might arrive delayed or out of order, we need one more guarantee to
make sure the delete message is not processed too soon. If A sends a message to
B that involves an RRef, we call the RRef on A the parent RRef and the RRef on
B the child RRef.

**G2. The parent RRef will NOT be deleted until the child RRef is confirmed by
the owner.**

In cases 2 and 3, it is possible that the owner has only partial or no
knowledge at all about the RRef fork graph. For example, an RRef could be
constructed on a user, and before the owner receives any RPC call, the creator
user might have already shared the RRef with other users, and those users
could further share the RRef. One invariant is that the fork graph of any RRef
is always a tree, because forking an RRef always creates a new ``UserRRef``
instance on the callee (unless the callee is the owner), and hence every RRef
has a single parent.

The owner's view on any ``UserRRef`` in the tree has three stages:

.. code::

  1) unknown -> 2) known -> 3) deleted.

The owner's view of the entire tree keeps changing. The owner deletes its
``OwnerRRef`` instance when it thinks there are no living ``UserRRef``
instances, i.e., when the ``OwnerRRef`` is deleted, all ``UserRRef`` instances
are either indeed deleted or unknown. The dangerous case is when some forks
are unknown and others are deleted.

**G2** trivially guarantees that no parent ``UserRRef`` can be deleted before
the owner knows all of its children ``UserRRef`` instances. However, it is
possible that a child ``UserRRef`` is deleted before the owner knows its
parent ``UserRRef``.

Consider the following example, where the ``OwnerRRef`` forks to A, then A
forks to Y, and Y forks to Z:

.. code::

  OwnerRRef -> A -> Y -> Z

If all of Z's messages, including the delete message, are processed by the
owner before Y's messages, the owner will learn of Z's deletion before knowing
Y exists. Nevertheless, this does not cause any problem, because at least one
of Y's ancestors (A) will be alive and will prevent the owner from deleting
the ``OwnerRRef``. More specifically, if the owner does not know Y, A cannot
be deleted due to **G2**, and the owner knows A since it is A's parent.
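The reasoning above can be checked with a small event-driven model of the
owner's view. This is an illustrative sketch under the document's assumptions,
not the real implementation; the class and method names are made up for the
example:

```python
class OwnerView:
    """Toy model of the owner's three-stage view of each fork."""

    def __init__(self):
        self.known = set()    # forks in stage "known"
        self.deleted = set()  # forks in stage "deleted"

    def add_fork(self, fork):
        self.known.add(fork)

    def del_fork(self, fork):
        self.known.discard(fork)
        self.deleted.add(fork)

    def thinks_all_users_gone(self):
        # "Unknown" forks do not count; only known living forks block deletion.
        return not self.known

view = OwnerView()
view.add_fork("A")   # the owner knows A, since A created the OwnerRRef
view.add_fork("Z")   # Z's fork request arrives before any of Y's messages
view.del_fork("Z")   # ... and so does Z's delete message
# Y is still "unknown", but G2 guarantees A cannot be deleted before its
# child Y is confirmed, so A is still in the known set:
assert not view.thinks_all_users_gone()
view.add_fork("Y")
view.del_fork("Y")
view.del_fork("A")
assert view.thinks_all_users_gone()  # now the OwnerRRef may be deleted
```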

Things get a little trickier if the RRef is created on a user:

.. code::

  OwnerRRef
      ^
      |
      A -> Y -> Z

If Z calls :meth:`~torch.distributed.rpc.RRef.to_here` on the ``UserRRef``,
the owner at least knows A when Z is deleted, because otherwise
:meth:`~torch.distributed.rpc.RRef.to_here` wouldn't finish. If Z does not
call :meth:`~torch.distributed.rpc.RRef.to_here`, it is possible that the
owner receives all messages from Z before any message from A and Y. In this
case, as the real data of the ``OwnerRRef`` has not been created yet, there is
nothing to be deleted either. It is the same as if Z did not exist at all.
Hence, it is still OK.

Implementation
--------------

**G1** is implemented by sending out a delete message in the ``UserRRef``
destructor. To provide **G2**, the parent ``UserRRef`` is put into a context
whenever it is forked, indexed by the new ``ForkId``. The parent ``UserRRef``
is only removed from the context when it receives an acknowledgement message
(ACK) from the child, and the child will only send out the ACK once it is
confirmed by the owner.
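The fork context described above can be sketched as follows. The names
(``RRefContext``, ``pending_forks``, ``child_acked``) are assumptions made for
this illustration; the real bookkeeping lives in the C++ RPC agent:

```python
class RRefContext:
    """Toy user-side context that pins parents of in-flight forks (G2)."""

    def __init__(self):
        self.pending_forks = {}  # ForkId -> parent UserRRef kept alive

    def fork(self, fork_id, parent):
        # G2: keep the parent alive until the child is confirmed.
        self.pending_forks[fork_id] = parent

    def child_acked(self, fork_id):
        # The child's ACK means the owner confirmed the child fork;
        # the parent may now be reclaimed (and send its G1 delete message).
        return self.pending_forks.pop(fork_id, None)

ctx = RRefContext()
ctx.fork(fork_id=2, parent="UserRRef{100,1}")
assert 2 in ctx.pending_forks               # parent pinned while in flight
assert ctx.child_acked(2) == "UserRRef{100,1}"
assert 2 not in ctx.pending_forks           # delete message may go out now
```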


Protocol Scenarios
^^^^^^^^^^^^^^^^^^

Let's now discuss how the above designs translate into the protocol in four
scenarios.

User Share RRef with Owner as Return Value
------------------------------------------


.. code::

  import torch
  import torch.distributed.rpc as rpc

  # on worker A
  rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
  # say the rref has RRefId 100 and ForkId 1
  rref.to_here()


In this case, the ``UserRRef`` is created on the user worker A, then it is
passed to the owner worker B together with the remote message, and B then
creates the ``OwnerRRef``. The method :meth:`~torch.distributed.rpc.remote`
returns immediately, meaning that the ``UserRRef`` can be forked/used before
the owner knows about it.

When the owner receives the :meth:`~torch.distributed.rpc.remote` call, it
creates the ``OwnerRRef`` and returns an ACK to acknowledge ``{100, 1}``
(``RRefId``, ``ForkId``). Only after receiving this ACK can A delete its
``UserRRef``. This involves both **G1** and **G2**. **G1** is obvious. For
**G2**, the ``OwnerRRef`` is a child of the ``UserRRef``, and the ``UserRRef``
is not deleted until it receives the ACK from the owner.

.. image:: https://user-images.githubusercontent.com/16999635/69164772-98181300-0abe-11ea-93a7-9ad9f757cd94.png
    :alt: user_to_owner_ret.png
    :width: 500 px

The diagram above shows the message flow, where solid arrows carry user
functions and dashed arrows are builtin messages. Note that the first two
messages from A to B (:meth:`~torch.distributed.rpc.remote` and
:meth:`~torch.distributed.rpc.RRef.to_here`) may arrive at B in any order, but
the final delete message will only be sent out when:

- B acknowledges ``UserRRef {100, 1}`` (**G2**), and
- Python GC agrees to delete the local ``UserRRef`` instance. This occurs when
  the RRef is no longer in scope and is eligible for garbage collection.
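Both conditions gate the final delete message independently, and either may be
satisfied first. A minimal sketch of that two-condition gate (illustrative
names only; not the real implementation):

```python
class PendingDelete:
    """Toy gate for the delete message of one UserRRef."""

    def __init__(self):
        self.owner_acked = False   # G2: owner has confirmed {RRefId, ForkId}
        self.gc_collected = False  # local Python UserRRef was collected
        self.delete_sent = False

    def on_owner_ack(self):
        self.owner_acked = True
        self._maybe_send()

    def on_gc(self):
        self.gc_collected = True
        self._maybe_send()

    def _maybe_send(self):
        # Send the G1 delete message only once both conditions hold.
        if self.owner_acked and self.gc_collected and not self.delete_sent:
            self.delete_sent = True

pending = PendingDelete()
pending.on_gc()                  # GC may run before the ACK arrives
assert not pending.delete_sent   # G2: still waiting for the owner
pending.on_owner_ack()
assert pending.delete_sent       # now the delete message goes out
```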



User Share RRef with Owner as Argument
--------------------------------------

.. code::

  import torch
  import torch.distributed.rpc as rpc

  # on worker A and worker B
  def func(rref):
      pass

  # on worker A
  rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
  # say the rref has RRefId 100 and ForkId 1
  rpc.rpc_async('B', func, args=(rref,))


In this case, after creating the ``UserRRef`` on A, A uses it as an argument
in a followup RPC call to B. A will keep ``UserRRef {100, 1}`` alive until it
receives the acknowledgement from B (**G2**, not the return value of the RPC
call). This is necessary because A should not send out the delete message
until all previous messages have been received; otherwise, the ``OwnerRRef``
could be deleted before usage, as we do not guarantee message delivery order.
This is done by creating a child ``ForkId`` of the RRef and holding the parent
in a map until the owner confirms the child ``ForkId``. The figure below shows
the message flow.

.. image:: https://user-images.githubusercontent.com/16999635/69164845-b67e0e80-0abe-11ea-93fa-d24674e75a2b.png
    :alt: user_to_owner_arg.png
    :width: 500 px


Note that the ``UserRRef`` could be deleted on B before ``func`` finishes or
even starts. However, this is OK, as by the time B sends out the ACK for the
child ``ForkId``, it has already acquired the ``OwnerRRef`` instance, which
prevents it from being deleted too soon.


Owner Share RRef with User
--------------------------

Owner to user is the simplest case, where the owner can update the reference
count locally and does not need any additional control messages to notify
others. Regarding **G2**, it is the same as if the parent received the ACK
from the owner immediately, as the parent is the owner.

.. code::

  import torch
  import torch.distributed.rpc as rpc
  from torch.distributed.rpc import RRef

  # on worker B and worker C
  def func(rref):
      pass

  # on worker B, creating a local RRef
  rref = RRef("data")
  # say the rref has RRefId 100
  rpc.rpc_async('C', func, args=(rref,))


.. image:: https://user-images.githubusercontent.com/16999635/69164921-c990de80-0abe-11ea-9250-d32ad00cf4ae.png
    :alt: owner_to_user.png
    :width: 500 px

The figure above shows the message flow. Note that when the ``OwnerRRef``
exits scope after the ``rpc_async`` call, it will not be deleted, because
internally there is a map that keeps it alive as long as there are any known
forks, which in this case is ``UserRRef {100, 1}`` (**G2**).


User Share RRef with User
-------------------------

This is the most complicated case, where the caller user (parent
``UserRRef``), the callee user (child ``UserRRef``), and the owner all need to
get involved.

.. code::

  import torch
  import torch.distributed.rpc as rpc

  # on worker A and worker C
  def func(rref):
      pass

  # on worker A
  rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
  # say the rref has RRefId 100 and ForkId 1
  rpc.rpc_async('C', func, args=(rref,))

.. image:: https://user-images.githubusercontent.com/16999635/69164971-d6adcd80-0abe-11ea-971d-6b7af131f0fd.png
    :alt: user_to_user.png
    :width: 500 px

When C receives the child ``UserRRef`` from A, it sends out a fork request to
the owner B. Later, when B confirms the ``UserRRef`` on C, C will perform two
actions in parallel: 1) send out the child ACK to A, and 2) run the user
provided function. During this time, the parent (A) will hold its
``UserRRef {100, 1}`` alive to achieve **G2**.
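The ordering constraint in this last scenario can be sketched with plain
threads: C's two actions may run in parallel, but neither starts before the
owner's confirmation arrives. This is an illustrative model only; the function
and variable names are assumptions, not the real RPC internals:

```python
import threading

events = []
owner_confirmed = threading.Event()  # B's confirmation of C's fork

def run_on_c():
    # C blocks until the owner confirms its UserRRef ...
    owner_confirmed.wait()
    # ... then ACKs the parent and runs the user function in parallel.
    t1 = threading.Thread(target=lambda: events.append("ack_parent"))
    t2 = threading.Thread(target=lambda: events.append("run_func"))
    t1.start(); t2.start()
    t1.join(); t2.join()

c = threading.Thread(target=run_on_c)
c.start()
assert events == []        # nothing happens before the owner confirms
owner_confirmed.set()      # owner B confirms the fork on C
c.join()
assert sorted(events) == ["ack_parent", "run_func"]
```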
324