home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_urllib2.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  42.8 KB  |  1,120 lines

  1. import unittest
  2. from test import test_support
  3.  
  4. import os
  5. import socket
  6. import StringIO
  7.  
  8. import urllib2
  9. from urllib2 import Request, OpenerDirector
  10.  
  11. # XXX
  12. # Request
  13. # CacheFTPHandler (hard to write)
  14. # parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
  15.  
  16. class TrivialTests(unittest.TestCase):
  17.     def test_trivial(self):
  18.         # A couple trivial tests
  19.  
  20.         self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
  21.  
  22.         # XXX Name hacking to get this to work on Windows.
  23.         fname = os.path.abspath(urllib2.__file__).replace('\\', '/')
  24.         if fname[1:2] == ":":
  25.             fname = fname[2:]
  26.         # And more hacking to get it to work on MacOS. This assumes
  27.         # urllib.pathname2url works, unfortunately...
  28.         if os.name == 'mac':
  29.             fname = '/' + fname.replace(':', '/')
  30.         elif os.name == 'riscos':
  31.             import string
  32.             fname = os.expand(fname)
  33.             fname = fname.translate(string.maketrans("/.", "./"))
  34.  
  35.         file_url = "file://%s" % fname
  36.         f = urllib2.urlopen(file_url)
  37.  
  38.         buf = f.read()
  39.         f.close()
  40.  
  41.     def test_parse_http_list(self):
  42.         tests = [('a,b,c', ['a', 'b', 'c']),
  43.                  ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
  44.                  ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
  45.                  ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
  46.         for string, list in tests:
  47.             self.assertEquals(urllib2.parse_http_list(string), list)
  48.  
  49.  
def test_request_headers_dict():
    """
    Doctest for the undocumented Request.headers dictionary.

    The Request.headers dictionary is not a documented interface.  It should
    stay that way, because the complete set of headers is only accessible
    through the .get_header(), .has_header(), .header_items() interface.
    However, .headers pre-dates those methods, and so real code will be using
    the dictionary.

    The introduction in 2.4 of those methods was a mistake for the same reason:
    code that previously saw all (urllib2 user)-provided headers in .headers
    now sees only a subset (and the function interface is ugly and incomplete).
    A better change would have been to replace .headers dict with a dict
    subclass (or UserDict.DictMixin instance?)  that preserved the .headers
    interface and also provided access to the "unredirected" headers.  It's
    probably too late to fix that, though.


    Check .capitalize() case normalization:

    >>> url = "http://example.com"
    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
    'blah'
    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
    'blah'

    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
    but that could be changed in future.

    """
  79.  
def test_request_headers_methods():
    """
    Doctest for Request.has_header(), .header_items(), .add_header() and
    .get_header().

    Note the case normalization of header names here, to .capitalize()-case.
    This should be preserved for backwards-compatibility.  (In the HTTP case,
    normalization to .title()-case is done by urllib2 before sending headers to
    httplib).

    >>> url = "http://example.com"
    >>> r = Request(url, headers={"Spam-eggs": "blah"})
    >>> r.has_header("Spam-eggs")
    True
    >>> r.header_items()
    [('Spam-eggs', 'blah')]
    >>> r.add_header("Foo-Bar", "baz")
    >>> items = r.header_items()
    >>> items.sort()
    >>> items
    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]

    Note that e.g. r.has_header("spam-EggS") is currently False, and
    r.get_header("spam-EggS") returns None, but that could be changed in
    future.

    >>> r.has_header("Not-there")
    False
    >>> print r.get_header("Not-there")
    None
    >>> r.get_header("Not-there", "default")
    'default'

    """
  111.  
  112.  
def test_password_manager(self):
    """
    Doctest for urllib2.HTTPPasswordMgr: credentials registered with
    add_password() are looked up again with find_user_password().

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password
    >>> add("Some Realm", "http://example.com/", "joe", "password")
    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
    >>> add("c", "http://example.com/foo", "foo", "ni")
    >>> add("c", "http://example.com/bar", "bar", "nini")
    >>> add("b", "http://example.com/", "first", "blah")
    >>> add("b", "http://example.com/", "second", "spam")
    >>> add("a", "http://example.com", "1", "a")
    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
    >>> add("Some Realm", "d.example.com", "4", "d")
    >>> add("Some Realm", "e.example.com:3128", "5", "e")

    >>> mgr.find_user_password("Some Realm", "example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
    ('joe', 'password')
    >>> mgr.find_user_password("c", "http://example.com/foo")
    ('foo', 'ni')
    >>> mgr.find_user_password("c", "http://example.com/bar")
    ('bar', 'nini')

    Actually, this is really undefined ATM
##     Currently, we use the highest-level path where more than one match:

##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
##     ('joe', 'password')

    Use latest add_password() in case of conflict:

    >>> mgr.find_user_password("b", "http://example.com/")
    ('second', 'spam')

    No special relationship between a.example.com and example.com:

    >>> mgr.find_user_password("a", "http://example.com/")
    ('1', 'a')
    >>> mgr.find_user_password("a", "http://a.example.com/")
    (None, None)

    Ports:

    >>> mgr.find_user_password("Some Realm", "c.example.com")
    (None, None)
    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
    ('3', 'c')
    >>> mgr.find_user_password("Some Realm", "d.example.com")
    ('4', 'd')
    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
    ('5', 'e')

    """
    pass
  176.  
  177.  
def test_password_manager_default_port(self):
    """
    Doctest for how urllib2.HTTPPasswordMgr treats explicit and implied
    ports when storing and looking up credentials.

    >>> mgr = urllib2.HTTPPasswordMgr()
    >>> add = mgr.add_password

    The point to note here is that we can't guess the default port if there's
    no scheme.  This applies to both add_password and find_user_password.

    >>> add("f", "http://g.example.com:80", "10", "j")
    >>> add("g", "http://h.example.com", "11", "k")
    >>> add("h", "i.example.com:80", "12", "l")
    >>> add("i", "j.example.com", "13", "m")
    >>> mgr.find_user_password("f", "g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "g.example.com")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:100")
    (None, None)
    >>> mgr.find_user_password("f", "http://g.example.com:80")
    ('10', 'j')
    >>> mgr.find_user_password("f", "http://g.example.com")
    ('10', 'j')
    >>> mgr.find_user_password("g", "h.example.com")
    ('11', 'k')
    >>> mgr.find_user_password("g", "h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("g", "http://h.example.com:80")
    ('11', 'k')
    >>> mgr.find_user_password("h", "i.example.com")
    (None, None)
    >>> mgr.find_user_password("h", "i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("h", "http://i.example.com:80")
    ('12', 'l')
    >>> mgr.find_user_password("i", "j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "j.example.com:80")
    (None, None)
    >>> mgr.find_user_password("i", "http://j.example.com")
    ('13', 'm')
    >>> mgr.find_user_password("i", "http://j.example.com:80")
    (None, None)

    """
  224.  
  225. class MockOpener:
  226.     addheaders = []
  227.     def open(self, req, data=None):
  228.         self.req, self.data = req, data
  229.     def error(self, proto, *args):
  230.         self.proto, self.args = proto, args
  231.  
  232. class MockFile:
  233.     def read(self, count=None): pass
  234.     def readline(self, count=None): pass
  235.     def close(self): pass
  236.  
  237. class MockHeaders(dict):
  238.     def getheaders(self, name):
  239.         return self.values()
  240.  
  241. class MockResponse(StringIO.StringIO):
  242.     def __init__(self, code, msg, headers, data, url=None):
  243.         StringIO.StringIO.__init__(self, data)
  244.         self.code, self.msg, self.headers, self.url = code, msg, headers, url
  245.     def info(self):
  246.         return self.headers
  247.     def geturl(self):
  248.         return self.url
  249.  
  250. class MockCookieJar:
  251.     def add_cookie_header(self, request):
  252.         self.ach_req = request
  253.     def extract_cookies(self, response, request):
  254.         self.ec_req, self.ec_r = request, response
  255.  
  256. class FakeMethod:
  257.     def __init__(self, meth_name, action, handle):
  258.         self.meth_name = meth_name
  259.         self.handle = handle
  260.         self.action = action
  261.     def __call__(self, *args):
  262.         return self.handle(self.meth_name, self.action, *args)
  263.  
  264. class MockHandler:
  265.     # useful for testing handler machinery
  266.     # see add_ordered_mock_handlers() docstring
  267.     handler_order = 500
  268.     def __init__(self, methods):
  269.         self._define_methods(methods)
  270.     def _define_methods(self, methods):
  271.         for spec in methods:
  272.             if len(spec) == 2: name, action = spec
  273.             else: name, action = spec, None
  274.             meth = FakeMethod(name, action, self.handle)
  275.             setattr(self.__class__, name, meth)
  276.     def handle(self, fn_name, action, *args, **kwds):
  277.         self.parent.calls.append((self, fn_name, args, kwds))
  278.         if action is None:
  279.             return None
  280.         elif action == "return self":
  281.             return self
  282.         elif action == "return response":
  283.             res = MockResponse(200, "OK", {}, "")
  284.             return res
  285.         elif action == "return request":
  286.             return Request("http://blah/")
  287.         elif action.startswith("error"):
  288.             code = action[action.rfind(" ")+1:]
  289.             try:
  290.                 code = int(code)
  291.             except ValueError:
  292.                 pass
  293.             res = MockResponse(200, "OK", {}, "")
  294.             return self.parent.error("http", args[0], res, code, "", {})
  295.         elif action == "raise":
  296.             raise urllib2.URLError("blah")
  297.         assert False
  298.     def close(self): pass
  299.     def add_parent(self, parent):
  300.         self.parent = parent
  301.         self.parent.calls = []
  302.     def __lt__(self, other):
  303.         if not hasattr(other, "handler_order"):
  304.             # No handler_order, leave in original order.  Yuck.
  305.             return True
  306.         return self.handler_order < other.handler_order
  307.  
  308. def add_ordered_mock_handlers(opener, meth_spec):
  309.     """Create MockHandlers and add them to an OpenerDirector.
  310.  
  311.     meth_spec: list of lists of tuples and strings defining methods to define
  312.     on handlers.  eg:
  313.  
  314.     [["http_error", "ftp_open"], ["http_open"]]
  315.  
  316.     defines methods .http_error() and .ftp_open() on one handler, and
  317.     .http_open() on another.  These methods just record their arguments and
  318.     return None.  Using a tuple instead of a string causes the method to
  319.     perform some action (see MockHandler.handle()), eg:
  320.  
  321.     [["http_error"], [("http_open", "return request")]]
  322.  
  323.     defines .http_error() on one handler (which simply returns None), and
  324.     .http_open() on another handler, which returns a Request object.
  325.  
  326.     """
  327.     handlers = []
  328.     count = 0
  329.     for meths in meth_spec:
  330.         class MockHandlerSubclass(MockHandler): pass
  331.         h = MockHandlerSubclass(meths)
  332.         h.handler_order += count
  333.         h.add_parent(opener)
  334.         count = count + 1
  335.         handlers.append(h)
  336.         opener.add_handler(h)
  337.     return handlers
  338.  
  339. def build_test_opener(*handler_instances):
  340.     opener = OpenerDirector()
  341.     for h in handler_instances:
  342.         opener.add_handler(h)
  343.     return opener
  344.  
  345. class MockHTTPHandler(urllib2.BaseHandler):
  346.     # useful for testing redirections and auth
  347.     # sends supplied headers and code as first response
  348.     # sends 200 OK as second response
  349.     def __init__(self, code, headers):
  350.         self.code = code
  351.         self.headers = headers
  352.         self.reset()
  353.     def reset(self):
  354.         self._count = 0
  355.         self.requests = []
  356.     def http_open(self, req):
  357.         import mimetools, httplib, copy
  358.         from StringIO import StringIO
  359.         self.requests.append(copy.deepcopy(req))
  360.         if self._count == 0:
  361.             self._count = self._count + 1
  362.             name = httplib.responses[self.code]
  363.             msg = mimetools.Message(StringIO(self.headers))
  364.             return self.parent.error(
  365.                 "http", req, MockFile(), self.code, name, msg)
  366.         else:
  367.             self.req = req
  368.             msg = mimetools.Message(StringIO("\r\n\r\n"))
  369.             return MockResponse(200, "OK", msg, "", req.get_full_url())
  370.  
  371. class MockPasswordManager:
  372.     def add_password(self, realm, uri, user, password):
  373.         self.realm = realm
  374.         self.url = uri
  375.         self.user = user
  376.         self.password = password
  377.     def find_user_password(self, realm, authuri):
  378.         self.target_realm = realm
  379.         self.target_url = authuri
  380.         return self.user, self.password
  381.  
  382.  
  383. class OpenerDirectorTests(unittest.TestCase):
  384.  
  385.     def test_add_non_handler(self):
  386.         class NonHandler(object):
  387.             pass
  388.         self.assertRaises(TypeError,
  389.                           OpenerDirector().add_handler, NonHandler())
  390.  
  391.     def test_badly_named_methods(self):
  392.         # test work-around for three methods that accidentally follow the
  393.         # naming conventions for handler methods
  394.         # (*_open() / *_request() / *_response())
  395.  
  396.         # These used to call the accidentally-named methods, causing a
  397.         # TypeError in real code; here, returning self from these mock
  398.         # methods would either cause no exception, or AttributeError.
  399.  
  400.         from urllib2 import URLError
  401.  
  402.         o = OpenerDirector()
  403.         meth_spec = [
  404.             [("do_open", "return self"), ("proxy_open", "return self")],
  405.             [("redirect_request", "return self")],
  406.             ]
  407.         handlers = add_ordered_mock_handlers(o, meth_spec)
  408.         o.add_handler(urllib2.UnknownHandler())
  409.         for scheme in "do", "proxy", "redirect":
  410.             self.assertRaises(URLError, o.open, scheme+"://example.com/")
  411.  
  412.     def test_handled(self):
  413.         # handler returning non-None means no more handlers will be called
  414.         o = OpenerDirector()
  415.         meth_spec = [
  416.             ["http_open", "ftp_open", "http_error_302"],
  417.             ["ftp_open"],
  418.             [("http_open", "return self")],
  419.             [("http_open", "return self")],
  420.             ]
  421.         handlers = add_ordered_mock_handlers(o, meth_spec)
  422.  
  423.         req = Request("http://example.com/")
  424.         r = o.open(req)
  425.         # Second .http_open() gets called, third doesn't, since second returned
  426.         # non-None.  Handlers without .http_open() never get any methods called
  427.         # on them.
  428.         # In fact, second mock handler defining .http_open() returns self
  429.         # (instead of response), which becomes the OpenerDirector's return
  430.         # value.
  431.         self.assertEqual(r, handlers[2])
  432.         calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
  433.         for expected, got in zip(calls, o.calls):
  434.             handler, name, args, kwds = got
  435.             self.assertEqual((handler, name), expected)
  436.             self.assertEqual(args, (req,))
  437.  
  438.     def test_handler_order(self):
  439.         o = OpenerDirector()
  440.         handlers = []
  441.         for meths, handler_order in [
  442.             ([("http_open", "return self")], 500),
  443.             (["http_open"], 0),
  444.             ]:
  445.             class MockHandlerSubclass(MockHandler): pass
  446.             h = MockHandlerSubclass(meths)
  447.             h.handler_order = handler_order
  448.             handlers.append(h)
  449.             o.add_handler(h)
  450.  
  451.         r = o.open("http://example.com/")
  452.         # handlers called in reverse order, thanks to their sort order
  453.         self.assertEqual(o.calls[0][0], handlers[1])
  454.         self.assertEqual(o.calls[1][0], handlers[0])
  455.  
  456.     def test_raise(self):
  457.         # raising URLError stops processing of request
  458.         o = OpenerDirector()
  459.         meth_spec = [
  460.             [("http_open", "raise")],
  461.             [("http_open", "return self")],
  462.             ]
  463.         handlers = add_ordered_mock_handlers(o, meth_spec)
  464.  
  465.         req = Request("http://example.com/")
  466.         self.assertRaises(urllib2.URLError, o.open, req)
  467.         self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
  468.  
  469. ##     def test_error(self):
  470. ##         # XXX this doesn't actually seem to be used in standard library,
  471. ##         #  but should really be tested anyway...
  472.  
  473.     def test_http_error(self):
  474.         # XXX http_error_default
  475.         # http errors are a special case
  476.         o = OpenerDirector()
  477.         meth_spec = [
  478.             [("http_open", "error 302")],
  479.             [("http_error_400", "raise"), "http_open"],
  480.             [("http_error_302", "return response"), "http_error_303",
  481.              "http_error"],
  482.             [("http_error_302")],
  483.             ]
  484.         handlers = add_ordered_mock_handlers(o, meth_spec)
  485.  
  486.         class Unknown:
  487.             def __eq__(self, other): return True
  488.  
  489.         req = Request("http://example.com/")
  490.         r = o.open(req)
  491.         assert len(o.calls) == 2
  492.         calls = [(handlers[0], "http_open", (req,)),
  493.                  (handlers[2], "http_error_302",
  494.                   (req, Unknown(), 302, "", {}))]
  495.         for expected, got in zip(calls, o.calls):
  496.             handler, method_name, args = expected
  497.             self.assertEqual((handler, method_name), got[:2])
  498.             self.assertEqual(args, got[2])
  499.  
  500.     def test_processors(self):
  501.         # *_request / *_response methods get called appropriately
  502.         o = OpenerDirector()
  503.         meth_spec = [
  504.             [("http_request", "return request"),
  505.              ("http_response", "return response")],
  506.             [("http_request", "return request"),
  507.              ("http_response", "return response")],
  508.             ]
  509.         handlers = add_ordered_mock_handlers(o, meth_spec)
  510.  
  511.         req = Request("http://example.com/")
  512.         r = o.open(req)
  513.         # processor methods are called on *all* handlers that define them,
  514.         # not just the first handler that handles the request
  515.         calls = [
  516.             (handlers[0], "http_request"), (handlers[1], "http_request"),
  517.             (handlers[0], "http_response"), (handlers[1], "http_response")]
  518.  
  519.         for i, (handler, name, args, kwds) in enumerate(o.calls):
  520.             if i < 2:
  521.                 # *_request
  522.                 self.assertEqual((handler, name), calls[i])
  523.                 self.assertEqual(len(args), 1)
  524.                 self.assert_(isinstance(args[0], Request))
  525.             else:
  526.                 # *_response
  527.                 self.assertEqual((handler, name), calls[i])
  528.                 self.assertEqual(len(args), 2)
  529.                 self.assert_(isinstance(args[0], Request))
  530.                 # response from opener.open is None, because there's no
  531.                 # handler that defines http_open to handle it
  532.                 self.assert_(args[1] is None or
  533.                              isinstance(args[1], MockResponse))
  534.  
  535.  
  536. def sanepathname2url(path):
  537.     import urllib
  538.     urlpath = urllib.pathname2url(path)
  539.     if os.name == "nt" and urlpath.startswith("///"):
  540.         urlpath = urlpath[2:]
  541.     # XXX don't ask me about the mac...
  542.     return urlpath
  543.  
  544. class HandlerTests(unittest.TestCase):
  545.  
  546.     def test_ftp(self):
  547.         class MockFTPWrapper:
  548.             def __init__(self, data): self.data = data
  549.             def retrfile(self, filename, filetype):
  550.                 self.filename, self.filetype = filename, filetype
  551.                 return StringIO.StringIO(self.data), len(self.data)
  552.  
  553.         class NullFTPHandler(urllib2.FTPHandler):
  554.             def __init__(self, data): self.data = data
  555.             def connect_ftp(self, user, passwd, host, port, dirs,
  556.                             timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
  557.                 self.user, self.passwd = user, passwd
  558.                 self.host, self.port = host, port
  559.                 self.dirs = dirs
  560.                 self.ftpwrapper = MockFTPWrapper(self.data)
  561.                 return self.ftpwrapper
  562.  
  563.         import ftplib
  564.         data = "rheum rhaponicum"
  565.         h = NullFTPHandler(data)
  566.         o = h.parent = MockOpener()
  567.  
  568.         for url, host, port, type_, dirs, filename, mimetype in [
  569.             ("ftp://localhost/foo/bar/baz.html",
  570.              "localhost", ftplib.FTP_PORT, "I",
  571.              ["foo", "bar"], "baz.html", "text/html"),
  572.             ("ftp://localhost:80/foo/bar/",
  573.              "localhost", 80, "D",
  574.              ["foo", "bar"], "", None),
  575.             ("ftp://localhost/baz.gif;type=a",
  576.              "localhost", ftplib.FTP_PORT, "A",
  577.              [], "baz.gif", None),  # XXX really this should guess image/gif
  578.             ]:
  579.             req = Request(url)
  580.             req.timeout = None
  581.             r = h.ftp_open(req)
  582.             # ftp authentication not yet implemented by FTPHandler
  583.             self.assert_(h.user == h.passwd == "")
  584.             self.assertEqual(h.host, socket.gethostbyname(host))
  585.             self.assertEqual(h.port, port)
  586.             self.assertEqual(h.dirs, dirs)
  587.             self.assertEqual(h.ftpwrapper.filename, filename)
  588.             self.assertEqual(h.ftpwrapper.filetype, type_)
  589.             headers = r.info()
  590.             self.assertEqual(headers.get("Content-type"), mimetype)
  591.             self.assertEqual(int(headers["Content-length"]), len(data))
  592.  
    def test_file(self):
        # Exercises urllib2.FileHandler.file_open() in three parts: URLs
        # that must open the local test file, URLs that must raise URLError,
        # and URLs with a non-local host.
        import rfc822, socket
        h = urllib2.FileHandler()
        o = h.parent = MockOpener()

        TESTFN = test_support.TESTFN
        urlpath = sanepathname2url(os.path.abspath(TESTFN))
        towrite = "hello, world\n"
        # Several spellings of the URL of the same local file.
        urls = [
            "file://localhost%s" % urlpath,
            "file://%s" % urlpath,
            "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
            ]
        # The address of this machine's hostname is added as well, but only
        # when it can be resolved.
        try:
            localaddr = socket.gethostbyname(socket.gethostname())
        except socket.gaierror:
            localaddr = ''
        if localaddr:
            urls.append("file://%s%s" % (localaddr, urlpath))

        for url in urls:
            f = open(TESTFN, "wb")
            try:
                try:
                    f.write(towrite)
                finally:
                    f.close()

                r = h.file_open(Request(url))
                try:
                    data = r.read()
                    headers = r.info()
                    newurl = r.geturl()
                finally:
                    r.close()
                # Read the mtime before the file is removed, to compare it
                # with the Last-modified header below.
                stats = os.stat(TESTFN)
                modified = rfc822.formatdate(stats.st_mtime)
            finally:
                # The test file is removed whether or not the open worked.
                os.remove(TESTFN)
            self.assertEqual(data, towrite)
            self.assertEqual(headers["Content-type"], "text/plain")
            # "13" is the length of towrite.
            self.assertEqual(headers["Content-length"], "13")
            self.assertEqual(headers["Last-modified"], modified)

        # Each of these URLs must make file_open() raise URLError.
        for url in [
            "file://localhost:80%s" % urlpath,
            "file:///file_does_not_exist.txt",
            "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
                                   os.getcwd(), TESTFN),
            "file://somerandomhost.ontheinternet.com%s/%s" %
            (os.getcwd(), TESTFN),
            ]:
            try:
                f = open(TESTFN, "wb")
                try:
                    f.write(towrite)
                finally:
                    f.close()

                self.assertRaises(urllib2.URLError,
                                  h.file_open, Request(url))
            finally:
                os.remove(TESTFN)

        # A fresh handler and MockOpener; the MockOpener records any request
        # passed to its open().
        h = urllib2.FileHandler()
        o = h.parent = MockOpener()
        # XXXX why does // mean ftp (and /// mean not ftp!), and where
        #  is file: scheme specified?  I think this is really a bug, and
        #  what was intended was to distinguish between URLs like:
        # file:/blah.txt (a file)
        # file://localhost/blah.txt (a file)
        # file:///blah.txt (a file)
        # file://ftp.example.com/blah.txt (an ftp URL)
        for url, ftp in [
            ("file://ftp.example.com//foo.txt", True),
            ("file://ftp.example.com///foo.txt", False),
# XXXX bug: fails with OSError, should be URLError
            ("file://ftp.example.com/foo.txt", False),
            ]:
            req = Request(url)
            try:
                h.file_open(req)
            # XXXX remove OSError when bug fixed
            except (urllib2.URLError, OSError):
                # An error is only acceptable for URLs not expected to be
                # treated as ftp.
                self.assert_(not ftp)
            else:
                # No error: the same request object must have reached the
                # opener, with its type set to "ftp".
                self.assert_(o.req is req)
                self.assertEqual(req.type, "ftp")
  681.  
  682.     def test_http(self):
  683.         class MockHTTPResponse:
  684.             def __init__(self, fp, msg, status, reason):
  685.                 self.fp = fp
  686.                 self.msg = msg
  687.                 self.status = status
  688.                 self.reason = reason
  689.             def read(self):
  690.                 return ''
  691.         class MockHTTPClass:
  692.             def __init__(self):
  693.                 self.req_headers = []
  694.                 self.data = None
  695.                 self.raise_on_endheaders = False
  696.             def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
  697.                 self.host = host
  698.                 self.timeout = timeout
  699.                 return self
  700.             def set_debuglevel(self, level):
  701.                 self.level = level
  702.             def request(self, method, url, body=None, headers={}):
  703.                 self.method = method
  704.                 self.selector = url
  705.                 self.req_headers += headers.items()
  706.                 self.req_headers.sort()
  707.                 if body:
  708.                     self.data = body
  709.                 if self.raise_on_endheaders:
  710.                     import socket
  711.                     raise socket.error()
  712.             def getresponse(self):
  713.                 return MockHTTPResponse(MockFile(), {}, 200, "OK")
  714.  
  715.         h = urllib2.AbstractHTTPHandler()
  716.         o = h.parent = MockOpener()
  717.  
  718.         url = "http://example.com/"
  719.         for method, data in [("GET", None), ("POST", "blah")]:
  720.             req = Request(url, data, {"Foo": "bar"})
  721.             req.timeout = None
  722.             req.add_unredirected_header("Spam", "eggs")
  723.             http = MockHTTPClass()
  724.             r = h.do_open(http, req)
  725.  
  726.             # result attributes
  727.             r.read; r.readline  # wrapped MockFile methods
  728.             r.info; r.geturl  # addinfourl methods
  729.             r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
  730.             hdrs = r.info()
  731.             hdrs.get; hdrs.has_key  # r.info() gives dict from .getreply()
  732.             self.assertEqual(r.geturl(), url)
  733.  
  734.             self.assertEqual(http.host, "example.com")
  735.             self.assertEqual(http.level, 0)
  736.             self.assertEqual(http.method, method)
  737.             self.assertEqual(http.selector, "/")
  738.             self.assertEqual(http.req_headers,
  739.                              [("Connection", "close"),
  740.                               ("Foo", "bar"), ("Spam", "eggs")])
  741.             self.assertEqual(http.data, data)
  742.  
  743.         # check socket.error converted to URLError
  744.         http.raise_on_endheaders = True
  745.         self.assertRaises(urllib2.URLError, h.do_open, http, req)
  746.  
  747.         # check adding of standard headers
  748.         o.addheaders = [("Spam", "eggs")]
  749.         for data in "", None:  # POST, GET
  750.             req = Request("http://example.com/", data)
  751.             r = MockResponse(200, "OK", {}, "")
  752.             newreq = h.do_request_(req)
  753.             if data is None:  # GET
  754.                 self.assert_("Content-length" not in req.unredirected_hdrs)
  755.                 self.assert_("Content-type" not in req.unredirected_hdrs)
  756.             else:  # POST
  757.                 self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
  758.                 self.assertEqual(req.unredirected_hdrs["Content-type"],
  759.                              "application/x-www-form-urlencoded")
  760.             # XXX the details of Host could be better tested
  761.             self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
  762.             self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
  763.  
  764.             # don't clobber existing headers
  765.             req.add_unredirected_header("Content-length", "foo")
  766.             req.add_unredirected_header("Content-type", "bar")
  767.             req.add_unredirected_header("Host", "baz")
  768.             req.add_unredirected_header("Spam", "foo")
  769.             newreq = h.do_request_(req)
  770.             self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
  771.             self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
  772.             self.assertEqual(req.unredirected_hdrs["Host"], "baz")
  773.             self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
  774.  
  775.     def test_http_doubleslash(self):
  776.         # Checks that the presence of an unnecessary double slash in a url doesn't break anything
  777.         # Previously, a double slash directly after the host could cause incorrect parsing of the url
  778.         h = urllib2.AbstractHTTPHandler()
  779.         o = h.parent = MockOpener()
  780.  
  781.         data = ""
  782.         ds_urls = [
  783.             "http://example.com/foo/bar/baz.html",
  784.             "http://example.com//foo/bar/baz.html",
  785.             "http://example.com/foo//bar/baz.html",
  786.             "http://example.com/foo/bar//baz.html",
  787.         ]
  788.  
  789.         for ds_url in ds_urls:
  790.             ds_req = Request(ds_url, data)
  791.  
  792.             # Check whether host is determined correctly if there is no proxy
  793.             np_ds_req = h.do_request_(ds_req)
  794.             self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")
  795.  
  796.             # Check whether host is determined correctly if there is a proxy
  797.             ds_req.set_proxy("someproxy:3128",None)
  798.             p_ds_req = h.do_request_(ds_req)
  799.             self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")
  800.  
  801.     def test_errors(self):
  802.         h = urllib2.HTTPErrorProcessor()
  803.         o = h.parent = MockOpener()
  804.  
  805.         url = "http://example.com/"
  806.         req = Request(url)
  807.         # all 2xx are passed through
  808.         r = MockResponse(200, "OK", {}, "", url)
  809.         newr = h.http_response(req, r)
  810.         self.assert_(r is newr)
  811.         self.assert_(not hasattr(o, "proto"))  # o.error not called
  812.         r = MockResponse(202, "Accepted", {}, "", url)
  813.         newr = h.http_response(req, r)
  814.         self.assert_(r is newr)
  815.         self.assert_(not hasattr(o, "proto"))  # o.error not called
  816.         r = MockResponse(206, "Partial content", {}, "", url)
  817.         newr = h.http_response(req, r)
  818.         self.assert_(r is newr)
  819.         self.assert_(not hasattr(o, "proto"))  # o.error not called
  820.         # anything else calls o.error (and MockOpener returns None, here)
  821.         r = MockResponse(502, "Bad gateway", {}, "", url)
  822.         self.assert_(h.http_response(req, r) is None)
  823.         self.assertEqual(o.proto, "http")  # o.error called
  824.         self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
  825.  
  826.     def test_cookies(self):
  827.         cj = MockCookieJar()
  828.         h = urllib2.HTTPCookieProcessor(cj)
  829.         o = h.parent = MockOpener()
  830.  
  831.         req = Request("http://example.com/")
  832.         r = MockResponse(200, "OK", {}, "")
  833.         newreq = h.http_request(req)
  834.         self.assert_(cj.ach_req is req is newreq)
  835.         self.assertEquals(req.get_origin_req_host(), "example.com")
  836.         self.assert_(not req.is_unverifiable())
  837.         newr = h.http_response(req, r)
  838.         self.assert_(cj.ec_req is req)
  839.         self.assert_(cj.ec_r is r is newr)
  840.  
  841.     def test_redirect(self):
  842.         from_url = "http://example.com/a.html"
  843.         to_url = "http://example.com/b.html"
  844.         h = urllib2.HTTPRedirectHandler()
  845.         o = h.parent = MockOpener()
  846.  
  847.         # ordinary redirect behaviour
  848.         for code in 301, 302, 303, 307:
  849.             for data in None, "blah\nblah\n":
  850.                 method = getattr(h, "http_error_%s" % code)
  851.                 req = Request(from_url, data)
  852.                 req.add_header("Nonsense", "viking=withhold")
  853.                 if data is not None:
  854.                     req.add_header("Content-Length", str(len(data)))
  855.                 req.add_unredirected_header("Spam", "spam")
  856.                 try:
  857.                     method(req, MockFile(), code, "Blah",
  858.                            MockHeaders({"location": to_url}))
  859.                 except urllib2.HTTPError:
  860.                     # 307 in response to POST requires user OK
  861.                     self.assert_(code == 307 and data is not None)
  862.                 self.assertEqual(o.req.get_full_url(), to_url)
  863.                 try:
  864.                     self.assertEqual(o.req.get_method(), "GET")
  865.                 except AttributeError:
  866.                     self.assert_(not o.req.has_data())
  867.  
  868.                 # now it's a GET, there should not be headers regarding content
  869.                 # (possibly dragged from before being a POST)
  870.                 headers = [x.lower() for x in o.req.headers]
  871.                 self.assertTrue("content-length" not in headers)
  872.                 self.assertTrue("content-type" not in headers)
  873.  
  874.                 self.assertEqual(o.req.headers["Nonsense"],
  875.                                  "viking=withhold")
  876.                 self.assert_("Spam" not in o.req.headers)
  877.                 self.assert_("Spam" not in o.req.unredirected_hdrs)
  878.  
  879.         # loop detection
  880.         req = Request(from_url)
  881.         def redirect(h, req, url=to_url):
  882.             h.http_error_302(req, MockFile(), 302, "Blah",
  883.                              MockHeaders({"location": url}))
  884.         # Note that the *original* request shares the same record of
  885.         # redirections with the sub-requests caused by the redirections.
  886.  
  887.         # detect infinite loop redirect of a URL to itself
  888.         req = Request(from_url, origin_req_host="example.com")
  889.         count = 0
  890.         try:
  891.             while 1:
  892.                 redirect(h, req, "http://example.com/")
  893.                 count = count + 1
  894.         except urllib2.HTTPError:
  895.             # don't stop until max_repeats, because cookies may introduce state
  896.             self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)
  897.  
  898.         # detect endless non-repeating chain of redirects
  899.         req = Request(from_url, origin_req_host="example.com")
  900.         count = 0
  901.         try:
  902.             while 1:
  903.                 redirect(h, req, "http://example.com/%d" % count)
  904.                 count = count + 1
  905.         except urllib2.HTTPError:
  906.             self.assertEqual(count,
  907.                              urllib2.HTTPRedirectHandler.max_redirections)
  908.  
  909.     def test_cookie_redirect(self):
  910.         # cookies shouldn't leak into redirected requests
  911.         from cookielib import CookieJar
  912.  
  913.         from test.test_cookielib import interact_netscape
  914.  
  915.         cj = CookieJar()
  916.         interact_netscape(cj, "http://www.example.com/", "spam=eggs")
  917.         hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
  918.         hdeh = urllib2.HTTPDefaultErrorHandler()
  919.         hrh = urllib2.HTTPRedirectHandler()
  920.         cp = urllib2.HTTPCookieProcessor(cj)
  921.         o = build_test_opener(hh, hdeh, hrh, cp)
  922.         o.open("http://www.example.com/")
  923.         self.assert_(not hh.req.has_header("Cookie"))
  924.  
  925.     def test_proxy(self):
  926.         o = OpenerDirector()
  927.         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
  928.         o.add_handler(ph)
  929.         meth_spec = [
  930.             [("http_open", "return response")]
  931.             ]
  932.         handlers = add_ordered_mock_handlers(o, meth_spec)
  933.  
  934.         req = Request("http://acme.example.com/")
  935.         self.assertEqual(req.get_host(), "acme.example.com")
  936.         r = o.open(req)
  937.         self.assertEqual(req.get_host(), "proxy.example.com:3128")
  938.  
  939.         self.assertEqual([(handlers[0], "http_open")],
  940.                          [tup[0:2] for tup in o.calls])
  941.  
  942.     def test_basic_auth(self, quote_char='"'):
  943.         opener = OpenerDirector()
  944.         password_manager = MockPasswordManager()
  945.         auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
  946.         realm = "ACME Widget Store"
  947.         http_handler = MockHTTPHandler(
  948.             401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
  949.             (quote_char, realm, quote_char) )
  950.         opener.add_handler(auth_handler)
  951.         opener.add_handler(http_handler)
  952.         self._test_basic_auth(opener, auth_handler, "Authorization",
  953.                               realm, http_handler, password_manager,
  954.                               "http://acme.example.com/protected",
  955.                               "http://acme.example.com/protected",
  956.                               )
  957.  
  958.     def test_basic_auth_with_single_quoted_realm(self):
  959.         self.test_basic_auth(quote_char="'")
  960.  
  961.     def test_proxy_basic_auth(self):
  962.         opener = OpenerDirector()
  963.         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
  964.         opener.add_handler(ph)
  965.         password_manager = MockPasswordManager()
  966.         auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
  967.         realm = "ACME Networks"
  968.         http_handler = MockHTTPHandler(
  969.             407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
  970.         opener.add_handler(auth_handler)
  971.         opener.add_handler(http_handler)
  972.         self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
  973.                               realm, http_handler, password_manager,
  974.                               "http://acme.example.com:3128/protected",
  975.                               "proxy.example.com:3128",
  976.                               )
  977.  
  978.     def test_basic_and_digest_auth_handlers(self):
  979.         # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
  980.         # response (http://python.org/sf/1479302), where it should instead
  981.         # return None to allow another handler (especially
  982.         # HTTPBasicAuthHandler) to handle the response.
  983.  
  984.         # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
  985.         # try digest first (since it's the strongest auth scheme), so we record
  986.         # order of calls here to check digest comes first:
  987.         class RecordingOpenerDirector(OpenerDirector):
  988.             def __init__(self):
  989.                 OpenerDirector.__init__(self)
  990.                 self.recorded = []
  991.             def record(self, info):
  992.                 self.recorded.append(info)
  993.         class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
  994.             def http_error_401(self, *args, **kwds):
  995.                 self.parent.record("digest")
  996.                 urllib2.HTTPDigestAuthHandler.http_error_401(self,
  997.                                                              *args, **kwds)
  998.         class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
  999.             def http_error_401(self, *args, **kwds):
  1000.                 self.parent.record("basic")
  1001.                 urllib2.HTTPBasicAuthHandler.http_error_401(self,
  1002.                                                             *args, **kwds)
  1003.  
  1004.         opener = RecordingOpenerDirector()
  1005.         password_manager = MockPasswordManager()
  1006.         digest_handler = TestDigestAuthHandler(password_manager)
  1007.         basic_handler = TestBasicAuthHandler(password_manager)
  1008.         realm = "ACME Networks"
  1009.         http_handler = MockHTTPHandler(
  1010.             401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
  1011.         opener.add_handler(basic_handler)
  1012.         opener.add_handler(digest_handler)
  1013.         opener.add_handler(http_handler)
  1014.  
  1015.         # check basic auth isn't blocked by digest handler failing
  1016.         self._test_basic_auth(opener, basic_handler, "Authorization",
  1017.                               realm, http_handler, password_manager,
  1018.                               "http://acme.example.com/protected",
  1019.                               "http://acme.example.com/protected",
  1020.                               )
  1021.         # check digest was tried before basic (twice, because
  1022.         # _test_basic_auth called .open() twice)
  1023.         self.assertEqual(opener.recorded, ["digest", "basic"]*2)
  1024.  
  1025.     def _test_basic_auth(self, opener, auth_handler, auth_header,
  1026.                          realm, http_handler, password_manager,
  1027.                          request_url, protected_url):
  1028.         import base64
  1029.         user, password = "wile", "coyote"
  1030.  
  1031.         # .add_password() fed through to password manager
  1032.         auth_handler.add_password(realm, request_url, user, password)
  1033.         self.assertEqual(realm, password_manager.realm)
  1034.         self.assertEqual(request_url, password_manager.url)
  1035.         self.assertEqual(user, password_manager.user)
  1036.         self.assertEqual(password, password_manager.password)
  1037.  
  1038.         r = opener.open(request_url)
  1039.  
  1040.         # should have asked the password manager for the username/password
  1041.         self.assertEqual(password_manager.target_realm, realm)
  1042.         self.assertEqual(password_manager.target_url, protected_url)
  1043.  
  1044.         # expect one request without authorization, then one with
  1045.         self.assertEqual(len(http_handler.requests), 2)
  1046.         self.assertFalse(http_handler.requests[0].has_header(auth_header))
  1047.         userpass = '%s:%s' % (user, password)
  1048.         auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
  1049.         self.assertEqual(http_handler.requests[1].get_header(auth_header),
  1050.                          auth_hdr_value)
  1051.  
  1052.         # if the password manager can't find a password, the handler won't
  1053.         # handle the HTTP auth error
  1054.         password_manager.user = password_manager.password = None
  1055.         http_handler.reset()
  1056.         r = opener.open(request_url)
  1057.         self.assertEqual(len(http_handler.requests), 1)
  1058.         self.assertFalse(http_handler.requests[0].has_header(auth_header))
  1059.  
  1060.  
  1061. class MiscTests(unittest.TestCase):
  1062.  
  1063.     def test_build_opener(self):
  1064.         class MyHTTPHandler(urllib2.HTTPHandler): pass
  1065.         class FooHandler(urllib2.BaseHandler):
  1066.             def foo_open(self): pass
  1067.         class BarHandler(urllib2.BaseHandler):
  1068.             def bar_open(self): pass
  1069.  
  1070.         build_opener = urllib2.build_opener
  1071.  
  1072.         o = build_opener(FooHandler, BarHandler)
  1073.         self.opener_has_handler(o, FooHandler)
  1074.         self.opener_has_handler(o, BarHandler)
  1075.  
  1076.         # can take a mix of classes and instances
  1077.         o = build_opener(FooHandler, BarHandler())
  1078.         self.opener_has_handler(o, FooHandler)
  1079.         self.opener_has_handler(o, BarHandler)
  1080.  
  1081.         # subclasses of default handlers override default handlers
  1082.         o = build_opener(MyHTTPHandler)
  1083.         self.opener_has_handler(o, MyHTTPHandler)
  1084.  
  1085.         # a particular case of overriding: default handlers can be passed
  1086.         # in explicitly
  1087.         o = build_opener()
  1088.         self.opener_has_handler(o, urllib2.HTTPHandler)
  1089.         o = build_opener(urllib2.HTTPHandler)
  1090.         self.opener_has_handler(o, urllib2.HTTPHandler)
  1091.         o = build_opener(urllib2.HTTPHandler())
  1092.         self.opener_has_handler(o, urllib2.HTTPHandler)
  1093.  
  1094.         # Issue2670: multiple handlers sharing the same base class
  1095.         class MyOtherHTTPHandler(urllib2.HTTPHandler): pass
  1096.         o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
  1097.         self.opener_has_handler(o, MyHTTPHandler)
  1098.         self.opener_has_handler(o, MyOtherHTTPHandler)
  1099.  
  1100.     def opener_has_handler(self, opener, handler_class):
  1101.         for h in opener.handlers:
  1102.             if h.__class__ == handler_class:
  1103.                 break
  1104.         else:
  1105.             self.assert_(False)
  1106.  
  1107.  
  1108. def test_main(verbose=None):
  1109.     from test import test_urllib2
  1110.     test_support.run_doctest(test_urllib2, verbose)
  1111.     test_support.run_doctest(urllib2, verbose)
  1112.     tests = (TrivialTests,
  1113.              OpenerDirectorTests,
  1114.              HandlerTests,
  1115.              MiscTests)
  1116.     test_support.run_unittest(*tests)
  1117.  
  1118. if __name__ == "__main__":
  1119.     test_main(verbose=True)
  1120.