xref: /aosp_15_r20/external/bcc/tests/python/test_map_batch_ops.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1#!/usr/bin/env python3
2#
3# USAGE: test_map_batch_ops.py
4#
5# Copyright (c) Emilien Gobillot
6# Licensed under the Apache License, Version 2.0 (the "License")
7
8from __future__ import print_function
9from unittest import main, skipUnless, TestCase
10from utils import kernel_version_ge
11from bcc import BPF
12
13import os
14import ctypes as ct
15
16
@skipUnless(kernel_version_ge(5, 6), "requires kernel >= 5.6")
class TestMapBatch(TestCase):
    """Tests for the batch lookup, delete and update calls of a BPF hash map.

    Every test starts from a freshly created ``BPF_HASH(map, int, int)``
    holding ``MAPSIZE`` entries in which each value equals its key (see
    ``fill_hashmap``).
    """

    # Capacity of the map, and the number of entries fill_hashmap() inserts.
    MAPSIZE = 1024
    # Default number of entries handled by the "subset" helpers and tests.
    SUBSET_SIZE = 32

    @staticmethod
    def _sorted_by_key(items):
        """Return the (key, value) pairs of ``items`` as a list sorted by key.

        Sorting makes the checks independent of the order in which the map
        hands back its entries.
        """
        return sorted(items, key=lambda kv: kv[0].value)

    def fill_hashmap(self):
        """Create the BPF hash map and fill it with ``i -> i``.

        Returns:
            The table object for ``map``, holding keys 0 .. MAPSIZE - 1.
        """
        b = BPF(text=b"""BPF_HASH(map, int, int, %d);""" % self.MAPSIZE)
        hmap = b[b"map"]
        for i in range(self.MAPSIZE):
            hmap[ct.c_int(i)] = ct.c_int(i)
        return hmap

    def prepare_keys_subset(self, hmap, count=None):
        """Build a ctypes array holding the ``count`` smallest keys of ``hmap``.

        Args:
            hmap: table to read the keys from (via ``items_lookup_batch``).
            count: number of keys to collect; ``None`` means ``SUBSET_SIZE``.

        Returns:
            A ``hmap.Key * count`` array filled in ascending key order.
        """
        # Compare against None explicitly so that an explicit count of 0 is
        # honoured instead of being silently replaced by the default.
        if count is None:
            count = self.SUBSET_SIZE
        keys = (hmap.Key * count)()
        entries = self._sorted_by_key(hmap.items_lookup_batch())[:count]
        for i, (k, _) in enumerate(entries):
            keys[i] = k.value
        return keys

    def prepare_values_subset(self, hmap, count=None):
        """Build a ctypes array of squared values for the smallest keys.

        The entries are visited in ascending key order, so slot ``i`` of the
        result lines up with slot ``i`` of ``prepare_keys_subset``.

        Args:
            hmap: table to read the current values from.
            count: number of values to collect; ``None`` means ``SUBSET_SIZE``.

        Returns:
            A ``hmap.Leaf * count`` array where each slot is ``value * value``.
        """
        # Same explicit None check as in prepare_keys_subset.
        if count is None:
            count = self.SUBSET_SIZE
        values = (hmap.Leaf * count)()
        entries = self._sorted_by_key(hmap.items_lookup_batch())[:count]
        for i, (_, v) in enumerate(entries):
            values[i] = v.value * v.value
        return values

    def check_hashmap_values(self, it):
        """Assert that ``it`` yields ``key == value == 0, 1, 2, ...``.

        Args:
            it: iterable of (key, value) pairs exposing ``.value``.

        Returns:
            The number of pairs that were checked.
        """
        entries = self._sorted_by_key(it)
        for expected, (k, v) in enumerate(entries):
            self.assertEqual(k.value, expected)
            self.assertEqual(v.value, expected)
        return len(entries)

    def test_lookup_and_delete_batch_all_keys(self):
        """items_lookup_and_delete_batch returns every entry and empties the map."""
        # fill the hashmap
        hmap = self.fill_hashmap()

        # check values and count them
        count = self.check_hashmap_values(hmap.items_lookup_and_delete_batch())
        self.assertEqual(count, self.MAPSIZE)

        # and check the delete has worked, i.e. the map is now empty
        count = sum(1 for _ in hmap.items())
        self.assertEqual(count, 0)

    def test_lookup_batch_all_keys(self):
        """items_lookup_batch returns every entry with the expected value."""
        # fill the hashmap
        hmap = self.fill_hashmap()

        # check values and count them
        count = self.check_hashmap_values(hmap.items_lookup_batch())
        self.assertEqual(count, self.MAPSIZE)

    def test_delete_batch_all_keys(self):
        """items_delete_batch without arguments removes every entry."""
        # fill the hashmap
        hmap = self.fill_hashmap()
        hmap.items_delete_batch()

        # check the delete has worked, i.e. the map is now empty
        count = sum(1 for _ in hmap.items())
        self.assertEqual(count, 0)

    def test_delete_batch_subset(self):
        """items_delete_batch(keys) removes only the requested entries."""
        # fill the hashmap
        hmap = self.fill_hashmap()
        keys = self.prepare_keys_subset(hmap)

        hmap.items_delete_batch(keys)
        # check the delete has worked, i.e. exactly SUBSET_SIZE entries
        # are gone and the rest of the map is still there
        count = sum(1 for _ in hmap.items())
        self.assertEqual(count, self.MAPSIZE - self.SUBSET_SIZE)

    def test_update_batch_all_keys(self):
        """items_update_batch overwrites the value of every key."""
        hmap = self.fill_hashmap()

        # preparing keys and new values arrays
        keys = (hmap.Key * self.MAPSIZE)()
        new_values = (hmap.Leaf * self.MAPSIZE)()
        for i in range(self.MAPSIZE):
            keys[i] = ct.c_int(i)
            new_values[i] = ct.c_int(-1)
        hmap.items_update_batch(keys, new_values)

        # check the update has worked, i.e. the sum of values is -MAPSIZE
        total = sum(v.value for v in hmap.values())
        self.assertEqual(total, -self.MAPSIZE)

    def test_update_batch_subset(self):
        """items_update_batch on a subset leaves the other entries untouched."""
        # fill the hashmap
        hmap = self.fill_hashmap()
        keys = self.prepare_keys_subset(hmap, count=self.SUBSET_SIZE)
        new_values = self.prepare_values_subset(hmap, count=self.SUBSET_SIZE)

        hmap.items_update_batch(keys, new_values)

        # check all the values in the map:
        # the first SUBSET_SIZE keys follow the rule value = key * key,
        # the remaining keys follow the rule value = key
        squared = 0
        entries = self._sorted_by_key(hmap.items_lookup_batch())
        for position, (k, v) in enumerate(entries):
            if position < self.SUBSET_SIZE:
                # values are the square of the keys
                self.assertEqual(v.value, k.value * k.value)
                squared += 1
            else:
                # values = keys
                self.assertEqual(v.value, k.value)

        # make sure the whole updated subset was actually seen
        self.assertEqual(squared, self.SUBSET_SIZE)
143
144
# Run the unittest entry point when this file is executed as a script.
if __name__ == "__main__":
    main()
147