# home *** CD-ROM | disk | FTP | other *** search
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
-
- from util.net import linkify
- from util.cacheable import urlcacheopen
- from social.twitter.interfaces import IDisplayableTweet
- from util.primitives import iproperty
- from social.twitter.interfaces import ITwitterFetcher
- from util.primitives import Storage
- from protocols.adapters import AdaptationFailure
- from util.primitives import preserve_newlines
- import traceback
- import calendar
- import urlparse
- import protocols
- import time
- import simplejson
- from interfaces import IDisplayableStatus, IDisplayableDirect, ICachingURLObj, ILinkifiedText
- import rfc822
- from logging import getLogger
- log = getLogger('twitter.objects')
-
class TwitterCacheable(protocols.AbstractBase):
    """Resolve a set of named URLs to locally cached paths.

    ``get_cache_names`` supplies a ``{name: url}`` mapping (empty by
    default); ``do_cache`` fills ``cache_values`` with
    ``{name: cache path or None}``.
    """

    protocols.advise(instancesProvide=[ICachingURLObj])

    # Calling the ``dict`` type yields ``{}``, i.e. "nothing to cache" unless
    # a subclass or mixin supplies its own get_cache_names().
    get_cache_names = dict

    def _get_cache_names(self):
        """Return the name -> URL mapping, building it on first access."""
        try:
            return self._cache_names
        except AttributeError:
            self._cache_names = self.get_cache_names()
            return self._cache_names

    cache_names = property(fget=_get_cache_names,
                           doc='dict of attributes to map cache URLs as')

    def _get_cache_values(self):
        """Return the name -> resolved value mapping, creating it lazily."""
        try:
            return self._cache_values
        except AttributeError:
            self._cache_values = {}
            return self._cache_values

    cache_values = property(fget=_get_cache_values, doc='completed values')

    def do_cache(self, cache=None):
        """Resolve every entry of ``cache_names`` into ``cache_values``.

        cache -- optional mapping of URL -> cached path.  It is consulted
                 first for each URL and updated after a successful fetch.

        A name is set to ``None`` when fetching its URL raises or the
        response status is outside the accepted range.
        """
        for name, location in self.cache_names.iteritems():
            if cache is not None:
                try:
                    self.cache_values[name] = cache[location]
                except KeyError:
                    pass  # not in the supplied cache; fall back to fetching
                else:
                    continue

            # Already resolved on an earlier call (possibly to None).
            if name in self.cache_values:
                continue

            try:
                resp, _data = urlcacheopen(location)
            except Exception:
                # Best effort: a failed fetch is recorded as "no value".
                self.cache_values[name] = None
                continue

            # NOTE(review): the decompiled source lost the lower bound of
            # this chained comparison (it read 'resp.status <= resp.status');
            # 200 is assumed here (HTTP 2xx) -- confirm against the original.
            if 200 <= resp.status < 300:
                self.cache_values[name] = resp.cache_path
                if cache is not None:
                    cache[location] = resp.cache_path
            else:
                self.cache_values[name] = None
-
-
-
-
class StatusMixin(object):
    """Display helpers for status (public tweet) objects.

    Expects the host object to expose ``user`` (with ``profile_image_url``
    and ``screen_name``) and ``text``.
    """

    @property
    def _tweet(self):
        """The object holding the tweet data; here, the instance itself."""
        return self

    def get_cache_names(self):
        """Return the ``{name: url}`` mapping of images to cache."""
        return {'cache_url': self._tweet.user.profile_image_url}

    @property
    def user_screen_name(self):
        """Screen name of the tweet's author."""
        return self._tweet.user.screen_name

    @property
    def linkified_text(self):
        """Tweet text with links and @names marked up; computed once."""
        try:
            return self._linkified_text
        except AttributeError:
            pass

        # Imported at call time, as in the original -- presumably to avoid
        # an import cycle with social.twitter.functions.
        import social.twitter.functions as f

        pieces = f.at_someone.split(preserve_newlines(self.text))
        # Empty pieces produced by the split are dropped before markup.
        marked_up = [f.namelink(linkify(piece)) for piece in pieces if piece]
        self._linkified_text = u''.join(marked_up)
        return self._linkified_text
-
-
class DirectMixin(object):
    """Display helpers for direct-message objects.

    Expects the host object to expose ``sender`` and ``recipient`` (each
    with ``profile_image_url``) and ``text``.
    """

    @property
    def _tweet(self):
        """The object holding the message data; here, the instance itself."""
        return self

    def get_cache_names(self):
        """Return the ``{name: url}`` mapping of images to cache."""
        message = self._tweet
        return {
            'sender_cache_url': message.sender.profile_image_url,
            'recipient_cache_url': message.recipient.profile_image_url,
        }

    @property
    def linkified_text(self):
        """Message text with links marked up; computed once, then reused."""
        try:
            return self._linkified_text
        except AttributeError:
            self._linkified_text = linkify(preserve_newlines(self.text))
            return self._linkified_text
-
-
class UpdateFetcherMixin(object):
    """Adds an ``updatefreq`` attribute backed by ``_updatefreq``."""

    def get_updatefreq(self):
        """Return the stored update frequency."""
        return self._updatefreq

    def set_updatefreq(self, value):
        """Store a new update frequency."""
        self._updatefreq = value

    def del_updatefreq(self):
        """Remove the stored update frequency."""
        del self._updatefreq

    # iproperty receives the accessor *names* rather than the functions --
    # presumably so subclasses can override them; verify against
    # util.primitives.iproperty.
    updatefreq = iproperty('get_updatefreq', 'set_updatefreq',
                           'del_updatefreq')
-
-
class StringTwitterFetcher(object):
    """Fetcher that loads tweets from a single fixed URL."""

    protocols.advise(instancesProvide=[ITwitterFetcher])

    def __init__(self, url):
        self.url = url

    @property
    def id(self):
        """Identifier of this fetcher: the URL it reads from."""
        return self.url

    def Fetch(self, api):
        """Fetch ``self.url`` through ``api`` and return displayable tweets.

        The response is parsed as JSON.  Falsy items are skipped, and items
        that cannot be adapted to IDisplayableTweet are logged and skipped.
        """
        payload = api._FetchUrl(self.url)
        tweets = []
        for item in simplejson.loads(payload):
            if not item:
                continue
            try:
                tweets.append(IDisplayableTweet(item))
            except AdaptationFailure:
                log.error('there was an error adapting %r', item)
        return tweets
-
-
-
def fetcherFromString(s):
    """Return a StringTwitterFetcher for ``s`` if it names a feed URL.

    Only strings ending in ``.json`` or ``.xml`` qualify; anything else
    yields ``None``.
    """
    if s.endswith(('.json', '.xml')):
        return StringTwitterFetcher(s)
    return None
-
-
class TwitterStatusMsg(StatusMixin, TwitterCacheable, Storage):
    """A status (public tweet) built from a decoded API dictionary.

    NOTE(review): Storage is presumably a dict subclass with attribute
    access -- the code below relies on ``dict(self)``, ``'user' in self``
    and attribute reads all working on the same data; confirm.
    """

    protocols.advise(instancesProvide=[IDisplayableStatus])
    direct = False

    def __init__(self, *a, **k):
        Storage.__init__(self, *a, **k)
        if 'user' in self:
            # Wrap the nested user data so it is accessed like this object.
            self.user = Storage(self.user)

    def AsDict(self):
        """Return a plain-dict copy without back-references or memo fields."""
        d = dict(self)
        if 'user' in d:
            d['user'] = dict(d['user'])
        for private in ('twitter', '_cache_names', '_cache_values',
                        '_linkified_text', '_kind'):
            d.pop(private, None)
        return d

    def shouldShow(self, icon_type):
        """Delegate the visibility decision to the owning twitter object."""
        return self.twitter.shouldShow(self, icon_type)

    @property
    def kind(self):
        """Classify the tweet relative to the account; computed once.

        One of 'self_reply', 'self_tweet', 'friend_reply', 'friend_tweet'.
        """
        try:
            return self._kind
        except AttributeError:
            pass

        author = self.user.get('screen_name', '').lower()
        own_name = self.twitter.name.lower()
        is_reply = self.in_reply_to_user_id

        if author == own_name:
            if is_reply:
                self._kind = 'self_reply'
            else:
                self._kind = 'self_tweet'
        elif is_reply and '@' + own_name in self.text.lower():
            self._kind = 'friend_reply'
        else:
            self._kind = 'friend_tweet'
        return self._kind

    @property
    def created_at_in_seconds(self):
        """Creation time as seconds since the epoch; 0 when unknown."""
        if self.created_at is None:
            return 0
        # Raises TypeError if the date string cannot be parsed, because
        # rfc822.parsedate then returns None.
        return calendar.timegm(rfc822.parsedate(self.created_at))

    @property
    def relative_created_at(self):
        """Approximate, human-readable age of the tweet."""
        minute, hour, day = 60, 3600, 86400
        # Widens the "about a/an X" bands around each unit boundary.
        fudge = 1.5
        delta = int(time.time()) - int(self.created_at_in_seconds)

        if delta < minute * fudge:
            return 'about a minute ago'
        if delta < hour * (1 / fudge):
            return 'about %d minutes ago' % (delta // minute + 1)
        if delta < hour * fudge:
            return 'about an hour ago'
        if delta < day * (1 / fudge):
            return 'about %d hours ago' % (delta // hour + 1)
        if delta < day * fudge:
            return 'about a day ago'
        return 'about %d days ago' % (delta // day + 1)

    def __repr__(self):
        # 1-tuple so an id that is itself a tuple cannot break formatting.
        return '<StatusMessage id: %s>' % (self.id,)
-
-
-
class TwitterDirectMsg(DirectMixin, TwitterCacheable, Storage):
    """A direct message built from a decoded API dictionary.

    NOTE(review): Storage is presumably a dict subclass with attribute
    access -- the code below relies on ``dict(self)``, ``'sender' in self``
    and attribute reads all working on the same data; confirm.
    """

    protocols.advise(instancesProvide=[IDisplayableDirect])
    direct = True

    def __init__(self, *a, **k):
        Storage.__init__(self, *a, **k)
        # Wrap the nested user data so it is accessed like this object.
        if 'sender' in self:
            self.sender = Storage(self.sender)
        if 'recipient' in self:
            self.recipient = Storage(self.recipient)

    def AsDict(self):
        """Return a plain-dict copy without back-references or memo fields."""
        d = dict(self)
        for party in ('sender', 'recipient'):
            if party in d:
                d[party] = dict(d[party])
        for private in ('twitter', '_cache_names', '_cache_values',
                        '_linkified_text', '_kind'):
            d.pop(private, None)
        return d

    def shouldShow(self, icon_type):
        """Delegate the visibility decision to the owning twitter object."""
        return self.twitter.shouldShow(self, icon_type)

    @property
    def kind(self):
        """Classify the message as sent or received; computed once."""
        try:
            return self._kind
        except AttributeError:
            pass

        sender_name = self.sender.get('screen_name', '').lower()
        if sender_name == self.twitter.name.lower():
            self._kind = 'direct_sent'
        else:
            # Spelling kept as-is: this is a runtime value other code may
            # match against.
            self._kind = 'direct_recieved'
        return self._kind

    @property
    def created_at_in_seconds(self):
        """Creation time as seconds since the epoch."""
        # Raises TypeError if the date string cannot be parsed, because
        # rfc822.parsedate then returns None.
        return calendar.timegm(rfc822.parsedate(self.created_at))

    @property
    def relative_created_at(self):
        """Approximate, human-readable age of the message."""
        minute, hour, day = 60, 3600, 86400
        # Widens the "about a/an X" bands around each unit boundary.
        fudge = 1.5
        delta = int(time.time()) - int(self.created_at_in_seconds)

        if delta < minute * fudge:
            return 'about a minute ago'
        if delta < hour * (1 / fudge):
            return 'about %d minutes ago' % (delta // minute + 1)
        if delta < hour * fudge:
            return 'about an hour ago'
        if delta < day * (1 / fudge):
            return 'about %d hours ago' % (delta // hour + 1)
        if delta < day * fudge:
            return 'about a day ago'
        return 'about %d days ago' % (delta // day + 1)

    def __repr__(self):
        # 1-tuple so an id that is itself a tuple cannot break formatting.
        return '<DirectMessage id: %s>' % (self.id,)
-
-
-
def validate_user(user):
    """Check that ``user`` carries usable fields; raise if it does not.

    Each field is validated by coercing it to the expected type, so any
    missing or malformed field surfaces as the exception raised by that
    coercion.  A unicode ``profile_image_url`` is rewritten in place via
    fix_unicode_url before the final ``str()`` check.
    """
    int(user.id)
    unicode(user.name)
    unicode(user.screen_name)
    if isinstance(user.profile_image_url, unicode):
        user.profile_image_url = fix_unicode_url(user.profile_image_url)
    str(user.profile_image_url)
-
-
def fix_unicode_url(url):
    """Return ``url`` with every path segment UTF-8 encoded and URL-escaped.

    Scheme, host, query and fragment are passed through unchanged.
    """
    parsed = urlparse.urlsplit(url)
    # Drop the empty string that precedes the path's leading '/'.
    segments = parsed.path.split('/')[1:]
    # NOTE(review): the decompiler emitted '[] + []([...])' here; the lost
    # constants are reconstructed as '/' + '/'.join(...), the inverse of the
    # split above -- confirm against the original.
    # The 'url' codec is not in the standard library; presumably it is
    # registered elsewhere in the project.
    newpath = '/' + '/'.join([x.encode('utf-8').encode('url')
                              for x in segments])
    return urlparse.urlunsplit((parsed.scheme, parsed.netloc, newpath,
                                parsed.query, parsed.fragment))
-
-
def validate_direct(tweet):
    """Return True if ``tweet`` has every field a direct message needs.

    Any exception raised while checking is printed with its traceback and
    reported as ``False``.
    """
    try:
        validate_user(tweet.sender)
        validate_user(tweet.recipient)
        unicode(tweet.sender_screen_name)
        unicode(tweet.recipient_screen_name)
        int(tweet.id)
        unicode(tweet.text)
        # Accessed for its side effect only: raises if the creation date
        # cannot be turned into a relative time.
        tweet.relative_created_at
    except Exception:
        traceback.print_exc()
        return False
    return True
-
-
def validate_status(tweet):
    """Return True if ``tweet`` has every field a status message needs.

    A missing ``favorited`` flag is filled in as ``False``.  Any other
    exception raised while checking is printed with its traceback and
    reported as ``False``.
    """
    try:
        int(tweet.id)
        unicode(tweet.text)
        # Accessed for its side effect only: raises if the creation date
        # cannot be turned into a relative time.
        tweet.relative_created_at

        try:
            bool(tweet.favorited)
        except (KeyError, AttributeError):
            tweet.favorited = False

        str(tweet.source)
        validate_user(tweet.user)
    except Exception:
        traceback.print_exc()
        return False
    return True
-
-
def tweetFromDict(d):
    """Build a validated tweet object from a decoded API dictionary.

    A ``'user'`` key selects TwitterStatusMsg; otherwise a ``'sender'`` key
    selects TwitterDirectMsg.  Returns ``None`` when neither key is present
    or when the built object fails validation.
    """
    if 'user' in d:
        tweet = TwitterStatusMsg(d)
        is_valid = validate_status
    elif 'sender' in d:
        tweet = TwitterDirectMsg(d)
        is_valid = validate_direct
    else:
        return None

    if is_valid(tweet):
        return tweet
    return None
-
-