# home | CD-ROM | disk | FTP | other | search  (archive-site navigation residue; not part of the module)
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.6)
-
# Refuse to load under any name other than 'test.test_support'.
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')
- import contextlib
- import errno
- import socket
- import sys
- import os
- import shutil
- import warnings
- import unittest
# Public names exported by ``from test.test_support import *``.
__all__ = [
    'Error', 'TestFailed', 'TestSkipped', 'ResourceDenied', 'import_module',
    'verbose', 'use_resources', 'max_memuse', 'record_original_stdout',
    'get_original_stdout', 'unload', 'unlink', 'rmtree', 'forget',
    'is_resource_enabled', 'requires', 'find_unused_port', 'bind_port',
    'fcmp', 'have_unicode', 'is_jython', 'TESTFN', 'HOST', 'FUZZ',
    'findfile', 'verify', 'vereq', 'sortdict', 'check_syntax_error',
    'open_urlresource', 'check_warnings', 'CleanImport',
    'EnvironmentVarGuard', 'captured_output', 'captured_stdout',
    'TransientResource', 'transient_internet', 'run_with_locale',
    'set_memlimit', 'bigmemtest', 'bigaddrspacetest', 'BasicTestRunner',
    'run_unittest', 'run_doctest', 'threading_setup', 'threading_cleanup',
    'reap_children',
]
-
class Error(Exception):
    """Base class for the exceptions defined in this module."""
-
-
class TestFailed(Error):
    """Raised by the helpers in this module when a test fails."""
-
-
class TestSkipped(Error):
    """Raised when a test is skipped (e.g. its module cannot be imported)."""
-
-
class ResourceDenied(TestSkipped):
    """Raised when a test needs a resource that is not enabled or available."""
-
-
def import_module(name, deprecated=False):
    """Import and return the module called *name*.

    Raises TestSkipped('No module named <name>') when the import fails with
    ImportError.  The import runs inside ``warnings.catch_warnings()`` so any
    filter changes made here are undone on exit.
    """
    with warnings.catch_warnings():
        if deprecated:
            # NOTE(review): the decompiler reduced this branch to no-op
            # expressions; the filter below is the conventional way to
            # silence "module/package is deprecated" warnings -- confirm.
            warnings.filterwarnings('ignore', '.+ (module|package)',
                                    DeprecationWarning)
        try:
            module = __import__(name, level=0)
        except ImportError:
            raise TestSkipped('No module named ' + name)
        return module
-
-
# Module-level switches read by the helpers below.
verbose = 1            # truthy: helpers print progress / use a verbose runner
use_resources = None   # collection of enabled resource names, or None
max_memuse = 0         # memory limit for big-memory tests; 0 means "not set"
real_max_memuse = 0    # limit as requested, before clamping (see set_memlimit)
_original_stdout = None  # stream stored by record_original_stdout()
-
def record_original_stdout(stdout):
    """Store *stdout* in the module-level ``_original_stdout``."""
    global _original_stdout
    _original_stdout = stdout
-
-
def get_original_stdout():
    """Return the recorded original stdout, or ``sys.stdout`` if none is set.

    The decompiled form always returned ``sys.stdout``, which made
    record_original_stdout() pointless.
    """
    return _original_stdout or sys.stdout
-
-
def unload(name):
    """Remove *name* from ``sys.modules``; a missing entry is ignored."""
    try:
        del sys.modules[name]
    except KeyError:
        pass
-
-
-
def unlink(filename):
    """Delete *filename*, ignoring any OSError (e.g. the file is missing)."""
    try:
        os.unlink(filename)
    except OSError:
        pass
-
-
-
def rmtree(path):
    """Recursively delete *path*, tolerating a tree that is already gone.

    An OSError whose errno is ENOENT or ESRCH is swallowed; any other
    OSError propagates.
    """
    try:
        shutil.rmtree(path)
    except OSError as e:
        # The decompiled code bound ``e = None``, so this check itself
        # raised AttributeError instead of inspecting the real error.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
-
-
-
def forget(modname):
    """Drop *modname* from ``sys.modules`` and delete its compiled files.

    For every directory on ``sys.path`` the matching ``.pyc`` and ``.pyo``
    files are removed; missing files are ignored by unlink().
    """
    unload(modname)
    for dirname in sys.path:
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(dirname, modname + os.extsep + suffix))
-
-
-
def is_resource_enabled(resource):
    """Return True if *resource* is listed in ``use_resources``.

    Returns False while ``use_resources`` is None; the decompiled form
    evaluated ``resource in None`` and raised TypeError in that case.
    """
    return use_resources is not None and resource in use_resources
-
-
def requires(resource, msg=None):
    """Raise ResourceDenied unless *resource* is enabled.

    When the calling module's ``__name__`` is ``'__main__'`` the check is
    skipped and the function returns None.  *msg* overrides the default
    exception message.
    """
    # Inspect the caller's globals, one frame up the stack.
    if sys._getframe().f_back.f_globals.get('__name__') == '__main__':
        return None
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)
-
# Default host used by bind_port().
HOST = 'localhost'
-
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return a port number obtained by binding a temporary socket.

    A socket of the given *family*/*socktype* is created, bound through
    bind_port(), and closed again (also when binding fails) before the
    port is returned.
    """
    tempsock = socket.socket(family, socktype)
    try:
        port = bind_port(tempsock)
    finally:
        tempsock.close()
    del tempsock
    return port
-
-
def bind_port(sock, host=HOST):
    """Bind *sock* to port 0 on *host* and return the port that was chosen.

    For AF_INET/SOCK_STREAM sockets, TestFailed is raised if SO_REUSEADDR or
    SO_REUSEPORT is already set on the socket, and SO_EXCLUSIVEADDRUSE is
    switched on where the platform defines it.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed('tests should never set the SO_REUSEADDR socket option on TCP/IP sockets!')
        if hasattr(socket, 'SO_REUSEPORT'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                raise TestFailed('tests should never set the SO_REUSEPORT socket option on TCP/IP sockets!')
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 asks the operating system to pick the port.
    sock.bind((host, 0))
    return sock.getsockname()[1]
-
# Relative tolerance used by fcmp().
FUZZ = 1e-06
-
def fcmp(x, y):
    """Fuzzy three-way comparison; returns -1, 0 or 1.

    If either argument is a float, the two compare equal when they differ
    by at most ``(abs(x) + abs(y)) * FUZZ``.  Tuples/lists of the same type
    are compared element-wise with fcmp, then by length.  Everything else
    falls back to ordinary ``<`` / ``>`` comparison.
    """
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x - y) <= fuzz:
                return 0
        except Exception:
            # The decompiler dropped this handler.  If the arithmetic is not
            # possible for the operands, use the plain comparison below.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
-
-
# Feature probe: True when the ``unicode`` builtin exists.
try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

# True when sys.platform starts with 'java'.
is_jython = sys.platform.startswith('java')
# Scratch file name used by the tests; it depends on the platform.
# NOTE(review): indentation was lost in the decompiled source.  The unicode
# names are nested under the generic branch and the warning sits in the
# ``else`` of the encode probe -- confirm against the original module.
if os.name == 'java':
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    if have_unicode:
        if isinstance('', unicode):
            TESTFN_UNICODE = '@test-\xe0\xf2'
        else:
            TESTFN_UNICODE = unicode('@test-\xe0\xf2', 'latin-1')
        TESTFN_ENCODING = sys.getfilesystemencoding()
        if not hasattr(sys, 'getwindowsversion') or sys.getwindowsversion()[3] < 2:
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # eval() of a constant literal, not of external input.
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\\u5171\\u6709\\u3055\\u308c\\u308b"')
            try:
                TESTFN_UNICODE_UNENCODEABLE.encode('Latin1')
            except UnicodeEncodeError:
                pass
            else:
                # Warn only when the name encoded without error.
                print('WARNING: The filename %r CAN be encoded by the filesystem. Unicode filename tests may not be effective' % TESTFN_UNICODE_UNENCODEABLE)
-
# Check that TESTFN can be created; fall back to /tmp if it cannot.
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
    except IOError:
        print('WARNING: tests will fail, unable to write to: %s or %s' % (TESTFN, TMP_TESTFN))

# Remove the probe file again; only the name is kept.
if fp is not None:
    fp.close()
    unlink(TESTFN)
del fp
-
def findfile(file, here=__file__):
    """Locate *file* and return the first existing candidate path.

    Absolute paths are returned unchanged.  Otherwise the directory of
    *here* is searched first, then every entry of ``sys.path``.  If nothing
    exists, *file* itself is returned.
    """
    if os.path.isabs(file):
        return file
    search_dirs = [os.path.dirname(here)] + sys.path
    for dn in search_dirs:
        fn = os.path.join(dn, file)
        if os.path.exists(fn):
            return fn
    return file
-
-
def verify(condition, reason='test failed'):
    """Raise TestFailed(*reason*) if *condition* is false."""
    if not condition:
        raise TestFailed(reason)
-
-
def vereq(a, b):
    """Raise TestFailed showing both reprs unless ``a == b``."""
    if not a == b:
        raise TestFailed('%r == %r' % (a, b))
-
-
def sortdict(dict):
    """Return a ``repr``-like string of *dict* with the items sorted."""
    # sorted() also works when items() does not return a list.
    reprpairs = ['%r: %r' % pair for pair in sorted(dict.items())]
    return '{%s}' % ', '.join(reprpairs)
-
-
def make_bad_fd():
    """Return a file descriptor number that has already been closed.

    TESTFN is opened for binary writing and its descriptor is read; the
    file is closed and deleted before the number is handed back.
    """
    file = open(TESTFN, 'wb')
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)
-
-
-
def check_syntax_error(testcase, statement):
    """Fail *testcase* unless compiling *statement* raises SyntaxError."""
    try:
        compile(statement, '<test string>', 'exec')
    except SyntaxError:
        pass
    else:
        # The decompiler dropped this ``else``, so the failure was reported
        # even when SyntaxError had been raised as expected.
        testcase.fail('Missing SyntaxError: "%s"' % statement)
-
-
def open_urlresource(url):
    """Return an open file for the resource at *url*, downloading if needed.

    Requires the 'urlfetch' resource.  The last path component of the URL
    is the local file name.  If a file of that name exists in the current
    or parent directory it is opened directly; otherwise the URL is
    retrieved into the current directory first.  The caller closes the
    returned file.
    """
    import urllib
    import urlparse
    requires('urlfetch')
    # URLs always use '/', so split on it rather than on os.sep.
    filename = urlparse.urlparse(url)[2].split('/')[-1]
    for path in [os.path.curdir, os.path.pardir]:
        fn = os.path.join(path, filename)
        if os.path.exists(fn):
            return open(fn)

    # Same text as the former ``print >>stream`` statement.
    get_original_stdout().write('\tfetching %s ...\n' % url)
    fn, _ = urllib.urlretrieve(url, filename)
    return open(fn)
-
-
class WarningsRecorder(object):
    """Attribute-style view of the most recent entry in a warnings list."""

    def __init__(self, warnings_list):
        # The list is shared with the caller, not copied.
        self.warnings = warnings_list

    def __getattr__(self, attr):
        """Proxy *attr* to the last recorded warning.

        With no warning recorded, the attributes named in
        ``warnings.WarningMessage._WARNING_DETAILS`` read as None; any other
        name raises AttributeError.
        """
        if self.warnings:
            return getattr(self.warnings[-1], attr)
        if attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError('%r has no attribute %r' % (self, attr))

    def reset(self):
        """Empty the shared warnings list in place."""
        del self.warnings[:]
-
-
-
@contextlib.contextmanager
def check_warnings():
    """Context manager that yields a WarningsRecorder.

    Warnings are captured with ``warnings.catch_warnings(record=True)`` and
    the recorder wraps the resulting list.  The decompiled body referenced
    an undefined ``_[1]`` where the ``with ... as w`` binding used to be.
    """
    with warnings.catch_warnings(record=True) as w:
        yield WarningsRecorder(w)
-
class CleanImport(object):
    """Context manager that forces the named modules to be re-imported.

    The constructor snapshots ``sys.modules`` and removes each listed
    module from it; leaving the block merges the snapshot back in.
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # The entry is registered under a different name than the
                # module's own __name__: remove that entry as well.
                if module.__name__ != module_name:
                    del sys.modules[module.__name__]
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
-
-
-
class EnvironmentVarGuard(object):
    """Context manager that undoes changes made to ``os.environ`` through it."""

    def __init__(self):
        # Maps variable name -> value before the first change (None = unset).
        self._changed = {}

    def _remember(self, envvar):
        """Record the current value of *envvar* the first time it is touched."""
        if envvar not in self._changed:
            self._changed[envvar] = os.environ.get(envvar)

    def set(self, envvar, value):
        """Set *envvar* to *value*, remembering its previous state."""
        self._remember(envvar)
        os.environ[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* if present, remembering its previous state."""
        self._remember(envvar)
        if envvar in os.environ:
            del os.environ[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        """Restore every touched variable to its remembered state."""
        for k, v in self._changed.items():
            if v is None:
                if k in os.environ:
                    del os.environ[k]
            else:
                # Without this ``else`` (lost in decompilation) None was
                # assigned into os.environ for variables that were unset.
                os.environ[k] = v
-
-
-
-
class TransientResource(object):
    """Context manager that turns a matching exception into ResourceDenied.

    *exc* is an exception class; the keyword arguments are attribute
    name/value pairs that the raised exception instance must carry.
    """

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """Raise ResourceDenied when the exception matches.

        A match needs ``issubclass(self.exc, type_)`` and every configured
        attribute present on *value* with the expected value.  Otherwise
        None is returned, so the original exception propagates.
        """
        if type_ is not None and issubclass(self.exc, type_):
            # items() rather than iteritems(): same pairs, available on
            # every dict implementation.
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Reached only when no attribute check hit ``break``.
                raise ResourceDenied('an optional resource is not available')
-
-
-
def transient_internet():
    """Return a context manager guarding against flaky-network errors.

    It nests three TransientResource managers: IOError with ETIMEDOUT,
    socket.error with ECONNRESET, and IOError with ECONNRESET.
    """
    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
-
-
@contextlib.contextmanager
def captured_output(stream_name):
    """Temporarily replace ``sys.<stream_name>`` with an in-memory buffer.

    The buffer is yielded to the ``with`` block and the original stream is
    put back on exit, even if the block raises.
    """
    try:
        from StringIO import StringIO
    except ImportError:
        # Interpreters without the StringIO module provide it in io.
        from io import StringIO
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)
-
def captured_stdout():
    """Shorthand for ``captured_output('stdout')``."""
    return captured_output('stdout')
-
-
def run_with_locale(catstr, *locales):
    """Decorator factory: run the wrapped function under another locale.

    *catstr* names a category attribute of the ``locale`` module (for
    example ``'LC_ALL'``).  An unknown category raises AttributeError when
    the wrapped function is called.  The original locale is restored after
    the call.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # Invalid category name: let the caller see the error.
                raise
            except Exception:
                # Current locale cannot be read: run without changing it.
                locale = None
                orig_locale = None
            else:
                # NOTE(review): the decompiler lost the handler and loop
                # exit here; "first locale that can be set wins" is the
                # reconstructed behaviour -- confirm.
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        pass

            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)

        # __name__ is the same attribute as func_name where both exist.
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner

    return decorator
-
# Byte-size constants used by the big-memory helpers.
_1M = 1024 * 1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G
# Upper bound applied to memory limits in set_memlimit().
MAX_Py_ssize_t = sys.maxsize
-
def set_memlimit(limit):
    """Parse *limit* (e.g. ``'4G'``, ``'2.5Gb'``) and set the memory limits.

    ``real_max_memuse`` receives the parsed byte count; ``max_memuse``
    receives the same value clamped to MAX_Py_ssize_t.  Raises ValueError
    if *limit* cannot be parsed or the clamped value is below ``_2G - 1``.
    Note that ``real_max_memuse`` is assigned before the "too low" check.
    """
    global real_max_memuse, max_memuse
    import re
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024 * _1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit
-
-
def bigmemtest(minsize, memuse, overhead=5 * _1M):
    """Decorator for big-memory tests; the test is called as ``f(self, size)``.

    *minsize*, *memuse* and *overhead* are also stored as attributes on the
    wrapper.
    """
    def decorator(f):
        def wrapper(self):
            # NOTE(review): the decompiler dropped the code that computes
            # ``maxsize`` (the decompiled wrapper raised NameError).  The
            # size selection below is rebuilt from the conventional form of
            # this helper; verify the constants against the original.
            if not max_memuse:
                # No limit configured: run with a small size, but make sure
                # even that stays cheap.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    if verbose:
                        sys.stderr.write('Skipping %s because of memory constraint\n' % (f.__name__,))
                    return
                # Leave some headroom below the configured limit.
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)

        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper

    return decorator
-
-
def precisionbigmemtest(size, memuse, overhead=5 * _1M):
    """Decorator for big-memory tests that need one exact *size*.

    *size*, *memuse* and *overhead* are also stored as attributes on the
    wrapper.
    """
    def decorator(f):
        def wrapper(self):
            # NOTE(review): the decompiler dropped the code that computes
            # ``maxsize`` (the decompiled wrapper raised NameError).  The
            # logic below is rebuilt from the conventional form of this
            # helper; verify against the original.
            if not real_max_memuse:
                # No limit configured: run with a small size.
                maxsize = 5147
            else:
                maxsize = size
                if real_max_memuse < maxsize * memuse:
                    if verbose:
                        sys.stderr.write('Skipping %s because of memory constraint\n' % (f.__name__,))
                    return
            return f(self, maxsize)

        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper

    return decorator
-
-
def bigaddrspacetest(f):
    """Decorator that runs *f* only when ``max_memuse >= MAX_Py_ssize_t``.

    Otherwise the test is skipped: a note goes to stderr when ``verbose``
    is set and the wrapper returns None.
    """
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if verbose:
                sys.stderr.write('Skipping %s because of memory constraint\n' % (f.__name__,))
            # The decompiled form returned the comparison result here, which
            # is a decompiler artifact; a skipped test returns None.
            return None
        return f(self)

    return wrapper
-
-
class BasicTestRunner:
    """Minimal runner: runs a test into a fresh TestResult, prints nothing."""

    def run(self, test):
        """Call *test* with a new ``unittest.TestResult`` and return it."""
        result = unittest.TestResult()
        test(result)
        return result
-
-
-
def _run_suite(suite):
    """Run *suite* and raise TestFailed if it was not successful.

    A verbose TextTestRunner on ``sys.stdout`` is used when ``verbose`` is
    set, BasicTestRunner otherwise.  With exactly one error or failure its
    traceback text becomes the exception message; otherwise a generic
    message is used.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()
    result = runner.run(suite)
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = 'errors occurred; run in verbose mode for details'
        raise TestFailed(err)
-
-
def run_unittest(*classes):
    """Build one suite from *classes* and run it with _run_suite().

    Each argument may be a module name present in ``sys.modules`` (its test
    cases are collected), a TestSuite/TestCase instance (added as is), or a
    TestCase class (wrapped with ``unittest.makeSuite``).  A string that is
    not a key of ``sys.modules`` raises ValueError.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(unittest.findTestCases(sys.modules[cls]))
            else:
                raise ValueError('str arguments must be keys in sys.modules')
        elif isinstance(cls, valid_types):
            # ``elif``/``else`` were lost in decompilation, which sent
            # module-name strings on to makeSuite() as well.
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)
-
-
def run_doctest(module, verbosity=None):
    """Run the doctests of *module* and return ``(failures, tests)``.

    Raises TestFailed if any doctest fails.  While the doctests run,
    ``sys.stdout`` is switched to get_original_stdout() and restored after.
    """
    import doctest
    if verbosity is None:
        verbosity = verbose
    else:
        # An explicit value is replaced by None before it reaches
        # doctest.testmod(); this mirrors the decompiled logic as written.
        verbosity = None

    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed('%d of %d doctests failed' % (f, t))
    finally:
        sys.stdout = save_stdout

    if verbose:
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return (f, t)
-
-
def threading_setup():
    """Return ``(len(threading._active), len(threading._limbo))``.

    The pair is meant to be passed to threading_cleanup() later.
    """
    import threading
    return (len(threading._active), len(threading._limbo))
-
-
def threading_cleanup(num_active, num_limbo):
    """Wait briefly for the thread bookkeeping to return to earlier counts.

    Polls ``threading._active`` and then ``threading._limbo`` until their
    sizes equal *num_active* / *num_limbo*, sleeping 0.1 s between polls
    and giving up after 10 polls each.
    """
    import threading
    import time
    _MAX_COUNT = 10

    count = 0
    while len(threading._active) != num_active and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)

    count = 0
    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
        count += 1
        time.sleep(0.1)
-
-
def reap_children():
    """Collect exited child processes so that no zombies remain.

    Does nothing where ``os.waitpid`` is missing.  Otherwise it calls
    ``os.waitpid(-1, os.WNOHANG)`` until that returns pid 0 or raises.
    """
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                pid, status = os.waitpid(any_process, os.WNOHANG)
            except Exception:
                # NOTE(review): the handler was lost in decompilation; any
                # error (for example "no child processes") ends the loop.
                break
            if pid == 0:
                break
-
-
-
-