#!/usr/bin/env python3
# Copyright (c) 2017 Facebook, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

import ctypes as ct
import os
from unittest import main, skipUnless, TestCase
from utils import kernel_version_ge
from bcc import BPF
from netaddr import IPAddress


class KeyV4(ct.Structure):
    """User-space LPM trie key for IPv4: a prefix length and 4 address bytes."""
    _fields_ = [("prefixlen", ct.c_uint),
                ("data", ct.c_ubyte * 4)]


class KeyV6(ct.Structure):
    """User-space LPM trie key for IPv6: a prefix length and 8 16-bit words."""
    _fields_ = [("prefixlen", ct.c_uint),
                ("data", ct.c_ushort * 8)]


@skipUnless(kernel_version_ge(4, 11), "requires kernel >= 4.11")
class TestLpmTrie(TestCase):
    """Exercise longest-prefix-match lookups on a BPF_LPM_TRIE map."""

    def test_lpm_trie_v4(self):
        """The most specific stored IPv4 prefix wins; unmatched keys raise."""
        # The key struct declared for the map must be the same size as KeyV4
        # (4-byte prefixlen + 4 address bytes). The Python struct is handed to
        # the map as the key buffer, so a larger declared key (e.g. u32
        # data[4]) would make the map read past the end of that buffer.
        test_prog1 = b"""
        struct key_v4 {
            u32 prefixlen;
            u8 data[4];
        };
        BPF_LPM_TRIE(trie, struct key_v4, int, 16);
        """
        b = BPF(text=test_prog1)
        t = b[b"trie"]

        # Two overlapping prefixes; each value records its own prefix length
        # so the assertions below show which entry matched.
        k1 = KeyV4(24, (192, 168, 0, 0))
        v1 = ct.c_int(24)
        t[k1] = v1

        k2 = KeyV4(28, (192, 168, 0, 0))
        v2 = ct.c_int(28)
        t[k2] = v2

        # 192.168.0.15 falls inside the /28, which is the longer match.
        k = KeyV4(32, (192, 168, 0, 15))
        self.assertEqual(t[k].value, 28)

        # 192.168.0.127 is outside the /28 but still inside the /24.
        k = KeyV4(32, (192, 168, 0, 127))
        self.assertEqual(t[k].value, 24)

        # An address covered by neither prefix must not be found.
        k = KeyV4(32, (172, 16, 1, 127))
        with self.assertRaises(KeyError):
            t[k]

    def test_lpm_trie_v6(self):
        """The most specific stored IPv6 prefix wins; unmatched keys raise."""
        # 4 x u32 is 16 bytes of address data, the same size as KeyV6's
        # 8 x c_ushort.
        test_prog1 = b"""
        struct key_v6 {
            u32 prefixlen;
            u32 data[4];
        };
        BPF_LPM_TRIE(trie, struct key_v6, int, 16);
        """
        b = BPF(text=test_prog1)
        t = b[b"trie"]

        # NOTE(review): each address word is stored as a native-endian
        # c_ushort, so prefix matching here presumably only lines up because
        # every prefix length used (64, 96, 128) is a multiple of 16 --
        # confirm before adding other lengths.
        k1 = KeyV6(64, IPAddress('2a00:1450:4001:814:200e::').words)
        v1 = ct.c_int(64)
        t[k1] = v1

        k2 = KeyV6(96, IPAddress('2a00:1450:4001:814::200e').words)
        v2 = ct.c_int(96)
        t[k2] = v2

        # Shares the first 96 bits with k2, so the /96 entry wins.
        k = KeyV6(128, IPAddress('2a00:1450:4001:814::1024').words)
        self.assertEqual(t[k].value, 96)

        # Differs from k2 within the first 96 bits but matches k1's /64.
        k = KeyV6(128, IPAddress('2a00:1450:4001:814:2046::').words)
        self.assertEqual(t[k].value, 64)

        # An address covered by neither prefix must not be found.
        k = KeyV6(128, IPAddress('2a00:ffff::').words)
        with self.assertRaises(KeyError):
            t[k]


if __name__ == "__main__":
    main()