home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)

import logging
import time
import sqlite3

from _clientcookie import CookieJar, Cookie, MappingIterator
from _util import isstringlike, experimental

debug = logging.getLogger('mechanize.cookies').debug


class Firefox3CookieJar(CookieJar):
    """Cookie jar that keeps persistent cookies in a ``moz_cookies`` table.

    Cookies flagged ``discard`` are handed to the ``CookieJar`` base class
    and never written to the database; all other cookies are stored in the
    SQLite file named by ``filename``.
    """

    def __init__(self, filename, autoconnect=True, policy=None):
        """Create the jar.

        filename -- path of the SQLite database; must be string-like or None.
        autoconnect -- when true, call connect() immediately.
        policy -- passed through to CookieJar.__init__.

        Raises ValueError if ``filename`` is neither None nor string-like.
        """
        experimental('Firefox3CookieJar is experimental code')
        CookieJar.__init__(self, policy)
        if filename is not None and not isstringlike(filename):
            raise ValueError('filename must be string-like')
        self.filename = filename
        self._conn = None
        if autoconnect:
            self.connect()

    def connect(self):
        """Open the database and create the cookie table if it is missing."""
        self._conn = sqlite3.connect(self.filename)
        self._conn.isolation_level = 'DEFERRED'
        self._create_table_if_necessary()

    def close(self):
        """Close the database connection, if one was ever opened."""
        # _conn stays None until connect() runs (e.g. autoconnect=False).
        if self._conn is not None:
            self._conn.close()

    def _transaction(self, func):
        """Call ``func(cursor)`` and commit; roll back if it raises.

        The cursor is always closed.  Returns whatever ``func`` returned.
        """
        try:
            cur = self._conn.cursor()
            try:
                result = func(cur)
            finally:
                cur.close()
        except:
            # Deliberately bare: roll back on *any* exception, then re-raise.
            self._conn.rollback()
            raise
        else:
            self._conn.commit()
        return result

    def _execute(self, query, params=()):
        """Run a single statement inside its own transaction."""
        return self._transaction(lambda cur: cur.execute(query, params))

    def _query(self, query, params=()):
        """Yield every row produced by ``query`` (no commit is issued)."""
        cur = self._conn.cursor()
        try:
            cur.execute(query, params)
            for row in cur.fetchall():
                yield row
        finally:
            cur.close()

    def _create_table_if_necessary(self):
        """Create the ``moz_cookies`` table unless it already exists."""
        self._execute(
            'CREATE TABLE IF NOT EXISTS moz_cookies '
            '(id INTEGER PRIMARY KEY, name TEXT,\n'
            ' value TEXT, host TEXT, path TEXT,expiry INTEGER,\n'
            ' lastAccessed INTEGER, isSecure INTEGER, isHttpOnly INTEGER)')

    def _cookie_from_row(self, row):
        """Build a Cookie from one ``moz_cookies`` row (nine columns)."""
        (pk, name, value, domain, path, expires,
         last_accessed, secure, http_only) = row

        version = 0
        domain = domain.encode('ascii', 'ignore')
        path = path.encode('ascii', 'ignore')
        name = name.encode('ascii', 'ignore')
        value = value.encode('ascii', 'ignore')
        secure = bool(secure)

        # last_accessed and pk are read from the row but not put on the Cookie.
        rest = {}
        if http_only:
            rest['HttpOnly'] = None

        # An empty name column means the value column holds the cookie name
        # and the cookie itself has no value.
        if name == '':
            name = value
            value = None

        initial_dot = domain.startswith('.')
        domain_specified = initial_dot

        # An empty-string expiry marks a cookie to be discarded.
        discard = False
        if expires == '':
            expires = None
            discard = True

        return Cookie(version, name, value,
                      None, False,
                      domain, domain_specified, initial_dot,
                      path, False,
                      secure,
                      expires,
                      discard,
                      None,
                      None,
                      rest)

    def clear(self, domain=None, path=None, name=None):
        """Remove cookies from the base jar and from the database.

        Each argument that is not None narrows the DELETE with an equality
        test on the host, path or name column; with no arguments every row
        is deleted.
        """
        CookieJar.clear(self, domain, path, name)

        where_parts = []
        sql_params = []
        for column, wanted in (('host', domain),
                               ('path', path),
                               ('name', name)):
            if wanted is not None:
                where_parts.append('%s = ?' % column)
                sql_params.append(wanted)

        where = ' AND '.join(where_parts)
        if where:
            where = ' WHERE ' + where

        def _delete(cur):
            cur.execute('DELETE FROM moz_cookies%s' % where,
                        tuple(sql_params))

        self._transaction(_delete)

    def _row_from_cookie(self, cookie, cur):
        """Return the nine-column row tuple used to store ``cookie``.

        ``cur`` is used to compute the next primary key (MAX(id) + 1, or 1
        for an empty table).
        """
        expires = cookie.expires
        if cookie.discard:
            expires = ''

        domain = unicode(cookie.domain)
        path = unicode(cookie.path)
        name = unicode(cookie.name)
        value = unicode(cookie.value)
        secure = bool(int(cookie.secure))

        # NOTE(review): ``value`` is already the result of unicode(), so this
        # branch can never run; a cookie whose value is None is stored as the
        # text u'None'.  _cookie_from_row treats an empty name as "cookie
        # without a value", which suggests the check was meant to happen
        # before the conversion.  Left unchanged because fixing it alters
        # what set_cookie stores and deletes -- confirm before changing.
        if value is None:
            value = name
            name = ''

        last_accessed = int(time.time())
        http_only = cookie.has_nonstandard_attr('HttpOnly')

        query = cur.execute('SELECT MAX(id) + 1 from moz_cookies')
        pk = query.fetchone()[0]
        if pk is None:
            pk = 1

        return (pk, name, value, domain, path, expires,
                last_accessed, secure, http_only)

    def set_cookie(self, cookie):
        """Store ``cookie``.

        Cookies flagged ``discard`` go to the base class only; all others
        replace any database row with the same host, path and name.
        """
        if cookie.discard:
            CookieJar.set_cookie(self, cookie)
            return

        def _store(cur):
            row = self._row_from_cookie(cookie, cur)
            name, unused, domain, path = row[1:5]
            cur.execute(
                'DELETE FROM moz_cookies '
                'WHERE host = ? AND path = ? AND name = ?',
                (domain, path, name))
            cur.execute(
                'INSERT INTO moz_cookies VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)\n',
                row)

        self._transaction(_store)

    def __iter__(self):
        """Yield the base jar's in-memory cookies, then the database ones."""
        for cookie in MappingIterator(self._cookies):
            yield cookie
        for row in self._query(
                'SELECT * FROM moz_cookies ORDER BY name, path, host'):
            yield self._cookie_from_row(row)

    def _cookies_for_request(self, request):
        """Return base-class cookies plus stored cookies for ``request``."""
        session_cookies = CookieJar._cookies_for_request(self, request)

        def get_cookies(cur):
            # DISTINCT: the table has one row per cookie, so a plain
            # "SELECT host" would visit a host once per cookie and return
            # its cookies several times over.
            query = cur.execute('SELECT DISTINCT host from moz_cookies')
            # fetchall(), not fetchmany(): fetchmany() without a size only
            # returns cursor.arraysize rows (1 by default).
            domains = [row[0] for row in query.fetchall()]
            cookies = []
            for domain in domains:
                cookies += self._persistent_cookies_for_domain(
                    domain, request, cur)
            return cookies

        persistent_cookies = self._transaction(get_cookies)
        return session_cookies + persistent_cookies

    def _persistent_cookies_for_domain(self, domain, request, cur):
        """Return stored cookies for ``domain`` that the policy allows.

        Returns an empty list when the policy's domain_return_ok() rejects
        the domain; otherwise every row for that host is turned into a
        Cookie and kept only if the policy's return_ok() accepts it.
        """
        if not self._policy.domain_return_ok(domain, request):
            return []
        debug('Checking %s for cookies to return', domain)
        query = cur.execute(
            'SELECT * from moz_cookies WHERE host = ? ORDER BY path',
            (domain,))

        # NOTE(review): the decompiled code carried an unused ``last_path``
        # local and the query orders by path, which hints that a per-path
        # policy check may have been lost in decompilation -- confirm
        # against the upstream source.
        returnable = []
        # fetchall(), not fetchmany(): see _cookies_for_request.
        for row in query.fetchall():
            cookie = self._cookie_from_row(row)
            if not self._policy.return_ok(cookie, request):
                debug(' not returning cookie')
                continue
            debug(" it's a match")
            returnable.append(cookie)
        return returnable