home *** CD-ROM | disk | FTP | other *** search
/ Chip 2006 June / CHIP 2006-06.2.iso / program / freeware / Democracy-0.8.2.exe / xulrunner / python / BitTorrent / Downloader.py < prev    next >
Encoding:
Python Source  |  2006-04-10  |  14.0 KB  |  364 lines

  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.0 (the License).  You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License.  You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10.  
  11. # Written by Bram Cohen
  12.  
  13. from random import shuffle
  14. from time import time
  15.  
  16. from BitTorrent.CurrentRateMeasure import Measure
  17. from BitTorrent.bitfield import Bitfield
  18.  
  19.  
  20. class PerIPStats(object):
  21.  
  22.     def __init__(self):
  23.         self.numgood = 0
  24.         self.bad = {}
  25.         self.numconnections = 0
  26.         self.lastdownload = None
  27.         self.peerid = None
  28.  
  29.  
  30. class BadDataGuard(object):
  31.  
  32.     def __init__(self, download):
  33.         self.download = download
  34.         self.ip = download.connection.ip
  35.         self.downloader = download.downloader
  36.         self.stats = self.downloader.perip[self.ip]
  37.         self.lastindex = None
  38.  
  39.     def bad(self, index, bump = False):
  40.         self.stats.bad.setdefault(index, 0)
  41.         self.stats.bad[index] += 1
  42.         if self.ip not in self.downloader.bad_peers:
  43.             self.downloader.bad_peers[self.ip] = (False, self.stats)
  44.         if self.download is not None:
  45.             self.downloader.kick(self.download)
  46.             self.download = None
  47.         elif len(self.stats.bad) > 1 and self.stats.numconnections == 1 and \
  48.              self.stats.lastdownload is not None:
  49.             # kick new connection from same IP if previous one sent bad data,
  50.             # mainly to give the algorithm time to find other bad pieces
  51.             # in case the peer is sending a lot of bad data
  52.             self.downloader.kick(self.stats.lastdownload)
  53.         if len(self.stats.bad) >= 3 and len(self.stats.bad) > \
  54.            self.stats.numgood // 30:
  55.             self.downloader.ban(self.ip)
  56.         elif bump:
  57.             self.downloader.picker.bump(index)
  58.  
  59.     def good(self, index):
  60.         # lastindex is a hack to only increase numgood for by one for each good
  61.         # piece, however many chunks came from the connection(s) from this IP
  62.         if index != self.lastindex:
  63.             self.stats.numgood += 1
  64.             self.lastindex = index
  65.  
  66.  
  67. class SingleDownload(object):
  68.  
  69.     def __init__(self, downloader, connection):
  70.         self.downloader = downloader
  71.         self.connection = connection
  72.         self.choked = True
  73.         self.interested = False
  74.         self.active_requests = []
  75.         self.measure = Measure(downloader.config['max_rate_period'])
  76.         self.peermeasure = Measure(max(downloader.storage.piece_size / 10000,
  77.                                        20))
  78.         self.have = Bitfield(downloader.numpieces)
  79.         self.last = 0
  80.         self.example_interest = None
  81.         self.backlog = 2
  82.         self.guard = BadDataGuard(self)
  83.  
  84.     def _backlog(self):
  85.         backlog = 2 + int(4 * self.measure.get_rate() /
  86.                           self.downloader.chunksize)
  87.         if backlog > 50:
  88.             backlog = max(50, int(.075 * backlog))
  89.         self.backlog = backlog
  90.         return backlog
  91.  
  92.     def disconnected(self):
  93.         self.downloader.lost_peer(self)
  94.         for i in xrange(len(self.have)):
  95.             if self.have[i]:
  96.                 self.downloader.picker.lost_have(i)
  97.         self._letgo()
  98.         self.guard.download = None
  99.  
  100.     def _letgo(self):
  101.         if not self.active_requests:
  102.             return
  103.         if self.downloader.storage.endgame:
  104.             self.active_requests = []
  105.             return
  106.         lost = []
  107.         for index, begin, length in self.active_requests:
  108.             self.downloader.storage.request_lost(index, begin, length)
  109.             if index not in lost:
  110.                 lost.append(index)
  111.         self.active_requests = []
  112.         ds = [d for d in self.downloader.downloads if not d.choked]
  113.         shuffle(ds)
  114.         for d in ds:
  115.             d._request_more(lost)
  116.         for d in self.downloader.downloads:
  117.             if d.choked and not d.interested:
  118.                 for l in lost:
  119.                     if d.have[l] and self.downloader.storage.do_I_have_requests(l):
  120.                         d.interested = True
  121.                         d.connection.send_interested()
  122.                         break
  123.  
  124.     def got_choke(self):
  125.         if not self.choked:
  126.             self.choked = True
  127.             self._letgo()
  128.  
  129.     def got_unchoke(self):
  130.         if self.choked:
  131.             self.choked = False
  132.             if self.interested:
  133.                 self._request_more()
  134.  
  135.     def got_piece(self, index, begin, piece):
  136.         try:
  137.             self.active_requests.remove((index, begin, len(piece)))
  138.         except ValueError:
  139.             self.downloader.discarded_bytes += len(piece)
  140.             return False
  141.         if self.downloader.storage.endgame:
  142.             self.downloader.all_requests.remove((index, begin, len(piece)))
  143.         self.last = time()
  144.         self.measure.update_rate(len(piece))
  145.         self.downloader.measurefunc(len(piece))
  146.         self.downloader.downmeasure.update_rate(len(piece))
  147.         if not self.downloader.storage.piece_came_in(index, begin, piece,
  148.                                                      self.guard):
  149.             if self.downloader.storage.endgame:
  150.                 while self.downloader.storage.do_I_have_requests(index):
  151.                     nb, nl = self.downloader.storage.new_request(index)
  152.                     self.downloader.all_requests.append((index, nb, nl))
  153.                 for d in self.downloader.downloads:
  154.                     d.fix_download_endgame()
  155.                 return False
  156.             ds = [d for d in self.downloader.downloads if not d.choked]
  157.             shuffle(ds)
  158.             for d in ds:
  159.                 d._request_more([index])
  160.             return False
  161.         if self.downloader.storage.do_I_have(index):
  162.             self.downloader.picker.complete(index)
  163.         if self.downloader.storage.endgame:
  164.             for d in self.downloader.downloads:
  165.                 if d is not self and d.interested:
  166.                     if d.choked:
  167.                         d.fix_download_endgame()
  168.                     else:
  169.                         try:
  170.                             d.active_requests.remove((index, begin, len(piece)))
  171.                         except ValueError:
  172.                             continue
  173.                         d.connection.send_cancel(index, begin, len(piece))
  174.                         d.fix_download_endgame()
  175.         self._request_more()
  176.         if self.downloader.picker.am_I_complete():
  177.             for d in [i for i in self.downloader.downloads if i.have.numfalse == 0]:
  178.                 d.connection.close()
  179.         return self.downloader.storage.do_I_have(index)
  180.  
  181.     def _want(self, index):
  182.         return self.have[index] and self.downloader.storage.do_I_have_requests(index)
  183.  
  184.     def _request_more(self, indices = None):
  185.         assert not self.choked
  186.         if len(self.active_requests) >= self._backlog():
  187.             return
  188.         if self.downloader.storage.endgame:
  189.             self.fix_download_endgame()
  190.             return
  191.         lost_interests = []
  192.         while len(self.active_requests) < self.backlog:
  193.             if indices is None:
  194.                 interest = self.downloader.picker.next(self._want, self.have.numfalse == 0)
  195.             else:
  196.                 interest = None
  197.                 for i in indices:
  198.                     if self.have[i] and self.downloader.storage.do_I_have_requests(i):
  199.                         interest = i
  200.                         break
  201.             if interest is None:
  202.                 break
  203.             if not self.interested:
  204.                 self.interested = True
  205.                 self.connection.send_interested()
  206.             self.example_interest = interest
  207.             self.downloader.picker.requested(interest, self.have.numfalse == 0)
  208.             while len(self.active_requests) < (self.backlog-2) * 5 + 2:
  209.                 begin, length = self.downloader.storage.new_request(interest)
  210.                 self.active_requests.append((interest, begin, length))
  211.                 self.connection.send_request(interest, begin, length)
  212.                 if not self.downloader.storage.do_I_have_requests(interest):
  213.                     lost_interests.append(interest)
  214.                     break
  215.         if not self.active_requests and self.interested:
  216.             self.interested = False
  217.             self.connection.send_not_interested()
  218.         if lost_interests:
  219.             for d in self.downloader.downloads:
  220.                 if d.active_requests or not d.interested:
  221.                     continue
  222.                 if d.example_interest is not None and self.downloader.storage.do_I_have_requests(d.example_interest):
  223.                     continue
  224.                 for lost in lost_interests:
  225.                     if d.have[lost]:
  226.                         break
  227.                 else:
  228.                     continue
  229.                 interest = self.downloader.picker.next(d._want, d.have.numfalse == 0)
  230.                 if interest is None:
  231.                     d.interested = False
  232.                     d.connection.send_not_interested()
  233.                 else:
  234.                     d.example_interest = interest
  235.         if self.downloader.storage.endgame:
  236.             self.downloader.all_requests = []
  237.             for d in self.downloader.downloads:
  238.                 self.downloader.all_requests.extend(d.active_requests)
  239.             for d in self.downloader.downloads:
  240.                 d.fix_download_endgame()
  241.  
  242.     def fix_download_endgame(self):
  243.         want = [a for a in self.downloader.all_requests if self.have[a[0]] and a not in self.active_requests]
  244.         if self.interested and not self.active_requests and not want:
  245.             self.interested = False
  246.             self.connection.send_not_interested()
  247.             return
  248.         if not self.interested and want:
  249.             self.interested = True
  250.             self.connection.send_interested()
  251.         if self.choked or len(self.active_requests) >= self._backlog():
  252.             return
  253.         shuffle(want)
  254.         del want[self.backlog - len(self.active_requests):]
  255.         self.active_requests.extend(want)
  256.         for piece, begin, length in want:
  257.             self.connection.send_request(piece, begin, length)
  258.  
  259.     def got_have(self, index):
  260.         if self.have[index]:
  261.             return
  262.         if index == self.downloader.numpieces-1:
  263.             self.peermeasure.update_rate(self.downloader.storage.total_length-
  264.               (self.downloader.numpieces-1)*self.downloader.storage.piece_size)
  265.         else:
  266.             self.peermeasure.update_rate(self.downloader.storage.piece_size)
  267.         self.have[index] = True
  268.         self.downloader.picker.got_have(index)
  269.         if self.downloader.picker.am_I_complete() and self.have.numfalse == 0:
  270.             self.connection.close()
  271.             return
  272.         if self.downloader.storage.endgame:
  273.             self.fix_download_endgame()
  274.         elif self.downloader.storage.do_I_have_requests(index):
  275.             if not self.choked:
  276.                 self._request_more([index])
  277.             else:
  278.                 if not self.interested:
  279.                     self.interested = True
  280.                     self.connection.send_interested()
  281.  
  282.     def got_have_bitfield(self, have):
  283.         if self.downloader.picker.am_I_complete() and have.numfalse == 0:
  284.             self.connection.close()
  285.             return
  286.         self.have = have
  287.         for i in xrange(len(self.have)):
  288.             if self.have[i]:
  289.                 self.downloader.picker.got_have(i)
  290.         if self.downloader.storage.endgame:
  291.             for piece, begin, length in self.downloader.all_requests:
  292.                 if self.have[piece]:
  293.                     self.interested = True
  294.                     self.connection.send_interested()
  295.                     return
  296.         for i in xrange(len(self.have)):
  297.             if self.have[i] and self.downloader.storage.do_I_have_requests(i):
  298.                 self.interested = True
  299.                 self.connection.send_interested()
  300.                 return
  301.  
  302.     def get_rate(self):
  303.         return self.measure.get_rate()
  304.  
  305.     def is_snubbed(self):
  306.         return time() - self.last > self.downloader.snub_time
  307.  
  308.  
  309. class Downloader(object):
  310.  
  311.     def __init__(self, config, storage, picker, numpieces, downmeasure,
  312.                  measurefunc, kickfunc, banfunc):
  313.         self.config = config
  314.         self.storage = storage
  315.         self.picker = picker
  316.         self.chunksize = config['download_slice_size']
  317.         self.downmeasure = downmeasure
  318.         self.numpieces = numpieces
  319.         self.snub_time = config['snub_time']
  320.         self.measurefunc = measurefunc
  321.         self.kickfunc = kickfunc
  322.         self.banfunc = banfunc
  323.         self.downloads = []
  324.         self.perip = {}
  325.         self.bad_peers = {}
  326.         self.discarded_bytes = 0
  327.  
  328.     def make_download(self, connection):
  329.         ip = connection.ip
  330.         perip = self.perip.get(ip)
  331.         if perip is None:
  332.             perip = PerIPStats()
  333.             self.perip[ip] = perip
  334.         perip.numconnections += 1
  335.         d = SingleDownload(self, connection)
  336.         perip.lastdownload = d
  337.         perip.peerid = connection.id
  338.         self.downloads.append(d)
  339.         return d
  340.  
  341.     def lost_peer(self, download):
  342.         self.downloads.remove(download)
  343.         ip = download.connection.ip
  344.         self.perip[ip].numconnections -= 1
  345.         if self.perip[ip].lastdownload == download:
  346.             self.perip[ip].lastdownload = None
  347.  
  348.     def kick(self, download):
  349.         if not self.config['retaliate_to_garbled_data']:
  350.             return
  351.         ip = download.connection.ip
  352.         peerid = download.connection.id
  353.         # kickfunc will schedule connection.close() to be executed later; we
  354.         # might now be inside RawServer event loop with events from that
  355.         # connection already queued, and trying to handle them after doing
  356.         # close() now could cause problems.
  357.         self.kickfunc(download.connection)
  358.  
  359.     def ban(self, ip):
  360.         if not self.config['retaliate_to_garbled_data']:
  361.             return
  362.         self.banfunc(ip)
  363.         self.bad_peers[ip] = (True, self.perip[ip])
  364.