home *** CD-ROM | disk | FTP | other *** search
/ Maximum CD 2011 June / maximum-cd-2011-06.iso / DiscContents / LibO_3.3.1_Win_x86_install_multi.exe / libreoffice1.cab / test_urllib2net.py < prev    next >
Encoding:
Python Source  |  2011-02-15  |  9.1 KB  |  266 lines

  1. #!/usr/bin/env python
  2.  
  3. import unittest
  4. from test import test_support
  5. from test.test_urllib2 import sanepathname2url
  6.  
  7. import socket
  8. import urllib2
  9. import sys
  10. import os
  11. import mimetools
  12.  
  13.  
  14. def _retry_thrice(func, exc, *args, **kwargs):
  15.     for i in range(3):
  16.         try:
  17.             return func(*args, **kwargs)
  18.         except exc, last_exc:
  19.             continue
  20.         except:
  21.             raise
  22.     raise last_exc
  23.  
  24. def _wrap_with_retry_thrice(func, exc):
  25.     def wrapped(*args, **kwargs):
  26.         return _retry_thrice(func, exc, *args, **kwargs)
  27.     return wrapped
  28.  
# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times: urllib2.urlopen is wrapped with
# _wrap_with_retry_thrice so that a urllib2.URLError triggers another
# attempt instead of failing the test straight away.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
  32.  
  33.  
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features.

    The class currently defines no active test methods; its only test is
    kept below in commented-out form.
    """

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")
  71.  
  72.  
  73. class CloseSocketTest(unittest.TestCase):
  74.  
  75.     def test_close(self):
  76.         import socket, httplib, gc
  77.  
  78.         # calling .close() on urllib2's response objects should close the
  79.         # underlying socket
  80.  
  81.         # delve deep into response to fetch socket._socketobject
  82.         response = _urlopen_with_retry("http://www.python.org/")
  83.         abused_fileobject = response.fp
  84.         self.assert_(abused_fileobject.__class__ is socket._fileobject)
  85.         httpresponse = abused_fileobject._sock
  86.         self.assert_(httpresponse.__class__ is httplib.HTTPResponse)
  87.         fileobject = httpresponse.fp
  88.         self.assert_(fileobject.__class__ is socket._fileobject)
  89.  
  90.         self.assert_(not fileobject.closed)
  91.         response.close()
  92.         self.assert_(fileobject.closed)
  93.  
  94. class OtherNetworkTests(unittest.TestCase):
  95.     def setUp(self):
  96.         if 0:  # for debugging
  97.             import logging
  98.             logger = logging.getLogger("test_urllib2net")
  99.             logger.addHandler(logging.StreamHandler())
  100.  
  101.     # XXX The rest of these tests aren't very good -- they don't check much.
  102.     # They do sometimes catch some major disasters, though.
  103.  
  104.     def test_ftp(self):
  105.         urls = [
  106.             'ftp://ftp.kernel.org/pub/linux/kernel/README',
  107.             'ftp://ftp.kernel.org/pub/linux/kernel/non-existant-file',
  108.             #'ftp://ftp.kernel.org/pub/leenox/kernel/test',
  109.             'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
  110.                 '/research-reports/00README-Legal-Rules-Regs',
  111.             ]
  112.         self._test_urls(urls, self._extra_handlers())
  113.  
  114.     def test_file(self):
  115.         TESTFN = test_support.TESTFN
  116.         f = open(TESTFN, 'w')
  117.         try:
  118.             f.write('hi there\n')
  119.             f.close()
  120.             urls = [
  121.                 'file:'+sanepathname2url(os.path.abspath(TESTFN)),
  122.                 ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
  123.                 ]
  124.             self._test_urls(urls, self._extra_handlers(), retry=True)
  125.         finally:
  126.             os.remove(TESTFN)
  127.  
  128.     # XXX Following test depends on machine configurations that are internal
  129.     # to CNRI.  Need to set up a public server with the right authentication
  130.     # configuration for test purposes.
  131.  
  132. ##     def test_cnri(self):
  133. ##         if socket.gethostname() == 'bitdiddle':
  134. ##             localhost = 'bitdiddle.cnri.reston.va.us'
  135. ##         elif socket.gethostname() == 'bitdiddle.concentric.net':
  136. ##             localhost = 'localhost'
  137. ##         else:
  138. ##             localhost = None
  139. ##         if localhost is not None:
  140. ##             urls = [
  141. ##                 'file://%s/etc/passwd' % localhost,
  142. ##                 'http://%s/simple/' % localhost,
  143. ##                 'http://%s/digest/' % localhost,
  144. ##                 'http://%s/not/found.h' % localhost,
  145. ##                 ]
  146.  
  147. ##             bauth = HTTPBasicAuthHandler()
  148. ##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
  149. ##                                'password')
  150. ##             dauth = HTTPDigestAuthHandler()
  151. ##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
  152. ##                                'password')
  153.  
  154. ##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
  155.  
  156.     def _test_urls(self, urls, handlers, retry=True):
  157.         import socket
  158.         import time
  159.         import logging
  160.         debug = logging.getLogger("test_urllib2").debug
  161.  
  162.         urlopen = urllib2.build_opener(*handlers).open
  163.         if retry:
  164.             urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)
  165.  
  166.         for url in urls:
  167.             if isinstance(url, tuple):
  168.                 url, req, expected_err = url
  169.             else:
  170.                 req = expected_err = None
  171.             debug(url)
  172.             try:
  173.                 f = urlopen(url, req)
  174.             except EnvironmentError, err:
  175.                 debug(err)
  176.                 if expected_err:
  177.                     msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
  178.                            (expected_err, url, req, type(err), err))
  179.                     self.assert_(isinstance(err, expected_err), msg)
  180.             else:
  181.                 with test_support.transient_internet():
  182.                     buf = f.read()
  183.                 f.close()
  184.                 debug("read %d bytes" % len(buf))
  185.             debug("******** next url coming up...")
  186.             time.sleep(0.1)
  187.  
  188.     def _extra_handlers(self):
  189.         handlers = []
  190.  
  191.         cfh = urllib2.CacheFTPHandler()
  192.         cfh.setTimeout(1)
  193.         handlers.append(cfh)
  194.  
  195.         return handlers
  196.  
  197.  
  198. class TimeoutTest(unittest.TestCase):
  199.     def test_http_basic(self):
  200.         self.assertTrue(socket.getdefaulttimeout() is None)
  201.         u = _urlopen_with_retry("http://www.python.org")
  202.         self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
  203.  
  204.     def test_http_default_timeout(self):
  205.         self.assertTrue(socket.getdefaulttimeout() is None)
  206.         socket.setdefaulttimeout(60)
  207.         try:
  208.             u = _urlopen_with_retry("http://www.python.org")
  209.         finally:
  210.             socket.setdefaulttimeout(None)
  211.         self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
  212.  
  213.     def test_http_no_timeout(self):
  214.         self.assertTrue(socket.getdefaulttimeout() is None)
  215.         socket.setdefaulttimeout(60)
  216.         try:
  217.             u = _urlopen_with_retry("http://www.python.org", timeout=None)
  218.         finally:
  219.             socket.setdefaulttimeout(None)
  220.         self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
  221.  
  222.     def test_http_timeout(self):
  223.         u = _urlopen_with_retry("http://www.python.org", timeout=120)
  224.         self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
  225.  
  226.     FTP_HOST = "ftp://ftp.mirror.nl/pub/mirror/gnu/"
  227.  
  228.     def test_ftp_basic(self):
  229.         self.assertTrue(socket.getdefaulttimeout() is None)
  230.         u = _urlopen_with_retry(self.FTP_HOST)
  231.         self.assertTrue(u.fp.fp._sock.gettimeout() is None)
  232.  
  233.     def test_ftp_default_timeout(self):
  234.         self.assertTrue(socket.getdefaulttimeout() is None)
  235.         socket.setdefaulttimeout(60)
  236.         try:
  237.             u = _urlopen_with_retry(self.FTP_HOST)
  238.         finally:
  239.             socket.setdefaulttimeout(None)
  240.         self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
  241.  
  242.     def test_ftp_no_timeout(self):
  243.         self.assertTrue(socket.getdefaulttimeout() is None)
  244.         socket.setdefaulttimeout(60)
  245.         try:
  246.             u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
  247.         finally:
  248.             socket.setdefaulttimeout(None)
  249.         self.assertTrue(u.fp.fp._sock.gettimeout() is None)
  250.  
  251.     def test_ftp_timeout(self):
  252.         u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
  253.         self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
  254.  
  255.  
  256. def test_main():
  257.     test_support.requires("network")
  258.     test_support.run_unittest(AuthTests,
  259.                               OtherNetworkTests,
  260.                               CloseSocketTest,
  261.                               TimeoutTest,
  262.                               )
  263.  
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
  266.