xref: /aosp_15_r20/external/pigweed/pw_rpc/py/pw_rpc/lossy_channel.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""A channel adapter layer that introduces lossy behavior to a channel."""
15
16import abc
17import collections
18import copy
19import logging
20from typing import Callable, Deque
21import random
22import time
23
24import pw_rpc
25
26_LOG = logging.getLogger(__name__)
27
28
29# TODO(amontanez): The surface are of this API could be reduced significantly
30# with a few changes to LossyChannel.
31class LossController(abc.ABC):
32    """Interface for driving loss/corruption decisions of a LossyChannel."""
33
34    @abc.abstractmethod
35    def next_packet_duplicated(self) -> bool:
36        """Returns true if the next packet should be duplicated."""
37
38    @abc.abstractmethod
39    def next_packet_out_of_order(self) -> bool:
40        """Returns true if the next packet should be a re-ordered packet."""
41
42    @abc.abstractmethod
43    def next_packet_delayed(self) -> bool:
44        """Returns true if a delay should occur before sending a packet."""
45
46    @abc.abstractmethod
47    def next_packet_dropped(self) -> bool:
48        """Returns true if the next incoming packet should be dropped."""
49
50    @abc.abstractmethod
51    def next_packet_delay(self) -> int:
52        """The delay before sending the next packet, in milliseconds."""
53
54    @abc.abstractmethod
55    def next_num_dupes(self) -> int:
56        """Returns how many times the next packet should be duplicated."""
57
58    @abc.abstractmethod
59    def choose_out_of_order_packet(self, max_idx) -> int:
60        """Returns the index of the next reordered packet.
61
62        A return value of 0 represents the newest packet, while max_idx
63        represents the oldest packet.
64        """
65
66
67class ManualPacketFilter(LossController):
68    """Determines if a packet should be kept or dropped for testing purposes."""
69
70    _Action = Callable[[int], tuple[bool, bool]]
71    _KEEP = lambda _: (True, False)
72    _DROP = lambda _: (False, False)
73
74    def __init__(self) -> None:
75        self.packet_count = 0
76        self._actions: Deque[ManualPacketFilter._Action] = collections.deque()
77
78    def reset(self) -> None:
79        self.packet_count = 0
80        self._actions.clear()
81
82    def keep(self, count: int) -> None:
83        """Keeps the next count packets."""
84        self._actions.extend(ManualPacketFilter._KEEP for _ in range(count))
85
86    def drop(self, count: int) -> None:
87        """Drops the next count packets."""
88        self._actions.extend(ManualPacketFilter._DROP for _ in range(count))
89
90    def drop_every(self, every: int) -> None:
91        """Drops every Nth packet forever."""
92        self._actions.append(lambda count: (count % every != 0, True))
93
94    def randomly_drop(self, one_in: int, gen: random.Random) -> None:
95        """Drops packets randomly forever."""
96        self._actions.append(lambda _: (gen.randrange(one_in) != 0, True))
97
98    def keep_packet(self) -> bool:
99        """Returns whether the provided packet should be kept or dropped."""
100        self.packet_count += 1
101
102        if not self._actions:
103            return True
104
105        keep, repeat = self._actions[0](self.packet_count)
106
107        if not repeat:
108            self._actions.popleft()
109
110        return keep
111
112    def next_packet_duplicated(self) -> bool:
113        return False
114
115    def next_packet_out_of_order(self) -> bool:
116        return False
117
118    def next_packet_delayed(self) -> bool:
119        return False
120
121    def next_packet_dropped(self) -> bool:
122        return not self.keep_packet()
123
124    def next_packet_delay(self) -> int:
125        return 0
126
127    def next_num_dupes(self) -> int:
128        return 0
129
130    def choose_out_of_order_packet(self, max_idx) -> int:
131        return 0
132
133
134class RandomLossGenerator(LossController):
135    """Parametrized random number generator that drives a LossyChannel."""
136
137    def __init__(
138        self,
139        duplicated_packet_probability: float,
140        max_duplications_per_packet: int,
141        out_of_order_probability: float,
142        delayed_packet_probability: float,
143        delayed_packet_range_ms: tuple[int, int],
144        dropped_packet_probability: float,
145        seed: int | None = None,
146    ):
147        self.duplicated_packet_probability = duplicated_packet_probability
148        self.max_duplications_per_packet = max_duplications_per_packet
149        self.out_of_order_probability = out_of_order_probability
150        self.delayed_packet_probability = delayed_packet_probability
151        self.delayed_packet_range_ms = delayed_packet_range_ms
152        self.dropped_packet_probability = dropped_packet_probability
153        self._rng = random.Random(seed)
154
155    def next_packet_duplicated(self) -> bool:
156        return self.duplicated_packet_probability > self._rng.uniform(0.0, 1.0)
157
158    def next_packet_out_of_order(self) -> bool:
159        return self.out_of_order_probability > self._rng.uniform(0.0, 1.0)
160
161    def next_packet_delayed(self) -> bool:
162        return self.delayed_packet_probability > self._rng.uniform(0.0, 1.0)
163
164    def next_packet_dropped(self) -> bool:
165        return self.dropped_packet_probability > self._rng.uniform(0.0, 1.0)
166
167    def next_packet_delay(self) -> int:
168        return self._rng.randint(*self.delayed_packet_range_ms)
169
170    def next_num_dupes(self) -> int:
171        return self._rng.randint(1, self.max_duplications_per_packet)
172
173    def choose_out_of_order_packet(self, max_idx) -> int:
174        return self._rng.randint(0, max_idx)
175
176
177class LossyChannel(pw_rpc.ChannelManipulator):
178    """Introduces lossy behaviors into a channel."""
179
180    class _Packet:
181        """Container class to keep track of incoming packet sequence number."""
182
183        def __init__(self, sequence_number: int, payload: bytes):
184            self.sequence_number = sequence_number
185            self.payload = payload
186
187    def __init__(
188        self, name, loss_generator: LossController, max_num_old_packets=24
189    ):
190        super().__init__()
191        self.name = name
192        self._packets: Deque[LossyChannel._Packet] = collections.deque()
193        self._old_packets: Deque[LossyChannel._Packet] = collections.deque()
194        self._max_old_packet_window_size = max_num_old_packets
195        self.unique_packet_count = 0
196        self._rng = loss_generator
197
198    def _enqueue_old_packet(self, packet: _Packet):
199        if len(self._old_packets) >= self._max_old_packet_window_size:
200            self._old_packets.popleft()
201        self._old_packets.append(packet)
202
203    def _enqueue_packet(self, payload: bytes):
204        # Generate duplicate packets on ingress.
205        packet = self._Packet(self.unique_packet_count, payload)
206        self.unique_packet_count += 1
207
208        self._packets.append(packet)
209        self._enqueue_old_packet(packet)
210        if self._rng.next_packet_duplicated():
211            num_dupes = self._rng.next_num_dupes()
212            _LOG.debug('[%s] Duplicating packet %d times', self.name, num_dupes)
213            for _ in range(num_dupes):
214                self._packets.append(packet)
215
216    def _send_packets(self):
217        while self._packets:
218            packet = None
219
220            if self._rng.next_packet_out_of_order():
221                idx = self._rng.choose_out_of_order_packet(
222                    len(self._old_packets) - 1
223                )
224                _LOG.debug(
225                    '[%s] Selecting out of order packet at index %d',
226                    self.name,
227                    idx,
228                )
229                packet = copy.copy(self._old_packets[idx])
230                del self._old_packets[idx]
231            else:
232                packet = self._packets.popleft()
233
234            if self._rng.next_packet_delayed():
235                delay = self._rng.next_packet_delay()
236                _LOG.debug('[%s] Delaying channel by %d ms', self.name, delay)
237                time.sleep(delay / 1000)
238
239            action_msg = 'Dropped'
240            if not self._rng.next_packet_dropped():
241                action_msg = 'Sent'
242                self.send_packet(packet.payload)
243            _LOG.debug(
244                '[%s] %s packet #%d: %s',
245                self.name,
246                action_msg,
247                packet.sequence_number,
248                str(packet.payload),
249            )
250
251    def process_and_send(self, packet: bytes):
252        self._enqueue_packet(packet)
253        self._send_packets()
254