home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import os
- import sys
- import unittest
- import getopt
- import time
- use_resources = []
-
- class ResourceDenied(Exception):
- pass
-
-
- def is_resource_enabled(resource):
- if sys._getframe().f_back.f_globals.get('__name__') == '__main__':
- return True
- if not use_resources is not None and resource in use_resources:
- pass
- result = '*' in use_resources
- if not result:
- _unavail[resource] = None
-
- return result
-
- _unavail = { }
-
- def requires(resource, msg = None):
- if sys._getframe().f_back.f_globals.get('__name__') == '__main__':
- return None
- if not is_resource_enabled(resource):
- if msg is None:
- msg = "Use of the `%s' resource not enabled" % resource
-
- raise ResourceDenied(msg)
- is_resource_enabled(resource)
-
-
- def find_package_modules(package, mask):
- import fnmatch
- if hasattr(package, '__loader__') and hasattr(package.__loader__, '_files'):
- path = package.__name__.replace('.', os.path.sep)
- mask = os.path.join(path, mask)
- for fnm in package.__loader__._files.iterkeys():
- if fnmatch.fnmatchcase(fnm, mask):
- yield os.path.splitext(fnm)[0].replace(os.path.sep, '.')
- continue
-
- else:
- path = package.__path__[0]
- for fnm in os.listdir(path):
- if fnmatch.fnmatchcase(fnm, mask):
- yield '%s.%s' % (package.__name__, os.path.splitext(fnm)[0])
- continue
-
-
-
- def get_tests(package, mask, verbosity, exclude = ()):
- tests = []
- skipped = []
- for modname in find_package_modules(package, mask):
- if modname.split('.')[-1] in exclude:
- skipped.append(modname)
- if verbosity > 1:
- print >>sys.stderr, 'Skipped %s: excluded' % modname
- continue
- continue
-
-
- try:
- mod = __import__(modname, globals(), locals(), [
- '*'])
- except ResourceDenied:
- detail = None
- skipped.append(modname)
- if verbosity > 1:
- print >>sys.stderr, 'Skipped %s: %s' % (modname, detail)
- continue
- continue
-
- for name in dir(mod):
- if name.startswith('_'):
- continue
-
- o = getattr(mod, name)
- if type(o) is type(unittest.TestCase) and issubclass(o, unittest.TestCase):
- tests.append(o)
- continue
-
-
- return (skipped, tests)
-
-
- def usage():
- print __doc__
- return 1
-
-
- def test_with_refcounts(runner, verbosity, testcase):
- import gc
- import ctypes
- ptc = ctypes._pointer_type_cache.copy()
- cfc = ctypes._c_functype_cache.copy()
- wfc = ctypes._win_functype_cache.copy()
-
- def cleanup():
- ctypes._pointer_type_cache = ptc.copy()
- ctypes._c_functype_cache = cfc.copy()
- ctypes._win_functype_cache = wfc.copy()
- gc.collect()
-
- test = unittest.makeSuite(testcase)
- for i in range(5):
- rc = sys.gettotalrefcount()
- runner.run(test)
- cleanup()
-
- COUNT = 5
- refcounts = [
- None] * COUNT
- for i in range(COUNT):
- rc = sys.gettotalrefcount()
- runner.run(test)
- cleanup()
- refcounts[i] = sys.gettotalrefcount() - rc
-
- if filter(None, refcounts):
- print '%s leaks:\n\t' % testcase, refcounts
- elif verbosity:
- print '%s: ok.' % testcase
-
-
-
- class TestRunner(unittest.TextTestRunner):
-
- def run(self, test, skipped):
- result = self._makeResult()
- startTime = time.time()
- test(result)
- stopTime = time.time()
- timeTaken = stopTime - startTime
- result.printErrors()
- self.stream.writeln(result.separator2)
- run = result.testsRun
- if _unavail:
- requested = _unavail.keys()
- requested.sort()
- if not run != 1 or 's':
- pass
- if not len(skipped) != 1 or 's':
- pass
- self.stream.writeln('Ran %d test%s in %.3fs (%s module%s skipped)' % (run, '', timeTaken, len(skipped), ''))
- self.stream.writeln('Unavailable resources: %s' % ', '.join(requested))
- elif not run != 1 or 's':
- pass
- self.stream.writeln('Ran %d test%s in %.3fs' % (run, '', timeTaken))
- self.stream.writeln()
- if not result.wasSuccessful():
- self.stream.write('FAILED (')
- (failed, errored) = map(len, (result.failures, result.errors))
- if failed:
- self.stream.write('failures=%d' % failed)
-
- if errored:
- if failed:
- self.stream.write(', ')
-
- self.stream.write('errors=%d' % errored)
-
- self.stream.writeln(')')
- else:
- self.stream.writeln('OK')
- return result
-
-
-
- def main(*packages):
-
- try:
- (opts, args) = getopt.getopt(sys.argv[1:], 'rqvu:x:')
- except getopt.error:
- return usage()
-
- verbosity = 1
- search_leaks = False
- exclude = []
- for flag, value in opts:
- if flag == '-q':
- verbosity -= 1
- continue
- if flag == '-v':
- verbosity += 1
- continue
- if flag == '-r':
-
- try:
- sys.gettotalrefcount
- except AttributeError:
- print >>sys.stderr, '-r flag requires Python debug build'
- return -1
-
- search_leaks = True
- continue
- if flag == '-u':
- use_resources.extend(value.split(','))
- continue
- if flag == '-x':
- exclude.extend(value.split(','))
- continue
-
- mask = 'test_*.py'
- if args:
- mask = args[0]
-
- for package in packages:
- run_tests(package, mask, verbosity, search_leaks, exclude)
-
-
-
- def run_tests(package, mask, verbosity, search_leaks, exclude):
- (skipped, testcases) = get_tests(package, mask, verbosity, exclude)
- runner = TestRunner(verbosity = verbosity)
- suites = [ unittest.makeSuite(o) for o in testcases ]
- suite = unittest.TestSuite(suites)
- result = runner.run(suite, skipped)
- return bool(result.errors)
-
-
- class BasicTestRunner:
-
- def run(self, test):
- result = unittest.TestResult()
- test(result)
- return result
-
-
-