home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 July / maximum-cd-2011-07.iso / DiscContents / LibO_3.3.2_Win_x86_install_multi.exe / libreoffice1.cab / test_urllib2_localnet.py < prev    next >
Encoding:
Python Source  |  2011-03-15  |  16.5 KB  |  492 lines

  1. #!/usr/bin/env python
  2.  
  3. import mimetools
  4. import threading
  5. import urlparse
  6. import urllib2
  7. import BaseHTTPServer
  8. import unittest
  9. import hashlib
  10. from test import test_support
  11.  
  12. # Loopback http server infrastructure
  13.  
  14. class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
  15.     """HTTP server w/ a few modifications that make it useful for
  16.     loopback testing purposes.
  17.     """
  18.  
  19.     def __init__(self, server_address, RequestHandlerClass):
  20.         BaseHTTPServer.HTTPServer.__init__(self,
  21.                                            server_address,
  22.                                            RequestHandlerClass)
  23.  
  24.         # Set the timeout of our listening socket really low so
  25.         # that we can stop the server easily.
  26.         self.socket.settimeout(1.0)
  27.  
  28.     def get_request(self):
  29.         """BaseHTTPServer method, overridden."""
  30.  
  31.         request, client_address = self.socket.accept()
  32.  
  33.         # It's a loopback connection, so setting the timeout
  34.         # really low shouldn't affect anything, but should make
  35.         # deadlocks less likely to occur.
  36.         request.settimeout(10.0)
  37.  
  38.         return (request, client_address)
  39.  
  40. class LoopbackHttpServerThread(threading.Thread):
  41.     """Stoppable thread that runs a loopback http server."""
  42.  
  43.     def __init__(self, request_handler):
  44.         threading.Thread.__init__(self)
  45.         self._stop = False
  46.         self.ready = threading.Event()
  47.         request_handler.protocol_version = "HTTP/1.0"
  48.         self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
  49.                                         request_handler)
  50.         #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
  51.         #                                      self.httpd.server_port)
  52.         self.port = self.httpd.server_port
  53.  
  54.     def stop(self):
  55.         """Stops the webserver if it's currently running."""
  56.  
  57.         # Set the stop flag.
  58.         self._stop = True
  59.  
  60.         self.join()
  61.  
  62.     def run(self):
  63.         self.ready.set()
  64.         while not self._stop:
  65.             self.httpd.handle_request()
  66.  
  67. # Authentication infrastructure
  68.  
  69. class DigestAuthHandler:
  70.     """Handler for performing digest authentication."""
  71.  
  72.     def __init__(self):
  73.         self._request_num = 0
  74.         self._nonces = []
  75.         self._users = {}
  76.         self._realm_name = "Test Realm"
  77.         self._qop = "auth"
  78.  
  79.     def set_qop(self, qop):
  80.         self._qop = qop
  81.  
  82.     def set_users(self, users):
  83.         assert isinstance(users, dict)
  84.         self._users = users
  85.  
  86.     def set_realm(self, realm):
  87.         self._realm_name = realm
  88.  
  89.     def _generate_nonce(self):
  90.         self._request_num += 1
  91.         nonce = hashlib.md5(str(self._request_num)).hexdigest()
  92.         self._nonces.append(nonce)
  93.         return nonce
  94.  
  95.     def _create_auth_dict(self, auth_str):
  96.         first_space_index = auth_str.find(" ")
  97.         auth_str = auth_str[first_space_index+1:]
  98.  
  99.         parts = auth_str.split(",")
  100.  
  101.         auth_dict = {}
  102.         for part in parts:
  103.             name, value = part.split("=")
  104.             name = name.strip()
  105.             if value[0] == '"' and value[-1] == '"':
  106.                 value = value[1:-1]
  107.             else:
  108.                 value = value.strip()
  109.             auth_dict[name] = value
  110.         return auth_dict
  111.  
  112.     def _validate_auth(self, auth_dict, password, method, uri):
  113.         final_dict = {}
  114.         final_dict.update(auth_dict)
  115.         final_dict["password"] = password
  116.         final_dict["method"] = method
  117.         final_dict["uri"] = uri
  118.         HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
  119.         HA1 = hashlib.md5(HA1_str).hexdigest()
  120.         HA2_str = "%(method)s:%(uri)s" % final_dict
  121.         HA2 = hashlib.md5(HA2_str).hexdigest()
  122.         final_dict["HA1"] = HA1
  123.         final_dict["HA2"] = HA2
  124.         response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
  125.                        "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
  126.         response = hashlib.md5(response_str).hexdigest()
  127.  
  128.         return response == auth_dict["response"]
  129.  
  130.     def _return_auth_challenge(self, request_handler):
  131.         request_handler.send_response(407, "Proxy Authentication Required")
  132.         request_handler.send_header("Content-Type", "text/html")
  133.         request_handler.send_header(
  134.             'Proxy-Authenticate', 'Digest realm="%s", '
  135.             'qop="%s",'
  136.             'nonce="%s", ' % \
  137.             (self._realm_name, self._qop, self._generate_nonce()))
  138.         # XXX: Not sure if we're supposed to add this next header or
  139.         # not.
  140.         #request_handler.send_header('Connection', 'close')
  141.         request_handler.end_headers()
  142.         request_handler.wfile.write("Proxy Authentication Required.")
  143.         return False
  144.  
  145.     def handle_request(self, request_handler):
  146.         """Performs digest authentication on the given HTTP request
  147.         handler.  Returns True if authentication was successful, False
  148.         otherwise.
  149.  
  150.         If no users have been set, then digest auth is effectively
  151.         disabled and this method will always return True.
  152.         """
  153.  
  154.         if len(self._users) == 0:
  155.             return True
  156.  
  157.         if not request_handler.headers.has_key('Proxy-Authorization'):
  158.             return self._return_auth_challenge(request_handler)
  159.         else:
  160.             auth_dict = self._create_auth_dict(
  161.                 request_handler.headers['Proxy-Authorization']
  162.                 )
  163.             if self._users.has_key(auth_dict["username"]):
  164.                 password = self._users[ auth_dict["username"] ]
  165.             else:
  166.                 return self._return_auth_challenge(request_handler)
  167.             if not auth_dict.get("nonce") in self._nonces:
  168.                 return self._return_auth_challenge(request_handler)
  169.             else:
  170.                 self._nonces.remove(auth_dict["nonce"])
  171.  
  172.             auth_validated = False
  173.  
  174.             # MSIE uses short_path in its validation, but Python's
  175.             # urllib2 uses the full path, so we're going to see if
  176.             # either of them works here.
  177.  
  178.             for path in [request_handler.path, request_handler.short_path]:
  179.                 if self._validate_auth(auth_dict,
  180.                                        password,
  181.                                        request_handler.command,
  182.                                        path):
  183.                     auth_validated = True
  184.  
  185.             if not auth_validated:
  186.                 return self._return_auth_challenge(request_handler)
  187.             return True
  188.  
  189. # Proxy test infrastructure
  190.  
  191. class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  192.     """This is a 'fake proxy' that makes it look like the entire
  193.     internet has gone down due to a sudden zombie invasion.  It main
  194.     utility is in providing us with authentication support for
  195.     testing.
  196.     """
  197.  
  198.     digest_auth_handler = DigestAuthHandler()
  199.  
  200.     def log_message(self, format, *args):
  201.         # Uncomment the next line for debugging.
  202.         #sys.stderr.write(format % args)
  203.         pass
  204.  
  205.     def do_GET(self):
  206.         (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
  207.             self.path, 'http')
  208.         self.short_path = path
  209.         if self.digest_auth_handler.handle_request(self):
  210.             self.send_response(200, "OK")
  211.             self.send_header("Content-Type", "text/html")
  212.             self.end_headers()
  213.             self.wfile.write("You've reached %s!<BR>" % self.path)
  214.             self.wfile.write("Our apologies, but our server is down due to "
  215.                               "a sudden zombie invasion.")
  216.  
  217. # Test cases
  218.  
  219. class ProxyAuthTests(unittest.TestCase):
  220.     URL = "http://localhost"
  221.  
  222.     USER = "tester"
  223.     PASSWD = "test123"
  224.     REALM = "TestRealm"
  225.  
  226.     def setUp(self):
  227.         FakeProxyHandler.digest_auth_handler.set_users({
  228.             self.USER : self.PASSWD
  229.             })
  230.         FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
  231.  
  232.         self.server = LoopbackHttpServerThread(FakeProxyHandler)
  233.         self.server.start()
  234.         self.server.ready.wait()
  235.         proxy_url = "http://127.0.0.1:%d" % self.server.port
  236.         handler = urllib2.ProxyHandler({"http" : proxy_url})
  237.         self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
  238.         self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
  239.  
  240.     def tearDown(self):
  241.         self.server.stop()
  242.  
  243.     def test_proxy_with_bad_password_raises_httperror(self):
  244.         self._digest_auth_handler.add_password(self.REALM, self.URL,
  245.                                                self.USER, self.PASSWD+"bad")
  246.         FakeProxyHandler.digest_auth_handler.set_qop("auth")
  247.         self.assertRaises(urllib2.HTTPError,
  248.                           self.opener.open,
  249.                           self.URL)
  250.  
  251.     def test_proxy_with_no_password_raises_httperror(self):
  252.         FakeProxyHandler.digest_auth_handler.set_qop("auth")
  253.         self.assertRaises(urllib2.HTTPError,
  254.                           self.opener.open,
  255.                           self.URL)
  256.  
  257.     def test_proxy_qop_auth_works(self):
  258.         self._digest_auth_handler.add_password(self.REALM, self.URL,
  259.                                                self.USER, self.PASSWD)
  260.         FakeProxyHandler.digest_auth_handler.set_qop("auth")
  261.         result = self.opener.open(self.URL)
  262.         while result.read():
  263.             pass
  264.         result.close()
  265.  
  266.     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
  267.         self._digest_auth_handler.add_password(self.REALM, self.URL,
  268.                                                self.USER, self.PASSWD)
  269.         FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
  270.         try:
  271.             result = self.opener.open(self.URL)
  272.         except urllib2.URLError:
  273.             # It's okay if we don't support auth-int, but we certainly
  274.             # shouldn't receive any kind of exception here other than
  275.             # a URLError.
  276.             result = None
  277.         if result:
  278.             while result.read():
  279.                 pass
  280.             result.close()
  281.  
  282.  
  283. def GetRequestHandler(responses):
  284.  
  285.     class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  286.  
  287.         server_version = "TestHTTP/"
  288.         requests = []
  289.         headers_received = []
  290.         port = 80
  291.  
  292.         def do_GET(self):
  293.             body = self.send_head()
  294.             if body:
  295.                 self.wfile.write(body)
  296.  
  297.         def do_POST(self):
  298.             content_length = self.headers['Content-Length']
  299.             post_data = self.rfile.read(int(content_length))
  300.             self.do_GET()
  301.             self.requests.append(post_data)
  302.  
  303.         def send_head(self):
  304.             FakeHTTPRequestHandler.headers_received = self.headers
  305.             self.requests.append(self.path)
  306.             response_code, headers, body = responses.pop(0)
  307.  
  308.             self.send_response(response_code)
  309.  
  310.             for (header, value) in headers:
  311.                 self.send_header(header, value % self.port)
  312.             if body:
  313.                 self.send_header('Content-type', 'text/plain')
  314.                 self.end_headers()
  315.                 return body
  316.             self.end_headers()
  317.  
  318.         def log_message(self, *args):
  319.             pass
  320.  
  321.  
  322.     return FakeHTTPRequestHandler
  323.  
  324.  
  325. class TestUrlopen(unittest.TestCase):
  326.     """Tests urllib2.urlopen using the network.
  327.  
  328.     These tests are not exhaustive.  Assuming that testing using files does a
  329.     good job overall of some of the basic interface features.  There are no
  330.     tests exercising the optional 'data' and 'proxies' arguments.  No tests
  331.     for transparent redirection have been written.
  332.     """
  333.  
  334.     def start_server(self, responses):
  335.         handler = GetRequestHandler(responses)
  336.  
  337.         self.server = LoopbackHttpServerThread(handler)
  338.         self.server.start()
  339.         self.server.ready.wait()
  340.         port = self.server.port
  341.         handler.port = port
  342.         return handler
  343.  
  344.  
  345.     def test_redirection(self):
  346.         expected_response = 'We got here...'
  347.         responses = [
  348.             (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
  349.             (200, [], expected_response)
  350.         ]
  351.  
  352.         handler = self.start_server(responses)
  353.  
  354.         try:
  355.             f = urllib2.urlopen('http://localhost:%s/' % handler.port)
  356.             data = f.read()
  357.             f.close()
  358.  
  359.             self.assertEquals(data, expected_response)
  360.             self.assertEquals(handler.requests, ['/', '/somewhere_else'])
  361.         finally:
  362.             self.server.stop()
  363.  
  364.  
  365.     def test_404(self):
  366.         expected_response = 'Bad bad bad...'
  367.         handler = self.start_server([(404, [], expected_response)])
  368.  
  369.         try:
  370.             try:
  371.                 urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
  372.             except urllib2.URLError, f:
  373.                 pass
  374.             else:
  375.                 self.fail('404 should raise URLError')
  376.  
  377.             data = f.read()
  378.             f.close()
  379.  
  380.             self.assertEquals(data, expected_response)
  381.             self.assertEquals(handler.requests, ['/weeble'])
  382.         finally:
  383.             self.server.stop()
  384.  
  385.  
  386.     def test_200(self):
  387.         expected_response = 'pycon 2008...'
  388.         handler = self.start_server([(200, [], expected_response)])
  389.  
  390.         try:
  391.             f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
  392.             data = f.read()
  393.             f.close()
  394.  
  395.             self.assertEquals(data, expected_response)
  396.             self.assertEquals(handler.requests, ['/bizarre'])
  397.         finally:
  398.             self.server.stop()
  399.  
  400.     def test_200_with_parameters(self):
  401.         expected_response = 'pycon 2008...'
  402.         handler = self.start_server([(200, [], expected_response)])
  403.  
  404.         try:
  405.             f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
  406.             data = f.read()
  407.             f.close()
  408.  
  409.             self.assertEquals(data, expected_response)
  410.             self.assertEquals(handler.requests, ['/bizarre', 'get=with_feeling'])
  411.         finally:
  412.             self.server.stop()
  413.  
  414.  
  415.     def test_sending_headers(self):
  416.         handler = self.start_server([(200, [], "we don't care")])
  417.  
  418.         try:
  419.             req = urllib2.Request("http://localhost:%s/" % handler.port,
  420.                                   headers={'Range': 'bytes=20-39'})
  421.             urllib2.urlopen(req)
  422.             self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
  423.         finally:
  424.             self.server.stop()
  425.  
  426.     def test_basic(self):
  427.         handler = self.start_server([(200, [], "we don't care")])
  428.  
  429.         try:
  430.             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
  431.             for attr in ("read", "close", "info", "geturl"):
  432.                 self.assert_(hasattr(open_url, attr), "object returned from "
  433.                              "urlopen lacks the %s attribute" % attr)
  434.             try:
  435.                 self.assert_(open_url.read(), "calling 'read' failed")
  436.             finally:
  437.                 open_url.close()
  438.         finally:
  439.             self.server.stop()
  440.  
  441.     def test_info(self):
  442.         handler = self.start_server([(200, [], "we don't care")])
  443.  
  444.         try:
  445.             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
  446.             info_obj = open_url.info()
  447.             self.assert_(isinstance(info_obj, mimetools.Message),
  448.                          "object returned by 'info' is not an instance of "
  449.                          "mimetools.Message")
  450.             self.assertEqual(info_obj.getsubtype(), "plain")
  451.         finally:
  452.             self.server.stop()
  453.  
  454.     def test_geturl(self):
  455.         # Make sure same URL as opened is returned by geturl.
  456.         handler = self.start_server([(200, [], "we don't care")])
  457.  
  458.         try:
  459.             open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
  460.             url = open_url.geturl()
  461.             self.assertEqual(url, "http://localhost:%s" % handler.port)
  462.         finally:
  463.             self.server.stop()
  464.  
  465.  
  466.     def test_bad_address(self):
  467.         # Make sure proper exception is raised when connecting to a bogus
  468.         # address.
  469.         self.assertRaises(IOError,
  470.                           # SF patch 809915:  In Sep 2003, VeriSign started
  471.                           # highjacking invalid .com and .net addresses to
  472.                           # boost traffic to their own site.  This test
  473.                           # started failing then.  One hopes the .invalid
  474.                           # domain will be spared to serve its defined
  475.                           # purpose.
  476.                           # urllib2.urlopen, "http://www.sadflkjsasadf.com/")
  477.                           urllib2.urlopen, "http://www.python.invalid./")
  478.  
  479.  
  480. def test_main():
  481.     # We will NOT depend on the network resource flag
  482.     # (Lib/test/regrtest.py -u network) since all tests here are only
  483.     # localhost.  However, if this is a bad rationale, then uncomment
  484.     # the next line.
  485.     #test_support.requires("network")
  486.  
  487.     test_support.run_unittest(ProxyAuthTests)
  488.     test_support.run_unittest(TestUrlopen)
  489.  
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
  492.