home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- '''
- The purpose of this module is to encapsulate the concept of a test
- which might be presented in several ways. For example, a test might
- require manual intervention whereas another test might be completely
- automatic. Either way, this module provides the base common to each type
- of test.
- '''
- import re
- import logging
- from gettext import gettext as _
- from checkbox.lib.environ import add_variable, remove_variable
- from checkbox.lib.signal import signal_to_name, signal_to_description
- from checkbox.command import Command
- from checkbox.frontend import frontend
- from checkbox.requires import Requires
- DESKTOP = 'desktop'
- LAPTOP = 'laptop'
- SERVER = 'server'
- ALL_CATEGORIES = [
- DESKTOP,
- LAPTOP,
- SERVER]
- I386 = 'i386'
- AMD64 = 'amd64'
- LPIA = 'lpia'
- SPARC = 'sparc'
- ALL_ARCHITECTURES = [
- I386,
- AMD64,
- LPIA,
- SPARC]
- FAIL = 'fail'
- PASS = 'pass'
- SKIP = 'skip'
- ALL_STATUS = [
- PASS,
- FAIL,
- SKIP]
-
- class TestResult(object):
-
- def __init__(self, test, status, data, duration = None):
- self.test = test
- self.status = status
- self.data = data
- self.duration = duration
-
-
- def attributes(self):
- return {
- 'status': self.status,
- 'data': self.data,
- 'duration': self.duration }
-
- attributes = property(attributes)
-
- def devices(self):
- return self.test.requires.get_devices()
-
- devices = property(devices)
-
- def packages(self):
- return self.test.requires.get_packages()
-
- packages = property(packages)
-
- def _get_status(self):
- return self._status
-
-
- def _set_status(self, status):
- if status not in ALL_STATUS:
- raise Exception, 'Invalid status: %s' % status
- status not in ALL_STATUS
- self._status = status
-
- status = property(_get_status, _set_status)
-
-
- class TestCommand(Command):
-
- def __init__(self, test, *args, **kwargs):
- super(TestCommand, self).__init__(test.command, test.timeout, *args, **kwargs)
- self.test = test
-
-
- def execute(self, *args, **kwargs):
- return super(TestCommand, self).execute(*args, **kwargs)
-
- execute = frontend('get_test_result')(execute)
-
- def post_execute(self, result):
- result = super(TestCommand, self).post_execute(result)
- if result.if_exited:
- exit_status = result.exit_status
- if exit_status == 0:
- status = PASS
- data = result.stdout
- if not data:
- data = result.stderr
-
- elif exit_status == 127:
- status = SKIP
- data = _('Command failed, skipping.')
- else:
- status = FAIL
- data = result.stderr
- elif result.if_signaled:
- status = SKIP
- term_signal = result.term_signal
- data = _('Received terminate signal %s: %s') % (signal_to_name(term_signal), signal_to_description(term_signal))
- else:
- raise Exception, 'Command not terminated: %s' % self.get_command()
- duration = result.if_exited.duration
- return TestResult(self.test, status, data, duration)
-
-
-
- class TestDescription(Command):
-
- def __init__(self, test):
- super(TestDescription, self).__init__(test.description, test.timeout)
- self.test = test
- self.output = None
-
-
- def get_command(self):
- command = super(TestDescription, self).get_command()
- return 'cat <<EOF\n%s\nEOF\n' % command
-
-
- def pre_execute(self, result = None):
- super(TestDescription, self).pre_execute()
- if re.search('\\$output', self.get_command()):
- if not (self.output) and not result:
- result = self.test.command()
-
- if result:
- self.output = result.data.strip()
- result.data = ''
-
- add_variable('output', self.output)
-
-
-
- def execute(self, *args, **kwargs):
- return super(TestDescription, self).execute(*args, **kwargs)
-
- execute = frontend('get_test_description')(execute)
-
- def post_execute(self, result):
- result = super(TestDescription, self).post_execute(result)
- remove_variable('output')
- if not (result.if_exited) or result.exit_status != 0:
- raise Exception, 'Description failed: %s' % self.get_command()
- result.exit_status != 0
- return result.stdout
-
-
-
- class Test(object):
- """
- Test base class which should be inherited by each test
- implementation. A test instance contains the following required
- fields:
-
- name: Unique name for a test.
- plugin: Plugin name to handle this test.
- description: Long description of what the test does.
- suite: Name of the suite containing this test.
-
- An instance also contains the following optional fields:
-
- architectures: List of architectures for which this test is relevant:
- amd64, i386, powerpc and/or sparc
- categories: List of categories for which this test is relevant:
- desktop, laptop and/or server
- command: Command to run for the test.
- depends: List of names on which this test depends. So, if
- the other test fails, this test will be skipped.
- requires: Registry expressions which are requirements for
- this test: 'input.mouse' in info.capabilities
- timeout: Timeout for running the command.
- user: User to run the command.
- optional: Boolean expression set to True if this test is optional
- or False if this test is required.
- """
- required_fields = [
- 'name',
- 'plugin',
- 'description',
- 'suite']
- optional_fields = {
- 'architectures': [],
- 'categories': [],
- 'command': None,
- 'depends': [],
- 'requires': None,
- 'timeout': None,
- 'user': None,
- 'optional': False }
-
- def __init__(self, registry, **attributes):
- super(Test, self).__setattr__('attributes', attributes)
- for field in [
- 'architectures',
- 'categories',
- 'depends']:
- if attributes.has_key(field):
- attributes[field] = re.split('\\s*,\\s*', attributes[field])
- continue
-
- for field in [
- 'timeout']:
- if attributes.has_key(field):
- attributes[field] = int(attributes[field])
- continue
-
- for field in self.optional_fields.keys():
- if not attributes.has_key(field):
- attributes[field] = self.optional_fields[field]
- continue
-
- attributes['requires'] = Requires(registry, attributes['requires'])
- attributes['command'] = TestCommand(self)
- attributes['description'] = TestDescription(self)
- self._validate()
-
-
- def _validate(self):
- for field in self.attributes.keys():
- if field not in self.required_fields + self.optional_fields.keys():
- logging.info('Test attributes contains unknown field: %s', field)
- del self.attributes[field]
- continue
-
- for field in self.required_fields:
- if not self.attributes.has_key(field):
- raise Exception, "Test attributes does not contain '%s': %s" % (field, self.attributes)
- self.attributes.has_key(field)
-
-
-
- def __getattr__(self, name):
- if name not in self.attributes:
- raise AttributeError, name
- name not in self.attributes
- return self.attributes[name]
-
-
- def __setattr__(self, name, value):
- if name not in self.attributes:
- raise AttributeError, name
- name not in self.attributes
- self.attributes[name] = value
- self._validate()
-
-
-