home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- import const
- from BitTorrent.platform import bttime as time
- from sha import sha
- from BitTorrent.defaultargs import common_options, rare_options
- from BitTorrent.RawServer import RawServer
- from ktable import KTable, K
- from knode import *
- from kstore import KStore
- from khash import newID, newIDInRange
- from util import packNodes
- from actions import FindNode, GetValue, KeyExpirer, StoreValue
- import krpc
- import sys
- import os
- import traceback
- from BitTorrent.bencode import bencode, bdecode
- from defer import Deferred
- from random import randrange
- from threading import Event
-
- class KhashmirDBExcept(Exception):
- pass
-
-
- class KhashmirBase:
- _Node = KNodeBase
-
- def __init__(self, host, port, data_dir, rawserver = None, max_ul_rate = 1024, checkpoint = True):
- self.max_ul_rate = max_ul_rate
- self.socket = None
- self.setup(host, port, data_dir, checkpoint)
-
-
- def setup(self, host, port, data_dir, checkpoint = True):
- self.host = host
- self.port = port
- self.ddir = data_dir
- self.store = KStore()
- self.pingcache = { }
- self.socket = self.rawserver.create_udpsocket(self.port, self.host, True)
- self.udp = krpc.hostbroker(self, (self.host, self.port), self.socket, self.rawserver.add_task, self.max_ul_rate)
- self._load()
- self.rawserver.start_listening_udp(self.socket, self.udp)
- self.last = time()
- KeyExpirer(self.store, self.rawserver.add_task)
- self.refreshTable(force = 1)
- if checkpoint:
- self.rawserver.add_task(self.findCloseNodes, 30, ((lambda a: a), True))
- self.rawserver.add_task(self.checkpoint, 60, (1,))
-
-
-
- def Node(self):
- n = self._Node(self.udp.connectionForAddr)
- n.table = self
- return n
-
-
- def __del__(self):
- self.rawserver.stop_listen_udp(self.socket)
- self.socket.close()
-
-
- def _load(self):
- do_load = False
-
- try:
- s = open(os.path.join(self.ddir, 'routing_table'), 'r').read()
- dict = bdecode(s)
- except:
- id = newID()
-
- id = dict['id']
- do_load = True
- self.node = self._Node(self.udp.connectionForAddr).init(id, self.host, self.port)
- self.table = KTable(self.node)
- if do_load:
- self._loadRoutingTable(dict['rt'])
-
-
-
- def checkpoint(self, auto = 0):
- d = { }
- d['id'] = self.node.id
- d['rt'] = self._dumpRoutingTable()
- self.refreshTable()
-
- try:
- f = open(os.path.join(self.ddir, 'routing_table'), 'wb')
- f.write(bencode(d))
- f.close()
- except:
- pass
-
- if auto:
- self.rawserver.add_task(self.checkpoint, randrange(int(const.CHECKPOINT_INTERVAL * 0.90000000000000002), int(const.CHECKPOINT_INTERVAL * 1.1000000000000001)), (1,))
-
-
-
- def _loadRoutingTable(self, nodes):
- """
- load routing table nodes from database
- it's usually a good idea to call refreshTable(force=1) after loading the table
- """
- for rec in nodes:
- n = self.Node().initWithDict(rec)
- self.table.insertNode(n, contacted = 0, nocheck = True)
-
-
-
- def _dumpRoutingTable(self):
- '''
- save routing table nodes to the database
- '''
- l = []
- for bucket in self.table.buckets:
- for node in bucket.l:
- l.append({
- 'id': node.id,
- 'host': node.host,
- 'port': node.port })
-
-
- return l
-
-
- def addContact(self, host, port, callback = None):
- '''
- ping this node and add the contact info to the table on pong!
- '''
- n = self.Node().init(const.NULL_ID, host, port)
-
- try:
- self.sendPing(n, callback = callback)
- except krpc.KRPCSelfNodeError:
- pass
-
-
-
- def findNode(self, id, callback, errback = None):
- ''' returns the contact info for node, or the k closest nodes, from the global table '''
- nodes = self.table.findNodes(id, invalid = False)
- nodes += self.table.findNodes(id, invalid = True)
- d = Deferred()
- if errback:
- d.addCallbacks(callback, errback)
- else:
- d.addCallback(callback)
- if len(nodes) == 1 and nodes[0].id == id:
- d.callback(nodes)
- else:
- state = FindNode(self, id, d.callback, self.rawserver.add_task)
- self.rawserver.external_add_task(state.goWithNodes, 0, (nodes,))
-
-
- def insertNode(self, n, contacted = 1):
- """
- insert a node in our local table, pinging oldest contact in bucket, if necessary
-
- If all you have is a host/port, then use addContact, which calls this method after
- receiving the PONG from the remote node. The reason for the seperation is we can't insert
- a node into the table without it's peer-ID. That means of course the node passed into this
- method needs to be a properly formed Node object with a valid ID.
- """
- old = self.table.insertNode(n, contacted = contacted)
- if old and old != n:
- if not old.inPing():
- self.checkOldNode(old, n, contacted)
- else:
- l = self.pingcache.get(old.id, [])
- l.append((n, contacted))
- self.pingcache[old.id] = l
-
-
-
- def checkOldNode(self, old, new, contacted = False):
-
- def _staleNodeHandler(dict):
- ''' called if the pinged node never responds '''
- if old.fails >= 2:
- o = self.table.replaceStaleNode(old, new)
- if o and o != new:
- self.checkOldNode(o, new)
-
- try:
- self.pingcache[o.id] = self.pingcache[old.id]
- del self.pingcache[old.id]
- except KeyError:
- pass
- except:
- None<EXCEPTION MATCH>KeyError
-
-
- None<EXCEPTION MATCH>KeyError
- l = self.pingcache.get(old.id, [])
- if l:
- del self.pingcache[old.id]
- for node in l:
- self.insertNode(node[0], node[1])
-
-
- else:
- l = self.pingcache.get(old.id, [])
- if l:
- del self.pingcache[old.id]
-
- self.insertNode(new)
- for node in l:
- self.insertNode(node[0], node[1])
-
-
-
- def _notStaleNodeHandler(dict):
- ''' called when we get a pong from the old node '''
- self.table.insertNode(old, True)
- self.insertNode(new, contacted)
- for node in self.pingcache.get(old.id, []):
- self.insertNode(node[0], node[1])
-
-
- try:
- del self.pingcache[old.id]
- except KeyError:
- pass
-
-
-
- try:
- df = old.ping(self.node.id)
- except krpc.KRPCSelfNodeError:
- pass
-
- df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
-
-
- def sendPing(self, node, callback = None):
- '''
- ping a node
- '''
-
- try:
- df = node.ping(self.node.id)
- except krpc.KRPCSelfNodeError:
- pass
-
-
- def _pongHandler(dict, node = node, table = self.table, callback = callback):
- _krpc_sender = dict['_krpc_sender']
- dict = dict['rsp']
- sender = {
- 'id': dict['id'] }
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- table.insertNode(n)
- if callback:
- callback()
-
-
-
- def _defaultPong(err, node = node, table = self.table, callback = callback):
- if callback:
- callback()
-
-
- df.addCallbacks(_pongHandler, _defaultPong)
-
-
- def findCloseNodes(self, callback = (lambda a: a), auto = False):
- '''
- This does a findNode on the ID one away from our own.
- This will allow us to populate our table with nodes on our network closest to our own.
- This is called as soon as we start up with an empty table
- '''
- id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- self.findNode(id, callback)
- if auto:
- self.rawserver.add_task(self.findCloseNodes, randrange(int(const.FIND_CLOSE_INTERVAL * 0.90000000000000002), int(const.FIND_CLOSE_INTERVAL * 1.1000000000000001)), ((lambda a: True), True))
-
-
-
- def refreshTable(self, force = 0):
- '''
- force=1 will refresh table regardless of last bucket access time
- '''
-
- def callback(nodes):
- pass
-
- for bucket in self.table.buckets:
- if force or time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
- id = newIDInRange(bucket.min, bucket.max)
- self.findNode(id, callback)
- continue
-
-
-
- def stats(self):
- '''
- Returns (num_contacts, num_nodes)
- num_contacts: number contacts in our routing table
- num_nodes: number of nodes estimated in the entire dht
- '''
- num_contacts = reduce((lambda a, b: a + len(b.l)), self.table.buckets, 0)
- num_nodes = const.K * 2 ** (len(self.table.buckets) - 1)
- return {
- 'num_contacts': num_contacts,
- 'num_nodes': num_nodes }
-
-
- def krpc_ping(self, id, _krpc_sender):
- sender = {
- 'id': id }
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- self.insertNode(n, contacted = 0)
- return {
- 'id': self.node.id }
-
-
- def krpc_find_node(self, target, id, _krpc_sender):
- nodes = self.table.findNodes(target, invalid = False)
- nodes = map((lambda node: node.senderDict()), nodes)
- sender = {
- 'id': id }
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- self.insertNode(n, contacted = 0)
- return {
- 'nodes': packNodes(nodes),
- 'id': self.node.id }
-
-
-
- class KhashmirRead(KhashmirBase):
- _Node = KNodeRead
-
- def retrieveValues(self, key):
-
- try:
- l = self.store[key]
- except KeyError:
- l = []
-
- return l
-
-
- def valueForKey(self, key, callback, searchlocal = 1):
- """ returns the values found for key in global table
- callback will be called with a list of values for each peer that returns unique values
- final callback will be an empty list - probably should change to 'more coming' arg
- """
- nodes = self.table.findNodes(key)
- if searchlocal:
- l = self.retrieveValues(key)
- if len(l) > 0:
- self.rawserver.add_task(callback, 0, (l,))
-
- else:
- l = []
- state = GetValue(self, key, callback, self.rawserver.add_task)
- self.rawserver.add_task(state.goWithNodes, 0, (nodes, l))
-
-
- def krpc_find_value(self, key, id, _krpc_sender):
- sender = {
- 'id': id }
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- self.insertNode(n, contacted = 0)
- l = self.retrieveValues(key)
- if len(l) > 0:
- return {
- 'values': l,
- 'id': self.node.id }
- else:
- nodes = self.table.findNodes(key, invalid = False)
- nodes = map((lambda node: node.senderDict()), nodes)
- return {
- 'nodes': packNodes(nodes),
- 'id': self.node.id }
-
-
-
- class KhashmirWrite(KhashmirRead):
- _Node = KNodeWrite
-
- def storeValueForKey(self, key, value, callback = None):
- """ stores the value for key in the global table, returns immediately, no status
- in this implementation, peers respond but don't indicate status to storing values
- a key can have many values
- """
-
- def _storeValueForKey(nodes, key = key, value = value, response = callback, table = self.table):
- if not response:
-
- def _storedValueHandler(sender):
- pass
-
- response = _storedValueHandler
-
- action = StoreValue(self, key, value, response, self.rawserver.add_task)
- self.rawserver.external_add_task(action.goWithNodes, 0, (nodes,))
-
- self.findNode(key, _storeValueForKey)
-
-
- def krpc_store_value(self, key, value, id, _krpc_sender):
- t = '%0.6f' % time()
- self.store[key] = value
- sender = {
- 'id': id }
- sender['host'] = _krpc_sender[0]
- sender['port'] = _krpc_sender[1]
- n = self.Node().initWithDict(sender)
- self.insertNode(n, contacted = 0)
- return {
- 'id': self.node.id }
-
-
-
- class Khashmir(KhashmirWrite):
- _Node = KNodeWrite
-
-