:orphan:

.. _remote-reference-protocol:

Remote Reference Protocol
=========================

This note describes the design details of the Remote Reference protocol and
walks through message flows in different scenarios. Make sure you're familiar
with the :ref:`distributed-rpc-framework` before proceeding.

Background
^^^^^^^^^^

RRef stands for Remote REFerence. It is a reference to an object which is
located on a local or remote worker, and it transparently handles reference
counting under the hood. Conceptually, it can be considered a distributed
shared pointer. Applications can create an RRef by calling
:meth:`~torch.distributed.rpc.remote`. Each RRef is owned by the callee worker
of the :meth:`~torch.distributed.rpc.remote` call (i.e., the owner) and can be
used by multiple users. The owner stores the real data and keeps track of the
global reference count. Every RRef can be uniquely identified by a global
``RRefId``, which is assigned at the time of creation on the caller of the
:meth:`~torch.distributed.rpc.remote` call.

On the owner worker, there is only one ``OwnerRRef`` instance, which contains
the real data, while on user workers, there can be as many ``UserRRefs`` as
necessary, and a ``UserRRef`` does not hold the data. All usage on the owner
retrieves the unique ``OwnerRRef`` instance using the globally unique
``RRefId``. A ``UserRRef`` is created when it is used as an argument or return
value in an :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` invocation, and the owner is notified
accordingly to update the reference count. An ``OwnerRRef`` and its data are
deleted when there are no ``UserRRef`` instances globally and no references to
the ``OwnerRRef`` on the owner either.


Assumptions
^^^^^^^^^^^

The RRef protocol is designed with the following assumptions.

- **Transient Network Failures**: The RRef design handles transient
  network failures by retrying messages. It cannot handle node crashes or
  permanent network partitions. When those incidents occur, the application
  should take down all workers, revert to the previous checkpoint, and resume
  training.
- **Non-idempotent UDFs**: We assume the user functions (UDFs) provided to
  :meth:`~torch.distributed.rpc.rpc_sync`,
  :meth:`~torch.distributed.rpc.rpc_async`, or
  :meth:`~torch.distributed.rpc.remote` are not idempotent and therefore
  cannot be retried. However, internal RRef control messages are idempotent
  and are retried upon message failure.
- **Out-of-Order Message Delivery**: We do not assume any message delivery
  order between any pair of nodes, because both the sender and the receiver
  use multiple threads. There is no guarantee on which message will be
  processed first.


RRef Lifetime
^^^^^^^^^^^^^

The goal of the protocol is to delete an ``OwnerRRef`` at an appropriate time.
The right time to delete an ``OwnerRRef`` is when there are no living
``UserRRef`` instances and user code is not holding references to the
``OwnerRRef`` either. The tricky part is to determine whether there are any
living ``UserRRef`` instances.

Design Reasoning
----------------

A user can get a ``UserRRef`` in three situations, illustrated by the sketch
after this list:

1) Receiving a ``UserRRef`` from the owner.
2) Receiving a ``UserRRef`` from another user.
3) Creating a new ``UserRRef`` owned by another worker.
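The sketch below shows how these situations arise in user code. It is a
minimal illustration, assuming workers ``A``, ``B``, and ``C`` have already
joined the RPC group via :meth:`~torch.distributed.rpc.init_rpc`; the worker
names and the ``use_rref`` helper are hypothetical.

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # This snippet runs on worker A.

    def use_rref(rref):
        # On the callee, receiving an RRef argument creates a UserRRef:
        # case 1 if the sender is the owner, case 2 if it is another user.
        pass

    # Case 3: A creates a new UserRRef whose data will live on owner B.
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))

    # Case 2: A forwards its UserRRef to another user C; a new UserRRef
    # is created on C when the message is processed.
    rpc.rpc_async('C', use_rref, args=(rref,))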
Case 1 is the simplest, where the owner passes its RRef to a user: the owner
calls :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` and uses its RRef as an argument. In
this case, a new ``UserRRef`` will be created on the user. As the owner is the
caller, it can easily update its local reference count on the ``OwnerRRef``.

The only requirement is that any ``UserRRef`` must notify the owner upon
destruction. Hence, we need the first guarantee:

**G1. The owner will be notified when any UserRRef is deleted.**

As messages might arrive delayed or out of order, we need one more guarantee
to make sure the delete message is not processed too soon. If A sends a
message to B that involves an RRef, we call the RRef on A the parent RRef and
the RRef on B the child RRef.

**G2. The parent RRef will NOT be deleted until the child RRef is confirmed by
the owner.**

In cases 2 and 3, it is possible that the owner has only partial or no
knowledge at all about the RRef fork graph. For example, an RRef could be
constructed on a user, and before the owner receives any RPC call, the
creator user might have already shared the RRef with other users, and those
users could further share the RRef. One invariant is that the fork graph of
any RRef is always a tree, because forking an RRef always creates a new
``UserRRef`` instance on the callee (unless the callee is the owner), and
hence every RRef has a single parent.

The owner's view of any ``UserRRef`` in the tree passes through three stages:

.. code::

    1) unknown -> 2) known -> 3) deleted.

The owner's view of the entire tree keeps changing. The owner deletes its
``OwnerRRef`` instance when it thinks there are no living ``UserRRef``
instances, i.e., when the ``OwnerRRef`` is deleted, all ``UserRRef`` instances
are either indeed deleted or unknown. The dangerous case is when some forks
are unknown and the others are deleted.

**G2** trivially guarantees that no parent ``UserRRef`` can be deleted before
the owner knows all of its child ``UserRRef`` instances. However, it is
possible that a child ``UserRRef`` is deleted before the owner knows its
parent ``UserRRef``.

Consider the following example, where the ``OwnerRRef`` forks to A, then A
forks to Y, and Y forks to Z:

.. code::

    OwnerRRef -> A -> Y -> Z

If all of Z's messages, including the delete message, are processed by the
owner before Y's messages, the owner will learn of Z's deletion before knowing
Y exists. Nevertheless, this does not cause any problem, because at least one
of Y's ancestors (A) will be alive, and it will prevent the owner from
deleting the ``OwnerRRef``. More specifically, if the owner does not know Y,
A cannot be deleted due to **G2**, and the owner knows A since it is A's
parent.

Things get a little trickier if the RRef is created on a user:

.. code::

    OwnerRRef
        ^
        |
        A -> Y -> Z

If Z calls :meth:`~torch.distributed.rpc.RRef.to_here` on the ``UserRRef``,
the owner knows at least A when Z is deleted, because otherwise
:meth:`~torch.distributed.rpc.RRef.to_here` wouldn't finish. If Z does not
call :meth:`~torch.distributed.rpc.RRef.to_here`, it is possible that the
owner receives all messages from Z before any message from A and Y. In this
case, as the real data of the ``OwnerRRef`` has not been created yet, there is
nothing to be deleted either. It is the same as if Z did not exist at all.
Hence, it is still OK.

Implementation
--------------

**G1** is implemented by sending out a delete message in the ``UserRRef``
destructor. To provide **G2**, the parent ``UserRRef`` is put into a context
whenever it is forked, indexed by the new ``ForkId``. The parent ``UserRRef``
is only removed from the context when it receives an acknowledgement message
(ACK) from the child, and the child will only send out the ACK after it is
confirmed by the owner.
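To make this bookkeeping concrete, below is a minimal Python sketch of the
fork context, assuming a plain dictionary keyed by ``ForkId``. The names
(``ForkContext``, ``add_pending_child``, ``confirm_child``) are hypothetical;
the actual bookkeeping is implemented in C++.

.. code::

    class ForkContext:
        def __init__(self):
            # G2: a parent UserRRef stays in this map, and hence stays
            # alive, until its child fork is confirmed. Keyed by the
            # child's ForkId.
            self.pending_forks = {}

        def add_pending_child(self, child_fork_id, parent_rref):
            # Called when a UserRRef is forked into an outgoing message.
            self.pending_forks[child_fork_id] = parent_rref

        def confirm_child(self, child_fork_id):
            # Called upon the child's ACK, which the child only sends
            # after the owner has confirmed it. Dropping the entry may
            # leave the parent unreferenced, triggering its destructor
            # and hence the G1 delete message to the owner.
            self.pending_forks.pop(child_fork_id, None)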
Protocol Scenarios
^^^^^^^^^^^^^^^^^^

Let's now discuss how the above design translates to the protocol in four
scenarios.

User Shares RRef with Owner as Return Value
-------------------------------------------

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rref.to_here()


In this case, the ``UserRRef`` is created on the user worker A, then it is
passed to the owner worker B together with the remote message, and then B
creates the ``OwnerRRef``. The method :meth:`~torch.distributed.rpc.remote`
returns immediately, meaning that the ``UserRRef`` can be forked/used before
the owner knows about it.

On the owner, when receiving the :meth:`~torch.distributed.rpc.remote` call,
it will create the ``OwnerRRef`` and return an ACK to acknowledge ``{100, 1}``
(``RRefId``, ``ForkId``). Only after receiving this ACK can A delete its
``UserRRef``. This involves both **G1** and **G2**. **G1** is obvious. For
**G2**, the ``OwnerRRef`` is a child of the ``UserRRef``, and the ``UserRRef``
is not deleted until it receives the ACK from the owner.

.. image:: https://user-images.githubusercontent.com/16999635/69164772-98181300-0abe-11ea-93a7-9ad9f757cd94.png
    :alt: user_to_owner_ret.png
    :width: 500 px

The diagram above shows the message flow, where solid arrows carry user
functions and dashed arrows are builtin messages. Note that the first two
messages from A to B (:meth:`~torch.distributed.rpc.remote` and
:meth:`~torch.distributed.rpc.RRef.to_here`) may arrive at B in any order, but
the final delete message will only be sent out when:

- B acknowledges ``UserRRef {100, 1}`` (**G2**), and
- the Python GC agrees to delete the local ``UserRRef`` instance. This occurs
  when the RRef is no longer in scope and is eligible for garbage collection.
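To make these two conditions concrete, the following is a small illustrative
sketch; ``UserRRefState`` and its fields are hypothetical names, as the real
state tracking happens in the C++ layer.

.. code::

    from dataclasses import dataclass

    @dataclass
    class UserRRefState:
        rref_id: int
        fork_id: int
        confirmed_by_owner: bool = False  # owner ACKed {RRefId, ForkId}
        local_ref_count: int = 1          # tracked by Python GC in practice

    def may_send_delete(state: UserRRefState) -> bool:
        # The delete message (G1) goes out only when both conditions in
        # the list above hold: the owner has confirmed the fork (G2) and
        # the local instance is out of scope.
        return state.confirmed_by_owner and state.local_ref_count == 0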
User Shares RRef with Owner as Argument
---------------------------------------

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker B
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('B', func, args=(rref, ))


In this case, after creating the ``UserRRef`` on A, A uses it as an argument
in a followup RPC call to B. A will keep ``UserRRef {100, 1}`` alive until it
receives the acknowledgement from B (**G2**, not the return value of the RPC
call). This is necessary because A should not send out the delete message
until all previous messages have been received; otherwise, the ``OwnerRRef``
could be deleted before usage, as we do not guarantee message delivery order.
This is done by creating a child ``ForkId`` of the RRef and holding it in a
map until the owner confirms the child ``ForkId``. The figure below shows the
message flow.

.. image:: https://user-images.githubusercontent.com/16999635/69164845-b67e0e80-0abe-11ea-93fa-d24674e75a2b.png
    :alt: user_to_owner_arg.png
    :width: 500 px


Note that the ``UserRRef`` could be deleted on B before ``func`` finishes or
even starts. However, this is OK, because by the time B sends out the ACK for
the child ``ForkId``, it has already acquired the ``OwnerRRef`` instance,
which prevents it from being deleted too soon.


Owner Shares RRef with User
---------------------------

Owner-to-user is the simplest case, where the owner can update the reference
count locally and does not need any additional control messages to notify
others. Regarding **G2**, it is as if the parent receives the ACK from the
owner immediately, since the parent is the owner.

.. code::

    import torch
    import torch.distributed.rpc as rpc
    from torch.distributed.rpc import RRef

    # on worker B and worker C
    def func(rref):
        pass

    # on worker B, creating a local RRef
    rref = RRef("data")
    # say the rref has RRefId 100
    rpc.rpc_async('C', func, args=(rref, ))


.. image:: https://user-images.githubusercontent.com/16999635/69164921-c990de80-0abe-11ea-9250-d32ad00cf4ae.png
    :alt: owner_to_user.png
    :width: 500 px

The figure above shows the message flow. Note that when the ``OwnerRRef``
exits scope after the rpc_async call, it will not be deleted, because
internally there is a map that keeps it alive as long as there are any known
forks, in this case ``UserRRef {100, 1}``. (**G2**)


User Shares RRef with User
--------------------------

This is the most complicated case, where the caller user (parent
``UserRRef``), the callee user (child ``UserRRef``), and the owner all need
to get involved.

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker C
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('C', func, args=(rref, ))

.. image:: https://user-images.githubusercontent.com/16999635/69164971-d6adcd80-0abe-11ea-971d-6b7af131f0fd.png
    :alt: user_to_user.png
    :width: 500 px

When C receives the child ``UserRRef`` from A, it sends out a fork request to
the owner B. Later, when B confirms the ``UserRRef`` on C, C will perform two
actions in parallel: 1) send out the child ACK to A, and 2) run the
user-provided function. During this time, the parent (A) will hold its
``UserRRef {100, 1}`` alive to satisfy **G2**.
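Putting it all together, below is a runnable single-machine sketch of this
user-to-user scenario. The worker names, the port, and the body of ``func``
are illustrative choices, not part of the protocol.

.. code::

    import os

    import torch
    import torch.distributed.rpc as rpc
    import torch.multiprocessing as mp

    def func(rref):
        # Runs on user C with the child UserRRef; fetches data from owner B.
        return rref.to_here() + 1

    def run(rank, world_size):
        os.environ['MASTER_ADDR'] = 'localhost'
        os.environ['MASTER_PORT'] = '29500'
        name = ['A', 'B', 'C'][rank]
        rpc.init_rpc(name, rank=rank, world_size=world_size)
        if name == 'A':
            # A is a user of an RRef whose data lives on owner B.
            rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
            # A shares its UserRRef with another user C.
            fut = rpc.rpc_async('C', func, args=(rref,))
            print(fut.wait())  # tensor([3., 3.])
        # shutdown() blocks until all outstanding RRef forks are released.
        rpc.shutdown()

    if __name__ == '__main__':
        mp.spawn(run, args=(3,), nprocs=3, join=True)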