home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2010 November / maximum-cd-2010-11.iso / DiscContents / calibre-0.7.13.msi / file_2850 (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2010-08-06  |  12.2 KB  |  320 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __all__ = [
  5.     'validator']
  6. import re
  7. import sys
  8. from types import DictType, StringType, TupleType, ListType
  9. import warnings
  10. header_re = re.compile('^[a-zA-Z][a-zA-Z0-9\\-_]*$')
  11. bad_header_value_re = re.compile('[\\000-\\037]')
  12.  
  13. class WSGIWarning(Warning):
  14.     pass
  15.  
  16.  
  17. def assert_(cond, *args):
  18.     if not cond:
  19.         raise AssertionError(*args)
  20.     cond
  21.  
  22.  
  23. def validator(application):
  24.     
  25.     def lint_app(*args, **kw):
  26.         assert_(len(args) == 2, 'Two arguments required')
  27.         assert_(not kw, 'No keyword arguments allowed')
  28.         (environ, start_response) = args
  29.         check_environ(environ)
  30.         start_response_started = []
  31.         
  32.         def start_response_wrapper(*args, **kw):
  33.             if not len(args) == 2:
  34.                 pass
  35.             assert_(len(args) == 3, 'Invalid number of arguments: %s' % (args,))
  36.             assert_(not kw, 'No keyword arguments allowed')
  37.             status = args[0]
  38.             headers = args[1]
  39.             if len(args) == 3:
  40.                 exc_info = args[2]
  41.             else:
  42.                 exc_info = None
  43.             check_status(status)
  44.             check_headers(headers)
  45.             check_content_type(status, headers)
  46.             check_exc_info(exc_info)
  47.             start_response_started.append(None)
  48.             return WriteWrapper(start_response(*args))
  49.  
  50.         environ['wsgi.input'] = InputWrapper(environ['wsgi.input'])
  51.         environ['wsgi.errors'] = ErrorWrapper(environ['wsgi.errors'])
  52.         iterator = application(environ, start_response_wrapper)
  53.         if iterator is not None:
  54.             pass
  55.         assert_(iterator != False, 'The application must return an iterator, if only an empty list')
  56.         check_iterator(iterator)
  57.         return IteratorWrapper(iterator, start_response_started)
  58.  
  59.     return lint_app
  60.  
  61.  
  62. class InputWrapper:
  63.     
  64.     def __init__(self, wsgi_input):
  65.         self.input = wsgi_input
  66.  
  67.     
  68.     def read(self, *args):
  69.         assert_(len(args) <= 1)
  70.         v = self.input.read(*args)
  71.         assert_(type(v) is type(''))
  72.         return v
  73.  
  74.     
  75.     def readline(self):
  76.         v = self.input.readline()
  77.         assert_(type(v) is type(''))
  78.         return v
  79.  
  80.     
  81.     def readlines(self, *args):
  82.         assert_(len(args) <= 1)
  83.         lines = self.input.readlines(*args)
  84.         assert_(type(lines) is type([]))
  85.         for line in lines:
  86.             assert_(type(line) is type(''))
  87.         
  88.         return lines
  89.  
  90.     
  91.     def __iter__(self):
  92.         while None:
  93.             line = self.readline()
  94.             if not line:
  95.                 return None
  96.             yield line
  97.             continue
  98.             return None
  99.  
  100.     
  101.     def close(self):
  102.         assert_(0, 'input.close() must not be called')
  103.  
  104.  
  105.  
  106. class ErrorWrapper:
  107.     
  108.     def __init__(self, wsgi_errors):
  109.         self.errors = wsgi_errors
  110.  
  111.     
  112.     def write(self, s):
  113.         assert_(type(s) is type(''))
  114.         self.errors.write(s)
  115.  
  116.     
  117.     def flush(self):
  118.         self.errors.flush()
  119.  
  120.     
  121.     def writelines(self, seq):
  122.         for line in seq:
  123.             self.write(line)
  124.         
  125.  
  126.     
  127.     def close(self):
  128.         assert_(0, 'errors.close() must not be called')
  129.  
  130.  
  131.  
  132. class WriteWrapper:
  133.     
  134.     def __init__(self, wsgi_writer):
  135.         self.writer = wsgi_writer
  136.  
  137.     
  138.     def __call__(self, s):
  139.         assert_(type(s) is type(''))
  140.         self.writer(s)
  141.  
  142.  
  143.  
  144. class PartialIteratorWrapper:
  145.     
  146.     def __init__(self, wsgi_iterator):
  147.         self.iterator = wsgi_iterator
  148.  
  149.     
  150.     def __iter__(self):
  151.         return IteratorWrapper(self.iterator, None)
  152.  
  153.  
  154.  
  155. class IteratorWrapper:
  156.     
  157.     def __init__(self, wsgi_iterator, check_start_response):
  158.         self.original_iterator = wsgi_iterator
  159.         self.iterator = iter(wsgi_iterator)
  160.         self.closed = False
  161.         self.check_start_response = check_start_response
  162.  
  163.     
  164.     def __iter__(self):
  165.         return self
  166.  
  167.     
  168.     def next(self):
  169.         assert_(not (self.closed), 'Iterator read after closed')
  170.         v = self.iterator.next()
  171.         if self.check_start_response is not None:
  172.             assert_(self.check_start_response, 'The application returns and we started iterating over its body, but start_response has not yet been called')
  173.             self.check_start_response = None
  174.         
  175.         return v
  176.  
  177.     
  178.     def close(self):
  179.         self.closed = True
  180.         if hasattr(self.original_iterator, 'close'):
  181.             self.original_iterator.close()
  182.         
  183.  
  184.     
  185.     def __del__(self):
  186.         if not self.closed:
  187.             sys.stderr.write('Iterator garbage collected without being closed')
  188.         
  189.         assert_(self.closed, 'Iterator garbage collected without being closed')
  190.  
  191.  
  192.  
  193. def check_environ(environ):
  194.     assert_(type(environ) is DictType, 'Environment is not of the right type: %r (environment: %r)' % (type(environ), environ))
  195.     for key in [
  196.         'REQUEST_METHOD',
  197.         'SERVER_NAME',
  198.         'SERVER_PORT',
  199.         'wsgi.version',
  200.         'wsgi.input',
  201.         'wsgi.errors',
  202.         'wsgi.multithread',
  203.         'wsgi.multiprocess',
  204.         'wsgi.run_once']:
  205.         assert_(key in environ, 'Environment missing required key: %r' % (key,))
  206.     
  207.     for key in [
  208.         'HTTP_CONTENT_TYPE',
  209.         'HTTP_CONTENT_LENGTH']:
  210.         assert_(key not in environ, 'Environment should not have the key: %s (use %s instead)' % (key, key[5:]))
  211.     
  212.     if 'QUERY_STRING' not in environ:
  213.         warnings.warn('QUERY_STRING is not in the WSGI environment; the cgi module will use sys.argv when this variable is missing, so application errors are more likely', WSGIWarning)
  214.     
  215.     for key in environ.keys():
  216.         if '.' in key:
  217.             continue
  218.         
  219.         assert_(type(environ[key]) is StringType, 'Environmental variable %s is not a string: %r (value: %r)' % (key, type(environ[key]), environ[key]))
  220.     
  221.     assert_(type(environ['wsgi.version']) is TupleType, 'wsgi.version should be a tuple (%r)' % (environ['wsgi.version'],))
  222.     assert_(environ['wsgi.url_scheme'] in ('http', 'https'), 'wsgi.url_scheme unknown: %r' % environ['wsgi.url_scheme'])
  223.     check_input(environ['wsgi.input'])
  224.     check_errors(environ['wsgi.errors'])
  225.     if environ['REQUEST_METHOD'] not in ('GET', 'HEAD', 'POST', 'OPTIONS', 'PUT', 'DELETE', 'TRACE'):
  226.         warnings.warn('Unknown REQUEST_METHOD: %r' % environ['REQUEST_METHOD'], WSGIWarning)
  227.     
  228.     if not not environ.get('SCRIPT_NAME'):
  229.         pass
  230.     assert_(environ['SCRIPT_NAME'].startswith('/'), "SCRIPT_NAME doesn't start with /: %r" % environ['SCRIPT_NAME'])
  231.     if not not environ.get('PATH_INFO'):
  232.         pass
  233.     assert_(environ['PATH_INFO'].startswith('/'), "PATH_INFO doesn't start with /: %r" % environ['PATH_INFO'])
  234.     if environ.get('CONTENT_LENGTH'):
  235.         assert_(int(environ['CONTENT_LENGTH']) >= 0, 'Invalid CONTENT_LENGTH: %r' % environ['CONTENT_LENGTH'])
  236.     
  237.     if not environ.get('SCRIPT_NAME'):
  238.         assert_(environ.has_key('PATH_INFO'), "One of SCRIPT_NAME or PATH_INFO are required (PATH_INFO should at least be '/' if SCRIPT_NAME is empty)")
  239.     
  240.     assert_(environ.get('SCRIPT_NAME') != '/', "SCRIPT_NAME cannot be '/'; it should instead be '', and PATH_INFO should be '/'")
  241.  
  242.  
  243. def check_input(wsgi_input):
  244.     for attr in [
  245.         'read',
  246.         'readline',
  247.         'readlines',
  248.         '__iter__']:
  249.         assert_(hasattr(wsgi_input, attr), "wsgi.input (%r) doesn't have the attribute %s" % (wsgi_input, attr))
  250.     
  251.  
  252.  
  253. def check_errors(wsgi_errors):
  254.     for attr in [
  255.         'flush',
  256.         'write',
  257.         'writelines']:
  258.         assert_(hasattr(wsgi_errors, attr), "wsgi.errors (%r) doesn't have the attribute %s" % (wsgi_errors, attr))
  259.     
  260.  
  261.  
  262. def check_status(status):
  263.     assert_(type(status) is StringType, 'Status must be a string (not %r)' % status)
  264.     status_code = status.split(None, 1)[0]
  265.     assert_(len(status_code) == 3, 'Status codes must be three characters: %r' % status_code)
  266.     status_int = int(status_code)
  267.     assert_(status_int >= 100, 'Status code is invalid: %r' % status_int)
  268.     if len(status) < 4 or status[3] != ' ':
  269.         warnings.warn('The status string (%r) should be a three-digit integer followed by a single space and a status explanation' % status, WSGIWarning)
  270.     
  271.  
  272.  
  273. def check_headers(headers):
  274.     assert_(type(headers) is ListType, 'Headers (%r) must be of type list: %r' % (headers, type(headers)))
  275.     header_names = { }
  276.     for item in headers:
  277.         assert_(type(item) is TupleType, 'Individual headers (%r) must be of type tuple: %r' % (item, type(item)))
  278.         assert_(len(item) == 2)
  279.         (name, value) = item
  280.         assert_(name.lower() != 'status', 'The Status header cannot be used; it conflicts with CGI script, and HTTP status is not given through headers (value: %r).' % value)
  281.         header_names[name.lower()] = None
  282.         if '\n' not in name:
  283.             pass
  284.         assert_(':' not in name, "Header names may not contain ':' or '\\n': %r" % name)
  285.         assert_(header_re.search(name), 'Bad header name: %r' % name)
  286.         if not name.endswith('-'):
  287.             pass
  288.         assert_(not name.endswith('_'), "Names may not end in '-' or '_': %r" % name)
  289.         if bad_header_value_re.search(value):
  290.             assert_(0, 'Bad header value: %r (bad char: %r)' % (value, bad_header_value_re.search(value).group(0)))
  291.             continue
  292.     
  293.  
  294.  
  295. def check_content_type(status, headers):
  296.     code = int(status.split(None, 1)[0])
  297.     NO_MESSAGE_BODY = (204, 304)
  298.     for name, value in headers:
  299.         if name.lower() == 'content-type':
  300.             if code not in NO_MESSAGE_BODY:
  301.                 return None
  302.             assert_(0, 'Content-Type header found in a %s response, which must not return content.' % code)
  303.             continue
  304.         code not in NO_MESSAGE_BODY
  305.     
  306.     if code not in NO_MESSAGE_BODY:
  307.         assert_(0, 'No Content-Type header found in headers (%s)' % headers)
  308.     
  309.  
  310.  
  311. def check_exc_info(exc_info):
  312.     if not exc_info is None:
  313.         pass
  314.     assert_(type(exc_info) is type(()), 'exc_info (%r) is not a tuple: %r' % (exc_info, type(exc_info)))
  315.  
  316.  
  317. def check_iterator(iterator):
  318.     assert_(not isinstance(iterator, str), 'You should not return a string as your application iterator, instead return a single-item list containing that string.')
  319.  
  320.