# mypy: allow-untyped-defs
import warnings
from typing import List

import torch
from torch._utils import (
    _flatten_dense_tensors,
    _get_device_index,
    _handle_complex,
    _reorder_tensors_as,
    _take_tensors,
    _unflatten_dense_tensors,
)
from torch.cuda import nccl


def broadcast(tensor, devices=None, *, out=None):
    r"""Broadcasts a tensor to specified GPU devices.

    Args:
        tensor (Tensor): tensor to broadcast. Can be on CPU or GPU.
        devices (Iterable[torch.device, str or int], optional): an iterable of
          GPU devices, among which to broadcast.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
          store output results.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified.

    Returns:
        - If :attr:`devices` is specified,
            a tuple containing copies of :attr:`tensor`, placed on
            :attr:`devices`.
        - If :attr:`out` is specified,
            a tuple containing :attr:`out` tensors, each containing a copy of
            :attr:`tensor`.
    """
    tensor = _handle_complex(tensor)
    if not ((devices is None) ^ (out is None)):
        raise RuntimeError(
            f"Exactly one of 'devices' and 'out' must be specified, but got devices={devices} and out={out}"
        )
    if devices is not None:
        devices = [_get_device_index(d) for d in devices]
        return torch._C._broadcast(tensor, devices)
    else:
        return torch._C._broadcast_out(tensor, out)
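
# Illustrative usage of `broadcast` (a sketch, not part of the original module;
# it assumes a machine with at least two CUDA devices):
#
#   >>> x = torch.randn(4, 4, device="cuda:0")
#   >>> copies = broadcast(x, devices=[0, 1])   # one copy per listed device
#   >>> [c.device for c in copies]
#   [device(type='cuda', index=0), device(type='cuda', index=1)]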


def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcast a sequence of tensors to the specified GPUs.

    Small tensors are first coalesced into a buffer to reduce the number of synchronizations.

    Args:
        tensors (sequence): tensors to broadcast. Must be on the same device,
          either CPU or GPU.
        devices (Iterable[torch.device, str or int]): an iterable of GPU
          devices, among which to broadcast.
        buffer_size (int): maximum size of the buffer used for coalescing

    Returns:
        A tuple containing copies of :attr:`tensors`, placed on :attr:`devices`.
    """
    devices = [_get_device_index(d) for d in devices]
    tensors = [_handle_complex(t) for t in tensors]
    return torch._C._broadcast_coalesced(tensors, devices, buffer_size)
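
# Illustrative usage of `broadcast_coalesced` (a sketch, not part of the original
# module; it assumes at least two CUDA devices). All inputs must sit on the same
# device; small tensors are packed into one buffer before being broadcast, and
# the result is one tuple of copies per device:
#
#   >>> params = [torch.randn(10, device="cuda:0"), torch.randn(5, device="cuda:0")]
#   >>> per_device = broadcast_coalesced(params, devices=[0, 1])
#   >>> len(per_device), len(per_device[0])
#   (2, 2)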


def reduce_add(inputs, destination=None):
    """Sum tensors from multiple GPUs.

    All inputs should have matching shapes, dtype, and layout. The output tensor
    will be of the same shape, dtype, and layout.

    Args:
        inputs (Iterable[Tensor]): an iterable of tensors to add.
        destination (int, optional): a device on which the output will be
            placed (default: current device).

    Returns:
        A tensor containing an elementwise sum of all inputs, placed on the
        :attr:`destination` device.
    """
    destination = _get_device_index(destination, optional=True)
    input_size = inputs[0].size()
    root_index = None  # index of input tensor that is already on the correct device
    for i, inp in enumerate(inputs):
        assert inp.device.type != "cpu", "reduce_add expects all inputs to be on GPUs"
        if inp.get_device() == destination:
            root_index = i
        if inp.size() != input_size:
            got = "x".join(str(x) for x in inp.size())
            expected = "x".join(str(x) for x in input_size)
            raise ValueError(
                f"input {i} has invalid size: got {got}, but expected {expected}"
            )
    if root_index is None:
        raise RuntimeError(
            "reduce_add expects destination to be on the same GPU as one of the tensors"
        )

    if len(inputs) == 1:
        return inputs[0]

    if nccl.is_available(inputs):
        result = torch.empty_like(inputs[root_index])
        nccl.reduce(inputs, output=result, root=root_index)
    else:
        destination_device = torch.device(inputs[root_index].device.type, destination)
        nonroot = [t for i, t in enumerate(inputs) if i != root_index]
        # make a new tensor without an explicit clone: the first add allocates the result
        result = inputs[root_index] + nonroot[0].to(
            device=destination_device, non_blocking=True
        )
        for other in nonroot[1:]:
            result.add_(other.to(device=destination_device, non_blocking=True))
    return result
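
# Illustrative usage of `reduce_add` (a sketch, not part of the original module;
# it assumes two CUDA devices). The destination device must already hold one of
# the inputs:
#
#   >>> a = torch.ones(3, device="cuda:0")
#   >>> b = torch.ones(3, device="cuda:1")
#   >>> reduce_add([a, b], destination=0)   # elementwise a + b, placed on cuda:0
#   tensor([2., 2., 2.], device='cuda:0')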


def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sum tensors from multiple GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Args:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing

    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device.
    """
    # TODO: When `len(inputs) == 1` and all inputs are on `destination`, just
    #       return `inputs`.
    dense_tensors: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
    output = []
    ref_order = []
    # process sparse ones first since they may have different sizes on different gpus
    for tensor_at_gpus in zip(*inputs):
        if all(t.is_sparse for t in tensor_at_gpus):
            result = reduce_add(tensor_at_gpus, destination)  # this will be sparse too
            output.append(result)
            ref_order.append(tensor_at_gpus[0])
        else:
            for coll, t in zip(dense_tensors, tensor_at_gpus):
                coll.append(t.to_dense() if t.is_sparse else t)
            ref_order.append(dense_tensors[0][-1])
    itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
    # now the dense ones, which have consistent sizes
    for chunks in zip(*itrs):
        flat_tensors = [
            _flatten_dense_tensors(chunk) for chunk in chunks
        ]  # (num_gpus,)
        flat_result = reduce_add(flat_tensors, destination)
        for t in _unflatten_dense_tensors(flat_result, chunks[0]):
            # The unflattened tensors do not share storage, and we don't expose
            # base flat tensor anyways, so give them different version counters.
            # See NOTE [ Version Counter in comm.*_coalesced ]
            output.append(t.data)
    return tuple(_reorder_tensors_as(output, ref_order))
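
# Illustrative usage of `reduce_add_coalesced` (a sketch, not part of the original
# module; it assumes two CUDA devices). Each inner list holds the tensors from one
# GPU, in the same order across GPUs; the result is one summed tensor per position:
#
#   >>> grads_gpu0 = [torch.ones(2, device="cuda:0"), torch.ones(3, device="cuda:0")]
#   >>> grads_gpu1 = [torch.ones(2, device="cuda:1"), torch.ones(3, device="cuda:1")]
#   >>> summed = reduce_add_coalesced([grads_gpu0, grads_gpu1], destination=0)
#   >>> [t.sum().item() for t in summed]
#   [4.0, 6.0]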


def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
    """Scatters tensor across multiple GPUs.

    Args:
        tensor (Tensor): tensor to scatter. Can be on CPU or GPU.
        devices (Iterable[torch.device, str or int], optional): an iterable of
          GPU devices, among which to scatter.
        chunk_sizes (Iterable[int], optional): sizes of chunks to be placed on
          each device. It should match :attr:`devices` in length and sum to
          ``tensor.size(dim)``. If not specified, :attr:`tensor` will be divided
          into equal chunks.
        dim (int, optional): A dimension along which to chunk :attr:`tensor`.
          Default: ``0``.
        streams (Iterable[torch.cuda.Stream], optional): an iterable of Streams, among
          which to execute the scatter. If not specified, the default stream will
          be utilized.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
          store output results. Sizes of these tensors must match that of
          :attr:`tensor`, except for :attr:`dim`, where the total size must
          sum to ``tensor.size(dim)``.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified. When
        :attr:`out` is specified, :attr:`chunk_sizes` must not be specified and
        will be inferred from sizes of :attr:`out`.

    Returns:
        - If :attr:`devices` is specified,
            a tuple containing chunks of :attr:`tensor`, placed on
            :attr:`devices`.
        - If :attr:`out` is specified,
            a tuple containing :attr:`out` tensors, each containing a chunk of
            :attr:`tensor`.
    """
    tensor = _handle_complex(tensor)
    if out is None:
        devices = [_get_device_index(d) for d in devices]
        return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
    else:
        if devices is not None:
            raise RuntimeError(
                f"'devices' must not be specified when 'out' is specified, but got devices={devices}"
            )
        if chunk_sizes is not None:
            raise RuntimeError(
                f"'chunk_sizes' must not be specified when 'out' is specified, but got chunk_sizes={chunk_sizes}"
            )
        return tuple(torch._C._scatter_out(tensor, out, dim, streams))
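
# Illustrative usage of `scatter` (a sketch, not part of the original module; it
# assumes two CUDA devices). The tensor is chunked along `dim` and one chunk is
# placed on each device:
#
#   >>> x = torch.arange(8).reshape(4, 2)   # a CPU tensor works as input
#   >>> chunks = scatter(x, devices=[0, 1], dim=0)
#   >>> [(c.shape, c.device) for c in chunks]
#   [(torch.Size([2, 2]), device(type='cuda', index=0)),
#    (torch.Size([2, 2]), device(type='cuda', index=1))]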


def gather(tensors, dim=0, destination=None, *, out=None):
    r"""Gathers tensors from multiple GPU devices.

    Args:
        tensors (Iterable[Tensor]): an iterable of tensors to gather.
          Tensor sizes in all dimensions other than :attr:`dim` have to match.
        dim (int, optional): a dimension along which the tensors will be
          concatenated. Default: ``0``.
        destination (torch.device, str, or int, optional): the output device.
          Can be CPU or CUDA. Default: the current CUDA device.
        out (Tensor, optional, keyword-only): the tensor to store the gather result.
          Its sizes must match those of :attr:`tensors`, except for :attr:`dim`,
          where the size must equal ``sum(tensor.size(dim) for tensor in tensors)``.
          Can be on CPU or CUDA.

    .. note::
        :attr:`destination` must not be specified when :attr:`out` is specified.

    Returns:
        - If :attr:`destination` is specified,
            a tensor located on the :attr:`destination` device that is the result
            of concatenating :attr:`tensors` along :attr:`dim`.
        - If :attr:`out` is specified,
            the :attr:`out` tensor, now containing results of concatenating
            :attr:`tensors` along :attr:`dim`.
    """
    tensors = [_handle_complex(t) for t in tensors]
    if out is None:
        if destination == -1:
            warnings.warn(
                "Using -1 to represent CPU tensor is deprecated. Please use a "
                'device object or string instead, e.g., "cpu".',
                FutureWarning,
                stacklevel=2,
            )
        destination = _get_device_index(destination, allow_cpu=True, optional=True)
        return torch._C._gather(tensors, dim, destination)
    else:
        if destination is not None:
            raise RuntimeError(
                f"'destination' must not be specified when 'out' is specified, but got destination={destination}"
            )
        return torch._C._gather_out(tensors, out, dim)
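
# Illustrative usage of `gather` (a sketch, not part of the original module; it
# assumes two CUDA devices). Chunks living on different GPUs are concatenated
# along `dim` on the destination device, here the CPU:
#
#   >>> parts = [torch.ones(2, 3, device="cuda:0"), torch.zeros(2, 3, device="cuda:1")]
#   >>> merged = gather(parts, dim=0, destination="cpu")
#   >>> merged.shape, merged.device
#   (torch.Size([4, 3]), device(type='cpu'))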