home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.6)
-
- from urllib2 import Request, urlopen
- import oauth.oauth as oauth
- from util import threaded, callsback
- from datetime import datetime
- import time
- CONSUMER_KEY = 'cBr7RsK3ljQT3nPNzuxQfA'
- CONSUMER_SECRET = '73wB2AKjAFNqknP8tBVKzCY4TMnOal7dQYAD8cICo'
-
- def get_oauth_token(username, password, callback = None):
- auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
-
- def on_thread():
- return auth.get_xauth_access_token(username, password)
-
- on_thread = (None, None, threaded)(on_thread)
- on_thread(callback = callback)
-
- get_oauth_token = callsback(get_oauth_token)
-
- class AuthHandler(object):
-
- def apply_auth(self, url, method, headers, parameters):
- raise NotImplementedError
-
-
- def get_username(self):
- raise NotImplementedError
-
-
-
- class OAuthHandler(AuthHandler):
- OAUTH_HOST = 'twitter.com'
- OAUTH_ROOT = '/oauth/'
-
- def __init__(self, consumer_key, consumer_secret, callback = None, secure = False):
- self._consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)
- self._sigmethod = oauth.OAuthSignatureMethod_HMAC_SHA1()
- self.request_token = None
- self.access_token = None
- self.callback = callback
- self.username = None
- self.secure = secure
-
-
- def _get_oauth_url(self, endpoint, secure = False):
- if self.secure or secure:
- prefix = 'https://'
- else:
- prefix = 'http://'
- return prefix + self.OAUTH_HOST + self.OAUTH_ROOT + endpoint
-
-
- def apply_auth(self, url, method, headers, parameters):
- request = oauth.OAuthRequest.from_consumer_and_token(self._consumer, http_url = url, http_method = method, token = self.access_token, parameters = parameters)
- request.sign_request(self._sigmethod, self._consumer, self.access_token)
- headers.update(request.to_header())
-
-
- def _get_request_token(self):
- url = self._get_oauth_url('request_token')
- request = oauth.OAuthRequest.from_consumer_and_token(self._consumer, http_url = url, callback = self.callback)
- request.sign_request(self._sigmethod, self._consumer, None)
- resp = urlopen(Request(url, headers = request.to_header()))
- return oauth.OAuthToken.from_string(resp.read())
-
-
- def set_request_token(self, key, secret):
- self.request_token = oauth.OAuthToken(key, secret)
-
-
- def set_access_token(self, key, secret):
- self.access_token = oauth.OAuthToken(key, secret)
-
-
- def get_authorization_url(self, signin_with_twitter = False):
- self.request_token = self._get_request_token()
- if signin_with_twitter:
- url = self._get_oauth_url('authenticate')
- else:
- url = self._get_oauth_url('authorize')
- request = oauth.OAuthRequest.from_token_and_callback(token = self.request_token, http_url = url)
- return request.to_url()
-
-
- def get_access_token(self, verifier = None):
- url = self._get_oauth_url('access_token')
- request = oauth.OAuthRequest.from_consumer_and_token(self._consumer, token = self.request_token, http_url = url, verifier = str(verifier))
- request.sign_request(self._sigmethod, self._consumer, self.request_token)
- resp = urlopen(Request(url, headers = request.to_header()))
- self.access_token = oauth.OAuthToken.from_string(resp.read())
- return self.access_token
-
-
- def get_xauth_access_token(self, username, password):
- url = self._get_oauth_url('access_token', secure = True)
- request = oauth.OAuthRequest.from_consumer_and_token(oauth_consumer = self._consumer, http_method = 'POST', http_url = url, parameters = {
- 'x_auth_mode': 'client_auth',
- 'x_auth_username': to_utf8(username),
- 'x_auth_password': to_utf8(password),
- 'oauth_timestamp': generate_corrected_timestamp() })
- request.sign_request(self._sigmethod, self._consumer, None)
- resp = urlopen(Request(url, data = request.to_postdata()))
- self.access_token = oauth.OAuthToken.from_string(resp.read())
- return self.access_token
-
-
- def get_username(self):
- return self.username
-
-
-
- def to_utf8(s):
- if isinstance(s, unicode):
- return s.encode('utf-8')
- return s
-
- _time_correction = None
-
- def get_time_correction():
- return _time_correction
-
-
- def set_server_timestamp(server_time_now):
- global _time_correction
- _time_correction = server_time_now - time.time()
-
-
- def generate_corrected_timestamp():
- import logging
- now = time.time()
- if _time_correction is not None:
- t = now + _time_correction
- logging.getLogger('twitter').warn('using corrected timestamp: %r', datetime.fromtimestamp(t).isoformat())
- else:
- t = now
- logging.getLogger('twitter').warn('using UNcorrected timestamp: %r', datetime.fromtimestamp(t).isoformat())
- return t
-
-