#!/usr/bin/env python3
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

from bcc import BPF
import ctypes as ct
import random
import time
import subprocess
from bcc.utils import get_online_cpus
from unittest import main, TestCase

class TestArray(TestCase):
    # Tests for BPF_ARRAY tables and BPF_PERF_OUTPUT buffers accessed from
    # Python through the bcc bindings.

    def test_simple(self):
        # Write the first and last slot of a 128-entry array using ctypes
        # keys, then read them back through items() and check the length.
        b = BPF(text=b"""BPF_ARRAY(table1, u64, 128);""")
        t1 = b[b"table1"]
        t1[ct.c_int(0)] = ct.c_ulonglong(100)
        t1[ct.c_int(127)] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        self.assertEqual(len(t1), 128)

    def test_native_type(self):
        # Same as test_simple but with native Python ints as keys, including
        # negative indices: -1 is expected to address the same slot as 127.
        b = BPF(text=b"""BPF_ARRAY(table1, u64, 128);""")
        t1 = b[b"table1"]
        t1[0] = ct.c_ulonglong(100)
        t1[-2] = ct.c_ulonglong(37)
        t1[127] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        self.assertEqual(len(t1), 128)
        self.assertEqual(t1[-2].value, 37)
        self.assertEqual(t1[-1].value, t1[127].value)

    def test_perf_buffer(self):
        # Attach a kprobe to the nanosleep syscalls, trigger it by running
        # `sleep`, and check that at least one perf event reaches the
        # Python callback.
        self.counter = 0

        class Data(ct.Structure):
            _fields_ = [("ts", ct.c_ulonglong)]

        def cb(cpu, data, size):
            # NOTE(review): the reported size is asserted to be strictly
            # larger than the struct; presumably the perf record carries
            # padding - confirm against the bcc perf reader.
            self.assertGreater(size, ct.sizeof(Data))
            self.counter += 1

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = b"""
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 ts;
    } data = {bpf_ktime_get_ns()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        # try/finally so the probes and the perf buffer are released even
        # when the assertion below fails.
        try:
            b.attach_kprobe(event=b.get_syscall_fnname(b"nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b.attach_kprobe(event=b.get_syscall_fnname(b"clock_nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b[b"events"].open_perf_buffer(cb, lost_cb=lost_cb)
            subprocess.call(['sleep', '0.1'])
            b.perf_buffer_poll()
            self.assertGreater(self.counter, 0)
        finally:
            b.cleanup()

    def test_perf_buffer_for_each_cpu(self):
        # Run `sleep` pinned to every online CPU in turn and check that at
        # least one perf event per CPU was delivered to the callback.
        self.events = []

        class Data(ct.Structure):
            _fields_ = [("cpu", ct.c_ulonglong)]

        def cb(cpu, data, size):
            self.assertGreater(size, ct.sizeof(Data))
            event = ct.cast(data, ct.POINTER(Data)).contents
            # `event` aliases the memory behind `data`; keep only the copied
            # integer so nothing stored outlives the callback's buffer.
            # NOTE(review): presumably `data` is only valid during the
            # callback - confirm against the bcc perf reader.
            self.events.append(event.cpu)

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = b"""
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 cpu;
    } data = {bpf_get_smp_processor_id()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        # Release the probes and the perf buffer even if attaching or
        # polling raises.
        try:
            b.attach_kprobe(event=b.get_syscall_fnname(b"nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b.attach_kprobe(event=b.get_syscall_fnname(b"clock_nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b[b"events"].open_perf_buffer(cb, lost_cb=lost_cb)
            online_cpus = get_online_cpus()
            for cpu in online_cpus:
                subprocess.call(['taskset', '-c', str(cpu), 'sleep', '0.1'])
            b.perf_buffer_poll()
        finally:
            b.cleanup()
        self.assertGreaterEqual(len(self.events), len(online_cpus), 'Received only {}/{} events'.format(len(self.events), len(online_cpus)))

if __name__ == "__main__":
    main()