home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- __revision__ = '$Id: digest_md5.py 683 2008-12-05 18:25:45Z jajcus $'
- __docformat__ = 'restructuredtext en'
- from binascii import b2a_hex
- import re
- import logging
-
- try:
- import hashlib
- md5_factory = hashlib.md5
- except:
- import md5
- md5_factory = md5.new
-
- from pyxmpp.sasl.core import ClientAuthenticator, ServerAuthenticator
- from pyxmpp.sasl.core import Failure, Response, Challenge, Success, Failure
- from pyxmpp.utils import to_utf8, from_utf8
- quote_re = re.compile('(?<!\\\\)\\\\(.)')
-
- def _unquote(s):
- if not s.startswith('"') or not s.endswith('"'):
- return s
-
- return quote_re.sub('\\1', s[1:-1])
-
-
- def _quote(s):
- s = s.replace('\\', '\\\\')
- s = s.replace('"', '\\"')
- return '%s' % (s,)
-
-
- def _h_value(s):
- return md5_factory(s).digest()
-
-
- def _kd_value(k, s):
- return _h_value('%s:%s' % (k, s))
-
-
- def _make_urp_hash(username, realm, password):
- if realm is None:
- realm = ''
-
- if type(password) is unicode:
- password = password.encode('utf-8')
-
- return _h_value('%s:%s:%s' % (username, realm, password))
-
-
- def _compute_response(urp_hash, nonce, cnonce, nonce_count, authzid, digest_uri):
- if authzid:
- a1 = '%s:%s:%s:%s' % (urp_hash, nonce, cnonce, authzid)
- else:
- a1 = '%s:%s:%s' % (urp_hash, nonce, cnonce)
- a2 = 'AUTHENTICATE:' + digest_uri
- return b2a_hex(_kd_value(b2a_hex(_h_value(a1)), '%s:%s:%s:%s:%s' % (nonce, nonce_count, cnonce, 'auth', b2a_hex(_h_value(a2)))))
-
-
- def _compute_response_auth(urp_hash, nonce, cnonce, nonce_count, authzid, digest_uri):
- if authzid:
- a1 = '%s:%s:%s:%s' % (urp_hash, nonce, cnonce, authzid)
- else:
- a1 = '%s:%s:%s' % (urp_hash, nonce, cnonce)
- a2 = ':' + digest_uri
- return b2a_hex(_kd_value(b2a_hex(_h_value(a1)), '%s:%s:%s:%s:%s' % (nonce, nonce_count, cnonce, 'auth', b2a_hex(_h_value(a2)))))
-
- _param_re = re.compile('^(?P<var>[^=]+)\\=(?P<val>(\\"(([^"\\\\]+)|(\\\\\\")|(\\\\\\\\))+\\")|([^",]+))(\\s*\\,\\s*(?P<rest>.*))?$')
-
- class DigestMD5ClientAuthenticator(ClientAuthenticator):
-
- def __init__(self, password_manager):
- ClientAuthenticator.__init__(self, password_manager)
- self.username = None
- self.rspauth_checked = None
- self.response_auth = None
- self.authzid = None
- self.pformat = None
- self.realm = None
- self.password = None
- self.nonce_count = None
- self._DigestMD5ClientAuthenticator__logger = logging.getLogger('pyxmpp.sasl.DigestMD5ClientAuthenticator')
-
-
- def start(self, username, authzid):
- self.username = from_utf8(username)
- if authzid:
- self.authzid = from_utf8(authzid)
- else:
- self.authzid = ''
- self.password = None
- self.pformat = None
- self.nonce_count = 0
- self.response_auth = None
- self.rspauth_checked = 0
- self.realm = None
- return Response()
-
-
- def challenge(self, challenge):
- if not challenge:
- self._DigestMD5ClientAuthenticator__logger.debug('Empty challenge')
- return Failure('bad-challenge')
-
- challenge = challenge.split('\x00')[0]
- if self.response_auth:
- return self._final_challenge(challenge)
-
- realms = []
- nonce = None
- charset = 'iso-8859-1'
- while challenge:
- m = _param_re.match(challenge)
- if not m:
- self._DigestMD5ClientAuthenticator__logger.debug('Challenge syntax error: %r' % (challenge,))
- return Failure('bad-challenge')
-
- challenge = m.group('rest')
- var = m.group('var')
- val = m.group('val')
- self._DigestMD5ClientAuthenticator__logger.debug('%r: %r' % (var, val))
- if var == 'realm':
- realms.append(_unquote(val))
- continue
- if var == 'nonce':
- if nonce:
- self._DigestMD5ClientAuthenticator__logger.debug('Duplicate nonce')
- return Failure('bad-challenge')
-
- nonce = _unquote(val)
- continue
- if var == 'qop':
- qopl = _unquote(val).split(',')
- if 'auth' not in qopl:
- self._DigestMD5ClientAuthenticator__logger.debug('auth not supported')
- return Failure('not-implemented')
-
- 'auth' not in qopl
- if var == 'charset':
- val = _unquote(val)
- if val != 'utf-8':
- self._DigestMD5ClientAuthenticator__logger.debug('charset given and not utf-8')
- return Failure('bad-challenge')
-
- charset = 'utf-8'
- continue
- if var == 'algorithm':
- val = _unquote(val)
- if val != 'md5-sess':
- self._DigestMD5ClientAuthenticator__logger.debug('algorithm given and not md5-sess')
- return Failure('bad-challenge')
-
- val != 'md5-sess'
- if not nonce:
- self._DigestMD5ClientAuthenticator__logger.debug('nonce not given')
- return Failure('bad-challenge')
-
- self._get_password()
- return self._make_response(charset, realms, nonce)
-
-
- def _get_password(self):
- if self.password is None:
- (self.password, self.pformat) = self.password_manager.get_password(self.username, [
- 'plain',
- 'md5:user:realm:pass'])
-
- if not (self.password) or self.pformat not in ('plain', 'md5:user:realm:pass'):
- self._DigestMD5ClientAuthenticator__logger.debug("Couldn't get plain password. Password: %r Format: %r" % (self.password, self.pformat))
- return Failure('password-unavailable')
-
-
-
- def _make_response(self, charset, realms, nonce):
- params = []
- realm = self._get_realm(realms, charset)
- if isinstance(realm, Failure):
- return realm
- elif realm:
- realm = _quote(realm)
- params.append('realm="%s"' % (realm,))
-
-
- try:
- username = self.username.encode(charset)
- except UnicodeError:
- self._DigestMD5ClientAuthenticator__logger.debug("Couldn't encode username to %r" % (charset,))
- return Failure('incompatible-charset')
-
- username = _quote(username)
- params.append('username="%s"' % (username,))
- cnonce = self.password_manager.generate_nonce()
- cnonce = _quote(cnonce)
- params.append('cnonce="%s"' % (cnonce,))
- params.append('nonce="%s"' % (_quote(nonce),))
- self.nonce_count += 1
- nonce_count = '%08x' % (self.nonce_count,)
- params.append('nc=%s' % (nonce_count,))
- params.append('qop=auth')
- serv_type = self.password_manager.get_serv_type().encode('us-ascii')
- host = self.password_manager.get_serv_host().encode('us-ascii')
- serv_name = self.password_manager.get_serv_name().encode('us-ascii')
- if serv_name and serv_name != host:
- digest_uri = '%s/%s/%s' % (serv_type, host, serv_name)
- else:
- digest_uri = '%s/%s' % (serv_type, host)
- digest_uri = _quote(digest_uri)
- params.append('digest-uri="%s"' % (digest_uri,))
- if self.authzid:
-
- try:
- authzid = self.authzid.encode(charset)
- except UnicodeError:
- self._DigestMD5ClientAuthenticator__logger.debug("Couldn't encode authzid to %r" % (charset,))
- return Failure('incompatible-charset')
-
- authzid = _quote(authzid)
- else:
- authzid = ''
- if self.pformat == 'md5:user:realm:pass':
- urp_hash = self.password
- else:
- urp_hash = _make_urp_hash(username, realm, self.password)
- response = _compute_response(urp_hash, nonce, cnonce, nonce_count, authzid, digest_uri)
- self.response_auth = _compute_response_auth(urp_hash, nonce, cnonce, nonce_count, authzid, digest_uri)
- params.append('response=%s' % (response,))
- if authzid:
- params.append('authzid="%s"' % (authzid,))
-
- return Response(','.join(params))
-
-
- def _get_realm(self, realms, charset):
- if realm:
- if type(realm) is unicode:
-
- try:
- realm = realm.encode(charset)
- except UnicodeError:
- None if realms else []
- None if realms else []
- self._DigestMD5ClientAuthenticator__logger.debug("Couldn't encode realm to %r" % (charset,))
- return Failure('incompatible-charset')
- except:
- None if realms else []<EXCEPTION MATCH>UnicodeError
-
-
- None if realms else []
- if charset != 'utf-8':
-
- try:
- realm = unicode(realm, 'utf-8').encode(charset)
- except UnicodeError:
- None if realms else []
- None if realms else []
- self._DigestMD5ClientAuthenticator__logger.debug("Couldn't encode realm from utf-8 to %r" % (charset,))
- return Failure('incompatible-charset')
- except:
- None if realms else []<EXCEPTION MATCH>UnicodeError
-
-
- None if realms else []
- self.realm = realm
-
- return realm
-
-
- def _final_challenge(self, challenge):
- if self.rspauth_checked:
- return Failure('extra-challenge')
-
- challenge = challenge.split('\x00')[0]
- rspauth = None
- while challenge:
- m = _param_re.match(challenge)
- if not m:
- self._DigestMD5ClientAuthenticator__logger.debug('Challenge syntax error: %r' % (challenge,))
- return Failure('bad-challenge')
-
- challenge = m.group('rest')
- var = m.group('var')
- val = m.group('val')
- self._DigestMD5ClientAuthenticator__logger.debug('%r: %r' % (var, val))
- if var == 'rspauth':
- rspauth = val
- continue
- if not rspauth:
- self._DigestMD5ClientAuthenticator__logger.debug('Final challenge without rspauth')
- return Failure('bad-success')
-
- if rspauth == self.response_auth:
- self.rspauth_checked = 1
- return Response('')
- else:
- self._DigestMD5ClientAuthenticator__logger.debug('Wrong rspauth value - peer is cheating?')
- self._DigestMD5ClientAuthenticator__logger.debug('my rspauth: %r' % (self.response_auth,))
- return Failure('bad-success')
-
-
- def finish(self, data):
- if not self.response_auth:
- self._DigestMD5ClientAuthenticator__logger.debug('Got success too early')
- return Failure('bad-success')
-
- if self.rspauth_checked:
- return Success(self.username, self.realm, self.authzid)
- else:
- r = self._final_challenge(data)
- if isinstance(r, Failure):
- return r
-
- if self.rspauth_checked:
- return Success(self.username, self.realm, self.authzid)
- else:
- self._DigestMD5ClientAuthenticator__logger.debug('Something went wrong when processing additional data with success?')
- return Failure('bad-success')
-
-
-
- class DigestMD5ServerAuthenticator(ServerAuthenticator):
-
- def __init__(self, password_manager):
- ServerAuthenticator.__init__(self, password_manager)
- self.nonce = None
- self.username = None
- self.realm = None
- self.authzid = None
- self.done = None
- self.last_nonce_count = None
- self._DigestMD5ServerAuthenticator__logger = logging.getLogger('pyxmpp.sasl.DigestMD5ServerAuthenticator')
-
-
- def start(self, response):
- _unused = response
- self.last_nonce_count = 0
- params = []
- realms = self.password_manager.get_realms()
- if realms:
- self.realm = _quote(realms[0])
- for r in realms:
- r = _quote(r)
- params.append('realm="%s"' % (r,))
-
- else:
- self.realm = None
- nonce = _quote(self.password_manager.generate_nonce())
- self.nonce = nonce
- params.append('nonce="%s"' % (nonce,))
- params.append('qop="auth"')
- params.append('charset=utf-8')
- params.append('algorithm=md5-sess')
- self.authzid = None
- self.done = 0
- return Challenge(','.join(params))
-
-
- def response(self, response):
- if self.done:
- return Success(self.username, self.realm, self.authzid)
-
- if not response:
- return Failure('not-authorized')
-
- return self._parse_response(response)
-
-
- def _parse_response(self, response):
- response = response.split('\x00')[0]
- if self.realm:
- realm = to_utf8(self.realm)
- realm = _quote(realm)
- else:
- realm = None
- username = None
- cnonce = None
- digest_uri = None
- response_val = None
- authzid = None
- nonce_count = None
- while response:
- m = _param_re.match(response)
- if not m:
- self._DigestMD5ServerAuthenticator__logger.debug('Response syntax error: %r' % (response,))
- return Failure('not-authorized')
-
- response = m.group('rest')
- var = m.group('var')
- val = m.group('val')
- self._DigestMD5ServerAuthenticator__logger.debug('%r: %r' % (var, val))
- if var == 'realm':
- realm = val[1:-1]
- continue
- if var == 'cnonce':
- if cnonce:
- self._DigestMD5ServerAuthenticator__logger.debug('Duplicate cnonce')
- return Failure('not-authorized')
-
- cnonce = val[1:-1]
- continue
- if var == 'qop':
- if val != 'auth':
- self._DigestMD5ServerAuthenticator__logger.debug("qop other then 'auth'")
- return Failure('not-authorized')
-
- val != 'auth'
- if var == 'digest-uri':
- digest_uri = val[1:-1]
- continue
- if var == 'authzid':
- authzid = val[1:-1]
- continue
- if var == 'username':
- username = val[1:-1]
- continue
- if var == 'response':
- response_val = val
- continue
- if var == 'nc':
- nonce_count = val
- self.last_nonce_count += 1
- if int(nonce_count) != self.last_nonce_count:
- self._DigestMD5ServerAuthenticator__logger.debug('bad nonce: %r != %r' % (nonce_count, self.last_nonce_count))
- return Failure('not-authorized')
-
- int(nonce_count) != self.last_nonce_count
- continue
- self
- return self._check_params(username, realm, cnonce, digest_uri, response_val, authzid, nonce_count)
-
-
- def _check_params(self, username, realm, cnonce, digest_uri, response_val, authzid, nonce_count):
- if not cnonce:
- self._DigestMD5ServerAuthenticator__logger.debug("Required 'cnonce' parameter not given")
- return Failure('not-authorized')
-
- if not response_val:
- self._DigestMD5ServerAuthenticator__logger.debug("Required 'response' parameter not given")
- return Failure('not-authorized')
-
- if not username:
- self._DigestMD5ServerAuthenticator__logger.debug("Required 'username' parameter not given")
- return Failure('not-authorized')
-
- if not digest_uri:
- self._DigestMD5ServerAuthenticator__logger.debug("Required 'digest_uri' parameter not given")
- return Failure('not-authorized')
-
- if not nonce_count:
- self._DigestMD5ServerAuthenticator__logger.debug("Required 'nc' parameter not given")
- return Failure('not-authorized')
-
- return self._make_final_challenge(username, realm, cnonce, digest_uri, response_val, authzid, nonce_count)
-
-
- def _make_final_challenge(self, username, realm, cnonce, digest_uri, response_val, authzid, nonce_count):
- username_uq = from_utf8(username.replace('\\', ''))
- if authzid:
- authzid_uq = from_utf8(authzid.replace('\\', ''))
- else:
- authzid_uq = None
- if realm:
- realm_uq = from_utf8(realm.replace('\\', ''))
- else:
- realm_uq = None
- digest_uri_uq = digest_uri.replace('\\', '')
- self.username = username_uq
- self.realm = realm_uq
- (password, pformat) = self.password_manager.get_password(username_uq, realm_uq, ('plain', 'md5:user:realm:pass'))
- if pformat == 'md5:user:realm:pass':
- urp_hash = password
- elif pformat == 'plain':
- urp_hash = _make_urp_hash(username, realm, password)
- else:
- self._DigestMD5ServerAuthenticator__logger.debug("Couldn't get password.")
- return Failure('not-authorized')
- valid_response = _compute_response(urp_hash, self.nonce, cnonce, nonce_count, authzid, digest_uri)
- if response_val != valid_response:
- self._DigestMD5ServerAuthenticator__logger.debug('Response mismatch: %r != %r' % (response_val, valid_response))
- return Failure('not-authorized')
-
- s = digest_uri_uq.split('/')
- if len(s) == 3:
- (serv_type, host, serv_name) = s
- elif len(s) == 2:
- (serv_type, host) = s
- serv_name = None
- else:
- self._DigestMD5ServerAuthenticator__logger.debug('Bad digest_uri: %r' % (digest_uri_uq,))
- return Failure('not-authorized')
- info = { }
- info['mechanism'] = 'DIGEST-MD5'
- info['username'] = username_uq
- info['serv-type'] = serv_type
- info['host'] = host
- info['serv-name'] = serv_name
- if self.password_manager.check_authzid(authzid_uq, info):
- rspauth = _compute_response_auth(urp_hash, self.nonce, cnonce, nonce_count, authzid, digest_uri)
- self.authzid = authzid
- self.done = 1
- return Challenge('rspauth=' + rspauth)
- else:
- self._DigestMD5ServerAuthenticator__logger.debug('Authzid check failed')
- return Failure('invalid_authzid')
-
-
-