home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_wsgiref.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  17.4 KB  |  613 lines

  1. from __future__ import nested_scopes    # Backward compat for 2.1
  2. from unittest import TestCase
  3. from wsgiref.util import setup_testing_defaults
  4. from wsgiref.headers import Headers
  5. from wsgiref.handlers import BaseHandler, BaseCGIHandler
  6. from wsgiref import util
  7. from wsgiref.validate import validator
  8. from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
  9. from wsgiref.simple_server import make_server
  10. from StringIO import StringIO
  11. from SocketServer import BaseServer
  12. import re, sys
  13.  
  14. from test import test_support
  15.  
  16. class MockServer(WSGIServer):
  17.     """Non-socket HTTP server"""
  18.  
  19.     def __init__(self, server_address, RequestHandlerClass):
  20.         BaseServer.__init__(self, server_address, RequestHandlerClass)
  21.         self.server_bind()
  22.  
  23.     def server_bind(self):
  24.         host, port = self.server_address
  25.         self.server_name = host
  26.         self.server_port = port
  27.         self.setup_environ()
  28.  
  29.  
  30. class MockHandler(WSGIRequestHandler):
  31.     """Non-socket HTTP handler"""
  32.     def setup(self):
  33.         self.connection = self.request
  34.         self.rfile, self.wfile = self.connection
  35.  
  36.     def finish(self):
  37.         pass
  38.  
  39.  
  40.  
  41.  
  42.  
  43. def hello_app(environ,start_response):
  44.     start_response("200 OK", [
  45.         ('Content-Type','text/plain'),
  46.         ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
  47.     ])
  48.     return ["Hello, world!"]
  49.  
  50. def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
  51.     server = make_server("", 80, app, MockServer, MockHandler)
  52.     inp, out, err, olderr = StringIO(data), StringIO(), StringIO(), sys.stderr
  53.     sys.stderr = err
  54.  
  55.     try:
  56.         server.finish_request((inp,out), ("127.0.0.1",8888))
  57.     finally:
  58.         sys.stderr = olderr
  59.  
  60.     return out.getvalue(), err.getvalue()
  61.  
  62.  
  63.  
  64.  
  65.  
  66.  
  67.  
  68.  
  69.  
  70.  
  71.  
  72.  
  73.  
  74.  
  75.  
  76.  
  77.  
  78.  
  79.  
  80.  
  81.  
  82.  
  83.  
  84. def compare_generic_iter(make_it,match):
  85.     """Utility to compare a generic 2.1/2.2+ iterator with an iterable
  86.  
  87.     If running under Python 2.2+, this tests the iterator using iter()/next(),
  88.     as well as __getitem__.  'make_it' must be a function returning a fresh
  89.     iterator to be tested (since this may test the iterator twice)."""
  90.  
  91.     it = make_it()
  92.     n = 0
  93.     for item in match:
  94.         if not it[n]==item: raise AssertionError
  95.         n+=1
  96.     try:
  97.         it[n]
  98.     except IndexError:
  99.         pass
  100.     else:
  101.         raise AssertionError("Too many items from __getitem__",it)
  102.  
  103.     try:
  104.         iter, StopIteration
  105.     except NameError:
  106.         pass
  107.     else:
  108.         # Only test iter mode under 2.2+
  109.         it = make_it()
  110.         if not iter(it) is it: raise AssertionError
  111.         for item in match:
  112.             if not it.next()==item: raise AssertionError
  113.         try:
  114.             it.next()
  115.         except StopIteration:
  116.             pass
  117.         else:
  118.             raise AssertionError("Too many items from .next()",it)
  119.  
  120.  
  121.  
  122.  
  123.  
  124.  
  125. class IntegrationTests(TestCase):
  126.  
  127.     def check_hello(self, out, has_length=True):
  128.         self.assertEqual(out,
  129.             "HTTP/1.0 200 OK\r\n"
  130.             "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
  131.             "Content-Type: text/plain\r\n"
  132.             "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
  133.             (has_length and  "Content-Length: 13\r\n" or "") +
  134.             "\r\n"
  135.             "Hello, world!"
  136.         )
  137.  
  138.     def test_plain_hello(self):
  139.         out, err = run_amock()
  140.         self.check_hello(out)
  141.  
  142.     def test_validated_hello(self):
  143.         out, err = run_amock(validator(hello_app))
  144.         # the middleware doesn't support len(), so content-length isn't there
  145.         self.check_hello(out, has_length=False)
  146.  
  147.     def test_simple_validation_error(self):
  148.         def bad_app(environ,start_response):
  149.             start_response("200 OK", ('Content-Type','text/plain'))
  150.             return ["Hello, world!"]
  151.         out, err = run_amock(validator(bad_app))
  152.         self.failUnless(out.endswith(
  153.             "A server error occurred.  Please contact the administrator."
  154.         ))
  155.         self.assertEqual(
  156.             err.splitlines()[-2],
  157.             "AssertionError: Headers (('Content-Type', 'text/plain')) must"
  158.             " be of type list: <type 'tuple'>"
  159.         )
  160.  
  161.  
  162.  
  163.  
  164.  
  165.  
  166. class UtilityTests(TestCase):
  167.  
  168.     def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
  169.         env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
  170.         util.setup_testing_defaults(env)
  171.         self.assertEqual(util.shift_path_info(env),part)
  172.         self.assertEqual(env['PATH_INFO'],pi_out)
  173.         self.assertEqual(env['SCRIPT_NAME'],sn_out)
  174.         return env
  175.  
  176.     def checkDefault(self, key, value, alt=None):
  177.         # Check defaulting when empty
  178.         env = {}
  179.         util.setup_testing_defaults(env)
  180.         if isinstance(value,StringIO):
  181.             self.failUnless(isinstance(env[key],StringIO))
  182.         else:
  183.             self.assertEqual(env[key],value)
  184.  
  185.         # Check existing value
  186.         env = {key:alt}
  187.         util.setup_testing_defaults(env)
  188.         self.failUnless(env[key] is alt)
  189.  
  190.     def checkCrossDefault(self,key,value,**kw):
  191.         util.setup_testing_defaults(kw)
  192.         self.assertEqual(kw[key],value)
  193.  
  194.     def checkAppURI(self,uri,**kw):
  195.         util.setup_testing_defaults(kw)
  196.         self.assertEqual(util.application_uri(kw),uri)
  197.  
  198.     def checkReqURI(self,uri,query=1,**kw):
  199.         util.setup_testing_defaults(kw)
  200.         self.assertEqual(util.request_uri(kw,query),uri)
  201.  
  202.  
  203.  
  204.  
  205.  
  206.  
  207.     def checkFW(self,text,size,match):
  208.  
  209.         def make_it(text=text,size=size):
  210.             return util.FileWrapper(StringIO(text),size)
  211.  
  212.         compare_generic_iter(make_it,match)
  213.  
  214.         it = make_it()
  215.         self.failIf(it.filelike.closed)
  216.  
  217.         for item in it:
  218.             pass
  219.  
  220.         self.failIf(it.filelike.closed)
  221.  
  222.         it.close()
  223.         self.failUnless(it.filelike.closed)
  224.  
  225.  
  226.     def testSimpleShifts(self):
  227.         self.checkShift('','/', '', '/', '')
  228.         self.checkShift('','/x', 'x', '/x', '')
  229.         self.checkShift('/','', None, '/', '')
  230.         self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
  231.         self.checkShift('/a','/x/',  'x', '/a/x', '/')
  232.  
  233.  
  234.     def testNormalizedShifts(self):
  235.         self.checkShift('/a/b', '/../y', '..', '/a', '/y')
  236.         self.checkShift('', '/../y', '..', '', '/y')
  237.         self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
  238.         self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
  239.         self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
  240.         self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
  241.         self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
  242.         self.checkShift('/a/b', '///', '', '/a/b/', '')
  243.         self.checkShift('/a/b', '/.//', '', '/a/b/', '')
  244.         self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
  245.         self.checkShift('/a/b', '/.', None, '/a/b', '')
  246.  
  247.  
  248.     def testDefaults(self):
  249.         for key, value in [
  250.             ('SERVER_NAME','127.0.0.1'),
  251.             ('SERVER_PORT', '80'),
  252.             ('SERVER_PROTOCOL','HTTP/1.0'),
  253.             ('HTTP_HOST','127.0.0.1'),
  254.             ('REQUEST_METHOD','GET'),
  255.             ('SCRIPT_NAME',''),
  256.             ('PATH_INFO','/'),
  257.             ('wsgi.version', (1,0)),
  258.             ('wsgi.run_once', 0),
  259.             ('wsgi.multithread', 0),
  260.             ('wsgi.multiprocess', 0),
  261.             ('wsgi.input', StringIO("")),
  262.             ('wsgi.errors', StringIO()),
  263.             ('wsgi.url_scheme','http'),
  264.         ]:
  265.             self.checkDefault(key,value)
  266.  
  267.  
  268.     def testCrossDefaults(self):
  269.         self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
  270.         self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
  271.         self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
  272.         self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
  273.         self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
  274.         self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
  275.         self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
  276.  
  277.  
  278.     def testGuessScheme(self):
  279.         self.assertEqual(util.guess_scheme({}), "http")
  280.         self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
  281.         self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
  282.         self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
  283.         self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
  284.  
  285.  
  286.  
  287.  
  288.  
  289.     def testAppURIs(self):
  290.         self.checkAppURI("http://127.0.0.1/")
  291.         self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
  292.         self.checkAppURI("http://spam.example.com:2071/",
  293.             HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
  294.         self.checkAppURI("http://spam.example.com/",
  295.             SERVER_NAME="spam.example.com")
  296.         self.checkAppURI("http://127.0.0.1/",
  297.             HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
  298.         self.checkAppURI("https://127.0.0.1/", HTTPS="on")
  299.         self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
  300.             HTTP_HOST=None)
  301.  
  302.     def testReqURIs(self):
  303.         self.checkReqURI("http://127.0.0.1/")
  304.         self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
  305.         self.checkReqURI("http://127.0.0.1/spammity/spam",
  306.             SCRIPT_NAME="/spammity", PATH_INFO="/spam")
  307.         self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
  308.             SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
  309.         self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
  310.             SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
  311.  
  312.     def testFileWrapper(self):
  313.         self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
  314.  
  315.     def testHopByHop(self):
  316.         for hop in (
  317.             "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
  318.             "TE Trailers Transfer-Encoding Upgrade"
  319.         ).split():
  320.             for alt in hop, hop.title(), hop.upper(), hop.lower():
  321.                 self.failUnless(util.is_hop_by_hop(alt))
  322.  
  323.         # Not comprehensive, just a few random header names
  324.         for hop in (
  325.             "Accept Cache-Control Date Pragma Trailer Via Warning"
  326.         ).split():
  327.             for alt in hop, hop.title(), hop.upper(), hop.lower():
  328.                 self.failIf(util.is_hop_by_hop(alt))
  329.  
  330. class HeaderTests(TestCase):
  331.  
  332.     def testMappingInterface(self):
  333.         test = [('x','y')]
  334.         self.assertEqual(len(Headers([])),0)
  335.         self.assertEqual(len(Headers(test[:])),1)
  336.         self.assertEqual(Headers(test[:]).keys(), ['x'])
  337.         self.assertEqual(Headers(test[:]).values(), ['y'])
  338.         self.assertEqual(Headers(test[:]).items(), test)
  339.         self.failIf(Headers(test).items() is test)  # must be copy!
  340.  
  341.         h=Headers([])
  342.         del h['foo']   # should not raise an error
  343.  
  344.         h['Foo'] = 'bar'
  345.         for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
  346.             self.failUnless(m('foo'))
  347.             self.failUnless(m('Foo'))
  348.             self.failUnless(m('FOO'))
  349.             self.failIf(m('bar'))
  350.  
  351.         self.assertEqual(h['foo'],'bar')
  352.         h['foo'] = 'baz'
  353.         self.assertEqual(h['FOO'],'baz')
  354.         self.assertEqual(h.get_all('foo'),['baz'])
  355.  
  356.         self.assertEqual(h.get("foo","whee"), "baz")
  357.         self.assertEqual(h.get("zoo","whee"), "whee")
  358.         self.assertEqual(h.setdefault("foo","whee"), "baz")
  359.         self.assertEqual(h.setdefault("zoo","whee"), "whee")
  360.         self.assertEqual(h["foo"],"baz")
  361.         self.assertEqual(h["zoo"],"whee")
  362.  
  363.     def testRequireList(self):
  364.         self.assertRaises(TypeError, Headers, "foo")
  365.  
  366.  
  367.     def testExtras(self):
  368.         h = Headers([])
  369.         self.assertEqual(str(h),'\r\n')
  370.  
  371.         h.add_header('foo','bar',baz="spam")
  372.         self.assertEqual(h['foo'], 'bar; baz="spam"')
  373.         self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
  374.  
  375.         h.add_header('Foo','bar',cheese=None)
  376.         self.assertEqual(h.get_all('foo'),
  377.             ['bar; baz="spam"', 'bar; cheese'])
  378.  
  379.         self.assertEqual(str(h),
  380.             'foo: bar; baz="spam"\r\n'
  381.             'Foo: bar; cheese\r\n'
  382.             '\r\n'
  383.         )
  384.  
  385.  
  386. class ErrorHandler(BaseCGIHandler):
  387.     """Simple handler subclass for testing BaseHandler"""
  388.  
  389.     def __init__(self,**kw):
  390.         setup_testing_defaults(kw)
  391.         BaseCGIHandler.__init__(
  392.             self, StringIO(''), StringIO(), StringIO(), kw,
  393.             multithread=True, multiprocess=True
  394.         )
  395.  
  396. class TestHandler(ErrorHandler):
  397.     """Simple handler subclass for testing BaseHandler, w/error passthru"""
  398.  
  399.     def handle_error(self):
  400.         raise   # for testing, we want to see what's happening
  401.  
  402.  
  403.  
  404.  
  405.  
  406.  
  407.  
  408.  
  409.  
  410.  
  411.  
  412. class HandlerTests(TestCase):
  413.  
  414.     def checkEnvironAttrs(self, handler):
  415.         env = handler.environ
  416.         for attr in [
  417.             'version','multithread','multiprocess','run_once','file_wrapper'
  418.         ]:
  419.             if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
  420.                 continue
  421.             self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])
  422.  
  423.     def checkOSEnviron(self,handler):
  424.         empty = {}; setup_testing_defaults(empty)
  425.         env = handler.environ
  426.         from os import environ
  427.         for k,v in environ.items():
  428.             if not empty.has_key(k):
  429.                 self.assertEqual(env[k],v)
  430.         for k,v in empty.items():
  431.             self.failUnless(env.has_key(k))
  432.  
  433.     def testEnviron(self):
  434.         h = TestHandler(X="Y")
  435.         h.setup_environ()
  436.         self.checkEnvironAttrs(h)
  437.         self.checkOSEnviron(h)
  438.         self.assertEqual(h.environ["X"],"Y")
  439.  
  440.     def testCGIEnviron(self):
  441.         h = BaseCGIHandler(None,None,None,{})
  442.         h.setup_environ()
  443.         for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
  444.             self.assert_(h.environ.has_key(key))
  445.  
  446.     def testScheme(self):
  447.         h=TestHandler(HTTPS="on"); h.setup_environ()
  448.         self.assertEqual(h.environ['wsgi.url_scheme'],'https')
  449.         h=TestHandler(); h.setup_environ()
  450.         self.assertEqual(h.environ['wsgi.url_scheme'],'http')
  451.  
  452.  
  453.     def testAbstractMethods(self):
  454.         h = BaseHandler()
  455.         for name in [
  456.             '_flush','get_stdin','get_stderr','add_cgi_vars'
  457.         ]:
  458.             self.assertRaises(NotImplementedError, getattr(h,name))
  459.         self.assertRaises(NotImplementedError, h._write, "test")
  460.  
  461.  
  462.     def testContentLength(self):
  463.         # Demo one reason iteration is better than write()...  ;)
  464.  
  465.         def trivial_app1(e,s):
  466.             s('200 OK',[])
  467.             return [e['wsgi.url_scheme']]
  468.  
  469.         def trivial_app2(e,s):
  470.             s('200 OK',[])(e['wsgi.url_scheme'])
  471.             return []
  472.  
  473.         h = TestHandler()
  474.         h.run(trivial_app1)
  475.         self.assertEqual(h.stdout.getvalue(),
  476.             "Status: 200 OK\r\n"
  477.             "Content-Length: 4\r\n"
  478.             "\r\n"
  479.             "http")
  480.  
  481.         h = TestHandler()
  482.         h.run(trivial_app2)
  483.         self.assertEqual(h.stdout.getvalue(),
  484.             "Status: 200 OK\r\n"
  485.             "\r\n"
  486.             "http")
  487.  
  488.  
  489.  
  490.  
  491.  
  492.  
  493.  
  494.     def testBasicErrorOutput(self):
  495.  
  496.         def non_error_app(e,s):
  497.             s('200 OK',[])
  498.             return []
  499.  
  500.         def error_app(e,s):
  501.             raise AssertionError("This should be caught by handler")
  502.  
  503.         h = ErrorHandler()
  504.         h.run(non_error_app)
  505.         self.assertEqual(h.stdout.getvalue(),
  506.             "Status: 200 OK\r\n"
  507.             "Content-Length: 0\r\n"
  508.             "\r\n")
  509.         self.assertEqual(h.stderr.getvalue(),"")
  510.  
  511.         h = ErrorHandler()
  512.         h.run(error_app)
  513.         self.assertEqual(h.stdout.getvalue(),
  514.             "Status: %s\r\n"
  515.             "Content-Type: text/plain\r\n"
  516.             "Content-Length: %d\r\n"
  517.             "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))
  518.  
  519.         self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1)
  520.  
  521.     def testErrorAfterOutput(self):
  522.         MSG = "Some output has been sent"
  523.         def error_app(e,s):
  524.             s("200 OK",[])(MSG)
  525.             raise AssertionError("This should be caught by handler")
  526.  
  527.         h = ErrorHandler()
  528.         h.run(error_app)
  529.         self.assertEqual(h.stdout.getvalue(),
  530.             "Status: 200 OK\r\n"
  531.             "\r\n"+MSG)
  532.         self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1)
  533.  
  534.  
  535.     def testHeaderFormats(self):
  536.  
  537.         def non_error_app(e,s):
  538.             s('200 OK',[])
  539.             return []
  540.  
  541.         stdpat = (
  542.             r"HTTP/%s 200 OK\r\n"
  543.             r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
  544.             r"%s" r"Content-Length: 0\r\n" r"\r\n"
  545.         )
  546.         shortpat = (
  547.             "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
  548.         )
  549.  
  550.         for ssw in "FooBar/1.0", None:
  551.             sw = ssw and "Server: %s\r\n" % ssw or ""
  552.  
  553.             for version in "1.0", "1.1":
  554.                 for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
  555.  
  556.                     h = TestHandler(SERVER_PROTOCOL=proto)
  557.                     h.origin_server = False
  558.                     h.http_version = version
  559.                     h.server_software = ssw
  560.                     h.run(non_error_app)
  561.                     self.assertEqual(shortpat,h.stdout.getvalue())
  562.  
  563.                     h = TestHandler(SERVER_PROTOCOL=proto)
  564.                     h.origin_server = True
  565.                     h.http_version = version
  566.                     h.server_software = ssw
  567.                     h.run(non_error_app)
  568.                     if proto=="HTTP/0.9":
  569.                         self.assertEqual(h.stdout.getvalue(),"")
  570.                     else:
  571.                         self.failUnless(
  572.                             re.match(stdpat%(version,sw), h.stdout.getvalue()),
  573.                             (stdpat%(version,sw), h.stdout.getvalue())
  574.                         )
  575.  
  576. # This epilogue is needed for compatibility with the Python 2.5 regrtest module
  577.  
  578. def test_main():
  579.     test_support.run_unittest(__name__)
  580.  
  581. if __name__ == "__main__":
  582.     test_main()
  583.  
  584.  
  585.  
  586.  
  587.  
  588.  
  589.  
  590.  
  591.  
  592.  
  593.  
  594.  
  595.  
  596.  
  597.  
  598.  
  599.  
  600.  
  601.  
  602.  
  603.  
  604.  
  605.  
  606.  
  607.  
  608.  
  609.  
  610.  
  611.  
  612. # the above lines intentionally left blank
  613.