1#!/usr/bin/python3 -EsI 2 3import dbus 4import dbus.service 5from dbus.mainloop.glib import DBusGMainLoop 6from gi.repository import GObject 7from gi.repository import GLib 8import os 9import selinux 10from subprocess import Popen, PIPE, STDOUT 11 12 13class selinux_server(dbus.service.Object): 14 default_polkit_auth_required = "org.selinux.semanage" 15 16 def __init__(self, *p, **k): 17 super(selinux_server, self).__init__(*p, **k) 18 19 def is_authorized(self, sender, action_id): 20 bus = dbus.SystemBus() 21 proxy = bus.get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority') 22 authority = dbus.Interface(proxy, dbus_interface='org.freedesktop.PolicyKit1.Authority') 23 subject = ('system-bus-name', {'name': sender}) 24 result = authority.CheckAuthorization(subject, action_id, {}, 1, '') 25 return result[0] 26 27 # 28 # The semanage method runs a transaction on a series of semanage commands, 29 # these commands can take the output of customized 30 # 31 @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender") 32 def semanage(self, buf, sender): 33 if not self.is_authorized(sender, "org.selinux.semanage"): 34 raise dbus.exceptions.DBusException("Not authorized") 35 p = Popen(["/usr/sbin/semanage", "import"], stdout=PIPE, stderr=PIPE, stdin=PIPE, universal_newlines=True) 36 p.stdin.write(buf) 37 output = p.communicate() 38 if p.returncode and p.returncode != 0: 39 raise dbus.exceptions.DBusException(output[1]) 40 41 # 42 # The customized method will return all of the custommizations for policy 43 # on the server. This output can be used with the semanage method on 44 # another server to make the two systems have duplicate policy. 45 # 46 @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender") 47 def customized(self, sender): 48 if not self.is_authorized(sender, "org.selinux.customized"): 49 raise dbus.exceptions.DBusException("Not authorized") 50 p = Popen(["/usr/sbin/semanage", "export"], stdout=PIPE, stderr=PIPE, universal_newlines=True) 51 buf = p.stdout.read() 52 output = p.communicate() 53 if p.returncode and p.returncode != 0: 54 raise OSError("Failed to read SELinux configuration: %s", output) 55 return buf 56 57 # 58 # The semodule_list method will return the output of semodule --list=full, using the customized polkit, 59 # since this is a readonly behaviour 60 # 61 @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender") 62 def semodule_list(self, sender): 63 if not self.is_authorized(sender, "org.selinux.semodule_list"): 64 raise dbus.exceptions.DBusException("Not authorized") 65 p = Popen(["/usr/sbin/semodule", "--list=full"], stdout=PIPE, stderr=PIPE, universal_newlines=True) 66 buf = p.stdout.read() 67 output = p.communicate() 68 if p.returncode and p.returncode != 0: 69 raise OSError("Failed to list SELinux modules: %s", output) 70 return buf 71 72 # 73 # The restorecon method modifies any file path to the default system label 74 # 75 @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender") 76 def restorecon(self, path, sender): 77 if not self.is_authorized(sender, "org.selinux.restorecon"): 78 raise dbus.exceptions.DBusException("Not authorized") 79 selinux.restorecon(str(path), recursive=1) 80 81 # 82 # The setenforce method turns off the current enforcement of SELinux 83 # 84 @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender") 85 def setenforce(self, value, sender): 86 if not self.is_authorized(sender, "org.selinux.setenforce"): 87 raise dbus.exceptions.DBusException("Not authorized") 88 selinux.security_setenforce(value) 89 90 # 91 # The setenforce method turns off the current enforcement of SELinux 92 # 93 @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender") 94 def relabel_on_boot(self, value, sender): 95 if not self.is_authorized(sender, "org.selinux.relabel_on_boot"): 96 raise dbus.exceptions.DBusException("Not authorized") 97 if value == 1: 98 fd = open("/.autorelabel", "w") 99 fd.close() 100 else: 101 try: 102 os.unlink("/.autorelabel") 103 except FileNotFoundError: 104 pass 105 106 def write_selinux_config(self, enforcing=None, policy=None): 107 path = selinux.selinux_path() + "config" 108 backup_path = path + ".bck" 109 fd = open(path) 110 lines = fd.readlines() 111 fd.close() 112 fd = open(backup_path, "w") 113 for l in lines: 114 if enforcing and l.startswith("SELINUX="): 115 fd.write("SELINUX=%s\n" % enforcing) 116 continue 117 if policy and l.startswith("SELINUXTYPE="): 118 fd.write("SELINUXTYPE=%s\n" % policy) 119 continue 120 fd.write(l) 121 fd.close() 122 os.rename(backup_path, path) 123 124 # 125 # The change_default_enforcement modifies the current enforcement mode 126 # 127 @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender") 128 def change_default_mode(self, value, sender): 129 if not self.is_authorized(sender, "org.selinux.change_default_mode"): 130 raise dbus.exceptions.DBusException("Not authorized") 131 values = ["enforcing", "permissive", "disabled"] 132 if value not in values: 133 raise ValueError("Enforcement mode must be %s" % ", ".join(values)) 134 self.write_selinux_config(enforcing=value) 135 136 # 137 # The change_default_policy method modifies the policy type 138 # 139 @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender") 140 def change_default_policy(self, value, sender): 141 if not self.is_authorized(sender, "org.selinux.change_default_policy"): 142 raise dbus.exceptions.DBusException("Not authorized") 143 path = selinux.selinux_path() + value 144 if os.path.isdir(path): 145 return self.write_selinux_config(policy=value) 146 raise ValueError("%s does not exist" % path) 147 148if __name__ == "__main__": 149 DBusGMainLoop(set_as_default=True) 150 mainloop = GLib.MainLoop() 151 152 system_bus = dbus.SystemBus() 153 name = dbus.service.BusName("org.selinux", system_bus) 154 server = selinux_server(system_bus, "/org/selinux/object") 155 mainloop.run() 156