home *** CD-ROM | disk | FTP | other *** search
/ Chip 2006 June / CHIP 2006-06.2.iso / program / freeware / Democracy-0.8.2.exe / xulrunner / python / BitTorrent / Connecter.py < prev    next >
Encoding:
Python Source  |  2006-04-10  |  10.2 KB  |  315 lines

  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.0 (the License).  You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License.  You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10.  
  11. # Originally written by Bram Cohen, heavily modified by Uoti Urpala
  12.  
  13. # required for python 2.2
  14. from __future__ import generators
  15.  
  16. from binascii import b2a_hex
  17.  
  18. from BitTorrent.bitfield import Bitfield
  19. from BitTorrent.obsoletepythonsupport import *
  20.  
  21. def toint(s):
  22.     return int(b2a_hex(s), 16)
  23.  
  24. def tobinary(i):
  25.     return (chr(i >> 24) + chr((i >> 16) & 0xFF) +
  26.         chr((i >> 8) & 0xFF) + chr(i & 0xFF))
  27.  
  28. CHOKE = chr(0)
  29. UNCHOKE = chr(1)
  30. INTERESTED = chr(2)
  31. NOT_INTERESTED = chr(3)
  32. # index
  33. HAVE = chr(4)
  34. # index, bitfield
  35. BITFIELD = chr(5)
  36. # index, begin, length
  37. REQUEST = chr(6)
  38. # index, begin, piece
  39. PIECE = chr(7)
  40. # index, begin, piece
  41. CANCEL = chr(8)
  42.  
  43. protocol_name = 'BitTorrent protocol'
  44.  
  45.  
  46. class Connection(object):
  47.  
  48.     def __init__(self, encoder, connection, id, is_local):
  49.         self.encoder = encoder
  50.         self.connection = connection
  51.         self.id = id
  52.         self.ip = connection.ip
  53.         self.locally_initiated = is_local
  54.         self.complete = False
  55.         self.closed = False
  56.         self.got_anything = False
  57.         self.next_upload = None
  58.         self.upload = None
  59.         self.download = None
  60.         self._buffer = []
  61.         self._buffer_len = 0
  62.         self._reader = self._read_messages()
  63.         self._next_len = self._reader.next()
  64.         self._partial_message = None
  65.         self._outqueue = []
  66.         self.choke_sent = True
  67.         if self.locally_initiated:
  68.             connection.write(chr(len(protocol_name)) + protocol_name +
  69.                 (chr(0) * 8) + self.encoder.download_id)
  70.             if self.id is not None:
  71.                 connection.write(self.encoder.my_id)
  72.  
  73.     def close(self):
  74.         if not self.closed:
  75.             self.connection.close()
  76.             self._sever()
  77.  
  78.     def send_interested(self):
  79.         self._send_message(INTERESTED)
  80.  
  81.     def send_not_interested(self):
  82.         self._send_message(NOT_INTERESTED)
  83.  
  84.     def send_choke(self):
  85.         if self._partial_message is None:
  86.             self._send_message(CHOKE)
  87.             self.choke_sent = True
  88.             self.upload.sent_choke()
  89.  
  90.     def send_unchoke(self):
  91.         if self._partial_message is None:
  92.             self._send_message(UNCHOKE)
  93.             self.choke_sent = False
  94.  
  95.     def send_request(self, index, begin, length):
  96.         self._send_message(REQUEST + tobinary(index) +
  97.             tobinary(begin) + tobinary(length))
  98.  
  99.     def send_cancel(self, index, begin, length):
  100.         self._send_message(CANCEL + tobinary(index) +
  101.             tobinary(begin) + tobinary(length))
  102.  
  103.     def send_bitfield(self, bitfield):
  104.         self._send_message(BITFIELD + bitfield)
  105.  
  106.     def send_have(self, index):
  107.         self._send_message(HAVE + tobinary(index))
  108.  
  109.     def send_keepalive(self):
  110.         self._send_message('')
  111.  
  112.     def send_partial(self, bytes):
  113.         if self.closed:
  114.             return 0
  115.         if self._partial_message is None:
  116.             s = self.upload.get_upload_chunk()
  117.             if s is None:
  118.                 return 0
  119.             index, begin, piece = s
  120.             self._partial_message = ''.join((tobinary(len(piece) + 9), PIECE,
  121.                                     tobinary(index), tobinary(begin), piece))
  122.         if bytes < len(self._partial_message):
  123.             self.connection.write(buffer(self._partial_message, 0, bytes))
  124.             self._partial_message = buffer(self._partial_message, bytes)
  125.             return bytes
  126.  
  127.         queue = [str(self._partial_message)]
  128.         self._partial_message = None
  129.         if self.choke_sent != self.upload.choked:
  130.             if self.upload.choked:
  131.                 self._outqueue.append(tobinary(1) + CHOKE)
  132.                 self.upload.sent_choke()
  133.             else:
  134.                 self._outqueue.append(tobinary(1) + UNCHOKE)
  135.             self.choke_sent = self.upload.choked
  136.         queue.extend(self._outqueue)
  137.         self._outqueue = []
  138.         queue = ''.join(queue)
  139.         self.connection.write(queue)
  140.         return len(queue)
  141.  
  142.     # yields the number of bytes it wants next, gets those in self._message
  143.     def _read_messages(self):
  144.         yield 1   # header length
  145.         if ord(self._message) != len(protocol_name):
  146.             return
  147.  
  148.         yield len(protocol_name)
  149.         if self._message != protocol_name:
  150.             return
  151.  
  152.         yield 8  # reserved
  153.  
  154.         yield 20 # download id
  155.         if self.encoder.download_id is None:  # incoming connection
  156.             # modifies self.encoder if successful
  157.             self.encoder.select_torrent(self, self._message)
  158.             if self.encoder.download_id is None:
  159.                 return
  160.         elif self._message != self.encoder.download_id:
  161.             return
  162.         if not self.locally_initiated:
  163.             self.connection.write(chr(len(protocol_name)) + protocol_name +
  164.                 (chr(0) * 8) + self.encoder.download_id + self.encoder.my_id)
  165.  
  166.         yield 20  # peer id
  167.         if not self.id:
  168.             self.id = self._message
  169.             if self.id == self.encoder.my_id:
  170.                 return
  171.             for v in self.encoder.connections.itervalues():
  172.                 if v is not self:
  173.                     if v.id == self.id:
  174.                         return
  175.                     if self.encoder.config['one_connection_per_ip'] and \
  176.                            v.ip == self.ip:
  177.                         return
  178.             if self.locally_initiated:
  179.                 self.connection.write(self.encoder.my_id)
  180.             else:
  181.                 self.encoder.everinc = True
  182.         else:
  183.             if self._message != self.id:
  184.                 return
  185.         self.complete = True
  186.         self.encoder.connection_completed(self)
  187.  
  188.         while True:
  189.             yield 4   # message length
  190.             l = toint(self._message)
  191.             if l > self.encoder.config['max_message_length']:
  192.                 return
  193.             if l > 0:
  194.                 yield l
  195.                 self._got_message(self._message)
  196.  
  197.     def _got_message(self, message):
  198.         t = message[0]
  199.         if t == BITFIELD and self.got_anything:
  200.             self.close()
  201.             return
  202.         self.got_anything = True
  203.         if (t in [CHOKE, UNCHOKE, INTERESTED, NOT_INTERESTED] and
  204.                 len(message) != 1):
  205.             self.close()
  206.             return
  207.         if t == CHOKE:
  208.             self.download.got_choke()
  209.         elif t == UNCHOKE:
  210.             self.download.got_unchoke()
  211.         elif t == INTERESTED:
  212.             self.upload.got_interested()
  213.         elif t == NOT_INTERESTED:
  214.             self.upload.got_not_interested()
  215.         elif t == HAVE:
  216.             if len(message) != 5:
  217.                 self.close()
  218.                 return
  219.             i = toint(message[1:])
  220.             if i >= self.encoder.numpieces:
  221.                 self.close()
  222.                 return
  223.             self.download.got_have(i)
  224.         elif t == BITFIELD:
  225.             try:
  226.                 b = Bitfield(self.encoder.numpieces, message[1:])
  227.             except ValueError:
  228.                 self.close()
  229.                 return
  230.             self.download.got_have_bitfield(b)
  231.         elif t == REQUEST:
  232.             if len(message) != 13:
  233.                 self.close()
  234.                 return
  235.             i = toint(message[1:5])
  236.             if i >= self.encoder.numpieces:
  237.                 self.close()
  238.                 return
  239.             self.upload.got_request(i, toint(message[5:9]),
  240.                 toint(message[9:]))
  241.         elif t == CANCEL:
  242.             if len(message) != 13:
  243.                 self.close()
  244.                 return
  245.             i = toint(message[1:5])
  246.             if i >= self.encoder.numpieces:
  247.                 self.close()
  248.                 return
  249.             self.upload.got_cancel(i, toint(message[5:9]),
  250.                 toint(message[9:]))
  251.         elif t == PIECE:
  252.             if len(message) <= 9:
  253.                 self.close()
  254.                 return
  255.             i = toint(message[1:5])
  256.             if i >= self.encoder.numpieces:
  257.                 self.close()
  258.                 return
  259.             if self.download.got_piece(i, toint(message[5:9]), message[9:]):
  260.                 for co in self.encoder.complete_connections:
  261.                     co.send_have(i)
  262.         else:
  263.             self.close()
  264.  
  265.     def _sever(self):
  266.         self.closed = True
  267.         self._reader = None
  268.         del self.encoder.connections[self.connection]
  269.         self.encoder.replace_connection()
  270.         if self.complete:
  271.             del self.encoder.complete_connections[self]
  272.             self.download.disconnected()
  273.             self.encoder.choker.connection_lost(self)
  274.             self.upload = self.download = None
  275.  
  276.     def _send_message(self, message):
  277.         s = tobinary(len(message)) + message
  278.         if self._partial_message is not None:
  279.             self._outqueue.append(s)
  280.         else:
  281.             self.connection.write(s)
  282.  
  283.     def data_came_in(self, conn, s):
  284.         while True:
  285.             if self.closed:
  286.                 return
  287.             i = self._next_len - self._buffer_len
  288.             if i > len(s):
  289.                 self._buffer.append(s)
  290.                 self._buffer_len += len(s)
  291.                 return
  292.             m = s[:i]
  293.             if self._buffer_len > 0:
  294.                 self._buffer.append(m)
  295.                 m = ''.join(self._buffer)
  296.                 self._buffer = []
  297.                 self._buffer_len = 0
  298.             s = s[i:]
  299.             self._message = m
  300.             try:
  301.                 self._next_len = self._reader.next()
  302.             except StopIteration:
  303.                 self.close()
  304.                 return
  305.  
  306.     def connection_lost(self, conn):
  307.         assert conn is self.connection
  308.         self._sever()
  309.  
  310.     def connection_flushed(self, connection):
  311.         if self.complete:
  312.             if self.next_upload is None and (self._partial_message is not None
  313.                                              or self.upload.buffer):
  314.                 self.encoder.ratelimiter.queue(self)
  315.