xref: /aosp_15_r20/external/bcc/tests/python/test_lpm_trie.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1#!/usr/bin/env python3
2# Copyright (c) 2017 Facebook, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5import ctypes as ct
6import os
7from unittest import main, skipUnless, TestCase
8from utils import kernel_version_ge
9from bcc import BPF
10from netaddr import IPAddress
11
class KeyV4(ct.Structure):
    """ctypes key used by the IPv4 LPM trie test: a prefix length plus 4 bytes."""

    _fields_ = [
        # Number of leading bits of ``data`` that are significant.
        ("prefixlen", ct.c_uint),
        # The four address octets, one per byte.
        ("data", ct.c_ubyte * 4),
    ]
15
class KeyV6(ct.Structure):
    """ctypes key used by the IPv6 LPM trie test: a prefix length plus 8 words."""

    _fields_ = [
        # Number of leading bits of ``data`` that are significant.
        ("prefixlen", ct.c_uint),
        # The address as eight 16-bit words (16 bytes in total).
        ("data", ct.c_ushort * 8),
    ]
19
@skipUnless(kernel_version_ge(4, 11), "requires kernel >= 4.11")
class TestLpmTrie(TestCase):
    """Longest-prefix-match lookups against ``BPF_LPM_TRIE`` maps."""

    def test_lpm_trie_v4(self):
        """An IPv4 lookup returns the value of the most specific stored prefix."""
        # The C key mirrors KeyV4 (u32 prefixlen + 4 address bytes) so that the
        # map's key size equals the size of the ctypes struct passed in from
        # Python; a wider C key would extend past the end of that struct.
        test_prog1 = b"""
        struct key_v4 {
            u32 prefixlen;
            u8 data[4];
        };
        BPF_LPM_TRIE(trie, struct key_v4, int, 16);
        """
        b = BPF(text=test_prog1)
        t = b[b"trie"]

        # Two nested prefixes: 192.168.0.0/24 and 192.168.0.0/28.
        k1 = KeyV4(24, (192, 168, 0, 0))
        v1 = ct.c_int(24)
        t[k1] = v1

        k2 = KeyV4(28, (192, 168, 0, 0))
        v2 = ct.c_int(28)
        t[k2] = v2

        # .15 lies inside both prefixes; the longer /28 must win.
        k = KeyV4(32, (192, 168, 0, 15))
        self.assertEqual(t[k].value, 28)

        # .127 is outside the /28 (which covers .0-.15) but inside the /24.
        k = KeyV4(32, (192, 168, 0, 127))
        self.assertEqual(t[k].value, 24)

        # An address covered by neither prefix is reported as a missing key.
        # The key is built outside the ``with`` so only the lookup is guarded.
        k = KeyV4(32, (172, 16, 1, 127))
        with self.assertRaises(KeyError):
            _ = t[k]

    def test_lpm_trie_v6(self):
        """An IPv6 lookup returns the value of the most specific stored prefix."""
        # 4 x u32 is 16 bytes of address data, the same size as the eight
        # 16-bit words held by KeyV6.
        test_prog1 = b"""
        struct key_v6 {
            u32 prefixlen;
            u32 data[4];
        };
        BPF_LPM_TRIE(trie, struct key_v6, int, 16);
        """
        b = BPF(text=test_prog1)
        t = b[b"trie"]

        # NOTE(review): the 16-bit words are stored by ctypes in host byte
        # order. The prefix lengths used here (64, 96) are multiples of 16
        # bits, which presumably keeps matching independent of that order --
        # confirm before adding prefixes that are not word-aligned.
        k1 = KeyV6(64, IPAddress('2a00:1450:4001:814:200e::').words)
        v1 = ct.c_int(64)
        t[k1] = v1

        k2 = KeyV6(96, IPAddress('2a00:1450:4001:814::200e').words)
        v2 = ct.c_int(96)
        t[k2] = v2

        # Shares its first 96 bits with k2, so the /96 entry must win.
        k = KeyV6(128, IPAddress('2a00:1450:4001:814::1024').words)
        self.assertEqual(t[k].value, 96)

        # Fifth word differs from k2's, so only the /64 entry matches.
        k = KeyV6(128, IPAddress('2a00:1450:4001:814:2046::').words)
        self.assertEqual(t[k].value, 64)

        # An address covered by neither prefix is reported as a missing key.
        k = KeyV6(128, IPAddress('2a00:ffff::').words)
        with self.assertRaises(KeyError):
            _ = t[k]
79
# Run the tests in this module through unittest's main() when the file is
# executed directly as a script.
if __name__ == "__main__":
    main()
82