xref: /aosp_15_r20/external/bcc/tests/python/test_call1.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
#!/usr/bin/env python3
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
4*387f9dfdSAndroid Build Coastguard Worker
5*387f9dfdSAndroid Build Coastguard Workerfrom ctypes import c_ushort, c_int, c_ulonglong
6*387f9dfdSAndroid Build Coastguard Workerfrom netaddr import IPAddress
7*387f9dfdSAndroid Build Coastguard Workerfrom bcc import BPF
8*387f9dfdSAndroid Build Coastguard Workerfrom pyroute2 import IPRoute
9*387f9dfdSAndroid Build Coastguard Workerfrom socket import socket, AF_INET, SOCK_DGRAM
10*387f9dfdSAndroid Build Coastguard Workerimport sys
11*387f9dfdSAndroid Build Coastguard Workerfrom time import sleep
12*387f9dfdSAndroid Build Coastguard Workerfrom unittest import main, TestCase
13*387f9dfdSAndroid Build Coastguard Workerfrom utils import mayFail
14*387f9dfdSAndroid Build Coastguard Worker
# First command-line argument: path of the BPF source file to compile.
# It is popped (not just read) so it is removed from sys.argv before
# unittest's main() runs further down.
arg1 = sys.argv.pop(1)

# State identifiers used as integer keys into the BPF "jump" and "stats"
# tables (values 1 through 4, in this order).
S_EOP, S_ETHER, S_ARP, S_IP = range(1, 5)
21*387f9dfdSAndroid Build Coastguard Worker
class TestBPFSocket(TestCase):
    """Check that the BPF packet parsers chained through the jump table run.

    The BPF source file is taken from the module-level ``arg1``.
    """

    def setUp(self):
        """Load the BPF programs, attach the entry filter and fill the tables.

        Side effects: adds an ``sfq`` qdisc and a BPF filter running
        ``parse_ether`` on ``eth0``, then stores the ``jump`` and ``stats``
        tables on ``self`` for the test to use.
        """
        b = BPF(src_file=arg1.encode(), debug=0)
        ether_fn = b.load_func(b"parse_ether", BPF.SCHED_CLS)
        arp_fn = b.load_func(b"parse_arp", BPF.SCHED_CLS)
        ip_fn = b.load_func(b"parse_ip", BPF.SCHED_CLS)
        eop_fn = b.load_func(b"eop", BPF.SCHED_CLS)

        # The netlink handle is only needed while installing the qdisc and
        # filter; the context manager closes it even if one of the calls
        # raises, instead of leaking the socket for the rest of the run.
        with IPRoute() as ip:
            ifindex = ip.link_lookup(ifname=b"eth0")[0]
            ip.tc("add", "sfq", ifindex, "1:")
            ip.tc("add-filter", "bpf", ifindex, ":1", fd=ether_fn.fd,
                  name=ether_fn.name, parent="1:", action="ok", classid=1)

        # Map each state id to the fd of the program handling that state
        # (presumably consumed by tail calls in the BPF source -- confirm
        # against the .c file passed as arg1).
        self.jump = b.get_table(b"jump", c_int, c_int)
        self.jump[c_int(S_ARP)] = c_int(arp_fn.fd)
        self.jump[c_int(S_IP)] = c_int(ip_fn.fd)
        self.jump[c_int(S_EOP)] = c_int(eop_fn.fd)
        self.stats = b.get_table(b"stats", c_int, c_ulonglong)

    @mayFail("This may fail on github actions environment due to udp packet loss")
    def test_jumps(self):
        """Send one UDP datagram and verify the per-state counters advanced."""
        # Context manager guarantees the socket is closed even if sendto()
        # raises (e.g. network unreachable).
        with socket(AF_INET, SOCK_DGRAM) as udp:
            udp.sendto(b"a" * 10, ("172.16.1.1", 5000))
        self.assertGreater(self.stats[c_int(S_IP)].value, 0)
        self.assertGreater(self.stats[c_int(S_ARP)].value, 0)
        # Strictly more than one end-of-parse hit is expected.
        self.assertGreater(self.stats[c_int(S_EOP)].value, 1)
48*387f9dfdSAndroid Build Coastguard Worker
if __name__ == "__main__":
    # Hand control to unittest's command-line runner when executed as a script.
    main()
51