home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- __license__ = 'GPL v3'
- __copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
- __docformat__ = 'restructuredtext en'
- import traceback
- import socket
- import re
- import sys
- from functools import partial
- from threading import Thread, Event
- from Queue import Queue, Empty
- import mechanize
- from calibre.customize import Plugin
- from calibre import browser, prints
- from calibre.ebooks.BeautifulSoup import BeautifulSoup
- from calibre.constants import preferred_encoding, DEBUG
-
class CoverDownload(Plugin):
    '''
    Base class for plugins that download book covers.

    Subclasses override :meth:`has_cover` and :meth:`get_covers`; both raise
    ``NotImplementedError`` here.
    '''

    supported_platforms = ['windows', 'osx', 'linux']
    author = 'Kovid Goyal'
    type = _('Cover download')

    def has_cover(self, mi, ans, timeout=5):
        '''
        Check whether a cover exists for the book described by ``mi``.

        The implementations in this file call ``ans.set()`` when a cover is
        found and leave ``ans`` untouched otherwise.

        :param mi: metadata object; the implementations here read ``mi.isbn``
        :param ans: event-like object exposing ``set()``
        :param timeout: timeout handed to the network calls
        '''
        raise NotImplementedError()

    def get_covers(self, mi, result_queue, abort, timeout=5):
        '''
        Download covers for the book described by ``mi``.

        The implementations in this file report their outcome by putting a
        4-tuple on ``result_queue``: ``(True, data, 'jpg', plugin_name)`` on
        success or ``(False, message, details, plugin_name)`` on failure.

        :param mi: metadata object; the implementations here read ``mi.isbn``
        :param result_queue: queue-like object exposing ``put()``
        :param abort: event passed in by the caller (not consulted by the
                      implementations in this file)
        :param timeout: timeout handed to the network calls
        '''
        raise NotImplementedError()

    def exception_to_string(self, ex):
        '''
        Return a text representation of ``ex``, never raising.

        Tries ``unicode(ex)`` first, then the byte string decoded with
        ``preferred_encoding`` (undecodable bytes replaced), and finally
        falls back to ``repr(ex)``.
        '''
        try:
            return unicode(ex)
        except Exception:
            try:
                return str(ex).decode(preferred_encoding, 'replace')
            except Exception:
                # Last resort: repr() is the fallback when both conversions
                # above have failed.
                return repr(ex)

    def debug(self, *args, **kwargs):
        '''
        Print ``args`` prefixed with a tab and this plugin's name, but only
        when the ``DEBUG`` constant is true.
        '''
        if DEBUG:
            prints('\t' + self.name + ':', *args, **kwargs)
-
-
-
-
class HeadRequest(mechanize.Request):
    '''A ``mechanize.Request`` whose HTTP method is always ``HEAD``.'''

    def get_method(self):
        '''Return the HTTP verb for this request: always ``'HEAD'``.'''
        return 'HEAD'
-
-
-
class OpenLibraryCovers(CoverDownload):
    '''Cover source backed by covers.openlibrary.org, looked up by ISBN.'''

    # URL template; ``%s`` is replaced with the ISBN.
    OPENLIBRARY = 'http://covers.openlibrary.org/b/isbn/%s-L.jpg?default=false'
    name = 'openlibrary.org covers'
    description = _('Download covers from openlibrary.org')
    author = 'Kovid Goyal'

    def has_cover(self, mi, ans, timeout=5):
        '''
        Issue a HEAD request for the cover of ``mi.isbn`` and call
        ``ans.set()`` if the request succeeds or is answered with a 302.

        :param mi: metadata object providing ``isbn``
        :param ans: event-like object; ``set()`` is called when a cover exists
        :param timeout: timeout passed to the browser
        :return: ``False`` when ``mi`` has no ISBN, otherwise ``None``
        '''
        if not mi.isbn:
            return False
        br = browser()
        # Redirect handling is switched off, so a redirect response reaches
        # the except branch below instead of being followed.
        br.set_handle_redirect(False)
        try:
            br.open_novisit(HeadRequest(self.OPENLIBRARY % mi.isbn),
                            timeout=timeout)
            self.debug('cover for', mi.isbn, 'found')
            ans.set()
        except Exception as e:
            # An error object reporting HTTP status 302 is treated the same
            # as a successful request.
            if callable(getattr(e, 'getcode', None)) and e.getcode() == 302:
                self.debug('cover for', mi.isbn, 'found')
                ans.set()
            else:
                self.debug(e)

    def get_covers(self, mi, result_queue, abort, timeout=5):
        '''
        Download the cover for ``mi.isbn`` and put the outcome on
        ``result_queue``.

        On success the tuple is ``(True, data, 'jpg', self.name)``. An HTTP
        404 yields ``(False, 'ISBN: ... not found', '', self.name)``; any
        other error yields ``(False, message, traceback, self.name)``.
        Nothing is queued when ``mi`` has no ISBN.

        :param mi: metadata object providing ``isbn``
        :param result_queue: queue-like object receiving the result tuple
        :param abort: unused by this plugin
        :param timeout: timeout passed to the browser
        '''
        if not mi.isbn:
            return
        br = browser()
        try:
            ans = br.open(self.OPENLIBRARY % mi.isbn, timeout=timeout).read()
            result_queue.put((True, ans, 'jpg', self.name))
        except Exception as e:
            if callable(getattr(e, 'getcode', None)) and e.getcode() == 404:
                result_queue.put((False, _('ISBN: %s not found') % mi.isbn,
                                  '', self.name))
            else:
                result_queue.put((False, self.exception_to_string(e),
                                  traceback.format_exc(), self.name))
-
-
-
-
class LibraryThingCovers(CoverDownload):
    '''Cover source backed by librarything.com, looked up by ISBN.'''

    name = 'librarything.com covers'
    description = _('Download covers from librarything.com')
    author = 'Kovid Goyal'
    # Base URL; the ISBN is appended to it.
    LIBRARYTHING = 'http://www.librarything.com/isbn/'

    def get_cover_url(self, isbn, br, timeout=5):
        '''
        Fetch the LibraryThing page for ``isbn`` and return the cover image
        URL found in it.

        :param isbn: ISBN string appended to :attr:`LIBRARYTHING`
        :param br: browser object exposing ``open_novisit()``
        :param timeout: timeout passed to the browser
        :return: the ``src`` of the cover ``<img>`` with any ``_SX<n>`` /
                 ``_SY<n>`` fragment removed
        :raises Exception: on timeout, when the page shows a high-load
                           warning, when the ISBN is not found, or when the
                           expected ``<img>`` is missing; other fetch errors
                           propagate unchanged
        '''
        try:
            src = br.open_novisit(self.LIBRARYTHING + isbn,
                                  timeout=timeout).read().decode('utf-8', 'replace')
        except Exception as err:
            # ``args`` may be missing or empty; fall back to a one-element
            # tuple so the index below cannot fail and mask the real error.
            args = getattr(err, 'args', None) or (None,)
            if isinstance(args[0], socket.timeout):
                raise Exception(_('LibraryThing.com timed out. Try again later.'))
            raise

        s = BeautifulSoup(src)
        url = s.find('td', attrs={'class': 'left'})
        if url is None:
            if s.find('div', attrs={'class': 'highloadwarning'}) is not None:
                raise Exception(_('Could not fetch cover as server is experiencing high load. Please try again later.'))
            raise Exception(_('ISBN: %s not found') % isbn)
        url = url.find('img')
        if url is None:
            raise Exception(_('LibraryThing.com server error. Try again later.'))
        # Drop the _SX<digits> / _SY<digits> fragment from the image URL.
        return re.sub(r'_S[XY]\d+', '', url['src'])

    def has_cover(self, mi, ans, timeout=5):
        '''
        Call ``ans.set()`` if a cover URL can be resolved for ``mi.isbn``.

        Any failure is only reported through :meth:`debug`.

        :param mi: metadata object providing ``isbn``
        :param ans: event-like object; ``set()`` is called when a cover exists
        :param timeout: timeout passed to the browser
        :return: ``False`` when ``mi`` has no ISBN, otherwise ``None``
        '''
        if not mi.isbn:
            return False
        br = browser()
        try:
            self.get_cover_url(mi.isbn, br, timeout=timeout)
            self.debug('cover for', mi.isbn, 'found')
            ans.set()
        except Exception as e:
            self.debug(e)

    def get_covers(self, mi, result_queue, abort, timeout=5):
        '''
        Download the cover for ``mi.isbn`` and put the outcome on
        ``result_queue``.

        On success the tuple is ``(True, data, 'jpg', self.name)``; on any
        error it is ``(False, message, traceback, self.name)``. Nothing is
        queued when ``mi`` has no ISBN.

        :param mi: metadata object providing ``isbn``
        :param result_queue: queue-like object receiving the result tuple
        :param abort: unused by this plugin
        :param timeout: timeout used when resolving the cover URL (the image
                        download itself is issued without an explicit timeout)
        '''
        if not mi.isbn:
            return
        br = browser()
        try:
            url = self.get_cover_url(mi.isbn, br, timeout=timeout)
            cover_data = br.open_novisit(url).read()
            result_queue.put((True, cover_data, 'jpg', self.name))
        except Exception as e:
            result_queue.put((False, self.exception_to_string(e),
                              traceback.format_exc(), self.name))
-
-
-
-
def check_for_cover(mi, timeout=5):
    '''
    Ask every cover source, in parallel, whether it has a cover for ``mi``.

    One daemon thread per source runs that source's ``has_cover``. The call
    returns as soon as any source sets the shared event, or once every
    worker thread has finished.

    :param mi: metadata object handed to each source's ``has_cover``
    :param timeout: timeout forwarded to each source
    :return: ``True`` if at least one source reported a cover
    '''
    from calibre.customize.ui import cover_sources
    ans = Event()
    checkers = [partial(p.has_cover, mi, ans, timeout=timeout)
                for p in cover_sources()]
    workers = [Thread(target=c) for c in checkers]
    for w in workers:
        w.daemon = True
        w.start()

    # NOTE(review): the wait loop did not survive decompilation; only the
    # "no worker is alive -> stop" test was recoverable. It is rebuilt here
    # as a short poll on the event. Confirm the polling interval is suitable.
    while not ans.is_set():
        ans.wait(0.1)
        if not any(w.is_alive() for w in workers):
            break
    return ans.is_set()
-
-
def download_covers(mi, result_queue, max_covers=50, timeout=5):
    '''
    Run every cover source's ``get_covers`` in parallel and forward their
    results to ``result_queue``.

    Each source runs in its own daemon thread and writes to a private queue.
    Results are forwarded until ``max_covers`` successful results have been
    seen or all workers have finished; then the ``abort`` event is set and
    whatever is still queued is forwarded as well.

    :param mi: metadata object handed to each source's ``get_covers``
    :param result_queue: queue-like object receiving the result tuples
    :param max_covers: number of successful results after which collection
                       stops
    :param timeout: timeout forwarded to each source
    '''
    from calibre.customize.ui import cover_sources
    abort = Event()
    temp = Queue()
    getters = [partial(p.get_covers, mi, temp, abort, timeout=timeout)
               for p in cover_sources()]
    workers = [Thread(target=c) for c in getters]
    for w in workers:
        w.daemon = True
        w.start()

    # NOTE(review): the collection loop did not survive decompilation; it is
    # rebuilt from the surviving pieces (the counter, max_covers and the
    # worker liveness test). Confirm that only successful results
    # (result[0] true) should count towards max_covers.
    count = 0
    while count < max_covers:
        try:
            # Block briefly instead of spinning on an empty queue.
            result = temp.get(timeout=0.1)
        except Empty:
            if not any(w.is_alive() for w in workers):
                break
            continue
        if result[0]:
            count += 1
        result_queue.put(result)

    # Tell the sources to stop, then forward anything already queued.
    abort.set()
    while True:
        try:
            result = temp.get_nowait()
        except Empty:
            break
        result_queue.put(result)
-
-
def download_cover(mi, timeout=5):
    '''
    Download a single cover for ``mi``.

    Calls :func:`download_covers` with ``max_covers=1`` and then drains the
    result queue, separating cover data from failure reports.

    :param mi: metadata object handed on to ``download_covers``
    :param timeout: timeout forwarded to ``download_covers``
    :return: ``(ans, errors)`` where ``ans`` is the cover data of the last
             successful result read from the queue (``None`` if there was
             none) and ``errors`` is the list of failed result tuples
    '''
    results = Queue()
    download_covers(mi, results, max_covers=1, timeout=timeout)
    errors = []
    ans = None
    while True:
        try:
            x = results.get_nowait()
        except Empty:
            break
        if x[0]:
            ans = x[1]
        else:
            errors.append(x)
    return (ans, errors)
-
-
def test(isbns):
    '''
    Manual smoke test: for each ISBN, report whether a cover exists and
    whether it could be downloaded.

    :param isbns: iterable of ISBN strings
    '''
    from calibre.ebooks.metadata import MetaInformation
    mi = MetaInformation('test', ['test'])
    for isbn in isbns:
        prints('Testing ISBN:', isbn)
        mi.isbn = isbn
        found = check_for_cover(mi)
        prints('Has cover:', found)
        ans, errors = download_cover(mi)
        if ans is not None:
            prints('Cover downloaded')
        else:
            prints('Download failed:')
            for err in errors:
                # err is (False, message, details, plugin_name)
                prints('\t', err[-1] + ':', err[1])
        # Parenthesised so the line parses on Python 3 as well; on Python 2
        # this is still the print statement applied to the string '\n'.
        print('\n')
-
-
if __name__ == '__main__':
    # ISBNs given on the command line are tested first, followed by two
    # built-in sample ISBNs.
    isbns = sys.argv[1:] + ['9781591025412', '9780307272119']
    test(isbns)
-
-