1# mypy: allow-untyped-defs 2import torch 3from torch.distributions import constraints 4from torch.distributions.distribution import Distribution 5from torch.distributions.utils import ( 6 broadcast_all, 7 lazy_property, 8 logits_to_probs, 9 probs_to_logits, 10) 11 12 13__all__ = ["Binomial"] 14 15 16def _clamp_by_zero(x): 17 # works like clamp(x, min=0) but has grad at 0 is 0.5 18 return (x.clamp(min=0) + x - x.clamp(max=0)) / 2 19 20 21class Binomial(Distribution): 22 r""" 23 Creates a Binomial distribution parameterized by :attr:`total_count` and 24 either :attr:`probs` or :attr:`logits` (but not both). :attr:`total_count` must be 25 broadcastable with :attr:`probs`/:attr:`logits`. 26 27 Example:: 28 29 >>> # xdoctest: +IGNORE_WANT("non-deterministic") 30 >>> m = Binomial(100, torch.tensor([0 , .2, .8, 1])) 31 >>> x = m.sample() 32 tensor([ 0., 22., 71., 100.]) 33 34 >>> m = Binomial(torch.tensor([[5.], [10.]]), torch.tensor([0.5, 0.8])) 35 >>> x = m.sample() 36 tensor([[ 4., 5.], 37 [ 7., 6.]]) 38 39 Args: 40 total_count (int or Tensor): number of Bernoulli trials 41 probs (Tensor): Event probabilities 42 logits (Tensor): Event log-odds 43 """ 44 arg_constraints = { 45 "total_count": constraints.nonnegative_integer, 46 "probs": constraints.unit_interval, 47 "logits": constraints.real, 48 } 49 has_enumerate_support = True 50 51 def __init__(self, total_count=1, probs=None, logits=None, validate_args=None): 52 if (probs is None) == (logits is None): 53 raise ValueError( 54 "Either `probs` or `logits` must be specified, but not both." 55 ) 56 if probs is not None: 57 ( 58 self.total_count, 59 self.probs, 60 ) = broadcast_all(total_count, probs) 61 self.total_count = self.total_count.type_as(self.probs) 62 else: 63 ( 64 self.total_count, 65 self.logits, 66 ) = broadcast_all(total_count, logits) 67 self.total_count = self.total_count.type_as(self.logits) 68 69 self._param = self.probs if probs is not None else self.logits 70 batch_shape = self._param.size() 71 super().__init__(batch_shape, validate_args=validate_args) 72 73 def expand(self, batch_shape, _instance=None): 74 new = self._get_checked_instance(Binomial, _instance) 75 batch_shape = torch.Size(batch_shape) 76 new.total_count = self.total_count.expand(batch_shape) 77 if "probs" in self.__dict__: 78 new.probs = self.probs.expand(batch_shape) 79 new._param = new.probs 80 if "logits" in self.__dict__: 81 new.logits = self.logits.expand(batch_shape) 82 new._param = new.logits 83 super(Binomial, new).__init__(batch_shape, validate_args=False) 84 new._validate_args = self._validate_args 85 return new 86 87 def _new(self, *args, **kwargs): 88 return self._param.new(*args, **kwargs) 89 90 @constraints.dependent_property(is_discrete=True, event_dim=0) 91 def support(self): 92 return constraints.integer_interval(0, self.total_count) 93 94 @property 95 def mean(self): 96 return self.total_count * self.probs 97 98 @property 99 def mode(self): 100 return ((self.total_count + 1) * self.probs).floor().clamp(max=self.total_count) 101 102 @property 103 def variance(self): 104 return self.total_count * self.probs * (1 - self.probs) 105 106 @lazy_property 107 def logits(self): 108 return probs_to_logits(self.probs, is_binary=True) 109 110 @lazy_property 111 def probs(self): 112 return logits_to_probs(self.logits, is_binary=True) 113 114 @property 115 def param_shape(self): 116 return self._param.size() 117 118 def sample(self, sample_shape=torch.Size()): 119 shape = self._extended_shape(sample_shape) 120 with torch.no_grad(): 121 return torch.binomial( 122 self.total_count.expand(shape), self.probs.expand(shape) 123 ) 124 125 def log_prob(self, value): 126 if self._validate_args: 127 self._validate_sample(value) 128 log_factorial_n = torch.lgamma(self.total_count + 1) 129 log_factorial_k = torch.lgamma(value + 1) 130 log_factorial_nmk = torch.lgamma(self.total_count - value + 1) 131 # k * log(p) + (n - k) * log(1 - p) = k * (log(p) - log(1 - p)) + n * log(1 - p) 132 # (case logit < 0) = k * logit - n * log1p(e^logit) 133 # (case logit > 0) = k * logit - n * (log(p) - log(1 - p)) + n * log(p) 134 # = k * logit - n * logit - n * log1p(e^-logit) 135 # (merge two cases) = k * logit - n * max(logit, 0) - n * log1p(e^-|logit|) 136 normalize_term = ( 137 self.total_count * _clamp_by_zero(self.logits) 138 + self.total_count * torch.log1p(torch.exp(-torch.abs(self.logits))) 139 - log_factorial_n 140 ) 141 return ( 142 value * self.logits - log_factorial_k - log_factorial_nmk - normalize_term 143 ) 144 145 def entropy(self): 146 total_count = int(self.total_count.max()) 147 if not self.total_count.min() == total_count: 148 raise NotImplementedError( 149 "Inhomogeneous total count not supported by `entropy`." 150 ) 151 152 log_prob = self.log_prob(self.enumerate_support(False)) 153 return -(torch.exp(log_prob) * log_prob).sum(0) 154 155 def enumerate_support(self, expand=True): 156 total_count = int(self.total_count.max()) 157 if not self.total_count.min() == total_count: 158 raise NotImplementedError( 159 "Inhomogeneous total count not supported by `enumerate_support`." 160 ) 161 values = torch.arange( 162 1 + total_count, dtype=self._param.dtype, device=self._param.device 163 ) 164 values = values.view((-1,) + (1,) * len(self._batch_shape)) 165 if expand: 166 values = values.expand((-1,) + self._batch_shape) 167 return values 168