xref: /aosp_15_r20/external/bcc/tests/python/test_percpu.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1#!/usr/bin/env python3
2# Copyright (c) PLUMgrid, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5import os
6import unittest
7from bcc import BPF
8import multiprocessing
9
# Name of the syscall the tests hook: it is resolved to a kernel symbol with
# BPF.get_syscall_fnname() and that symbol is used as the kprobe event.
MONITORED_SYSCALL = b"execve"
11
class TestPercpu(unittest.TestCase):
    """Exercise BCC per-CPU maps (BPF_PERCPU_ARRAY / BPF_PERCPU_HASH).

    The counting tests load a small BPF program whose ``hello_world``
    function increments a per-CPU counter, attach it as a kprobe on
    MONITORED_SYSCALL, spawn a child process and then check the values read
    back from the ``stats`` table.
    """

    def setUp(self):
        """Skip the test when a trivial per-CPU array cannot be loaded."""
        try:
            BPF(text=b'BPF_PERCPU_ARRAY(stub, u32, 1);')
        except Exception:
            # Only ordinary errors are treated as "unsupported";
            # KeyboardInterrupt and SystemExit keep propagating instead of
            # being silently converted into a skip.
            raise unittest.SkipTest("PerCpu unsupported on this kernel")

    def _attach_probe(self, bpf_code):
        """Attach ``hello_world`` to MONITORED_SYSCALL and schedule the detach.

        The detach is registered through addCleanup() so the kprobe is
        removed even when a later assertion in the test fails.
        """
        event_name = bpf_code.get_syscall_fnname(MONITORED_SYSCALL)
        bpf_code.attach_kprobe(event=event_name, fn_name=b"hello_world")
        self.addCleanup(bpf_code.detach_kprobe, event_name)

    @staticmethod
    def _spawn_child():
        """Run ``hostname`` in a child process and wait for it to finish."""
        with os.popen("hostname") as pipe:
            # Drain the output so the child can exit normally before the
            # pipe is closed.
            pipe.read()

    @staticmethod
    def _zero_leaf(stats_map, zero):
        """Return a new leaf of ``stats_map`` with every slot set to ``zero``.

        The loop is bounded by the leaf's own length so that every slot is
        written, whatever number of CPUs the table was sized for.
        """
        leaf = stats_map.Leaf()
        for slot in range(len(leaf)):
            leaf[slot] = zero
        return leaf

    def _check_integer_counter(self, prog_text):
        """Load ``prog_text`` and check that its ``stats`` counter increases.

        ``prog_text`` must define a per-CPU table named ``stats`` with an
        integer leaf and a ``hello_world`` function that increments key 0.
        """
        bpf_code = BPF(text=prog_text)
        stats_map = bpf_code.get_table(b"stats")
        self._attach_probe(bpf_code)
        key = stats_map.Key(0)
        stats_map[key] = self._zero_leaf(stats_map, 0)
        self._spawn_child()
        self.assertEqual(len(stats_map), 1)
        # The raw lookup and average() are called only to make sure they
        # succeed; their results are not asserted on.
        _ = stats_map[key]
        _ = stats_map.average(key)
        total = stats_map.sum(key)
        peak = stats_map.max(key)
        self.assertGreater(total.value, 0)
        self.assertGreater(peak.value, 0)

    def test_helper(self):
        """The BPF_PERCPU_ARRAY macro accepts one, two or three arguments."""
        test_prog1 = b"""
        BPF_PERCPU_ARRAY(stub_default);
        BPF_PERCPU_ARRAY(stub_type, u64);
        BPF_PERCPU_ARRAY(stub_full, u64, 1024);
        """
        BPF(text=test_prog1)

    def test_u64(self):
        """A u64 counter in a BPF_PERCPU_HASH is incremented by the probe."""
        test_prog1 = b"""
        BPF_PERCPU_HASH(stats, u32, u64, 1);
        int hello_world(void *ctx) {
            u32 key=0;
            u64 value = 0, *val;
            val = stats.lookup_or_try_init(&key, &value);
            if (val) {
                *val += 1;
            }
            return 0;
        }
        """
        self._check_integer_counter(test_prog1)

    def test_u32(self):
        """A u32 counter in a BPF_PERCPU_ARRAY is incremented by the probe."""
        test_prog1 = b"""
        BPF_PERCPU_ARRAY(stats, u32, 1);
        int hello_world(void *ctx) {
            u32 key=0;
            u32 value = 0, *val;
            val = stats.lookup_or_try_init(&key, &value);
            if (val) {
                *val += 1;
            }
            return 0;
        }
        """
        self._check_integer_counter(test_prog1)

    def test_struct_custom_func(self):
        """A struct leaf is combined across CPUs by a user-supplied reducer."""
        test_prog2 = b"""
        typedef struct counter {
        u32 c1;
        u32 c2;
        } counter;
        BPF_PERCPU_HASH(stats, u32, counter, 1);
        int hello_world(void *ctx) {
            u32 key=0;
            counter value = {0,0}, *val;
            val = stats.lookup_or_try_init(&key, &value);
            if (val) {
                val->c1 += 1;
                val->c2 += 1;
            }
            return 0;
        }
        """
        bpf_code = BPF(text=test_prog2)
        # The lambda refers to stats_map itself; that name is bound by the
        # time the reducer is first invoked (on the lookup further down).
        stats_map = bpf_code.get_table(b"stats",
                reducer=lambda x,y: stats_map.sLeaf(x.c1+y.c1))
        self._attach_probe(bpf_code)
        key = stats_map.Key(0)
        # Write a zeroed struct into each slot by index; rebinding a loop
        # variable while iterating the leaf would leave it untouched.
        stats_map[key] = self._zero_leaf(stats_map, stats_map.sLeaf(0, 0))
        self._spawn_child()
        self.assertEqual(len(stats_map), 1)
        reduced = stats_map[key]
        self.assertGreater(reduced.c1, 0)
125
126
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest command-line runner.
    unittest.main()
129