# (Web-archive navigation residue, not part of the module: "home | CD-ROM | disk | FTP | other | search", "Wrap".)
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
#
# NOTE(review): this text was recovered from decompiler output whose line
# structure had been collapsed and in which several statements were lost
# (missing ``except`` clauses, truncated loop bodies, artefacts such as
# ``None<EXCEPTION MATCH>IOError``).  Sections marked "restored" below were
# rebuilt from the surrounding code and from the upstream CPython 2.6
# ``zipfile`` module; confirm them against the original calibre source.
"""Read and write ZIP archives.

This appears to be a variant of the Python 2.6 standard library ``zipfile``
module with additions for deleting/replacing members, raw (still compressed)
member access and tolerant file name decoding.
"""
from __future__ import with_statement

import struct
import os
import time
import sys
import shutil
import binascii
import cStringIO
from contextlib import closing
from tempfile import SpooledTemporaryFile

from calibre import sanitize_file_name
from calibre.constants import filesystem_encoding
from calibre.ebooks.chardet import detect

try:
    import zlib
    crc32 = zlib.crc32
except ImportError:
    # Without zlib only ZIP_STORED members can be read or written.
    zlib = None
    crc32 = binascii.crc32

__all__ = [
    'BadZipfile', 'error', 'ZIP_STORED', 'ZIP_DEFLATED', 'is_zipfile',
    'ZipInfo', 'ZipFile', 'PyZipFile', 'LargeZipFile']


class BadZipfile(Exception):
    """Raised when a file is not a readable ZIP archive."""
    pass


class LargeZipFile(Exception):
    """Raised when ZIP64 extensions would be needed but are not enabled."""
    pass


error = BadZipfile  # older name for the same exception

ZIP64_LIMIT = (1 << 31) - 1
ZIP_FILECOUNT_LIMIT = 1 << 16
ZIP_MAX_COMMENT = (1 << 16) - 1

# Compression methods understood by this module.
ZIP_STORED = 0
ZIP_DEFLATED = 8

# "End of central directory" record and the indexes into its unpacked form.
structEndArchive = '<4s4H2LH'
stringEndArchive = 'PK\x05\x06'
sizeEndCentDir = struct.calcsize(structEndArchive)

_ECD_SIGNATURE = 0
_ECD_DISK_NUMBER = 1
_ECD_DISK_START = 2
_ECD_ENTRIES_THIS_DISK = 3
_ECD_ENTRIES_TOTAL = 4
_ECD_SIZE = 5
_ECD_OFFSET = 6
_ECD_COMMENT_SIZE = 7
# The next two are not part of the on-disk record; _EndRecData appends them.
_ECD_COMMENT = 8
_ECD_LOCATION = 9

# Central directory file header and the indexes into its unpacked form.
structCentralDir = '<4s4B4HL2L5H2L'
stringCentralDir = 'PK\x01\x02'
sizeCentralDir = struct.calcsize(structCentralDir)

_CD_SIGNATURE = 0
_CD_CREATE_VERSION = 1
_CD_CREATE_SYSTEM = 2
_CD_EXTRACT_VERSION = 3
_CD_EXTRACT_SYSTEM = 4
_CD_FLAG_BITS = 5
_CD_COMPRESS_TYPE = 6
_CD_TIME = 7
_CD_DATE = 8
_CD_CRC = 9
_CD_COMPRESSED_SIZE = 10
_CD_UNCOMPRESSED_SIZE = 11
_CD_FILENAME_LENGTH = 12
_CD_EXTRA_FIELD_LENGTH = 13
_CD_COMMENT_LENGTH = 14
_CD_DISK_NUMBER_START = 15
_CD_INTERNAL_FILE_ATTRIBUTES = 16
_CD_EXTERNAL_FILE_ATTRIBUTES = 17
_CD_LOCAL_HEADER_OFFSET = 18

# Local file header and the indexes into its unpacked form.
structFileHeader = '<4s2B4HL2L2H'
stringFileHeader = 'PK\x03\x04'
sizeFileHeader = struct.calcsize(structFileHeader)

_FH_SIGNATURE = 0
_FH_EXTRACT_VERSION = 1
_FH_EXTRACT_SYSTEM = 2
_FH_GENERAL_PURPOSE_FLAG_BITS = 3
_FH_COMPRESSION_METHOD = 4
_FH_LAST_MOD_TIME = 5
_FH_LAST_MOD_DATE = 6
_FH_CRC = 7
_FH_COMPRESSED_SIZE = 8
_FH_UNCOMPRESSED_SIZE = 9
_FH_FILENAME_LENGTH = 10
_FH_EXTRA_FIELD_LENGTH = 11

# ZIP64 "end of central directory" locator and record.
structEndArchive64Locator = '<4sLQL'
stringEndArchive64Locator = 'PK\x06\x07'
sizeEndCentDir64Locator = struct.calcsize(structEndArchive64Locator)

structEndArchive64 = '<4sQ2H2L4Q'
stringEndArchive64 = 'PK\x06\x06'
sizeEndCentDir64 = struct.calcsize(structEndArchive64)

_CD64_SIGNATURE = 0
_CD64_DIRECTORY_RECSIZE = 1
_CD64_CREATE_VERSION = 2
_CD64_EXTRACT_VERSION = 3
_CD64_DISK_NUMBER = 4
_CD64_DISK_NUMBER_START = 5
_CD64_NUMBER_ENTRIES_THIS_DISK = 6
_CD64_NUMBER_ENTRIES_TOTAL = 7
_CD64_DIRECTORY_SIZE = 8
_CD64_OFFSET_START_CENTDIR = 9


def decode_arcname(name):
    """Return the archive member name *name* as ``unicode``.

    Byte strings are decoded as UTF-8 first, then with the encoding guessed by
    ``detect``, and finally as UTF-8 with replacement characters.  Values that
    are already ``unicode`` are returned unchanged.
    """
    if isinstance(name, unicode):
        return name
    try:
        return name.decode('utf-8')
    except UnicodeDecodeError:
        # Restored: the decompiler dropped both ``except`` clauses.
        encoding = detect(name)['encoding']
        try:
            return name.decode(encoding)
        except (UnicodeDecodeError, LookupError, TypeError):
            # Wrong guess, unknown codec, or no guess at all (None).
            return name.decode('utf-8', 'replace')


def is_zipfile(filename):
    """Return True if the file at path *filename* looks like a ZIP archive."""
    try:
        fpin = open(filename, 'rb')
        try:
            endrec = _EndRecData(fpin)
        finally:
            # Close the handle even when reading the end record fails.
            fpin.close()
        if endrec:
            return True
    except IOError:
        pass
    return False


def _EndRecData64(fpin, offset, endrec):
    """Update *endrec* from the ZIP64 end-of-central-directory record.

    *offset* is the position of the classic end record relative to the end of
    the file.  *endrec* is returned unchanged when no ZIP64 locator or record
    signature is found.  Raises BadZipfile for multi-disk archives.
    """
    fpin.seek(offset - sizeEndCentDir64Locator, 2)
    data = fpin.read(sizeEndCentDir64Locator)
    sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
    if sig != stringEndArchive64Locator:
        return endrec

    if diskno != 0 or disks != 1:
        raise BadZipfile('zipfiles that span multiple disks are not supported')

    # The ZIP64 end record sits immediately before the locator.
    fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2)
    data = fpin.read(sizeEndCentDir64)
    (sig, sz, create_version, read_version, disk_num, disk_dir,
     dircount, dircount2, dirsize, diroffset) = struct.unpack(
        structEndArchive64, data)
    if sig != stringEndArchive64:
        return endrec

    endrec[_ECD_DISK_NUMBER] = disk_num
    endrec[_ECD_DISK_START] = disk_dir
    endrec[_ECD_ENTRIES_THIS_DISK] = dircount
    endrec[_ECD_ENTRIES_TOTAL] = dircount2
    endrec[_ECD_SIZE] = dirsize
    endrec[_ECD_OFFSET] = diroffset
    return endrec


def _EndRecData(fpin):
    """Return the end-of-central-directory record of *fpin* as a list.

    The list holds the unpacked record followed by the archive comment and
    the file offset of the record.  Returns None when no valid record is
    found.
    """
    fpin.seek(0, 2)
    filesize = fpin.tell()

    # Fast path: no archive comment, so the record is the last thing in the
    # file.
    try:
        fpin.seek(-sizeEndCentDir, 2)
    except IOError:
        # File is shorter than an end record: cannot be a ZIP archive.
        return None
    data = fpin.read()
    if data[0:4] == stringEndArchive and data[-2:] == '\x00\x00':
        endrec = list(struct.unpack(structEndArchive, data))
        endrec.append('')
        endrec.append(filesize - sizeEndCentDir)
        if endrec[_ECD_OFFSET] == 0xFFFFFFFF:
            # Offset is saturated: the real values live in the ZIP64 record.
            return _EndRecData64(fpin, -sizeEndCentDir, endrec)
        return endrec

    # Slow path: an archive comment (up to 64 KiB) follows the record, so
    # search the tail of the file for the signature.
    maxCommentStart = max(filesize - 65536 - sizeEndCentDir, 0)
    fpin.seek(maxCommentStart, 0)
    data = fpin.read()
    start = data.rfind(stringEndArchive)

    # NOTE(review): everything below was lost by the decompiler (the text
    # stopped after ``rfind``) and is restored from upstream CPython.
    if start >= 0:
        recData = data[start:start + sizeEndCentDir]
        if len(recData) == sizeEndCentDir:
            endrec = list(struct.unpack(structEndArchive, recData))
            comment = data[start + sizeEndCentDir:]
            # Only accept the hit when the stored comment length agrees.
            if endrec[_ECD_COMMENT_SIZE] == len(comment):
                endrec.append(comment)
                endrec.append(maxCommentStart + start)
                if endrec[_ECD_OFFSET] == 0xFFFFFFFF:
                    return _EndRecData64(
                        fpin, maxCommentStart + start - filesize, endrec)
                return endrec
    return None


class ZipInfo(object):
    """Metadata describing one member of a ZIP archive."""

    __slots__ = (
        'orig_filename', 'filename', 'date_time', 'compress_type', 'comment',
        'extra', 'create_system', 'create_version', 'extract_version',
        'reserved', 'flag_bits', 'volume', 'internal_attr', 'external_attr',
        'header_offset', 'CRC', 'compress_size', 'file_size', '_raw_time',
        'file_offset')

    def __init__(self, filename='NoName', date_time=(1980, 1, 1, 0, 0, 0)):
        """Create the info for *filename*, last modified at *date_time*.

        *date_time* is a (year, month, day, hour, minute, second) tuple.
        """
        self.orig_filename = filename  # exactly as stored in the archive

        # Cut the name at the first NUL byte.
        null_byte = filename.find(chr(0))
        if null_byte >= 0:
            filename = filename[0:null_byte]
        # Archive names always use forward slashes.
        if os.sep != '/' and os.sep in filename:
            filename = filename.replace(os.sep, '/')

        self.filename = filename
        self.date_time = date_time
        self.compress_type = ZIP_STORED
        self.comment = ''
        self.extra = ''
        if sys.platform == 'win32':
            self.create_system = 0
        else:
            self.create_system = 3
        self.create_version = 20
        self.extract_version = 20
        self.reserved = 0
        self.flag_bits = 0
        self.volume = 0
        self.internal_attr = 0
        self.external_attr = 0
        self.file_offset = 0
        # header_offset, CRC, compress_size, file_size and _raw_time are
        # assigned later by the code that reads or writes the member.

    def FileHeader(self):
        """Return the packed local file header, file name and extra field."""
        dt = self.date_time
        dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
        dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
        if self.flag_bits & 8:
            # Sizes and CRC follow the data in a data descriptor.
            CRC = compress_size = file_size = 0
        else:
            CRC = self.CRC
            compress_size = self.compress_size
            file_size = self.file_size

        extra = self.extra
        if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
            # The real sizes go into a ZIP64 extra field (header id 1); the
            # 32-bit header fields are saturated.
            fmt = '<HHQQ'
            extra = extra + struct.pack(
                fmt, 1, struct.calcsize(fmt) - 4, file_size, compress_size)
            file_size = 0xFFFFFFFF
            compress_size = 0xFFFFFFFF
            self.extract_version = max(45, self.extract_version)
            self.create_version = max(45, self.extract_version)

        filename, flag_bits = self._encodeFilenameFlags()
        header = struct.pack(
            structFileHeader, stringFileHeader, self.extract_version,
            self.reserved, flag_bits, self.compress_type, dostime, dosdate,
            CRC, compress_size, file_size, len(filename), len(extra))
        return header + filename + extra

    def _encodeFilenameFlags(self):
        """Return (encoded file name, flag bits) for writing headers.

        ``unicode`` names are stored as UTF-8 with flag bit 0x800 set.
        """
        if isinstance(self.filename, unicode):
            return (self.filename.encode('utf-8'), self.flag_bits | 2048)
        return (self.filename, self.flag_bits)

    def _decodeFilename(self):
        """Return the stored file name decoded to ``unicode``."""
        if self.flag_bits & 2048:
            return self.filename.decode('utf-8')
        return decode_arcname(self.filename)

    def _decodeExtra(self):
        """Apply ZIP64 values found in the extra field to this info.

        Raises RuntimeError when the ZIP64 field has an unexpected length.
        """
        extra = self.extra
        unpack = struct.unpack
        while extra:
            tp, ln = unpack('<HH', extra[:4])
            if tp == 1:
                if ln >= 24:
                    counts = unpack('<QQQ', extra[4:28])
                elif ln == 16:
                    counts = unpack('<QQ', extra[4:20])
                elif ln == 8:
                    counts = unpack('<Q', extra[4:12])
                elif ln == 0:
                    counts = ()
                else:
                    raise RuntimeError('Corrupt extra field %s' % (ln,))

                # The field only holds the values whose 32-bit counterpart is
                # saturated, in this fixed order.  The decompiled text read
                # ``idx = ln >= 24``, which skips the first value; the index
                # must start at zero.
                idx = 0
                if self.file_size in (0xFFFFFFFFFFFFFFFF, 0xFFFFFFFF):
                    self.file_size = counts[idx]
                    idx += 1
                if self.compress_size == 0xFFFFFFFF:
                    self.compress_size = counts[idx]
                    idx += 1
                if self.header_offset == 0xFFFFFFFF:
                    self.header_offset = counts[idx]
                    idx += 1

            extra = extra[ln + 4:]


class _ZipDecrypter:
    """Callable that decrypts one byte at a time with the ZIP stream cipher.

    Usage: ``zd = _ZipDecrypter(password); plain = ''.join(map(zd, data))``.
    """

    def _GenerateCRCTable():
        """Build the 256-entry CRC-32 lookup table (runs in the class body)."""
        poly = 0xEDB88320
        table = [0] * 256
        for i in range(256):
            crc = i
            for j in range(8):
                if crc & 1:
                    crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
                else:
                    crc = (crc >> 1) & 0x7FFFFFFF
            table[i] = crc
        return table
    crctable = _GenerateCRCTable()

    def _crc32(self, ch, crc):
        """Return *crc* updated with the single character *ch*."""
        return ((crc >> 8) & 0xFFFFFF) ^ self.crctable[(crc ^ ord(ch)) & 0xFF]

    def __init__(self, pwd):
        """Initialise the three cipher keys from the password *pwd*."""
        self.key0 = 305419896
        self.key1 = 591751049
        self.key2 = 878082192
        for p in pwd:
            self._UpdateKeys(p)

    def _UpdateKeys(self, c):
        """Advance the key state with the plaintext character *c*."""
        self.key0 = self._crc32(c, self.key0)
        self.key1 = (self.key1 + (self.key0 & 255)) & 0xFFFFFFFF
        self.key1 = (self.key1 * 134775813 + 1) & 0xFFFFFFFF
        self.key2 = self._crc32(chr((self.key1 >> 24) & 255), self.key2)

    def __call__(self, c):
        """Decrypt and return the single character *c*."""
        c = ord(c)
        k = self.key2 | 2
        c = c ^ (((k * (k ^ 1)) >> 8) & 255)
        c = chr(c)
        self._UpdateKeys(c)
        return c


class ZipExtFile:
    """Read-only file-like object over one archive member."""

    def __init__(self, fileobj, zipinfo, decrypt=None):
        """Wrap *fileobj*, positioned at the start of the member's data.

        *zipinfo* supplies compression type, compressed size and name;
        *decrypt* is an optional per-character decrypter callable.
        """
        self.fileobj = fileobj
        self.orig_pos = fileobj.tell()  # lets read_raw() find the data again
        self.decrypter = decrypt
        self.bytes_read = 0
        self.rawbuffer = ''    # bytes read from the file, not yet decoded
        self.readbuffer = ''   # decoded bytes not yet handed to the caller
        self.linebuffer = ''   # bytes pulled in by readline()
        self.eof = False
        self.univ_newlines = False
        self.nlSeps = ('\n',)
        self.lastdiscard = ''

        self.compress_type = zipinfo.compress_type
        self.compress_size = zipinfo.compress_size

        self.closed = False
        self.mode = 'r'
        self.name = zipinfo.filename

        # Upper bound on compressed bytes buffered per read.
        self.compreadsize = 65536
        if self.compress_type == ZIP_DEFLATED:
            self.dc = zlib.decompressobj(-15)

    def set_univ_newlines(self, univ_newlines):
        """Enable or disable universal newline handling for readline()."""
        self.univ_newlines = univ_newlines
        self.nlSeps = ('\n',)
        if self.univ_newlines:
            self.nlSeps = ('\r\n', '\r', '\n')

    def __iter__(self):
        return self

    def next(self):
        """Return the next line; raise StopIteration at end of data."""
        nextline = self.readline()
        if not nextline:
            raise StopIteration()
        return nextline

    def close(self):
        """Mark the object closed (the underlying file is left open)."""
        self.closed = True

    def _checkfornewline(self):
        """Return (position, length) of the first separator in linebuffer.

        Both values are -1 when the buffer holds no separator.
        """
        nl, nllen = -1, -1
        if self.linebuffer:
            # Half of a '\r\n' pair may have been consumed by the previous
            # call; drop the orphaned '\n'.
            if (self.lastdiscard, self.linebuffer[0]) == ('\r', '\n'):
                self.linebuffer = self.linebuffer[1:]

            for sep in self.nlSeps:
                nl = self.linebuffer.find(sep)
                if nl >= 0:
                    nllen = len(sep)
                    return (nl, nllen)
        return (nl, nllen)

    def readline(self, size=-1):
        """Read one line of at most about *size* bytes (negative: no limit).

        Complete lines are returned with a trailing '\\n' whatever the
        separator was; a final unterminated line is returned as-is.
        """
        if size < 0:
            size = sys.maxint
        elif size == 0:
            return ''

        nl, nllen = self._checkfornewline()

        # NOTE(review): the branch below was lost by the decompiler and is
        # restored from upstream CPython 2.6.
        if nl >= 0:
            # size is only a hint, but do not exceed it
            nl = min(nl, size)
        else:
            # No separator buffered yet: keep reading small chunks.
            size -= len(self.linebuffer)
            while nl < 0 and size > 0:
                buf = self.read(min(size, 100))
                if not buf:
                    break
                self.linebuffer += buf
                size -= len(buf)
                nl, nllen = self._checkfornewline()

            if nl < 0:
                # Ran out of data or hit the size limit: return what we have.
                s = self.linebuffer
                self.linebuffer = ''
                return s

        buf = self.linebuffer[:nl]
        self.lastdiscard = self.linebuffer[nl:nl + nllen]
        self.linebuffer = self.linebuffer[nl + nllen:]
        return buf + '\n'

    def readlines(self, sizehint=-1):
        """Return all remaining lines as a list (*sizehint* is ignored)."""
        result = []
        while True:
            line = self.readline()
            if not line:
                break
            result.append(line)
        return result

    def read_raw(self):
        """Return the member's stored bytes without decompressing them.

        The file position of the underlying file object is restored
        afterwards.  For encrypted members the 12-byte encryption header is
        excluded from the byte count.
        """
        pos = self.fileobj.tell()
        self.fileobj.seek(self.orig_pos)
        bytes_to_read = self.compress_size
        if self.decrypter is not None:
            bytes_to_read -= 12
        raw = ''
        if bytes_to_read > 0:
            raw = self.fileobj.read(bytes_to_read)
        self.fileobj.seek(pos)
        return raw

    def read(self, size=None):
        """Return up to *size* decoded bytes (all remaining when None)."""
        if size == 0:
            return ''

        bytesToRead = self.compress_size - self.bytes_read

        # The first 12 bytes of an encrypted member are the encryption header.
        if self.decrypter is not None:
            bytesToRead -= 12

        if size is not None and size >= 0:
            if self.compress_type == ZIP_STORED:
                lr = len(self.readbuffer)
                bytesToRead = min(bytesToRead, size - lr)
            elif self.compress_type == ZIP_DEFLATED:
                if len(self.readbuffer) > size:
                    # Enough decoded data is buffered already.
                    bytesToRead = 0
                else:
                    lr = len(self.rawbuffer)
                    bytesToRead = min(bytesToRead, self.compreadsize - lr)

        # Never read past the end of the member's data.
        if bytesToRead + self.bytes_read > self.compress_size:
            bytesToRead = self.compress_size - self.bytes_read

        # NOTE(review): the block that actually reads, decrypts and
        # decompresses was lost by the decompiler and is restored from
        # upstream CPython 2.6.
        if bytesToRead > 0:
            bytes = self.fileobj.read(bytesToRead)
            self.bytes_read += len(bytes)
            self.rawbuffer += bytes

            if self.rawbuffer:
                newdata = self.rawbuffer
                self.rawbuffer = ''

                if newdata and self.decrypter is not None:
                    newdata = ''.join(map(self.decrypter, newdata))

                if newdata and self.compress_type == ZIP_DEFLATED:
                    newdata = self.dc.decompress(newdata)
                    self.rawbuffer = self.dc.unconsumed_tail
                    if self.eof and len(self.rawbuffer) == 0:
                        newdata += self.dc.flush()
                        self.dc = None

                self.readbuffer += newdata

        if size is None or len(self.readbuffer) <= size:
            bytes = self.readbuffer
            self.readbuffer = ''
        else:
            bytes = self.readbuffer[:size]
            self.readbuffer = self.readbuffer[size:]
        return bytes


class ZipFile:
    """Read, write, append to and modify a ZIP archive.

    ``ZipFile(file, mode='r', compression=ZIP_DEFLATED, allowZip64=False)``
    where *file* is a path or a file-like object and *mode* is 'r', 'w' or
    'a'.
    """

    fp = None  # lets close()/__del__ run even when __init__ failed early

    def __init__(self, file, mode='r', compression=ZIP_DEFLATED,
                 allowZip64=False):
        """Open the archive *file*; see the class docstring for arguments.

        Raises RuntimeError for an invalid mode or unsupported compression
        and BadZipfile when reading a file that is not a ZIP archive.
        """
        if mode not in ('r', 'w', 'a'):
            raise RuntimeError(
                'ZipFile() requires mode "r", "w", or "a" not %s' % mode)

        if compression == ZIP_STORED:
            pass
        elif compression == ZIP_DEFLATED:
            if not zlib:
                raise RuntimeError(
                    'Compression requires the (missing) zlib module')
        else:
            raise RuntimeError('That compression method is not supported')

        # The decompiled text assigned ``compression == ZIP_STORED`` here,
        # which ignores the caller's allowZip64 argument entirely.
        self._allowZip64 = allowZip64
        self._didModify = False
        self.debug = 0
        self.NameToInfo = {}
        self.filelist = []
        self.extract_mapping = {}
        self.compression = compression
        self.mode = key = mode.replace('b', '')[0]
        self.pwd = None
        self.comment = ''

        if isinstance(file, basestring):
            self._filePassed = 0
            self.filename = file
            modeDict = {'r': 'rb', 'w': 'wb', 'a': 'r+b'}
            try:
                self.fp = open(file, modeDict[mode])
            except IOError:
                if mode == 'a':
                    # Appending to a file that does not exist: create it.
                    mode = key = 'w'
                    self.fp = open(file, modeDict[mode])
                else:
                    raise
        else:
            self._filePassed = 1
            self.fp = file
            self.filename = getattr(file, 'name', None)

        if key == 'r':
            self._GetContents()
        elif key == 'w':
            pass
        elif key == 'a':
            try:
                self._RealGetContents()
                self._calculate_file_offsets()
                # New members overwrite the old central directory.
                self.fp.seek(self.start_dir, 0)
            except BadZipfile:
                # Not a ZIP archive: append after the existing content.
                self.fp.seek(0, 2)
        else:
            if not self._filePassed:
                self.fp.close()
                self.fp = None
            raise RuntimeError('Mode must be "r", "w" or "a"')

    def _GetContents(self):
        """Read the central directory, closing our own file on failure."""
        try:
            self._RealGetContents()
        except BadZipfile:
            if not self._filePassed:
                self.fp.close()
                self.fp = None
            raise

    def _RealGetContents(self):
        """Read the central directory into ``filelist`` and ``NameToInfo``.

        Raises BadZipfile when the end record or a directory entry is
        invalid.
        """
        fp = self.fp
        endrec = _EndRecData(fp)
        if not endrec:
            raise BadZipfile('File is not a zip file')
        if self.debug > 1:
            print(endrec)
        size_cd = endrec[_ECD_SIZE]
        offset_cd = endrec[_ECD_OFFSET]
        self.comment = endrec[_ECD_COMMENT]

        # "concat" is the number of bytes that precede the archive proper; it
        # is non-zero when the archive was appended to another file.
        concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
        if endrec[_ECD_LOCATION] > ZIP64_LIMIT:
            # Account for the ZIP64 end record and locator.
            concat -= sizeEndCentDir64 + sizeEndCentDir64Locator

        if self.debug > 2:
            inferred = concat + offset_cd
            print('given, inferred, offset %s %s %s'
                  % (offset_cd, inferred, concat))

        self.start_dir = offset_cd + concat
        fp.seek(self.start_dir, 0)
        data = fp.read(size_cd)
        fp = cStringIO.StringIO(data)  # parse the directory from memory
        total = 0
        while total < size_cd:
            centdir = fp.read(sizeCentralDir)
            if centdir[0:4] != stringCentralDir:
                raise BadZipfile('Bad magic number for central directory')
            centdir = struct.unpack(structCentralDir, centdir)
            if self.debug > 2:
                print(centdir)
            filename = fp.read(centdir[_CD_FILENAME_LENGTH])

            x = ZipInfo(filename)
            x.extra = fp.read(centdir[_CD_EXTRA_FIELD_LENGTH])
            x.comment = fp.read(centdir[_CD_COMMENT_LENGTH])
            x.header_offset = centdir[_CD_LOCAL_HEADER_OFFSET]
            (x.create_version, x.create_system, x.extract_version, x.reserved,
             x.flag_bits, x.compress_type, t, d,
             x.CRC, x.compress_size, x.file_size) = centdir[1:12]
            x.volume, x.internal_attr, x.external_attr = centdir[15:18]
            x._raw_time = t  # kept for the encrypted-member password check
            x.date_time = ((d >> 9) + 1980, (d >> 5) & 15, d & 31,
                           t >> 11, (t >> 5) & 63, (t & 31) * 2)

            x._decodeExtra()
            x.header_offset = x.header_offset + concat
            x.filename = x._decodeFilename()
            self.filelist.append(x)
            self.NameToInfo[x.filename] = x

            total = (total + sizeCentralDir + centdir[_CD_FILENAME_LENGTH]
                     + centdir[_CD_EXTRA_FIELD_LENGTH]
                     + centdir[_CD_COMMENT_LENGTH])
            if self.debug > 2:
                print('total %s' % (total,))

    def _calculate_file_offsets(self):
        """Set ``file_offset`` (start of data) on every known member.

        Raises BadZipfile for a bad local header signature and RuntimeError
        when the local header name differs from the directory name.
        """
        for zip_info in self.filelist:
            self.fp.seek(zip_info.header_offset, 0)
            fheader = self.fp.read(sizeFileHeader)
            if fheader[0:4] != stringFileHeader:
                raise BadZipfile('Bad magic number for file header')
            fheader = struct.unpack(structFileHeader, fheader)
            file_offset = (zip_info.header_offset + sizeFileHeader
                           + fheader[_FH_FILENAME_LENGTH]
                           + fheader[_FH_EXTRA_FIELD_LENGTH])
            fname = self.fp.read(fheader[_FH_FILENAME_LENGTH])
            if fname != zip_info.orig_filename:
                raise RuntimeError(
                    'File name in directory "%s" and header "%s" differ.'
                    % (zip_info.orig_filename, fname))
            zip_info.file_offset = file_offset

    def replace(self, filename, arcname=None, compress_type=None):
        """Delete the member *arcname* (default *filename*) and re-add it.

        The new content comes from the file at path *filename*.
        """
        deleteName = arcname
        if deleteName is None:
            deleteName = filename
        self.delete(deleteName)
        self.write(filename, arcname, compress_type)

    def replacestr(self, zinfo, bytes):
        """Delete the member named ``zinfo.filename`` and write *bytes*."""
        self.delete(zinfo.filename)
        self.writestr(zinfo, bytes)

    def delete(self, name):
        """Remove the first member called *name* from the archive file.

        The member's header and data are cut out of the file in place and
        the offsets of the remaining members are adjusted.  Nothing happens
        when no member has that name.
        """
        for i in range(0, len(self.filelist)):
            info = self.filelist[i]
            if info.filename != name:
                continue
            if self.debug:
                print('Removing %s' % (name,))
            deleted_offset = info.header_offset
            deleted_size = ((info.file_offset - info.header_offset)
                            + info.compress_size)
            zinfo_size = (struct.calcsize(structCentralDir)
                          + len(info.filename) + len(info.extra))

            # Shift everything after the member down over it.
            current_offset = self.fp.tell()
            self.fp.seek(0, 2)
            archive_size = self.fp.tell()
            self.fp.seek(deleted_offset + deleted_size)
            buf = self.fp.read()
            self.fp.seek(deleted_offset)
            self.fp.write(buf)
            self.fp.truncate(archive_size - deleted_size - zinfo_size)

            # Keep the caller's file position pointing at the same content.
            if current_offset > deleted_offset + deleted_size:
                current_offset -= deleted_size
            elif current_offset > deleted_offset:
                current_offset = deleted_offset
            self.fp.seek(current_offset, 0)

            del self.filelist[i]
            for j in range(i, len(self.filelist)):
                if self.filelist[j].header_offset > deleted_offset:
                    self.filelist[j].header_offset -= deleted_size
                if self.filelist[j].file_offset > deleted_offset:
                    self.filelist[j].file_offset -= deleted_size
            self._didModify = True
            return

    def namelist(self):
        """Return the names of all members, in archive order."""
        return [data.filename for data in self.filelist]

    def infolist(self):
        """Return the list of ZipInfo objects (the internal list itself)."""
        return self.filelist

    def printdir(self):
        """Print a table of member names, modification times and sizes."""
        print('%-46s %19s %12s' % ('File Name', 'Modified    ', 'Size'))
        for zinfo in self.filelist:
            date = '%d-%02d-%02d %02d:%02d:%02d' % zinfo.date_time[:6]
            print('%-46s %s %12d' % (zinfo.filename, date, zinfo.file_size))

    def testzip(self):
        """Read every member; return the first name that raises BadZipfile.

        Returns None when all members could be read.
        """
        for zinfo in self.filelist:
            try:
                self.read(zinfo.filename)
            except BadZipfile:
                return zinfo.filename

    def getinfo(self, name):
        """Return the ZipInfo for *name*; raise KeyError when unknown."""
        info = self.NameToInfo.get(name)
        if info is None:
            raise KeyError('There is no item named %r in the archive' % name)
        return info

    def setpassword(self, pwd):
        """Set the default password used for encrypted members."""
        self.pwd = pwd

    def _release(self, zef):
        """Close the file handle behind *zef* if open() created it."""
        if not self._filePassed:
            zef.fileobj.close()

    def read(self, name, pwd=None):
        """Return the decoded bytes of the member *name*."""
        zef = self.open(name, 'r', pwd)
        try:
            return zef.read()
        finally:
            # open() creates a fresh handle for path-based archives; do not
            # leave it to the garbage collector.
            self._release(zef)

    def read_raw(self, name, mode='r', pwd=None):
        """Return the stored (still compressed) bytes of the member *name*."""
        zef = self.open(name, mode=mode, pwd=pwd)
        try:
            return zef.read_raw()
        finally:
            self._release(zef)

    def open(self, name, mode='r', pwd=None):
        """Return a ZipExtFile for the member *name* (a name or a ZipInfo).

        Raises RuntimeError for a bad mode, a closed archive, a missing or
        wrong password; BadZipfile when the local header is inconsistent.
        """
        if mode not in ('r', 'U', 'rU'):
            raise RuntimeError('open() requires mode "r", "U", or "rU"')
        if not self.fp:
            raise RuntimeError(
                'Attempt to read ZIP archive that was already closed')

        # Use a private handle unless the caller supplied the file object.
        if self._filePassed:
            zef_file = self.fp
        else:
            zef_file = open(self.filename, 'rb')

        if isinstance(name, ZipInfo):
            zinfo = name
        else:
            zinfo = self.getinfo(name)

        zef_file.seek(zinfo.header_offset, 0)

        fheader = zef_file.read(sizeFileHeader)
        if fheader[0:4] != stringFileHeader:
            raise BadZipfile('Bad magic number for file header')

        fheader = struct.unpack(structFileHeader, fheader)
        fname = zef_file.read(fheader[_FH_FILENAME_LENGTH])
        if fheader[_FH_EXTRA_FIELD_LENGTH]:
            zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH])

        if fname != zinfo.orig_filename:
            raise BadZipfile(
                'File name in directory "%s" and header "%s" differ.'
                % (zinfo.orig_filename, fname))

        is_encrypted = zinfo.flag_bits & 1
        zd = None
        if is_encrypted:
            if not pwd:
                pwd = self.pwd
            if not pwd:
                raise RuntimeError(
                    'File %s is encrypted, password required for extraction'
                    % name)

            zd = _ZipDecrypter(pwd)
            # The 12-byte encryption header ends with a check byte taken from
            # the modification time (data-descriptor members) or the CRC.
            bytes = zef_file.read(12)
            h = map(zd, bytes[0:12])
            if zinfo.flag_bits & 8:
                check_byte = (zinfo._raw_time >> 8) & 255
            else:
                check_byte = (zinfo.CRC >> 24) & 255
            if ord(h[11]) != check_byte:
                raise RuntimeError('Bad password for file', name)

        if zd is None:
            zef = ZipExtFile(zef_file, zinfo)
        else:
            zef = ZipExtFile(zef_file, zinfo, zd)

        if 'U' in mode:
            zef.set_univ_newlines(True)
        return zef

    def extract(self, member, path=None, pwd=None):
        """Extract *member* below *path* (default: cwd); return its path."""
        if not isinstance(member, ZipInfo):
            member = self.getinfo(member)
        if path is None:
            path = os.getcwd()
        return self._extract_member(member, path, pwd)

    def extractall(self, path=None, members=None, pwd=None):
        """Extract *members* (default: all members) below *path*."""
        if members is None:
            members = self.namelist()
        for zipinfo in members:
            self.extract(zipinfo, path, pwd)

    def _extract_member(self, member, targetpath, pwd):
        """Extract the ZipInfo *member* below *targetpath*; return the path.

        The final path is also recorded in ``extract_mapping``.
        """
        if targetpath[-1:] == '/':
            targetpath = targetpath[:-1]

        fname = member.filename
        if isinstance(fname, unicode):
            fname = fname.encode(filesystem_encoding, 'replace')
        # A leading '/' would make os.path.join discard targetpath.
        if fname.startswith('/'):
            fname = fname[1:]
        targetpath = os.path.join(targetpath, fname)
        targetpath = os.path.normpath(targetpath)

        # Remove any plain file that sits where a parent directory is needed.
        upperdirs = os.path.dirname(targetpath)
        while upperdirs:
            if os.path.exists(upperdirs):
                if os.path.isdir(upperdirs):
                    break
                os.remove(upperdirs)
            upperdirs = os.path.dirname(upperdirs)
        upperdirs = os.path.dirname(targetpath)
        if upperdirs and not os.path.exists(upperdirs):
            os.makedirs(upperdirs)

        # NOTE(review): the decompiled text contained no code that writes the
        # member's data, although ``closing`` and ``sanitize_file_name`` are
        # imported and used nowhere else.  The block below is a
        # reconstruction; confirm against the original source.
        if not os.path.isdir(targetpath):  # may be an auto-created directory
            source = self.open(member, pwd=pwd)
            try:
                try:
                    target = open(targetpath, 'wb')
                except IOError:
                    # presumably the name holds characters the filesystem
                    # rejects; retry with a sanitized final component
                    head, tail = os.path.split(targetpath)
                    targetpath = os.path.join(head, sanitize_file_name(tail))
                    target = open(targetpath, 'wb')
                with closing(target):
                    shutil.copyfileobj(source, target)
            finally:
                source.close()
                self._release(source)

        self.extract_mapping[member.filename] = targetpath
        return targetpath

    def _writecheck(self, zinfo):
        """Validate that *zinfo* can be written to this archive.

        Raises RuntimeError (wrong mode, closed archive, unsupported
        compression) or LargeZipFile (ZIP64 needed but not allowed).
        """
        if zinfo.filename in self.NameToInfo:
            if self.debug:
                print('Duplicate name: %s' % (zinfo.filename,))
        if self.mode not in ('w', 'a'):
            raise RuntimeError('write() requires mode "w" or "a"')
        if not self.fp:
            raise RuntimeError(
                'Attempt to write ZIP archive that was already closed')
        if zinfo.compress_type == ZIP_DEFLATED and not zlib:
            raise RuntimeError(
                'Compression requires the (missing) zlib module')
        if zinfo.compress_type not in (ZIP_STORED, ZIP_DEFLATED):
            raise RuntimeError('That compression method is not supported')
        if zinfo.file_size > ZIP64_LIMIT:
            if not self._allowZip64:
                raise LargeZipFile('Filesize would require ZIP64 extensions')
        if zinfo.header_offset > ZIP64_LIMIT:
            if not self._allowZip64:
                raise LargeZipFile(
                    'Zipfile size would require ZIP64 extensions')

    def write(self, filename, arcname=None, compress_type=None):
        """Add the file at path *filename* to the archive as *arcname*.

        *arcname* defaults to *filename* with drive and leading separators
        removed; *compress_type* defaults to the archive's compression.
        """
        if not self.fp:
            raise RuntimeError(
                'Attempt to write to ZIP archive that was already closed')

        st = os.stat(filename)
        mtime = time.localtime(st.st_mtime)
        date_time = mtime[0:6]

        if arcname is None:
            arcname = filename
        arcname = os.path.normpath(os.path.splitdrive(arcname)[1])
        while arcname[0] in (os.sep, os.altsep):
            arcname = arcname[1:]
        if not isinstance(arcname, unicode):
            arcname = arcname.decode(filesystem_encoding)
        zinfo = ZipInfo(arcname, date_time)
        zinfo.external_attr = (st[0] & 65535) << 16  # Unix mode bits
        if compress_type is None:
            zinfo.compress_type = self.compression
        else:
            zinfo.compress_type = compress_type

        zinfo.file_size = st.st_size  # needed by the ZIP64 check below
        zinfo.flag_bits = 0
        zinfo.header_offset = self.fp.tell()

        self._writecheck(zinfo)
        self._didModify = True

        fp = open(filename, 'rb')
        try:
            # Write a placeholder header; CRC and sizes are patched in below.
            zinfo.CRC = CRC = 0
            zinfo.compress_size = compress_size = 0
            zinfo.file_size = file_size = 0
            self.fp.write(zinfo.FileHeader())
            if zinfo.compress_type == ZIP_DEFLATED:
                cmpr = zlib.compressobj(
                    zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
            else:
                cmpr = None
            # The decompiled loop read ``while None:`` and had lost the write
            # of each chunk; restored.
            while True:
                buf = fp.read(8192)
                if not buf:
                    break
                file_size = file_size + len(buf)
                CRC = crc32(buf, CRC) & 0xFFFFFFFF
                if cmpr:
                    buf = cmpr.compress(buf)
                    compress_size = compress_size + len(buf)
                self.fp.write(buf)
        finally:
            fp.close()

        if cmpr:
            buf = cmpr.flush()
            compress_size = compress_size + len(buf)
            self.fp.write(buf)
            zinfo.compress_size = compress_size
        else:
            zinfo.compress_size = file_size
        zinfo.CRC = CRC
        zinfo.file_size = file_size

        # CRC and sizes live 14 bytes into the local file header.
        position = self.fp.tell()
        self.fp.seek(zinfo.header_offset + 14, 0)
        self.fp.write(struct.pack(
            '<LLL', zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.fp.seek(position, 0)
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo

    def writestr(self, zinfo_or_arcname, bytes, permissions=0o600,
                 compression=ZIP_DEFLATED, raw_bytes=False):
        """Write the string *bytes* to the archive.

        *zinfo_or_arcname* is a ZipInfo or a member name; for a name,
        *permissions* and *compression* are applied to a new ZipInfo.  With
        *raw_bytes* true, *bytes* is written unchanged and the size and CRC
        fields already present on the ZipInfo are used.
        """
        if not isinstance(zinfo_or_arcname, ZipInfo):
            if not isinstance(zinfo_or_arcname, unicode):
                zinfo_or_arcname = zinfo_or_arcname.decode(filesystem_encoding)
            zinfo = ZipInfo(filename=zinfo_or_arcname,
                            date_time=time.localtime(time.time())[:6])
            zinfo.compress_type = compression
            zinfo.external_attr = permissions << 16
        else:
            zinfo = zinfo_or_arcname

        if not self.fp:
            raise RuntimeError(
                'Attempt to write to ZIP archive that was already closed')

        if not raw_bytes:
            zinfo.file_size = len(bytes)
        zinfo.header_offset = self.fp.tell()
        self._writecheck(zinfo)
        self._didModify = True
        if not raw_bytes:
            zinfo.CRC = crc32(bytes) & 0xFFFFFFFF
            if zinfo.compress_type == ZIP_DEFLATED:
                co = zlib.compressobj(
                    zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -15)
                bytes = co.compress(bytes) + co.flush()
                zinfo.compress_size = len(bytes)
            else:
                zinfo.compress_size = zinfo.file_size
        zinfo.header_offset = self.fp.tell()
        self.fp.write(zinfo.FileHeader())
        self.fp.write(bytes)
        self.fp.flush()
        if zinfo.flag_bits & 8:
            # Data descriptor.  The CRC is masked to an unsigned value above,
            # so it must be packed as unsigned ('<L'); the original '<l'
            # rejects CRCs with the top bit set.
            self.fp.write(struct.pack(
                '<LLL', zinfo.CRC, zinfo.compress_size, zinfo.file_size))
        self.filelist.append(zinfo)
        self.NameToInfo[zinfo.filename] = zinfo

    def add_dir(self, path, prefix=''):
        """Recursively add the contents of directory *path*.

        Members are stored under *prefix*; when *prefix* is non-empty an
        entry for ``prefix + '/'`` is written first.  The working directory
        is changed while adding and restored afterwards.
        """
        if prefix:
            self.writestr(prefix + '/', '', 0o700)
        cwd = os.path.abspath(os.getcwd())
        try:
            os.chdir(path)
            # The decompiled expression ``None + prefix if prefix else ''``
            # cannot run; restored as "prefix plus a slash when non-empty".
            if prefix:
                fp = (prefix + '/').replace('//', '/')
            else:
                fp = ''
            for f in os.listdir('.'):
                arcname = fp + f
                if os.path.isdir(f):
                    self.add_dir(f, prefix=arcname)
                else:
                    self.write(f, arcname)
        finally:
            os.chdir(cwd)

    def __del__(self):
        self.close()

    def _write_end_records(self):
        """Write the central directory and the end-of-archive records."""
        count = 0
        pos1 = self.fp.tell()
        for zinfo in self.filelist:
            count += 1
            dt = zinfo.date_time
            dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
            dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)

            # Values that do not fit in 32 bits go into a ZIP64 extra field.
            extra = []
            if (zinfo.file_size > ZIP64_LIMIT
                    or zinfo.compress_size > ZIP64_LIMIT):
                extra.append(zinfo.file_size)
                extra.append(zinfo.compress_size)
                file_size = 0xFFFFFFFF
                compress_size = 0xFFFFFFFF
            else:
                file_size = zinfo.file_size
                compress_size = zinfo.compress_size

            if zinfo.header_offset > ZIP64_LIMIT:
                extra.append(zinfo.header_offset)
                header_offset = 0xFFFFFFFF
            else:
                header_offset = zinfo.header_offset

            extra_data = zinfo.extra
            if extra:
                extra_data = struct.pack(
                    '<HH' + 'Q' * len(extra),
                    1, 8 * len(extra), *extra) + extra_data
                extract_version = max(45, zinfo.extract_version)
                create_version = max(45, zinfo.create_version)
            else:
                extract_version = zinfo.extract_version
                create_version = zinfo.create_version

            filename, flag_bits = zinfo._encodeFilenameFlags()
            centdir = struct.pack(
                structCentralDir, stringCentralDir, create_version,
                zinfo.create_system, extract_version, zinfo.reserved,
                flag_bits, zinfo.compress_type, dostime, dosdate,
                zinfo.CRC, compress_size, file_size,
                len(filename), len(extra_data), len(zinfo.comment),
                0, zinfo.internal_attr, zinfo.external_attr,
                header_offset)
            self.fp.write(centdir)
            self.fp.write(filename)
            self.fp.write(extra_data)
            self.fp.write(zinfo.comment)

        pos2 = self.fp.tell()
        centDirCount = count
        centDirSize = pos2 - pos1
        centDirOffset = pos1
        if (centDirCount >= ZIP_FILECOUNT_LIMIT
                or centDirOffset > ZIP64_LIMIT
                or centDirSize > ZIP64_LIMIT):
            # ZIP64 end record and locator precede the classic end record,
            # whose fields are then saturated.
            self.fp.write(struct.pack(
                structEndArchive64, stringEndArchive64,
                44, 45, 45, 0, 0, centDirCount, centDirCount,
                centDirSize, centDirOffset))
            self.fp.write(struct.pack(
                structEndArchive64Locator,
                stringEndArchive64Locator, 0, pos2, 1))
            centDirCount = min(centDirCount, 0xFFFF)
            centDirSize = min(centDirSize, 0xFFFFFFFF)
            centDirOffset = min(centDirOffset, 0xFFFFFFFF)

        if len(self.comment) >= ZIP_MAX_COMMENT:
            if self.debug > 0:
                print('Archive comment is too long; truncating to %d bytes'
                      % ZIP_MAX_COMMENT)
            self.comment = self.comment[:ZIP_MAX_COMMENT]

        self.fp.write(struct.pack(
            structEndArchive, stringEndArchive,
            0, 0, centDirCount, centDirCount,
            centDirSize, centDirOffset, len(self.comment)))
        self.fp.write(self.comment)
        self.fp.flush()

    def close(self):
        """Finish the archive and release the file.

        For modes 'w' and 'a' the central directory is written when the
        archive was modified.  Files opened by this object are closed;
        caller-supplied file objects are left open.
        """
        if self.fp is None:
            return

        # NOTE(review): the decompiled text had lost the writing of the end
        # records (every archive produced would lack a central directory,
        # and ZIP_FILECOUNT_LIMIT, ZIP_MAX_COMMENT and _didModify would be
        # unused).  Restored from upstream CPython 2.6.
        if self.mode in ('w', 'a') and self._didModify:
            self._write_end_records()

        if not self._filePassed:
            self.fp.close()
        self.fp = None


def safe_replace(zipstream, name, datastream, extra_replacements=None):
    """Replace members of the archive in *zipstream* by rebuilding it.

    The member *name* gets the content read from *datastream*; every entry
    of *extra_replacements* (member name -> file-like object) is handled the
    same way.  All other members are copied without recompression.  The new
    archive is assembled in a spooled temporary file and then copied over
    *zipstream*.
    """
    z = ZipFile(zipstream, 'r')
    replacements = {name: datastream}
    if extra_replacements:
        replacements.update(extra_replacements)
    names = frozenset(replacements.keys())
    # Restored: the decompiler reduced this ``with`` block to ``temp = _[1]``.
    with SpooledTemporaryFile(max_size=104857600) as temp:
        ztemp = ZipFile(temp, 'w')
        for obj in z.infolist():
            if obj.filename in names:
                ztemp.writestr(obj, replacements[obj.filename].read())
            else:
                ztemp.writestr(obj, z.read_raw(obj), raw_bytes=True)
        ztemp.close()
        z.close()
        temp.seek(0)
        zipstream.seek(0)
        zipstream.truncate()
        shutil.copyfileobj(temp, zipstream)
        zipstream.flush()


class PyZipFile(ZipFile):
    """ZipFile variant that can add compiled Python modules and packages."""

    def writepy(self, pathname, basename=''):
        """Add the byte-compiled form of *pathname* to the archive.

        *pathname* may be a single ``.py`` file, a package directory (added
        recursively under its own name) or a plain directory (its ``.py``
        files are added at top level).  *basename* is the archive directory
        prefix.  Raises RuntimeError for a file not ending in ``.py``.
        """
        dir, name = os.path.split(pathname)
        if os.path.isdir(pathname):
            initname = os.path.join(pathname, '__init__.py')
            if os.path.isfile(initname):
                # Package directory.
                if basename:
                    basename = '%s/%s' % (basename, name)
                else:
                    basename = name
                if self.debug:
                    print('Adding package in %s as %s' % (pathname, basename))
                fname, arcname = self._get_codename(initname[0:-3], basename)
                if self.debug:
                    print('Adding %s' % (arcname,))
                self.write(fname, arcname)
                dirlist = os.listdir(pathname)
                dirlist.remove('__init__.py')
                for filename in dirlist:
                    path = os.path.join(pathname, filename)
                    ext = os.path.splitext(filename)[-1]
                    if os.path.isdir(path):
                        if os.path.isfile(os.path.join(path, '__init__.py')):
                            self.writepy(path, basename)  # sub-package
                    elif ext == '.py':
                        fname, arcname = self._get_codename(
                            path[0:-3], basename)
                        if self.debug:
                            print('Adding %s' % (arcname,))
                        self.write(fname, arcname)
            else:
                # Not a package: add its modules at top level.
                if self.debug:
                    print('Adding files from directory %s' % (pathname,))
                for filename in os.listdir(pathname):
                    path = os.path.join(pathname, filename)
                    ext = os.path.splitext(filename)[-1]
                    if ext == '.py':
                        fname, arcname = self._get_codename(
                            path[0:-3], basename)
                        if self.debug:
                            print('Adding %s' % (arcname,))
                        self.write(fname, arcname)
        else:
            if pathname[-3:] != '.py':
                raise RuntimeError(
                    'Files added with writepy() must end with ".py"')
            fname, arcname = self._get_codename(pathname[0:-3], basename)
            if self.debug:
                print('Adding file %s' % (arcname,))
            self.write(fname, arcname)

    def _get_codename(self, pathname, basename):
        """Return (compiled file path, archive name) for module *pathname*.

        *pathname* is given without extension.  An up-to-date ``.pyo`` is
        preferred, then an up-to-date ``.pyc``; otherwise the source is
        compiled to ``.pyc`` first.
        """
        file_py = pathname + '.py'
        file_pyc = pathname + '.pyc'
        file_pyo = pathname + '.pyo'
        if (os.path.isfile(file_pyo)
                and os.stat(file_pyo).st_mtime >= os.stat(file_py).st_mtime):
            fname = file_pyo
        elif (not os.path.isfile(file_pyc)
                or os.stat(file_pyc).st_mtime < os.stat(file_py).st_mtime):
            import py_compile
            if self.debug:
                print('Compiling %s' % (file_py,))
            try:
                py_compile.compile(file_py, file_pyc, None, True)
            except py_compile.PyCompileError as err:
                print(err.msg)
            fname = file_pyc
        else:
            fname = file_pyc
        archivename = os.path.split(fname)[1]
        if basename:
            archivename = '%s/%s' % (basename, archivename)
        return (fname, archivename)


def main(args=None):
    """Command line interface: list, test, extract or create an archive.

    *args* defaults to ``sys.argv[1:]``.  Exits with status 1 after printing
    the usage text when the arguments are invalid.
    """
    import textwrap
    # NOTE(review): the exact spacing of this text was lost when the source
    # was collapsed; the layout below follows upstream CPython.
    USAGE = textwrap.dedent("""\
        Usage:
            zipfile.py -l zipfile.zip        # Show listing of a zipfile
            zipfile.py -t zipfile.zip        # Test if a zipfile is valid
            zipfile.py -e zipfile.zip target # Extract zipfile into target dir
            zipfile.py -c zipfile.zip src ... # Create zipfile from sources
        """)
    if args is None:
        args = sys.argv[1:]

    if not args or args[0] not in ('-l', '-c', '-e', '-t'):
        print(USAGE)
        sys.exit(1)

    if args[0] == '-l':
        if len(args) != 2:
            print(USAGE)
            sys.exit(1)
        zf = ZipFile(args[1], 'r')
        zf.printdir()
        zf.close()

    elif args[0] == '-t':
        if len(args) != 2:
            print(USAGE)
            sys.exit(1)
        zf = ZipFile(args[1], 'r')
        zf.testzip()
        print('Done testing')

    elif args[0] == '-e':
        if len(args) != 3:
            print(USAGE)
            sys.exit(1)

        zf = ZipFile(args[1], 'r')
        out = args[2]
        for path in zf.namelist():
            if path.startswith('./'):
                tgt = os.path.join(out, path[2:])
            else:
                tgt = os.path.join(out, path)

            tgtdir = os.path.dirname(tgt)
            if not os.path.exists(tgtdir):
                os.makedirs(tgtdir)
            fp = open(tgt, 'wb')
            try:
                fp.write(zf.read(path))
            finally:
                fp.close()
        zf.close()

    elif args[0] == '-c':
        if len(args) < 3:
            print(USAGE)
            sys.exit(1)

        def addToZip(zf, path, zippath):
            """Add the file or directory tree *path* as *zippath*."""
            if os.path.isfile(path):
                zf.write(path, zippath, ZIP_DEFLATED)
            elif os.path.isdir(path):
                for nm in os.listdir(path):
                    addToZip(zf, os.path.join(path, nm),
                             os.path.join(zippath, nm))
            # anything else (sockets, devices, ...) is ignored

        zf = ZipFile(args[1], 'w', allowZip64=True)
        for src in args[2:]:
            addToZip(zf, src, os.path.basename(src))
        zf.close()


if __name__ == '__main__':
    main()