home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++
# File: in.pyc (Python 2.5)
#
# Unit tests for the httpclient module.  Network access is replaced by
# FakeStream, which answers requests from an in-memory table of pages.
#
# NOTE(review): this text was recovered from decompiler output whose line
# structure had been lost.  Indentation was rebuilt from the syntax, and the
# few spots where the decompiler dropped code are marked NOTE(review) below.

import unittest
import email.Utils
import socket
import tempfile
import traceback
from copy import copy
from StringIO import StringIO
from clock import clock
import os
from download_utils import cleanFilename
import download_utils
import database
import dialogs
import httpclient
import util
from framework import EventLoopTest, DemocracyTestCase, HadToStopEventLoop


class TestingConnectionHandler(httpclient.ConnectionHandler):
    """ConnectionHandler with 'foo', 'bar' and 'noread' states.

    Each handler records what it saw and stops the owning test's event loop.
    """

    def __init__(self, test):
        super(TestingConnectionHandler, self).__init__()
        self.states['foo'] = self.handleFoo
        self.states['bar'] = self.handleBar
        self.states['noread'] = None
        self.fooData = ''
        self.barData = ''
        self.gotHandleClose = False
        self.closeType = None
        self.test = test

    def handleFoo(self):
        """Append everything currently buffered to fooData."""
        data = self.buffer.read()
        self.fooData += data
        self.test.stopEventLoop(False)

    def handleBar(self):
        """Append everything currently buffered to barData."""
        data = self.buffer.read()
        self.barData += data
        self.test.stopEventLoop(False)

    def handleClose(self, type):
        """Record that the connection closed and how."""
        self.gotHandleClose = True
        self.closeType = type
        self.test.stopEventLoop(False)


class FakeStream:
    """In-memory stand-in for the socket stream used by HTTPConnection.

    Data passed to sendData() is parsed as HTTP requests; the matching canned
    response is queued and handed to the read callback.
    """

    def __init__(self, closeCallback=None):
        self.open = False
        self.readCallback = None
        self.closeCallback = closeCallback
        self.timedOut = False
        self.connectionErrback = None
        self.name = ''
        self.output = ''
        self.unprocessed = ''
        self.input = ''
        self.pendingOutput = ''
        # host -> {uri -> body}
        self.pages = {
            'participatoryculture.org': {
                '/democracytest/normalpage.txt': 'I AM A NORMAL PAGE\n',
                '/democracytest/normalpage2.txt': 'I AM A NORMAL PAGE\n',
                '/democracytest/normalpage3.txt': 'I AM A NORMAL PAGE\n',
                '/democracytest/nohead.php': 'DYNAMIC CONTENT',
            },
            'jigsaw.w3.org': {
                '/HTTP/Basic/': 'normal page',
                '/HTTP/Digest/': 'normal page',
            },
            'www.bar.com': {'/': 'Normal', '/2': 'Blah'},
            'www.baz.com': {'/': 'Normal', '/2': 'Blah'},
            'www.froz.com': {'/': 'Normal', '/2': 'Blah'},
            'www.qux.com': {'/': 'Normal', '/2': 'Blah'},
        }
        # host -> [uri, ...] that answer HEAD with 405
        self.noheadPages = {
            'participatoryculture.org': ['/democracytest/nohead.php'],
        }
        # host -> {uri -> Authorization header value that grants access}
        self.basicAuthPages = {
            'jigsaw.w3.org': {'/HTTP/Basic/': 'Basic Z3Vlc3Q6Z3Vlc3Q='},
        }
        self.digestAuthPages = {
            'jigsaw.w3.org': {'/HTTP/Digest/': 'STUFF GOES HERE'},
        }

    def _tryReadCallback(self):
        """Deliver queued response data if a read callback is installed."""
        if len(self.pendingOutput) > 0 and self.readCallback:
            response = self.pendingOutput
            self.pendingOutput = ''
            self.readCallback(response)

    def _unauthorizedResponse(self, challenge, now):
        """Build a 401 response carrying the given WWW-Authenticate value."""
        text = 'Not authorized'
        return ('HTTP/1.1 401 Unauthorized\r\n'
                'WWW-Authenticate: %s\r\n'
                'Content-Type: text/html; charset=UTF-8\r\n'
                'Date: %s\r\n'
                'Content-Length: %d\r\n\r\n%s') % (challenge, now, len(text), text)

    def _generateResponse(self, method, uri, version, headers):
        """Return the raw response text for a request.

        Returns None after calling the errback when the host is unknown.
        Unknown URIs (and methods other than GET/HEAD) get a 404.
        """
        text = None
        now = email.Utils.formatdate(usegmt=True)
        host = headers['Host']
        if host in self.pages:
            host_pages = self.pages[host]
            if uri in host_pages:
                text = host_pages[uri]
        else:
            self.errback(httpclient.ConnectionError("Can't connect"))
            return None
        if text is not None:
            if method == 'GET':
                # NOTE(review): the decompiler kept the two auth conditions
                # but lost their bodies.  The 401 replies are reconstructed
                # from what testAuth/testBadAuth/testDigestAuth expect; the
                # exact challenge strings are assumptions -- confirm against
                # the original source.
                expected = self.basicAuthPages.get(host, {}).get(uri)
                if expected is not None and headers.get('Authorization') != expected:
                    return self._unauthorizedResponse('Basic realm="test"', now)
                expected = self.digestAuthPages.get(host, {}).get(uri)
                if expected is not None and headers.get('Authorization') != expected:
                    return self._unauthorizedResponse(
                        'Digest realm="test", nonce="0123456789abcdef"', now)
                return ('HTTP/1.1 200 OK\r\n'
                        'Content-Type: text/plain; charset=UTF-8\r\n'
                        'Last-Modified: %s\r\n'
                        'Date: %s\r\n'
                        'Content-Length: %d\r\n\r\n%s') % (now, now, len(text), text)
            elif method == 'HEAD':
                if host in self.noheadPages and uri in self.noheadPages[host]:
                    return 'HTTP/1.1 405 NOT ALLOWED\r\nDate: %s\r\n\r\n' % now
                else:
                    return ('HTTP/1.1 200 OK\r\n'
                            'Content-Type: text/plain; charset=UTF-8\r\n'
                            'Last-Modified: %s\r\n'
                            'Date: %s\r\n'
                            'Content-Length: %d\r\n\r\n') % (now, now, len(text))
        text = '<h1>Not found</h1>'
        return ('HTTP/1.1 404 Not Found\r\n'
                'Content-Type: text/html; charset=UTF-8\r\n'
                'Date: %s\r\n'
                'Content-Length: %d\r\n\r\n%s') % (now, len(text), text)

    def _processRequest(self, method, uri, version, headers):
        """Queue the response for one parsed request and try to deliver it."""
        response = self._generateResponse(method, uri, version, headers)
        if response is not None:
            self.pendingOutput += response
            self._tryReadCallback()

    def _processData(self, data):
        """Accumulate sent bytes and answer every complete request in them."""
        self.unprocessed += data
        # NOTE(review): only the first and last statements of this method
        # survived decompilation.  The loop below is a reconstruction (split
        # on the blank line ending a request head, parse the request line and
        # 'Name: value' headers) -- confirm against the original source.
        while '\r\n\r\n' in self.unprocessed:
            (request, self.unprocessed) = self.unprocessed.split('\r\n\r\n', 1)
            lines = request.split('\r\n')
            (request_method, request_uri, request_version) = lines.pop(0).split(' ')
            headers = dict([x.split(': ', 1) for x in lines if ': ' in x])
            self._processRequest(request_method, request_uri, request_version, headers)

    def __str__(self):
        if self.name:
            return '%s: %s' % (type(self).__name__, self.name)
        else:
            return 'Unknown %s' % (type(self).__name__,)

    def startReadTimeout(self):
        pass

    def stopReadTimeout(self):
        pass

    def openConnection(self, host, port, callback, errback, disabledReadTimeout=None):
        """Mark the stream open and immediately report success to callback."""
        self.name = 'Outgoing %s:%s' % (host, port)
        self.output = ''
        self.host = host
        self.port = port
        self.open = True
        self.errback = errback
        # Was misspelled 'dsiabledReadTimeout' in the decompiled source.
        self.disabledReadTimeout = disabledReadTimeout
        callback(self)

    def acceptConnection(self, host, port, callback, errback):
        """Incoming connections are not simulated; always fail."""
        errback()

    def closeConnection(self):
        self.open = False

    def isOpen(self):
        return self.open

    def sendData(self, data, callback=None):
        """Record outgoing data and feed it to the fake server."""
        if not self.isOpen():
            raise ValueError('Socket not connected')
        self.output += data
        self._processData(data)

    def startReading(self, readCallback):
        """Install readCallback and flush any response already queued."""
        if not self.isOpen():
            raise ValueError('Socket not connected')
        self.readCallback = readCallback
        self._tryReadCallback()

    def stopReading(self):
        '''Stop reading from the socket.'''
        if not self.isOpen():
            raise ValueError('Socket not connected')
        self.readCallback = None

    def onReadTimeout(self):
        raise IOError('Read Timeout')

    def handleSocketError(self, code, msg, operation):
        raise IOError('Socket Error')


class DumbFakeStream(FakeStream):
    """FakeStream that never answers; tests feed responses by hand."""

    def _generateResponse(self, method, uri, version, headers):
        pass


class TestingHTTPConnection(httpclient.HTTPConnection):
    """HTTPConnection that doesn't actually connect to the network."""
    streamFactory = FakeStream


class TestingHTTPSConnection(httpclient.HTTPSConnection):
    """HTTPSConnection that doesn't actually connect to the network."""
    streamFactory = FakeStream


class DumbTestingHTTPConnection(httpclient.HTTPConnection):
    """HTTPConnection that doesn't actually do much of anything."""
    streamFactory = DumbFakeStream


class TestingHTTPConnectionPool(httpclient.HTTPConnectionPool):
    """Connection pool built on the fake connections, with test helpers."""
    MAX_CONNECTIONS = 4
    HTTP_CONN = TestingHTTPConnection
    HTTPS_CONN = TestingHTTPSConnection

    def getConnection(self, scheme, host, port=None, type='active'):
        """Return the first connection of the given type, or None."""
        if port is None:
            if scheme == 'https':
                port = 443
            else:
                port = 80
        conns = self._getServerConnections(scheme, host, port)
        for conn in conns[type]:
            return conn

    def assertConnectionStarted(self, url):
        if not self.checkConnectionStarted(url):
            raise AssertionError

    def assertConnectionNotStarted(self, url):
        if self.checkConnectionStarted(url):
            raise AssertionError

    def checkConnectionStarted(self, url):
        """Return True if an active connection targets url's host and port."""
        (scheme, host, port, path) = httpclient.parseURL(url)
        conns = self._getServerConnections(scheme, host, port)
        for conn in conns['active']:
            try:
                if conn.host == host and conn.port == port:
                    return True
            except AttributeError:
                # NOTE(review): the except clause was lost by the decompiler;
                # presumably it skips connections that have no host/port yet.
                continue
        return False

    def closeConnection(self, scheme, host, port=None, type='active'):
        """Simulate the remote end closing the first matching connection."""
        conn = self.getConnection(scheme, host, port, type)
        conn.handleClose(socket.SHUT_RDWR)


class DumbTestingHTTPConnectionPool(TestingHTTPConnectionPool):
    HTTP_CONN = DumbTestingHTTPConnection
    HTTPS_CONN = DumbTestingHTTPConnection


class DumbTestHTTPClient(httpclient.HTTPClient):
    connectionPool = DumbTestingHTTPConnectionPool()


class TestHTTPClient(httpclient.HTTPClient):
    connectionPool = TestingHTTPConnectionPool()


class TestingHeaderGrabber(httpclient.HTTPHeaderGrabber):
    connectionPool = TestingHTTPConnectionPool()


class TestingAuthDelegate:
    """Dialog delegate that answers auth dialogs from a queue of logins."""

    def __init__(self):
        self.logins = []

    def addLogin(self, user, password):
        self.logins.append((user, password))

    def runDialog(self, dialog):
        """Answer with the next queued login, or cancel when none is left."""
        if self.logins:
            (user, password) = self.logins.pop(0)
            dialog.runCallback(dialogs.BUTTON_OK, user, password)
        else:
            dialog.runCallback(None)


def startResponse(version='1.1', status=200, headers=None):
    """Return a response head (status line, headers, blank line).

    headers is an optional mapping of extra header names to values.  The
    reason phrase is always 'OK', whatever the status.
    """
    if headers is None:
        headers = {}
    rv = ('HTTP/%s %s OK\r\n'
          'Content-Type: text/plain; charset=ISO-8859-1\r\n'
          'Last-Modified: Wed, 10 May 2006 22:30:33 GMT\r\n'
          'Date: Wed, 10 May 2006 22:38:39 GMT\r\n') % (version, status)
    for key, value in headers.items():
        rv += '%s: %s\r\n' % (key, value)
    rv += '\r\n'
    return rv


class AsyncSocketTest(EventLoopTest):
    """Base class providing callback/errback that record their argument."""

    def setUp(self):
        self.data = None
        self.errbackCalled = False
        self.callbackCalled = False
        self.fakeCallbackError = False
        EventLoopTest.setUp(self)

    def callback(self, data):
        if self.fakeCallbackError:
            # Deliberately raise ZeroDivisionError to simulate a buggy callback.
            1 / 0
        self.data = data
        self.callbackCalled = True
        self.stopEventLoop(False)

    def errback(self, error):
        self.data = error
        self.errbackCalled = True
        self.stopEventLoop(False)


class NetworkBufferTest(DemocracyTestCase):

    def setUp(self):
        self.buffer = httpclient.NetworkBuffer()

    def testReadLine(self):
        self.buffer.addData('HEL')
        self.assertEquals(self.buffer.readline(), None)
        self.buffer.addData('LO\r\n')
        self.assertEquals(self.buffer.readline(), 'HELLO')
        self.buffer.addData('HOWS\r\nIT\nGOING\r\nCRONLY\rDOESNTBREAK')
        self.assertEquals(self.buffer.readline(), 'HOWS')
        self.assertEquals(self.buffer.readline(), 'IT')
        self.assertEquals(self.buffer.readline(), 'GOING')
        self.assertEquals(self.buffer.readline(), None)
        self.assertEquals(self.buffer.read(), 'CRONLY\rDOESNTBREAK')

    def testRead(self):
        self.buffer.addData('12345678901234567890')
        self.assertEquals(self.buffer.read(4), '1234')
        self.assertEquals(self.buffer.read(6), '567890')
        self.buffer.addData('CARBOAT')
        self.assertEquals(self.buffer.read(), '1234567890CARBOAT')

    def testLength(self):
        self.buffer.addData('ONE\r\nTWO')
        self.assertEquals(self.buffer.length, 8)
        self.buffer.readline()
        self.assertEquals(self.buffer.length, 3)
        self.buffer.read(1)
        self.assertEquals(self.buffer.length, 2)
        self.buffer.unread('AAA')
        self.assertEquals(self.buffer.length, 5)
        self.buffer.addData('MORE')
        self.assertEquals(self.buffer.length, 9)

    def testGetValue(self):
        self.buffer.addData('ONE')
        self.buffer.addData('TWO')
        self.buffer.addData('THREE')
        # getValue() must not consume the data: two calls, same answer.
        self.assertEquals(self.buffer.getValue(), 'ONETWOTHREE')
        self.assertEquals(self.buffer.getValue(), 'ONETWOTHREE')


class WeirdCloseConnectionTest(AsyncSocketTest):
    """Closing an AsyncSocket mid-connect/accept must hit the errback."""

    def testCloseDuringOpenConnection(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind(('127.0.0.1', 0))
        sock.listen(1)
        (host, port) = sock.getsockname()
        try:
            conn = httpclient.AsyncSocket()
            conn.openConnection(host, port, self.callback, self.errback)
            conn.closeConnection()
            self.runEventLoop(timeout=1, timeoutNormal=True)
            self.assert_(not self.callbackCalled)
            self.assert_(self.errbackCalled)
        finally:
            sock.close()

    def testCloseDurringAcceptConnection(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn = httpclient.AsyncSocket()
            conn.acceptConnection('127.0.0.1', 0, self.callback, self.errback)
            sock.connect((conn.addr, conn.port))
            conn.closeConnection()
            self.runEventLoop(timeout=1, timeoutNormal=True)
            self.assert_(not self.callbackCalled)
            self.assert_(self.errbackCalled)
        finally:
            sock.close()


class ConnectionHandlerTest(EventLoopTest):

    def setUp(self):
        """Connect a TestingConnectionHandler to a local listening socket."""
        EventLoopTest.setUp(self)
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            address = server.getsockname()
            self.connectionHandler = TestingConnectionHandler(self)

            def stopEventLoop(conn):
                self.stopEventLoop(False)

            self.connectionHandler.openConnection(address[0], address[1],
                                                  stopEventLoop, stopEventLoop)
            self.runEventLoop()
            (self.remoteSocket, address) = server.accept()
        finally:
            # The listening socket is only needed for the single accept().
            server.close()
        self.remoteSocket.setblocking(False)


class DumbHTTPClientTest(AsyncSocketTest):
    """Drive an HTTPConnection's parser by feeding it response data by hand."""

    def setUp(self):
        AsyncSocketTest.setUp(self)
        self.testRequest = DumbTestingHTTPConnection()
        self.testRequest.openConnection('foo.com', 80,
                                        (lambda x: None), (lambda x: None))
        self.testRequest.sendRequest(self.callback, self.errback, '', 80,
                                     method='GET', path='/bar/baz;123?a=b')
        self.authDelegate = TestingAuthDelegate()
        dialogs.setDelegate(self.authDelegate)

    def tearDown(self):
        AsyncSocketTest.tearDown(self)

    def testScheme(self):
        conn = httpclient.HTTPConnection()
        self.assertEquals(conn.scheme, 'http')

    def testRequestLine(self):
        self.assertEquals(self.testRequest.stream.output.split('\r\n')[0],
                          'GET /bar/baz;123?a=b HTTP/1.1')

    def testStatusLine(self):
        self.testRequest.handleData('HTTP/1.0 200 OK\r\n')
        self.assertEquals(self.testRequest.version, 'HTTP/1.0')
        self.assertEquals(self.testRequest.status, 200)
        self.assertEquals(self.testRequest.reason, 'OK')

    def testStatusLine2(self):
        self.testRequest.handleData('HTTP/1.1 404 Not Found\r\n')
        self.assertEquals(self.testRequest.version, 'HTTP/1.1')
        self.assertEquals(self.testRequest.status, 404)
        self.assertEquals(self.testRequest.reason, 'Not Found')

    def testBadStatusLine(self):
        self.testRequest.handleData('HTTP/0.9 200 OK\r\n')
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.BadStatusLine))

    def testBadStatusLine2(self):
        self.testRequest.handleData('HTTP/1.0 641 OK\r\n')
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.BadStatusLine))

    def testBadStatusLine3(self):
        self.testRequest.handleData('HTTP/1.0 TwoHundred OK\r\n')
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.BadStatusLine))

    def testNoReason(self):
        self.testRequest.handleData('HTTP/1.0 200\r\n')
        self.assertEquals(self.testRequest.version, 'HTTP/1.0')
        self.assertEquals(self.testRequest.status, 200)
        self.assertEquals(self.testRequest.reason, '')

    def testTryToHandleHTTP0Point9(self):
        self.testRequest.handleData('StartOfThebody\r\n')
        self.assertEquals(self.testRequest.version, 'HTTP/0.9')
        self.assertEquals(self.testRequest.status, 200)
        self.assertEquals(self.testRequest.reason, '')
        self.assertEquals(self.testRequest.buffer.read(), 'StartOfThebody\r\n')

    # Complete HTTP/1.0 response shared by several tests below.
    fakeResponse = ('HTTP/1.0 200 OK\r\n'
                    'Content-Type: text/plain; charset=ISO-8859-1\r\n'
                    'Last-Modified: Wed, 10 May 2006 22:30:33 GMT\r\n'
                    'Date: Wed, 10 May 2006 22:38:39 GMT\r\n'
                    'X-Cache: HIT from pcf2.pcf.osuosl.org\r\n'
                    'Server: Apache\r\n'
                    'Content-Length: 14\r\n\r\n'
                    'HELLO: WORLD\r\n')

    def testBasicHeaders(self):
        self.testRequest.handleData(self.fakeResponse)
        self.testRequest.handleClose(socket.SHUT_RD)
        headers = self.testRequest.headers
        self.assertEquals(headers['x-cache'], 'HIT from pcf2.pcf.osuosl.org')
        self.assertEquals(headers['server'], 'Apache')
        self.assertEquals(headers['last-modified'], 'Wed, 10 May 2006 22:30:33 GMT')
        self.assertEquals(headers['content-length'], '14')
        self.assertEquals(self.testRequest.contentLength, 14)
        self.assertEquals(headers['date'], 'Wed, 10 May 2006 22:38:39 GMT')
        self.assertEquals(headers['content-type'], 'text/plain; charset=ISO-8859-1')

    def testCallbackError(self):
        """An exception raised inside the callback must reach util.failed."""
        self.fakeCallbackError = True
        self.failedCalled = False

        def fakeFailed(*args, **kwargs):
            self.failedCalled = True

        oldFailed = util.failed
        util.failed = fakeFailed
        try:
            self.testRequest.handleData(self.fakeResponse)
            self.testRequest.handleClose(socket.SHUT_RD)
        finally:
            util.failed = oldFailed
        self.assert_(self.failedCalled)

    def testHeaderContinuation(self):
        self.testRequest.handleData('HTTP/1.0 200 OK\r\n')
        self.testRequest.handleData('Cont\r\n')
        self.testRequest.handleData(' ent-Type: text/plain\r\n')
        self.assertEquals(self.testRequest.headers['content-type'], 'text/plain')

    def testHeaderJoin(self):
        self.testRequest.handleData('HTTP/1.0 200 OK\r\n')
        self.testRequest.handleData('x-test-list: 1\r\n')
        self.testRequest.handleData('x-test-list: 2\r\n')
        self.testRequest.handleData('x-test-list: 3\r\n')
        self.testRequest.handleData('x-test-list: 4\r\n')
        self.testRequest.handleData('\r\n')
        self.assertEquals(self.testRequest.headers['x-test-list'], '1,2,3,4')

    def testBadHeaderContinuation(self):
        self.testRequest.handleData('HTTP/1.0 200 OK\r\n')
        self.testRequest.handleData('IShouldBeContinued\r\n')
        self.testRequest.handleData('\r\n')
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.BadHeaderLine))

    def testWillClose(self):
        self.testRequest.handleData(startResponse(headers={'Content-Length': 128}))
        self.assertEquals(self.testRequest.willClose, False)

    def testWillClose2(self):
        self.testRequest.handleData(startResponse(headers={'Transfer-Encoding': 'chunked'}))
        self.assertEquals(self.testRequest.willClose, False)

    def testWillClose3(self):
        self.testRequest.handleData(startResponse(version='1.0',
                                                  headers={'Content-Length': 128}))
        self.assertEquals(self.testRequest.willClose, True)

    def testWillClose4(self):
        self.testRequest.handleData(startResponse())
        self.assertEquals(self.testRequest.willClose, True)

    def testWillClose5(self):
        self.testRequest.handleData(startResponse(headers={'Connection': 'close',
                                                           'Content-Length': 128}))
        self.assertEquals(self.testRequest.willClose, True)

    def testWillClose6(self):
        self.testRequest.handleData(startResponse(headers={'Connection': 'CLoSe',
                                                           'Content-Length': 128}))
        self.assertEquals(self.testRequest.willClose, True)

    def testPipeline(self):
        self.assertEqual(self.testRequest.pipelinedRequest, None)
        self.testRequest.handleData(startResponse(headers={'Content-Length': 128}))
        self.assertEquals(self.testRequest.canSendRequest(), True)
        self.testRequest.sendRequest(self.callback, self.errback, '', 80,
                                     path='/pipelined/path')
        self.assertEquals(self.testRequest.pipelinedRequest[6], '/pipelined/path')
        self.testRequest.handleData('a' * 128)
        self.assert_(self.callbackCalled)
        self.assertEquals(self.data['body'], 'a' * 128)
        # The pipelined request has now become the current one.
        self.assertEquals(self.testRequest.state, 'response-status')
        self.assertEquals(self.testRequest.path, '/pipelined/path')
        self.assertEquals(self.testRequest.pipelinedRequest, None)
        self.assertEquals(self.testRequest.headers, {})
        self.assertEquals(self.testRequest.body, '')
        self.assertEquals(self.testRequest.status, None)

    def testBadPipeline(self):
        self.testRequest.handleData(startResponse())
        self.assertEquals(self.testRequest.canSendRequest(), False)
        self.assertRaises(httpclient.NetworkError, self.testRequest.sendRequest,
                          self.callback, self.errback, '', 80)

    def testPipelineNeverStarted(self):
        self.pipelineError = None
        self.testRequest.handleData(startResponse(headers={'Content-Length': 128}))

        def pipelineErrback(error):
            self.pipelineError = error

        self.testRequest.sendRequest(self.callback, pipelineErrback, '', 80,
                                     path='/pipelined/path')
        self.testRequest.handleClose(socket.SHUT_RDWR)
        self.assert_(isinstance(self.pipelineError,
                                httpclient.PipelinedRequestNeverStarted))

    def testPipelineNeverStarted2(self):
        self.pipelineError = None
        self.testRequest.handleData(startResponse(headers={'Content-Length': 128}))

        def pipelineErrback(error):
            self.pipelineError = error

        self.testRequest.sendRequest(self.callback, pipelineErrback, '', 80,
                                     path='/pipelined/path')
        self.testRequest.closeConnection()
        self.assert_(isinstance(self.pipelineError,
                                httpclient.PipelinedRequestNeverStarted))

    def testContentLengthHandling(self):
        self.testRequest.handleData(startResponse(headers={'Content-Length': '5'}))
        self.testRequest.handleData('12345EXTRASTUFF')
        self.assertEquals(self.testRequest.body, '12345')

    def testTransferEncodingTrumpsContentLength(self):
        self.testRequest.handleData(startResponse(headers={
            'Content-Length': '5',
            'Transfer-Encoding': 'chunked'}))
        self.assertEquals(self.testRequest.contentLength, None)

    def test416ContentLength(self):
        '''Test the content length after a 416 status code.'''
        self.testRequest.handleData(startResponse(status=416, headers={
            'Content-Range': 'bytes */1234'}))
        self.assertEquals(self.testRequest.contentLength, 1234)

    def testNoBody(self):
        self.testRequest.handleData(startResponse(status=204))
        self.assertEquals(self.testRequest.state, 'closed')
        self.assertEquals(self.testRequest.body, '')

    def testNoBody2(self):
        self.testRequest.handleData(startResponse(status=123))
        self.assertEquals(self.testRequest.state, 'closed')
        self.assertEquals(self.testRequest.body, '')

    def testNoBody3(self):
        self.testRequest.method = 'HEAD'
        self.testRequest.handleData(startResponse())
        self.assertEquals(self.testRequest.state, 'closed')
        self.assertEquals(self.testRequest.body, '')

    def testNoBody4(self):
        self.testRequest.bodyDataCallback = lambda data: 0
        self.testRequest.handleData(startResponse(headers={'Content-Length': '0'}))
        self.assertEquals(self.testRequest.state, 'ready')
        self.assertEquals(self.testRequest.body, '')

    def testSplitUpMessage(self):
        """Feed the response in uneven pieces; the result must not change."""
        data = self.fakeResponse
        for cutoff in [3, 6, 10, 4, 100, 52]:
            self.testRequest.handleData(data[:cutoff])
            data = data[cutoff:]
        self.testRequest.handleData(data)
        self.testRequest.handleClose(socket.SHUT_RD)
        self.assertEquals(self.testRequest.version, 'HTTP/1.0')
        self.assertEquals(self.testRequest.status, 200)
        self.assertEquals(self.testRequest.reason, 'OK')
        self.assertEquals(self.testRequest.headers['server'], 'Apache')
        self.assertEquals(self.testRequest.body, 'HELLO: WORLD\r\n')

    def testOneChunk(self):
        self.testRequest.handleData(self.fakeResponse)
        self.testRequest.handleClose(socket.SHUT_RD)
        self.assertEquals(self.testRequest.version, 'HTTP/1.0')
        self.assertEquals(self.testRequest.status, 200)
        self.assertEquals(self.testRequest.reason, 'OK')
        self.assertEquals(self.testRequest.headers['server'], 'Apache')
        self.assertEquals(self.testRequest.body, 'HELLO: WORLD\r\n')

    def testBadChunkSize(self):
        self.testRequest.handleData(startResponse(headers={'Transfer-Encoding': 'chunked'}))
        self.testRequest.handleData('Fifty\r\n')
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.BadChunkSize))

    def testIgnoreChunkExtensions(self):
        self.testRequest.handleData(startResponse(headers={'Transfer-Encoding': 'chunked'}))
        self.testRequest.handleData('ff ;ext1=2 ; ext3=4\r\n')
        self.assert_(not self.errbackCalled)
        self.assertEquals(self.testRequest.state, 'chunk-data')
        self.assertEquals(self.testRequest.chunkSize, 255)

    def testChunkWithoutCRLF(self):
        self.testRequest.handleData(startResponse(headers={'Transfer-Encoding': 'chunked'}))
        self.testRequest.handleData('5\r\n')
        self.testRequest.handleData('12345RN')
        # Was assert_(self.errback): a bound method is always true, so the
        # check could never fail.
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.CRLFExpected))

    def testPrematureClose(self):
        data = self.fakeResponse
        self.testRequest.handleData(data[:123])
        self.testRequest.handleClose(socket.SHUT_RD)
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.ServerClosedConnection))


class HTTPClientTestBase(AsyncSocketTest):
    """Common setup: a fake connection with a request sent, fresh pools."""

    def setUp(self):
        AsyncSocketTest.setUp(self)
        self.testRequest = TestingHTTPConnection()
        self.testRequest.openConnection('foo.com', 80,
                                        (lambda x: None), (lambda x: None))
        self.testRequest.sendRequest(self.callback, self.errback, '', 80,
                                     method='GET', path='/bar/baz;123?a=b')
        self.authDelegate = TestingAuthDelegate()
        dialogs.setDelegate(self.authDelegate)
        # Fresh pools so connections from one test don't leak into the next.
        TestHTTPClient.connectionPool = TestingHTTPConnectionPool()
        TestingHeaderGrabber.connectionPool = TestingHTTPConnectionPool()

    def tearDown(self):
        AsyncSocketTest.tearDown(self)


class HTTPClientTest(HTTPClientTestBase):

    def testRealRequest(self):
        url = 'http://participatoryculture.org/democracytest/normalpage.txt'
        httpclient.grabURL(url, self.callback, self.errback,
                           clientClass=TestHTTPClient)
        self.runEventLoop(timeout=10)
        self.assert_(self.callbackCalled)
        self.assertEquals(self.data['body'], 'I AM A NORMAL PAGE\n')

    def testGrabHeaders(self):
        url = 'http://participatoryculture.org/democracytest/normalpage.txt'
        httpclient.grabHeaders(url, self.callback, self.errback,
                               TestingHeaderGrabber)
        self.runEventLoop(timeout=10)
        self.assert_(self.callbackCalled)
        self.assertEquals(self.data['body'], '')
        self.assertEquals(self.data['status'], 200)
        self.assertEquals(self.data['original-url'], self.data['updated-url'])
        self.assertEquals(self.data['original-url'], self.data['redirected-url'])

    def testGrabHeaders2(self):
        # nohead.php answers HEAD with 405 in FakeStream.
        url = 'http://participatoryculture.org/democracytest/nohead.php'
        httpclient.grabHeaders(url, self.callback, self.errback,
                               clientClass=TestingHeaderGrabber)
        self.runEventLoop(timeout=10)
        self.assert_(self.callbackCalled)
        self.assertEquals(self.data['body'], '')
        self.assertEquals(self.data['status'], 200)
        self.assertEquals(self.data['original-url'], self.data['updated-url'])
        self.assertEquals(self.data['original-url'], self.data['redirected-url'])

    def testGrabHeadersCancel(self):
        url = 'http://participatoryculture.org/democracytest/normalpage.txt'
        client = httpclient.grabHeaders(url, self.callback, self.errback,
                                        clientClass=TestingHeaderGrabber)
        client.cancel()
        # Neither callback may fire, so the loop has to be stopped by timeout.
        self.assertRaises(HadToStopEventLoop,
                          (lambda: self.runEventLoop(timeout=1)))

    def testConnectionFailure(self):
        httpclient.grabURL('http://slashdot.org:123123', self.callback,
                           self.errback, clientClass=TestHTTPClient)
        self.runEventLoop()
        self.assert_(self.errbackCalled)
        self.assertEquals(self.data.__class__, httpclient.ConnectionError)

    def testMultipleRequests(self):
        """Two requests sent back to back on one connection both complete."""

        def middleCallback(data):
            self.firstData = data
            req.sendRequest(self.callback, self.errback,
                            'participatoryculture.org', 80, method='GET',
                            path='/democracytest/normalpage.txt')

        req = TestingHTTPConnection()

        def stopEventLoop(conn):
            self.stopEventLoop(False)

        self.addIdle((lambda: req.openConnection('participatoryculture.org', 80,
                                                 stopEventLoop, self.errback)),
                     'Open connection')
        self.runEventLoop()
        self.assert_(not self.errbackCalled)
        self.addIdle((lambda: req.sendRequest(middleCallback, self.errback,
                                              'participatoryculture.org', 80,
                                              method='GET',
                                              path='/democracytest/normalpage.txt')),
                     'Send Request')
        self.runEventLoop()
        self.assert_(not self.errbackCalled)
        self.assertEquals(self.firstData['body'], self.data['body'])

    def testUnexpectedStatusCode(self):
        """Test what happens when we get a bad status code.

        The header callback should be called, but the on body data callback
        shouldn't.  Also, we should call the errback instead of the callback.
        """
        self.onHeadersCalled = self.onBodyDataCalled = False

        def onHeaders(headers):
            self.onHeadersCalled = True

        def onBodyData(data):
            self.onBodyDataCalled = True

        url = 'http://participatoryculture.org/404'
        client = TestHTTPClient(url, self.callback, self.errback,
                                onHeaders, onBodyData)
        client.startRequest()
        self.runEventLoop()
        self.assert_(self.onHeadersCalled)
        self.assert_(not self.onBodyDataCalled)
        self.assert_(not self.callbackCalled)
        self.assert_(self.errbackCalled)

    def testHeaderCallback(self):
        def headerCallback(response):
            self.headerResponse = copy(response)
            self.callbackCalledInHeaderCallback = self.callbackCalled
            self.errbackCalledInHeaderCallback = self.errbackCalled
            self.stopEventLoop(False)

        url = 'http://participatoryculture.org/democracytest/normalpage.txt'
        httpclient.grabURL(url, self.callback, self.errback,
                           headerCallback=headerCallback,
                           clientClass=TestHTTPClient)
        self.runEventLoop()
        self.assert_(not self.callbackCalledInHeaderCallback)
        self.assert_(not self.errbackCalledInHeaderCallback)
        self.assert_(self.headerResponse['content-type'].startswith('text/plain'))
        self.assertEquals(self.headerResponse['body'], None)

    def testHeaderCallbackCancel(self):
        """Cancelling from the header callback must be silent."""

        def headerCallback(response):
            reqId.cancel()
            self.stopEventLoop(False)

        url = 'http://participatoryculture.org/democracytest/normalpage.txt'
        reqId = httpclient.grabURL(url, self.callback, self.errback,
                                   headerCallback=headerCallback,
                                   clientClass=TestHTTPClient)
        self.failedCalled = False

        def fakeFailed(*args, **kwargs):
            self.failedCalled = True

        oldFailed = util.failed
        util.failed = fakeFailed
        try:
            self.runEventLoop()
        finally:
            # Restore even if the event loop raises, as testCallbackError does.
            util.failed = oldFailed
        self.assert_(not self.callbackCalled)
        self.assert_(not self.errbackCalled)
        self.assert_(not self.failedCalled)

    def testBodyDataCallbackCancel(self):
        """Cancelling from the body data callback must be silent."""

        def bodyDataCallback(response):
            reqId.cancel()
            self.stopEventLoop(False)

        url = 'http://participatoryculture.org/democracytest/normalpage.txt'
        reqId = httpclient.grabURL(url, self.callback, self.errback,
                                   bodyDataCallback=bodyDataCallback,
                                   clientClass=TestHTTPClient)
        self.failedCalled = False

        def fakeFailed(*args, **kwargs):
            self.failedCalled = True

        oldFailed = util.failed
        util.failed = fakeFailed
        try:
            self.runEventLoop()
        finally:
            util.failed = oldFailed
        self.assert_(not self.callbackCalled)
        self.assert_(not self.errbackCalled)
        self.assert_(not self.failedCalled)

    def testBodyDataCallback(self):
        self.lastSeen = None

        def bodyDataCallback(data):
            self.lastSeen = data

        self.testRequest.bodyDataCallback = bodyDataCallback
        self.testRequest.handleData(startResponse(headers={'content-length': '20'}))
        self.assertEquals(self.lastSeen, None)
        self.testRequest.handleData('12345')
        self.assertEquals(self.lastSeen, '12345')
        self.testRequest.handleData('1234567890')
        self.assertEquals(self.lastSeen, '1234567890')
        # Only 5 of these 10 bytes fit inside the declared content length.
        self.testRequest.handleData('1234567890')
        self.assertEquals(self.lastSeen, '12345')

    def testBodyDataCallbackChunked(self):
        self.lastSeen = None

        def bodyDataCallback(data):
            self.lastSeen = data

        self.testRequest.bodyDataCallback = bodyDataCallback
        self.testRequest.handleData(startResponse(headers={'transfer-encoding': 'chunked'}))
        self.testRequest.handleData('5\r\nHI')
        self.assertEquals(self.lastSeen, 'HI')
        self.testRequest.handleData('BEN\r\n')
        self.assertEquals(self.lastSeen, 'BEN')
        self.testRequest.handleData('A\r\n')
        self.assertEquals(self.lastSeen, 'BEN')
        self.testRequest.handleData('1234567890\r\n0')
        self.assertEquals(self.lastSeen, '1234567890')
        self.assert_(not self.callbackCalled)
        self.testRequest.handleData('\r\n\r\n')
        self.assert_(self.callbackCalled)

    def testBodyDataCallbackRealRequest(self):
        url = 'http://participatoryculture.org/democracytest/normalpage.txt'
        self.gotData = ''

        def bodyDataCallback(data):
            self.gotData += data
            if self.gotData == 'I AM A NORMAL PAGE\n':
                self.stopEventLoop(False)

        httpclient.grabURL(url, self.callback, self.errback,
                           bodyDataCallback=bodyDataCallback,
                           clientClass=TestHTTPClient)
        self.runEventLoop()
        self.assertEquals(self.gotData, 'I AM A NORMAL PAGE\n')

    def testAuth(self):
        self.authDelegate.addLogin(u'ben', u'baddpassword')
        self.authDelegate.addLogin(u'guest', u'guest')
        url = 'http://jigsaw.w3.org/HTTP/Basic/'
        client = TestHTTPClient(url, self.callback, self.errback)
        client.startRequest()
        self.runEventLoop()
        self.assert_(self.callbackCalled)
        self.assertEquals(self.data['status'], 200)
        self.assertEquals(client.authAttempts, 2)

    def testBadAuth(self):
        self.authDelegate.addLogin(u'baduser', u'baddpass')
        self.authDelegate.addLogin(u'anotherbadtry', u'god')
        self.authDelegate.addLogin(u'billgates', u'password')
        url = 'http://jigsaw.w3.org/HTTP/Basic/'
        client = TestHTTPClient(url, self.callback, self.errback)
        client.startRequest()
        self.runEventLoop()
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.AuthorizationFailed))
        self.assertEquals(client.authAttempts, 3)

    def testDigestAuth(self):
        url = 'http://jigsaw.w3.org/HTTP/Digest/'
        client = TestHTTPClient(url, self.callback, self.errback)
        client.startRequest()
        self.runEventLoop()
        self.assert_(self.errbackCalled)
        self.assert_(isinstance(self.data, httpclient.AuthorizationFailed))
        self.assertEquals(client.authAttempts, 0)

    def testParseURL(self):
        (scheme, host, port, path) = httpclient.parseURL('https://www.foo.com/abc;123?a=b#4')
        self.assertEquals(scheme, 'https')
        self.assertEquals(host, 'www.foo.com')
        self.assertEquals(port, 443)
        self.assertEquals(path, '/abc;123?a=b')
        (scheme, host, port, path) = httpclient.parseURL('http://www.foo.com/abc;123?a=b#4')
        self.assertEquals(port, 80)
        (scheme, host, port, path) = httpclient.parseURL('http://www.foo.com:5000/abc;123?a=b#4')
        self.assertEquals(port, 5000)
        (scheme, host, port, path) = httpclient.parseURL('http://www.foo.com:123:123/abc;123?a=b#4')
        self.assertEquals(port, 123)

    def testCleanFilename(self):
        """Cleaned names must be non-empty str and usable as real filenames."""
        tempdir = tempfile.gettempdir()

        def testIt(filename):
            cleaned = cleanFilename(filename)
            self.assertEqual(cleaned.__class__, str)
            self.assertNotEqual(cleaned, '')
            path = os.path.join(tempdir, cleaned)
            f = open(path, 'w')
            try:
                f.write('AOEUOAEU')
            finally:
                f.close()
            os.remove(path)

        testIt(u'iamnormal.txt')
        testIt(u'???')
        # The non-ASCII literals appeared as mojibake (UTF-8 bytes rendered in
        # a DOS code page); written as escapes so the file stays ASCII.
        testIt(u'\xf8benben.jpg')
        testIt(u'\xf8???.\xf2\xf3x')

    def testGetFilenameFromResponse(self):
        client = TestHTTPClient('http://www.foo.com', self.callback, self.errback)

        def getIt(path, cd=None):
            response = {'path': path}
            if cd:
                response['content-disposition'] = cd
            return client.getFilenameFromResponse(response)

        self.assertEquals('unknown', getIt('/'))
        self.assertEquals('index.html', getIt('/index.html'))
        self.assertEquals('index.html', getIt('/path/path2/index.html'))
        self.assertEquals('unknown', getIt('/path/path2/'))
        self.assertEquals('myfile.txt', getIt('/', 'filename="myfile.txt"'))
        self.assertEquals('myfile.txt', getIt('/', 'filename="myfile.txt"; size=45'))
        self.assertEquals('myfile.txt', getIt('/', ' filename = "myfile.txt"'))
        self.assertEquals('myfile.txt', getIt('/', 'filename=myfile.txt'))
        self.assertEquals('myfile.txt', getIt('/index.html', 'filename="myfile.txt"'))
        self.assertEquals('lots.of.extensions', getIt('/', 'filename="lots.of.extensions"'))
        self.assertEquals('uncleanfilename', getIt('/index', 'filename="\\un/cl:ean*fi?lena<m>|e"'))
        self.assertEquals('uncleanfil-ename2', getIt('/uncl*ean"fil?"ena|m""e2"'))

    def testGetCharsetFromResponse(self):
        client = TestHTTPClient('http://participatoryculture.org/democracytest/normal.txt',
                                self.callback, self.errback)

        def getIt(contentType):
            if contentType:
                response = {'content-type': contentType}
            else:
                response = {}
            return client.getCharsetFromResponse(response)

        self.assertEquals('iso-8859-1', getIt(None))
        self.assertEquals('iso-8859-1', getIt('gabaldigook'))
        self.assertEquals('iso-8859-1', getIt('text/html'))
        self.assertEquals('utf-8', getIt('text/html; charset=utf-8'))
        self.assertEquals('utf-8', getIt('text/html; charset = utf-8'))
        self.assertEquals('utf-8', getIt('text/html; charset=utf-8; extraparam=2'))


class HTTPConnectionPoolTest(EventLoopTest):

    def setUp(self):
        self.pool = TestingHTTPConnectionPool()
        super(HTTPConnectionPoolTest, self).setUp()

    def addRequest(self, url):
        """Queue a GET for url; its completion stops the event loop via an idle."""
        return self.pool.addRequest(
            (lambda blah: self.addIdle((lambda: self.stopEventLoop(False)),
                                       'Closing connection when request is done')),
            (lambda error: 0), None, None, None, url, 'GET', {})

    def checkCounts(self, activeCount, freeCount, pendingCount):
        """Check the pool's counters and its actual connection lists."""
        self.assertEquals(self.pool.activeConnectionCount, activeCount)
        self.assertEquals(self.pool.freeConnectionCount, freeCount)
        realFreeCount = realActiveCount = 0
        for conns in self.pool.connections.values():
            realFreeCount += len(conns['free'])
            realActiveCount += len(conns['active'])
        self.assertEquals(realActiveCount, activeCount)
        self.assertEquals(realFreeCount, freeCount)
        self.assertEquals(pendingCount, len(self.pool.pendingRequests))

    def testNormalUsage(self):
        self.addRequest('http://www.foo.com/')
        self.addRequest('http://www.bar.com/')
        self.addRequest('http://www.foo.com/2')
        self.addRequest('http://www.google.com/')
        self.checkCounts(4, 0, 0)

    def testOpenConnectionFailed(self):
        self.pool = TestingHTTPConnectionPool()

        def stopEventLoop(error):
            self.stopEventLoop(False)

        self.pool.addRequest(stopEventLoop, stopEventLoop, None, None, None,
                             'http://uselessurl/', 'GET', {})
        self.runEventLoop()
        self.checkCounts(0, 0, 0)

    def testCounts(self):
        self.addRequest('http://participatoryculture.org/')
        self.addRequest('http://participatoryculture.org/democracytest/normalpage.txt')
        self.addRequest('https://participatoryculture.org/')
        self.checkCounts(3, 0, 0)
        self.runEventLoop()
        self.checkCounts(2, 1, 0)
        self.pool.closeConnection('https', 'participatoryculture.org')
        self.checkCounts(1, 1, 0)
        self.pool.closeConnection('http', 'participatoryculture.org', type='free')
        self.checkCounts(1, 0, 0)

    def testServerLimit(self):
        self.addRequest('http://participatoryculture.org/democracytest/normalpage.txt')
self.addRequest('http://participatoryculture.org/democracytest/normalpage2.txt') self.addRequest('https://participatoryculture.org/democracytest/normalpage.txt') self.checkCounts(3, 0, 0) self.addRequest('http://participatoryculture.org/democracytest/normalpage3.txt') self.checkCounts(3, 0, 1) self.pool.assertConnectionNotStarted('http://participatoryculture.org/democracytest/normalpage3.txt') self.runEventLoop() self.checkCounts(3, 0, 0) self.runEventLoop() self.pool.assertConnectionStarted('http://participatoryculture.org/democracytest/normalpage3.txt') self.checkCounts(2, 1, 0) def testTotalLimit(self): self.addRequest('http://participatoryculture.org/democracytest/normalpage.txt') self.addRequest('http://participatoryculture.org/democracytest/normalpage2.txt') self.addRequest('http://www.bar.com/') self.addRequest('http://www.bar.com/2') self.addRequest('http://www.baz.com/') self.addRequest('http://www.froz.com/') self.checkCounts(4, 0, 2) def testBothLimits(self): self.addRequest('http://participatoryculture.org/') self.addRequest('http://participatoryculture.org/2') self.addRequest('http://participatoryculture.org/3') self.checkCounts(2, 0, 1) self.pool.assertConnectionNotStarted('http://participatoryculture.org/3') self.addRequest('https://www.bar.com/') self.addRequest('http://www.bar.com/2') self.addRequest('http://www.baz.com/') self.checkCounts(4, 0, 2) self.pool.assertConnectionNotStarted('http://www.baz.com/') self.runEventLoop() self.checkCounts(4, 0, 1) self.pool.assertConnectionNotStarted('http://www.baz.com/') self.runEventLoop() self.checkCounts(4, 0, 0) self.runEventLoop() def testDropTheLRU(self): self.addRequest('http://www.baz.com/') self.addRequest('http://participatoryculture.org/') self.addRequest('http://www.bar.com/') self.addRequest('http://www.froz.com/') self.addRequest('http://www.qux.com/') if len(self.pool.connections['http:www.baz.com:80']['free']) == 0: pass self.assert_(len(self.pool.connections['http:www.baz.com:80']['active']) 
> 0) self.runEventLoop() if len(self.pool.connections['http:www.baz.com:80']['free']) == 0: pass self.assert_(len(self.pool.connections['http:www.baz.com:80']['active']) == 0) class GrabURLTest(AsyncSocketTest): def testStart(self): url = 'http://participatoryculture.org/democracytest/normalpage.txt' httpclient.grabURL(url, self.callback, self.errback, clientClass = TestHTTPClient) self.runEventLoop() self.origData = self.data httpclient.grabURL(url, self.callback, self.errback, start = 4, clientClass = TestHTTPClient) self.runEventLoop() self.assertEquals(self.data['body'], self.origData['body']) self.assertEquals(self.data['status'], 200) class BadURLTest(HTTPClientTestBase): def testScheme(self): url = 'participatoryculture.org/democracytest/normalpage.txt' httpclient.grabURL(url, self.callback, self.errback, clientClass = TestHTTPClient) self.runPendingIdles() self.assertEquals(self.errbackCalled, True) self.assertEquals(self.callbackCalled, False) def testSlashes(self): url = 'http:jigsaw.w3.org/HTTP/' httpclient.grabURL(url, self.callback, self.errback, clientClass = TestHTTPClient) self.runPendingIdles() self.assertEquals(self.errbackCalled, True) self.assertEquals(self.callbackCalled, False) def testHost(self): url = 'http:///HTTP/' httpclient.grabURL(url, self.callback, self.errback, clientClass = TestHTTPClient) self.runPendingIdles() self.assertEquals(self.errbackCalled, True) self.assertEquals(self.callbackCalled, False) def testOtherScheme(self): url = 'rtsp://jigsaw.w3.org/' httpclient.grabURL(url, self.callback, self.errback, clientClass = TestHTTPClient) self.runPendingIdles() self.assertEquals(self.errbackCalled, True) self.assertEquals(self.callbackCalled, False) class CookieExpirationDateTestCase(unittest.TestCase): def testCookieExpirationDate(self): """Tests get_cookie_expiration_date to make sure it's returning sane values and handles cookie expiration formats we've had problems with. 
""" mktime = mktime strptime = strptime localtime = localtime import time get_cookie_expiration_date = get_cookie_expiration_date import httpclient for cd in (('Thu, 03-May-07 22:48:52 GMT', '2007-05-03 22:48:52 GMT'), ('Fri, 03-Jun-11 13:41:15 GMT', '2011-06-03 13:41:15 GMT'), ('Sun, 17-Jan-2038 19:14:07 GMT', '2038-01-17 19:14:07 GMT'), ('Mon, 09-Apr-07 23:50:49 GMT', '2007-04-09 23:50:49 GMT'), ('Tue, 01-Jan-2030 10:00:00 GMT', '2030-01-01 10:00:00 GMT'), ('Tue, 17-Jul-2007 02:09:00 GMT', '2007-07-17 02:09:00 GMT')): self.assertEquals(localtime(get_cookie_expiration_date(cd[0])), localtime(mktime(strptime(cd[1], '%Y-%m-%d %H:%M:%S %Z')))) def testOverflowCookieExpirationDate(self): '''tests the case of get_cookie_expiration_date where the cookie expiration date causes an overflow error when parsing it. ''' localtime = localtime import time get_cookie_expiration_date = get_cookie_expiration_date DATEINFUTURE = DATEINFUTURE import httpclient self.assertEquals(localtime(get_cookie_expiration_date('Tue, 26-Jul-2050 10:00:00 GMT')), localtime(httpclient.DATEINFUTURE))