1# Copyright 2022 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""A channel adapter layer that introduces lossy behavior to a channel.""" 15 16import abc 17import collections 18import copy 19import logging 20from typing import Callable, Deque 21import random 22import time 23 24import pw_rpc 25 26_LOG = logging.getLogger(__name__) 27 28 29# TODO(amontanez): The surface are of this API could be reduced significantly 30# with a few changes to LossyChannel. 31class LossController(abc.ABC): 32 """Interface for driving loss/corruption decisions of a LossyChannel.""" 33 34 @abc.abstractmethod 35 def next_packet_duplicated(self) -> bool: 36 """Returns true if the next packet should be duplicated.""" 37 38 @abc.abstractmethod 39 def next_packet_out_of_order(self) -> bool: 40 """Returns true if the next packet should be a re-ordered packet.""" 41 42 @abc.abstractmethod 43 def next_packet_delayed(self) -> bool: 44 """Returns true if a delay should occur before sending a packet.""" 45 46 @abc.abstractmethod 47 def next_packet_dropped(self) -> bool: 48 """Returns true if the next incoming packet should be dropped.""" 49 50 @abc.abstractmethod 51 def next_packet_delay(self) -> int: 52 """The delay before sending the next packet, in milliseconds.""" 53 54 @abc.abstractmethod 55 def next_num_dupes(self) -> int: 56 """Returns how many times the next packet should be duplicated.""" 57 58 @abc.abstractmethod 59 def choose_out_of_order_packet(self, max_idx) -> int: 60 """Returns the index of the next reordered packet. 61 62 A return value of 0 represents the newest packet, while max_idx 63 represents the oldest packet. 64 """ 65 66 67class ManualPacketFilter(LossController): 68 """Determines if a packet should be kept or dropped for testing purposes.""" 69 70 _Action = Callable[[int], tuple[bool, bool]] 71 _KEEP = lambda _: (True, False) 72 _DROP = lambda _: (False, False) 73 74 def __init__(self) -> None: 75 self.packet_count = 0 76 self._actions: Deque[ManualPacketFilter._Action] = collections.deque() 77 78 def reset(self) -> None: 79 self.packet_count = 0 80 self._actions.clear() 81 82 def keep(self, count: int) -> None: 83 """Keeps the next count packets.""" 84 self._actions.extend(ManualPacketFilter._KEEP for _ in range(count)) 85 86 def drop(self, count: int) -> None: 87 """Drops the next count packets.""" 88 self._actions.extend(ManualPacketFilter._DROP for _ in range(count)) 89 90 def drop_every(self, every: int) -> None: 91 """Drops every Nth packet forever.""" 92 self._actions.append(lambda count: (count % every != 0, True)) 93 94 def randomly_drop(self, one_in: int, gen: random.Random) -> None: 95 """Drops packets randomly forever.""" 96 self._actions.append(lambda _: (gen.randrange(one_in) != 0, True)) 97 98 def keep_packet(self) -> bool: 99 """Returns whether the provided packet should be kept or dropped.""" 100 self.packet_count += 1 101 102 if not self._actions: 103 return True 104 105 keep, repeat = self._actions[0](self.packet_count) 106 107 if not repeat: 108 self._actions.popleft() 109 110 return keep 111 112 def next_packet_duplicated(self) -> bool: 113 return False 114 115 def next_packet_out_of_order(self) -> bool: 116 return False 117 118 def next_packet_delayed(self) -> bool: 119 return False 120 121 def next_packet_dropped(self) -> bool: 122 return not self.keep_packet() 123 124 def next_packet_delay(self) -> int: 125 return 0 126 127 def next_num_dupes(self) -> int: 128 return 0 129 130 def choose_out_of_order_packet(self, max_idx) -> int: 131 return 0 132 133 134class RandomLossGenerator(LossController): 135 """Parametrized random number generator that drives a LossyChannel.""" 136 137 def __init__( 138 self, 139 duplicated_packet_probability: float, 140 max_duplications_per_packet: int, 141 out_of_order_probability: float, 142 delayed_packet_probability: float, 143 delayed_packet_range_ms: tuple[int, int], 144 dropped_packet_probability: float, 145 seed: int | None = None, 146 ): 147 self.duplicated_packet_probability = duplicated_packet_probability 148 self.max_duplications_per_packet = max_duplications_per_packet 149 self.out_of_order_probability = out_of_order_probability 150 self.delayed_packet_probability = delayed_packet_probability 151 self.delayed_packet_range_ms = delayed_packet_range_ms 152 self.dropped_packet_probability = dropped_packet_probability 153 self._rng = random.Random(seed) 154 155 def next_packet_duplicated(self) -> bool: 156 return self.duplicated_packet_probability > self._rng.uniform(0.0, 1.0) 157 158 def next_packet_out_of_order(self) -> bool: 159 return self.out_of_order_probability > self._rng.uniform(0.0, 1.0) 160 161 def next_packet_delayed(self) -> bool: 162 return self.delayed_packet_probability > self._rng.uniform(0.0, 1.0) 163 164 def next_packet_dropped(self) -> bool: 165 return self.dropped_packet_probability > self._rng.uniform(0.0, 1.0) 166 167 def next_packet_delay(self) -> int: 168 return self._rng.randint(*self.delayed_packet_range_ms) 169 170 def next_num_dupes(self) -> int: 171 return self._rng.randint(1, self.max_duplications_per_packet) 172 173 def choose_out_of_order_packet(self, max_idx) -> int: 174 return self._rng.randint(0, max_idx) 175 176 177class LossyChannel(pw_rpc.ChannelManipulator): 178 """Introduces lossy behaviors into a channel.""" 179 180 class _Packet: 181 """Container class to keep track of incoming packet sequence number.""" 182 183 def __init__(self, sequence_number: int, payload: bytes): 184 self.sequence_number = sequence_number 185 self.payload = payload 186 187 def __init__( 188 self, name, loss_generator: LossController, max_num_old_packets=24 189 ): 190 super().__init__() 191 self.name = name 192 self._packets: Deque[LossyChannel._Packet] = collections.deque() 193 self._old_packets: Deque[LossyChannel._Packet] = collections.deque() 194 self._max_old_packet_window_size = max_num_old_packets 195 self.unique_packet_count = 0 196 self._rng = loss_generator 197 198 def _enqueue_old_packet(self, packet: _Packet): 199 if len(self._old_packets) >= self._max_old_packet_window_size: 200 self._old_packets.popleft() 201 self._old_packets.append(packet) 202 203 def _enqueue_packet(self, payload: bytes): 204 # Generate duplicate packets on ingress. 205 packet = self._Packet(self.unique_packet_count, payload) 206 self.unique_packet_count += 1 207 208 self._packets.append(packet) 209 self._enqueue_old_packet(packet) 210 if self._rng.next_packet_duplicated(): 211 num_dupes = self._rng.next_num_dupes() 212 _LOG.debug('[%s] Duplicating packet %d times', self.name, num_dupes) 213 for _ in range(num_dupes): 214 self._packets.append(packet) 215 216 def _send_packets(self): 217 while self._packets: 218 packet = None 219 220 if self._rng.next_packet_out_of_order(): 221 idx = self._rng.choose_out_of_order_packet( 222 len(self._old_packets) - 1 223 ) 224 _LOG.debug( 225 '[%s] Selecting out of order packet at index %d', 226 self.name, 227 idx, 228 ) 229 packet = copy.copy(self._old_packets[idx]) 230 del self._old_packets[idx] 231 else: 232 packet = self._packets.popleft() 233 234 if self._rng.next_packet_delayed(): 235 delay = self._rng.next_packet_delay() 236 _LOG.debug('[%s] Delaying channel by %d ms', self.name, delay) 237 time.sleep(delay / 1000) 238 239 action_msg = 'Dropped' 240 if not self._rng.next_packet_dropped(): 241 action_msg = 'Sent' 242 self.send_packet(packet.payload) 243 _LOG.debug( 244 '[%s] %s packet #%d: %s', 245 self.name, 246 action_msg, 247 packet.sequence_number, 248 str(packet.payload), 249 ) 250 251 def process_and_send(self, packet: bytes): 252 self._enqueue_packet(packet) 253 self._send_packets() 254