xref: /aosp_15_r20/external/bcc/tests/python/test_ringbuf.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1#!/usr/bin/env python3
2# Copyright (c) PLUMgrid, Inc.
3# Licensed under the Apache License, Version 2.0 (the "License")
4
5from bcc import BPF
6import os
7import ctypes as ct
8import random
9import time
10import subprocess
11from unittest import main, TestCase, skipUnless
12from utils import kernel_version_ge
13
class TestRingbuf(TestCase):
    """Tests for the BPF ring buffer helpers exposed through bcc.

    Every test loads a small BPF program that declares a ring buffer named
    ``events``, attaches it to the ``nanosleep`` and ``clock_nanosleep``
    syscall kprobes, runs ``sleep 0.1`` to trigger it, drains the ring
    buffer from Python and then checks what reached the callback.
    """

    class _Record(ct.Structure):
        # Userspace view of the BPF programs' ``struct data_t``: one u64.
        _fields_ = [("value", ct.c_ulonglong)]

    def _run_and_collect(self, text, *, consume=False):
        """Load *text*, trigger it once and return the u64 values received.

        Args:
            text: BPF program source (bytes). It must define a ring buffer
                called ``events`` and a function ``do_sys_nanosleep``.
            consume: when true the buffer is drained with
                ``ring_buffer_consume()``, otherwise with
                ``ring_buffer_poll()``.

        Returns:
            A list with the u64 payload of every record delivered to the
            callback, in delivery order.

        Raises:
            AssertionError: if any delivered record does not have the size
                of ``_Record``.
        """
        record_size = ct.sizeof(self._Record)
        sizes = []
        values = []

        def cb(ctx, data, size):
            # This function is invoked from native code through ctypes.  An
            # exception raised here is printed and discarded instead of
            # failing the test, so only record what was seen and run the
            # assertions after the buffer has been drained.
            sizes.append(size)
            if size >= record_size:
                record = ct.cast(data, ct.POINTER(self._Record)).contents
                values.append(record.value)

        b = BPF(text=text)
        # Registered immediately so the probes are detached even when one of
        # the assertions below (or in the calling test) fails.
        self.addCleanup(b.cleanup)
        b.attach_kprobe(event=b.get_syscall_fnname(b"nanosleep"),
                        fn_name=b"do_sys_nanosleep")
        b.attach_kprobe(event=b.get_syscall_fnname(b"clock_nanosleep"),
                        fn_name=b"do_sys_nanosleep")
        b[b"events"].open_ring_buffer(cb)
        subprocess.call(['sleep', '0.1'])
        if consume:
            b.ring_buffer_consume()
        else:
            b.ring_buffer_poll()

        for size in sizes:
            self.assertEqual(size, record_size)
        return values

    @skipUnless(kernel_version_ge(5,8), "requires kernel >= 5.8")
    def test_ringbuf_output(self):
        """Records written with ringbuf_output() arrive via ring_buffer_poll()."""
        text = b"""
BPF_RINGBUF_OUTPUT(events, 8);
struct data_t {
    u64 ts;
};
int do_sys_nanosleep(void *ctx) {
    struct data_t data = {bpf_ktime_get_ns()};
    events.ringbuf_output(&data, sizeof(data), 0);
    return 0;
}
"""
        events = self._run_and_collect(text)
        self.assertGreater(len(events), 0)

    @skipUnless(kernel_version_ge(5,8), "requires kernel >= 5.8")
    def test_ringbuf_consume(self):
        """Records written with ringbuf_output() arrive via ring_buffer_consume()."""
        text = b"""
BPF_RINGBUF_OUTPUT(events, 8);
struct data_t {
    u64 ts;
};
int do_sys_nanosleep(void *ctx) {
    struct data_t data = {bpf_ktime_get_ns()};
    events.ringbuf_output(&data, sizeof(data), 0);
    return 0;
}
"""
        events = self._run_and_collect(text, consume=True)
        self.assertGreater(len(events), 0)

    @skipUnless(kernel_version_ge(5,8), "requires kernel >= 5.8")
    def test_ringbuf_submit(self):
        """Records reserved and then submitted are delivered to the callback."""
        text = b"""
BPF_RINGBUF_OUTPUT(events, 8);
struct data_t {
    u64 ts;
};
int do_sys_nanosleep(void *ctx) {
    struct data_t *data = events.ringbuf_reserve(sizeof(struct data_t));
    if (!data)
        return 1;
    data->ts = bpf_ktime_get_ns();
    events.ringbuf_submit(data, 0);
    return 0;
}
"""
        events = self._run_and_collect(text)
        self.assertGreater(len(events), 0)

    @skipUnless(kernel_version_ge(5,8), "requires kernel >= 5.8")
    def test_ringbuf_discard(self):
        """Records reserved and then discarded never reach the callback."""
        text = b"""
BPF_RINGBUF_OUTPUT(events, 8);
struct data_t {
    u64 ts;
};
int do_sys_nanosleep(void *ctx) {
    struct data_t *data = events.ringbuf_reserve(sizeof(struct data_t));
    if (!data)
        return 1;
    data->ts = bpf_ktime_get_ns();
    events.ringbuf_discard(data, 0);
    return 0;
}
"""
        events = self._run_and_collect(text)
        self.assertEqual(len(events), 0)

    @skipUnless(kernel_version_ge(5,8), "requires kernel >= 5.8")
    def test_ringbuf_query(self):
        """ringbuf_query(BPF_RB_RING_SIZE) matches the declared buffer size.

        The BPF program divides the queried ring size by the kernel's
        ``PAGE_SIZE`` macro and emits the quotient; the test expects it to
        equal the second argument given to ``BPF_RINGBUF_OUTPUT``.
        """
        # Second argument of BPF_RINGBUF_OUTPUT.  Deliberately not called
        # PAGE_SIZE: that name refers to the kernel macro inside the program.
        ring_pages = 8

        text = b"""
BPF_RINGBUF_OUTPUT(events, %i);
struct data_t {
    u64 page_cnt;
};
int do_sys_nanosleep(void *ctx) {
    u64 res = 0;
    res = events.ringbuf_query(BPF_RB_RING_SIZE);
    if(res == 0) {
        return 1;
    }
    struct data_t data = {res / PAGE_SIZE};
    events.ringbuf_output(&data, sizeof(data), 0);
    return 0;
}
"""
        text = text % ring_pages
        page_counts = self._run_and_collect(text)
        # Fail with a clear message rather than dividing by zero when the
        # program produced nothing.
        self.assertGreater(len(page_counts), 0)
        for page_cnt in page_counts:
            self.assertEqual(page_cnt, ring_pages)
199
# Run the tests through unittest's main() when executed as a script.
if __name__ == "__main__":
    main()
202