1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2018 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17"""Unittests for the parser module.""" 18 19from __future__ import absolute_import 20from __future__ import division 21from __future__ import print_function 22 23import os 24import shutil 25import tempfile 26import unittest 27from importlib import resources 28 29import arch 30import bpf 31import parser # pylint: disable=wrong-import-order 32 33ARCH_64 = arch.Arch.load_from_json_bytes( 34 resources.files("testdata").joinpath("arch_64.json").read_bytes() 35) 36 37 38class TokenizerTests(unittest.TestCase): 39 """Tests for ParserState.tokenize.""" 40 41 @staticmethod 42 def _tokenize(line): 43 parser_state = parser.ParserState('<memory>') 44 return list(parser_state.tokenize([line]))[0] 45 46 def test_tokenize(self): 47 """Accept valid tokens.""" 48 self.assertEqual([ 49 (token.type, token.value) 50 for token in TokenizerTests._tokenize('@include /minijail.policy') 51 ], [ 52 ('INCLUDE', '@include'), 53 ('PATH', '/minijail.policy'), 54 ]) 55 self.assertEqual([ 56 (token.type, token.value) 57 for token in TokenizerTests._tokenize('@include ./minijail.policy') 58 ], [ 59 ('INCLUDE', '@include'), 60 ('PATH', './minijail.policy'), 61 ]) 62 self.assertEqual( 63 [(token.type, token.value) for token in TokenizerTests._tokenize( 64 'read: arg0 in ~0xffff || arg0 & (1|2) && arg0 == 0755; ' 65 'return ENOSYS # ignored')], [ 66 ('IDENTIFIER', 'read'), 67 ('COLON', ':'), 68 ('ARGUMENT', 'arg0'), 69 ('OP', 'in'), 70 ('BITWISE_COMPLEMENT', '~'), 71 ('NUMERIC_CONSTANT', '0xffff'), 72 ('OR', '||'), 73 ('ARGUMENT', 'arg0'), 74 ('OP', '&'), 75 ('LPAREN', '('), 76 ('NUMERIC_CONSTANT', '1'), 77 ('BITWISE_OR', '|'), 78 ('NUMERIC_CONSTANT', '2'), 79 ('RPAREN', ')'), 80 ('AND', '&&'), 81 ('ARGUMENT', 'arg0'), 82 ('OP', '=='), 83 ('NUMERIC_CONSTANT', '0755'), 84 ('SEMICOLON', ';'), 85 ('RETURN', 'return'), 86 ('IDENTIFIER', 'ENOSYS'), 87 ]) 88 # Ensure that tokens that have an otherwise valid token as prefix are 89 # still matched correctly. 90 self.assertEqual([ 91 (token.type, token.value) 92 for token in TokenizerTests._tokenize( 93 'inotify_wait return_sys killall trace_sys') 94 ], [ 95 ('IDENTIFIER', 'inotify_wait'), 96 ('IDENTIFIER', 'return_sys'), 97 ('IDENTIFIER', 'killall'), 98 ('IDENTIFIER', 'trace_sys'), 99 ]) 100 101 def test_tokenize_invalid_token(self): 102 """Reject tokenizer errors.""" 103 with self.assertRaisesRegex(parser.ParseException, 104 (r'<memory>\(1:1\): invalid token\n' 105 r' %invalid-token%\n' 106 r' \^')): 107 TokenizerTests._tokenize('%invalid-token%') 108 109 110class ParseConstantTests(unittest.TestCase): 111 """Tests for PolicyParser.parse_value.""" 112 113 def setUp(self): 114 self.arch = ARCH_64 115 self.parser = parser.PolicyParser( 116 self.arch, kill_action=bpf.KillProcess()) 117 118 def _tokenize(self, line): 119 # pylint: disable=protected-access 120 return list(self.parser._parser_state.tokenize([line]))[0] 121 122 def test_parse_constant_unsigned(self): 123 """Accept reasonably-sized unsigned constants.""" 124 self.assertEqual( 125 self.parser.parse_value(self._tokenize('0x80000000')), 0x80000000) 126 if self.arch.bits == 64: 127 self.assertEqual( 128 self.parser.parse_value(self._tokenize('0x8000000000000000')), 129 0x8000000000000000) 130 131 def test_parse_constant_unsigned_too_big(self): 132 """Reject unreasonably-sized unsigned constants.""" 133 if self.arch.bits == 32: 134 with self.assertRaisesRegex(parser.ParseException, 135 'unsigned overflow'): 136 self.parser.parse_value(self._tokenize('0x100000000')) 137 with self.assertRaisesRegex(parser.ParseException, 138 'unsigned overflow'): 139 self.parser.parse_value(self._tokenize('0x10000000000000000')) 140 141 def test_parse_constant_signed(self): 142 """Accept reasonably-sized signed constants.""" 143 self.assertEqual( 144 self.parser.parse_value(self._tokenize('-1')), 145 self.arch.max_unsigned) 146 147 def test_parse_constant_signed_too_negative(self): 148 """Reject unreasonably-sized signed constants.""" 149 if self.arch.bits == 32: 150 with self.assertRaisesRegex(parser.ParseException, 151 'signed underflow'): 152 self.parser.parse_value(self._tokenize('-0x800000001')) 153 with self.assertRaisesRegex(parser.ParseException, 'signed underflow'): 154 self.parser.parse_value(self._tokenize('-0x8000000000000001')) 155 156 def test_parse_mask(self): 157 """Accept parsing a mask value.""" 158 self.assertEqual( 159 self.parser.parse_value(self._tokenize('0x1|0x2|0x4|0x8')), 0xf) 160 161 def test_parse_parenthesized_expressions(self): 162 """Accept parsing parenthesized expressions.""" 163 bad_expressions = [ 164 '(1', 165 '|(1)', 166 '(1)|', 167 '()', 168 '(', 169 '((', 170 '(()', 171 '(()1', 172 ] 173 for expression in bad_expressions: 174 with self.assertRaises(parser.ParseException, msg=expression): 175 self.parser.parse_value(self._tokenize(expression)) 176 177 bad_partial_expressions = [ 178 '1)', 179 '(1)1', 180 '1(0)', 181 ] 182 for expression in bad_partial_expressions: 183 tokens = self._tokenize(expression) 184 self.parser.parse_value(tokens) 185 self.assertNotEqual(tokens, []) 186 187 good_expressions = [ 188 '(3)', 189 '(1)|2', 190 '1|(2)', 191 '(1)|(2)', 192 '((3))', 193 '0|(1|2)', 194 '(0|1|2)', 195 ] 196 for expression in good_expressions: 197 self.assertEqual( 198 self.parser.parse_value(self._tokenize(expression)), 3) 199 200 def test_parse_constant_complements(self): 201 """Accept complementing constants.""" 202 self.assertEqual( 203 self.parser.parse_value(self._tokenize('~0')), 204 self.arch.max_unsigned) 205 self.assertEqual( 206 self.parser.parse_value(self._tokenize('~0|~0')), 207 self.arch.max_unsigned) 208 if self.arch.bits == 32: 209 self.assertEqual( 210 self.parser.parse_value( 211 self._tokenize('~0x005AF0FF|~0xFFA50FFF')), 0xFFFFFF00) 212 self.assertEqual( 213 self.parser.parse_value( 214 self._tokenize('0x0F|~(0x005AF000|0x00A50FFF)|0xF0')), 215 0xFF0000FF) 216 else: 217 self.assertEqual( 218 self.parser.parse_value( 219 self._tokenize('~0x00005A5AF0F0FFFF|~0xFFFFA5A50F0FFFFF')), 220 0xFFFFFFFFFFFF0000) 221 self.assertEqual( 222 self.parser.parse_value( 223 self._tokenize( 224 '0x00FF|~(0x00005A5AF0F00000|0x0000A5A50F0FFFFF)|0xFF00' 225 )), 0xFFFF00000000FFFF) 226 227 def test_parse_double_complement(self): 228 """Reject double-complementing constants.""" 229 with self.assertRaisesRegex(parser.ParseException, 230 'double complement'): 231 self.parser.parse_value(self._tokenize('~~0')) 232 233 def test_parse_empty_complement(self): 234 """Reject complementing nothing.""" 235 with self.assertRaisesRegex(parser.ParseException, 'empty complement'): 236 self.parser.parse_value(self._tokenize('0|~')) 237 238 def test_parse_named_constant(self): 239 """Accept parsing a named constant.""" 240 self.assertEqual( 241 self.parser.parse_value(self._tokenize('O_RDONLY')), 0) 242 243 def test_parse_empty_constant(self): 244 """Reject parsing nothing.""" 245 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 246 self.parser.parse_value([]) 247 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 248 self.parser.parse_value(self._tokenize('0|')) 249 250 def test_parse_invalid_constant(self): 251 """Reject parsing invalid constants.""" 252 with self.assertRaisesRegex(parser.ParseException, 'invalid constant'): 253 self.parser.parse_value(self._tokenize('foo')) 254 255 256class ParseFilterExpressionTests(unittest.TestCase): 257 """Tests for PolicyParser.parse_argument_expression.""" 258 259 def setUp(self): 260 self.arch = ARCH_64 261 self.parser = parser.PolicyParser( 262 self.arch, kill_action=bpf.KillProcess()) 263 264 def _tokenize(self, line): 265 # pylint: disable=protected-access 266 return list(self.parser._parser_state.tokenize([line]))[0] 267 268 def test_parse_argument_expression(self): 269 """Accept valid argument expressions.""" 270 self.assertEqual( 271 self.parser.parse_argument_expression( 272 self._tokenize( 273 'arg0 in 0xffff || arg0 == PROT_EXEC && arg1 == PROT_WRITE' 274 )), [ 275 [parser.Atom(0, 'in', 0xffff)], 276 [parser.Atom(0, '==', 4), 277 parser.Atom(1, '==', 2)], 278 ]) 279 280 def test_parse_number_argument_expression(self): 281 """Accept valid argument expressions with any octal/decimal/hex number.""" 282 # 4607 == 010777 == 0x11ff 283 self.assertEqual( 284 self.parser.parse_argument_expression( 285 self._tokenize('arg0 in 4607')), [ 286 [parser.Atom(0, 'in', 4607)], 287 ]) 288 289 self.assertEqual( 290 self.parser.parse_argument_expression( 291 self._tokenize('arg0 in 010777')), [ 292 [parser.Atom(0, 'in', 4607)], 293 ]) 294 295 self.assertEqual( 296 self.parser.parse_argument_expression( 297 self._tokenize('arg0 in 0x11ff')), [ 298 [parser.Atom(0, 'in', 4607)], 299 ]) 300 301 def test_parse_empty_argument_expression(self): 302 """Reject empty argument expressions.""" 303 with self.assertRaisesRegex(parser.ParseException, 304 'empty argument expression'): 305 self.parser.parse_argument_expression( 306 self._tokenize('arg0 in 0xffff ||')) 307 308 def test_parse_empty_clause(self): 309 """Reject empty clause.""" 310 with self.assertRaisesRegex(parser.ParseException, 'empty clause'): 311 self.parser.parse_argument_expression( 312 self._tokenize('arg0 in 0xffff &&')) 313 314 def test_parse_invalid_argument(self): 315 """Reject invalid argument.""" 316 with self.assertRaisesRegex(parser.ParseException, 'invalid argument'): 317 self.parser.parse_argument_expression( 318 self._tokenize('argX in 0xffff')) 319 320 def test_parse_invalid_operator(self): 321 """Reject invalid operator.""" 322 with self.assertRaisesRegex(parser.ParseException, 'invalid operator'): 323 self.parser.parse_argument_expression( 324 self._tokenize('arg0 = 0xffff')) 325 326 def test_parse_missing_operator(self): 327 """Reject missing operator.""" 328 with self.assertRaisesRegex(parser.ParseException, 'missing operator'): 329 self.parser.parse_argument_expression(self._tokenize('arg0')) 330 331 def test_parse_missing_operand(self): 332 """Reject missing operand.""" 333 with self.assertRaisesRegex(parser.ParseException, 'empty constant'): 334 self.parser.parse_argument_expression(self._tokenize('arg0 ==')) 335 336 337class ParseFilterTests(unittest.TestCase): 338 """Tests for PolicyParser.parse_filter.""" 339 340 def setUp(self): 341 self.arch = ARCH_64 342 self.parser = parser.PolicyParser( 343 self.arch, kill_action=bpf.KillProcess()) 344 345 def _tokenize(self, line): 346 # pylint: disable=protected-access 347 return list(self.parser._parser_state.tokenize([line]))[0] 348 349 def test_parse_filter(self): 350 """Accept valid filters.""" 351 self.assertEqual( 352 self.parser.parse_filter(self._tokenize('arg0 == 0')), [ 353 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 354 ]) 355 self.assertEqual( 356 self.parser.parse_filter(self._tokenize('kill-process')), [ 357 parser.Filter(None, bpf.KillProcess()), 358 ]) 359 self.assertEqual( 360 self.parser.parse_filter(self._tokenize('kill-thread')), [ 361 parser.Filter(None, bpf.KillThread()), 362 ]) 363 self.assertEqual( 364 self.parser.parse_filter(self._tokenize('trap')), [ 365 parser.Filter(None, bpf.Trap()), 366 ]) 367 self.assertEqual( 368 self.parser.parse_filter(self._tokenize('return ENOSYS')), [ 369 parser.Filter(None, 370 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 371 ]) 372 self.assertEqual( 373 self.parser.parse_filter(self._tokenize('trace')), [ 374 parser.Filter(None, bpf.Trace()), 375 ]) 376 self.assertEqual( 377 self.parser.parse_filter(self._tokenize('user-notify')), [ 378 parser.Filter(None, bpf.UserNotify()), 379 ]) 380 self.assertEqual( 381 self.parser.parse_filter(self._tokenize('log')), [ 382 parser.Filter(None, bpf.Log()), 383 ]) 384 self.assertEqual( 385 self.parser.parse_filter(self._tokenize('allow')), [ 386 parser.Filter(None, bpf.Allow()), 387 ]) 388 self.assertEqual( 389 self.parser.parse_filter(self._tokenize('1')), [ 390 parser.Filter(None, bpf.Allow()), 391 ]) 392 self.assertEqual( 393 self.parser.parse_filter( 394 self._tokenize( 395 '{ arg0 == 0, arg0 == 1; return ENOSYS, trap }')), 396 [ 397 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 398 parser.Filter([[parser.Atom(0, '==', 1)]], 399 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 400 parser.Filter(None, bpf.Trap()), 401 ]) 402 403 def test_parse_missing_return_value(self): 404 """Reject missing return value.""" 405 with self.assertRaisesRegex(parser.ParseException, 406 'missing return value'): 407 self.parser.parse_filter(self._tokenize('return')) 408 409 def test_parse_invalid_return_value(self): 410 """Reject invalid return value.""" 411 with self.assertRaisesRegex(parser.ParseException, 'invalid constant'): 412 self.parser.parse_filter(self._tokenize('return arg0')) 413 414 def test_parse_unclosed_brace(self): 415 """Reject unclosed brace.""" 416 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 417 self.parser.parse_filter(self._tokenize('{ allow')) 418 419 420class ParseFilterDenylistTests(unittest.TestCase): 421 """Tests for PolicyParser.parse_filter with a denylist policy.""" 422 423 def setUp(self): 424 self.arch = ARCH_64 425 self.kill_action = bpf.KillProcess() 426 self.parser = parser.PolicyParser( 427 self.arch, kill_action=self.kill_action, denylist=True) 428 429 def _tokenize(self, line): 430 # pylint: disable=protected-access 431 return list(self.parser._parser_state.tokenize([line]))[0] 432 433 def test_parse_filter(self): 434 """Accept only filters that return an errno.""" 435 self.assertEqual( 436 self.parser.parse_filter(self._tokenize('arg0 == 0; return ENOSYS')), 437 [ 438 parser.Filter([[parser.Atom(0, '==', 0)]], 439 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 440 ]) 441 442 443class ParseFilterStatementTests(unittest.TestCase): 444 """Tests for PolicyParser.parse_filter_statement.""" 445 446 def setUp(self): 447 self.arch = ARCH_64 448 self.parser = parser.PolicyParser( 449 self.arch, kill_action=bpf.KillProcess()) 450 451 def _tokenize(self, line): 452 # pylint: disable=protected-access 453 return list(self.parser._parser_state.tokenize([line]))[0] 454 455 def assertEqualIgnoringToken(self, actual, expected, msg=None): 456 """Similar to assertEqual, but ignores the token field.""" 457 if (actual.syscalls != expected.syscalls or 458 actual.filters != expected.filters): 459 self.fail('%r != %r' % (actual, expected), msg) 460 461 def test_parse_filter_statement(self): 462 """Accept valid filter statements.""" 463 self.assertEqualIgnoringToken( 464 self.parser.parse_filter_statement( 465 self._tokenize('read: arg0 == 0')), 466 parser.ParsedFilterStatement( 467 syscalls=(parser.Syscall('read', 0), ), 468 filters=[ 469 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 470 ], 471 token=None)) 472 self.assertEqualIgnoringToken( 473 self.parser.parse_filter_statement( 474 self._tokenize('{read, write}: arg0 == 0')), 475 parser.ParsedFilterStatement( 476 syscalls=( 477 parser.Syscall('read', 0), 478 parser.Syscall('write', 1), 479 ), 480 filters=[ 481 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 482 ], 483 token=None)) 484 self.assertEqualIgnoringToken( 485 self.parser.parse_filter_statement( 486 self._tokenize('io@libc: arg0 == 0')), 487 parser.ParsedFilterStatement( 488 syscalls=( 489 parser.Syscall('read', 0), 490 parser.Syscall('write', 1), 491 ), 492 filters=[ 493 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 494 ], 495 token=None)) 496 self.assertEqualIgnoringToken( 497 self.parser.parse_filter_statement( 498 self._tokenize('file-io@systemd: arg0 == 0')), 499 parser.ParsedFilterStatement( 500 syscalls=( 501 parser.Syscall('read', 0), 502 parser.Syscall('write', 1), 503 ), 504 filters=[ 505 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 506 ], 507 token=None)) 508 self.assertEqualIgnoringToken( 509 self.parser.parse_filter_statement( 510 self._tokenize('kill: arg0 == 0')), 511 parser.ParsedFilterStatement( 512 syscalls=( 513 parser.Syscall('kill', 62), 514 ), 515 filters=[ 516 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 517 ], 518 token=None)) 519 520 def test_parse_metadata(self): 521 """Accept valid filter statements with metadata.""" 522 self.assertEqualIgnoringToken( 523 self.parser.parse_filter_statement( 524 self._tokenize('read[arch=test]: arg0 == 0')), 525 parser.ParsedFilterStatement( 526 syscalls=( 527 parser.Syscall('read', 0), 528 ), 529 filters=[ 530 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 531 ], 532 token=None)) 533 self.assertEqualIgnoringToken( 534 self.parser.parse_filter_statement( 535 self._tokenize( 536 '{read, nonexistent[arch=nonexistent]}: arg0 == 0')), 537 parser.ParsedFilterStatement( 538 syscalls=( 539 parser.Syscall('read', 0), 540 ), 541 filters=[ 542 parser.Filter([[parser.Atom(0, '==', 0)]], bpf.Allow()), 543 ], 544 token=None)) 545 546 def test_parse_unclosed_brace(self): 547 """Reject unclosed brace.""" 548 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 549 self.parser.parse_filter(self._tokenize('{ allow')) 550 551 def test_parse_invalid_syscall_group(self): 552 """Reject invalid syscall groups.""" 553 with self.assertRaisesRegex(parser.ParseException, 'unclosed brace'): 554 self.parser.parse_filter_statement( 555 self._tokenize('{ read, write: arg0 == 0')) 556 557 def test_parse_missing_colon(self): 558 """Reject missing colon.""" 559 with self.assertRaisesRegex(parser.ParseException, 'missing colon'): 560 self.parser.parse_filter_statement(self._tokenize('read')) 561 562 def test_parse_invalid_colon(self): 563 """Reject invalid colon.""" 564 with self.assertRaisesRegex(parser.ParseException, 'invalid colon'): 565 self.parser.parse_filter_statement(self._tokenize('read arg0')) 566 567 def test_parse_missing_filter(self): 568 """Reject missing filter.""" 569 with self.assertRaisesRegex(parser.ParseException, 'missing filter'): 570 self.parser.parse_filter_statement(self._tokenize('read:')) 571 572 573class ParseFileTests(unittest.TestCase): 574 """Tests for PolicyParser.parse_file.""" 575 576 def setUp(self): 577 self.arch = ARCH_64 578 self.parser = parser.PolicyParser( 579 self.arch, kill_action=bpf.KillProcess()) 580 self.tempdir = tempfile.mkdtemp() 581 582 def tearDown(self): 583 shutil.rmtree(self.tempdir) 584 585 def _write_file(self, filename, contents): 586 """Helper to write out a file for testing.""" 587 path = os.path.join(self.tempdir, filename) 588 with open(path, 'w') as outf: 589 outf.write(contents) 590 return path 591 592 def test_parse_simple(self): 593 """Allow simple policy files.""" 594 path = self._write_file( 595 'test.policy', """ 596 # Comment. 597 read: allow 598 write: allow 599 """) 600 601 self.assertEqual( 602 self.parser.parse_file(path), 603 parser.ParsedPolicy( 604 default_action=bpf.KillProcess(), 605 filter_statements=[ 606 parser.FilterStatement( 607 syscall=parser.Syscall('read', 0), 608 frequency=1, 609 filters=[ 610 parser.Filter(None, bpf.Allow()), 611 ]), 612 parser.FilterStatement( 613 syscall=parser.Syscall('write', 1), 614 frequency=1, 615 filters=[ 616 parser.Filter(None, bpf.Allow()), 617 ]), 618 ])) 619 620 def test_parse_multiline(self): 621 """Allow simple multi-line policy files.""" 622 path = self._write_file( 623 'test.policy', """ 624 # Comment. 625 read: \ 626 allow 627 write: allow 628 """) 629 630 self.assertEqual( 631 self.parser.parse_file(path), 632 parser.ParsedPolicy( 633 default_action=bpf.KillProcess(), 634 filter_statements=[ 635 parser.FilterStatement( 636 syscall=parser.Syscall('read', 0), 637 frequency=1, 638 filters=[ 639 parser.Filter(None, bpf.Allow()), 640 ]), 641 parser.FilterStatement( 642 syscall=parser.Syscall('write', 1), 643 frequency=1, 644 filters=[ 645 parser.Filter(None, bpf.Allow()), 646 ]), 647 ])) 648 649 def test_parse_default(self): 650 """Allow defining a default action.""" 651 path = self._write_file( 652 'test.policy', """ 653 @default kill-thread 654 read: allow 655 """) 656 657 self.assertEqual( 658 self.parser.parse_file(path), 659 parser.ParsedPolicy( 660 default_action=bpf.KillThread(), 661 filter_statements=[ 662 parser.FilterStatement( 663 syscall=parser.Syscall('read', 0), 664 frequency=1, 665 filters=[ 666 parser.Filter(None, bpf.Allow()), 667 ]), 668 ])) 669 670 def test_parse_default_permissive(self): 671 """Reject defining a permissive default action.""" 672 path = self._write_file( 673 'test.policy', """ 674 @default log 675 read: allow 676 """) 677 678 with self.assertRaisesRegex(parser.ParseException, 679 r'invalid permissive default action'): 680 self.parser.parse_file(path) 681 682 def test_parse_simple_grouped(self): 683 """Allow simple policy files.""" 684 path = self._write_file( 685 'test.policy', """ 686 # Comment. 687 {read, write}: allow 688 """) 689 690 self.assertEqual( 691 self.parser.parse_file(path), 692 parser.ParsedPolicy( 693 default_action=bpf.KillProcess(), 694 filter_statements=[ 695 parser.FilterStatement( 696 syscall=parser.Syscall('read', 0), 697 frequency=1, 698 filters=[ 699 parser.Filter(None, bpf.Allow()), 700 ]), 701 parser.FilterStatement( 702 syscall=parser.Syscall('write', 1), 703 frequency=1, 704 filters=[ 705 parser.Filter(None, bpf.Allow()), 706 ]), 707 ])) 708 709 def test_parse_other_arch(self): 710 """Allow entries that only target another architecture.""" 711 path = self._write_file( 712 'test.policy', """ 713 # Comment. 714 read[arch=nonexistent]: allow 715 write: allow 716 """) 717 718 self.assertEqual( 719 self.parser.parse_file(path), 720 parser.ParsedPolicy( 721 default_action=bpf.KillProcess(), 722 filter_statements=[ 723 parser.FilterStatement( 724 syscall=parser.Syscall('write', 1), 725 frequency=1, 726 filters=[ 727 parser.Filter(None, bpf.Allow()), 728 ]), 729 ])) 730 731 def test_parse_include(self): 732 """Allow including policy files.""" 733 path = self._write_file( 734 'test.include.policy', """ 735 {read, write}: arg0 == 0; allow 736 """) 737 path = self._write_file( 738 'test.policy', """ 739 @include ./test.include.policy 740 read: return ENOSYS 741 """) 742 743 self.assertEqual( 744 self.parser.parse_file(path), 745 parser.ParsedPolicy( 746 default_action=bpf.KillProcess(), 747 filter_statements=[ 748 parser.FilterStatement( 749 syscall=parser.Syscall('read', 0), 750 frequency=1, 751 filters=[ 752 parser.Filter([[parser.Atom(0, '==', 0)]], 753 bpf.Allow()), 754 parser.Filter( 755 None, 756 bpf.ReturnErrno( 757 self.arch.constants['ENOSYS'])), 758 ]), 759 parser.FilterStatement( 760 syscall=parser.Syscall('write', 1), 761 frequency=1, 762 filters=[ 763 parser.Filter([[parser.Atom(0, '==', 0)]], 764 bpf.Allow()), 765 parser.Filter(None, bpf.KillProcess()), 766 ]), 767 ])) 768 769 def test_parse_invalid_include(self): 770 """Reject including invalid policy files.""" 771 with self.assertRaisesRegex(parser.ParseException, 772 r'empty include path'): 773 path = self._write_file( 774 'test.policy', """ 775 @include 776 """) 777 self.parser.parse_file(path) 778 779 with self.assertRaisesRegex(parser.ParseException, 780 r'invalid include path'): 781 path = self._write_file( 782 'test.policy', """ 783 @include arg0 784 """) 785 self.parser.parse_file(path) 786 787 with self.assertRaisesRegex(parser.ParseException, 788 r'@include statement nested too deep'): 789 path = self._write_file( 790 'test.policy', """ 791 @include ./test.policy 792 """) 793 self.parser.parse_file(path) 794 795 with self.assertRaisesRegex(parser.ParseException, 796 r'Could not @include .*'): 797 path = self._write_file( 798 'test.policy', """ 799 @include ./nonexistent.policy 800 """) 801 self.parser.parse_file(path) 802 803 def test_parse_frequency(self): 804 """Allow including frequency files.""" 805 self._write_file( 806 'test.frequency', """ 807 read: 2 808 write: 3 809 """) 810 path = self._write_file( 811 'test.policy', """ 812 @frequency ./test.frequency 813 read: allow 814 """) 815 816 self.assertEqual( 817 self.parser.parse_file(path), 818 parser.ParsedPolicy( 819 default_action=bpf.KillProcess(), 820 filter_statements=[ 821 parser.FilterStatement( 822 syscall=parser.Syscall('read', 0), 823 frequency=2, 824 filters=[ 825 parser.Filter(None, bpf.Allow()), 826 ]), 827 ])) 828 829 def test_parse_invalid_frequency(self): 830 """Reject including invalid frequency files.""" 831 path = self._write_file('test.policy', 832 """@frequency ./test.frequency""") 833 834 with self.assertRaisesRegex(parser.ParseException, r'missing colon'): 835 self._write_file('test.frequency', """ 836 read 837 """) 838 self.parser.parse_file(path) 839 840 with self.assertRaisesRegex(parser.ParseException, r'invalid colon'): 841 self._write_file('test.frequency', """ 842 read foo 843 """) 844 self.parser.parse_file(path) 845 846 with self.assertRaisesRegex(parser.ParseException, r'missing number'): 847 self._write_file('test.frequency', """ 848 read: 849 """) 850 self.parser.parse_file(path) 851 852 with self.assertRaisesRegex(parser.ParseException, r'invalid number'): 853 self._write_file('test.frequency', """ 854 read: foo 855 """) 856 self.parser.parse_file(path) 857 858 with self.assertRaisesRegex(parser.ParseException, r'invalid number'): 859 self._write_file('test.frequency', """ 860 read: -1 861 """) 862 self.parser.parse_file(path) 863 864 with self.assertRaisesRegex(parser.ParseException, 865 r'empty frequency path'): 866 path = self._write_file( 867 'test.policy', """ 868 @frequency 869 """) 870 self.parser.parse_file(path) 871 872 with self.assertRaisesRegex(parser.ParseException, 873 r'invalid frequency path'): 874 path = self._write_file( 875 'test.policy', """ 876 @frequency arg0 877 """) 878 self.parser.parse_file(path) 879 880 with self.assertRaisesRegex(parser.ParseException, 881 r'Could not open frequency file.*'): 882 path = self._write_file( 883 'test.policy', """ 884 @frequency ./nonexistent.frequency 885 """) 886 self.parser.parse_file(path) 887 888 def test_parse_multiple_unconditional(self): 889 """Reject actions after an unconditional action.""" 890 path = self._write_file( 891 'test.policy', """ 892 read: allow 893 read: allow 894 """) 895 896 with self.assertRaisesRegex( 897 parser.ParseException, 898 (r'test.policy\(3:17\): ' 899 r'Syscall read.*already had an unconditional action ' 900 r'applied')): 901 self.parser.parse_file(path) 902 903 path = self._write_file( 904 'test.policy', """ 905 read: log 906 read: arg0 == 0; log 907 """) 908 909 with self.assertRaisesRegex( 910 parser.ParseException, 911 (r'test.policy\(3:17\): ' 912 r'Syscall read.*already had an unconditional action ' 913 r'applied')): 914 self.parser.parse_file(path) 915 916 def test_parse_allowlist_denylist_header(self): 917 """Reject trying to compile denylist policy file as allowlist.""" 918 with self.assertRaisesRegex(parser.ParseException, 919 r'policy is denylist, but flag --denylist ' 920 'not passed in'): 921 path = self._write_file( 922 'test.policy', """ 923 @denylist 924 """) 925 self.parser.parse_file(path) 926 927 928class ParseFileDenylistTests(unittest.TestCase): 929 """Tests for PolicyParser.parse_file.""" 930 931 def setUp(self): 932 self.arch = ARCH_64 933 self.kill_action = bpf.KillProcess() 934 self.parser = parser.PolicyParser( 935 self.arch, kill_action=self.kill_action, denylist=True) 936 self.tempdir = tempfile.mkdtemp() 937 938 def tearDown(self): 939 shutil.rmtree(self.tempdir) 940 941 def _write_file(self, filename, contents): 942 """Helper to write out a file for testing.""" 943 path = os.path.join(self.tempdir, filename) 944 with open(path, 'w') as outf: 945 outf.write(contents) 946 return path 947 948 def test_parse_simple(self): 949 """Allow simple denylist policy files.""" 950 path = self._write_file( 951 'test.policy', """ 952 # Comment. 953 @denylist 954 read: return ENOSYS 955 write: return ENOSYS 956 """) 957 958 self.assertEqual( 959 self.parser.parse_file(path), 960 parser.ParsedPolicy( 961 default_action=bpf.Allow(), 962 filter_statements=[ 963 parser.FilterStatement( 964 syscall=parser.Syscall('read', 0), 965 frequency=1, 966 filters=[ 967 parser.Filter(None, bpf.ReturnErrno( 968 self.arch.constants['ENOSYS'])), 969 ]), 970 parser.FilterStatement( 971 syscall=parser.Syscall('write', 1), 972 frequency=1, 973 filters=[ 974 parser.Filter(None, bpf.ReturnErrno( 975 self.arch.constants['ENOSYS'])), 976 ]), 977 ])) 978 979 def test_parse_simple_with_arg(self): 980 """Allow simple denylist policy files.""" 981 path = self._write_file( 982 'test.policy', """ 983 # Comment. 984 @denylist 985 read: return ENOSYS 986 write: arg0 == 0 ; return ENOSYS 987 """) 988 989 self.assertEqual( 990 self.parser.parse_file(path), 991 parser.ParsedPolicy( 992 default_action=bpf.Allow(), 993 filter_statements=[ 994 parser.FilterStatement( 995 syscall=parser.Syscall('read', 0), 996 frequency=1, 997 filters=[ 998 parser.Filter(None, bpf.ReturnErrno( 999 self.arch.constants['ENOSYS'])), 1000 ]), 1001 parser.FilterStatement( 1002 syscall=parser.Syscall('write', 1), 1003 frequency=1, 1004 filters=[ 1005 parser.Filter([[parser.Atom(0, '==', 0)]], 1006 bpf.ReturnErrno(self.arch.constants['ENOSYS'])), 1007 parser.Filter(None, bpf.Allow()), 1008 ]), 1009 ])) 1010 1011 1012 def test_parse_denylist_no_header(self): 1013 """Reject trying to compile denylist policy file as allowlist.""" 1014 with self.assertRaisesRegex(parser.ParseException, 1015 r'policy must contain @denylist flag to be ' 1016 'compiled with --denylist flag'): 1017 path = self._write_file( 1018 'test.policy', """ 1019 read: return ENOSYS 1020 """) 1021 self.parser.parse_file(path) 1022 1023if __name__ == '__main__': 1024 unittest.main() 1025