home *** CD-ROM | disk | FTP | other *** search
/ PC Welt 2006 November (DVD) / PCWELT_11_2006.ISO / casper / filesystem.squashfs / usr / lib / python2.4 / site-packages / problem_report.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2006-08-31  |  11.4 KB  |  343 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. '''Store, load, and handle problem reports.
  5.  
  6. Copyright (C) 2006 Canonical Ltd.
  7. Author: Martin Pitt <martin.pitt@ubuntu.com>
  8.  
  9. This program is free software; you can redistribute it and/or modify it
  10. under the terms of the GNU General Public License as published by the
  11. Free Software Foundation; either version 2 of the License, or (at your
  12. option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
  13. the full text of the license.
  14. '''
  15. import bz2
  16. import base64
  17. import time
  18. import UserDict
  19.  
  20. class ProblemReport(UserDict.IterableUserDict):
  21.     
  22.     def __init__(self, type = 'Crash', date = None):
  23.         """Initialize a fresh problem report.
  24. \t
  25. \ttype can be 'Crash', 'Packaging', or 'Kernel'. date is the desired
  26. \tdate/time string; if None (default), the current local time is used. """
  27.         if date == None:
  28.             date = time.asctime()
  29.         
  30.         self.data = {
  31.             'ProblemType': type,
  32.             'Date': date }
  33.  
  34.     
  35.     def load(self, file, binary = True):
  36.         '''Initialize problem report from a file-like object, using Debian
  37. \tcontrol file format.
  38. \t
  39. \tif binary is False, binary data is not loaded; the dictionary key is
  40. \tcreated, but its value will be an empty string.'''
  41.         self.data.clear()
  42.         key = None
  43.         value = None
  44.         b64_block = False
  45.         for line in file:
  46.             if line.startswith(' '):
  47.                 if b64_block and not binary:
  48.                     continue
  49.                 
  50.                 if not key != None or value != None:
  51.                     raise AssertionError
  52.                 if b64_block:
  53.                     value += bd.decompress(base64.b64decode(line))
  54.                 elif len(value) > 0:
  55.                     value += '\n'
  56.                 
  57.                 value += line[1:-1]
  58.                 continue
  59.             if b64_block:
  60.                 b64_block = False
  61.                 bd = None
  62.             
  63.             if key:
  64.                 if not value != None:
  65.                     raise AssertionError
  66.                 self.data[key] = value
  67.             
  68.             (key, value) = line.split(':', 1)
  69.             value = value.strip()
  70.             if value == 'base64':
  71.                 value = ''
  72.                 b64_block = True
  73.                 if binary:
  74.                     bd = bz2.BZ2Decompressor()
  75.                 
  76.             binary
  77.         
  78.         if key != None:
  79.             self.data[key] = value
  80.         
  81.  
  82.     
  83.     def has_removed_fields(self):
  84.         '''Check whether the report has any keys which were not loaded in load()
  85. \tdue to being compressed binary.'''
  86.         return '' in self.itervalues()
  87.  
  88.     
  89.     def _is_binary(self, string):
  90.         '''Check if the given strings contains binary data.'''
  91.         for c in string:
  92.             if c < ' ' and not c.isspace():
  93.                 return True
  94.                 continue
  95.         
  96.         return False
  97.  
  98.     
  99.     def write(self, file):
  100.         """Write information into the given file-like object, using Debian
  101. \tcontrol file format.
  102.  
  103. \tIf a value is a string, it is written directly. Otherwise it must be a
  104. \ttuple containing a string, and an optional boolean value (in that
  105. \torder); the first argument is interpreted as a file name, which will be
  106. \tread and its content will become the value of this key.
  107.  
  108. \tThe second argument specifies whether the contents will be
  109. \tbzip2'ed and base64-encoded (this defaults to True).
  110. \t"""
  111.         keys = self.data.keys()
  112.         keys.remove('ProblemType')
  113.         keys.sort()
  114.         keys.insert(0, 'ProblemType')
  115.         for k in keys:
  116.             v = self.data[k]
  117.             if hasattr(v, 'find'):
  118.                 if self._is_binary(v):
  119.                     file.write(k + ': base64\n ')
  120.                     bc = bz2.BZ2Compressor(9)
  121.                     outblock = bc.compress(v)
  122.                     if outblock:
  123.                         file.write(base64.b64encode(outblock))
  124.                         file.write('\n ')
  125.                     
  126.                     file.write(base64.b64encode(bc.flush()))
  127.                     file.write('\n')
  128.                 elif v.find('\n') >= 0:
  129.                     if not v.find('\n\n') < 0:
  130.                         raise AssertionError
  131.                     print >>file, k + ':'
  132.                     print >>file, '', v.replace('\n', '\n ')
  133.                 else:
  134.                     print >>file, k + ':', v
  135.             self._is_binary(v)
  136.             f = open(v[0])
  137.             if len(v) >= 2 and not v[1]:
  138.                 v = f.read()
  139.                 if v.find('\n') >= 0:
  140.                     if not v.find('\n\n') < 0:
  141.                         raise AssertionError
  142.                     print >>file, k + ':'
  143.                     print >>file, '', v.replace('\n', '\n ')
  144.                 else:
  145.                     print >>file, k + ':', v
  146.             v.find('\n') >= 0
  147.             print >>file, k + ': base64'
  148.             file.write(' ')
  149.             bc = bz2.BZ2Compressor(9)
  150.             while True:
  151.                 block = f.read(512 * 1024)
  152.                 if block:
  153.                     outblock = bc.compress(block)
  154.                     if outblock:
  155.                         file.write(base64.b64encode(outblock))
  156.                         file.write('\n ')
  157.                     
  158.                 outblock
  159.                 file.write(base64.b64encode(bc.flush()))
  160.                 file.write('\n')
  161.                 break
  162.         
  163.  
  164.     
  165.     def __setitem__(self, k, v):
  166.         if not hasattr(k, 'isalnum'):
  167.             raise AssertionError
  168.         if not k.isalnum():
  169.             raise AssertionError
  170.         if not hasattr(v, 'isalnum'):
  171.             if hasattr(v, '__getitem__'):
  172.                 if not len(v) == 1 or len(v) == 2 or v[1] in (True, False) or hasattr(v[0], 'isalnum'):
  173.                     raise AssertionError
  174.         return self.data.__setitem__(k, v)
  175.  
  176.  
  177. import unittest
  178. import StringIO
  179. import tempfile
  180. import os
  181.  
  182. class _ProblemReportTest(unittest.TestCase):
  183.     
  184.     def test_basic_operations(self):
  185.         '''Test basic creation and operation.'''
  186.         pr = ProblemReport()
  187.         pr['foo'] = 'bar'
  188.         pr['bar'] = ' foo   bar\nbaz\n   blip  '
  189.         self.assertEqual(pr['foo'], 'bar')
  190.         self.assertEqual(pr['bar'], ' foo   bar\nbaz\n   blip  ')
  191.         self.assertEqual(pr['ProblemType'], 'Crash')
  192.         self.assert_(time.strptime(pr['Date']))
  193.  
  194.     
  195.     def test_ctor_arguments(self):
  196.         '''Test non-default constructor arguments.'''
  197.         pr = ProblemReport('Kernel')
  198.         self.assertEqual(pr['ProblemType'], 'Kernel')
  199.         pr = ProblemReport(date = '19801224 12:34')
  200.         self.assertEqual(pr['Date'], '19801224 12:34')
  201.  
  202.     
  203.     def test_sanity_checks(self):
  204.         '''Test various error conditions.'''
  205.         pr = ProblemReport()
  206.         self.assertRaises(AssertionError, pr.__setitem__, 'a b', '1')
  207.         self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
  208.         self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
  209.         self.assertRaises(AssertionError, pr.__setitem__, 'a', (1,))
  210.         self.assertRaises(AssertionError, pr.__setitem__, 'a', ('/tmp/nonexistant', ''))
  211.         self.assertRaises(KeyError, pr.__getitem__, 'Nonexistant')
  212.  
  213.     
  214.     def test_write(self):
  215.         '''Test write() and proper formatting.'''
  216.         pr = ProblemReport(date = 'now!')
  217.         pr['Simple'] = 'bar'
  218.         pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  '
  219.         io = StringIO.StringIO()
  220.         pr.write(io)
  221.         self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nSimple: bar\nWhiteSpace:\n  foo   bar\n baz\n   blip  \n')
  222.  
  223.     
  224.     def test_load(self):
  225.         '''Test load() with various formatting.'''
  226.         pr = ProblemReport()
  227.         pr.load(StringIO.StringIO('ProblemType: Crash\nDate: now!\nSimple: bar\nWhiteSpace:\n  foo   bar\n baz\n   blip  \n'))
  228.         self.assertEqual(pr['ProblemType'], 'Crash')
  229.         self.assertEqual(pr['Date'], 'now!')
  230.         self.assertEqual(pr['Simple'], 'bar')
  231.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  232.         pr.load(StringIO.StringIO('ProblemType: Crash\nDate: now!\nSimple: bar\nWhiteSpace:\n  foo   bar\n baz\n   blip  \n \n'))
  233.         self.assertEqual(pr['ProblemType'], 'Crash')
  234.         self.assertEqual(pr['Date'], 'now!')
  235.         self.assertEqual(pr['Simple'], 'bar')
  236.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  \n')
  237.         pr = ProblemReport()
  238.         pr.load(StringIO.StringIO('ProblemType: Crash\nWhiteSpace:\n  foo   bar\n baz\n   blip  \nLast: foo\n'))
  239.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  240.         self.assertEqual(pr['Last'], 'foo')
  241.         pr.load(StringIO.StringIO('ProblemType: Crash\nWhiteSpace:\n  foo   bar\n baz\n   blip  \nLast: foo\n \n'))
  242.         self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  243.         self.assertEqual(pr['Last'], 'foo\n')
  244.         pr.load(StringIO.StringIO('ProblemType: Crash'))
  245.         self.assertEqual(pr.keys(), [
  246.             'ProblemType'])
  247.  
  248.     
  249.     def test_write_file(self):
  250.         '''Test writing a report with binary file data.'''
  251.         temp = tempfile.NamedTemporaryFile()
  252.         temp.write('AB' * 10 + '\x00' * 10 + 'Z')
  253.         temp.flush()
  254.         pr = ProblemReport(date = 'now!')
  255.         pr['File'] = (temp.name,)
  256.         io = StringIO.StringIO()
  257.         pr.write(io)
  258.         temp.close()
  259.         self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\n')
  260.         temp = tempfile.NamedTemporaryFile()
  261.         temp.write('foo\x00bar')
  262.         temp.flush()
  263.         pr = ProblemReport(date = 'now!')
  264.         pr['File'] = (temp.name, False)
  265.         io = StringIO.StringIO()
  266.         pr.write(io)
  267.         self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: foo\x00bar\n')
  268.         pr['File'] = (temp.name, True)
  269.         io = StringIO.StringIO()
  270.         pr.write(io)
  271.         self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWQ7a+J8AAAHBgEAAMQCQACAAIhhoMAsZAwu5IpwoSAdtfE+A\n')
  272.         temp.close()
  273.  
  274.     
  275.     def test_read_file(self):
  276.         '''Test reading a report with binary data.'''
  277.         bin_report = 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\nFoo: Bar\n'
  278.         pr = ProblemReport()
  279.         pr.load(StringIO.StringIO(bin_report))
  280.         self.assertEqual(pr['File'], 'AB' * 10 + '\x00' * 10 + 'Z')
  281.         self.assertEqual(pr.has_removed_fields(), False)
  282.         pr.load(StringIO.StringIO(bin_report), binary = False)
  283.         self.assertEqual(pr['File'], '')
  284.         self.assertEqual(pr.has_removed_fields(), True)
  285.  
  286.     
  287.     def test_big_file(self):
  288.         '''Test writing and re-decoding a big random file.'''
  289.         temp = tempfile.NamedTemporaryFile()
  290.         data = os.urandom(1048576)
  291.         temp.write(data)
  292.         temp.flush()
  293.         pr = ProblemReport()
  294.         pr['File'] = (temp.name,)
  295.         pr['Before'] = 'xtestx'
  296.         pr['ZAfter'] = 'ytesty'
  297.         io = StringIO.StringIO()
  298.         pr.write(io)
  299.         temp.close()
  300.         io.seek(0)
  301.         pr = ProblemReport()
  302.         pr.load(io)
  303.         self.assert_(pr['File'] == data)
  304.         self.assertEqual(pr['Before'], 'xtestx')
  305.         self.assertEqual(pr['ZAfter'], 'ytesty')
  306.         io2 = StringIO.StringIO()
  307.         pr.write(io2)
  308.         self.assertEqual(io.getvalue(), io2.getvalue())
  309.  
  310.     
  311.     def test_iter(self):
  312.         '''Test ProblemReport iteration.'''
  313.         pr = ProblemReport()
  314.         pr['foo'] = 'bar'
  315.         keys = []
  316.         for k in pr:
  317.             keys.append(k)
  318.         
  319.         keys.sort()
  320.         self.assertEqual(' '.join(keys), 'Date ProblemType foo')
  321.         []([](_[1]), 2)
  322.  
  323.     
  324.     def test_modify(self):
  325.         '''Test reading, modifying fields, and writing back.'''
  326.         report = 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\nLong:\n xxx\n .\n yyy\nShort: Bar\n'
  327.         pr = ProblemReport()
  328.         pr.load(StringIO.StringIO(report))
  329.         self.assertEqual(pr['Long'], 'xxx\n.\nyyy')
  330.         io = StringIO.StringIO()
  331.         pr.write(io)
  332.         self.assertEqual(io.getvalue(), report)
  333.         pr['Short'] = 'aaa\nbbb'
  334.         pr['Long'] = '123'
  335.         io = StringIO.StringIO()
  336.         pr.write(io)
  337.         self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\nLong: 123\nShort:\n aaa\n bbb\n')
  338.  
  339.  
  340. if __name__ == '__main__':
  341.     unittest.main()
  342.  
  343.