xref: /aosp_15_r20/external/bcc/tests/python/test_xlate1.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
#!/usr/bin/env python3
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
4*387f9dfdSAndroid Build Coastguard Worker
5*387f9dfdSAndroid Build Coastguard Workerfrom netaddr import IPAddress
6*387f9dfdSAndroid Build Coastguard Workerfrom bcc import BPF
7*387f9dfdSAndroid Build Coastguard Workerfrom pyroute2 import IPRoute, protocols
8*387f9dfdSAndroid Build Coastguard Workerfrom socket import socket, AF_INET, SOCK_DGRAM
9*387f9dfdSAndroid Build Coastguard Workerfrom subprocess import call
10*387f9dfdSAndroid Build Coastguard Workerimport sys
11*387f9dfdSAndroid Build Coastguard Workerfrom time import sleep
12*387f9dfdSAndroid Build Coastguard Workerfrom unittest import main, TestCase
13*387f9dfdSAndroid Build Coastguard Worker
# Positional arguments are popped off sys.argv at import time so that they
# are no longer present when unittest's main() later parses sys.argv.
#
# arg1 (required): forwarded as the first positional argument to BPF().
# Raises IndexError at import if it is missing.
arg1 = sys.argv.pop(1).encode()
# arg2 (optional): forwarded as the second positional argument to BPF().
# Defaults to empty bytes; encoded like arg1 so both are always bytes.
arg2 = b""
if len(sys.argv) > 1:
    arg2 = sys.argv.pop(1).encode()
18*387f9dfdSAndroid Build Coastguard Worker
class TestBPFFilter(TestCase):
    """Check that the ``on_packet`` BPF program updates the ``xlate`` table.

    The program built from the module-level ``arg1``/``arg2`` is attached as
    a tc action on both the ingress and egress side of ``eth0``. The test
    then seeds the ``xlate`` table and verifies that its per-entry packet
    counters increase after a ping.
    """

    def setUp(self):
        """Load the BPF program, re-address eth0 and attach the tc filters.

        Stores the program's ``xlate`` table on ``self.xlate`` for the test.
        """
        b = BPF(arg1, arg2, debug=0)
        fn = b.load_func(b"on_packet", BPF.SCHED_ACT)
        ip = IPRoute()
        # Release the netlink socket once the test is over; cleanups also run
        # when a later step of setUp raises.
        self.addCleanup(ip.close)
        ifindex = ip.link_lookup(ifname=b"eth0")[0]
        # set up a network to change the flow:
        #             outside      |       inside
        # 172.16.1.1 - 172.16.1.2  |  192.168.1.1 - 192.168.1.2
        ip.addr("del", index=ifindex, address="172.16.1.2", mask=24)
        ip.addr("add", index=ifindex, address="192.168.1.2", mask=24)
        # add an ingress and egress qdisc
        ip.tc("add", "ingress", ifindex, "ffff:")
        ip.tc("add", "sfq", ifindex, "1:")
        # add same program to both ingress/egress, so pkt is translated in both directions
        action = {"kind": "bpf", "fd": fn.fd, "name": fn.name, "action": "ok"}
        ip.tc("add-filter", "u32", ifindex, ":1", parent="ffff:", action=[action],
                protocol=protocols.ETH_P_ALL, classid=1, target=0x10002, keys=['0x0/0x0+0'])
        ip.tc("add-filter", "u32", ifindex, ":2", parent="1:", action=[action],
                protocol=protocols.ETH_P_ALL, classid=1, target=0x10002, keys=['0x0/0x0+0'])
        self.xlate = b.get_table(b"xlate")

    def test_xlate(self):
        """Seed both translation directions, ping, and check the counters."""
        # Entry for the (172.16.1.2, 172.16.1.1) pair, rewritten to the
        # 192.168.1.x addresses. The two trailing zeros presumably initialise
        # the ip/arp packet counters asserted below -- TODO confirm against
        # the BPF program's leaf struct.
        key1 = self.xlate.Key(IPAddress("172.16.1.2").value, IPAddress("172.16.1.1").value)
        leaf1 = self.xlate.Leaf(IPAddress("192.168.1.2").value, IPAddress("192.168.1.1").value, 0, 0)
        self.xlate[key1] = leaf1
        # Entry for the opposite direction.
        key2 = self.xlate.Key(IPAddress("192.168.1.1").value, IPAddress("192.168.1.2").value)
        leaf2 = self.xlate.Leaf(IPAddress("172.16.1.1").value, IPAddress("172.16.1.2").value, 0, 0)
        self.xlate[key2] = leaf2
        # Generate traffic. ping's exit status is not checked; the assertions
        # below rely only on the table counters.
        call(["ping", "-c1", "192.168.1.1"])
        leaf = self.xlate[key1]
        self.assertGreater(leaf.ip_xlated_pkts, 0)
        self.assertGreater(leaf.arp_xlated_pkts, 0)
        leaf = self.xlate[key2]
        self.assertGreater(leaf.ip_xlated_pkts, 0)
        self.assertGreater(leaf.arp_xlated_pkts, 0)
55*387f9dfdSAndroid Build Coastguard Worker
if __name__ == "__main__":
    # Run the tests via unittest's main(); the script's own positional
    # arguments were already popped from sys.argv at module import.
    main()
58