home *** CD-ROM | disk | FTP | other *** search
/ Chip 2006 June / CHIP 2006-06.2.iso / program / freeware / Democracy-0.8.2.exe / xulrunner / python / BitTorrent / download.py < prev    next >
Encoding:
Python Source  |  2006-04-10  |  21.4 KB  |  553 lines

  1. # The contents of this file are subject to the BitTorrent Open Source License
  2. # Version 1.0 (the License).  You may not copy or use this file, in either
  3. # source code or executable form, except in compliance with the License.  You
  4. # may obtain a copy of the License at http://www.bittorrent.com/license/.
  5. #
  6. # Software distributed under the License is distributed on an AS IS basis,
  7. # WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
  8. # for the specific language governing rights and limitations under the
  9. # License.
  10.  
  11. # Written by Bram Cohen and Uoti Urpala
  12.  
  13. from __future__ import division
  14. # required for python 2.2
  15. from __future__ import generators
  16.  
  17. import os
  18. import sys
  19. import threading
  20. import gc
  21. from sha import sha
  22. from socket import error as socketerror
  23. from random import seed
  24. from time import time
  25. from cStringIO import StringIO
  26. from traceback import print_exc
  27. from math import sqrt
  28. try:
  29.     getpid = os.getpid
  30. except AttributeError:
  31.     def getpid():
  32.         return 1
  33.  
  34. from BitTorrent.btformats import check_message
  35. from BitTorrent.Choker import Choker
  36. from BitTorrent.Storage import Storage, FilePool
  37. from BitTorrent.StorageWrapper import StorageWrapper
  38. from BitTorrent.Uploader import Upload
  39. from BitTorrent.Downloader import Downloader
  40. from BitTorrent.Encoder import Encoder, SingleportListener
  41. from BitTorrent.RateLimiter import RateLimiter
  42. from BitTorrent.RawServer import RawServer
  43. from BitTorrent.Rerequester import Rerequester
  44. from BitTorrent.DownloaderFeedback import DownloaderFeedback
  45. from BitTorrent.RateMeasure import RateMeasure
  46. from BitTorrent.CurrentRateMeasure import Measure
  47. from BitTorrent.PiecePicker import PiecePicker
  48. from BitTorrent.ConvertedMetainfo import set_filesystem_encoding
  49. from BitTorrent import version
  50. from BitTorrent import BTFailure, BTShutdown, INFO, WARNING, ERROR, CRITICAL
  51.  
  52.  
  53. class Feedback(object):
  54.  
  55.     def finished(self, torrent):
  56.         pass
  57.  
  58.     def failed(self, torrent, is_external):
  59.         pass
  60.  
  61.     def error(self, torrent, level, text):
  62.         pass
  63.  
  64.     def exception(self, torrent, text):
  65.         self.error(torrent, CRITICAL, text)
  66.  
  67.     def started(self, torrent):
  68.         pass
  69.  
  70.  
  71. class Multitorrent(object):
  72.  
  73.     def __init__(self, config, doneflag, errorfunc, listen_fail_ok=False):
  74.         self.config = dict(config)
  75.         self.errorfunc = errorfunc
  76.         self.rawserver = RawServer(doneflag, config['timeout_check_interval'],
  77.                                    config['timeout'], errorfunc=errorfunc,
  78.                                    bindaddr=config['bind'])
  79.         self.singleport_listener = SingleportListener(self.rawserver)
  80.         self._find_port(listen_fail_ok)
  81.         self.filepool = FilePool(config['max_files_open'])
  82.         self.ratelimiter = RateLimiter(self.rawserver.add_task)
  83.         self.ratelimiter.set_parameters(config['max_upload_rate'],
  84.                                         config['upload_unit_size'])
  85.         set_filesystem_encoding(config['filesystem_encoding'],
  86.                                                  errorfunc)
  87.  
  88.     def _find_port(self, listen_fail_ok=True):
  89.         e = 'maxport less than minport - no ports to check'
  90.         if self.config['minport'] <= 0:
  91.             self.config['minport'] = 1
  92.         for port in xrange(self.config['minport'], self.config['maxport'] + 1):
  93.             try:
  94.                 self.singleport_listener.open_port(port, self.config)
  95.                 break
  96.             except socketerror, e:
  97.                 pass
  98.         else:
  99.             if not listen_fail_ok:
  100.                 raise BTFailure, "Couldn't open a listening port: " + str(e)
  101.             self.errorfunc(CRITICAL, "Could not open a listening port: " +
  102.                            str(e) + ". Check your port range settings.")
  103.  
  104.     def close_listening_socket(self):
  105.         self.singleport_listener.close_sockets()
  106.  
  107.     def start_torrent(self, metainfo, config, feedback, filename):
  108.         torrent = _SingleTorrent(self.rawserver, self.singleport_listener,
  109.                                  self.ratelimiter, self.filepool, config)
  110.         self.rawserver.add_context(torrent)
  111.         def start():
  112.             torrent.start_download(metainfo, feedback, filename)
  113.         self.rawserver.add_task(start, 0, torrent)
  114.         return torrent
  115.  
  116.     def set_option(self, option, value):
  117.         if option not in self.config or self.config[option] == value:
  118.             return
  119.         if option not in 'max_upload_rate upload_unit_size '\
  120.                'max_files_open minport maxport'.split():
  121.             return
  122.         self.config[option] = value
  123.         if option == 'max_files_open':
  124.             self.filepool.set_max_files_open(value)
  125.         elif option == 'max_upload_rate':
  126.             self.ratelimiter.set_parameters(value,
  127.                                             self.config['upload_unit_size'])
  128.         elif option == 'upload_unit_size':
  129.             self.ratelimiter.set_parameters(self.config['max_upload_rate'],
  130.                                             value)
  131.         elif option == 'maxport':
  132.             if not self.config['minport'] <= self.singleport_listener.port <= \
  133.                    self.config['maxport']:
  134.                 self._find_port()
  135.  
  136.     def get_completion(self, config, metainfo, save_path, filelist=False):
  137.         if not config['data_dir']:
  138.             return None
  139.         infohash = metainfo.infohash
  140.         if metainfo.is_batch:
  141.             myfiles = [os.path.join(save_path, f) for f in metainfo.files_fs]
  142.         else:
  143.             myfiles = [save_path]
  144.  
  145.         if metainfo.total_bytes == 0:
  146.             if filelist:
  147.                 return None
  148.             return 1
  149.         try:
  150.             s = Storage(None, None, zip(myfiles, metainfo.sizes),
  151.                         check_only=True)
  152.         except:
  153.             return None
  154.         filename = os.path.join(config['data_dir'], 'resume',
  155.                                 infohash.encode('hex'))
  156.         try:
  157.             f = file(filename, 'rb')
  158.         except:
  159.             f = None
  160.         try:
  161.             r = s.check_fastresume(f, filelist, metainfo.piece_length,
  162.                                    len(metainfo.hashes), myfiles)
  163.         except:
  164.             r = None
  165.         if f is not None:
  166.             f.close()
  167.         if r is None:
  168.             return None
  169.         if filelist:
  170.             return r[0] / metainfo.total_bytes, r[1], r[2]
  171.         return r / metainfo.total_bytes
  172.  
  173.  
  174. class _SingleTorrent(object):
  175.  
  176.     def __init__(self, rawserver, singleport_listener, ratelimiter, filepool,
  177.                  config):
  178.         self._rawserver = rawserver
  179.         self._singleport_listener = singleport_listener
  180.         self._ratelimiter = ratelimiter
  181.         self._filepool = filepool
  182.         self.config = dict(config)
  183.         self._storage = None
  184.         self._storagewrapper = None
  185.         self._ratemeasure = None
  186.         self._upmeasure = None
  187.         self._downmeasure = None
  188.         self._encoder = None
  189.         self._rerequest = None
  190.         self._statuscollecter = None
  191.         self._announced = False
  192.         self._listening = False
  193.         self.reserved_ports = []
  194.         self.reported_port = None
  195.         self._myfiles = None
  196.         self.started = False
  197.         self.is_seed = False
  198.         self.closed = False
  199.         self.infohash = None
  200.         self.total_bytes = None
  201.         self._doneflag = threading.Event()
  202.         self.finflag = threading.Event()
  203.         self._hashcheck_thread = None
  204.         self._contfunc = None
  205.         self._activity = ('Initial startup', 0)
  206.         self.feedback = None
  207.         self.errors = []
  208.  
  209.     def start_download(self, *args, **kwargs):
  210.         it = self._start_download(*args, **kwargs)
  211.         def cont():
  212.             try:
  213.                 it.next()
  214.             except StopIteration:
  215.                 self._contfunc = None
  216.         def contfunc():
  217.             self._rawserver.external_add_task(cont, 0, self)
  218.         self._contfunc = contfunc
  219.         contfunc()
  220.  
  221.     def _start_download(self, metainfo, feedback, save_path):
  222.         self.feedback = feedback
  223.         config = self.config
  224.         self._set_auto_uploads()
  225.  
  226.         self.infohash = metainfo.infohash
  227.         self.total_bytes = metainfo.total_bytes
  228.         if not metainfo.reported_errors:
  229.             metainfo.show_encoding_errors(self._error)
  230.  
  231.         myid = self._make_id()
  232.         seed(myid)
  233.         def schedfunc(func, delay):
  234.             self._rawserver.add_task(func, delay, self)
  235.         def externalsched(func, delay):
  236.             self._rawserver.external_add_task(func, delay, self)
  237.         if metainfo.is_batch:
  238.             myfiles = [os.path.join(save_path, f) for f in metainfo.files_fs]
  239.         else:
  240.             myfiles = [save_path]
  241.         self._filepool.add_files(myfiles, self)
  242.         self._myfiles = myfiles
  243.         self._storage = Storage(config, self._filepool, zip(myfiles,
  244.                                                             metainfo.sizes))
  245.         resumefile = None
  246.         if config['data_dir']:
  247.             filename = os.path.join(config['data_dir'], 'resume',
  248.                                     self.infohash.encode('hex'))
  249.             if os.path.exists(filename):
  250.                 try:
  251.                     resumefile = file(filename, 'rb')
  252.                     if self._storage.check_fastresume(resumefile) == 0:
  253.                         resumefile.close()
  254.                         resumefile = None
  255.                 except Exception, e:
  256.                     self._error(WARNING, 'Could not load fastresume data: '+
  257.                                 str(e) + '. Will perform full hash check.')
  258.                     if resumefile is not None:
  259.                         resumefile.close()
  260.                     resumefile = None
  261.         def data_flunked(amount, index):
  262.             self._ratemeasure.data_rejected(amount)
  263.             self._error(INFO, 'piece %d failed hash check, '
  264.                         're-downloading it' % index)
  265.         backthread_exception = []
  266.         def errorfunc(level, text):
  267.             def e():
  268.                 self._error(level, text)
  269.             externalsched(e, 0)
  270.         def hashcheck():
  271.             def statusfunc(activity = None, fractionDone = 0):
  272.                 if activity is None:
  273.                     activity = self._activity[0]
  274.                 self._activity = (activity, fractionDone)
  275.             try:
  276.                 self._storagewrapper = StorageWrapper(self._storage,
  277.                      config, metainfo.hashes, metainfo.piece_length,
  278.                      self._finished, statusfunc, self._doneflag, data_flunked,
  279.                      self.infohash, errorfunc, resumefile)
  280.             except:
  281.                 backthread_exception.append(sys.exc_info())
  282.             self._contfunc()
  283.         thread = threading.Thread(target = hashcheck, \
  284.                                   name = "Bittorrent hashcheck")
  285.         thread.setDaemon(False)
  286.         self._hashcheck_thread = thread
  287.         thread.start()
  288.         yield None
  289.         self._hashcheck_thread = None
  290.         if resumefile is not None:
  291.             resumefile.close()
  292.         if backthread_exception:
  293.             a, b, c = backthread_exception[0]
  294.             raise a, b, c
  295.  
  296.         if self._storagewrapper.amount_left == 0:
  297.             self._finished()
  298.         choker = Choker(config, schedfunc, self.finflag.isSet)
  299.         upmeasure = Measure(config['max_rate_period'])
  300.         upmeasure_seedtime = Measure(config['max_rate_period_seedtime'])
  301.         downmeasure = Measure(config['max_rate_period'])
  302.         self._upmeasure = upmeasure
  303.         self._downmeasure = downmeasure
  304.         self._ratemeasure = RateMeasure(self._storagewrapper.
  305.                                         amount_left_with_partials)
  306.         picker = PiecePicker(len(metainfo.hashes), config)
  307.         for i in xrange(len(metainfo.hashes)):
  308.             if self._storagewrapper.do_I_have(i):
  309.                 picker.complete(i)
  310.         for i in self._storagewrapper.stat_dirty:
  311.             picker.requested(i)
  312.         def kickpeer(connection):
  313.             def kick():
  314.                 connection.close()
  315.             schedfunc(kick, 0)
  316.         def banpeer(ip):
  317.             self._encoder.ban(ip)
  318.         downloader = Downloader(config, self._storagewrapper, picker,
  319.             len(metainfo.hashes), downmeasure, self._ratemeasure.data_came_in,
  320.                                 kickpeer, banpeer)
  321.         def make_upload(connection):
  322.             return Upload(connection, self._ratelimiter, upmeasure,
  323.                         upmeasure_seedtime, choker, self._storagewrapper,
  324.                         config['max_slice_length'], config['max_rate_period'])
  325.         self._encoder = Encoder(make_upload, downloader, choker,
  326.                      len(metainfo.hashes), self._ratelimiter, self._rawserver,
  327.                      config, myid, schedfunc, self.infohash, self)
  328.         self.reported_port = self.config['forwarded_port']
  329.         if not self.reported_port:
  330.             self.reported_port = self._singleport_listener.get_port()
  331.             self.reserved_ports.append(self.reported_port)
  332.         self._singleport_listener.add_torrent(self.infohash, self._encoder)
  333.         self._listening = True
  334.         self._rerequest = Rerequester(metainfo.announce, config,
  335.             schedfunc, self._encoder.how_many_connections,
  336.             self._encoder.start_connection, externalsched,
  337.             self._storagewrapper.get_amount_left, upmeasure.get_total,
  338.             downmeasure.get_total, self.reported_port, myid,
  339.             self.infohash, self._error, self.finflag, upmeasure.get_rate,
  340.             downmeasure.get_rate, self._encoder.ever_got_incoming,
  341.             self.internal_shutdown, self._announce_done)
  342.         self._statuscollecter = DownloaderFeedback(choker, upmeasure.get_rate,
  343.             upmeasure_seedtime.get_rate, downmeasure.get_rate,
  344.             upmeasure.get_total, downmeasure.get_total,
  345.             self._ratemeasure.get_time_left, self._ratemeasure.get_size_left,
  346.             self.total_bytes, self.finflag, downloader, self._myfiles)
  347.  
  348.         self._announced = True
  349.         self._rerequest.begin()
  350.         self.started = True
  351.         if not self.finflag.isSet():
  352.             self._activity = ('downloading', 0)
  353.         self.feedback.started(self)
  354.  
  355.     def got_exception(self, e):
  356.         is_external = False
  357.         if isinstance(e, BTShutdown):
  358.             self._error(ERROR, str(e))
  359.             is_external = True
  360.         elif isinstance(e, BTFailure):
  361.             self._error(CRITICAL, str(e))
  362.             self._activity = ('download failed: ' + str(e), 0)
  363.         elif isinstance(e, IOError):
  364.             self._error(CRITICAL, 'IO Error ' + str(e))
  365.             self._activity = ('killed by IO error: ' + str(e), 0)
  366.         elif isinstance(e, OSError):
  367.             self._error(CRITICAL, 'OS Error ' + str(e))
  368.             self._activity = ('killed by OS error: ' + str(e), 0)
  369.         else:
  370.             data = StringIO()
  371.             print_exc(file=data)
  372.             self._error(CRITICAL, data.getvalue(), True)
  373.             self._activity = ('killed by internal exception: ' + str(e), 0)
  374.         try:
  375.             self._close()
  376.         except Exception, e:
  377.             self._error(ERROR, 'Additional error when closing down due to '
  378.                         'error: ' + str(e))
  379.         if is_external:
  380.             self.feedback.failed(self, True)
  381.             return
  382.         if self.config['data_dir'] and self._storage is not None:
  383.             filename = os.path.join(self.config['data_dir'], 'resume',
  384.                                     self.infohash.encode('hex'))
  385.             if os.path.exists(filename):
  386.                 try:
  387.                     os.remove(filename)
  388.                 except Exception, e:
  389.                     self._error(WARNING, 'Could not remove fastresume file '
  390.                                 'after failure:' + str(e))
  391.         self.feedback.failed(self, False)
  392.  
  393.     def _finished(self):
  394.         self.finflag.set()
  395.         # Call self._storage.close() to flush buffers and change files to
  396.         # read-only mode (when they're possibly reopened). Let exceptions
  397.         # from self._storage.close() kill the torrent since files might not
  398.         # be correct on disk if file.close() failed.
  399.         self._storage.close()
  400.         # If we haven't announced yet, normal first announce done later will
  401.         # tell the tracker about seed status.
  402.         self.is_seed = True
  403.         if self._announced:
  404.             self._rerequest.announce_finish()
  405.         self._activity = ('seeding', 1)
  406.         if self.config['check_hashes']:
  407.             self._save_fastresume(True)
  408.         self.feedback.finished(self)
  409.  
  410.     def _save_fastresume(self, on_finish=False):
  411.         if not on_finish and (self.finflag.isSet() or not self.started):
  412.             return
  413.         if not self.config['data_dir']:
  414.             return
  415.         if on_finish:    # self._ratemeasure might not exist yet
  416.             amount_done = self.total_bytes
  417.         else:
  418.             amount_done = self.total_bytes - self._ratemeasure.get_size_left()
  419.         filename = os.path.join(self.config['data_dir'], 'resume',
  420.                                 self.infohash.encode('hex'))
  421.         resumefile = None
  422.         try:
  423.             resumefile = file(filename, 'wb')
  424.             self._storage.write_fastresume(resumefile, amount_done)
  425.             self._storagewrapper.write_fastresume(resumefile)
  426.             resumefile.close()
  427.         except Exception, e:
  428.             self._error(WARNING, 'Could not write fastresume data: ' + str(e))
  429.             if resumefile is not None:
  430.                 resumefile.close()
  431.  
  432.     def shutdown(self):
  433.         if self.closed:
  434.             return
  435.         try:
  436.             self._close()
  437.             self._save_fastresume()
  438.             self._activity = ('shut down', 0)
  439.         except Exception, e:
  440.             self.got_exception(e)
  441.  
  442.     def internal_shutdown(self, level, text):
  443.         # This is only called when announce fails with no peers,
  444.         # don't try to announce again telling we're leaving the torrent
  445.         self._announced = False
  446.         self._error(level, text)
  447.         self.shutdown()
  448.         self.feedback.failed(self, True)
  449.  
  450.     def _close(self):
  451.         if self.closed:
  452.             return
  453.         self.closed = True
  454.         self._rawserver.remove_context(self)
  455.         self._doneflag.set()
  456.         if self._announced:
  457.             self._rerequest.announce_stop()
  458.             self._rerequest.cleanup()
  459.         if self._hashcheck_thread is not None:
  460.             self._hashcheck_thread.join() # should die soon after doneflag set
  461.         if self._myfiles is not None:
  462.             self._filepool.remove_files(self._myfiles)
  463.         if self._listening:
  464.             self._singleport_listener.remove_torrent(self.infohash)
  465.         for port in self.reserved_ports:
  466.             self._singleport_listener.release_port(port)
  467.         if self._encoder is not None:
  468.             self._encoder.close_connections()
  469.         if self._storage is not None:
  470.             self._storage.close()
  471.         self._ratelimiter.clean_closed()
  472.         self._rawserver.add_task(gc.collect, 0, None)
  473.  
  474.     def get_status(self, spew = False, fileinfo=False):
  475.         if self.started and not self.closed:
  476.             r = self._statuscollecter.get_statistics(spew, fileinfo)
  477.             r['activity'] = self._activity[0]
  478.         else:
  479.             r = dict(zip(('activity', 'fractionDone'), self._activity))
  480.         return r
  481.  
  482.     def get_total_transfer(self):
  483.         if self._upmeasure is None:
  484.             return (0, 0)
  485.         return (self._upmeasure.get_total(), self._downmeasure.get_total())
  486.  
  487.     def set_option(self, option, value):
  488.         if self.closed:
  489.             return
  490.         if option not in self.config or self.config[option] == value:
  491.             return
  492.         if option not in 'min_uploads max_uploads max_initiate max_allow_in '\
  493.            'data_dir ip max_upload_rate retaliate_to_garbled_data'.split():
  494.             return
  495.         # max_upload_rate doesn't affect upload rate here, just auto uploads
  496.         self.config[option] = value
  497.         self._set_auto_uploads()
  498.  
  499.     def change_port(self):
  500.         if not self._listening:
  501.             return
  502.         r = self.config['forwarded_port']
  503.         if r:
  504.             for port in self.reserved_ports:
  505.                 self._singleport_listener.release_port(port)
  506.             del self.reserved_ports[:]
  507.             if self.reported_port == r:
  508.                 return
  509.         elif self._singleport_listener.port != self.reported_port:
  510.             r = self._singleport_listener.get_port()
  511.             self.reserved_ports.append(r)
  512.         else:
  513.             return
  514.         self.reported_port = r
  515.         myid = self._make_id()
  516.         self._encoder.my_id = myid
  517.         self._rerequest.change_port(myid, r)
  518.  
  519.     def _announce_done(self):
  520.         for port in self.reserved_ports[:-1]:
  521.             self._singleport_listener.release_port(port)
  522.         del self.reserved_ports[:-1]
  523.  
  524.     def _make_id(self):
  525.         myid = 'M' + version.split()[0].replace('.', '-')
  526.         myid = myid + ('-' * (8-len(myid)))+sha(repr(time())+ ' ' +
  527.                                      str(getpid())).digest()[-6:].encode('hex')
  528.         return myid
  529.  
  530.     def _set_auto_uploads(self):
  531.         uploads = self.config['max_uploads']
  532.         rate = self.config['max_upload_rate']
  533.         if uploads > 0:
  534.             pass
  535.         elif rate <= 0:
  536.             uploads = 7 # unlimited, just guess something here...
  537.         elif rate < 9:
  538.             uploads = 2
  539.         elif rate < 15:
  540.             uploads = 3
  541.         elif rate < 42:
  542.             uploads = 4
  543.         else:
  544.             uploads = int(sqrt(rate * .6))
  545.         self.config['max_uploads_internal'] = uploads
  546.  
  547.     def _error(self, level, text, exception=False):
  548.         self.errors.append((time(), level, text))
  549.         if exception:
  550.             self.feedback.exception(self, text)
  551.         else:
  552.             self.feedback.error(self, level, text)
  553.