# (web-archive navigation residue: home *** CD-ROM | disk | FTP | other *** search)
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
-
# This module refuses to load under any name other than the one it has
# inside the ``test`` package.
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
-
- import sys
-
class Error(Exception):
    """Base class for the exceptions defined in this module."""
-
-
class TestFailed(Error):
    """Raised by the helpers in this module when a test fails."""
-
-
class TestSkipped(Error):
    """Error subclass for skipped tests.

    Within this file it only serves as the base class of ResourceDenied.
    """
-
-
class ResourceDenied(TestSkipped):
    """Raised by requires() when the requested resource is not enabled."""
-
# Truthy value makes the helpers below emit progress/skip messages.
verbose = 1
# Container of enabled resource names, or None; see is_resource_enabled().
use_resources = None
# Memory limits in bytes; both are assigned by set_memlimit().
max_memuse = 0
real_max_memuse = 0
# Stream remembered by record_original_stdout().
_original_stdout = None
-
def record_original_stdout(stdout):
    """Remember *stdout* in the module-level ``_original_stdout`` slot."""
    global _original_stdout
    _original_stdout = stdout
-
-
def get_original_stdout():
    """Return the stream saved by record_original_stdout().

    Falls back to the current ``sys.stdout`` when nothing (or a falsy
    value) has been recorded.  The decompiled form dropped the ``or``
    and therefore always returned ``sys.stdout``.
    """
    return _original_stdout or sys.stdout
-
-
def unload(name):
    """Drop *name* from ``sys.modules``; a missing entry is not an error."""
    sys.modules.pop(name, None)
-
-
-
def unlink(filename):
    """Delete *filename*, silently ignoring any OSError from the removal."""
    import os

    try:
        os.unlink(filename)
    except OSError:
        # Best effort: a file that is absent or cannot be removed is ignored.
        return None
-
-
-
def forget(modname):
    """Unload *modname* and delete its compiled files along ``sys.path``.

    For every directory on ``sys.path`` the ``.pyc`` and then the ``.pyo``
    file named after *modname* is removed via unlink().
    """
    import os

    unload(modname)
    for directory in sys.path:
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(directory, modname + os.extsep + suffix))
-
-
-
def is_resource_enabled(resource):
    """Return True if *resource* is listed in ``use_resources``.

    Returns False when ``use_resources`` is None.  The decompiled form
    lost the short-circuit ``and`` and evaluated ``resource in None``,
    which raises TypeError.
    """
    return use_resources is not None and resource in use_resources
-
-
def requires(resource, msg=None):
    """Raise ResourceDenied unless *resource* is enabled.

    The check is skipped entirely when the calling module's ``__name__``
    is ``'__main__'``.  *msg* overrides the default exception text.
    """
    caller_globals = sys._getframe().f_back.f_globals
    if caller_globals.get('__name__') == '__main__':
        return None
    if is_resource_enabled(resource):
        return None
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
-
-
-
def bind_port(sock, host='', preferred_port=54321):
    """Bind *sock* to the first usable port from a short candidate list.

    The candidates are *preferred_port*, 9907, 10243, 32999 and finally 0
    (which lets the operating system choose).  Returns the port number
    that was bound; for candidate 0 the real port is read back from
    ``sock.getsockname()``.

    A bind that fails with ``EADDRINUSE`` writes a warning to
    ``sys.__stderr__`` and moves on to the next candidate; any other
    socket error propagates.  Raises TestFailed when every candidate
    fails.
    """
    import socket
    import errno

    for port in (preferred_port, 9907, 10243, 32999, 0):
        try:
            sock.bind((host, port))
            if port == 0:
                port = sock.getsockname()[1]
            return port
        except socket.error:
            # sys.exc_info() rather than "except ... as": the latter is
            # not available on Python 2.5.
            exc = sys.exc_info()[1]
            err = exc.args[0] if exc.args else None
            if err != errno.EADDRINUSE:
                raise
            sys.__stderr__.write(
                ' WARNING: failed to listen on port %d, trying another' % port
                + '\n')
    raise TestFailed('unable to find port to listen on')
-
# Relative tolerance used by fcmp().
FUZZ = 1e-06


def _three_way(a, b):
    """Return -1, 0 or 1 as *a* is less than, equal to or greater than *b*."""
    return (a > b) - (a < b)


def fcmp(x, y):
    """Three-way comparison that treats nearly equal floats as equal.

    * If either argument is a float, the two compare equal (0) when
      ``abs(x - y) <= (abs(x) + abs(y)) * FUZZ``.  If that arithmetic
      fails, the ordinary comparison is used instead.
    * Two tuples, or two lists, are compared element-wise with fcmp();
      the first non-zero outcome wins and ties fall back to comparing
      the lengths.
    * Anything else gets an ordinary three-way comparison.

    NOTE(review): the decompiled text tested ``type(0)``; the float
    literal ``0.0`` was presumably printed without its fractional part.
    Confirm against the original module.
    """
    if type(x) == type(0.0) or type(y) == type(0.0):
        try:
            # Mixed numeric operands are promoted by the arithmetic itself,
            # so no explicit coerce() call is needed.
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x - y) <= fuzz:
                return 0
        except Exception:
            # Non-numeric operand: fall through to the plain comparison.
            pass
    elif type(x) == type(y) and type(x) in (type(()), type([])):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return _three_way(len(x), len(y))
    return _three_way(x, y)
-
-
# Probe for the ``unicode`` builtin; have_unicode is 1 when it exists.
try:
    unicode
except NameError:
    have_unicode = 0
else:
    have_unicode = 1

# True when sys.platform starts with 'java'.
is_jython = sys.platform.startswith('java')
import os

# TESTFN is the scratch file name used by tests; it depends on os.name.
# NOTE(review): the decompiled text had no indentation, so the nesting of
# the unicode section under the final ``else`` is reconstructed -- confirm
# against the original module.
if os.name == 'java':
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    if have_unicode:
        if isinstance('', unicode):
            TESTFN_UNICODE = '@test-\xe0\xf2'
        else:
            TESTFN_UNICODE = unicode('@test-\xe0\xf2', 'latin-1')
        TESTFN_ENCODING = sys.getfilesystemencoding()
        if not hasattr(sys, 'getwindowsversion') or sys.getwindowsversion()[3] < 2:
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # eval() of a constant literal only; no external data involved.
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\\u5171\\u6709\\u3055\\u308c\\u308b"')
            try:
                TESTFN_UNICODE_UNENCODEABLE.encode('Latin1')
            except UnicodeEncodeError:
                # Expected: the name must NOT be encodable.
                pass
            else:
                print('WARNING: The filename %r CAN be encoded by the filesystem. Unicode filename tests may not be effective' % TESTFN_UNICODE_UNENCODEABLE)

# Check that TESTFN is writable; otherwise retry under /tmp and switch
# TESTFN to that path when it works.
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        print('WARNING: tests will fail, unable to write to: %s or %s' % (TESTFN, TMP_TESTFN))

if fp is not None:
    fp.close()
    unlink(TESTFN)

# Keep the module namespace free of the temporaries used above.
del os
del fp
-
def findfile(file, here=__file__):
    """Locate *file* and return the first existing candidate path.

    An absolute *file* is returned unchanged.  Otherwise the directory of
    *here* is searched first, then each entry of ``sys.path``.  When
    nothing matches, *file* itself is returned.
    """
    import os

    if os.path.isabs(file):
        return file
    search_dirs = [os.path.dirname(here)] + sys.path
    for directory in search_dirs:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
-
-
def verify(condition, reason='test failed'):
    """Raise TestFailed(reason) when *condition* is false."""
    if condition:
        return None
    raise TestFailed(reason)
-
-
-
def vereq(a, b):
    """Raise TestFailed showing both reprs unless ``a == b`` holds."""
    if a == b:
        return None
    raise TestFailed('%r == %r' % (a, b))
-
-
-
def sortdict(dict):
    """Return a repr-like string of *dict* with its items in sorted order."""
    pairs = sorted(dict.items())
    body = ', '.join(['%r: %r' % pair for pair in pairs])
    return '{%s}' % body
-
-
def check_syntax(statement):
    """Print a complaint if *statement* compiles without a SyntaxError.

    Nothing is printed when compile() raises SyntaxError.  The decompiled
    form had lost the ``else`` and printed the message unconditionally.
    """
    try:
        compile(statement, '<string>', 'exec')
    except SyntaxError:
        pass
    else:
        print('Missing SyntaxError: "%s"' % statement)
-
-
def open_urlresource(url):
    """Open the file named by the last path component of *url*.

    A copy in the current or the parent directory is opened directly.
    Otherwise the ``'urlfetch'`` resource is required, a progress line is
    written to the original stdout, and the URL is downloaded to that
    file name with ``urllib.urlretrieve`` before being opened.
    """
    import urllib
    import urlparse
    # Plain "import os.path" binds ``os``; the decompiler's
    # "import os.path as os" would have bound ``os`` to os.path itself.
    import os.path

    filename = urlparse.urlparse(url)[2].split('/')[-1]
    for path in [os.path.curdir, os.path.pardir]:
        fn = os.path.join(path, filename)
        if os.path.exists(fn):
            return open(fn)

    requires('urlfetch')
    get_original_stdout().write('\tfetching %s ...' % url + '\n')
    (fn, _) = urllib.urlretrieve(url, filename)
    return open(fn)
-
-
def run_with_locale(catstr, *locales):
    """Decorator factory: run a function under the first settable locale.

    *catstr* names a category attribute of the ``locale`` module (an
    unknown name raises AttributeError when the wrapped function is
    called).  Each entry of *locales* is tried in order until one can be
    set.  The locale active before the call is restored afterwards, even
    if the wrapped function raises.
    """
    def decorator(func):

        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # Invalid category name: let the caller see the error.
                raise
            except:
                # Original locale cannot be read, so leave locales alone.
                locale = None
                orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # This locale is unavailable; try the next one.
                        pass

            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)

        # ``__name__`` is the same attribute as Python 2's ``func_name``.
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner

    return decorator
-
# Byte counts used by the memory-limit helpers.
_1M = 1024 * 1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G
-
class _Dummy:
    """Helper whose slice hook hands back the upper bound it receives."""

    def __getslice__(self, lower, upper):
        return upper


# Python 2 fills in the upper bound of an open-ended slice with its
# largest index value; slicing a _Dummy captures that value.
MAX_Py_ssize_t = _Dummy()[:]
-
def set_memlimit(limit):
    """Parse *limit* (e.g. ``'2.5G'``) and store it as the memory limit.

    The text is a decimal number followed by K, M, G or T (any case) and
    an optional ``b``.  ``real_max_memuse`` receives the parsed byte
    count; ``max_memuse`` receives the same value capped at
    MAX_Py_ssize_t.  Raises ValueError for unparsable text or when the
    capped value is below ``_2G - 1``; ``real_max_memuse`` has already
    been updated in the latter case.
    """
    global max_memuse, real_max_memuse
    import re

    unit_sizes = {'k': 1024, 'm': _1M, 'g': _1G, 't': 1024 * _1G}
    # re.VERBOSE makes the literal space in the pattern insignificant.
    match = re.match('(\\d+(\\.\\d+)?) (K|M|G|T)b?$', limit, re.IGNORECASE | re.VERBOSE)
    if match is None:
        raise ValueError('Invalid memory limit %r' % (limit,))

    amount = float(match.group(1))
    unit = match.group(3).lower()
    memlimit = int(amount * unit_sizes[unit])
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit
-
-
def bigmemtest(minsize, memuse, overhead=5 * _1M):
    """Decorator factory for test methods that take a ``maxsize`` argument.

    With no memory limit set (``max_memuse`` is 0) the test runs with
    ``maxsize = 5147`` after asserting that this would stay under 20 MB.
    Otherwise ``maxsize`` is derived from ``max_memuse``, *overhead* and
    *memuse*; the test is skipped (returning None) when that is below
    *minsize*.  The three parameters are also stored as attributes on the
    wrapper.
    """
    def decorator(f):

        def wrapper(self):
            if max_memuse:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    if verbose:
                        sys.stderr.write('Skipping %s because of memory constraint\n' % (f.__name__,))
                    return None
                # Leave 50 MB of headroom, but never drop below minsize.
                maxsize = max(maxsize - 50 * _1M, minsize)
            else:
                maxsize = 5147
                self.failIf(maxsize * memuse + overhead > 20 * _1M)
            return f(self, maxsize)

        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper

    return decorator
-
-
def precisionbigmemtest(size, memuse, overhead=5 * _1M):
    """Decorator factory for test methods that need exactly *size*.

    With no limit recorded in ``real_max_memuse`` the test runs with
    5147 instead of *size*.  With a limit, the test runs with *size*
    unless ``size * memuse`` exceeds the limit, in which case it is
    skipped (returning None).  The three parameters are also stored as
    attributes on the wrapper.
    """
    def decorator(f):

        def wrapper(self):
            if real_max_memuse:
                maxsize = size
                if real_max_memuse < maxsize * memuse:
                    if verbose:
                        sys.stderr.write('Skipping %s because of memory constraint\n' % (f.__name__,))
                    return None
            else:
                maxsize = 5147
            return f(self, maxsize)

        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper

    return decorator
-
-
def bigaddrspacetest(f):
    """Decorator: run *f* only when ``max_memuse`` reaches MAX_Py_ssize_t.

    Otherwise the test is skipped, with a note on stderr when ``verbose``
    is set, and the wrapper returns None.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if verbose:
            sys.stderr.write('Skipping %s because of memory constraint\n' % (f.__name__,))
        return None

    return wrapper
-
- import unittest
-
class BasicTestRunner:
    """Minimal runner: collects results without printing anything."""

    def run(self, test):
        """Call *test* with a fresh ``unittest.TestResult`` and return it."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
-
-
-
def run_suite(suite, testclass=None):
    """Run *suite* and raise TestFailed unless every test passed.

    A ``unittest.TextTestRunner`` on ``sys.stdout`` is used when
    ``verbose`` is set, a silent BasicTestRunner otherwise.  When exactly
    one error or exactly one failure occurred, its traceback text becomes
    the exception message.  Otherwise a generic message is used, naming
    *testclass* when one was given.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return None

    if len(result.errors) == 1 and not result.failures:
        err = result.errors[0][1]
    elif len(result.failures) == 1 and not result.errors:
        err = result.failures[0][1]
    else:
        # Several problems: the decompiler had flattened this branch into
        # the elif chain, leaving ``msg``/``err`` possibly unbound.
        if testclass is None:
            msg = 'errors occurred; run in verbose mode for details'
        else:
            msg = 'errors occurred in %s.%s' % (testclass.__module__, testclass.__name__)
        raise TestFailed(msg)
    raise TestFailed(err)
-
-
-
def run_unittest(*classes):
    """Build one suite from *classes* and hand it to run_suite().

    TestSuite and TestCase instances are added as they are; anything
    else is passed through ``unittest.makeSuite`` first.  When exactly
    one argument was given it is forwarded as the ``testclass``.
    """
    suite = unittest.TestSuite()
    for item in classes:
        if not isinstance(item, (unittest.TestSuite, unittest.TestCase)):
            item = unittest.makeSuite(item)
        suite.addTest(item)

    testclass = None
    if len(classes) == 1:
        testclass = classes[0]
    run_suite(suite, testclass)
-
-
def run_doctest(module, verbosity=None):
    """Run the doctests of *module* and return ``(failures, tests)``.

    Output goes to the stream from get_original_stdout() while the
    doctests run; ``sys.stdout`` is restored afterwards.  Raises
    TestFailed if any doctest fails.
    """
    import doctest

    # NOTE(review): an explicitly passed verbosity is replaced by None
    # here; only the default picks up the module-level ``verbose``.
    verbosity = verbose if verbosity is None else None

    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        (failures, attempted) = doctest.testmod(module, verbose=verbosity)
        if failures:
            raise TestFailed('%d of %d doctests failed' % (failures, attempted))
    finally:
        sys.stdout = save_stdout

    if verbose:
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, attempted))
    return (failures, attempted)
-
-
def threading_setup():
    """Return the current sizes of ``threading._active`` and ``_limbo``."""
    import threading

    active_count = len(threading._active)
    limbo_count = len(threading._limbo)
    return (active_count, limbo_count)
-
-
def threading_cleanup(num_active, num_limbo):
    """Wait for the thread registries to return to the given sizes.

    First ``threading._active`` is polled until its length equals
    *num_active*, then ``threading._limbo`` until it equals *num_limbo*.
    Each wait gives up after ten 0.1-second sleeps.
    """
    import threading
    import time

    _MAX_COUNT = 10
    for registry_name, expected in (('_active', num_active), ('_limbo', num_limbo)):
        for _attempt in range(_MAX_COUNT):
            if len(getattr(threading, registry_name)) == expected:
                break
            time.sleep(0.1)
-
-
def reap_children():
    """Collect any child processes that have already exited.

    Does nothing where ``os.waitpid`` is unavailable.  Otherwise it calls
    ``os.waitpid(-1, os.WNOHANG)`` repeatedly, stopping when it reports
    pid 0 or raises.  The decompiled form had lost the handler of the
    ``try`` statement.
    """
    import os

    if not hasattr(os, 'waitpid'):
        return None

    any_process = -1
    while True:
        try:
            (pid, status) = os.waitpid(any_process, os.WNOHANG)
        except Exception:
            # Best-effort cleanup: stop on any failure of the call.
            break
        if pid == 0:
            break
-
-
-
-