xref: /aosp_15_r20/external/autotest/server/cros/network/iperf_runner.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2021 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import os.path
7import logging
8import time
9import math
10import numbers
11import numpy
12from enum import IntEnum
13
14from autotest_lib.client.common_lib import error
15from autotest_lib.client.common_lib.cros import path_utils
16
17
18class IperfResult(object):
19    """Logic for parsing and representing iperf results."""
20
21    @staticmethod
22    def from_iperf_output(results, config):
23        """Parse the text output of iperf and return an IperfResult.
24
25        @param results string raw results from iperf.
26        @param config IperfConfig the config for the test.
27
28        @return IperfResult result.
29
30        """
31
32        class IperfIndex(IntEnum):
33            """Defines the indices of certain values in iperf output."""
34            LOG_ID_INDEX = 5
35            INTERVAL_INDEX = 6
36            DATA_TRANSFERED_INDEX = 7
37            PERCENT_LOSS_INDEX = 12
38
39        NUM_FIELDS_IN_SERVER_OUTPUT = 14
40
41        lines = results.splitlines()
42        total_throughput = 0
43        test_durations = []
44        percent_losses = []
45        for line in lines:
46            fields = line.split(',')
47            # Negative Log ID values are used for sum total output which we
48            # don't use.
49            if float(fields[IperfIndex.LOG_ID_INDEX]) < 0:
50                continue
51            # Filter out client side logs from UDP results. We only want server
52            # side results because they reflect the amount of data that was
53            # actually received by the server. Server output has 14 fields while
54            # client output has 9 fields, so we use this to differentiate.
55            # Ideally we'd use the '-x D' option to filter the client side data,
56            # but this option makes the iperf output unreliable.
57            if config.udp and len(fields) < NUM_FIELDS_IN_SERVER_OUTPUT:
58                continue
59            total_data_bytes = float(fields[IperfIndex.DATA_TRANSFERED_INDEX])
60            test_interval = fields[IperfIndex.INTERVAL_INDEX]
61            test_start_end = test_interval.split('-')
62            duration = float(test_start_end[1]) - float(test_start_end[0])
63            test_durations.append(duration)
64            total_throughput += IperfResult._calculate_throughput(
65                    total_data_bytes, duration)
66            if (config.udp):
67                percent_losses.append(
68                        float(fields[IperfIndex.PERCENT_LOSS_INDEX]))
69
70        # We should get one line of output for each port used in the test. In
71        # rare cases, the data from one of the ports is not included in the
72        # iperf output, so discard results in these cases.
73        expected_num_output_lines = config.num_ports
74        if config.bidirectional:
75            expected_num_output_lines *= 2
76        if len(test_durations) != expected_num_output_lines:
77            logging.info(
78                    'iperf command output was missing some data, ignoring test run.'
79            )
80            return None
81
82        test_duration = math.fsum(test_durations) / len(test_durations)
83        if config.udp:
84            percent_loss = math.fsum(percent_losses) / len(percent_losses)
85        else:
86            percent_loss = None
87        return IperfResult(test_duration, total_throughput, percent_loss)
88
89    @staticmethod
90    def from_samples(samples):
91        """Build an averaged IperfResult from |samples|.
92
93        Calculate an representative sample with averaged values
94        and standard deviation of throughput from samples.
95
96        @param samples list of IperfResult objects.
97        @return IperfResult object.
98
99        """
100        if len(samples) == 0:
101            return None
102        duration_samples = [float(sample.duration) for sample in samples]
103        duration_mean = numpy.mean(duration_samples)
104
105        throughput_samples = [float(sample.throughput) for sample in samples]
106        throughput_mean = numpy.mean(throughput_samples)
107        throughput_dev = numpy.std(throughput_samples)
108
109        # For TCP connections, the packet loss is 0 by definition. In these
110        # cases, the percent_loss will be None for all samples, and UDP results
111        # should never have a percent_loss of None, so we can just check the
112        # first sample.
113        if samples[0].percent_loss == None:
114            percent_loss_mean = None
115        else:
116            percent_loss_samples = [
117                    float(sample.percent_loss) for sample in samples
118            ]
119            percent_loss_mean = numpy.mean(percent_loss_samples)
120
121        return IperfResult(duration_mean,
122                           throughput_mean,
123                           percent_loss_mean,
124                           throughput_dev=throughput_dev)
125
126    def throughput_cv_less_than_maximum(self, max_cv):
127        """Check that the throughput from this result is "accurate" enough.
128
129        We say that an IperfResult is "accurate" enough when the coefficient of
130        variance (standard deviation / mean) is below the passed in fraction.
131
132        @param fraction float maximum coefficient of variance for the
133        throughput sample.
134        @return True on above condition.
135
136        """
137        if self.throughput is None or self.throughput_dev is None:
138            return True
139
140        if not self.throughput_dev and not self.throughput:
141            # 0/0 is undefined, but take this to be good for our purposes.
142            return True
143
144        if self.throughput_dev and not self.throughput:
145            # Deviation is non-zero, but the average is 0.  Deviation
146            # as a fraction of the self.throughput is undefined but in theory
147            # a "very large number."
148            return False
149
150        if self.throughput_dev / self.throughput > max_cv:
151            return False
152
153        return True
154
155    @staticmethod
156    def _calculate_throughput(total_data, duration):
157        """Calculate the throughput from the total bytes transeferred and the
158        duration of the test.
159
160        @param total_data int The number of bytes transferred during the test.
161        @param duration float The duration of the test in seconds.
162
163        @return float The throughput of the test in Mbps.
164        """
165        if duration == 0:
166            return 0
167        total_bits = total_data * 8
168        bits_per_second = total_bits / duration
169        return bits_per_second / 1000000
170
171    def __init__(self, duration, throughput, percent_loss,
172                 throughput_dev=None):
173        """Construct an IperfResult.
174
175        @param duration float how long the test took in seconds.
176        @param throughput float test throughput in Mbps.
177        @param percent_loss float percentage of packets lost in UDP transfer.
178        @param throughput_dev standard deviation of throughputs.
179        """
180        self.duration = duration
181        self.throughput = throughput
182        self.percent_loss = percent_loss
183        self.throughput_dev = throughput_dev
184
185    def get_keyval(self, prefix=''):
186        ret = {}
187        if prefix:
188            prefix = prefix + '_'
189        if self.throughput_dev is None:
190            margin = ''
191        else:
192            margin = '+-%0.2f' % self.throughput_dev
193        if self.throughput is not None:
194            ret[prefix + 'throughput'] = '%0.2f%s' % (self.throughput, margin)
195        return ret
196
197    def __repr__(self):
198        fields = []
199        fields += [
200                '%s=%0.2f' % item for item in list(vars(self).items())
201                if item[1] is not None and isinstance(item[1], numbers.Number)
202        ]
203        return '%s(%s)' % (self.__class__.__name__, ', '.join(fields))
204
205
206class IperfConfig(object):
207    """ Defines the configuration for an iperf run. """
208    DEFAULT_TEST_TIME = 10
209    DEFAULT_MAX_BANDWIDTH = '10000M'
210    DEFAULT_NUM_PORTS = 4
211
212    IPERF_TEST_TYPE_TCP_TX = 'tcp_tx'
213    IPERF_TEST_TYPE_TCP_RX = 'tcp_rx'
214    IPERF_TEST_TYPE_TCP_BIDIRECTIONAL = 'tcp_bidirectional'
215    IPERF_TEST_TYPE_UDP_TX = 'udp_tx'
216    IPERF_TEST_TYPE_UDP_RX = 'udp_rx'
217    IPERF_TEST_TYPE_UDP_BIDIRECTIONAL = 'udp_bidirectional'
218
219    def __init__(self,
220                 test_type,
221                 max_bandwidth=DEFAULT_MAX_BANDWIDTH,
222                 test_time=DEFAULT_TEST_TIME,
223                 num_ports=DEFAULT_NUM_PORTS):
224        """ Construct an IperfConfig.
225
226        @param test_type string, PerfTestTypes test type.
227        @param max_bandwidth string maximum bandwidth to be used during the test
228        e.x. 100M (100 Mbps).
229        @param test_time int number of seconds to run the test for.
230        @param num_ports int number of ports use in the test.
231        """
232
233        if test_type == IperfConfig.IPERF_TEST_TYPE_TCP_TX:
234            self.udp = False
235            self.bidirectional = False
236        elif test_type == IperfConfig.IPERF_TEST_TYPE_TCP_RX:
237            self.udp = False
238            self.bidirectional = False
239        elif test_type == IperfConfig.IPERF_TEST_TYPE_TCP_BIDIRECTIONAL:
240            self.udp = False
241            self.bidirectional = True
242        elif test_type == IperfConfig.IPERF_TEST_TYPE_UDP_TX:
243            self.udp = True
244            self.bidirectional = False
245        elif test_type == IperfConfig.IPERF_TEST_TYPE_UDP_RX:
246            self.udp = True
247            self.bidirectional = False
248        elif test_type == IperfConfig.IPERF_TEST_TYPE_UDP_BIDIRECTIONAL:
249            self.udp = True
250            self.bidirectional = True
251        else:
252            raise error.TestFail(
253                    'Test type %s is not supported by iperf_runner.' %
254                    test_type)
255        self.max_bandwidth = max_bandwidth
256        self.test_time = test_time
257        self.num_ports = num_ports
258        self.test_type = test_type
259
260
261class IperfRunner(object):
262    """Delegate to run iperf on a client/server pair."""
263
264    DEFAULT_TEST_TIME = 10
265    IPERF_SERVER_MAX_STARTUP_WAIT_TIME = 11
266    IPERF_CLIENT_TURNDOWN_WAIT_TIME = 1
267    IPERF_COMMAND_TIMEOUT_MARGIN = 20
268
269    def __init__(
270            self,
271            client_proxy,
272            server_proxy,
273            config,
274            client_interface=None,
275            server_interface=None,
276    ):
277        """Construct an IperfRunner. Use the IP addresses of the passed
278        interfaces if they are provided. Otherwise, attempt to use the WiFi
279        interface on the devices.
280
281        @param client LinuxSystem object.
282        @param server LinuxSystem object.
283        @param client_interface Interface object.
284        @param server_interface Interface object.
285
286        """
287        self._client_proxy = client_proxy
288        self._server_proxy = server_proxy
289        self._server_host = server_proxy.host
290        self._client_host = client_proxy.host
291        if server_interface:
292            self._server_ip = server_interface.ipv4_address
293        # If a server interface was not explicitly provided, attempt to use
294        # the WiFi IP of the device.
295        else:
296            try:
297                self._server_ip = server_proxy.wifi_ip
298            except:
299                raise error.TestFail('Server device has no WiFi IP address, '\
300                    'and no alternate interface was specified.')
301
302        if client_interface:
303            self._client_ip = client_interface.ipv4_address
304        # If a client interface was not explicitly provided, use the WiFi IP
305        # address of the WiFiClient device.
306        else:
307            try:
308                self._client_ip = client_proxy.wifi_ip
309            except:
310                raise error.TestFail('Client device has no WiFi IP address, '\
311                    'and no alternate interface was specified.')
312
313        # Assume minijail0 is on ${PATH}, but raise exception if it's not
314        # available on both server and client.
315        self._minijail = 'minijail0'
316        path_utils.must_be_installed(self._minijail, host=self._server_host)
317        path_utils.must_be_installed(self._minijail, host=self._client_host)
318        # Bind mount a tmpfs over /tmp, since netserver hard-codes the /tmp
319        # path. netserver's log files aren't useful anyway.
320        self._minijail = ("%s -v -k 'tmpfs,/tmp,tmpfs,"
321                          "MS_NODEV|MS_NOEXEC|MS_NOSUID,mode=755,size=10M'" %
322                          self._minijail)
323
324        self._config = config
325        self._command_iperf_server = path_utils.must_be_installed(
326                'iperf', host=self._server_host)
327        self._command_iperf_client = path_utils.must_be_installed(
328                'iperf', host=self._client_host)
329        self._udp_flag = '-u' if config.udp else ''
330        self._bidirectional_flag = '-d' if config.bidirectional else ''
331
332    def __enter__(self):
333        self._restart_iperf_server()
334        return self
335
336    def __exit__(self, exc_type, exc_value, traceback):
337        self._client_proxy.firewall_cleanup()
338        self._server_proxy.firewall_cleanup()
339        self._kill_iperf_server()
340        self._kill_iperf_client()
341
342    def _kill_iperf_client(self):
343        """Kills any existing iperf process on the client."""
344        self._client_host.run('pkill -9 %s' %
345                              os.path.basename(self._command_iperf_client),
346                              ignore_status=True)
347
348    def _kill_iperf_server(self):
349        """Kills any existing iperf process on the serving host."""
350        self._server_host.run('pkill -9 %s' %
351                              os.path.basename(self._command_iperf_server),
352                              ignore_status=True)
353
354    def _restart_iperf_server(self):
355        """Start an iperf server on the server device. Also opens up firewalls
356        on the test devices.
357        """
358        logging.info('Starting iperf server...')
359        self._kill_iperf_server()
360        logging.debug('iperf server invocation: %s %s -s -B %s -D %s -w 320k',
361                      self._minijail, self._command_iperf_server,
362                      self._server_ip, self._udp_flag)
363        devnull = open(os.devnull, "w")
364        # 320kB is the maximum socket buffer size on Gale (default is 208kB).
365        self._server_host.run('%s %s -s -B %s -D %s -w 320k' %
366                              (self._minijail, self._command_iperf_server,
367                               self._server_ip, self._udp_flag),
368                              stderr_tee=devnull)
369        startup_time = time.time()
370        # Ensure the endpoints aren't firewalled.
371        protocol = 'udp' if self._config.udp else 'tcp'
372        self._client_proxy.firewall_open(protocol, self._server_ip)
373        self._server_proxy.firewall_open(protocol, self._client_ip)
374
375        # Run a client iperf test. The client will attempt to connect to the
376        # server for 10 seconds, but will exit early if it succeeds before
377        # that. This ensures that the server has had suffiecient time to come
378        # online before we begin the tests. We don't fail on timeout here
379        # because the logic for failed connections is contained in run().
380        iperf_test = '%s -c %s -B %s -t 1 %s' % (
381                self._command_iperf_client, self._server_ip, self._client_ip,
382                self._udp_flag)
383        result = self._client_host.run(
384                iperf_test,
385                ignore_status=True,
386                ignore_timeout=True,
387                timeout=self.IPERF_SERVER_MAX_STARTUP_WAIT_TIME)
388        if not result or result.exit_status:
389            logging.debug(
390                    'Failed to make a connection to the server in %s seconds.',
391                    self.IPERF_SERVER_MAX_STARTUP_WAIT_TIME)
392        else:
393            logging.debug('Successfully made a connection to the server.')
394        # TODO(b:198343041) When iperf2 clients are run too quickly back to
395        # back, the server is unable to distinguish between them. Wait briefly
396        # to allow the server to reset.
397        time.sleep(self.IPERF_CLIENT_TURNDOWN_WAIT_TIME)
398
399    def run(self, ignore_failures=False, retry_count=3):
400        """Run iperf and take a performance measurement.
401
402        @param ignore_failures bool True iff iperf runs that fail should be
403                ignored.  If this happens, run will return a None value rather
404                than an IperfResult.
405        @param retry_count int number of times to retry the iperf command if
406                it fails due to an internal timeout within iperf.
407        @return IperfResult summarizing an iperf run.
408
409        """
410        iperf_client = '%s -c %s -B %s -b %s -x C -y c -P 4 -t %s %s %s' % (
411                self._command_iperf_client, self._server_ip, self._client_ip,
412                self._config.max_bandwidth, self._config.test_time,
413                self._udp_flag, self._bidirectional_flag)
414
415        logging.info('Running iperf client for %d seconds.',
416                     self._config.test_time)
417        logging.debug('iperf client invocation: %s', iperf_client)
418        timeout = self._config.test_time + self.IPERF_COMMAND_TIMEOUT_MARGIN
419
420        for _ in range(retry_count):
421            result = self._client_host.run(iperf_client,
422                                           ignore_status=True,
423                                           ignore_timeout=ignore_failures,
424                                           timeout=timeout)
425            if not result:
426                logging.info('Retrying iperf after empty result.')
427                continue
428
429            # Exit retry loop on success.
430            if not result.exit_status:
431                break
432
433            # We are in an unhandled error case.
434            logging.info('Retrying iperf after an unknown error.')
435
436        if ignore_failures and (result is None or result.exit_status):
437            return None
438
439        if result is None:
440            raise error.TestFail("No results; cmd: %s", iperf_client)
441
442        if result.exit_status:
443            raise error.CmdError(iperf_client, result,
444                                 "Command returned non-zero exit status")
445        # TODO(b:198343041) When iperf2 clients are run too quickly back to
446        # back, the server is unable to distinguish between them. Wait briefly
447        # to allow the server to reset.
448        time.sleep(self.IPERF_CLIENT_TURNDOWN_WAIT_TIME)
449        return IperfResult.from_iperf_output(result.stdout, self._config)
450