1#!/usr/bin/env python3 2# Copyright (c) PLUMgrid, Inc. 3# Licensed under the Apache License, Version 2.0 (the "License") 4 5from bcc import BPF 6import os 7import sys 8from unittest import main, TestCase 9 10class TestKprobeMaxactive(TestCase): 11 def setUp(self): 12 self.b = BPF(text=b""" 13 typedef struct { int idx; } Key; 14 typedef struct { u64 val; } Val; 15 BPF_HASH(stats, Key, Val, 3); 16 int hello(void *ctx) { 17 Val *val = stats.lookup_or_init(&(Key){1}, &(Val){0}); 18 val->val++; 19 return 0; 20 } 21 int goodbye(void *ctx) { 22 Val *val = stats.lookup_or_init(&(Key){2}, &(Val){0}); 23 val->val++; 24 return 0; 25 } 26 """) 27 self.b.attach_kprobe(event_re=self.b.get_syscall_prefix() + b"bpf", 28 fn_name=b"hello") 29 self.b.attach_kretprobe(event_re=self.b.get_syscall_prefix() + b"bpf", 30 fn_name=b"goodbye", maxactive=128) 31 32 def test_send1(self): 33 k1 = self.b[b"stats"].Key(1) 34 k2 = self.b[b"stats"].Key(2) 35 self.assertTrue(self.b[b"stats"][k1].val >= 2) 36 self.assertTrue(self.b[b"stats"][k2].val == 1) 37 38if __name__ == "__main__": 39 main() 40