#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittests for the parser module."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import shutil
import tempfile
import unittest
from importlib import resources

import arch
import bpf
import parser  # pylint: disable=wrong-import-order

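# A 64-bit test architecture definition bundled under testdata/; all of the
# parser tests below run against it.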
ARCH_64 = arch.Arch.load_from_json_bytes(
    resources.files("testdata").joinpath("arch_64.json").read_bytes()
)


class TokenizerTests(unittest.TestCase):
    """Tests for ParserState.tokenize."""

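    # ParserState.tokenize() yields one list of tokens per input line; this
    # helper feeds it a single line and returns that line's tokens.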
    @staticmethod
    def _tokenize(line):
        parser_state = parser.ParserState('<memory>')
        return list(parser_state.tokenize([line]))[0]

    def test_tokenize(self):
        """Accept valid tokens."""
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include /minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', '/minijail.policy'),
        ])
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize('@include ./minijail.policy')
        ], [
            ('INCLUDE', '@include'),
            ('PATH', './minijail.policy'),
        ])
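        # The tokenizer emits no token for the trailing '# ignored' comment in
        # the next case: comment text after '#' produces no tokens.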
        self.assertEqual(
            [(token.type, token.value) for token in TokenizerTests._tokenize(
                'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0755; '
                'return ENOSYS # ignored')], [
                    ('IDENTIFIER', 'read'),
                    ('COLON', ':'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', 'in'),
                    ('BITWISE_COMPLEMENT', '~'),
                    ('NUMERIC_CONSTANT', '0xffff'),
                    ('OR', '||'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '&'),
                    ('LPAREN', '('),
                    ('NUMERIC_CONSTANT', '1'),
                    ('BITWISE_OR', '|'),
                    ('NUMERIC_CONSTANT', '2'),
                    ('RPAREN', ')'),
                    ('AND', '&&'),
                    ('ARGUMENT', 'arg0'),
                    ('OP', '=='),
                    ('NUMERIC_CONSTANT', '0755'),
                    ('SEMICOLON', ';'),
                    ('RETURN', 'return'),
                    ('IDENTIFIER', 'ENOSYS'),
                ])
        # Ensure that tokens that have an otherwise valid token as prefix are
        # still matched correctly.
        self.assertEqual([
            (token.type, token.value)
            for token in TokenizerTests._tokenize(
                'inotify_wait return_sys killall trace_sys')
        ], [
            ('IDENTIFIER', 'inotify_wait'),
            ('IDENTIFIER', 'return_sys'),
            ('IDENTIFIER', 'killall'),
            ('IDENTIFIER', 'trace_sys'),
        ])

    def test_tokenize_invalid_token(self):
        """Reject tokenizer errors."""
        with self.assertRaisesRegex(parser.ParseException,
                                    (r'<memory>\(1:1\): invalid token\n'
                                     r'    %invalid-token%\n'
                                     r'    \^')):
            TokenizerTests._tokenize('%invalid-token%')


class ParseConstantTests(unittest.TestCase):
    """Tests for PolicyParser.parse_value."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_constant_unsigned(self):
        """Accept reasonably-sized unsigned constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('0x80000000')), 0x80000000)
        if self.arch.bits == 64:
            self.assertEqual(
                self.parser.parse_value(self._tokenize('0x8000000000000000')),
                0x8000000000000000)

    def test_parse_constant_unsigned_too_big(self):
        """Reject unreasonably-sized unsigned constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'unsigned overflow'):
                self.parser.parse_value(self._tokenize('0x100000000'))
        with self.assertRaisesRegex(parser.ParseException,
                                    'unsigned overflow'):
            self.parser.parse_value(self._tokenize('0x10000000000000000'))

    def test_parse_constant_signed(self):
        """Accept reasonably-sized signed constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('-1')),
            self.arch.max_unsigned)

    def test_parse_constant_signed_too_negative(self):
        """Reject unreasonably-sized signed constants."""
        if self.arch.bits == 32:
            with self.assertRaisesRegex(parser.ParseException,
                                        'signed underflow'):
                self.parser.parse_value(self._tokenize('-0x800000001'))
        with self.assertRaisesRegex(parser.ParseException, 'signed underflow'):
            self.parser.parse_value(self._tokenize('-0x8000000000000001'))

    def test_parse_mask(self):
        """Accept parsing a mask value."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('0x1|0x2|0x4|0x8')), 0xf)

    def test_parse_parenthesized_expressions(self):
        """Accept parsing parenthesized expressions."""
        bad_expressions = [
            '(1',
            '|(1)',
            '(1)|',
            '()',
            '(',
            '((',
            '(()',
            '(()1',
        ]
        for expression in bad_expressions:
            with self.assertRaises(parser.ParseException, msg=expression):
                self.parser.parse_value(self._tokenize(expression))

        bad_partial_expressions = [
            '1)',
            '(1)1',
            '1(0)',
        ]
        for expression in bad_partial_expressions:
            tokens = self._tokenize(expression)
            self.parser.parse_value(tokens)
            self.assertNotEqual(tokens, [])

        good_expressions = [
            '(3)',
            '(1)|2',
            '1|(2)',
            '(1)|(2)',
            '((3))',
            '0|(1|2)',
            '(0|1|2)',
        ]
        for expression in good_expressions:
            self.assertEqual(
                self.parser.parse_value(self._tokenize(expression)), 3)

    def test_parse_constant_complements(self):
        """Accept complementing constants."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('~0')),
            self.arch.max_unsigned)
        self.assertEqual(
            self.parser.parse_value(self._tokenize('~0|~0')),
            self.arch.max_unsigned)
        if self.arch.bits == 32:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('~0x005AF0FF|~0xFFA50FFF')), 0xFFFFFF00)
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('0x0F|~(0x005AF000|0x00A50FFF)|0xF0')),
                0xFF0000FF)
        else:
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF')),
                0xFFFFFFFFFFFF0000)
            self.assertEqual(
                self.parser.parse_value(
                    self._tokenize(
                        '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00'
                    )), 0xFFFF00000000FFFF)

    def test_parse_double_complement(self):
        """Reject double-complementing constants."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'double complement'):
            self.parser.parse_value(self._tokenize('~~0'))

    def test_parse_empty_complement(self):
        """Reject complementing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty complement'):
            self.parser.parse_value(self._tokenize('0|~'))

    def test_parse_named_constant(self):
        """Accept parsing a named constant."""
        self.assertEqual(
            self.parser.parse_value(self._tokenize('O_RDONLY')), 0)

    def test_parse_empty_constant(self):
        """Reject parsing nothing."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value([])
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_value(self._tokenize('0|'))

    def test_parse_invalid_constant(self):
        """Reject parsing invalid constants."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self.parser.parse_value(self._tokenize('foo'))


class ParseFilterExpressionTests(unittest.TestCase):
    """Tests for PolicyParser.parse_argument_expression."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

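    # In the test arch, PROT_EXEC is 4 and PROT_WRITE is 2, which is why the
    # parsed atoms below carry those resolved numeric values.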
    def test_parse_argument_expression(self):
        """Accept valid argument expressions."""
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize(
                    'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE'
                )), [
                    [parser.Atom(0, 'in', 0xffff)],
                    [parser.Atom(0, '==', 4),
                     parser.Atom(1, '==', 2)],
                ])

    def test_parse_number_argument_expression(self):
        """Accept valid argument expressions with any octal/decimal/hex number."""
        # 4607 == 010777 == 0x11ff
        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 4607')), [
                    [parser.Atom(0, 'in', 4607)],
            ])

        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 010777')), [
                    [parser.Atom(0, 'in', 4607)],
            ])

        self.assertEqual(
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0x11ff')), [
                    [parser.Atom(0, 'in', 4607)],
            ])

    def test_parse_empty_argument_expression(self):
        """Reject empty argument expressions."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'empty argument expression'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0xffff ||'))

    def test_parse_empty_clause(self):
        """Reject empty clause."""
        with self.assertRaisesRegex(parser.ParseException, 'empty clause'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 in 0xffff &&'))

    def test_parse_invalid_argument(self):
        """Reject invalid argument."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid argument'):
            self.parser.parse_argument_expression(
                self._tokenize('argX in 0xffff'))

    def test_parse_invalid_operator(self):
        """Reject invalid operator."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid operator'):
            self.parser.parse_argument_expression(
                self._tokenize('arg0 = 0xffff'))

    def test_parse_missing_operator(self):
        """Reject missing operator."""
        with self.assertRaisesRegex(parser.ParseException, 'missing operator'):
            self.parser.parse_argument_expression(self._tokenize('arg0'))

    def test_parse_missing_operand(self):
        """Reject missing operand."""
        with self.assertRaisesRegex(parser.ParseException, 'empty constant'):
            self.parser.parse_argument_expression(self._tokenize('arg0 =='))


class ParseFilterTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter(self):
        """Accept valid filters."""
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('arg0 == 0')), [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-process')), [
                parser.Filter(None, bpf.KillProcess()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('kill-thread')), [
                parser.Filter(None, bpf.KillThread()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trap')), [
                parser.Filter(None, bpf.Trap()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('return ENOSYS')), [
                parser.Filter(None,
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('trace')), [
                parser.Filter(None, bpf.Trace()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('user-notify')), [
                parser.Filter(None, bpf.UserNotify()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('log')), [
                parser.Filter(None, bpf.Log()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('allow')), [
                parser.Filter(None, bpf.Allow()),
            ])
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('1')), [
                parser.Filter(None, bpf.Allow()),
            ])
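        # A bare '1' (above) is parsed as allow. Braces group several filters:
        # a bare condition defaults to allow, 'condition; action' pairs the
        # two, and a trailing unconditional action (trap here) closes the
        # group.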
        self.assertEqual(
            self.parser.parse_filter(
                self._tokenize(
                    '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')),
            [
                parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                parser.Filter([[parser.Atom(0, '==', 1)]],
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
                parser.Filter(None, bpf.Trap()),
            ])

    def test_parse_missing_return_value(self):
        """Reject missing return value."""
        with self.assertRaisesRegex(parser.ParseException,
                                    'missing return value'):
            self.parser.parse_filter(self._tokenize('return'))

    def test_parse_invalid_return_value(self):
        """Reject invalid return value."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid constant'):
            self.parser.parse_filter(self._tokenize('return arg0'))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))


class ParseFilterDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter with a denylist policy."""

    def setUp(self):
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True)

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def test_parse_filter(self):
        """Accept only filters that return an errno."""
        self.assertEqual(
            self.parser.parse_filter(self._tokenize('arg0 == 0; return ENOSYS')),
            [
                parser.Filter([[parser.Atom(0, '==', 0)]],
                              bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
            ])


class ParseFilterStatementTests(unittest.TestCase):
    """Tests for PolicyParser.parse_filter_statement."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())

    def _tokenize(self, line):
        # pylint: disable=protected-access
        return list(self.parser._parser_state.tokenize([line]))[0]

    def assertEqualIgnoringToken(self, actual, expected, msg=None):
        """Similar to assertEqual, but ignores the token field."""
        if (actual.syscalls != expected.syscalls or
            actual.filters != expected.filters):
            self.fail(self._formatMessage(
                msg, '%r != %r' % (actual, expected)))

    def test_parse_filter_statement(self):
        """Accept valid filter statements."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('read: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(parser.Syscall('read', 0), ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('{read, write}: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
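        # 'io@libc' and 'file-io@systemd' name syscall groups defined by the
        # test arch; both expand to their members (read and write) here.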
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('io@libc: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('file-io@systemd: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                    parser.Syscall('write', 1),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('kill: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('kill', 62),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))

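    # Bracketed metadata such as [arch=test] restricts an entry to the named
    # architectures; entries tagged for a different arch are dropped, as the
    # nonexistent[arch=nonexistent] case below shows.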
    def test_parse_metadata(self):
        """Accept valid filter statements with metadata."""
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize('read[arch=test]: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))
        self.assertEqualIgnoringToken(
            self.parser.parse_filter_statement(
                self._tokenize(
                    '{read, nonexistent[arch=nonexistent]}: arg0 == 0')),
            parser.ParsedFilterStatement(
                syscalls=(
                    parser.Syscall('read', 0),
                ),
                filters=[
                    parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()),
                ],
                token=None))

    def test_parse_unclosed_brace(self):
        """Reject unclosed brace."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter(self._tokenize('{ allow'))

    def test_parse_invalid_syscall_group(self):
        """Reject invalid syscall groups."""
        with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'):
            self.parser.parse_filter_statement(
                self._tokenize('{ read, write: arg0 == 0'))

    def test_parse_missing_colon(self):
        """Reject missing colon."""
        with self.assertRaisesRegex(parser.ParseException, 'missing colon'):
            self.parser.parse_filter_statement(self._tokenize('read'))

    def test_parse_invalid_colon(self):
        """Reject invalid colon."""
        with self.assertRaisesRegex(parser.ParseException, 'invalid colon'):
            self.parser.parse_filter_statement(self._tokenize('read arg0'))

    def test_parse_missing_filter(self):
        """Reject missing filter."""
        with self.assertRaisesRegex(parser.ParseException, 'missing filter'):
            self.parser.parse_filter_statement(self._tokenize('read:'))


class ParseFileTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file."""

    def setUp(self):
        self.arch = ARCH_64
        self.parser = parser.PolicyParser(
            self.arch, kill_action=bpf.KillProcess())
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

    def test_parse_simple(self):
        """Allow simple policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_multiline(self):
        """Allow simple multi-line policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read: \
                allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

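    # @default overrides the policy's default action. Only non-permissive
    # actions (kill-thread here) are accepted; the next test checks that a
    # permissive action such as log is rejected.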
    def test_parse_default(self):
        """Allow defining a default action."""
        path = self._write_file(
            'test.policy', """
            @default kill-thread
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillThread(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_default_permissive(self):
        """Reject defining a permissive default action."""
        path = self._write_file(
            'test.policy', """
            @default log
            read: allow
        """)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid permissive default action'):
            self.parser.parse_file(path)

    def test_parse_simple_grouped(self):
        """Allow simple policy files with grouped syscalls."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            {read, write}: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_other_arch(self):
        """Allow entries that only target another architecture."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            read[arch=nonexistent]: allow
            write: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

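    # Filters pulled in via @include are merged per syscall with the filters
    # from the including file; a syscall left without an unconditional action
    # (write below) falls back to the policy's kill action.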
    def test_parse_include(self):
        """Allow including policy files."""
        path = self._write_file(
            'test.include.policy', """
            {read, write}: arg0 == 0; allow
        """)
        path = self._write_file(
            'test.policy', """
            @include ./test.include.policy
            read: return ENOSYS
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(
                                None,
                                bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                          bpf.Allow()),
                            parser.Filter(None, bpf.KillProcess()),
                        ]),
                ]))

    def test_parse_invalid_include(self):
        """Reject including invalid policy files."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty include path'):
            path = self._write_file(
                'test.policy', """
                @include
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid include path'):
            path = self._write_file(
                'test.policy', """
                @include arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'@include statement nested too deep'):
            path = self._write_file(
                'test.policy', """
                @include ./test.policy
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not @include .*'):
            path = self._write_file(
                'test.policy', """
                @include ./nonexistent.policy
            """)
            self.parser.parse_file(path)

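    # @frequency points at a file of 'syscall: count' entries; the counts are
    # reflected in each FilterStatement's frequency field (read gets 2 below).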
    def test_parse_frequency(self):
        """Allow including frequency files."""
        self._write_file(
            'test.frequency', """
            read: 2
            write: 3
        """)
        path = self._write_file(
            'test.policy', """
            @frequency ./test.frequency
            read: allow
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.KillProcess(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=2,
                        filters=[
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_invalid_frequency(self):
        """Reject including invalid frequency files."""
        path = self._write_file('test.policy',
                                """@frequency ./test.frequency""")

        with self.assertRaisesRegex(parser.ParseException, r'missing colon'):
            self._write_file('test.frequency', """
                read
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid colon'):
            self._write_file('test.frequency', """
                read foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'missing number'):
            self._write_file('test.frequency', """
                read:
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: foo
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException, r'invalid number'):
            self._write_file('test.frequency', """
                read: -1
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'empty frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'invalid frequency path'):
            path = self._write_file(
                'test.policy', """
                @frequency arg0
            """)
            self.parser.parse_file(path)

        with self.assertRaisesRegex(parser.ParseException,
                                    r'Could not open frequency file.*'):
            path = self._write_file(
                'test.policy', """
                @frequency ./nonexistent.frequency
            """)
            self.parser.parse_file(path)

    def test_parse_multiple_unconditional(self):
        """Reject actions after an unconditional action."""
        path = self._write_file(
            'test.policy', """
            read: allow
            read: allow
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                (r'test.policy\(3:17\): '
                 r'Syscall read.*already had an unconditional action '
                 r'applied')):
            self.parser.parse_file(path)

        path = self._write_file(
            'test.policy', """
            read: log
            read: arg0 == 0; log
        """)

        with self.assertRaisesRegex(
                parser.ParseException,
                (r'test.policy\(3:17\): '
                 r'Syscall read.*already had an unconditional action '
                 r'applied')):
            self.parser.parse_file(path)

    def test_parse_allowlist_denylist_header(self):
        """Reject trying to compile denylist policy file as allowlist."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'policy is denylist, but flag --denylist '
                                    'not passed in'):
            path = self._write_file(
                'test.policy', """
                @denylist
            """)
            self.parser.parse_file(path)


class ParseFileDenylistTests(unittest.TestCase):
    """Tests for PolicyParser.parse_file with a denylist policy."""

    def setUp(self):
        self.arch = ARCH_64
        self.kill_action = bpf.KillProcess()
        self.parser = parser.PolicyParser(
            self.arch, kill_action=self.kill_action, denylist=True)
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def _write_file(self, filename, contents):
        """Helper to write out a file for testing."""
        path = os.path.join(self.tempdir, filename)
        with open(path, 'w') as outf:
            outf.write(contents)
        return path

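    # With denylist=True the parsed policy's default action becomes allow and
    # listed syscalls return an errno instead; conditional entries fall
    # through to allow (see test_parse_simple_with_arg).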
    def test_parse_simple(self):
        """Allow simple denylist policy files."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            @denylist
            read: return ENOSYS
            write: return ENOSYS
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                ]))

    def test_parse_simple_with_arg(self):
        """Allow denylist policy files with argument filters."""
        path = self._write_file(
            'test.policy', """
            # Comment.
            @denylist
            read: return ENOSYS
            write: arg0 == 0 ; return ENOSYS
        """)

        self.assertEqual(
            self.parser.parse_file(path),
            parser.ParsedPolicy(
                default_action=bpf.Allow(),
                filter_statements=[
                    parser.FilterStatement(
                        syscall=parser.Syscall('read', 0),
                        frequency=1,
                        filters=[
                            parser.Filter(None, bpf.ReturnErrno(
                                    self.arch.constants['ENOSYS'])),
                        ]),
                    parser.FilterStatement(
                        syscall=parser.Syscall('write', 1),
                        frequency=1,
                        filters=[
                            parser.Filter([[parser.Atom(0, '==', 0)]],
                                bpf.ReturnErrno(self.arch.constants['ENOSYS'])),
                            parser.Filter(None, bpf.Allow()),
                        ]),
                ]))

    def test_parse_denylist_no_header(self):
        """Reject compiling a policy without @denylist as a denylist."""
        with self.assertRaisesRegex(parser.ParseException,
                                    r'policy must contain @denylist flag to be '
                                    'compiled with --denylist flag'):
            path = self._write_file(
                'test.policy', """
                read: return ENOSYS
            """)
            self.parser.parse_file(path)


if __name__ == '__main__':
    unittest.main()