1#!/usr/bin/env python3 2# Copyright (c) PLUMgrid, Inc. 3# Licensed under the Apache License, Version 2.0 (the "License") 4 5import ctypes as ct 6import os 7import unittest 8from bcc import BPF 9import multiprocessing 10 11class TestLru(unittest.TestCase): 12 def test_lru_hash(self): 13 b = BPF(text=b"""BPF_TABLE("lru_hash", int, u64, lru, 1024);""") 14 t = b[b"lru"] 15 for i in range(1, 1032): 16 t[ct.c_int(i)] = ct.c_ulonglong(i) 17 for i, v in t.items(): 18 self.assertEqual(v.value, i.value) 19 # BPF_MAP_TYPE_LRU_HASH eviction happens in batch and we expect less 20 # items than specified size. 21 self.assertLess(len(t), 1024); 22 23 def test_lru_percpu_hash(self): 24 test_prog1 = b""" 25 BPF_TABLE("lru_percpu_hash", u32, u32, stats, 1); 26 int hello_world(void *ctx) { 27 u32 key=0; 28 u32 value = 0, *val; 29 val = stats.lookup_or_try_init(&key, &value); 30 if (val) { 31 *val += 1; 32 } 33 return 0; 34 } 35 """ 36 b = BPF(text=test_prog1) 37 stats_map = b.get_table(b"stats") 38 event_name = b.get_syscall_fnname(b"clone") 39 b.attach_kprobe(event=event_name, fn_name=b"hello_world") 40 ini = stats_map.Leaf() 41 for i in range(0, multiprocessing.cpu_count()): 42 ini[i] = 0 43 # First initialize with key 1 44 stats_map[ stats_map.Key(1) ] = ini 45 # Then initialize with key 0 46 stats_map[ stats_map.Key(0) ] = ini 47 # Key 1 should have been evicted 48 with self.assertRaises(KeyError): 49 val = stats_map[ stats_map.Key(1) ] 50 f = os.popen("hostname") 51 f.close() 52 self.assertEqual(len(stats_map),1) 53 val = stats_map[ stats_map.Key(0) ] 54 sum = stats_map.sum(stats_map.Key(0)) 55 avg = stats_map.average(stats_map.Key(0)) 56 max = stats_map.max(stats_map.Key(0)) 57 self.assertGreater(sum.value, 0) 58 self.assertGreater(max.value, 0) 59 b.detach_kprobe(event_name) 60 61if __name__ == "__main__": 62 unittest.main() 63