home *** CD-ROM | disk | FTP | other *** search
/ Mundo do CD-ROM 118 / cdrom118.iso / internet / webaroo / WebarooSetup.exe / Webaroo.msi / _A0DEB44B94924E89917E71AA90C5F226 / khashmir / khashmir.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2005-12-23  |  15.1 KB  |  431 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. import const
  5. from BitTorrent.platform import bttime as time
  6. from sha import sha
  7. from BitTorrent.defaultargs import common_options, rare_options
  8. from BitTorrent.RawServer import RawServer
  9. from ktable import KTable, K
  10. from knode import *
  11. from kstore import KStore
  12. from khash import newID, newIDInRange
  13. from util import packNodes
  14. from actions import FindNode, GetValue, KeyExpirer, StoreValue
  15. import krpc
  16. import sys
  17. import os
  18. import traceback
  19. from BitTorrent.bencode import bencode, bdecode
  20. from defer import Deferred
  21. from random import randrange
  22. from threading import Event
  23.  
  24. class KhashmirDBExcept(Exception):
  25.     pass
  26.  
  27.  
  28. class KhashmirBase:
  29.     _Node = KNodeBase
  30.     
  31.     def __init__(self, host, port, data_dir, rawserver = None, max_ul_rate = 1024, checkpoint = True):
  32.         self.max_ul_rate = max_ul_rate
  33.         self.socket = None
  34.         self.setup(host, port, data_dir, checkpoint)
  35.  
  36.     
  37.     def setup(self, host, port, data_dir, checkpoint = True):
  38.         self.host = host
  39.         self.port = port
  40.         self.ddir = data_dir
  41.         self.store = KStore()
  42.         self.pingcache = { }
  43.         self.socket = self.rawserver.create_udpsocket(self.port, self.host, True)
  44.         self.udp = krpc.hostbroker(self, (self.host, self.port), self.socket, self.rawserver.add_task, self.max_ul_rate)
  45.         self._load()
  46.         self.rawserver.start_listening_udp(self.socket, self.udp)
  47.         self.last = time()
  48.         KeyExpirer(self.store, self.rawserver.add_task)
  49.         self.refreshTable(force = 1)
  50.         if checkpoint:
  51.             self.rawserver.add_task(self.findCloseNodes, 30, ((lambda a: a), True))
  52.             self.rawserver.add_task(self.checkpoint, 60, (1,))
  53.         
  54.  
  55.     
  56.     def Node(self):
  57.         n = self._Node(self.udp.connectionForAddr)
  58.         n.table = self
  59.         return n
  60.  
  61.     
  62.     def __del__(self):
  63.         self.rawserver.stop_listen_udp(self.socket)
  64.         self.socket.close()
  65.  
  66.     
  67.     def _load(self):
  68.         do_load = False
  69.         
  70.         try:
  71.             s = open(os.path.join(self.ddir, 'routing_table'), 'r').read()
  72.             dict = bdecode(s)
  73.         except:
  74.             id = newID()
  75.  
  76.         id = dict['id']
  77.         do_load = True
  78.         self.node = self._Node(self.udp.connectionForAddr).init(id, self.host, self.port)
  79.         self.table = KTable(self.node)
  80.         if do_load:
  81.             self._loadRoutingTable(dict['rt'])
  82.         
  83.  
  84.     
  85.     def checkpoint(self, auto = 0):
  86.         d = { }
  87.         d['id'] = self.node.id
  88.         d['rt'] = self._dumpRoutingTable()
  89.         self.refreshTable()
  90.         
  91.         try:
  92.             f = open(os.path.join(self.ddir, 'routing_table'), 'wb')
  93.             f.write(bencode(d))
  94.             f.close()
  95.         except:
  96.             pass
  97.  
  98.         if auto:
  99.             self.rawserver.add_task(self.checkpoint, randrange(int(const.CHECKPOINT_INTERVAL * 0.90000000000000002), int(const.CHECKPOINT_INTERVAL * 1.1000000000000001)), (1,))
  100.         
  101.  
  102.     
  103.     def _loadRoutingTable(self, nodes):
  104.         """
  105.             load routing table nodes from database
  106.             it's usually a good idea to call refreshTable(force=1) after loading the table
  107.         """
  108.         for rec in nodes:
  109.             n = self.Node().initWithDict(rec)
  110.             self.table.insertNode(n, contacted = 0, nocheck = True)
  111.         
  112.  
  113.     
  114.     def _dumpRoutingTable(self):
  115.         '''
  116.             save routing table nodes to the database
  117.         '''
  118.         l = []
  119.         for bucket in self.table.buckets:
  120.             for node in bucket.l:
  121.                 l.append({
  122.                     'id': node.id,
  123.                     'host': node.host,
  124.                     'port': node.port })
  125.             
  126.         
  127.         return l
  128.  
  129.     
  130.     def addContact(self, host, port, callback = None):
  131.         '''
  132.             ping this node and add the contact info to the table on pong!
  133.         '''
  134.         n = self.Node().init(const.NULL_ID, host, port)
  135.         
  136.         try:
  137.             self.sendPing(n, callback = callback)
  138.         except krpc.KRPCSelfNodeError:
  139.             pass
  140.  
  141.  
  142.     
  143.     def findNode(self, id, callback, errback = None):
  144.         ''' returns the contact info for node, or the k closest nodes, from the global table '''
  145.         nodes = self.table.findNodes(id, invalid = False)
  146.         nodes += self.table.findNodes(id, invalid = True)
  147.         d = Deferred()
  148.         if errback:
  149.             d.addCallbacks(callback, errback)
  150.         else:
  151.             d.addCallback(callback)
  152.         if len(nodes) == 1 and nodes[0].id == id:
  153.             d.callback(nodes)
  154.         else:
  155.             state = FindNode(self, id, d.callback, self.rawserver.add_task)
  156.             self.rawserver.external_add_task(state.goWithNodes, 0, (nodes,))
  157.  
  158.     
  159.     def insertNode(self, n, contacted = 1):
  160.         """
  161.         insert a node in our local table, pinging oldest contact in bucket, if necessary
  162.         
  163.         If all you have is a host/port, then use addContact, which calls this method after
  164.         receiving the PONG from the remote node.  The reason for the seperation is we can't insert
  165.         a node into the table without it's peer-ID.  That means of course the node passed into this
  166.         method needs to be a properly formed Node object with a valid ID.
  167.         """
  168.         old = self.table.insertNode(n, contacted = contacted)
  169.         if old and old != n:
  170.             if not old.inPing():
  171.                 self.checkOldNode(old, n, contacted)
  172.             else:
  173.                 l = self.pingcache.get(old.id, [])
  174.                 l.append((n, contacted))
  175.                 self.pingcache[old.id] = l
  176.         
  177.  
  178.     
  179.     def checkOldNode(self, old, new, contacted = False):
  180.         
  181.         def _staleNodeHandler(dict):
  182.             ''' called if the pinged node never responds '''
  183.             if old.fails >= 2:
  184.                 o = self.table.replaceStaleNode(old, new)
  185.                 if o and o != new:
  186.                     self.checkOldNode(o, new)
  187.                     
  188.                     try:
  189.                         self.pingcache[o.id] = self.pingcache[old.id]
  190.                         del self.pingcache[old.id]
  191.                     except KeyError:
  192.                         pass
  193.                     except:
  194.                         None<EXCEPTION MATCH>KeyError
  195.                     
  196.  
  197.                 None<EXCEPTION MATCH>KeyError
  198.                 l = self.pingcache.get(old.id, [])
  199.                 if l:
  200.                     del self.pingcache[old.id]
  201.                     for node in l:
  202.                         self.insertNode(node[0], node[1])
  203.                     
  204.                 
  205.             else:
  206.                 l = self.pingcache.get(old.id, [])
  207.                 if l:
  208.                     del self.pingcache[old.id]
  209.                 
  210.                 self.insertNode(new)
  211.                 for node in l:
  212.                     self.insertNode(node[0], node[1])
  213.                 
  214.  
  215.         
  216.         def _notStaleNodeHandler(dict):
  217.             ''' called when we get a pong from the old node '''
  218.             self.table.insertNode(old, True)
  219.             self.insertNode(new, contacted)
  220.             for node in self.pingcache.get(old.id, []):
  221.                 self.insertNode(node[0], node[1])
  222.             
  223.             
  224.             try:
  225.                 del self.pingcache[old.id]
  226.             except KeyError:
  227.                 pass
  228.  
  229.  
  230.         
  231.         try:
  232.             df = old.ping(self.node.id)
  233.         except krpc.KRPCSelfNodeError:
  234.             pass
  235.  
  236.         df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
  237.  
  238.     
  239.     def sendPing(self, node, callback = None):
  240.         '''
  241.             ping a node
  242.         '''
  243.         
  244.         try:
  245.             df = node.ping(self.node.id)
  246.         except krpc.KRPCSelfNodeError:
  247.             pass
  248.  
  249.         
  250.         def _pongHandler(dict, node = node, table = self.table, callback = callback):
  251.             _krpc_sender = dict['_krpc_sender']
  252.             dict = dict['rsp']
  253.             sender = {
  254.                 'id': dict['id'] }
  255.             sender['host'] = _krpc_sender[0]
  256.             sender['port'] = _krpc_sender[1]
  257.             n = self.Node().initWithDict(sender)
  258.             table.insertNode(n)
  259.             if callback:
  260.                 callback()
  261.             
  262.  
  263.         
  264.         def _defaultPong(err, node = node, table = self.table, callback = callback):
  265.             if callback:
  266.                 callback()
  267.             
  268.  
  269.         df.addCallbacks(_pongHandler, _defaultPong)
  270.  
  271.     
  272.     def findCloseNodes(self, callback = (lambda a: a), auto = False):
  273.         '''
  274.             This does a findNode on the ID one away from our own.  
  275.             This will allow us to populate our table with nodes on our network closest to our own.
  276.             This is called as soon as we start up with an empty table
  277.         '''
  278.         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
  279.         self.findNode(id, callback)
  280.         if auto:
  281.             self.rawserver.add_task(self.findCloseNodes, randrange(int(const.FIND_CLOSE_INTERVAL * 0.90000000000000002), int(const.FIND_CLOSE_INTERVAL * 1.1000000000000001)), ((lambda a: True), True))
  282.         
  283.  
  284.     
  285.     def refreshTable(self, force = 0):
  286.         '''
  287.             force=1 will refresh table regardless of last bucket access time
  288.         '''
  289.         
  290.         def callback(nodes):
  291.             pass
  292.  
  293.         for bucket in self.table.buckets:
  294.             if force or time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
  295.                 id = newIDInRange(bucket.min, bucket.max)
  296.                 self.findNode(id, callback)
  297.                 continue
  298.         
  299.  
  300.     
  301.     def stats(self):
  302.         '''
  303.         Returns (num_contacts, num_nodes)
  304.         num_contacts: number contacts in our routing table
  305.         num_nodes: number of nodes estimated in the entire dht
  306.         '''
  307.         num_contacts = reduce((lambda a, b: a + len(b.l)), self.table.buckets, 0)
  308.         num_nodes = const.K * 2 ** (len(self.table.buckets) - 1)
  309.         return {
  310.             'num_contacts': num_contacts,
  311.             'num_nodes': num_nodes }
  312.  
  313.     
  314.     def krpc_ping(self, id, _krpc_sender):
  315.         sender = {
  316.             'id': id }
  317.         sender['host'] = _krpc_sender[0]
  318.         sender['port'] = _krpc_sender[1]
  319.         n = self.Node().initWithDict(sender)
  320.         self.insertNode(n, contacted = 0)
  321.         return {
  322.             'id': self.node.id }
  323.  
  324.     
  325.     def krpc_find_node(self, target, id, _krpc_sender):
  326.         nodes = self.table.findNodes(target, invalid = False)
  327.         nodes = map((lambda node: node.senderDict()), nodes)
  328.         sender = {
  329.             'id': id }
  330.         sender['host'] = _krpc_sender[0]
  331.         sender['port'] = _krpc_sender[1]
  332.         n = self.Node().initWithDict(sender)
  333.         self.insertNode(n, contacted = 0)
  334.         return {
  335.             'nodes': packNodes(nodes),
  336.             'id': self.node.id }
  337.  
  338.  
  339.  
  340. class KhashmirRead(KhashmirBase):
  341.     _Node = KNodeRead
  342.     
  343.     def retrieveValues(self, key):
  344.         
  345.         try:
  346.             l = self.store[key]
  347.         except KeyError:
  348.             l = []
  349.  
  350.         return l
  351.  
  352.     
  353.     def valueForKey(self, key, callback, searchlocal = 1):
  354.         """ returns the values found for key in global table
  355.             callback will be called with a list of values for each peer that returns unique values
  356.             final callback will be an empty list - probably should change to 'more coming' arg
  357.         """
  358.         nodes = self.table.findNodes(key)
  359.         if searchlocal:
  360.             l = self.retrieveValues(key)
  361.             if len(l) > 0:
  362.                 self.rawserver.add_task(callback, 0, (l,))
  363.             
  364.         else:
  365.             l = []
  366.         state = GetValue(self, key, callback, self.rawserver.add_task)
  367.         self.rawserver.add_task(state.goWithNodes, 0, (nodes, l))
  368.  
  369.     
  370.     def krpc_find_value(self, key, id, _krpc_sender):
  371.         sender = {
  372.             'id': id }
  373.         sender['host'] = _krpc_sender[0]
  374.         sender['port'] = _krpc_sender[1]
  375.         n = self.Node().initWithDict(sender)
  376.         self.insertNode(n, contacted = 0)
  377.         l = self.retrieveValues(key)
  378.         if len(l) > 0:
  379.             return {
  380.                 'values': l,
  381.                 'id': self.node.id }
  382.         else:
  383.             nodes = self.table.findNodes(key, invalid = False)
  384.             nodes = map((lambda node: node.senderDict()), nodes)
  385.             return {
  386.                 'nodes': packNodes(nodes),
  387.                 'id': self.node.id }
  388.  
  389.  
  390.  
  391. class KhashmirWrite(KhashmirRead):
  392.     _Node = KNodeWrite
  393.     
  394.     def storeValueForKey(self, key, value, callback = None):
  395.         """ stores the value for key in the global table, returns immediately, no status 
  396.             in this implementation, peers respond but don't indicate status to storing values
  397.             a key can have many values
  398.         """
  399.         
  400.         def _storeValueForKey(nodes, key = key, value = value, response = callback, table = self.table):
  401.             if not response:
  402.                 
  403.                 def _storedValueHandler(sender):
  404.                     pass
  405.  
  406.                 response = _storedValueHandler
  407.             
  408.             action = StoreValue(self, key, value, response, self.rawserver.add_task)
  409.             self.rawserver.external_add_task(action.goWithNodes, 0, (nodes,))
  410.  
  411.         self.findNode(key, _storeValueForKey)
  412.  
  413.     
  414.     def krpc_store_value(self, key, value, id, _krpc_sender):
  415.         t = '%0.6f' % time()
  416.         self.store[key] = value
  417.         sender = {
  418.             'id': id }
  419.         sender['host'] = _krpc_sender[0]
  420.         sender['port'] = _krpc_sender[1]
  421.         n = self.Node().initWithDict(sender)
  422.         self.insertNode(n, contacted = 0)
  423.         return {
  424.             'id': self.node.id }
  425.  
  426.  
  427.  
  428. class Khashmir(KhashmirWrite):
  429.     _Node = KNodeWrite
  430.  
  431.