# mypy: allow-untyped-defs
import warnings
from typing import List

import torch
from torch._utils import (
    _flatten_dense_tensors,
    _get_device_index,
    _handle_complex,
    _reorder_tensors_as,
    _take_tensors,
    _unflatten_dense_tensors,
)
from torch.cuda import nccl


def broadcast(tensor, devices=None, *, out=None):
    r"""Broadcasts a tensor to specified GPU devices.

    Args:
        tensor (Tensor): tensor to broadcast. Can be on CPU or GPU.
        devices (Iterable[torch.device, str or int], optional): an iterable of
            GPU devices, among which to broadcast.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
            store output results.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified.

    Returns:
        - If :attr:`devices` is specified,
          a tuple containing copies of :attr:`tensor`, placed on
          :attr:`devices`.
        - If :attr:`out` is specified,
          a tuple containing :attr:`out` tensors, each containing a copy of
          :attr:`tensor`.
    """
    tensor = _handle_complex(tensor)
    if not ((devices is None) ^ (out is None)):
        raise RuntimeError(
            f"Exactly one of 'devices' and 'out' must be specified, but got devices={devices} and out={out}"
        )
    if devices is not None:
        devices = [_get_device_index(d) for d in devices]
        return torch._C._broadcast(tensor, devices)
    else:
        return torch._C._broadcast_out(tensor, out)


def broadcast_coalesced(tensors, devices, buffer_size=10485760):
    """Broadcast a sequence of tensors to the specified GPUs.

    Small tensors are first coalesced into a buffer to reduce the number of
    synchronizations.

    Args:
        tensors (sequence): tensors to broadcast. Must be on the same device,
            either CPU or GPU.
        devices (Iterable[torch.device, str or int]): an iterable of GPU
            devices, among which to broadcast.
        buffer_size (int): maximum size of the buffer used for coalescing

    Returns:
        A tuple containing copies of each tensor in :attr:`tensors`, placed on
        :attr:`devices`.
    """
    devices = [_get_device_index(d) for d in devices]
    tensors = [_handle_complex(t) for t in tensors]
    return torch._C._broadcast_coalesced(tensors, devices, buffer_size)


def reduce_add(inputs, destination=None):
    """Sum tensors from multiple GPUs.

    All inputs should have matching shapes, dtype, and layout. The output tensor
    will have the same shape, dtype, and layout.

    Args:
        inputs (Iterable[Tensor]): an iterable of tensors to add.
        destination (int, optional): a device on which the output will be
            placed (default: current device).

    Returns:
        A tensor containing an elementwise sum of all inputs, placed on the
        :attr:`destination` device.
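
    Example::

        >>> # Illustrative sketch only; assumes at least two CUDA devices are
        >>> # available and uses placeholder device indices.
        >>> # xdoctest: +SKIP("requires at least two CUDA devices")
        >>> a = torch.ones(2, 2, device="cuda:0")
        >>> b = torch.ones(2, 2, device="cuda:1")
        >>> total = reduce_add([a, b], destination=0)
        >>> # `total` is a 2x2 tensor of twos placed on cuda:0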
    """
    destination = _get_device_index(destination, optional=True)
    input_size = inputs[0].size()
    root_index = None  # index of input tensor that already is on the correct device
    for i, inp in enumerate(inputs):
        assert inp.device.type != "cpu", "reduce_add expects all inputs to be on GPUs"
        if inp.get_device() == destination:
            root_index = i
        if inp.size() != input_size:
            got = "x".join(str(x) for x in inp.size())
            expected = "x".join(str(x) for x in input_size)
            raise ValueError(
                f"input {i} has invalid size: got {got}, but expected {expected}"
            )
    if root_index is None:
        raise RuntimeError(
            "reduce_add expects destination to be on the same GPU with one of the tensors"
        )

    if len(inputs) == 1:
        return inputs[0]

    if nccl.is_available(inputs):
        result = torch.empty_like(inputs[root_index])
        nccl.reduce(inputs, output=result, root=root_index)
    else:
        destination_device = torch.device(inputs[root_index].device.type, destination)
        nonroot = [t for i, t in enumerate(inputs) if i != root_index]
        # make a new tensor w/o clone
        result = inputs[root_index] + nonroot[0].to(
            device=destination_device, non_blocking=True
        )
        for other in nonroot[1:]:
            result.add_(other.to(device=destination_device, non_blocking=True))
    return result


def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
    """Sum tensors from multiple GPUs.

    Small tensors are first coalesced into a buffer to reduce the number
    of synchronizations.

    Args:
        inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
            contain tensors from a single device.
        destination (int, optional): a device on which the output will be
            placed (default: current device).
        buffer_size (int): maximum size of the buffer used for coalescing

    Returns:
        A tuple of tensors containing an elementwise sum of each group of
        inputs, placed on the ``destination`` device.
    """
    # TODO: When `len(inputs) == 1` and all inputs are on `destination`, just
    # return `inputs`.
    dense_tensors: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
    output = []
    ref_order = []
    # process sparse ones first since they may have different sizes on different gpus
    for tensor_at_gpus in zip(*inputs):
        if all(t.is_sparse for t in tensor_at_gpus):
            result = reduce_add(tensor_at_gpus, destination)  # this will be sparse too
            output.append(result)
            ref_order.append(tensor_at_gpus[0])
        else:
            for coll, t in zip(dense_tensors, tensor_at_gpus):
                coll.append(t.to_dense() if t.is_sparse else t)
            ref_order.append(dense_tensors[0][-1])
    itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
    # now the dense ones, which have consistent sizes
    for chunks in zip(*itrs):
        flat_tensors = [
            _flatten_dense_tensors(chunk) for chunk in chunks
        ]  # (num_gpus,)
        flat_result = reduce_add(flat_tensors, destination)
        for t in _unflatten_dense_tensors(flat_result, chunks[0]):
            # The unflattened tensors do not share storage, and we don't expose
            # base flat tensor anyways, so give them different version counters.
            # See NOTE [ Version Counter in comm.*_coalesced ]
            output.append(t.data)
    return tuple(_reorder_tensors_as(output, ref_order))


def scatter(tensor, devices=None, chunk_sizes=None, dim=0, streams=None, *, out=None):
    """Scatters tensor across multiple GPUs.

    Args:
        tensor (Tensor): tensor to scatter. Can be on CPU or GPU.
        devices (Iterable[torch.device, str or int], optional): an iterable of
            GPU devices, among which to scatter.
        chunk_sizes (Iterable[int], optional): sizes of chunks to be placed on
            each device. It should match :attr:`devices` in length and sum to
            ``tensor.size(dim)``. If not specified, :attr:`tensor` will be
            divided into equal chunks.
        dim (int, optional): A dimension along which to chunk :attr:`tensor`.
            Default: ``0``.
        streams (Iterable[torch.cuda.Stream], optional): an iterable of Streams,
            among which to execute the scatter. If not specified, the default
            stream will be utilized.
        out (Sequence[Tensor], optional, keyword-only): the GPU tensors to
            store output results. Sizes of these tensors must match that of
            :attr:`tensor`, except for :attr:`dim`, where the total size must
            sum to ``tensor.size(dim)``.

    .. note::
        Exactly one of :attr:`devices` and :attr:`out` must be specified. When
        :attr:`out` is specified, :attr:`chunk_sizes` must not be specified and
        will be inferred from sizes of :attr:`out`.

    Returns:
        - If :attr:`devices` is specified,
          a tuple containing chunks of :attr:`tensor`, placed on
          :attr:`devices`.
        - If :attr:`out` is specified,
          a tuple containing :attr:`out` tensors, each containing a chunk of
          :attr:`tensor`.
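
    Example::

        >>> # Illustrative sketch only; assumes at least two CUDA devices are
        >>> # available and uses placeholder device indices.
        >>> # xdoctest: +SKIP("requires at least two CUDA devices")
        >>> x = torch.arange(12).reshape(6, 2)
        >>> even = scatter(x, devices=[0, 1])  # two (3, 2) chunks, one per GPU
        >>> uneven = scatter(x, devices=[0, 1], chunk_sizes=[4, 2])
        >>> # `uneven[0]` holds the first 4 rows on cuda:0, `uneven[1]` the last 2 on cuda:1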
    """
    tensor = _handle_complex(tensor)
    if out is None:
        devices = [_get_device_index(d) for d in devices]
        return tuple(torch._C._scatter(tensor, devices, chunk_sizes, dim, streams))
    else:
        if devices is not None:
            raise RuntimeError(
                f"'devices' must not be specified when 'out' is specified, but got devices={devices}"
            )
        if chunk_sizes is not None:
            raise RuntimeError(
                f"'chunk_sizes' must not be specified when 'out' is specified, but got chunk_sizes={chunk_sizes}"
            )
        return tuple(torch._C._scatter_out(tensor, out, dim, streams))


def gather(tensors, dim=0, destination=None, *, out=None):
    r"""Gathers tensors from multiple GPU devices.

    Args:
        tensors (Iterable[Tensor]): an iterable of tensors to gather.
            Tensor sizes in all dimensions other than :attr:`dim` have to match.
        dim (int, optional): a dimension along which the tensors will be
            concatenated. Default: ``0``.
        destination (torch.device, str, or int, optional): the output device.
            Can be CPU or CUDA. Default: the current CUDA device.
        out (Tensor, optional, keyword-only): the tensor to store gather result.
            Its sizes must match those of :attr:`tensors`, except for :attr:`dim`,
            where the size must equal ``sum(tensor.size(dim) for tensor in tensors)``.
            Can be on CPU or CUDA.

    .. note::
        :attr:`destination` must not be specified when :attr:`out` is specified.

    Returns:
        - If :attr:`destination` is specified,
          a tensor located on :attr:`destination` device, that is a result of
          concatenating :attr:`tensors` along :attr:`dim`.
        - If :attr:`out` is specified,
          the :attr:`out` tensor, now containing results of concatenating
          :attr:`tensors` along :attr:`dim`.
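
    Example::

        >>> # Illustrative sketch only; assumes at least two CUDA devices are
        >>> # available and uses placeholder device indices.
        >>> # xdoctest: +SKIP("requires at least two CUDA devices")
        >>> a = torch.randn(3, 4, device="cuda:0")
        >>> b = torch.randn(5, 4, device="cuda:1")
        >>> merged = gather([a, b], dim=0, destination="cpu")
        >>> merged.shape
        torch.Size([8, 4])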
    """
    tensors = [_handle_complex(t) for t in tensors]
    if out is None:
        if destination == -1:
            warnings.warn(
                "Using -1 to represent CPU tensor is deprecated. Please use a "
                'device object or string instead, e.g., "cpu".',
                FutureWarning,
                stacklevel=2,
            )
        destination = _get_device_index(destination, allow_cpu=True, optional=True)
        return torch._C._gather(tensors, dim, destination)
    else:
        if destination is not None:
            raise RuntimeError(
                f"'destination' must not be specified when 'out' is specified, but got destination={destination}"
            )
        return torch._C._gather_out(tensors, out, dim)
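

# A minimal end-to-end sketch of how the coalesced helpers above compose,
# kept as a comment so nothing runs at import time. Device indices 0 and 1
# are placeholders; at least two CUDA devices would be needed to run it:
#
#     params = [torch.randn(4, 4), torch.randn(10)]
#     per_gpu = broadcast_coalesced(params, devices=[0, 1])   # one group per GPU
#     work = [[t * 2 for t in copies] for copies in per_gpu]  # per-device compute
#     summed = reduce_add_coalesced(work, destination=0)      # tuple on device 0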