home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- __all__ = [
- 'unpack_archive',
- 'unpack_zipfile',
- 'unpack_tarfile',
- 'default_filter',
- 'UnrecognizedFormat',
- 'extraction_drivers',
- 'unpack_directory']
- import zipfile
- import tarfile
- import os
- import shutil
- from pkg_resources import ensure_directory
- from distutils.errors import DistutilsError
-
- class UnrecognizedFormat(DistutilsError):
- pass
-
-
- def default_filter(src, dst):
- return dst
-
-
- def unpack_archive(filename, extract_dir, progress_filter = default_filter, drivers = None):
- for driver in extraction_drivers:
-
- try:
- driver(filename, extract_dir, progress_filter)
- except UnrecognizedFormat:
- continue
- continue
-
- return None
- raise UnrecognizedFormat('Not a recognized archive type: %s' % filename)
-
-
- def unpack_directory(filename, extract_dir, progress_filter = default_filter):
- if not os.path.isdir(filename):
- raise UnrecognizedFormat('%s is not a directory' % (filename,))
- os.path.isdir(filename)
- paths = {
- filename: ('', extract_dir) }
- for base, dirs, files in os.walk(filename):
- (src, dst) = paths[base]
- for d in dirs:
- paths[os.path.join(base, d)] = (src + d + '/', os.path.join(dst, d))
-
- for f in files:
- name = src + f
- target = os.path.join(dst, f)
- target = progress_filter(src + f, target)
- if not target:
- continue
-
- ensure_directory(target)
- f = os.path.join(base, f)
- shutil.copyfile(f, target)
- shutil.copystat(f, target)
-
-
-
-
- def unpack_zipfile(filename, extract_dir, progress_filter = default_filter):
- if not zipfile.is_zipfile(filename):
- raise UnrecognizedFormat('%s is not a zip file' % (filename,))
- zipfile.is_zipfile(filename)
- z = zipfile.ZipFile(filename)
-
- try:
- for info in z.infolist():
- name = info.filename
- if name.startswith('/') or '..' in name:
- continue
-
- target = os.path.join(extract_dir, *name.split('/'))
- target = progress_filter(name, target)
- if not target:
- continue
-
- if name.endswith('/'):
- ensure_directory(target)
- continue
- ensure_directory(target)
- data = z.read(info.filename)
- f = open(target, 'wb')
-
- try:
- f.write(data)
- finally:
- f.close()
- del data
-
- finally:
- z.close()
-
-
-
- def unpack_tarfile(filename, extract_dir, progress_filter = default_filter):
-
- try:
- tarobj = tarfile.open(filename)
- except tarfile.TarError:
- raise UnrecognizedFormat('%s is not a compressed or uncompressed tar file' % (filename,))
-
-
- try:
-
- tarobj.chown = lambda *args: pass
- for member in tarobj:
- if member.isfile() or member.isdir():
- name = member.name
- if not name.startswith('/') and '..' not in name:
- dst = os.path.join(extract_dir, *name.split('/'))
- dst = progress_filter(name, dst)
- if dst:
- if dst.endswith(os.sep):
- dst = dst[:-1]
-
- tarobj._extract_member(member, dst)
-
-
- '..' not in name
-
- return True
- finally:
- tarobj.close()
-
-
- extraction_drivers = (unpack_directory, unpack_zipfile, unpack_tarfile)
-