home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / checkbox / plugins / backend_manager.py < prev    next >
Encoding:
Python Source  |  2009-04-27  |  6.2 KB  |  178 lines

  1. #
  2. # This file is part of Checkbox.
  3. #
  4. # Copyright 2008 Canonical Ltd.
  5. #
  6. # Checkbox is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # Checkbox is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with Checkbox.  If not, see <http://www.gnu.org/licenses/>.
  18. #
  19.  
  20. import logging
  21.  
  22. import gobject
  23. import dbus
  24. import dbus.service
  25. import dbus.mainloop.glib
  26.  
  27. from checkbox.lib.environ import append_path
  28.  
  29. from checkbox.properties import Int, String
  30.  
  31.  
  32. class PermissionDeniedByPolicy(dbus.DBusException):
  33.     _dbus_error_name = "com.ubuntu.checkbox.PermissionDeniedByPolicy"
  34.  
  35.  
  36. class UnknownRegistryException(dbus.DBusException):
  37.     _dbus_error_name = 'com.ubuntu.DeviceDriver.InvalidDriverDBException'
  38.  
  39. class UnknownTestException(dbus.DBusException):
  40.     _dbus_error_name = 'com.ubuntu.DeviceDriver.InvalidDriverDBException'
  41.  
  42.  
  43. class BackendManager(dbus.service.Object):
  44.  
  45.     DBUS_INTERFACE_NAME = "com.ubuntu.checkbox"
  46.  
  47.     DBUS_BUS_NAME = "com.ubuntu.checkbox"
  48.  
  49.     timeout = Int(default=600)
  50.     path = String(default="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
  51.  
  52.     def __init__(self):
  53.         self.dbus_info = None
  54.         self.polkit = None
  55.         self.loop = False
  56.         self.tests = {}
  57.  
  58.     def __repr__(self):
  59.         return "<BackendManager>"
  60.  
  61.     def register(self, manager):
  62.         import dbus.mainloop.glib
  63.         dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  64.         self.bus = dbus.SystemBus()
  65.         self.dbus_name = dbus.service.BusName(self.DBUS_BUS_NAME, self.bus)
  66.  
  67.         # Set environment which has been reset by dbus
  68.         for path in self.path.split(":"):
  69.             append_path(path)
  70.  
  71.         self._manager = manager
  72.         for (rt, rh) in [
  73.              ("test-.*", self.test_all),
  74.              ("run", self.run)]:
  75.             self._manager.reactor.call_on(rt, rh)
  76.  
  77.     @dbus.service.method(DBUS_INTERFACE_NAME,
  78.         in_signature="s", out_signature="s", sender_keyword="sender",
  79.         connection_keyword="conn")
  80.     def get_registry(self, name, sender=None, conn=None):
  81.         self._check_polkit_privilege(sender, conn, "com.ubuntu.checkbox.info")
  82.         if name not in self._manager.registry:
  83.             raise UnknownRegistryException, "Registry not found: %s" % name
  84.  
  85.         return str(self._manager.registry[name])
  86.  
  87.     @dbus.service.method(DBUS_INTERFACE_NAME,
  88.         in_signature="ss", out_signature="as", sender_keyword="sender",
  89.         connection_keyword="conn")
  90.     def get_test_result(self, suite, name, sender=None, conn=None):
  91.         self._check_polkit_privilege(sender, conn, "com.ubuntu.checkbox.test")
  92.         if (suite, name) not in self.tests:
  93.             raise UnknownTestException, \
  94.                 "Suite/test not found: %s/%s" % (suite, name)
  95.  
  96.         test = self.tests[(suite, name)]
  97.         result = test.command()
  98.         return (result.status, result.data, str(result.duration))
  99.  
  100.     @dbus.service.method(DBUS_INTERFACE_NAME,
  101.         in_signature="ss", out_signature="s", sender_keyword="sender",
  102.         connection_keyword="conn")
  103.     def get_test_description(self, suite, name, sender=None, conn=None):
  104.         self._check_polkit_privilege(sender, conn, "com.ubuntu.checkbox.test")
  105.         if (suite, name) not in self.tests:
  106.             raise UnknownTestException, \
  107.                 "Suite/test not found: %s/%s" % (suite, name)
  108.  
  109.         test = self.tests[(suite, name)]
  110.         return test.description()
  111.  
  112.     def test_all(self, test):
  113.         self.tests[(test.suite, test.name)] = test
  114.  
  115.     def run(self):
  116.         self._manager.reactor.fire("gather")
  117.  
  118.         # Delay instantiating the service after register
  119.         dbus.service.Object.__init__(self, self.bus, "/checkbox")
  120.         main_loop = gobject.MainLoop()
  121.         self.loop = False
  122.         if self.timeout:
  123.             def _t():
  124.                 main_loop.quit()
  125.                 return True
  126.             gobject.timeout_add(self.timeout * 1000, _t)
  127.  
  128.         # run until we time out
  129.         while not self.loop:
  130.             if self.timeout:
  131.                 self.loop = True
  132.             main_loop.run()
  133.  
  134.     def _check_polkit_privilege(self, sender, conn, privilege):
  135.         """Verify that sender has a given PolicyKit privilege.
  136.  
  137.         sender is the sender's (private) D-BUS name, such as ":1:42"
  138.         (sender_keyword in @dbus.service.methods). conn is
  139.         the dbus.Connection object (connection_keyword in
  140.         @dbus.service.methods). privilege is the PolicyKit privilege string.
  141.  
  142.         This method returns if the caller is privileged, and otherwise throws a
  143.         PermissionDeniedByPolicy exception.
  144.         """
  145.         if sender is None and conn is None:
  146.             # called locally, not through D-BUS
  147.             return
  148.  
  149.         # get peer PID
  150.         if self.dbus_info is None:
  151.             self.dbus_info = dbus.Interface(conn.get_object("org.freedesktop.DBus",
  152.                 "/org/freedesktop/DBus/Bus", False), "org.freedesktop.DBus")
  153.         pid = self.dbus_info.GetConnectionUnixProcessID(sender)
  154.  
  155.         # query PolicyKit
  156.         if self.polkit is None:
  157.             self.polkit = dbus.Interface(dbus.SystemBus().get_object(
  158.                 "org.freedesktop.PolicyKit", "/", False),
  159.                 "org.freedesktop.PolicyKit")
  160.         try:
  161.             res = self.polkit.IsProcessAuthorized(privilege, pid, True)
  162.         except dbus.DBusException, e:
  163.             if e._dbus_error_name == "org.freedesktop.DBus.Error.ServiceUnknown":
  164.                 # polkitd timed out, connect again
  165.                 self.polkit = None
  166.                 return self._check_polkit_privilege(sender, conn, privilege)
  167.             else:
  168.                 raise
  169.  
  170.         if res != "yes":
  171.             logging.debug("_check_polkit_privilege: sender %s "
  172.                 "on connection %s pid %i requested %s: %s",
  173.                 sender, conn, pid, privilege, res)
  174.             raise PermissionDeniedByPolicy(privilege + " " + res)
  175.  
  176.  
  177. factory = BackendManager
  178.