home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / lib / python2.6 / dist-packages / apport / chroot.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-10-12  |  15.2 KB  |  435 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. '''Class for representing and working with chroots.'''
  5. import subprocess
  6. import tempfile
  7. import os
  8. import os.path as os
  9. import shutil
  10.  
  11. def setup_fakeroot_env():
  12.     '''Set up a fakeroot/fakechroot environment.
  13.  
  14.     This needs the libraries /usr/lib/libfakeroot/libfakeroot-sysv.so and
  15.     /usr/lib/fakechroot/libfakechroot.so; these paths can be overridden
  16.     with the environment variables APPORT_LIBFAKEROOT and
  17.     APPORT_LIBFAKECHROOT.'''
  18.     libfakeroot = os.environ.get('APPORT_LIBFAKEROOT', '/usr/lib/libfakeroot/libfakeroot-sysv.so')
  19.     if not os.path.exists(libfakeroot):
  20.         raise AssertionError, '%s not found; please set APPORT_LIBFAKEROOT correctly' % libfakeroot
  21.     libfakechroot = os.environ.get('APPORT_LIBFAKECHROOT', '/usr/lib/fakechroot/libfakechroot.so')
  22.     if not os.path.exists(libfakechroot):
  23.         raise AssertionError, '%s not found; please set APPORT_LIBFAKECHROOT correctly' % libfakechroot
  24.     os.environ['LD_PRELOAD'] = '%s %s %s' % (libfakechroot, libfakeroot, os.environ.get('LD_PRELOAD', ''))
  25.     os.environ['LD_LIBRARY_PATH'] = '/lib:' + os.environ.get('LD_LIBRARY_PATH', '')
  26.     os.environ['FAKECHROOT'] = 'true'
  27.  
  28.  
  29. def pathsplit(p, rest = []):
  30.     (h, t) = os.path.split(p)
  31.     if len(h) < 1:
  32.         return [
  33.             t] + rest
  34.     if len(t) < 1:
  35.         return [
  36.             h] + rest
  37.     return pathsplit(h, [
  38.         t] + rest)
  39.  
  40.  
  41. def commonpath(l1, l2, common = []):
  42.     if len(l1) < 1:
  43.         return (common, l1, l2)
  44.     if len(l2) < 1:
  45.         return (common, l1, l2)
  46.     if l1[0] != l2[0]:
  47.         return (common, l1, l2)
  48.     return commonpath(l1[1:], l2[1:], common + [
  49.         l1[0]])
  50.  
  51.  
  52. def relpath(p1, p2):
  53.     (common, l1, l2) = commonpath(pathsplit(p1), pathsplit(p2))
  54.     p = []
  55.     if len(l1) > 0:
  56.         p = [
  57.             '../' * len(l1)]
  58.     
  59.     p = p + l2
  60.     return os.path.join(*p)
  61.  
  62.  
  63. class Chroot:
  64.     '''Work with a chroot (either in directory or in tarball form).
  65.  
  66.     If called as non-root user, this calls setup_fakeroot_env() to use the
  67.     fakeroot/fakechroot libraries.'''
  68.     
  69.     def __init__(self, root):
  70.         '''Bind to a chroot, which can either be a directory, a tarball, or
  71.         None to work in the main system.
  72.  
  73.         If a tarball is given, then it gets unpacked into a temporary directory
  74.         which is cleaned up at program termination.'''
  75.         self.remove = False
  76.         if os.geteuid() != 0:
  77.             setup_fakeroot_env()
  78.         
  79.         self.root_tarball = None
  80.         if root is None:
  81.             self.root = None
  82.         elif os.path.isdir(root):
  83.             self.root = root
  84.         elif not os.path.isfile(root):
  85.             raise AssertionError
  86.         self.root_tarball = root
  87.         self.root = tempfile.mkdtemp()
  88.         self.remove = True
  89.         if not subprocess.call([
  90.             'tar',
  91.             '-C',
  92.             self.root,
  93.             '-xzf',
  94.             root]) == 0:
  95.             raise AssertionError
  96.  
  97.     
  98.     def __del__(self):
  99.         if hasattr(self, 'remove') and self.remove:
  100.             shutil.rmtree(self.root)
  101.         
  102.  
  103.     
  104.     def tar(self, tarball = None):
  105.         '''Create a tarball from the chroot.
  106.  
  107.         If tarball does not specify a .tar.gz path, then the Chroot must have
  108.         been created from a tarball, and that tarball is updated.'''
  109.         if not tarball:
  110.             if not self.root_tarball:
  111.                 raise AssertionError
  112.             tarball = self.root_tarball
  113.         
  114.         f = open(tarball, 'w')
  115.         orig_cwd = os.getcwd()
  116.         
  117.         try:
  118.             os.chdir(self.root)
  119.             tar = subprocess.Popen([
  120.                 'tar',
  121.                 'cp',
  122.                 '.'], stdout = subprocess.PIPE)
  123.             gz = subprocess.Popen([
  124.                 'gzip',
  125.                 '-9'], stdin = tar.stdout, stdout = f)
  126.             if not tar.wait() == 0:
  127.                 raise AssertionError
  128.             if not gz.wait() == 0:
  129.                 raise AssertionError
  130.             f.close()
  131.         finally:
  132.             os.chdir(orig_cwd)
  133.  
  134.  
  135.     
  136.     def _exec_capture(self, argv, stdin = None):
  137.         '''Internal helper function to wrap subprocess.Popen() and return a
  138.         triple (stdout, stderr, returncode).'''
  139.         if stdin:
  140.             p = subprocess.Popen(argv, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
  141.             (out, err) = p.communicate(stdin)
  142.         else:
  143.             p = subprocess.Popen(argv, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
  144.             (out, err) = p.communicate()
  145.         return (out, err, p.returncode)
  146.  
  147.     
  148.     def run(self, argv):
  149.         '''Execute the given commandline vector in the chroot and return the
  150.         exit code.'''
  151.         if self.root:
  152.             return subprocess.call([
  153.                 'chroot',
  154.                 self.root] + argv)
  155.         return subprocess.call(argv)
  156.  
  157.     
  158.     def run_capture(self, argv, stdin = None):
  159.         '''Execute the given command line vector in the chroot and return a
  160.         triple (stdout, stderr, exit code).'''
  161.         if self.root:
  162.             return self._exec_capture([
  163.                 'chroot',
  164.                 self.root] + argv, stdin)
  165.         return self._exec_capture(argv, stdin)
  166.  
  167.     
  168.     def fix_symlinks(self):
  169.         """Remove root prefix from symbolic links in chroot directory,
  170.         otherwise chroot tarballs don't work at all, and we cannot move/rename
  171.         chroot directories."""
  172.         for r, dirs, files in os.walk(self.root):
  173.             for f in files:
  174.                 path = os.path.join(r, f)
  175.                 if os.path.islink(path):
  176.                     target = os.readlink(path)
  177.                     if target.startswith(self.root):
  178.                         os.unlink(path)
  179.                         os.symlink(relpath(os.path.dirname(path), target), path)
  180.                     
  181.                 target.startswith(self.root)
  182.             
  183.         
  184.  
  185.  
  186. if __name__ == '__main__':
  187.     import unittest
  188.     import os
  189.     import tarfile
  190.     import re
  191.     import shutil
  192.     
  193.     class ChrootTest(unittest.TestCase):
  194.         
  195.         def test_null(self):
  196.             '''null chroot (working in the main system)'''
  197.             c = Chroot(None)
  198.             self.assertEqual(c.root_tarball, None)
  199.             self.assertEqual(c.run([
  200.                 '/bin/sh',
  201.                 '-c',
  202.                 'exit 42']), 42)
  203.             (out, err, ret) = c.run_capture([
  204.                 '/bin/ls',
  205.                 '/bin/ls'])
  206.             self.assertEqual(ret, 0)
  207.             self.assertEqual(out, '/bin/ls\n')
  208.             self.assertEqual(err, '')
  209.             (out, err, ret) = c.run_capture([
  210.                 '/bin/ls',
  211.                 '/nonexisting/gibberish'])
  212.             self.assertNotEqual(ret, 0)
  213.             self.assertEqual(out, '')
  214.             self.assertNotEqual(err, '')
  215.  
  216.         
  217.         def _mkchroot(self):
  218.             '''Return a test chroot dir with /bin/hello and /bin/42.'''
  219.             d = tempfile.mkdtemp()
  220.             bindir = os.path.join(d, 'bin')
  221.             os.mkdir(bindir)
  222.             open(os.path.join(d, 'hello.c'), 'w').write('\n#include <stdio.h>\nint main() { puts("hello"); return 0; }\n')
  223.             open(os.path.join(d, '42.c'), 'w').write('\nint main() { return 42; }\n')
  224.             if not subprocess.call([
  225.                 'cc',
  226.                 '-static',
  227.                 os.path.join(d, 'hello.c'),
  228.                 '-o',
  229.                 os.path.join(bindir, 'hello')]) == 0:
  230.                 raise AssertionError
  231.             if not subprocess.call([
  232.                 'cc',
  233.                 '-static',
  234.                 os.path.join(d, '42.c'),
  235.                 '-o',
  236.                 os.path.join(bindir, '42')]) == 0:
  237.                 raise AssertionError
  238.             return d
  239.  
  240.         
  241.         def test_dir(self):
  242.             '''directory chroot.'''
  243.             d = self._mkchroot()
  244.             tarpath = None
  245.             
  246.             try:
  247.                 c = Chroot(d)
  248.                 self.assertEqual(c.root_tarball, None)
  249.                 self.assertEqual(c.run([
  250.                     '/bin/42']), 42)
  251.                 (out, err, ret) = c.run_capture([
  252.                     '/bin/hello'])
  253.                 self.assertEqual(ret, 0)
  254.                 self.assertEqual(out, 'hello\n')
  255.                 self.assertEqual(err, '')
  256.                 open(os.path.join(c.root, 'newfile'), 'w')
  257.                 self.assertRaises(AssertionError, c.tar)
  258.                 (fd, tarpath) = tempfile.mkstemp()
  259.                 os.close(fd)
  260.                 c.tar(tarpath)
  261.                 t = tarfile.open(tarpath)
  262.                 if not set([
  263.                     './bin/42',
  264.                     './bin/hello',
  265.                     './newfile']).issubset(set(t.getnames())):
  266.                     pass
  267.                 self.assert_(set([
  268.                     'bin/42',
  269.                     'bin/hello',
  270.                     'newfile']).issubset(set(t.getnames())))
  271.                 del c
  272.                 self.assert_(os.path.exists(os.path.join(d, 'bin', '42')), 'directory chroot should not delete the chroot')
  273.             finally:
  274.                 shutil.rmtree(d)
  275.                 if tarpath:
  276.                     os.unlink(tarpath)
  277.                 
  278.  
  279.  
  280.         
  281.         def test_tarball(self):
  282.             '''tarball chroot.'''
  283.             d = self._mkchroot()
  284.             
  285.             try:
  286.                 (fd, tar) = tempfile.mkstemp()
  287.                 os.close(fd)
  288.                 orig_cwd = os.getcwd()
  289.                 os.chdir(d)
  290.                 if not subprocess.call([
  291.                     'tar',
  292.                     'czPf',
  293.                     tar,
  294.                     '.']) == 0:
  295.                     raise AssertionError
  296.                 if not os.path.exists(tar):
  297.                     raise AssertionError
  298.                 os.chdir(orig_cwd)
  299.             finally:
  300.                 shutil.rmtree(d)
  301.  
  302.             
  303.             try:
  304.                 c = Chroot(tar)
  305.                 self.assertEqual(c.root_tarball, tar)
  306.                 self.assertEqual(c.run([
  307.                     '/bin/42']), 42)
  308.                 (out, err, ret) = c.run_capture([
  309.                     '/bin/hello'])
  310.                 self.assertEqual(ret, 0)
  311.                 self.assertEqual(out, 'hello\n')
  312.                 self.assertEqual(err, '')
  313.                 open(os.path.join(c.root, 'newfile'), 'w')
  314.                 c.tar()
  315.                 t = tarfile.open(tar)
  316.                 if not set([
  317.                     './bin/42',
  318.                     './bin/hello',
  319.                     './newfile']).issubset(set(t.getnames())):
  320.                     pass
  321.                 self.assert_(set([
  322.                     'bin/42',
  323.                     'bin/hello',
  324.                     'newfile']).issubset(set(t.getnames())))
  325.                 d = c.root
  326.                 del c
  327.                 self.assert_(not os.path.exists(d), 'tarball chroot should delete the temporary chroot')
  328.             finally:
  329.                 os.unlink(tar)
  330.  
  331.  
  332.         
  333.         def test_fix_symlinks(self):
  334.             '''symlink fixing in chroots.'''
  335.             d = self._mkchroot()
  336.             
  337.             try:
  338.                 os.symlink(os.path.join(d, 'bin', '42'), os.path.join(d, 'bin', '42prefix'))
  339.                 os.symlink(os.path.join('/bin/42'), os.path.join(d, 'bin', '42noprefix'))
  340.                 os.symlink(os.path.join('bin/42'), os.path.join(d, '42rel'))
  341.                 c = Chroot(d)
  342.                 c.fix_symlinks()
  343.                 self.assertEqual(c.run([
  344.                     '/42rel']), 42)
  345.                 self.assertEqual(c.run([
  346.                     '/bin/42prefix']), 42)
  347.                 self.assertEqual(os.readlink(os.path.join(d, 'bin', '42noprefix')), '/bin/42')
  348.             finally:
  349.                 shutil.rmtree(d)
  350.  
  351.  
  352.         
  353.         def _install_file(klass, path, root):
  354.             '''Install given file into a chroot, preserving the path.
  355.             
  356.             Do nothing if the target file already exists.'''
  357.             destpath = root + os.path.abspath(path)
  358.             if os.path.exists(destpath):
  359.                 return None
  360.             destdir = os.path.dirname(destpath)
  361.             if not os.path.isdir(destdir):
  362.                 os.makedirs(destdir)
  363.             
  364.             shutil.copy(path, destdir)
  365.  
  366.         _install_file = classmethod(_install_file)
  367.         
  368.         def _install_exe(klass, exepath, root):
  369.             '''Install an executable and all linked shlibs into a chroot.'''
  370.             klass._install_file(exepath, root)
  371.             ldd = subprocess.Popen([
  372.                 'ldd',
  373.                 exepath], stdout = subprocess.PIPE)
  374.             out = ldd.communicate()[0]
  375.             if not ldd.returncode == 0:
  376.                 raise AssertionError
  377.             for m in re.finditer(' => (/[^ ]+)', out):
  378.                 klass._install_file(m.group(1), root)
  379.             
  380.             for m in re.finditer('^\\s*(/[^ ]+)', out, re.M):
  381.                 klass._install_file(m.group(1), root)
  382.             
  383.  
  384.         _install_exe = classmethod(_install_exe)
  385.         
  386.         def test_shell_ops(self):
  387.             '''various shell operations in the chroot.'''
  388.             d = tempfile.mkdtemp()
  389.             ldir = os.path.join(d, 'lib')
  390.             os.mkdir(ldir)
  391.             if not subprocess.call('cp -a /lib/*.so ' + ldir, shell = True) == 0:
  392.                 raise AssertionError
  393.             
  394.             try:
  395.                 for cmd in ('bash', 'echo', 'cat', 'cp', 'ln', 'ls', 'rm', 'mkdir', 'rmdir', 'chmod', 'chown'):
  396.                     self._install_exe('/bin/' + cmd, d)
  397.                 
  398.                 self._install_exe('/usr/bin/stat', d)
  399.                 c = Chroot(d)
  400.                 self.assertEqual(c.run_capture([
  401.                     'echo',
  402.                     'hello']), ('hello\n', '', 0))
  403.                 self.assertEqual(c.run_capture([
  404.                     'cat'], 'hello'), ('hello', '', 0))
  405.                 self.assertEqual(c.run_capture([
  406.                     '/bin/bash'], 'type echo'), ('echo is a shell builtin\n', '', 0))
  407.                 self.assertEqual(c.run_capture([
  408.                     '/bin/bash'], 'set -e; false'), ('', '', 1))
  409.                 (out, err, result) = c.run_capture([
  410.                     'ls'])
  411.                 self.assertEqual(err, '')
  412.                 self.assertEqual(result, 0)
  413.                 files = out.splitlines()
  414.                 self.assert_('bin' in files)
  415.                 self.assert_('lib' in files)
  416.                 self.assertEqual(c.run_capture([
  417.                     'bash'], "set -e\ncd /\nmkdir test\necho world > test/file\nln -s file test/link\ncat test/file\ncat test/link\nstat -c '%f %s %n' test/file\nstat -c '%f %n' test/link\nstat -L -c '%f %s %n' test/link\nchmod 600 test/file\nchown 0:0 test/file\nstat -c '%f %s %n' test/file\nrm test/link\nrm test/file\nrmdir test\nmkdir test\necho world > test/file\nrm -r test\n! test -e test\n"), ('world\nworld\n81a4 6 test/file\na1ff test/link\n81a4 6 test/link\n8180 6 test/file\n', '', 0))
  418.                 self.assertEqual(c.run_capture([
  419.                     'bash'], "set -e\ncd /\nln -s echo bin/eco\nstat -c '%f %n' bin/eco\nstat -L -c '%f %n' bin/eco\neco -n hello\nrm bin/eco\n"), ('a1ff bin/eco\n81ed bin/eco\nhello', '', 0))
  420.                 self.assertEqual(c.run_capture([
  421.                     'bash'], "set -e\nmkdir /test\necho world > /test/file\nln -s /test/file /test/link\ncat /test/file\ncat /test/link\nstat -c '%f %s %n' /test/file\nstat -c '%f %n' /test/link\nstat -L -c '%f %s %n' /test/link\nchmod 600 /test/file\nchown 0:0 /test/file\nstat -c '%f %s %n' /test/file\nrm /test/link\nrm /test/file\nrmdir /test\nmkdir /test\necho world > /test/file\nrm -r /test\n! test -e test\n"), ('world\nworld\n81a4 6 /test/file\na1ff /test/link\n81a4 6 /test/link\n8180 6 /test/file\n', '', 0))
  422.                 self.assertEqual(c.run_capture([
  423.                     'bash'], 'set -e\nln -s /bin/echo /bin/eco\n/bin/eco -n hello\nrm /bin/eco\n'), ('hello', '', 0))
  424.                 self.assertEqual(c.run_capture([
  425.                     'bash'], 'set -e\ncd /\nmkdir etc\necho "/bin/bash" > etc/shells\ncp etc/shells etc/shells.tmp\ncat etc/shells.tmp\nrm etc/shells.tmp\nrm etc/shells\nrmdir etc\n'), ('/bin/bash\n', '', 0))
  426.                 self.assertEqual(c.run_capture([
  427.                     'bash'], 'set -e\nmkdir /etc\necho "/bin/bash" > /etc/shells\ncp /etc/shells /etc/shells.tmp\ncat /etc/shells.tmp\nrm /etc/shells.tmp\nrm /etc/shells\nrmdir /etc\n'), ('/bin/bash\n', '', 0))
  428.             finally:
  429.                 shutil.rmtree(d)
  430.  
  431.  
  432.  
  433.     unittest.main()
  434.  
  435.