home *** CD-ROM | disk | FTP | other *** search
/ Chip 2006 June / CHIP 2006-06.2.iso / program / freeware / Democracy-0.8.2.exe / xulrunner / python / BitTorrent / RawServer.py < prev    next >
Encoding:
Python Source  |  2006-04-10  |  13.1 KB  |  358 lines

  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.0 (the License).  You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License.  You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10.  
  11. # Written by Bram Cohen
  12.  
  13. import os
  14. import sys
  15. from bisect import insort
  16. import socket
  17. from cStringIO import StringIO
  18. from traceback import print_exc
  19. from errno import EWOULDBLOCK, ENOBUFS
  20. from time import time, sleep
  21. from BitTorrent import CRITICAL, FAQ_URL
  22.  
try:
    # Prefer the native poll object when the platform's select module
    # provides one. select.poll().poll() takes its timeout in milliseconds,
    # so periods computed in seconds are scaled by timemult before the call.
    from select import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
    timemult = 1000
except ImportError:
    # Fall back to the project's own poll implementation.
    # NOTE(review): a multiplier of 1 implies that implementation takes its
    # timeout in seconds -- confirm against BitTorrent.selectpoll.
    from BitTorrent.selectpoll import poll, error, POLLIN, POLLOUT, POLLERR, POLLHUP
    timemult = 1


# Event mask used for sockets with pending output: readable or writable.
# (The name shadows the all() builtin on interpreters that have one.)
all = POLLIN | POLLOUT
  32.  
  33.  
  34. class SingleSocket(object):
  35.  
  36.     def __init__(self, raw_server, sock, handler, context, ip=None):
  37.         self.raw_server = raw_server
  38.         self.socket = sock
  39.         self.handler = handler
  40.         self.buffer = []
  41.         self.last_hit = time()
  42.         self.fileno = sock.fileno()
  43.         self.connected = False
  44.         self.context = context
  45.         if ip is not None:
  46.             self.ip = ip
  47.         else:
  48.             try:
  49.                 peername = self.socket.getpeername()
  50.             except socket.error:
  51.                 self.ip = 'unknown'
  52.             else:
  53.                 try:
  54.                     self.ip = peername[0]
  55.                 except:
  56.                     assert isinstance(peername, basestring)
  57.                     self.ip = peername # UNIX socket, not really ip
  58.  
  59.     def close(self):
  60.         sock = self.socket
  61.         self.socket = None
  62.         self.buffer = []
  63.         del self.raw_server.single_sockets[self.fileno]
  64.         self.raw_server.poll.unregister(sock)
  65.         self.handler = None
  66.         sock.close()
  67.  
  68.     def shutdown(self, val):
  69.         self.socket.shutdown(val)
  70.  
  71.     def is_flushed(self):
  72.         return len(self.buffer) == 0
  73.  
  74.     def write(self, s):
  75.         assert self.socket is not None
  76.         self.buffer.append(s)
  77.         if len(self.buffer) == 1:
  78.             self.try_write()
  79.  
  80.     def try_write(self):
  81.         if self.connected:
  82.             try:
  83.                 while self.buffer != []:
  84.                     amount = self.socket.send(self.buffer[0])
  85.                     if amount != len(self.buffer[0]):
  86.                         if amount != 0:
  87.                             self.buffer[0] = self.buffer[0][amount:]
  88.                         break
  89.                     del self.buffer[0]
  90.             except socket.error, e:
  91.                 code, msg = e
  92.                 if code != EWOULDBLOCK:
  93.                     self.raw_server.dead_from_write.append(self)
  94.                     return
  95.         if self.buffer == []:
  96.             self.raw_server.poll.register(self.socket, POLLIN)
  97.         else:
  98.             self.raw_server.poll.register(self.socket, all)
  99.  
  100. def default_error_handler(x, y):
  101.     print x
  102.  
  103.  
  104. class RawServer(object):
  105.  
  106.     def __init__(self, doneflag, timeout_check_interval, timeout, noisy=True,
  107.             errorfunc=default_error_handler, bindaddr='', tos=0):
  108.         self.timeout_check_interval = timeout_check_interval
  109.         self.timeout = timeout
  110.         self.bindaddr = bindaddr
  111.         self.tos = tos
  112.         self.poll = poll()
  113.         # {socket: SingleSocket}
  114.         self.single_sockets = {}
  115.         self.dead_from_write = []
  116.         self.doneflag = doneflag
  117.         self.noisy = noisy
  118.         self.errorfunc = errorfunc
  119.         self.funcs = []
  120.         self.externally_added_tasks = []
  121.         self.listening_handlers = {}
  122.         self.serversockets = {}
  123.         self.live_contexts = {None : True}
  124.         self.add_task(self.scan_for_timeouts, timeout_check_interval)
  125.         if sys.platform != 'win32':
  126.             self.wakeupfds = os.pipe()
  127.             self.poll.register(self.wakeupfds[0], POLLIN)
  128.         else:
  129.             # Windows doesn't support pipes with select(). Just prevent sleeps
  130.             # longer than a second instead of proper wakeup for now.
  131.             self.wakeupfds = (None, None)
  132.             def wakeup():
  133.                 self.add_task(wakeup, 1)
  134.             wakeup()
  135.  
  136.     def add_context(self, context):
  137.         self.live_contexts[context] = True
  138.  
  139.     def remove_context(self, context):
  140.         del self.live_contexts[context]
  141.         self.funcs = [x for x in self.funcs if x[2] != context]
  142.  
  143.     def add_task(self, func, delay, context=None):
  144.         if context in self.live_contexts:
  145.             insort(self.funcs, (time() + delay, func, context))
  146.  
  147.     def external_add_task(self, func, delay, context=None):
  148.         self.externally_added_tasks.append((func, delay, context))
  149.         # Wake up the RawServer thread in case it's sleeping in poll()
  150.         if self.wakeupfds[1] is not None:
  151.             os.write(self.wakeupfds[1], 'X')
  152.  
  153.     def scan_for_timeouts(self):
  154.         self.add_task(self.scan_for_timeouts, self.timeout_check_interval)
  155.         t = time() - self.timeout
  156.         tokill = []
  157.         for s in self.single_sockets.values():
  158.             if s.last_hit < t:
  159.                 tokill.append(s)
  160.         for k in tokill:
  161.             if k.socket is not None:
  162.                 self._close_socket(k)
  163.  
  164.     def create_serversocket(port, bind='', reuse=False, tos=0):
  165.         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  166.         if reuse and os.name != 'nt':
  167.             server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  168.         server.setblocking(0)
  169.         if tos != 0:
  170.             try:
  171.                 server.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, tos)
  172.             except:
  173.                 pass
  174.         server.bind((bind, port))
  175.         server.listen(5)
  176.         return server
  177.     create_serversocket = staticmethod(create_serversocket)
  178.  
  179.     def start_listening(self, serversocket, handler, context=None):
  180.         self.listening_handlers[serversocket.fileno()] = (handler, context)
  181.         self.serversockets[serversocket.fileno()] = serversocket
  182.         self.poll.register(serversocket, POLLIN)
  183.  
  184.     def stop_listening(self, serversocket):
  185.         del self.listening_handlers[serversocket.fileno()]
  186.         del self.serversockets[serversocket.fileno()]
  187.         self.poll.unregister(serversocket)
  188.  
  189.     def start_connection(self, dns, handler=None, context=None, do_bind=True):
  190.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  191.         sock.setblocking(0)
  192.         if do_bind and self.bindaddr:
  193.             sock.bind((self.bindaddr, 0))
  194.         if self.tos != 0:
  195.             try:
  196.                 sock.setsockopt(socket.IPPROTO_IP, socket.IP_TOS, self.tos)
  197.             except:
  198.                 pass
  199.         try:
  200.             sock.connect_ex(dns)
  201.         except socket.error:
  202.             sock.close()
  203.             raise
  204.         except Exception, e:
  205.             sock.close()
  206.             raise socket.error(str(e))
  207.         self.poll.register(sock, POLLIN)
  208.         s = SingleSocket(self, sock, handler, context, dns[0])
  209.         self.single_sockets[sock.fileno()] = s
  210.         return s
  211.  
  212.     def wrap_socket(self, sock, handler, context=None, ip=None):
  213.         sock.setblocking(0)
  214.         self.poll.register(sock, POLLIN)
  215.         s = SingleSocket(self, sock, handler, context, ip)
  216.         self.single_sockets[sock.fileno()] = s
  217.         return s
  218.  
  219.     def _handle_events(self, events):
  220.         for sock, event in events:
  221.             if sock in self.serversockets:
  222.                 s = self.serversockets[sock]
  223.                 if event & (POLLHUP | POLLERR) != 0:
  224.                     self.poll.unregister(s)
  225.                     s.close()
  226.                     self.errorfunc(CRITICAL, 'lost server socket')
  227.                 else:
  228.                     try:
  229.                         handler, context = self.listening_handlers[sock]
  230.                         newsock, addr = s.accept()
  231.                         newsock.setblocking(0)
  232.                         nss = SingleSocket(self, newsock, handler, context)
  233.                         self.single_sockets[newsock.fileno()] = nss
  234.                         self.poll.register(newsock, POLLIN)
  235.                         self._make_wrapped_call(handler. \
  236.                            external_connection_made, (nss,), context=context)
  237.                     except socket.error:
  238.                         sleep(1)
  239.             else:
  240.                 s = self.single_sockets.get(sock)
  241.                 if s is None:
  242.                     if sock == self.wakeupfds[0]:
  243.                         # Another thread wrote this just to wake us up.
  244.                         os.read(sock, 1)
  245.                     continue
  246.                 s.connected = True
  247.                 if event & POLLERR:
  248.                     self._close_socket(s)
  249.                     continue
  250.                 if event & (POLLIN | POLLHUP):
  251.                     s.last_hit = time()
  252.                     try:
  253.                         data = s.socket.recv(100000)
  254.                     except socket.error, e:
  255.                         code, msg = e
  256.                         if code != EWOULDBLOCK:
  257.                             self._close_socket(s)
  258.                         continue
  259.                     if data == '':
  260.                         self._close_socket(s)
  261.                     else:
  262.                         self._make_wrapped_call(s.handler.data_came_in,
  263.                                                 (s, data), s)
  264.                 # data_came_in could have closed the socket (s.socket = None)
  265.                 if event & POLLOUT and s.socket is not None:
  266.                     s.try_write()
  267.                     if s.is_flushed():
  268.                         self._make_wrapped_call(s.handler.connection_flushed,
  269.                                                 (s,), s)
  270.  
  271.     def _pop_externally_added(self):
  272.         while self.externally_added_tasks:
  273.             task = self.externally_added_tasks.pop(0)
  274.             self.add_task(*task)
  275.  
  276.     def listen_forever(self):
  277.         while not self.doneflag.isSet():
  278.             try:
  279.                 self._pop_externally_added()
  280.                 if len(self.funcs) == 0:
  281.                     period = 1e9
  282.                 else:
  283.                     period = self.funcs[0][0] - time()
  284.                 if period < 0:
  285.                     period = 0
  286.                 events = self.poll.poll(period * timemult)
  287.                 if self.doneflag.isSet():
  288.                     return
  289.                 while len(self.funcs) > 0 and self.funcs[0][0] <= time():
  290.                     garbage, func, context = self.funcs.pop(0)
  291.                     self._make_wrapped_call(func, (), context=context)
  292.                 self._close_dead()
  293.                 self._handle_events(events)
  294.                 if self.doneflag.isSet():
  295.                     return
  296.                 self._close_dead()
  297.             except error, e:
  298.                 if self.doneflag.isSet():
  299.                     return
  300.                 # I can't find a coherent explanation for what the behavior
  301.                 # should be here, and people report conflicting behavior,
  302.                 # so I'll just try all the possibilities
  303.                 try:
  304.                     code, msg, desc = e
  305.                 except:
  306.                     try:
  307.                         code, msg = e
  308.                     except:
  309.                         code = ENOBUFS
  310.                 if code == ENOBUFS:
  311.                     self.errorfunc(CRITICAL, "Have to exit due to the TCP "
  312.                                    "stack flaking out. "
  313.                                    "Please see the FAQ at %s"%FAQ_URL)
  314.                     return
  315.             except KeyboardInterrupt:
  316.                 print_exc()
  317.                 return
  318.             except:
  319.                 data = StringIO()
  320.                 print_exc(file=data)
  321.                 self.errorfunc(CRITICAL, data.getvalue())
  322.  
  323.     def _make_wrapped_call(self, function, args, socket=None, context=None):
  324.         try:
  325.             function(*args)
  326.         except KeyboardInterrupt:
  327.             raise
  328.         except Exception, e:         # hopefully nothing raises strings
  329.             # Incoming sockets can be assigned to a particular torrent during
  330.             # a data_came_in call, and it's possible (though not likely) that
  331.             # there could be a torrent-specific exception during the same call.
  332.             # Therefore read the context after the call.
  333.             if socket is not None:
  334.                 context = socket.context
  335.             if self.noisy and context is None:
  336.                 data = StringIO()
  337.                 print_exc(file=data)
  338.                 self.errorfunc(CRITICAL, data.getvalue())
  339.             if context is not None:
  340.                 context.got_exception(e)
  341.  
  342.     def _close_dead(self):
  343.         while len(self.dead_from_write) > 0:
  344.             old = self.dead_from_write
  345.             self.dead_from_write = []
  346.             for s in old:
  347.                 if s.socket is not None:
  348.                     self._close_socket(s)
  349.  
  350.     def _close_socket(self, s):
  351.         sock = s.socket.fileno()
  352.         s.socket.close()
  353.         self.poll.unregister(sock)
  354.         del self.single_sockets[sock]
  355.         s.socket = None
  356.         self._make_wrapped_call(s.handler.connection_lost, (s,), s)
  357.         s.handler = None
  358.