home *** CD-ROM | disk | FTP | other *** search
/ PC Welt 2006 November (DVD) / PCWELT_11_2006.ISO / casper / filesystem.squashfs / usr / lib / python2.4 / site-packages / BitTorrent / RawServer.py < prev    next >
Encoding:
Python Source  |  2006-07-20  |  18.0 KB  |  628 lines

  1. # Written by Bram Cohen
  2. # see LICENSE.txt for license information
  3.  
  4. from bisect import insort
  5. import socket
  6. from cStringIO import StringIO
  7. from traceback import print_exc
  8. from errno import EWOULDBLOCK, ENOBUFS
  9. try:
  10.     from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
  11.     timemult = 1000
  12. except ImportError:
  13.     from selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
  14.     timemult = 1
  15. from threading import Thread, Event
  16. from time import time, sleep
  17. import sys
  18. from random import randrange
  19.  
  20. all = POLLIN | POLLOUT
  21.  
  22. class SingleSocket:
  23.     def __init__(self, raw_server, sock, handler):
  24.         self.raw_server = raw_server
  25.         self.socket = sock
  26.         self.handler = handler
  27.         self.buffer = []
  28.         self.last_hit = time()
  29.         self.fileno = sock.fileno()
  30.         self.connected = False
  31.         
  32.     def get_ip(self):
  33.         try:
  34.             return self.socket.getpeername()[0]
  35.         except socket.error:
  36.             return 'no connection'
  37.         
  38.     def close(self):
  39.         sock = self.socket
  40.         self.socket = None
  41.         self.buffer = []
  42.         del self.raw_server.single_sockets[self.fileno]
  43.         self.raw_server.poll.unregister(sock)
  44.         sock.close()
  45.  
  46.     def shutdown(self, val):
  47.         self.socket.shutdown(val)
  48.  
  49.     def is_flushed(self):
  50.         return len(self.buffer) == 0
  51.  
  52.     def write(self, s):
  53.         assert self.socket is not None
  54.         self.buffer.append(s)
  55.         if len(self.buffer) == 1:
  56.             self.try_write()
  57.  
  58.     def try_write(self):
  59.         if self.connected:
  60.             try:
  61.                 while self.buffer != []:
  62.                     amount = self.socket.send(self.buffer[0])
  63.                     if amount != len(self.buffer[0]):
  64.                         if amount != 0:
  65.                             self.buffer[0] = self.buffer[0][amount:]
  66.                         break
  67.                     del self.buffer[0]
  68.             except socket.error, e:
  69.                 code, msg = e
  70.                 if code != EWOULDBLOCK:
  71.                     self.raw_server.dead_from_write.append(self)
  72.                     return
  73.         if self.buffer == []:
  74.             self.raw_server.poll.register(self.socket, POLLIN)
  75.         else:
  76.             self.raw_server.poll.register(self.socket, all)
  77.  
  78. def default_error_handler(x):
  79.     print x
  80.  
  81. class RawServer:
  82.     def __init__(self, doneflag, timeout_check_interval, timeout, noisy = True,
  83.             errorfunc = default_error_handler, maxconnects = 55):
  84.         self.timeout_check_interval = timeout_check_interval
  85.         self.timeout = timeout
  86.         self.poll = poll()
  87.         # {socket: SingleSocket}
  88.         self.single_sockets = {}
  89.         self.dead_from_write = []
  90.         self.doneflag = doneflag
  91.         self.noisy = noisy
  92.         self.errorfunc = errorfunc
  93.         self.maxconnects = maxconnects
  94.         self.funcs = []
  95.         self.unscheduled_tasks = []
  96.         self.add_task(self.scan_for_timeouts, timeout_check_interval)
  97.  
  98.     def add_task(self, func, delay):
  99.         self.unscheduled_tasks.append((func, delay))
  100.  
  101.     def scan_for_timeouts(self):
  102.         self.add_task(self.scan_for_timeouts, self.timeout_check_interval)
  103.         t = time() - self.timeout
  104.         tokill = []
  105.         for s in self.single_sockets.values():
  106.             if s.last_hit < t:
  107.                 tokill.append(s)
  108.         for k in tokill:
  109.             if k.socket is not None:
  110.                 self._close_socket(k)
  111.  
  112.     def bind(self, port, bind = '', reuse = False):
  113.         self.bindaddr = bind
  114.         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  115.         if reuse:
  116.             server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  117.         server.setblocking(0)
  118.         try:
  119.             server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 32)
  120.         except:
  121.             pass
  122.         server.bind((bind, port))
  123.         server.listen(5)
  124.         self.poll.register(server, POLLIN)
  125.         self.server = server
  126.  
  127.     def start_connection(self, dns, handler = None):
  128.         if handler is None:
  129.             handler = self.handler
  130.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  131.         sock.setblocking(0)
  132.         try:
  133.             sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, 32)
  134.         except:
  135.             pass
  136.         sock.bind((self.bindaddr, 0))
  137.         try:
  138.             sock.connect_ex(dns)
  139.         except socket.error:
  140.             raise
  141.         except Exception, e:
  142.             raise socket.error(str(e))
  143.         self.poll.register(sock, POLLIN)
  144.         s = SingleSocket(self, sock, handler)
  145.         self.single_sockets[sock.fileno()] = s
  146.         return s
  147.         
  148.     def handle_events(self, events):
  149.         for sock, event in events:
  150.             if sock == self.server.fileno():
  151.                 if event & (POLLHUP | POLLERR) != 0:
  152.                     self.poll.unregister(self.server)
  153.                     self.server.close()
  154.                     self.errorfunc('lost server socket')
  155.                 else:
  156.                     try:
  157.                         newsock, addr = self.server.accept()
  158.                         newsock.setblocking(0)
  159.                         if len(self.single_sockets) >= self.maxconnects:
  160.                             newsock.close()
  161.                             continue
  162.                         nss = SingleSocket(self, newsock, self.handler)
  163.                         self.single_sockets[newsock.fileno()] = nss
  164.                         self.poll.register(newsock, POLLIN)
  165.                         self.handler.external_connection_made(nss)
  166.                     except socket.error:
  167.                         sleep(1)
  168.             else:
  169.                 s = self.single_sockets.get(sock)
  170.                 if s is None:
  171.                     continue
  172.                 s.connected = True
  173.                 if (event & (POLLHUP | POLLERR)) != 0:
  174.                     self._close_socket(s)
  175.                     continue
  176.                 if (event & POLLIN) != 0:
  177.                     try:
  178.                         s.last_hit = time()
  179.                         data = s.socket.recv(100000)
  180.                         if data == '':
  181.                             self._close_socket(s)
  182.                         else:
  183.                             s.handler.data_came_in(s, data)
  184.                     except socket.error, e:
  185.                         code, msg = e
  186.                         if code != EWOULDBLOCK:
  187.                             self._close_socket(s)
  188.                             continue
  189.                 if (event & POLLOUT) != 0 and s.socket is not None and not s.is_flushed():
  190.                     s.try_write()
  191.                     if s.is_flushed():
  192.                         s.handler.connection_flushed(s)
  193.  
  194.     def pop_unscheduled(self):
  195.         try:
  196.             while True:
  197.                 (func, delay) = self.unscheduled_tasks.pop()
  198.                 insort(self.funcs, (time() + delay, func))
  199.         except IndexError:
  200.             pass
  201.  
  202.     def listen_forever(self, handler):
  203.         self.handler = handler
  204.         try:
  205.             while not self.doneflag.isSet():
  206.                 try:
  207.                     self.pop_unscheduled()
  208.                     if len(self.funcs) == 0:
  209.                         period = 2 ** 30
  210.                     else:
  211.                         period = self.funcs[0][0] - time()
  212.                     if period < 0:
  213.                         period = 0
  214.                     events = self.poll.poll(period * timemult)
  215.                     if self.doneflag.isSet():
  216.                         return
  217.                     while len(self.funcs) > 0 and self.funcs[0][0] <= time():
  218.                         garbage, func = self.funcs[0]
  219.                         del self.funcs[0]
  220.                         try:
  221.                             func()
  222.                         except KeyboardInterrupt:
  223.                             print_exc()
  224.                             return
  225.                         except:
  226.                             if self.noisy:
  227.                                 data = StringIO()
  228.                                 print_exc(file = data)
  229.                                 self.errorfunc(data.getvalue())
  230.                     self._close_dead()
  231.                     self.handle_events(events)
  232.                     if self.doneflag.isSet():
  233.                         return
  234.                     self._close_dead()
  235.                 except error, e:
  236.                     if self.doneflag.isSet():
  237.                         return
  238.                     # I can't find a coherent explanation for what the behavior should be here,
  239.                     # and people report conflicting behavior, so I'll just try all the possibilities
  240.                     try:
  241.                         code, msg, desc = e
  242.                     except:
  243.                         try:
  244.                             code, msg = e
  245.                         except:
  246.                             code = ENOBUFS
  247.                     if code == ENOBUFS:
  248.                         self.errorfunc("Have to exit due to the TCP stack flaking out")
  249.                         return
  250.                 except KeyboardInterrupt:
  251.                     print_exc()
  252.                     return
  253.                 except:
  254.                     data = StringIO()
  255.                     print_exc(file = data)
  256.                     self.errorfunc(data.getvalue())
  257.         finally:
  258.             for ss in self.single_sockets.values():
  259.                 ss.close()
  260.             self.server.close()
  261.  
  262.     def _close_dead(self):
  263.         while len(self.dead_from_write) > 0:
  264.             old = self.dead_from_write
  265.             self.dead_from_write = []
  266.             for s in old:
  267.                 if s.socket is not None:
  268.                     self._close_socket(s)
  269.  
  270.     def _close_socket(self, s):
  271.         sock = s.socket.fileno()
  272.         s.socket.close()
  273.         self.poll.unregister(sock)
  274.         del self.single_sockets[sock]
  275.         s.socket = None
  276.         s.handler.connection_lost(s)
  277.  
  278. # everything below is for testing
  279.  
  280. class DummyHandler:
  281.     def __init__(self):
  282.         self.external_made = []
  283.         self.data_in = []
  284.         self.lost = []
  285.  
  286.     def external_connection_made(self, s):
  287.         self.external_made.append(s)
  288.     
  289.     def data_came_in(self, s, data):
  290.         self.data_in.append((s, data))
  291.     
  292.     def connection_lost(self, s):
  293.         self.lost.append(s)
  294.  
  295.     def connection_flushed(self, s):
  296.         pass
  297.  
  298. def sl(rs, handler, port):
  299.     rs.bind(port)
  300.     Thread(target = rs.listen_forever, args = [handler]).start()
  301.  
  302. def loop(rs):
  303.     x = []
  304.     def r(rs = rs, x = x):
  305.         rs.add_task(x[0], .1)
  306.     x.append(r)
  307.     rs.add_task(r, .1)
  308.  
  309. beginport = 5000 + randrange(10000)
  310.  
  311. def test_starting_side_close():
  312.     try:
  313.         fa = Event()
  314.         fb = Event()
  315.         da = DummyHandler()
  316.         sa = RawServer(fa, 100, 100)
  317.         loop(sa)
  318.         sl(sa, da, beginport)
  319.         db = DummyHandler()
  320.         sb = RawServer(fb, 100, 100)
  321.         loop(sb)
  322.         sl(sb, db, beginport + 1)
  323.  
  324.         sleep(.5)
  325.         ca = sa.start_connection(('127.0.0.1', beginport + 1))
  326.         sleep(1)
  327.         
  328.         assert da.external_made == []
  329.         assert da.data_in == []
  330.         assert da.lost == []
  331.         assert len(db.external_made) == 1
  332.         cb = db.external_made[0]
  333.         del db.external_made[:]
  334.         assert db.data_in == []
  335.         assert db.lost == []
  336.  
  337.         ca.write('aaa')
  338.         cb.write('bbb')
  339.         sleep(1)
  340.         
  341.         assert da.external_made == []
  342.         assert da.data_in == [(ca, 'bbb')]
  343.         del da.data_in[:]
  344.         assert da.lost == []
  345.         assert db.external_made == []
  346.         assert db.data_in == [(cb, 'aaa')]
  347.         del db.data_in[:]
  348.         assert db.lost == []
  349.  
  350.         ca.write('ccc')
  351.         cb.write('ddd')
  352.         sleep(1)
  353.         
  354.         assert da.external_made == []
  355.         assert da.data_in == [(ca, 'ddd')]
  356.         del da.data_in[:]
  357.         assert da.lost == []
  358.         assert db.external_made == []
  359.         assert db.data_in == [(cb, 'ccc')]
  360.         del db.data_in[:]
  361.         assert db.lost == []
  362.  
  363.         ca.close()
  364.         sleep(1)
  365.  
  366.         assert da.external_made == []
  367.         assert da.data_in == []
  368.         assert da.lost == []
  369.         assert db.external_made == []
  370.         assert db.data_in == []
  371.         assert db.lost == [cb]
  372.         del db.lost[:]
  373.     finally:
  374.         fa.set()
  375.         fb.set()
  376.  
  377. def test_receiving_side_close():
  378.     try:
  379.         da = DummyHandler()
  380.         fa = Event()
  381.         sa = RawServer(fa, 100, 100)
  382.         loop(sa)
  383.         sl(sa, da, beginport + 2)
  384.         db = DummyHandler()
  385.         fb = Event()
  386.         sb = RawServer(fb, 100, 100)
  387.         loop(sb)
  388.         sl(sb, db, beginport + 3)
  389.         
  390.         sleep(.5)
  391.         ca = sa.start_connection(('127.0.0.1', beginport + 3))
  392.         sleep(1)
  393.         
  394.         assert da.external_made == []
  395.         assert da.data_in == []
  396.         assert da.lost == []
  397.         assert len(db.external_made) == 1
  398.         cb = db.external_made[0]
  399.         del db.external_made[:]
  400.         assert db.data_in == []
  401.         assert db.lost == []
  402.  
  403.         ca.write('aaa')
  404.         cb.write('bbb')
  405.         sleep(1)
  406.         
  407.         assert da.external_made == []
  408.         assert da.data_in == [(ca, 'bbb')]
  409.         del da.data_in[:]
  410.         assert da.lost == []
  411.         assert db.external_made == []
  412.         assert db.data_in == [(cb, 'aaa')]
  413.         del db.data_in[:]
  414.         assert db.lost == []
  415.  
  416.         ca.write('ccc')
  417.         cb.write('ddd')
  418.         sleep(1)
  419.         
  420.         assert da.external_made == []
  421.         assert da.data_in == [(ca, 'ddd')]
  422.         del da.data_in[:]
  423.         assert da.lost == []
  424.         assert db.external_made == []
  425.         assert db.data_in == [(cb, 'ccc')]
  426.         del db.data_in[:]
  427.         assert db.lost == []
  428.  
  429.         cb.close()
  430.         sleep(1)
  431.  
  432.         assert da.external_made == []
  433.         assert da.data_in == []
  434.         assert da.lost == [ca]
  435.         del da.lost[:]
  436.         assert db.external_made == []
  437.         assert db.data_in == []
  438.         assert db.lost == []
  439.     finally:
  440.         fa.set()
  441.         fb.set()
  442.  
  443. def test_connection_refused():
  444.     try:
  445.         da = DummyHandler()
  446.         fa = Event()
  447.         sa = RawServer(fa, 100, 100)
  448.         loop(sa)
  449.         sl(sa, da, beginport + 6)
  450.  
  451.         sleep(.5)
  452.         ca = sa.start_connection(('127.0.0.1', beginport + 15))
  453.         sleep(1)
  454.         
  455.         assert da.external_made == []
  456.         assert da.data_in == []
  457.         assert da.lost == [ca]
  458.         del da.lost[:]
  459.     finally:
  460.         fa.set()
  461.  
  462. def test_both_close():
  463.     try:
  464.         da = DummyHandler()
  465.         fa = Event()
  466.         sa = RawServer(fa, 100, 100)
  467.         loop(sa)
  468.         sl(sa, da, beginport + 4)
  469.  
  470.         sleep(1)
  471.         db = DummyHandler()
  472.         fb = Event()
  473.         sb = RawServer(fb, 100, 100)
  474.         loop(sb)
  475.         sl(sb, db, beginport + 5)
  476.  
  477.         sleep(.5)
  478.         ca = sa.start_connection(('127.0.0.1', beginport + 5))
  479.         sleep(1)
  480.         
  481.         assert da.external_made == []
  482.         assert da.data_in == []
  483.         assert da.lost == []
  484.         assert len(db.external_made) == 1
  485.         cb = db.external_made[0]
  486.         del db.external_made[:]
  487.         assert db.data_in == []
  488.         assert db.lost == []
  489.  
  490.         ca.write('aaa')
  491.         cb.write('bbb')
  492.         sleep(1)
  493.         
  494.         assert da.external_made == []
  495.         assert da.data_in == [(ca, 'bbb')]
  496.         del da.data_in[:]
  497.         assert da.lost == []
  498.         assert db.external_made == []
  499.         assert db.data_in == [(cb, 'aaa')]
  500.         del db.data_in[:]
  501.         assert db.lost == []
  502.  
  503.         ca.write('ccc')
  504.         cb.write('ddd')
  505.         sleep(1)
  506.         
  507.         assert da.external_made == []
  508.         assert da.data_in == [(ca, 'ddd')]
  509.         del da.data_in[:]
  510.         assert da.lost == []
  511.         assert db.external_made == []
  512.         assert db.data_in == [(cb, 'ccc')]
  513.         del db.data_in[:]
  514.         assert db.lost == []
  515.  
  516.         ca.close()
  517.         cb.close()
  518.         sleep(1)
  519.  
  520.         assert da.external_made == []
  521.         assert da.data_in == []
  522.         assert da.lost == []
  523.         assert db.external_made == []
  524.         assert db.data_in == []
  525.         assert db.lost == []
  526.     finally:
  527.         fa.set()
  528.         fb.set()
  529.  
  530. def test_normal():
  531.     l = []
  532.     f = Event()
  533.     s = RawServer(f, 100, 100)
  534.     loop(s)
  535.     sl(s, DummyHandler(), beginport + 7)
  536.     s.add_task(lambda l = l: l.append('b'), 2)
  537.     s.add_task(lambda l = l: l.append('a'), 1)
  538.     s.add_task(lambda l = l: l.append('d'), 4)
  539.     sleep(1.5)
  540.     s.add_task(lambda l = l: l.append('c'), 1.5)
  541.     sleep(3)
  542.     assert l == ['a', 'b', 'c', 'd']
  543.     f.set()
  544.  
  545. def test_catch_exception():
  546.     l = []
  547.     f = Event()
  548.     s = RawServer(f, 100, 100, False)
  549.     loop(s)
  550.     sl(s, DummyHandler(), beginport + 9)
  551.     s.add_task(lambda l = l: l.append('b'), 2)
  552.     s.add_task(lambda: 4/0, 1)
  553.     sleep(3)
  554.     assert l == ['b']
  555.     f.set()
  556.  
  557. def test_closes_if_not_hit():
  558.     try:
  559.         da = DummyHandler()
  560.         fa = Event()
  561.         sa = RawServer(fa, 2, 2)
  562.         loop(sa)
  563.         sl(sa, da, beginport + 14)
  564.  
  565.         sleep(1)
  566.         db = DummyHandler()
  567.         fb = Event()
  568.         sb = RawServer(fb, 100, 100)
  569.         loop(sb)
  570.         sl(sb, db, beginport + 13)
  571.         
  572.         sleep(.5)
  573.         sa.start_connection(('127.0.0.1', beginport + 13))
  574.         sleep(1)
  575.         
  576.         assert da.external_made == []
  577.         assert da.data_in == []
  578.         assert da.lost == []
  579.         assert len(db.external_made) == 1
  580.         del db.external_made[:]
  581.         assert db.data_in == []
  582.         assert db.lost == []
  583.  
  584.         sleep(3.1)
  585.         
  586.         assert len(da.lost) == 1
  587.         assert len(db.lost) == 1
  588.     finally:
  589.         fa.set()
  590.         fb.set()
  591.  
  592. def test_does_not_close_if_hit():
  593.     try:
  594.         fa = Event()
  595.         fb = Event()
  596.         da = DummyHandler()
  597.         sa = RawServer(fa, 2, 2)
  598.         loop(sa)
  599.         sl(sa, da, beginport + 12)
  600.  
  601.         sleep(1)
  602.         db = DummyHandler()
  603.         sb = RawServer(fb, 100, 100)
  604.         loop(sb)
  605.         sl(sb, db, beginport + 13)
  606.         
  607.         sleep(.5)
  608.         sa.start_connection(('127.0.0.1', beginport + 13))
  609.         sleep(1)
  610.         
  611.         assert da.external_made == []
  612.         assert da.data_in == []
  613.         assert da.lost == []
  614.         assert len(db.external_made) == 1
  615.         cb = db.external_made[0]
  616.         del db.external_made[:]
  617.         assert db.data_in == []
  618.         assert db.lost == []
  619.  
  620.         cb.write('bbb')
  621.         sleep(.5)
  622.         
  623.         assert da.lost == []
  624.         assert db.lost == []
  625.     finally:
  626.         fa.set()
  627.         fb.set()
  628.