home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.6)
-
import traceback
import logging
from pyxmpp.utils import to_utf8
from pyxmpp.sasl.core import ClientAuthenticator
from pyxmpp.sasl.core import Success, Failure, Challenge, Response
import struct
import hashlib
log = logging.getLogger('digsby.sasl')
import M2Crypto
# Plain dotted imports bind the top-level name ``util`` to the package, so
# both ``util.cryptography`` and ``util.cacheable`` stay reachable.  The
# decompiler had emitted ``import util.X as util`` for each of them, which
# rebinds ``util`` to a single submodule and breaks every dotted lookup below.
import util.cryptography
import util.cacheable
import digsbyrsa

# PEM-encoded certificate; its public key is what a server-supplied
# certificate is verified against during the SASL exchange.
ROOT_CERT_RAW = '-----BEGIN CERTIFICATE-----\nMIIEqjCCA5KgAwIBAgIJAIm6tjo3F7DQMA0GCSqGSIb3DQEBBQUAMIGUMQswCQYD\nVQQGEwJVUzERMA8GA1UECBMITmV3IFlvcmsxEjAQBgNVBAcTCVJvY2hlc3RlcjEX\nMBUGA1UEChMOZG90U3ludGF4LCBMTEMxDzANBgNVBAsTBkRpZ3NieTETMBEGA1UE\nAxMKZGlnc2J5Lm9yZzEfMB0GCSqGSIb3DQEJARYQYWRtaW5AZGlnc2J5LmNvbTAe\nFw0xMDEyMTUyMDUwNTZaFw0xMTEyMTUyMDUwNTZaMIGUMQswCQYDVQQGEwJVUzER\nMA8GA1UECBMITmV3IFlvcmsxEjAQBgNVBAcTCVJvY2hlc3RlcjEXMBUGA1UEChMO\nZG90U3ludGF4LCBMTEMxDzANBgNVBAsTBkRpZ3NieTETMBEGA1UEAxMKZGlnc2J5\nLm9yZzEfMB0GCSqGSIb3DQEJARYQYWRtaW5AZGlnc2J5LmNvbTCCASIwDQYJKoZI\nhvcNAQEBBQADggEPADCCAQoCggEBALvRZBUWh9IhxNfajLc6gXA8OVArXp2XvxCf\nhHR/CznE4bqNSs8kGlkXZzGZlhRXDpMM4ytlktYN6Bu+RwR1BzjQSt/GpnuxCL5X\n+l9yukATrfBs/i7iHwmWREkvJXzgi3ToyZ/NsmjmA1CtzYn44D8Sc/lsui4EeGyb\nslno/ZYpCIniaHPnA1+A8u6Fbq/DgZkpLY8ZA/lgKRwtQMa216eBEhROjJVLjdN3\nGDenu/tGBIdKQLAJ1bLCswSamtmmTAIT4nqd5GH/p1PlZKpObn38+PV1Cth1ZA3R\nSnIxFSRUO5s2OxWTR694hEufrLV3ccK5NSW1tuMYeDaWUfo3MIECAwEAAaOB/DCB\n+TAdBgNVHQ4EFgQUq3dfcfI0E/07C6iZiyp1lwmdA24wgckGA1UdIwSBwTCBvoAU\nq3dfcfI0E/07C6iZiyp1lwmdA26hgZqkgZcwgZQxCzAJBgNVBAYTAlVTMREwDwYD\nVQQIEwhOZXcgWW9yazESMBAGA1UEBxMJUm9jaGVzdGVyMRcwFQYDVQQKEw5kb3RT\neW50YXgsIExMQzEPMA0GA1UECxMGRGlnc2J5MRMwEQYDVQQDEwpkaWdzYnkub3Jn\nMR8wHQYJKoZIhvcNAQkBFhBhZG1pbkBkaWdzYnkuY29tggkAibq2OjcXsNAwDAYD\nVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOCAQEAJCsHm8osylNqfmNMTEL6Nczr\nhD95jl1D3a3hKlKHYPkZ5/pmGHV4C/ZYteSm9yWtWNQp/ZGTS+XG4I9NFQ6s6Cr1\nLLOoK52BVzal5LemAPyzXyIKuG2fwTMdBiL9fIoYDLWvjzp5SGHHc4K0mofetgxZ\nTdqQr7qWXY62zdkKSgwo9HPqrhtUzyfvDBJPjzeRbGguV3jCvodgV5D7aK18K1gz\nC9lIMQaWRzS80+a1dUtibwG4fTKSRaIrOmdhvI+YdTj4aNKcmq985CXD068hG09P\nArAEDrQEul9GOIqcx6RmtDSx1r+f1Iv+ef5boBu/04TZCCClDF7AYUwIErJCoQ==\n-----END CERTIFICATE-----\n'
# Parsed once at import time into an M2Crypto X509 object.
ROOT_CERT = M2Crypto.X509.load_cert_string(ROOT_CERT_RAW)
-
def _save_client_key_pem(key, uname, password):
    """Serialize *key* to passphrase-protected PEM and store it in the cache.

    The PEM passphrase is the SHA-256 digest of
    ``password + 'digsby' + uname.encode('utf-8')``.  The PEM text is decoded
    from UTF-8 before being handed to ``util.cacheable.save_cache_object``
    under the name ``'client_priv_key.key'``.

    Returns whatever ``save_cache_object`` returns.
    """

    def passfunc(_dec_enc):
        # M2Crypto passes a single flag argument to the callback; it is ignored.
        return hashlib.sha256(password + 'digsby' + uname.encode('utf-8')).digest()

    pem = key.as_pem(callback=passfunc)
    # A missing PEM is passed through untouched rather than decoded.
    dec_key = pem if pem is None else pem.decode('utf8')
    return util.cacheable.save_cache_object(
        'client_priv_key.key', dec_key, json=True, user=True)
-
-
def _get_client_key_pem(uname, password):
    """Load the cached client RSA private key, or return None.

    The cached value is read with ``util.cacheable.load_cache_object`` under
    the name ``'client_priv_key.key'``, UTF-8 encoded, and parsed with
    ``M2Crypto.RSA.load_key_string``.  The PEM passphrase is the SHA-256
    digest of ``password + 'digsby' + uname.encode('utf-8')``.

    Returns None when nothing is cached, when the key cannot be loaded (for
    any reason, including a wrong passphrase), or when ``check_key()``
    reports the key as invalid.
    """
    passfunc = lambda _dec_enc: hashlib.sha256(password + 'digsby' + uname.encode('utf-8')).digest()
    key = util.cacheable.load_cache_object('client_priv_key.key', json=True, user=True)
    if key is None:
        # Nothing cached yet; the caller is expected to generate a new key.
        return None

    enc_key = key.encode('utf8')
    try:
        ret_key = M2Crypto.RSA.load_key_string(enc_key, callback=passfunc)
        if not ret_key.check_key():
            ret_key = None
    except Exception:
        # Deliberate best-effort: an unreadable key is treated as "no key".
        ret_key = None
    return ret_key
-
-
def pstrI(s):
    """Return *s* prefixed with its length as a 4-byte big-endian unsigned int."""
    length_prefix = struct.pack('!I', len(s))
    return length_prefix + s
-
-
def pstrIlist(l):
    """Return a list holding the ``pstrI`` encoding of every item in *l*."""
    packed = []
    for item in l:
        packed.append(pstrI(item))
    return packed
-
-
def pstrAES(key, s, iv=None):
    """Length-prefix *s* with ``pstrI`` and encrypt it in CBC mode under *key*.

    The pad character is ``chr(255 - last_byte)`` where ``last_byte`` is the
    final byte of the length-prefixed string, so the padding never equals
    that byte (presumably so it can be stripped unambiguously after
    decryption -- confirm against ``util.cryptography``).
    """
    framed = pstrI(s)
    # 256 + ~x == 255 - x: the bitwise complement of the last byte.
    pad = chr(255 - ord(framed[-1]))
    crypto = util.cryptography
    return crypto.encrypt(key, framed, iv=iv, padchar=pad, mode=crypto.Mode.CBC)
-
-
def unpackpstrlist(s):
    """Split *s* into the list of length-prefixed strings it contains.

    Each item is a 4-byte big-endian unsigned length followed by that many
    bytes.  A final item shorter than its declared length is returned
    truncated; a trailing fragment shorter than 4 bytes makes
    ``struct.unpack`` raise ``struct.error``.
    """
    items = []
    remaining = s
    while remaining:
        header, remaining = remaining[:4], remaining[4:]
        (size,) = struct.unpack('!I', header)
        items.append(remaining[:size])
        remaining = remaining[size:]
    return items
-
-
def unpackpstr(s):
    """Read one length-prefixed string from the front of *s*.

    Returns a ``(payload, rest)`` tuple: the bytes announced by the 4-byte
    big-endian length header, and whatever follows them.
    """
    (size,) = struct.unpack('!I', s[:4])
    end = 4 + size
    return (s[4:end], s[end:])
-
-
class DigsbyAESClientAuthenticator(ClientAuthenticator):
    """Client side of the Digsby AES SASL exchange.

    The exchange is driven by ``self.step``:

    * step 0 -- send the authzid and username as length-prefixed strings;
    * step 1 -- treat the challenge as the server certificate, verify it
      against ``ROOT_CERT``, then send a random nonce encrypted with the
      server's RSA key plus the client's public key AES-encrypted under
      that nonce;
    * step 2 -- unwrap the nonce sent back by the server and reply with the
      password AES-encrypted under it.

    Any further challenge is answered with ``Failure('extra-challenge')``.
    """

    def __init__(self, password_manager):
        """Initialise state; *password_manager* is handed to the base class."""
        ClientAuthenticator.__init__(self, password_manager)
        self.username = None
        self.password = None
        self.key = None
        self.step = 0
        self.authzid = None
        self.__logger = logging.getLogger('digsby.sasl.AES')

    def start(self, username, authzid):
        """Begin authentication and return the first response.

        A false *authzid* is stored as the empty string.
        """
        self.username = username
        if authzid:
            self.authzid = authzid
        else:
            self.authzid = ''
        self.step = 0
        return self.challenge('')

    def challenge(self, challenge):
        """Process one server challenge and return a Response or Failure.

        Returns ``Failure('password-unavailable')`` when no plain password
        can be obtained, ``Failure('bad-server-cert')`` when the server
        certificate cannot be loaded or does not verify against
        ``ROOT_CERT``, and ``Failure('extra-challenge')`` once the exchange
        is complete.

        Raises ValueError if a freshly generated client key fails
        ``check_key()``.
        """
        if self.password is None:
            (self.password, pformat) = self.password_manager.get_password(self.username)
            if not self.password or pformat != 'plain':
                self.__logger.debug("Couldn't retrieve plain password")
                return Failure('password-unavailable')

        if self.step == 0:
            self.step = 1
            return Response(''.join(pstrIlist([
                to_utf8(self.authzid),
                to_utf8(self.username)])))

        if self.step == 1:
            self.step = 2
            srv_cert = None
            srv_rsa_key = None
            self.__logger.critical('loading server certificate')
            try:
                srv_cert = M2Crypto.X509.load_cert_string(challenge)
                if srv_cert is not None:
                    self.__logger.critical('retrieving server pubkey')
                    srv_key = srv_cert.get_pubkey()
                    if srv_key is not None:
                        self.__logger.critical('retrieving server RSA pubkey')
                        srv_rsa_key = srv_key.get_rsa()
            except Exception:
                # Any failure leaves srv_rsa_key as None and is reported
                # below as a bad certificate.
                traceback.print_exc()

            if srv_rsa_key is None:
                return Failure('bad-server-cert')
            if not srv_cert.verify(ROOT_CERT.get_pubkey()):
                return Failure('bad-server-cert')

            self.srv_rsa_key = srv_rsa_key
            self.__logger.critical('generating Nonce')
            from M2Crypto import m2
            nonce = m2.rand_bytes(16)
            self.__logger.critical('encrypting Nonce')
            enonce = digsbyrsa.DIGSBY_RSA_public_encrypt(nonce, srv_rsa_key, M2Crypto.RSA.pkcs1_oaep_padding)

            self.__logger.critical('loading key')
            try:
                self.key = _get_client_key_pem(self.username, self.password)
            except Exception:
                # Fall back to generating a fresh key below.
                self.key = None

            if self.key is None:
                self.__logger.critical('generating new key')
                self.key = M2Crypto.RSA.gen_key(2048, 65537)
                self.__logger.critical('saving new key')
                if not self.key.check_key():
                    raise ValueError('failed to generate key')
                try:
                    _save_client_key_pem(self.key, self.username, self.password)
                except Exception:
                    # Failing to cache the key is not fatal for this login.
                    traceback.print_exc()

            self.__logger.critical('creating buffer')
            buff = M2Crypto.BIO.MemoryBuffer()
            self.__logger.critical('serializing client public key to buffer')
            self.key.save_pub_key_bio(buff)
            # critical_s is not a stdlib Logger method; it is expected to be
            # provided by the application's logging setup.
            self.__logger.critical_s('Nonce: %r', nonce)
            genkey = buff.getvalue()
            self.__logger.critical_s('Key: %r', genkey)
            eKey = pstrAES(nonce, genkey)
            self.__logger.critical('returning response')
            return Response(''.join(pstrIlist([
                enonce,
                eKey])))

        if self.step == 2:
            self.step = 3
            (package_nonce_C_userpub, package_C_AES_C_serverpriv) = unpackpstrlist(challenge)
            # Outer layer: AES key encrypted to the client's RSA key.
            package_nonce = digsbyrsa.DIGSBY_RSA_private_decrypt(package_nonce_C_userpub, self.key, M2Crypto.RSA.pkcs1_oaep_padding)
            # Middle layer: AES-CBC payload decrypted with that key.
            package_C_serverpriv = util.cryptography.decrypt(package_nonce, package_C_AES_C_serverpriv, padchar=None, mode=util.cryptography.Mode.CBC)
            # Inner layer: recovered with the server's RSA public key.
            nonce = digsbyrsa.DIGSBY_RSA_public_decrypt(package_C_serverpriv, self.srv_rsa_key, M2Crypto.RSA.pkcs1_padding)
            return Response(pstrAES(nonce, self.password))

        return Failure('extra-challenge')

    def finish(self, data):
        """Return Success for the stored username and authzid; *data* is ignored."""
        return Success(self.username, None, self.authzid)
-
-
-