home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.6)
-
- from util import autoassign, strip_html_and_tags, replace_newlines, Storage
- from util.lrucache import lru_cache
- from util.auxencodings import fuzzydecode
- from email.utils import parseaddr, parsedate
- from email.header import decode_header
- from datetime import datetime
- import email.message as email
- from common import pref
- import sys
- import traceback
- import logging
- log = logging.getLogger('emailobj')
- replace_newlines = lru_cache(100)(replace_newlines)
- UnicodeErrors = (UnicodeEncodeError, UnicodeDecodeError)
-
- def unicode_hdr(hdr, fallback = None):
- more_encs = None if fallback else []
-
- try:
- return (u''.join,)((lambda .0: for hdr, encoding in .0:
- None if encoding else hdr)(decode_header(hdr)))
- except UnicodeErrors:
- return fuzzydecode(hdr, more_encs + [
- 'utf-8'])
- except Exception:
- log.warning('decoding an email header failed: %r', hdr)
- return fuzzydecode(hdr, more_encs + [
- 'utf-8'])
- else:
- return None
-
-
-
- def find_part(email, types):
- if not email.is_multipart():
- return email
- results = (dict,)((lambda .0: for part in .0:
- if part.get_content_type() in types:
- (part.get_content_type(), part)continue)(email))
- print results
- for ty in types:
- if ty in results:
- return results[ty]
-
-
-
- def find_attachments(email):
- attachments = { }
- for part in email:
- if 'Content-Disposition' in part.keys() and 'attachment' in part['Content-Disposition']:
- attachments[part.get_filename()] = Storage(data = part.get_payload(decode = True), content_type = part.get_content_type())
- continue
-
- return attachments
-
-
- def parse_content(part):
- charset = part.get_content_charset()
- content_type = part.get_content_type()
- payload = part.get_payload(decode = True)
- html = content_type == 'text/html'
- if payload is None:
- payload = ''
-
-
- try:
- if not charset:
- pass
- content = payload.decode('ascii')
- except (UnicodeDecodeError, LookupError):
- content = payload.decode('utf-8', 'replace')
-
- if html:
- content = strip_html_and_tags(content, [
- 'style'])
- else:
- content = content
- return content
-
-
- class Email(object):
-
- def __init__(self, id = None, fromname = None, fromemail = None, sendtime = None, subject = None, content = None, attachments = None, labels = None):
- autoassign(self, locals())
-
-
- def update(self, email):
- if isinstance(email, dict):
- attrs = email
- else:
- attrs = vars(email)
- autoassign(self, dict((lambda .0: for k, v in .0:
- if v is not None:
- (k, v)continue)(attrs.iteritems())))
-
-
- def fromEmailMessage(cls, id, email, sendtime_if_error = None):
- encoding = email.get_content_charset()
- (realname, email_address) = parseaddr(email['From'])
- realname = unicode_hdr(realname, encoding)
- _email = email
-
- try:
- datetuple = parsedate(email['Date'])
- sendtime = datetime(*datetuple[:7])
- except Exception:
- traceback.print_exc()
- print >>sys.stderr, 'using %s for "sendtime" instead' % sendtime_if_error
- sendtime = sendtime_if_error
-
-
- try:
- attachments = find_attachments(email)
- except:
- attachments = { }
-
- part = find_part(email, ('text/plain', 'text/html'))
- if part is None:
- content = u''
- else:
- content = parse_content(part)
- content = replace_newlines(content)
- prev_length = pref('email.preview_length', 200)
- if len(content) > prev_length:
- content = content[:prev_length] + '...'
- else:
- content
- email = cls(id, realname, email_address, sendtime, email['Subject'], content = content, attachments = attachments)
- return email
-
- fromEmailMessage = classmethod(fromEmailMessage)
-
- def __eq__(self, other):
- if not isinstance(other, Email):
- return False
- return self.id == other.id
-
-
- def __cmp__(self, other):
-
- try:
- return -cmp(self.sendtime, other.sendtime)
- except TypeError:
- return -1
-
-
-
- def domain(self):
- f = self.fromemail
- if f is not None:
- return f[f.find('@') + 1:]
-
- domain = property(domain)
-
- def __unicode__(self):
- if not self.subject:
- pass
- lines = [
- unicode('Subject: %s' % '<none>').encode('utf-8')]
- if self.fromname:
- _fromstr = self.fromname
- if self.fromemail:
- _fromstr += ' <%s>' % self.fromemail
-
- elif self.fromemail:
- _fromstr = self.fromemail
- else:
- _fromstr = '<unknown>'
- lines.append('From: %s' % _fromstr)
- if self.sendtime:
- lines.append('Sent at %s' % self.sendtime)
-
- if self.content:
- lines.append('')
- lines.append(unicode(self.content))
-
- self.lines = lines
- return u''.join(lines)
-
-
- __str__ = lambda self: self.__unicode__().decode('ascii', 'ignore')
-
- def __repr__(self):
-
- try:
- if not self.fromemail:
- pass
- return u'<Email (Subject: %r, From: %r)>' % (self.subject[:30], self.fromname)
- except Exception:
- return u'<Email from %r>' % self.fromemail
-
-
-
-
- class DecodedEmail(email.message.Message):
-
- def __init__(self, myemail):
- self.email = myemail
-
-
- def __getattr__(self, attr, val = sentinel):
- result = getattr(self.email, attr, val)
- if result is sentinel:
- raise AttributeError
- result is sentinel
- return result
-
-
- def __getitem__(self, header):
- s = email.message.Message.__getitem__(self, header)
- if header == 'Content':
- return s
- return unicode_hdr(s, self.get_content_charset())
-
- __iter__ = email.message.Message.walk
-
-