home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- '''Class for representing and working with chroots.'''
- import subprocess
- import tempfile
- import os
- import os.path as os
- import shutil
-
- def setup_fakeroot_env():
- '''Set up a fakeroot/fakechroot environment.
-
- This needs the libraries /usr/lib/libfakeroot/libfakeroot-sysv.so and
- /usr/lib/fakechroot/libfakechroot.so; these paths can be overridden
- with the environment variables APPORT_LIBFAKEROOT and
- APPORT_LIBFAKECHROOT.'''
- libfakeroot = os.environ.get('APPORT_LIBFAKEROOT', '/usr/lib/libfakeroot/libfakeroot-sysv.so')
- if not os.path.exists(libfakeroot):
- raise AssertionError, '%s not found; please set APPORT_LIBFAKEROOT correctly' % libfakeroot
- libfakechroot = os.environ.get('APPORT_LIBFAKECHROOT', '/usr/lib/fakechroot/libfakechroot.so')
- if not os.path.exists(libfakechroot):
- raise AssertionError, '%s not found; please set APPORT_LIBFAKECHROOT correctly' % libfakechroot
- os.environ['LD_PRELOAD'] = '%s %s %s' % (libfakechroot, libfakeroot, os.environ.get('LD_PRELOAD', ''))
- os.environ['LD_LIBRARY_PATH'] = '/lib:' + os.environ.get('LD_LIBRARY_PATH', '')
- os.environ['FAKECHROOT'] = 'true'
-
-
- def pathsplit(p, rest = []):
- (h, t) = os.path.split(p)
- if len(h) < 1:
- return [
- t] + rest
- if len(t) < 1:
- return [
- h] + rest
- return pathsplit(h, [
- t] + rest)
-
-
- def commonpath(l1, l2, common = []):
- if len(l1) < 1:
- return (common, l1, l2)
- if len(l2) < 1:
- return (common, l1, l2)
- if l1[0] != l2[0]:
- return (common, l1, l2)
- return commonpath(l1[1:], l2[1:], common + [
- l1[0]])
-
-
- def relpath(p1, p2):
- (common, l1, l2) = commonpath(pathsplit(p1), pathsplit(p2))
- p = []
- if len(l1) > 0:
- p = [
- '../' * len(l1)]
-
- p = p + l2
- return os.path.join(*p)
-
-
- class Chroot:
- '''Work with a chroot (either in directory or in tarball form).
-
- If called as non-root user, this calls setup_fakeroot_env() to use the
- fakeroot/fakechroot libraries.'''
-
- def __init__(self, root):
- '''Bind to a chroot, which can either be a directory, a tarball, or
- None to work in the main system.
-
- If a tarball is given, then it gets unpacked into a temporary directory
- which is cleaned up at program termination.'''
- self.remove = False
- if os.geteuid() != 0:
- setup_fakeroot_env()
-
- self.root_tarball = None
- if root is None:
- self.root = None
- elif os.path.isdir(root):
- self.root = root
- elif not os.path.isfile(root):
- raise AssertionError
- self.root_tarball = root
- self.root = tempfile.mkdtemp()
- self.remove = True
- if not subprocess.call([
- 'tar',
- '-C',
- self.root,
- '-xzf',
- root]) == 0:
- raise AssertionError
-
-
- def __del__(self):
- if hasattr(self, 'remove') and self.remove:
- shutil.rmtree(self.root)
-
-
-
- def tar(self, tarball = None):
- '''Create a tarball from the chroot.
-
- If tarball does not specify a .tar.gz path, then the Chroot must have
- been created from a tarball, and that tarball is updated.'''
- if not tarball:
- if not self.root_tarball:
- raise AssertionError
- tarball = self.root_tarball
-
- f = open(tarball, 'w')
- orig_cwd = os.getcwd()
-
- try:
- os.chdir(self.root)
- tar = subprocess.Popen([
- 'tar',
- 'cp',
- '.'], stdout = subprocess.PIPE)
- gz = subprocess.Popen([
- 'gzip',
- '-9'], stdin = tar.stdout, stdout = f)
- if not tar.wait() == 0:
- raise AssertionError
- if not gz.wait() == 0:
- raise AssertionError
- f.close()
- finally:
- os.chdir(orig_cwd)
-
-
-
- def _exec_capture(self, argv, stdin = None):
- '''Internal helper function to wrap subprocess.Popen() and return a
- triple (stdout, stderr, returncode).'''
- if stdin:
- p = subprocess.Popen(argv, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- (out, err) = p.communicate(stdin)
- else:
- p = subprocess.Popen(argv, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- (out, err) = p.communicate()
- return (out, err, p.returncode)
-
-
- def run(self, argv):
- '''Execute the given commandline vector in the chroot and return the
- exit code.'''
- if self.root:
- return subprocess.call([
- 'chroot',
- self.root] + argv)
- return subprocess.call(argv)
-
-
- def run_capture(self, argv, stdin = None):
- '''Execute the given command line vector in the chroot and return a
- triple (stdout, stderr, exit code).'''
- if self.root:
- return self._exec_capture([
- 'chroot',
- self.root] + argv, stdin)
- return self._exec_capture(argv, stdin)
-
-
- def fix_symlinks(self):
- """Remove root prefix from symbolic links in chroot directory,
- otherwise chroot tarballs don't work at all, and we cannot move/rename
- chroot directories."""
- for r, dirs, files in os.walk(self.root):
- for f in files:
- path = os.path.join(r, f)
- if os.path.islink(path):
- target = os.readlink(path)
- if target.startswith(self.root):
- os.unlink(path)
- os.symlink(relpath(os.path.dirname(path), target), path)
-
- target.startswith(self.root)
-
-
-
-
- if __name__ == '__main__':
- import unittest
- import os
- import tarfile
- import re
- import shutil
-
- class ChrootTest(unittest.TestCase):
-
- def test_null(self):
- '''null chroot (working in the main system)'''
- c = Chroot(None)
- self.assertEqual(c.root_tarball, None)
- self.assertEqual(c.run([
- '/bin/sh',
- '-c',
- 'exit 42']), 42)
- (out, err, ret) = c.run_capture([
- '/bin/ls',
- '/bin/ls'])
- self.assertEqual(ret, 0)
- self.assertEqual(out, '/bin/ls\n')
- self.assertEqual(err, '')
- (out, err, ret) = c.run_capture([
- '/bin/ls',
- '/nonexisting/gibberish'])
- self.assertNotEqual(ret, 0)
- self.assertEqual(out, '')
- self.assertNotEqual(err, '')
-
-
- def _mkchroot(self):
- '''Return a test chroot dir with /bin/hello and /bin/42.'''
- d = tempfile.mkdtemp()
- bindir = os.path.join(d, 'bin')
- os.mkdir(bindir)
- open(os.path.join(d, 'hello.c'), 'w').write('\n#include <stdio.h>\nint main() { puts("hello"); return 0; }\n')
- open(os.path.join(d, '42.c'), 'w').write('\nint main() { return 42; }\n')
- if not subprocess.call([
- 'cc',
- '-static',
- os.path.join(d, 'hello.c'),
- '-o',
- os.path.join(bindir, 'hello')]) == 0:
- raise AssertionError
- if not subprocess.call([
- 'cc',
- '-static',
- os.path.join(d, '42.c'),
- '-o',
- os.path.join(bindir, '42')]) == 0:
- raise AssertionError
- return d
-
-
- def test_dir(self):
- '''directory chroot.'''
- d = self._mkchroot()
- tarpath = None
-
- try:
- c = Chroot(d)
- self.assertEqual(c.root_tarball, None)
- self.assertEqual(c.run([
- '/bin/42']), 42)
- (out, err, ret) = c.run_capture([
- '/bin/hello'])
- self.assertEqual(ret, 0)
- self.assertEqual(out, 'hello\n')
- self.assertEqual(err, '')
- open(os.path.join(c.root, 'newfile'), 'w')
- self.assertRaises(AssertionError, c.tar)
- (fd, tarpath) = tempfile.mkstemp()
- os.close(fd)
- c.tar(tarpath)
- t = tarfile.open(tarpath)
- if not set([
- './bin/42',
- './bin/hello',
- './newfile']).issubset(set(t.getnames())):
- pass
- self.assert_(set([
- 'bin/42',
- 'bin/hello',
- 'newfile']).issubset(set(t.getnames())))
- del c
- self.assert_(os.path.exists(os.path.join(d, 'bin', '42')), 'directory chroot should not delete the chroot')
- finally:
- shutil.rmtree(d)
- if tarpath:
- os.unlink(tarpath)
-
-
-
-
- def test_tarball(self):
- '''tarball chroot.'''
- d = self._mkchroot()
-
- try:
- (fd, tar) = tempfile.mkstemp()
- os.close(fd)
- orig_cwd = os.getcwd()
- os.chdir(d)
- if not subprocess.call([
- 'tar',
- 'czPf',
- tar,
- '.']) == 0:
- raise AssertionError
- if not os.path.exists(tar):
- raise AssertionError
- os.chdir(orig_cwd)
- finally:
- shutil.rmtree(d)
-
-
- try:
- c = Chroot(tar)
- self.assertEqual(c.root_tarball, tar)
- self.assertEqual(c.run([
- '/bin/42']), 42)
- (out, err, ret) = c.run_capture([
- '/bin/hello'])
- self.assertEqual(ret, 0)
- self.assertEqual(out, 'hello\n')
- self.assertEqual(err, '')
- open(os.path.join(c.root, 'newfile'), 'w')
- c.tar()
- t = tarfile.open(tar)
- if not set([
- './bin/42',
- './bin/hello',
- './newfile']).issubset(set(t.getnames())):
- pass
- self.assert_(set([
- 'bin/42',
- 'bin/hello',
- 'newfile']).issubset(set(t.getnames())))
- d = c.root
- del c
- self.assert_(not os.path.exists(d), 'tarball chroot should delete the temporary chroot')
- finally:
- os.unlink(tar)
-
-
-
- def test_fix_symlinks(self):
- '''symlink fixing in chroots.'''
- d = self._mkchroot()
-
- try:
- os.symlink(os.path.join(d, 'bin', '42'), os.path.join(d, 'bin', '42prefix'))
- os.symlink(os.path.join('/bin/42'), os.path.join(d, 'bin', '42noprefix'))
- os.symlink(os.path.join('bin/42'), os.path.join(d, '42rel'))
- c = Chroot(d)
- c.fix_symlinks()
- self.assertEqual(c.run([
- '/42rel']), 42)
- self.assertEqual(c.run([
- '/bin/42prefix']), 42)
- self.assertEqual(os.readlink(os.path.join(d, 'bin', '42noprefix')), '/bin/42')
- finally:
- shutil.rmtree(d)
-
-
-
- def _install_file(klass, path, root):
- '''Install given file into a chroot, preserving the path.
-
- Do nothing if the target file already exists.'''
- destpath = root + os.path.abspath(path)
- if os.path.exists(destpath):
- return None
- destdir = os.path.dirname(destpath)
- if not os.path.isdir(destdir):
- os.makedirs(destdir)
-
- shutil.copy(path, destdir)
-
- _install_file = classmethod(_install_file)
-
- def _install_exe(klass, exepath, root):
- '''Install an executable and all linked shlibs into a chroot.'''
- klass._install_file(exepath, root)
- ldd = subprocess.Popen([
- 'ldd',
- exepath], stdout = subprocess.PIPE)
- out = ldd.communicate()[0]
- if not ldd.returncode == 0:
- raise AssertionError
- for m in re.finditer(' => (/[^ ]+)', out):
- klass._install_file(m.group(1), root)
-
- for m in re.finditer('^\\s*(/[^ ]+)', out, re.M):
- klass._install_file(m.group(1), root)
-
-
- _install_exe = classmethod(_install_exe)
-
- def test_shell_ops(self):
- '''various shell operations in the chroot.'''
- d = tempfile.mkdtemp()
- ldir = os.path.join(d, 'lib')
- os.mkdir(ldir)
- if not subprocess.call('cp -a /lib/*.so ' + ldir, shell = True) == 0:
- raise AssertionError
-
- try:
- for cmd in ('bash', 'echo', 'cat', 'cp', 'ln', 'ls', 'rm', 'mkdir', 'rmdir', 'chmod', 'chown'):
- self._install_exe('/bin/' + cmd, d)
-
- self._install_exe('/usr/bin/stat', d)
- c = Chroot(d)
- self.assertEqual(c.run_capture([
- 'echo',
- 'hello']), ('hello\n', '', 0))
- self.assertEqual(c.run_capture([
- 'cat'], 'hello'), ('hello', '', 0))
- self.assertEqual(c.run_capture([
- '/bin/bash'], 'type echo'), ('echo is a shell builtin\n', '', 0))
- self.assertEqual(c.run_capture([
- '/bin/bash'], 'set -e; false'), ('', '', 1))
- (out, err, result) = c.run_capture([
- 'ls'])
- self.assertEqual(err, '')
- self.assertEqual(result, 0)
- files = out.splitlines()
- self.assert_('bin' in files)
- self.assert_('lib' in files)
- self.assertEqual(c.run_capture([
- 'bash'], "set -e\ncd /\nmkdir test\necho world > test/file\nln -s file test/link\ncat test/file\ncat test/link\nstat -c '%f %s %n' test/file\nstat -c '%f %n' test/link\nstat -L -c '%f %s %n' test/link\nchmod 600 test/file\nchown 0:0 test/file\nstat -c '%f %s %n' test/file\nrm test/link\nrm test/file\nrmdir test\nmkdir test\necho world > test/file\nrm -r test\n! test -e test\n"), ('world\nworld\n81a4 6 test/file\na1ff test/link\n81a4 6 test/link\n8180 6 test/file\n', '', 0))
- self.assertEqual(c.run_capture([
- 'bash'], "set -e\ncd /\nln -s echo bin/eco\nstat -c '%f %n' bin/eco\nstat -L -c '%f %n' bin/eco\neco -n hello\nrm bin/eco\n"), ('a1ff bin/eco\n81ed bin/eco\nhello', '', 0))
- self.assertEqual(c.run_capture([
- 'bash'], "set -e\nmkdir /test\necho world > /test/file\nln -s /test/file /test/link\ncat /test/file\ncat /test/link\nstat -c '%f %s %n' /test/file\nstat -c '%f %n' /test/link\nstat -L -c '%f %s %n' /test/link\nchmod 600 /test/file\nchown 0:0 /test/file\nstat -c '%f %s %n' /test/file\nrm /test/link\nrm /test/file\nrmdir /test\nmkdir /test\necho world > /test/file\nrm -r /test\n! test -e test\n"), ('world\nworld\n81a4 6 /test/file\na1ff /test/link\n81a4 6 /test/link\n8180 6 /test/file\n', '', 0))
- self.assertEqual(c.run_capture([
- 'bash'], 'set -e\nln -s /bin/echo /bin/eco\n/bin/eco -n hello\nrm /bin/eco\n'), ('hello', '', 0))
- self.assertEqual(c.run_capture([
- 'bash'], 'set -e\ncd /\nmkdir etc\necho "/bin/bash" > etc/shells\ncp etc/shells etc/shells.tmp\ncat etc/shells.tmp\nrm etc/shells.tmp\nrm etc/shells\nrmdir etc\n'), ('/bin/bash\n', '', 0))
- self.assertEqual(c.run_capture([
- 'bash'], 'set -e\nmkdir /etc\necho "/bin/bash" > /etc/shells\ncp /etc/shells /etc/shells.tmp\ncat /etc/shells.tmp\nrm /etc/shells.tmp\nrm /etc/shells\nrmdir /etc\n'), ('/bin/bash\n', '', 0))
- finally:
- shutil.rmtree(d)
-
-
-
- unittest.main()
-
-