# The two lines below are non-code residue from the web page this source was
# scraped from; preserved as comments so they cannot break parsing.
# home *** CD-ROM | disk | FTP | other *** search
# Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) from __future__ import with_statement __license__ = 'GPL v3' __copyright__ = '2008, Marshall T. Vandegrift <llasram@gmail.com>' __docformat__ = 'restructuredtext en' import os import re import uuid import logging from mimetypes import types_map from collections import defaultdict from itertools import count from urlparse import urldefrag, urlparse, urlunparse from urllib import unquote as urlunquote from urlparse import urljoin from lxml import etree, html from cssutils import CSSParser from cssutils.css import CSSRule import calibre from calibre.constants import filesystem_encoding from calibre.translations.dynamic import translate from calibre.ebooks.chardet import xml_to_unicode from calibre.ebooks.oeb.entitydefs import ENTITYDEFS from calibre.ebooks.conversion.preprocess import CSSPreProcessor RECOVER_PARSER = etree.XMLParser(recover = True, no_network = True) XML_NS = 'http://www.w3.org/XML/1998/namespace' XHTML_NS = 'http://www.w3.org/1999/xhtml' OEB_DOC_NS = 'http://openebook.org/namespaces/oeb-document/1.0/' OPF1_NS = 'http://openebook.org/namespaces/oeb-package/1.0/' OPF2_NS = 'http://www.idpf.org/2007/opf' OPF_NSES = set([ OPF1_NS, OPF2_NS]) DC09_NS = 'http://purl.org/metadata/dublin_core' DC10_NS = 'http://purl.org/dc/elements/1.0/' DC11_NS = 'http://purl.org/dc/elements/1.1/' DC_NSES = set([ DC09_NS, DC10_NS, DC11_NS]) XSI_NS = 'http://www.w3.org/2001/XMLSchema-instance' DCTERMS_NS = 'http://purl.org/dc/terms/' NCX_NS = 'http://www.daisy.org/z3986/2005/ncx/' SVG_NS = 'http://www.w3.org/2000/svg' XLINK_NS = 'http://www.w3.org/1999/xlink' CALIBRE_NS = 'http://calibre.kovidgoyal.net/2009/metadata' RE_NS = 'http://exslt.org/regular-expressions' MBP_NS = 'http://www.mobipocket.com' XPNSMAP = { 'h': XHTML_NS, 'o1': OPF1_NS, 'o2': OPF2_NS, 'd09': DC09_NS, 'd10': DC10_NS, 'd11': DC11_NS, 'xsi': XSI_NS, 'dt': DCTERMS_NS, 'ncx': NCX_NS, 'svg': SVG_NS, 'xl': XLINK_NS, 're': RE_NS, 'mbp': MBP_NS, 
'calibre': CALIBRE_NS } OPF1_NSMAP = { 'dc': DC11_NS, 'oebpackage': OPF1_NS } OPF2_NSMAP = { 'opf': OPF2_NS, 'dc': DC11_NS, 'dcterms': DCTERMS_NS, 'xsi': XSI_NS, 'calibre': CALIBRE_NS } def XML(name): return '{%s}%s' % (XML_NS, name) def XHTML(name): return '{%s}%s' % (XHTML_NS, name) def OPF(name): return '{%s}%s' % (OPF2_NS, name) def DC(name): return '{%s}%s' % (DC11_NS, name) def XSI(name): return '{%s}%s' % (XSI_NS, name) def DCTERMS(name): return '{%s}%s' % (DCTERMS_NS, name) def NCX(name): return '{%s}%s' % (NCX_NS, name) def SVG(name): return '{%s}%s' % (SVG_NS, name) def XLINK(name): return '{%s}%s' % (XLINK_NS, name) def CALIBRE(name): return '{%s}%s' % (CALIBRE_NS, name) _css_url_re = re.compile('url\\((.*?)\\)', re.I) _css_import_re = re.compile('@import "(.*?)"') _archive_re = re.compile('[^ ]+') def iterlinks(root): link_attrs = set(html.defs.link_attrs) link_attrs.add(XLINK('href')) for el in root.iter(): attribs = el.attrib try: tag = el.tag except UnicodeDecodeError: continue if tag == XHTML('object'): codebase = None if 'codebase' in attribs: codebase = el.get('codebase') yield (el, 'codebase', codebase, 0) for attrib in ('classid', 'data'): if attrib in attribs: value = el.get(attrib) if codebase is not None: value = urljoin(codebase, value) yield (el, attrib, value, 0) continue if 'archive' in attribs: for match in _archive_re.finditer(el.get('archive')): value = match.group(0) if codebase is not None: value = urljoin(codebase, value) yield (el, 'archive', value, match.start()) else: for attr in attribs: if attr in link_attrs: yield (el, attr, attribs[attr], 0) continue if tag == XHTML('style') and el.text: for match in _css_url_re.finditer(el.text): yield (el, None, match.group(1), match.start(1)) for match in _css_import_re.finditer(el.text): yield (el, None, match.group(1), match.start(1)) if 'style' in attribs: for match in _css_url_re.finditer(attribs['style']): yield (el, 'style', match.group(1), match.start(1)) def 
def make_links_absolute(root, base_url):
    """Make all links in the document absolute, given the *base_url*."""
    def link_repl(href):
        return urljoin(base_url, href)
    rewrite_links(root, link_repl)


def resolve_base_href(root):
    """Resolve and strip any <base href="..."> elements, rewriting all
    links in the document to be absolute against that base."""
    base_href = None
    basetags = root.xpath('//base[@href]|//h:base[@href]',
                          namespaces=XPNSMAP)
    for b in basetags:
        base_href = b.get('href')
        b.drop_tree()
    if not base_href:
        return None
    # NOTE(review): the decompiled original passed resolve_base_href=False
    # here, but make_links_absolute() accepts no such keyword (it would
    # raise TypeError); the keyword has been dropped.
    make_links_absolute(root, base_href)


# Alias so rewrite_links() can call the function even though its own
# parameter of the same name shadows it (bug in the decompiled original).
_resolve_base_href = resolve_base_href


def rewrite_links(root, link_repl_func, resolve_base_href=False):
    """Rewrite all links in the document using link_repl_func(link) -> url.

    If the replacement function returns None, the attribute (or element
    text content) holding the link is removed.  Returning the same link
    leaves it untouched.
    """
    if resolve_base_href:
        _resolve_base_href(root)
    for el, attrib, link, pos in iterlinks(root):
        new_link = link_repl_func(link.strip())
        if new_link == link:
            continue
        if new_link is None:
            # Remove the attribute or element content
            if attrib is None:
                el.text = ''
            else:
                del el.attrib[attrib]
            continue
        if attrib is None:
            new = el.text[:pos] + new_link + el.text[pos + len(link):]
            el.text = new
        else:
            cur = el.attrib[attrib]
            if not pos and len(cur) == len(link):
                # Most common case: the attribute is exactly the link.
                el.attrib[attrib] = new_link
            else:
                new = cur[:pos] + new_link + cur[pos + len(link):]
                el.attrib[attrib] = new


EPUB_MIME = types_map['.epub']
XHTML_MIME = types_map['.xhtml']
CSS_MIME = types_map['.css']
NCX_MIME = types_map['.ncx']
OPF_MIME = types_map['.opf']
PAGE_MAP_MIME = 'application/oebps-page-map+xml'
OEB_DOC_MIME = 'text/x-oeb1-document'
OEB_CSS_MIME = 'text/x-oeb1-css'
OPENTYPE_MIME = 'application/x-font-opentype'
GIF_MIME = types_map['.gif']
JPEG_MIME = types_map['.jpeg']
PNG_MIME = types_map['.png']
SVG_MIME = types_map['.svg']
BINARY_MIME = 'application/octet-stream'

XHTML_CSS_NAMESPACE = u'@namespace "%s";\n' % XHTML_NS

OEB_STYLES = set([CSS_MIME, OEB_CSS_MIME, 'text/x-oeb-css'])
OEB_DOCS = set([XHTML_MIME, 'text/html', OEB_DOC_MIME,
                'text/x-oeb-document'])
OEB_RASTER_IMAGES = set([GIF_MIME, JPEG_MIME, PNG_MIME])
OEB_IMAGES = set([GIF_MIME, JPEG_MIME, PNG_MIME, SVG_MIME])

MS_COVER_TYPE = 'other.ms-coverimage-standard'

ENTITY_RE = re.compile('&([a-zA-Z_:][a-zA-Z0-9.-_:]+);')
COLLAPSE_RE = re.compile(r'[ \t\r\n\v]+')
QNAME_RE = re.compile('^[{][^{}]+[}][^{}]+$')
PREFIXNAME_RE = re.compile('^[^:]+[:][^:]+')
PREFIXNAME_RE = re.compile('^[^:]+[:][^:]+')
XMLDECL_RE = re.compile(r'^\s*<[?]xml.*?[?]>')
CSSURL_RE = re.compile('url[(](?P<q>["\']?)(?P<url>[^)]+)(?P=q)[)]')


def element(parent, *args, **kwargs):
    """Create an element as a child of *parent*, or standalone when
    *parent* is None."""
    if parent is not None:
        return etree.SubElement(parent, *args, **kwargs)
    return etree.Element(*args, **kwargs)


def namespace(name):
    """Return the namespace URI of a Clark-notation name ('' if none)."""
    if '}' in name:
        return name.split('}', 1)[0][1:]
    return ''


def barename(name):
    """Return the local part of a Clark-notation name."""
    if '}' in name:
        return name.split('}', 1)[1]
    return name


def prefixname(name, nsrmap):
    """Convert a Clark-notation name to prefix:local form using the
    reverse (namespace -> prefix) map *nsrmap*."""
    if not isqname(name):
        return name
    ns = namespace(name)
    if ns not in nsrmap:
        return name
    prefix = nsrmap[ns]
    if not prefix:
        return barename(name)
    return ':'.join((prefix, barename(name)))


def isprefixname(name):
    # Falsy names are returned as-is, matching the decompiled control flow
    # (equivalent to `name and ...`).
    return name and PREFIXNAME_RE.match(name) is not None


def qname(name, nsmap):
    """Convert a prefix:local name to Clark notation using *nsmap*."""
    if not isprefixname(name):
        return name
    prefix, local = name.split(':', 1)
    if prefix not in nsmap:
        return name
    return '{%s}%s' % (nsmap[prefix], local)


def isqname(name):
    return name and QNAME_RE.match(name) is not None


def XPath(expr):
    """Compile an XPath expression bound to this module's namespace map."""
    return etree.XPath(expr, namespaces=XPNSMAP)


def xpath(elem, expr):
    """Evaluate *expr* on *elem* with this module's namespace map."""
    return elem.xpath(expr, namespaces=XPNSMAP)


def xml2str(root, pretty_print=False, strip_comments=False):
    """Serialize *root* to a UTF-8 byte string with an XML declaration."""
    ans = etree.tostring(root, encoding='utf-8', xml_declaration=True,
                         pretty_print=pretty_print)
    if strip_comments:
        ans = re.compile('<!--.*?-->', re.DOTALL).sub('', ans)
    return ans


def xml2unicode(root, pretty_print=False):
    return etree.tostring(root, pretty_print=pretty_print)


def xml2text(elem):
    """Return the text content of *elem*, without its tail."""
    return etree.tostring(elem, method='text', encoding=unicode,
                          with_tail=False)


# NOTE(review): the decompiler emitted broken `lambda .0:` constructs here;
# reconstructed as plain generator expressions.
ASCII_CHARS = set(chr(x) for x in xrange(128))
UNIBYTE_CHARS = set(chr(x) for x in xrange(256))
URL_SAFE = set('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
               'abcdefghijklmnopqrstuvwxyz'
               '0123456789'
               '_.-/~')
# Index 0: unsafe chars for unicode hrefs; index 1: for byte strings.
URL_UNSAFE = [ASCII_CHARS - URL_SAFE, UNIBYTE_CHARS - URL_SAFE]


def urlquote(href):
    """Percent-encode every character of *href* that is not URL-safe."""
    result = []
    # The decompiled original read `None if isinstance(...)`; the constant
    # 0 was mangled to None by the decompiler.
    unsafe = 0 if isinstance(href, unicode) else 1
    unsafe = URL_UNSAFE[unsafe]
    for char in href:
        if char in unsafe:
            char = '%%%02x' % ord(char)
        result.append(char)
    return ''.join(result)
def urlquote(href):
    """Percent-encode every character of *href* that is not URL-safe."""
    result = []
    unsafe = 0 if isinstance(href, unicode) else 1
    unsafe = URL_UNSAFE[unsafe]
    for char in href:
        if char in unsafe:
            char = '%%%02x' % ord(char)
        result.append(char)
    return ''.join(result)


def urlnormalize(href):
    """Convert a URL into normalized form: percent-encoding applied
    consistently, backslashes converted to forward slashes, and
    file:/schemeless URLs reduced to path#fragment."""
    parts = urlparse(href)
    if not parts.scheme or parts.scheme == 'file':
        path, frag = urldefrag(href)
        parts = ('', '', path, '', '', frag)
    parts = (part.replace('\\', '/') for part in parts)
    parts = (urlunquote(part) for part in parts)
    parts = (urlquote(part) for part in parts)
    return urlunparse(parts)


def merge_multiple_html_heads_and_bodies(root, log=None):
    """Merge multiple <head>/<body> sections into a single one of each."""
    heads = xpath(root, '//h:head')
    bodies = xpath(root, '//h:body')
    # NOTE(review): the decompiler dropped the parentheses here, inverting
    # the condition; restored to "nothing to merge -> return unchanged".
    if not (len(heads) > 1 or len(bodies) > 1):
        return root
    for child in root:
        root.remove(child)
    head = root.makeelement(XHTML('head'))
    body = root.makeelement(XHTML('body'))
    for h in heads:
        for x in h:
            head.append(x)
    for b in bodies:
        for x in b:
            body.append(x)
    map(root.append, (head, body))
    if log is not None:
        log.warn('Merging multiple <head> and <body> sections')
    return root


class DummyHandler(logging.Handler):
    """Forwards cssutils log records to the currently attached oeb log."""

    def __init__(self):
        logging.Handler.__init__(self, logging.WARNING)
        self.setFormatter(logging.Formatter('%(message)s'))
        self.log = None

    def emit(self, record):
        if self.log is not None:
            msg = self.format(record)
            # NOTE(review): decompiler mangled the true branch to None;
            # errors go to log.error, everything else to log.warn.
            f = (self.log.error if record.levelno >= logging.ERROR
                 else self.log.warn)
            f(msg)


_css_logger = logging.getLogger('calibre.css')
_css_logger.setLevel(logging.WARNING)
_css_log_handler = DummyHandler()
_css_logger.addHandler(_css_log_handler)


class OEBError(Exception):
    pass


class NotHTML(OEBError):
    pass


class NullContainer(object):
    """A placeholder container that refuses all reads and writes."""

    def __init__(self, log):
        self.log = log

    def read(self, path):
        raise OEBError('Attempt to read from NullContainer')

    def write(self, path):
        raise OEBError('Attempt to write to NullContainer')

    def exists(self, path):
        return False

    def namelist(self):
        return []


class DirContainer(object):
    """Content container backed by a directory on the filesystem."""

    def __init__(self, path, log):
        self.log = log
        path = unicode(path)
        ext = os.path.splitext(path)[1].lower()
        if ext == '.opf':
            # Given the OPF file directly; its directory is the root.
            self.opfname = os.path.basename(path)
            self.rootdir = os.path.dirname(path)
            return
        self.rootdir = path
        # Given a directory: locate the first OPF file inside it.
        for path in self.namelist():
            ext = os.path.splitext(path)[1].lower()
            if ext == '.opf':
                self.opfname = path
                return
        self.opfname = None

    def read(self, path):
        if path is None:
            path = self.opfname
        path = os.path.join(self.rootdir, path)
        # NOTE(review): the decompiler lost the with-statement target
        # (`f = _[1]`); reconstructed as a plain binary open.
        with open(path, 'rb') as f:
            return f.read()

    def write(self, path, data):
        path = os.path.join(self.rootdir, urlunquote(path))
        dir = os.path.dirname(path)
        if not os.path.isdir(dir):
            os.makedirs(dir)
        with open(path, 'wb') as f:
            return f.write(data)

    def exists(self, path):
        try:
            path = os.path.join(self.rootdir, urlunquote(path))
        except ValueError:
            return False
        return os.path.isfile(path)

    def namelist(self):
        """Return all file paths under the root, '/'-separated, decoded
        with the filesystem encoding; undecodable names are skipped."""
        names = []
        base = self.rootdir
        if isinstance(base, unicode):
            base = base.encode(filesystem_encoding)
        for root, dirs, files in os.walk(base):
            for fname in files:
                fname = os.path.join(root, fname)
                fname = fname.replace('\\', '/')
                if not isinstance(fname, unicode):
                    try:
                        fname = fname.decode(filesystem_encoding)
                    except:
                        # Undecodable file name; skip it.
                        continue
                names.append(fname)
        return names


class Metadata(object):
    """A collection of OEB data model metadata items."""

    DC_TERMS = set(['contributor', 'coverage', 'creator', 'date',
                    'description', 'format', 'identifier', 'language',
                    'publisher', 'relation', 'rights', 'source',
                    'subject', 'title', 'type'])
    CALIBRE_TERMS = set(['series', 'series_index', 'rating', 'timestamp',
                         'publication_type'])
    OPF_ATTRS = {'role': OPF('role'), 'file-as': OPF('file-as'),
                 'scheme': OPF('scheme'), 'event': OPF('event'),
                 'type': XSI('type'), 'lang': XML('lang'), 'id': 'id'}
    OPF1_NSMAP = {'dc': DC11_NS, 'oebpackage': OPF1_NS}
    OPF2_NSMAP = {'opf': OPF2_NS, 'dc': DC11_NS, 'dcterms': DCTERMS_NS,
                  'xsi': XSI_NS, 'calibre': CALIBRE_NS}

    class Item(object):
        """A single metadata item, e.g. one Dublin Core term."""

        class Attribute(object):
            """Smart accessor for an item attribute whose qualified name
            depends on the item's term (dc:* vs. opf:meta)."""

            def __init__(self, attr, allowed=None):
                if not callable(attr):
                    # NOTE(review): the decompiler wrapped this in a tuple
                    # (`attr_ = (attr,)`); a constant-returning closure is
                    # the intended behavior.
                    attr_ = attr
                    attr = lambda term: attr_
                self.attr = attr
                self.allowed = allowed

            def term_attr(self, obj):
                term = obj.term
                if namespace(term) != DC11_NS:
                    term = OPF('meta')
                allowed = self.allowed
                if allowed is not None and term not in allowed:
                    raise AttributeError(
                        'attribute %r not valid for metadata term %r'
                        % (self.attr(term), barename(obj.term)))
                return self.attr(term)

            def __get__(self, obj, cls):
                if obj is None:
                    return None
                return obj.attrib.get(self.term_attr(obj), '')

            def __set__(self, obj, value):
                obj.attrib[self.term_attr(obj)] = value

        def __init__(self, term, value, attrib={}, nsmap={}, **kwargs):
            # attrib/nsmap defaults are copied immediately, so the mutable
            # defaults are safe here.
            self.attrib = attrib = dict(attrib)
            self.nsmap = nsmap = dict(nsmap)
            attrib.update(kwargs)
            if namespace(term) == OPF2_NS:
                term = barename(term)
            ns = namespace(term)
            local = barename(term).lower()
            if local in Metadata.DC_TERMS:
                if not ns or ns in DC_NSES:
                    # Anything looking like Dublin Core is coerced to DC 1.1
                    term = DC(local)
            elif local in Metadata.CALIBRE_TERMS and ns in (CALIBRE_NS, ''):
                term = CALIBRE(local)
            self.term = term
            self.value = value
            # Normalize attribute names/values into qualified form.
            for attr, value in attrib.items():
                if isprefixname(value):
                    attrib[attr] = qname(value, nsmap)
                nsattr = Metadata.OPF_ATTRS.get(attr, attr)
                if nsattr == OPF('scheme') and namespace(term) != DC11_NS:
                    # The opf:meta element takes an unqualified scheme attr.
                    nsattr = 'scheme'
                if attr != nsattr:
                    attrib[nsattr] = attrib.pop(attr)

        def name(self):
            def fget(self):
                return self.term
            return property(fget=fget)
        name = dynamic_property(name)

        def content(self):
            def fget(self):
                return self.value
            def fset(self, value):
                self.value = value
            return property(fget=fget, fset=fset)
        content = dynamic_property(content)

        scheme = Attribute(lambda term: 'scheme' if term == OPF('meta')
                           else OPF('scheme'),
                           [DC('identifier'), OPF('meta')])
        file_as = Attribute(OPF('file-as'),
                            [DC('creator'), DC('contributor'), DC('title')])
        role = Attribute(OPF('role'), [DC('creator'), DC('contributor')])
        event = Attribute(OPF('event'), [DC('date')])
        id = Attribute('id')
        type = Attribute(XSI('type'),
                         [DC('date'), DC('format'), DC('type')])
        lang = Attribute(XML('lang'),
                         [DC('contributor'), DC('coverage'), DC('creator'),
                          DC('publisher'), DC('relation'), DC('rights'),
                          DC('source'), DC('subject'), OPF('meta')])

        def __getitem__(self, key):
            return self.attrib[key]

        def __setitem__(self, key, value):
            self.attrib[key] = value

        def __contains__(self, key):
            return key in self.attrib

        def get(self, key, default=None):
            return self.attrib.get(key, default)

        def __repr__(self):
            return 'Item(term=%r, value=%r, attrib=%r)' % (
                barename(self.term), self.value, self.attrib)

        def __str__(self):
            return unicode(self.value).encode('ascii', 'xmlcharrefreplace')

        def __unicode__(self):
            return unicode(self.value)

        def to_opf1(self, dcmeta=None, xmeta=None, nsrmap={}):
            """Serialize as an OPF 1.0 dc-metadata or x-metadata element."""
            attrib = {}
            for key, value in self.attrib.items():
                if namespace(key) == OPF2_NS:
                    key = barename(key)
                attrib[key] = prefixname(value, nsrmap)
            if namespace(self.term) == DC11_NS:
                name = DC(barename(self.term).title())
                elem = element(dcmeta, name, attrib=attrib)
                elem.text = self.value
            else:
                elem = element(xmeta, 'meta', attrib=attrib)
                elem.attrib['name'] = prefixname(self.term, nsrmap)
                elem.attrib['content'] = prefixname(self.value, nsrmap)
            return elem

        def to_opf2(self, parent=None, nsrmap={}):
            """Serialize as an OPF 2.0 dc:* or opf:meta element."""
            attrib = {}
            for key, value in self.attrib.items():
                attrib[key] = prefixname(value, nsrmap)
            if namespace(self.term) == DC11_NS:
                elem = element(parent, self.term, attrib=attrib)
                elem.text = self.value
            else:
                elem = element(parent, OPF('meta'), attrib=attrib)
                elem.attrib['name'] = prefixname(self.term, nsrmap)
                elem.attrib['content'] = prefixname(self.value, nsrmap)
            return elem

    def __init__(self, oeb):
        self.oeb = oeb
        self.items = defaultdict(list)

    def add(self, term, value, attrib={}, nsmap={}, **kwargs):
        item = self.Item(term, value, attrib, nsmap, **kwargs)
        items = self.items[barename(item.term)]
        items.append(item)
        return item

    def iterkeys(self):
        for key in self.items:
            yield key
    __iter__ = iterkeys

    def clear(self, key):
        l = self.items[key]
        for x in list(l):
            l.remove(x)

    def filter(self, key, predicate):
        l = self.items[key]
        for x in list(l):
            if predicate(x):
                l.remove(x)

    def __getitem__(self, key):
        return self.items[key]

    def __contains__(self, key):
        return key in self.items

    def __getattr__(self, term):
        return self.items[term]

    def _nsmap(self):
        def fget(self):
            nsmap = {}
            for term in self.items:
                for item in self.items[term]:
                    nsmap.update(item.nsmap)
            return nsmap
        return property(fget=fget)
    _nsmap = dynamic_property(_nsmap)

    def _opf1_nsmap(self):
        def fget(self):
            nsmap = self._nsmap
            # items() returns a list in Python 2, so deleting while
            # iterating over it is safe here.
            for key, value in nsmap.items():
                if value in OPF_NSES or value in DC_NSES:
                    del nsmap[key]
            return nsmap
        return property(fget=fget)
    _opf1_nsmap = dynamic_property(_opf1_nsmap)

    def _opf2_nsmap(self):
        def fget(self):
            nsmap = self._nsmap
            nsmap.update(OPF2_NSMAP)
            return nsmap
        return property(fget=fget)
    _opf2_nsmap = dynamic_property(_opf2_nsmap)

    def to_opf1(self, parent=None):
        nsmap = self._opf1_nsmap
        nsrmap = dict((value, key) for key, value in nsmap.items())
        elem = element(parent, 'metadata', nsmap=nsmap)
        dcmeta = element(elem, 'dc-metadata', nsmap=OPF1_NSMAP)
        xmeta = element(elem, 'x-metadata')
        for term in self.items:
            for item in self.items[term]:
                item.to_opf1(dcmeta, xmeta, nsrmap=nsrmap)
        if 'ms-chaptertour' not in self.items:
            chaptertour = self.Item('ms-chaptertour', 'chaptertour')
            chaptertour.to_opf1(dcmeta, xmeta, nsrmap=nsrmap)
        return elem

    def to_opf2(self, parent=None):
        nsmap = self._opf2_nsmap
        nsrmap = dict((value, key) for key, value in nsmap.items())
        elem = element(parent, OPF('metadata'), nsmap=nsmap)
        for term in self.items:
            for item in self.items[term]:
                item.to_opf2(elem, nsrmap=nsrmap)
        return elem
class Manifest(object):
    """The manifest of an OEB book: the set of all content items."""

    class Item(object):
        """A single manifest item: a content document, stylesheet, image
        or other resource, with lazy MIME-type-sensitive parsing."""

        NUM_RE = re.compile('^(.*)([0-9][0-9.]*)(?=[.]|$)')
        META_XP = XPath('/h:html/h:head/h:meta[@http-equiv="Content-Type"]')

        def __init__(self, oeb, id, href, media_type,
                     fallback=None, loader=str, data=None):
            self.oeb = oeb
            self.id = id
            self.href = self.path = urlnormalize(href)
            self.media_type = media_type
            self.fallback = fallback
            self.override_css_fetch = None
            self.spine_position = None
            self.linear = True
            if loader is None and data is None:
                loader = oeb.container.read
            self._loader = loader
            self._data = data

        def __repr__(self):
            return u'Item(id=%r, href=%r, media_type=%r)' % (
                self.id, self.href, self.media_type)

        def _parse_xml(self, data):
            data = xml_to_unicode(data, strip_encoding_pats=True,
                                  assume_utf8=True,
                                  resolve_entities=True)[0]
            if not data:
                return None
            return etree.fromstring(data, parser=RECOVER_PARSER)

        def _parse_xhtml(self, data):
            """Parse (possibly broken) HTML/XHTML into an lxml tree in the
            XHTML namespace, repairing common defects along the way."""
            self.oeb.log.debug('Parsing', self.href, '...')
            data = self.oeb.decode(data)
            data = self.oeb.html_preprocessor(data)
            # Extract any user-defined entities declared before <html>.
            idx = data.find('<html')
            if idx == -1:
                idx = data.find('<HTML')
            if idx > -1:
                pre = data[:idx]
                data = data[idx:]
                if '<!DOCTYPE' in pre:
                    user_entities = {}
                    for match in re.finditer('<!ENTITY\\s+(\\S+)\\s+([^>]+)',
                                             pre):
                        val = match.group(2)
                        if val.startswith('"') and val.endswith('"'):
                            val = val[1:-1]
                        user_entities[match.group(1)] = val
                    if user_entities:
                        pat = re.compile('&(%s);'
                                         % '|'.join(user_entities.keys()))
                        data = pat.sub(
                            lambda m: user_entities[m.group(1)], data)

            parser = etree.XMLParser(no_network=True)

            def first_pass(data):
                # Try strict XML, then entity substitution, then HTML
                # parsing, finally the recovering parser.
                try:
                    data = etree.fromstring(data, parser=parser)
                except etree.XMLSyntaxError as err:
                    self.oeb.log.exception('Initial parse failed:')
                    repl = lambda m: ENTITYDEFS.get(m.group(1), m.group(0))
                    data = ENTITY_RE.sub(repl, data)
                    try:
                        data = etree.fromstring(data, parser=parser)
                    except etree.XMLSyntaxError as err:
                        self.oeb.logger.warn('Parsing file %r as HTML'
                                             % self.href)
                        if err.args and \
                                err.args[0].startswith('Excessive depth'):
                            from lxml.html import soupparser
                            data = soupparser.fromstring(data)
                        else:
                            data = html.fromstring(data)
                        data.attrib.pop('xmlns', None)
                        # '--' inside comments is invalid XML; strip it.
                        for elem in data.iter(tag=etree.Comment):
                            if elem.text:
                                elem.text = elem.text.strip('-')
                        data = etree.tostring(data, encoding=unicode)
                        try:
                            data = etree.fromstring(data, parser=parser)
                        except etree.XMLSyntaxError:
                            data = etree.fromstring(data,
                                                    parser=RECOVER_PARSER)
                return data
            data = first_pass(data)

            # Force into the XHTML namespace
            if not namespace(data.tag):
                self.oeb.log.warn('Forcing', self.href,
                                  'into XHTML namespace')
                data.attrib['xmlns'] = XHTML_NS
                data = etree.tostring(data, encoding=unicode)
                try:
                    data = etree.fromstring(data, parser=parser)
                except etree.XMLSyntaxError:
                    data = data.replace(':=', '=').replace(':>', '>')
                    data = data.replace('<http:/>', '')
                    try:
                        data = etree.fromstring(data, parser=parser)
                    except etree.XMLSyntaxError:
                        self.oeb.logger.warn(
                            'Stripping comments and meta tags from %s'
                            % self.href)
                        data = re.compile('<!--.*?-->',
                                          re.DOTALL).sub('', data)
                        data = re.sub('<meta\\s+[^>]+?>', '', data)
                        data = data.replace(
                            "<?xml version='1.0' encoding='utf-8'?><o:p></o:p>",
                            '')
                        data = data.replace(
                            "<?xml version='1.0' encoding='utf-8'??>", '')
                        data = etree.fromstring(data,
                                                parser=RECOVER_PARSER)
            elif namespace(data.tag) != XHTML_NS:
                # Alternate namespace: re-tag everything into XHTML.
                ns = namespace(data.tag)
                attrib = dict(data.attrib)
                nroot = etree.Element(XHTML('html'),
                                      nsmap={None: XHTML_NS},
                                      attrib=attrib)
                for elem in data.iterdescendants():
                    if isinstance(elem.tag, basestring) and \
                            namespace(elem.tag) == ns:
                        elem.tag = XHTML(barename(elem.tag))
                for elem in data:
                    nroot.append(elem)
                data = nroot

            data = merge_multiple_html_heads_and_bodies(data,
                                                        self.oeb.logger)
            # Ensure the document has a <head>; the decompiled original read
            # `head = None if head else None` -- the true expression takes
            # the first match.
            head = xpath(data, '/h:html/h:head')
            head = head[0] if head else None
            if head is None:
                self.oeb.logger.warn('File %r missing <head/> element'
                                     % self.href)
                head = etree.Element(XHTML('head'))
                data.insert(0, head)
                title = etree.SubElement(head, XHTML('title'))
                title.text = self.oeb.translate(__('Unknown'))
            elif not xpath(data, '/h:html/h:head/h:title'):
                self.oeb.logger.warn('File %r missing <title/> element'
                                     % self.href)
                title = etree.SubElement(head, XHTML('title'))
                title.text = self.oeb.translate(__('Unknown'))
            # Remove encoding-declaring <meta> tags and add a UTF-8 one.
            for meta in self.META_XP(data):
                meta.getparent().remove(meta)
            etree.SubElement(head, XHTML('meta'), attrib={
                'http-equiv': 'Content-Type',
                'content': '%s; charset=utf-8' % XHTML_NS})
            # Ensure the document has a <body> directly under <html>.
            if not xpath(data, '/h:html/h:body'):
                body = xpath(data, '//h:body')
                if body:
                    body = body[0]
                    body.getparent().remove(body)
                    data.append(body)
                else:
                    self.oeb.logger.warn('File %r missing <body/> element'
                                         % self.href)
                    etree.SubElement(data, XHTML('body'))

            # NOTE(review): the decompiler lost this list comprehension
            # (`r = _[1]`); reconstructed as stripping Microsoft Office
            # markup -- confirm against upstream.
            r = [x for x in data.iterdescendants(etree.Element)
                 if 'microsoft' in x.get('class', '')]
            for x in r:
                x.tag = XHTML('span')

            # Remove lang redefinitions on <body>.
            body = xpath(data, '/h:html/h:body')[0]
            for key in list(body.attrib.keys()):
                if key == 'lang' or key.endswith('}lang'):
                    body.attrib.pop(key)

            def remove_elem(a):
                # Remove element *a*, splicing its tail text into the
                # preceding sibling (or the parent's text).
                p = a.getparent()
                idx = p.index(a) - 1
                p.remove(a)
                if a.tail:
                    if idx <= 0:
                        if p.text is None:
                            p.text = ''
                        p.text += a.tail
                    else:
                        if p[idx].tail is None:
                            p[idx].tail = ''
                        p[idx].tail += a.tail

            # Empty <a href>/<i>/<b> elements cause rendering artifacts.
            for a in xpath(data, '//h:a[@href]|//h:i|//h:b'):
                if a.get('id', None) is None and \
                        a.get('name', None) is None and \
                        len(a) == 0 and not a.text:
                    remove_elem(a)

            return data

        def _parse_txt(self, data):
            if '<html>' in data:
                return self._parse_xhtml(data)
            self.oeb.log.debug('Converting', self.href, '...')
            from calibre.ebooks.txt.processor import convert_markdown
            title = self.oeb.metadata.title
            if title:
                title = unicode(title[0])
            else:
                title = _('Unknown')
            return self._parse_xhtml(convert_markdown(data, title=title))

        def _parse_css(self, data):
            """Parse CSS, flattening @import rules into the sheet itself."""
            def get_style_rules_from_import(import_rule):
                # Recursively collect style/font-face rules from an
                # @import rule's stylesheet.
                ans = []
                if not import_rule.styleSheet:
                    return ans
                rules = import_rule.styleSheet.cssRules
                for rule in rules:
                    if rule.type == CSSRule.IMPORT_RULE:
                        ans.extend(get_style_rules_from_import(rule))
                    elif rule.type in (CSSRule.FONT_FACE_RULE,
                                       CSSRule.STYLE_RULE):
                        ans.append(rule)
                return ans

            self.oeb.log.debug('Parsing', self.href, '...')
            data = self.oeb.decode(data)
            data = self.oeb.css_preprocessor(data, add_namespace=True)
            parser = CSSParser(loglevel=logging.WARNING,
                               fetcher=self.override_css_fetch or
                                       self._fetch_css,
                               log=_css_logger)
            data = parser.parseString(data, href=self.href)
            data.namespaces['h'] = XHTML_NS
            import_rules = list(
                data.cssRules.rulesOfType(CSSRule.IMPORT_RULE))
            rules_to_append = []
            insert_index = None
            for r in data.cssRules.rulesOfType(CSSRule.STYLE_RULE):
                insert_index = data.cssRules.index(r)
                break
            for rule in import_rules:
                rules_to_append.extend(get_style_rules_from_import(rule))
            for r in reversed(rules_to_append):
                data.insertRule(r, index=insert_index)
            for rule in import_rules:
                data.deleteRule(rule)
            return data

        def _fetch_css(self, path):
            hrefs = self.oeb.manifest.hrefs
            if path not in hrefs:
                self.oeb.logger.warn('CSS import of missing file %r' % path)
                return (None, None)
            item = hrefs[path]
            if item.media_type not in OEB_STYLES:
                self.oeb.logger.warn('CSS import of non-CSS file %r' % path)
                return (None, None)
            data = item.data.cssText
            return ('utf-8', data)

        def data(self):
            doc = "Provides MIME type sensitive access to the manifest\n entry's associated content.\n\n - XHTML, HTML, and variant content is parsed as necessary to\n convert and and return as an lxml.etree element in the XHTML\n namespace.\n - XML content is parsed and returned as an lxml.etree element.\n - CSS and CSS-variant content is parsed and returned as a cssutils\n CSS DOM stylesheet.\n - All other content is returned as a :class:`str` object with no\n special parsing.\n "
            def fget(self):
                data = self._data
                if data is None:
                    if self._loader is None:
                        return None
                    data = self._loader(getattr(self, 'html_input_href',
                                                self.href))
                if not isinstance(data, basestring):
                    pass  # already parsed
                elif self.media_type.lower() in OEB_DOCS:
                    data = self._parse_xhtml(data)
                elif self.media_type.lower()[-4:] in ('+xml', '/xml'):
                    data = self._parse_xml(data)
                elif self.media_type.lower() in OEB_STYLES:
                    data = self._parse_css(data)
                elif 'text' in self.media_type.lower():
                    self.oeb.log.warn('%s contains data in TXT format'
                                      % self.href, 'converting to HTML')
                    data = self._parse_txt(data)
                    self.media_type = XHTML_MIME
                self._data = data
                return data
            def fset(self, value):
                self._data = value
            def fdel(self):
                self._data = None
            return property(fget, fset, fdel, doc=doc)
        data = dynamic_property(data)

        def unload_data_from_memory(self, memory=None):
            # Drop cached raw data; it can be re-read via the loader.
            if isinstance(self._data, (str, bytes)):
                self._data = None

        def __str__(self):
            data = self.data
            if isinstance(data, etree._Element):
                ans = xml2str(data, pretty_print=self.oeb.pretty_print)
                if self.media_type in OEB_DOCS:
                    # Expand self-closing div/a/span, which some renderers
                    # mishandle, into explicit open/close pairs.
                    ans = re.sub('<(div|a|span)([^>]*)/>',
                                 '<\\1\\2></\\1>', ans)
                return ans
            if isinstance(data, unicode):
                return data.encode('utf-8')
            if hasattr(data, 'cssText'):
                data = data.cssText
            return str(data)

        def __unicode__(self):
            data = self.data
            if isinstance(data, etree._Element):
                return xml2unicode(data, pretty_print=self.oeb.pretty_print)
            if isinstance(data, unicode):
                return data
            if hasattr(data, 'cssText'):
                return data.cssText
            return unicode(data)

        def __eq__(self, other):
            return id(self) == id(other)

        def __ne__(self, other):
            return not self.__eq__(other)

        def __cmp__(self, other):
            # Order by spine position first, then by a (stem, number, id)
            # key so that numbered files sort numerically.
            result = cmp(self.spine_position, other.spine_position)
            if result != 0:
                return result
            smatch = self.NUM_RE.search(self.href)
            # NOTE(review): the decompiler mangled these ternaries; the
            # reconstruction mirrors the symmetric `other` handling below.
            sref = smatch.group(1) if smatch else self.href
            snum = float(smatch.group(2)) if smatch else 0
            skey = (sref, snum, self.id)
            omatch = self.NUM_RE.search(other.href)
            oref = omatch.group(1) if omatch else other.href
            onum = float(omatch.group(2)) if omatch else 0
            okey = (oref, onum, other.id)
            return cmp(skey, okey)

        def relhref(self, href):
            """Convert *href* to a path relative to this item's href."""
            if urlparse(href).scheme:
                return href
            if '/' not in self.href:
                return href
            base = os.path.dirname(self.href).split('/')
            target, frag = urldefrag(href)
            target = target.split('/')
            for index in xrange(min(len(base), len(target))):
                if base[index] != target[index]:
                    break
            else:
                index += 1
            relhref = ['..'] * (len(base) - index) + target[index:]
            relhref = '/'.join(relhref)
            if frag:
                relhref = '#'.join((relhref, frag))
            return relhref

        def abshref(self, href):
            """Convert a relative *href* to be absolute with respect to
            this item's location within the book."""
            purl = urlparse(href)
            scheme = purl.scheme
            if scheme and scheme != 'file':
                return href
            purl = list(purl)
            purl[0] = ''
            href = urlunparse(purl)
            path, frag = urldefrag(href)
            if not path:
                if frag:
                    return '#'.join((self.href, frag))
                return self.href
            if '/' not in self.href:
                return href
            dirname = os.path.dirname(self.href)
            href = os.path.join(dirname, href)
            href = os.path.normpath(href).replace('\\', '/')
            return href

    def __init__(self, oeb):
        self.oeb = oeb
        self.items = set()
        self.ids = {}
        self.hrefs = {}

    def add(self, id, href, media_type, fallback=None, loader=None,
            data=None):
        item = self.Item(self.oeb, id, href, media_type, fallback,
                         loader, data)
        self.items.add(item)
        self.ids[item.id] = item
        self.hrefs[item.href] = item
        return item

    def remove(self, item):
        # Accept either an Item or an id.
        if item in self.ids:
            item = self.ids[item]
        del self.ids[item.id]
        if item.href in self.hrefs:
            del self.hrefs[item.href]
        self.items.remove(item)
        if item in self.oeb.spine:
            self.oeb.spine.remove(item)

    def generate(self, id=None, href=None):
        """Generate a unique id and/or href by appending a counter."""
        if id is not None:
            base = id
            index = 1
            while id in self.ids:
                id = base + str(index)
                index += 1
        if href is not None:
            href = urlnormalize(href)
            base, ext = os.path.splitext(href)
            index = 1
            # NOTE(review): the decompiler lost this constructor
            # (`lhrefs = []([...])`); a case-insensitive membership set is
            # the evident intent.
            lhrefs = set([x.lower() for x in self.hrefs])
            while href.lower() in lhrefs:
                href = base + str(index) + ext
                index += 1
        return (id, href)

    def __iter__(self):
        for item in self.items:
            yield item

    def __len__(self):
        return len(self.items)

    def values(self):
        return list(self.items)

    def __contains__(self, item):
        return item in self.items

    def to_opf1(self, parent=None):
        elem = element(parent, 'manifest')
        for item in self.items:
            media_type = item.media_type
            if media_type in OEB_DOCS:
                media_type = OEB_DOC_MIME
            elif media_type in OEB_STYLES:
                media_type = OEB_CSS_MIME
            attrib = {'id': item.id, 'href': urlunquote(item.href),
                      'media-type': media_type}
            if item.fallback:
                attrib['fallback'] = item.fallback
            element(elem, 'item', attrib=attrib)
        return elem

    def to_opf2(self, parent=None):
        def sort(x, y):
            return cmp(x.href, y.href)
        elem = element(parent, OPF('manifest'))
        for item in sorted(self.items, cmp=sort):
            media_type = item.media_type
            if media_type in OEB_DOCS:
                media_type = XHTML_MIME
            elif media_type in OEB_STYLES:
                media_type = CSS_MIME
            attrib = {'id': item.id, 'href': urlunquote(item.href),
                      'media-type': media_type}
            if item.fallback:
                attrib['fallback'] = item.fallback
            element(elem, OPF('item'), attrib=attrib)
        return elem


class Spine(object):
    """The ordered list of manifest items in the book's reading order."""

    def __init__(self, oeb):
        self.oeb = oeb
        self.items = []

    def _linear(self, linear):
        # Normalize the OPF 'linear' attribute value to a bool
        # (missing/yes/true -> True, no/false -> False).
        if isinstance(linear, basestring):
            linear = linear.lower()
        if linear is None or linear in ('yes', 'true'):
            linear = True
        elif linear in ('no', 'false'):
            linear = False
        return linear

    def add(self, item, linear=None):
        item.linear = self._linear(linear)
        item.spine_position = len(self.items)
        self.items.append(item)
        return item

    def insert(self, index, item, linear):
        item.linear = self._linear(linear)
        item.spine_position = index
        self.items.insert(index, item)
        # Renumber everything at and after the insertion point.
        for i in xrange(index, len(self.items)):
            self.items[i].spine_position = i
        return item

    def remove(self, item):
        index = item.spine_position
        self.items.pop(index)
        for i in xrange(index, len(self.items)):
            self.items[i].spine_position = i
        item.spine_position = None

    def index(self, item):
        for i, x in enumerate(self):
            if item == x:
                return i
        return -1

    def __iter__(self):
        for item in self.items:
            yield item

    def __getitem__(self, index):
        return self.items[index]

    def __len__(self):
        return len(self.items)

    def __contains__(self, item):
        return item in self.items

    def to_opf1(self, parent=None):
        elem = element(parent, 'spine')
        for item in self.items:
            if item.linear:
                element(elem, 'itemref', attrib={'idref': item.id})
        return elem

    def to_opf2(self, parent=None):
        elem = element(parent, OPF('spine'))
        for item in self.items:
            attrib = {'idref': item.id}
            if not item.linear:
                attrib['linear'] = 'no'
            element(elem, OPF('itemref'), attrib=attrib)
        return elem
class Guide(object):
    """The OEB guide: named references to key book sections."""

    class Reference(object):
        """A single guide reference (cover, toc, index, ...)."""

        _TYPES_TITLES = [
            ('cover', __('Cover')),
            ('title-page', __('Title Page')),
            ('toc', __('Table of Contents')),
            ('index', __('Index')),
            ('glossary', __('Glossary')),
            ('acknowledgements', __('Acknowledgements')),
            ('bibliography', __('Bibliography')),
            ('colophon', __('Colophon')),
            ('copyright-page', __('Copyright')),
            ('dedication', __('Dedication')),
            ('epigraph', __('Epigraph')),
            ('foreword', __('Foreword')),
            ('loi', __('List of Illustrations')),
            ('lot', __('List of Tables')),
            ('notes', __('Notes')),
            ('preface', __('Preface')),
            ('text', __('Main Text')),
        ]
        TYPES = set(t for t, _ in _TYPES_TITLES)
        TITLES = dict(_TYPES_TITLES)
        # NOTE(review): the decompiler garbled this genexp; the intent is a
        # type -> ordinal map for sorting references.
        ORDER = dict((t, i) for i, (t, _) in enumerate(_TYPES_TITLES))

        def __init__(self, oeb, type, title, href):
            self.oeb = oeb
            if type.lower() in self.TYPES:
                type = type.lower()
            elif type not in self.TYPES and \
                    not type.startswith('other.'):
                # Unknown types get the OPF-mandated 'other.' prefix.
                type = 'other.' + type
            if not title and type in self.TITLES:
                title = oeb.translate(self.TITLES[type])
            self.type = type
            self.title = title
            self.href = urlnormalize(href)

        def __repr__(self):
            return 'Reference(type=%r, title=%r, href=%r)' % (
                self.type, self.title, self.href)

        def _order(self):
            def fget(self):
                # Known types sort by their ordinal; unknown ones sort by
                # the type string itself.
                return self.ORDER.get(self.type, self.type)
            return property(fget=fget)
        _order = dynamic_property(_order)

        def __cmp__(self, other):
            if not isinstance(other, Guide.Reference):
                return NotImplemented
            return cmp(self._order, other._order)

        def item(self):
            doc = 'The manifest item associated with this reference.'
            def fget(self):
                path = urldefrag(self.href)[0]
                hrefs = self.oeb.manifest.hrefs
                return hrefs.get(path, None)
            return property(fget=fget, doc=doc)
        item = dynamic_property(item)

    def __init__(self, oeb):
        self.oeb = oeb
        self.refs = {}

    def add(self, type, title, href):
        ref = self.Reference(self.oeb, type, title, href)
        self.refs[type] = ref
        return ref

    def remove(self, type):
        return self.refs.pop(type, None)

    def iterkeys(self):
        for type in self.refs:
            yield type
    __iter__ = iterkeys

    def values(self):
        return sorted(self.refs.values())

    def items(self):
        for type, ref in self.refs.items():
            yield (type, ref)

    def __getitem__(self, key):
        return self.refs[key]

    def __delitem__(self, key):
        del self.refs[key]

    def __contains__(self, key):
        return key in self.refs

    def __len__(self):
        return len(self.refs)

    def to_opf1(self, parent=None):
        elem = element(parent, 'guide')
        for ref in self.refs.values():
            attrib = {'type': ref.type, 'href': urlunquote(ref.href)}
            if ref.title:
                attrib['title'] = ref.title
            element(elem, 'reference', attrib=attrib)
        return elem

    def to_opf2(self, parent=None):
        elem = element(parent, OPF('guide'))
        for ref in self.refs.values():
            attrib = {'type': ref.type, 'href': urlunquote(ref.href)}
            if ref.title:
                attrib['title'] = ref.title
            element(elem, OPF('reference'), attrib=attrib)
        return elem


class TOC(object):
    """A node in the table-of-contents tree; the root node has no title."""

    def __init__(self, title=None, href=None, klass=None, id=None,
                 play_order=None, author=None, description=None):
        self.title = title
        # NOTE(review): the decompiled original read
        # `self.href = None if href else href`, which always yields None
        # or falsy; normalizing the href is the evident intent (matching
        # Guide.Reference and Manifest.Item).
        self.href = urlnormalize(href) if href else href
        self.klass = klass
        self.id = id
        self.nodes = []
        self.play_order = 0
        if play_order is None:
            play_order = self.next_play_order()
        self.play_order = play_order
        self.author = author
        self.description = description

    def add(self, title, href, klass=None, id=None, play_order=0,
            author=None, description=None):
        node = TOC(title, href, klass, id, play_order, author,
                   description)
        self.nodes.append(node)
        return node

    def remove(self, node):
        """Remove *node* from anywhere in the subtree; True on success."""
        for child in self.nodes:
            if child is node:
                self.nodes.remove(child)
                return True
            if child.remove(node):
                return True
        return False

    def iter(self):
        """Pre-order traversal including this node itself."""
        yield self
        for child in self.nodes:
            for node in child.iter():
                yield node

    def count(self):
        return len(list(self.iter())) - 1

    def next_play_order(self):
        entries = [x.play_order for x in self.iter()]
        # NOTE(review): the decompiler emitted `[] if entries else 0`;
        # the maximum existing play order is the evident intent.
        base = max(entries) if entries else 0
        return base + 1

    def has_href(self, href):
        for x in self.iter():
            if x.href == href:
                return True
        return False

    def has_text(self, text):
        for x in self.iter():
            if x.title and x.title.lower() == text.lower():
                return True
        return False

    def iterdescendants(self):
        for child in self.nodes:
            for node in child.iter():
                yield node

    def __iter__(self):
        for node in self.nodes:
            yield node

    def __getitem__(self, index):
        return self.nodes[index]

    def autolayer(self):
        """Nest consecutive entries pointing into the same file under the
        first such entry."""
        prev = None
        for node in list(self.nodes):
            if prev and \
                    urldefrag(prev.href)[0] == urldefrag(node.href)[0]:
                self.nodes.remove(node)
                prev.nodes.append(node)
            else:
                prev = node

    def depth(self):
        try:
            return max(node.depth() for node in self.nodes) + 1
        except ValueError:
            # No children.
            return 1

    def __str__(self):
        return 'TOC: %s --> %s' % (self.title, self.href)

    def to_opf1(self, tour):
        for node in self.nodes:
            element(tour, 'site', attrib={
                'title': node.title, 'href': urlunquote(node.href)})
            node.to_opf1(tour)
        return tour

    def to_ncx(self, parent=None):
        if parent is None:
            parent = etree.Element(NCX('navMap'))
        for node in self.nodes:
            # Nodes without an id get a fresh UUID.
            id = node.id or unicode(uuid.uuid4())
            po = node.play_order
            if po == 0:
                po = 1
            attrib = {'id': id, 'playOrder': str(po)}
            if node.klass:
                attrib['class'] = node.klass
            point = element(parent, NCX('navPoint'), attrib=attrib)
            label = etree.SubElement(point, NCX('navLabel'))
            title = node.title
            if title:
                title = re.sub(r'\s+', ' ', title)
            element(label, NCX('text')).text = title
            element(point, NCX('content'), src=urlunquote(node.href))
            node.to_ncx(point)
        return parent

    def rationalize_play_orders(self):
        # NOTE(review): this definition continues beyond the visible chunk;
        # only the visible prefix is reconstructed here -- the remainder of
        # the original source follows.
        def po_node(n):
            for x in self.iter():
                if x is n:
                    return None
if x.play_order == n.play_order: return x def href_node(n): for x in self.iter(): if x is n: return None if x.href == n.href: return x for x in self.iter(): y = po_node(x) if y is not None: if x.href != y.href: x.play_order = getattr(href_node(x), 'play_order', self.next_play_order()) y = href_node(x) if y is not None: x.play_order = y.play_order continue class PageList(object): class Page(object): TYPES = set([ 'front', 'normal', 'special']) def __init__(self, name, href, type = 'normal', klass = None, id = None): self.name = unicode(name) self.href = urlnormalize(href) self.type = None if type in self.TYPES else 'normal' self.id = id self.klass = klass def __init__(self): self.pages = [] def add(self, name, href, type = 'normal', klass = None, id = None): page = self.Page(name, href, type, klass, id) self.pages.append(page) return page def __len__(self): return len(self.pages) def __iter__(self): for page in self.pages: yield page def __getitem__(self, index): return self.pages[index] def pop(self, index = -1): return self.pages.pop(index) def remove(self, page): return self.pages.remove(page) def to_ncx(self, parent = None): plist = element(parent, NCX('pageList'), id = str(uuid.uuid4())) values = dict((lambda .0: for t in .0: (t, count(1)))(('front', 'normal', 'special'))) for page in self.pages: if not page.id: pass id = unicode(uuid.uuid4()) type = page.type value = str(values[type].next()) attrib = { 'id': id, 'value': value, 'type': type, 'playOrder': '0' } if page.klass: attrib['class'] = page.klass ptarget = element(plist, NCX('pageTarget'), attrib = attrib) label = element(ptarget, NCX('navLabel')) element(label, NCX('text')).text = page.name element(ptarget, NCX('content'), src = page.href) return plist def to_page_map(self): pmap = etree.Element(OPF('page-map'), nsmap = { None: OPF2_NS }) for page in self.pages: element(pmap, OPF('page'), name = page.name, href = page.href) return pmap class OEBBook(object): COVER_SVG_XP = 
XPath('h:body//svg:svg[position() = 1]') COVER_OBJECT_XP = XPath('h:body//h:object[@data][position() = 1]') def __init__(self, logger, html_preprocessor, css_preprocessor = CSSPreProcessor(), encoding = 'utf-8', pretty_print = False, input_encoding = 'utf-8'): _css_log_handler.log = logger self.encoding = encoding self.input_encoding = input_encoding self.html_preprocessor = html_preprocessor self.css_preprocessor = css_preprocessor self.pretty_print = pretty_print self.logger = self.log = logger self.version = '2.0' self.container = NullContainer(self.log) self.metadata = Metadata(self) self.uid = None self.manifest = Manifest(self) self.spine = Spine(self) self.guide = Guide(self) self.toc = TOC() self.pages = PageList() self.auto_generated_toc = True def generate(cls, opts): encoding = opts.encoding pretty_print = opts.pretty_print return cls(encoding = encoding, pretty_print = pretty_print) generate = classmethod(generate) def translate(self, text): lang = str(self.metadata.language[0]) lang = lang.split('-', 1)[0].lower() return translate(lang, text) def decode(self, data): def fix_data(d): return d.replace('\r\n', '\n').replace('\r', '\n') if isinstance(data, unicode): return fix_data(data) bom_enc = None if data[:4] in ('\x00\x00\xfe\xff', '\xff\xfe\x00\x00'): bom_enc = { '\x00\x00\xfe\xff': 'utf-32-be', '\xff\xfe\x00\x00': 'utf-32-le' }[data[:4]] data = data[4:] elif data[:2] in ('\xff\xfe', '\xfe\xff'): bom_enc = { '\xff\xfe': 'utf-16-le', '\xfe\xff': 'utf-16-be' }[data[:2]] data = data[2:] elif data[:3] == '\xef\xbb\xbf': bom_enc = 'utf-8' data = data[3:] if bom_enc is not None: try: return fix_data(data.decode(bom_enc)) except UnicodeDecodeError: pass except: None<EXCEPTION MATCH>UnicodeDecodeError None<EXCEPTION MATCH>UnicodeDecodeError if self.input_encoding is not None: try: return fix_data(data.decode(self.input_encoding, 'replace')) except UnicodeDecodeError: pass except: None<EXCEPTION MATCH>UnicodeDecodeError None<EXCEPTION 
MATCH>UnicodeDecodeError try: return fix_data(data.decode('utf-8')) except UnicodeDecodeError: pass (data, _) = xml_to_unicode(data) return fix_data(data) def to_opf1(self): package = etree.Element('package', attrib = { 'unique-identifier': self.uid.id }) self.metadata.to_opf1(package) self.manifest.to_opf1(package) self.spine.to_opf1(package) tours = element(package, 'tours') tour = element(tours, 'tour', attrib = { 'id': 'chaptertour', 'title': 'Chapter Tour' }) self.toc.to_opf1(tour) self.guide.to_opf1(package) return { OPF_MIME: ('content.opf', package) } def _update_playorder(self, ncx): hrefs = set(map(urlnormalize, xpath(ncx, '//ncx:content/@src'))) playorder = { } next = 1 selector = XPath('h:body//*[@id or @name]') for item in self.spine: base = item.href if base in hrefs: playorder[base] = next next += 1 for elem in selector(item.data): added = False for attr in ('id', 'name'): id = elem.get(attr) if not id: continue href = '#'.join([ base, id]) if href in hrefs: playorder[href] = next added = True continue if added: next += 1 continue selector = XPath('ncx:content/@src') for i, elem in enumerate(xpath(ncx, '//*[@playOrder and ./ncx:content[@src]]')): href = urlnormalize(selector(elem)[0]) order = playorder.get(href, i) elem.attrib['playOrder'] = str(order) def _to_ncx(self): lang = unicode(self.metadata.language[0]) ncx = etree.Element(NCX('ncx'), attrib = { 'version': '2005-1', XML('lang'): lang }, nsmap = { None: NCX_NS }) head = etree.SubElement(ncx, NCX('head')) etree.SubElement(head, NCX('meta'), name = 'dtb:uid', content = unicode(self.uid)) etree.SubElement(head, NCX('meta'), name = 'dtb:depth', content = str(self.toc.depth())) generator = ''.join([ 'calibre (', calibre.__version__, ')']) etree.SubElement(head, NCX('meta'), name = 'dtb:generator', content = generator) etree.SubElement(head, NCX('meta'), name = 'dtb:totalPageCount', content = str(len(self.pages))) maxpnum = etree.SubElement(head, NCX('meta'), name = 'dtb:maxPageNumber', content = 
'0') title = etree.SubElement(ncx, NCX('docTitle')) text = etree.SubElement(title, NCX('text')) text.text = unicode(self.metadata.title[0]) navmap = etree.SubElement(ncx, NCX('navMap')) self.toc.to_ncx(navmap) if len(self.pages) > 0: plist = self.pages.to_ncx(ncx) value = max((lambda .0: for x in .0: int(x))(xpath(plist, '//@value'))) maxpnum.attrib['content'] = str(value) self._update_playorder(ncx) return ncx def to_opf2(self, page_map = False): results = { } package = etree.Element(OPF('package'), attrib = { 'version': '2.0', 'unique-identifier': self.uid.id }, nsmap = { None: OPF2_NS }) self.metadata.to_opf2(package) manifest = self.manifest.to_opf2(package) spine = self.spine.to_opf2(package) self.guide.to_opf2(package) results[OPF_MIME] = ('content.opf', package) (id, href) = self.manifest.generate('ncx', 'toc.ncx') etree.SubElement(manifest, OPF('item'), id = id, href = href, attrib = { 'media-type': NCX_MIME }) spine.attrib['toc'] = id results[NCX_MIME] = (href, self._to_ncx()) if page_map and len(self.pages) > 0: (id, href) = self.manifest.generate('page-map', 'page-map.xml') etree.SubElement(manifest, OPF('item'), id = id, href = href, attrib = { 'media-type': PAGE_MAP_MIME }) spine.attrib['page-map'] = id results[PAGE_MAP_MIME] = (href, self.pages.to_page_map()) return results