xref: /aosp_15_r20/external/bcc/tests/python/test_array.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1#!/usr/bin/env python3
2# Copyright (c) PLUMgrid, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5from bcc import BPF
6import ctypes as ct
7import random
8import time
9import subprocess
10from bcc.utils import get_online_cpus
11from unittest import main, TestCase
12
class TestArray(TestCase):
    """Tests for BPF_ARRAY tables and BPF_PERF_OUTPUT buffers via the bcc Python API."""

    def _load_nanosleep_tracer(self, text, cb, lost_cb):
        """Compile ``text``, hook it to the sleep syscalls and open ``events``.

        ``text`` must define a function named ``do_sys_nanosleep`` and a
        perf output table named ``events``.  The function is attached as a
        kprobe to both ``nanosleep`` and ``clock_nanosleep``, then the perf
        buffer is opened with the given callbacks.

        Returns the BPF object.  Its ``cleanup()`` is registered through
        ``addCleanup`` immediately after construction, so it runs whether
        the calling test passes, fails an assertion, or raises.
        """
        b = BPF(text=text)
        self.addCleanup(b.cleanup)
        for syscall in (b"nanosleep", b"clock_nanosleep"):
            b.attach_kprobe(event=b.get_syscall_fnname(syscall),
                            fn_name=b"do_sys_nanosleep")
        b[b"events"].open_perf_buffer(cb, lost_cb=lost_cb)
        return b

    def test_simple(self):
        """Store two values with ctypes keys and read them back via items()."""
        b = BPF(text=b"""BPF_ARRAY(table1, u64, 128);""")
        t1 = b[b"table1"]
        t1[ct.c_int(0)] = ct.c_ulonglong(100)
        t1[ct.c_int(127)] = ct.c_ulonglong(1000)

        expected = {0: 100, 127: 1000}
        for key, leaf in t1.items():
            if key.value in expected:
                self.assertEqual(leaf.value, expected[key.value])
        self.assertEqual(len(t1), 128)

    def test_native_type(self):
        """Same as test_simple but with plain int keys, including negative ones."""
        b = BPF(text=b"""BPF_ARRAY(table1, u64, 128);""")
        t1 = b[b"table1"]
        t1[0] = ct.c_ulonglong(100)
        t1[-2] = ct.c_ulonglong(37)
        t1[127] = ct.c_ulonglong(1000)

        expected = {0: 100, 127: 1000}
        for key, leaf in t1.items():
            if key.value in expected:
                self.assertEqual(leaf.value, expected[key.value])
        self.assertEqual(len(t1), 128)
        # Negative keys must read back what was written through them, and
        # -1 must yield the same value as the last slot (127).
        self.assertEqual(t1[-2].value, 37)
        self.assertEqual(t1[-1].value, t1[127].value)

    def test_perf_buffer(self):
        """A sleep must produce at least one event on the perf buffer."""
        self.counter = 0

        class Data(ct.Structure):
            _fields_ = [("ts", ct.c_ulonglong)]

        # NOTE(review): if bcc invokes these callbacks through ctypes, an
        # assertion raised inside them is reported but may not fail the
        # test -- confirm against the bcc implementation.
        def cb(cpu, data, size):
            # The reported size is required to be strictly larger than
            # the submitted struct.
            self.assertGreater(size, ct.sizeof(Data))
            self.counter += 1

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = b"""
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 ts;
    } data = {bpf_ktime_get_ns()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = self._load_nanosleep_tracer(text, cb, lost_cb)
        # Trigger one of the hooked sleep syscalls, then drain the buffer.
        subprocess.call(['sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreater(self.counter, 0)

    def test_perf_buffer_for_each_cpu(self):
        """A sleep pinned to each online CPU must yield at least one event per CPU."""
        self.events = []

        class Data(ct.Structure):
            _fields_ = [("cpu", ct.c_ulonglong)]

        # NOTE(review): same caveat as in test_perf_buffer about
        # assertions raised inside callbacks -- confirm.
        def cb(cpu, data, size):
            self.assertGreater(size, ct.sizeof(Data))
            event = ct.cast(data, ct.POINTER(Data)).contents
            # Only the number of collected events is checked below.
            self.events.append(event)

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = b"""
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 cpu;
    } data = {bpf_get_smp_processor_id()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = self._load_nanosleep_tracer(text, cb, lost_cb)
        online_cpus = get_online_cpus()
        for cpu in online_cpus:
            # Pin each sleep to one CPU so every online CPU emits an event.
            subprocess.call(['taskset', '-c', str(cpu), 'sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreaterEqual(
            len(self.events), len(online_cpus),
            'Received only {}/{} events'.format(len(self.events),
                                                len(online_cpus)))
112
# Run the unittest command-line runner only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
115