home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.6)
-
- import logging
- import hashlib
- import rfc822
- import struct
- import string
- import re
- import util
- import util.primitives.funcs as funcs
- import util.xml_tag as util
- import msn
- from util.Events import event
- from msn.p10 import Notification as Super
# Logger shared by everything in this module.
log = logging.getLogger('msn.p11.ns')

# Default keyword arguments for socket sends.
# NOTE(review): `sentinel` is not defined or imported in this file --
# presumably the host application injects it as a builtin; confirm.
defcb = {'trid': True, 'callback': sentinel}

# Largest signed 32-bit integer; used as modulus/mask by the challenge hash.
MAX_SINT = 0x7FFFFFFF

# Captures the body of a <PSMUrl>...</PSMUrl> element (case-insensitive).
psmurl_re = re.compile(r'\<PSMUrl\>(.*?)\</PSMUrl\>', re.IGNORECASE)

# Captures the key name of a "%(key)s" placeholder.
format_re = re.compile(r'%\((.*?)\)s')
-
def psm_url_fix(datatag_str):
    """Cut the first ``<PSMUrl>...</PSMUrl>`` element out of *datatag_str*.

    Returns a ``(remaining_text, url)`` tuple.  ``url`` is the text that was
    between the tags; when no such element is present the input is returned
    unchanged together with an empty string.
    """
    found = psmurl_re.search(datatag_str)
    if not found:
        return (datatag_str, '')

    begin, stop = found.span()
    remaining = datatag_str[:begin] + datatag_str[stop:]
    return (remaining, found.group(1))
-
-
def transform_format_groups(fmtstring):
    """Convert ``%(name)s`` placeholders into positional ``{N}`` markers.

    Each distinct placeholder name is numbered in order of first appearance,
    and repeated names reuse the same number.

    Returns a ``(rewritten_string, names)`` tuple where ``names[N]`` is the
    placeholder name that ``{N}`` stands for.
    """
    names = []
    pieces = []
    cursor = 0

    for hit in format_re.finditer(fmtstring):
        name = hit.group(1)
        if name not in names:
            names.append(name)

        # Literal text between the previous placeholder and this one.
        pieces.append(fmtstring[cursor:hit.start()])
        pieces.append('{%d}' % names.index(name))
        cursor = hit.end()

    # Whatever follows the final placeholder (or the whole string if none).
    pieces.append(fmtstring[cursor:])
    return (''.join(pieces), names)
-
-
def format_mediainfo(info):
    """Return the ``repr()`` of *info* as its textual form."""
    rendered = repr(info)
    return rendered
-
-
class MSNP11Notification(Super):
    """Notification handler for protocol version ``MSNP11``.

    Extends the MSNP10 ``Notification`` class with UBX/UUX status-message
    commands, offline-message (OIM) bookkeeping and the challenge-response
    computation.
    """

    versions = ['MSNP11']
    client_chl_id = challenge_id = 'PROD0090YUAUV{2B'
    client_chl_code = 'YMM8C_H7KCQ2S_KL'
    events = Super.events | set(('contact_status_msg', 'received_oims'))

    def __init__(self, *a, **k):
        """Create the handler; all arguments are forwarded to the base class."""
        # Set before the base initialiser runs so the attribute always exists.
        self.oims = []
        Super.__init__(self, *a, **k)

    def recv_sbs(self, msg):
        """Log that an SBS command arrived; it is otherwise ignored."""
        log.warning('Got SBS command')

    def recv_ubx(self, msmsg):
        """Handle a UBX command carrying a contact's status message.

        The payload is parsed as XML (after removing any ``PSMUrl`` element),
        the ``PSM`` text is extracted and a ``contact_status_msg`` event is
        fired with ``(bname, status_message)``.

        Returns the parsed payload tag.
        """
        (bname,) = msmsg.args
        msg = msmsg.payload
        log.info('Got UBX for %s: %r', bname, str(msg))
        if not msg:
            msg = '<data />'

        # The PSMUrl element is removed before parsing; its URL is not used.
        msg, _url = psm_url_fix(msg)

        try:
            msg = util.xml_tag.tag(msg)
        except Exception:
            # Best effort: report the parse failure and continue with an
            # empty document so the event below is still fired.
            import traceback
            traceback.print_exc()
            msg = util.xml_tag.tag('<data />')

        status_message = ''
        # No "now playing" text is extracted in this version, so this stays
        # empty and only the PSM text can end up in the event.
        now_playing = ''
        if msg.PSM:
            status_message = msg.PSM._cdata.decode('xml')
            log.info('%r has status message of: %r', bname, status_message)

        if status_message and now_playing:
            status_message = u'%s\n%s' % (status_message, now_playing)
        elif not status_message:
            # Fall back to the media text only when there is no PSM text.
            # The decompiled source read as an unconditional assignment,
            # which would have discarded every parsed status message.
            status_message = now_playing

        self.event('contact_status_msg', bname, status_message)
        return msg

    def recv_chl(self, msg):
        """Fire a ``challenge`` event with the first argument of a CHL command."""
        log.debug('got chl')
        self.event('challenge', msg.args[0])

    def recv_uux(self, msg):
        """Accept a UUX command; the payload is read but not acted on."""
        unused_message = msg.payload

    def send_uux(self, message=None, mediainfo=None, url=None, callback=None):
        """Send a UUX command describing the local status message.

        ``message``, ``mediainfo`` and ``url`` fill the ``PSM``,
        ``CurrentMedia`` and ``PSMUrl`` elements respectively; ``None`` is
        sent as an empty element.  ``callback`` is passed to the socket send.
        """
        mtag = util.xml_tag.tag('Data')
        mtag.PSM._cdata = message if message is not None else ''
        mtag.CurrentMedia._cdata = mediainfo if mediainfo is not None else ''
        mtag.PSMUrl._cdata = url if url is not None else ''

        payload = mtag._to_xml(pretty=False).encode('utf-8')
        self.socket.send(msn.Message('UUX', payload=payload),
                         trid=True, callback=callback)

    def _set_status_message(self, *a, **k):
        """Alias for :meth:`send_uux`; arguments are passed straight through."""
        return self.send_uux(*a, **k)

    def set_message_object(self, messageobj, callback):
        """Publish *messageobj* as either media info or a plain status text.

        When ``messageobj.media`` supplies both a format string and format
        arguments, a ``CurrentMedia`` value is built and sent; otherwise
        ``messageobj.message`` is sent as the status message.
        """
        media = getattr(messageobj, 'media', None)
        log.debug('set_message_object got this for (messageobj, media): %r',
                  (messageobj, media))

        if media is None:
            log.debug('msn not sending CurrentMedia because media is None')
            self.send_uux(messageobj.message, callback=callback)
            return

        fmt = funcs.get(media, 'format_string', '')
        args = funcs.get(media, 'format_args', {})
        if not (fmt and args):
            log.debug('msn not sending CurrentMedia because no fmt or args. (fmt=%r, args=%r)', fmt, args)
            self.send_uux(message=messageobj.message, callback=callback)
            return

        fmtstring, keys = transform_format_groups(fmt)
        values = [args[key] for key in keys]
        application = media.get('app', '')
        media_type = media.get('type', 'Music')
        enabled = '1'
        # Fields are separated by a literal backslash followed by "0"; the
        # trailing empty field makes the value end with that separator.
        fields = [application, media_type, enabled, fmtstring] + values + ['']
        self.send_uux(mediainfo='\\0'.join(fields), callback=callback)

    def recv_msg_notification(self, msg):
        """Replace the stored OIM list from a ``Hotmail`` notification message."""
        if msg.name == 'Hotmail':
            MD = self.extract_oim_info(msg)
            self.oims = msn.oim.OIMMessages(self, MD)
        else:
            log.warning('unknown msg/notification')

    def extract_oim_info(self, oim_info_msg):
        """Return the parsed ``Mail-Data`` header of *oim_info_msg*.

        Returns ``None`` when the header value contains ``'too-large'``,
        otherwise the header value parsed as an XML tag.
        """
        msg_obj = rfc822.Message(oim_info_msg.payload.body())
        maildata = msg_obj['Mail-Data']
        if 'too-large' in maildata:
            return None
        return util.xml_tag.tag(maildata)

    def recv_msg_oims(self, msg):
        """Append the OIMs described by a ``Hotmail`` message to the stored list."""
        if msg.name == 'Hotmail':
            MD = self.extract_oim_info(msg)
            self.oims += msn.oim.OIMMessages(self, MD)

    @event
    def received_oims(self, oims):
        """Event hook that passes *oims* through unchanged."""
        return oims

    def _challenge_response(self, chl_str, challenge_key, mystery_num=242854337):
        """Compute the hexadecimal response to a server challenge.

        ``chl_str`` is the challenge text, ``challenge_key`` the key hashed
        together with it and ``mystery_num`` a multiplier used when mixing
        the challenge integers.  Returns a 32-character lowercase hex string.
        """
        digest = hashlib.md5(chl_str + challenge_key).digest()

        # Four little-endian 32-bit words of the MD5, sign bit masked off.
        hash_ints = [x & MAX_SINT for x in struct.unpack('<llll', digest)]

        # Append the product id, then 1-8 '0' characters so the length is a
        # multiple of 8 (an already aligned string still receives 8).
        chl_str += self.challenge_id
        chl_str += '0' * (8 - len(chl_str) % 8)
        chl_nums = struct.unpack('<%di' % (len(chl_str) // 4), chl_str)

        # The accumulators must exist before the loop reads them; the
        # decompiled source had this initialisation after the loop.
        hi = lo = i = 0
        while i < len(chl_nums) - 1:
            j = chl_nums[i]
            j = mystery_num * j % MAX_SINT
            j += hi
            j = hash_ints[0] * j + hash_ints[1]
            j = j % MAX_SINT

            hi = (chl_nums[i + 1] + j) % MAX_SINT
            hi = hash_ints[2] * hi + hash_ints[3]
            hi = hi % MAX_SINT

            lo = lo + hi + j
            i += 2

        def byteswap(value, fmt):
            # Reverse the byte order of an integer of struct type ``fmt``.
            return struct.unpack('>' + fmt, struct.pack('<' + fmt, value))[0]

        hi = byteswap((hi + hash_ints[1]) % MAX_SINT, 'L')
        lo = byteswap((lo + hash_ints[3]) % MAX_SINT, 'L')
        key = byteswap((hi << 32) + lo, 'Q')

        # XOR each 64-bit half of the MD5 with the key.
        halves = [byteswap(abs(byteswap(x, 'Q') ^ key), 'Q')
                  for x in struct.unpack('>QQ', digest)]
        return ''.join(('%x' % x).zfill(16).lower() for x in halves)
-
-
-
def __test():
    """Run the doctests found in this module with verbose reporting."""
    from doctest import testmod
    testmod(verbose=True)


# Only run the self-test when the file is executed as a script.
if __name__ == '__main__':
    __test()
-
-