home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.5)
-
- import socket
- from traceback import print_exc
- from subnetparse import IP_List
- from clock import clock
- from bittornadoinit import createPeerID
-
- try:
- True
- except:
- True = 1
- False = 0
-
- DEBUG = False
- EXPIRE_CACHE = 30
- ID = 'BT-' + createPeerID()[-4:]
-
- try:
- import pythoncom
- import win32com.client as win32com
- _supported = 1
- except ImportError:
- _supported = 0
-
-
- class _UPnP1:
-
- def __init__(self):
- self.map = None
- self.last_got_map = -1e+11
-
-
- def _get_map(self):
- if self.last_got_map + EXPIRE_CACHE < clock():
-
- try:
- dispatcher = win32com.client.Dispatch('HNetCfg.NATUPnP')
- self.map = dispatcher.StaticPortMappingCollection
- self.last_got_map = clock()
- self.map = None
-
-
- return self.map
-
-
- def test(self):
-
- try:
- if not self._get_map():
- raise AssertionError
- success = True
- except:
- success = False
-
- return success
-
-
- def open(self, ip, p):
- map = self._get_map()
-
- try:
- map.Add(p, 'TCP', p, ip, True, ID)
- if DEBUG:
- print 'port opened: ' + ip + ':' + str(p)
-
- success = True
- except:
- if DEBUG:
- print "COULDN'T OPEN " + str(p)
- print_exc()
-
- success = False
-
- return success
-
-
- def close(self, p):
- map = self._get_map()
-
- try:
- map.Remove(p, 'TCP')
- success = True
- if DEBUG:
- print 'port closed: ' + str(p)
- except:
- if DEBUG:
- print 'ERROR CLOSING ' + str(p)
- print_exc()
-
- success = False
-
- return success
-
-
- def clean(self, retry = False):
- if not _supported:
- return None
-
-
- try:
- map = self._get_map()
- ports_in_use = []
- for i in xrange(len(map)):
-
- try:
- mapping = map[i]
- port = mapping.ExternalPort
- prot = str(mapping.Protocol).lower()
- desc = str(mapping.Description).lower()
- except:
- port = None
-
- if port and prot == 'tcp' and desc[:3] == 'bt-':
- ports_in_use.append(port)
- continue
-
- success = True
- for port in ports_in_use:
-
- try:
- map.Remove(port, 'TCP')
- continue
- success = False
- continue
-
-
- if not success and not retry:
- self.clean(retry = True)
- except:
- pass
-
-
-
-
- class _UPnP2:
-
- def __init__(self):
- self.services = None
- self.last_got_services = -1e+11
-
-
- def _get_services(self):
- if not (self.services) or self.last_got_services + EXPIRE_CACHE < clock():
- self.services = []
-
- try:
- f = win32com.client.Dispatch('UPnP.UPnPDeviceFinder')
- for t in ('urn:schemas-upnp-org:service:WANIPConnection:1', 'urn:schemas-upnp-org:service:WANPPPConnection:1'):
-
- try:
- conns = f.FindByType(t, 0)
- for c in xrange(len(conns)):
-
- try:
- svcs = conns[c].Services
- for s in xrange(len(svcs)):
-
- try:
- self.services.append(svcs[s])
- continue
- continue
-
- continue
- continue
-
- continue
- continue
-
- except:
- pass
-
- self.last_got_services = clock()
-
- return self.services
-
-
- def test(self):
-
- try:
- if not self._get_services():
- raise AssertionError
- success = True
- except:
- success = False
-
- return success
-
-
- def open(self, ip, p):
- svcs = self._get_services()
- success = False
- for s in svcs:
-
- try:
- s.InvokeAction('AddPortMapping', [
- '',
- p,
- 'TCP',
- p,
- ip,
- True,
- ID,
- 0], '')
- success = True
- continue
- continue
-
-
- if DEBUG and not success:
- print "COULDN'T OPEN " + str(p)
- print_exc()
-
- return success
-
-
- def close(self, p):
- svcs = self._get_services()
- success = False
- for s in svcs:
-
- try:
- s.InvokeAction('DeletePortMapping', [
- '',
- p,
- 'TCP'], '')
- success = True
- continue
- continue
-
-
- if DEBUG and not success:
- print "COULDN'T OPEN " + str(p)
- print_exc()
-
- return success
-
-
-
- class _UPnP:
-
- def __init__(self):
- self.upnp1 = _UPnP1()
- self.upnp2 = _UPnP2()
- self.upnplist = (None, self.upnp1, self.upnp2)
- self.upnp = None
- self.local_ip = None
- self.last_got_ip = -1e+11
-
-
- def get_ip(self):
- if self.last_got_ip + EXPIRE_CACHE < clock():
- local_ips = IP_List()
- local_ips.set_intranet_addresses()
-
- try:
- for info in socket.getaddrinfo(socket.gethostname(), 0):
- self.local_ip = info[4][0]
- if local_ips.includes(self.local_ip):
- self.last_got_ip = clock()
- if DEBUG:
- print 'Local IP found: ' + self.local_ip
-
- break
- continue
- else:
- raise ValueError("couldn't find intranet IP")
- self.local_ip = None
- if DEBUG:
- print 'Error finding local IP'
- print_exc()
-
-
-
- return self.local_ip
-
-
- def test(self, upnp_type):
- if DEBUG:
- print 'testing UPnP type ' + str(upnp_type)
-
- if not upnp_type and not _supported or self.get_ip() is None:
- if DEBUG:
- print 'not supported'
-
- return 0
-
- pythoncom.CoInitialize()
- self.upnp = self.upnplist[upnp_type]
- if self.upnp.test():
- if DEBUG:
- print 'ok'
-
- return upnp_type
-
- if DEBUG:
- print 'tested bad'
-
- return 0
-
-
- def open(self, p):
- if not self.upnp:
- raise AssertionError, 'must run UPnP_test() with the desired UPnP access type first'
- return self.upnp.open(self.get_ip(), p)
-
-
- def close(self, p):
- if not self.upnp:
- raise AssertionError, 'must run UPnP_test() with the desired UPnP access type first'
- return self.upnp.close(p)
-
-
- def clean(self):
- return self.upnp1.clean()
-
-
- _upnp_ = _UPnP()
- UPnP_test = _upnp_.test
- UPnP_open_port = _upnp_.open
- UPnP_close_port = _upnp_.close
- UPnP_reset = _upnp_.clean
-