home *** CD-ROM | disk | FTP | other *** search
/ PC Welt 2006 November (DVD) / PCWELT_11_2006.ISO / casper / filesystem.squashfs / usr / lib / python2.4 / site-packages / problem_report.py < prev    next >
Encoding:
Python Source  |  2006-08-25  |  10.4 KB  |  427 lines

  1. '''Store, load, and handle problem reports.
  2.  
  3. Copyright (C) 2006 Canonical Ltd.
  4. Author: Martin Pitt <martin.pitt@ubuntu.com>
  5.  
  6. This program is free software; you can redistribute it and/or modify it
  7. under the terms of the GNU General Public License as published by the
  8. Free Software Foundation; either version 2 of the License, or (at your
  9. option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
  10. the full text of the license.
  11. '''
  12.  
  13. import bz2, base64, time, UserDict
  14.  
  15. class ProblemReport(UserDict.IterableUserDict):
  16.     def __init__(self, type = 'Crash', date = None):
  17.     '''Initialize a fresh problem report.
  18.     
  19.     type can be 'Crash', 'Packaging', or 'Kernel'. date is the desired
  20.     date/time string; if None (default), the current local time is used. '''
  21.  
  22.     if date == None:
  23.         date = time.asctime()
  24.     self.data = {'ProblemType': type, 'Date': date}
  25.  
  26.     def    load(self, file, binary=True):
  27.     '''Initialize problem report from a file-like object, using Debian
  28.     control file format.
  29.     
  30.     if binary is False, binary data is not loaded; the dictionary key is
  31.     created, but its value will be an empty string.'''
  32.  
  33.     self.data.clear()
  34.     key = None
  35.     value = None
  36.         b64_block = False
  37.     for line in file:
  38.         # continuation line
  39.         if line.startswith(' '):
  40.         if b64_block and not binary:
  41.             continue
  42.         assert (key != None and value != None)
  43.         if b64_block:
  44.             value += bd.decompress(base64.b64decode(line))
  45.         else:
  46.             if len(value) > 0:
  47.             value += '\n'
  48.             value += line[1:-1]
  49.         else:
  50.         if b64_block:
  51.             b64_block = False
  52.             bd = None
  53.         if key:
  54.             assert value != None
  55.             self.data[key] = value
  56.         (key, value) = line.split(':', 1)
  57.         value = value.strip()
  58.         if value == 'base64':
  59.             value = ''
  60.             b64_block = True
  61.             if binary:
  62.             bd = bz2.BZ2Decompressor()
  63.  
  64.     if key != None:
  65.         self.data[key] = value
  66.  
  67.     def has_removed_fields(self):
  68.     '''Check whether the report has any keys which were not loaded in load()
  69.     due to being compressed binary.'''
  70.  
  71.     return ('' in self.itervalues())
  72.  
  73.     def _is_binary(self, string):
  74.     '''Check if the given strings contains binary data.'''
  75.  
  76.     for c in string:
  77.         if c < ' ' and not c.isspace():
  78.         return True
  79.     return False
  80.  
  81.     def write(self, file):
  82.     '''Write information into the given file-like object, using Debian
  83.     control file format.
  84.  
  85.     If a value is a string, it is written directly. Otherwise it must be a
  86.     tuple containing a string, and an optional boolean value (in that
  87.     order); the first argument is interpreted as a file name, which will be
  88.     read and its content will become the value of this key.
  89.  
  90.     The second argument specifies whether the contents will be
  91.     bzip2'ed and base64-encoded (this defaults to True).
  92.     '''
  93.  
  94.     keys = self.data.keys()
  95.     keys.remove('ProblemType')
  96.     keys.sort()
  97.     keys.insert(0, 'ProblemType')
  98.     for k in keys:
  99.         v = self.data[k]
  100.         # if it's a string, copy it
  101.         if hasattr(v, 'find'):
  102.         if self._is_binary(v):
  103.             file.write (k + ': base64\n ')
  104.             bc = bz2.BZ2Compressor(9)
  105.             outblock = bc.compress(v)
  106.             if outblock:
  107.             file.write(base64.b64encode(outblock))
  108.             file.write('\n ')
  109.             file.write(base64.b64encode(bc.flush()))
  110.             file.write('\n')
  111.         elif v.find('\n') >= 0:
  112.             assert v.find('\n\n') < 0
  113.             print >> file, k + ':'
  114.             print >> file, '', v.replace('\n', '\n ')
  115.         else:
  116.             print >> file, k + ':', v
  117.         # if it's a tuple, it contains a file name, bzip2/base64-encode it
  118.         # (unless explicitly forced not to)
  119.         else:
  120.         f = open(v[0])
  121.         if len(v) >= 2 and not v[1]: # force uncompressed
  122.             v = f.read()
  123.             if v.find('\n') >= 0:
  124.             assert v.find('\n\n') < 0
  125.             print >> file, k + ':'
  126.             print >> file, '', v.replace('\n', '\n ')
  127.             else:
  128.             print >> file, k + ':', v
  129.         else:
  130.             print >> file, k + ': base64'
  131.             file.write(' ')
  132.             bc = bz2.BZ2Compressor(9)
  133.             while True:
  134.             block = f.read(512*1024)
  135.             if block:
  136.                 outblock = bc.compress(block)
  137.                 if outblock:
  138.                 file.write(base64.b64encode(outblock))
  139.                 file.write('\n ')
  140.             else:
  141.                 file.write(base64.b64encode(bc.flush()))
  142.                 file.write('\n')
  143.                 break
  144.  
  145.     def __setitem__(self, k, v):
  146.     assert hasattr(k, 'isalnum')
  147.     assert k.isalnum()
  148.     # value must be a string or a file reference (tuple (string [, bool]))
  149.     assert (hasattr(v, 'isalnum') or 
  150.         (hasattr(v, '__getitem__') and (
  151.         len(v) == 1 or (len(v) == 2 and v[1] in (True, False)))
  152.         and hasattr(v[0], 'isalnum')))
  153.  
  154.     return self.data.__setitem__(k, v)
  155.  
  156.  
  157. #
  158. # Unit test
  159. #
  160.  
  161. import unittest, StringIO, tempfile, os
  162.  
  163. class _ProblemReportTest(unittest.TestCase):
  164.     def test_basic_operations(self):
  165.     '''Test basic creation and operation.'''
  166.  
  167.     pr = ProblemReport()
  168.     pr['foo'] = 'bar'
  169.     pr['bar'] = ' foo   bar\nbaz\n   blip  '
  170.     self.assertEqual(pr['foo'], 'bar')
  171.     self.assertEqual(pr['bar'], ' foo   bar\nbaz\n   blip  ')
  172.     self.assertEqual(pr['ProblemType'], 'Crash')
  173.     self.assert_(time.strptime(pr['Date']))
  174.  
  175.     def test_ctor_arguments(self):
  176.     '''Test non-default constructor arguments.'''
  177.  
  178.     pr = ProblemReport('Kernel')
  179.     self.assertEqual(pr['ProblemType'], 'Kernel')
  180.     pr = ProblemReport(date = '19801224 12:34')
  181.     self.assertEqual(pr['Date'], '19801224 12:34')
  182.  
  183.     def test_sanity_checks(self):
  184.     '''Test various error conditions.'''
  185.  
  186.     pr = ProblemReport()
  187.     self.assertRaises(AssertionError, pr.__setitem__, 'a b', '1')
  188.     self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
  189.     self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
  190.     self.assertRaises(AssertionError, pr.__setitem__, 'a', (1,))
  191.     self.assertRaises(AssertionError, pr.__setitem__, 'a', ('/tmp/nonexistant', ''))
  192.     self.assertRaises(KeyError, pr.__getitem__, 'Nonexistant')
  193.  
  194.     def test_write(self):
  195.     '''Test write() and proper formatting.'''
  196.  
  197.     pr = ProblemReport(date = 'now!')
  198.     pr['Simple'] = 'bar'
  199.     pr['WhiteSpace'] = ' foo   bar\nbaz\n  blip  '
  200.     io = StringIO.StringIO()
  201.     pr.write(io)
  202.     self.assertEqual(io.getvalue(), 
  203. '''ProblemType: Crash
  204. Date: now!
  205. Simple: bar
  206. WhiteSpace:
  207.   foo   bar
  208.  baz
  209.    blip  
  210. ''')
  211.  
  212.     def test_load(self):
  213.     '''Test load() with various formatting.'''
  214.     pr = ProblemReport()
  215.     pr.load(StringIO.StringIO(
  216. '''ProblemType: Crash
  217. Date: now!
  218. Simple: bar
  219. WhiteSpace:
  220.   foo   bar
  221.  baz
  222.    blip  
  223. '''))
  224.     self.assertEqual(pr['ProblemType'], 'Crash')
  225.     self.assertEqual(pr['Date'], 'now!')
  226.     self.assertEqual(pr['Simple'], 'bar')
  227.     self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  228.  
  229.     # test last field a bit more
  230.     pr.load(StringIO.StringIO(
  231. '''ProblemType: Crash
  232. Date: now!
  233. Simple: bar
  234. WhiteSpace:
  235.   foo   bar
  236.  baz
  237.    blip  
  238.  
  239. '''))
  240.     self.assertEqual(pr['ProblemType'], 'Crash')
  241.     self.assertEqual(pr['Date'], 'now!')
  242.     self.assertEqual(pr['Simple'], 'bar')
  243.     self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  \n')
  244.     pr = ProblemReport()
  245.     pr.load(StringIO.StringIO(
  246. '''ProblemType: Crash
  247. WhiteSpace:
  248.   foo   bar
  249.  baz
  250.    blip  
  251. Last: foo
  252. '''))
  253.     self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  254.     self.assertEqual(pr['Last'], 'foo')
  255.  
  256.     pr.load(StringIO.StringIO(
  257. '''ProblemType: Crash
  258. WhiteSpace:
  259.   foo   bar
  260.  baz
  261.    blip  
  262. Last: foo
  263.  
  264. '''))
  265.     self.assertEqual(pr['WhiteSpace'], ' foo   bar\nbaz\n  blip  ')
  266.     self.assertEqual(pr['Last'], 'foo\n')
  267.  
  268.     # test that load() cleans up properly
  269.     pr.load(StringIO.StringIO('ProblemType: Crash'))
  270.     self.assertEqual(pr.keys(), ['ProblemType'])
  271.  
  272.     def test_write_file(self):
  273.     '''Test writing a report with binary file data.'''
  274.  
  275.     temp = tempfile.NamedTemporaryFile()
  276.     temp.write('AB' * 10 + '\0' * 10 + 'Z')
  277.     temp.flush()
  278.  
  279.     pr = ProblemReport(date = 'now!')
  280.     pr['File'] = (temp.name,)
  281.     io = StringIO.StringIO()
  282.     pr.write(io)
  283.     temp.close()
  284.  
  285.     self.assertEqual(io.getvalue(), 
  286. '''ProblemType: Crash
  287. Date: now!
  288. File: base64
  289.  QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=
  290. ''')
  291.  
  292.     # force compression/encoding bool
  293.     temp = tempfile.NamedTemporaryFile()
  294.     temp.write('foo\0bar')
  295.     temp.flush()
  296.     pr = ProblemReport(date = 'now!')
  297.     pr['File'] = (temp.name, False)
  298.     io = StringIO.StringIO()
  299.     pr.write(io)
  300.  
  301.     self.assertEqual(io.getvalue(), 
  302. '''ProblemType: Crash
  303. Date: now!
  304. File: foo\0bar
  305. ''')
  306.  
  307.     pr['File'] = (temp.name, True)
  308.     io = StringIO.StringIO()
  309.     pr.write(io)
  310.  
  311.     self.assertEqual(io.getvalue(), 
  312. '''ProblemType: Crash
  313. Date: now!
  314. File: base64
  315.  QlpoOTFBWSZTWQ7a+J8AAAHBgEAAMQCQACAAIhhoMAsZAwu5IpwoSAdtfE+A
  316. ''')
  317.     temp.close()
  318.  
  319.     def test_read_file(self):
  320.     '''Test reading a report with binary data.'''
  321.  
  322.     bin_report = '''ProblemType: Crash
  323. Date: now!
  324. File: base64
  325.  QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=
  326. Foo: Bar
  327. '''
  328.  
  329.     # test with reading everything
  330.     pr = ProblemReport()
  331.     pr.load(StringIO.StringIO(bin_report))
  332.     self.assertEqual(pr['File'], 'AB' * 10 + '\0' * 10 + 'Z')
  333.     self.assertEqual(pr.has_removed_fields(), False)
  334.  
  335.     # test with skipping binary data
  336.     pr.load(StringIO.StringIO(bin_report), binary=False)
  337.     self.assertEqual(pr['File'], '')
  338.     self.assertEqual(pr.has_removed_fields(), True)
  339.  
  340.     def test_big_file(self):
  341.     '''Test writing and re-decoding a big random file.'''
  342.  
  343.     # create 1 MB random file
  344.     temp = tempfile.NamedTemporaryFile()
  345.     data = os.urandom(1048576)
  346.     temp.write(data)
  347.     temp.flush()
  348.  
  349.     # write it into problem report
  350.     pr = ProblemReport()
  351.     pr['File'] = (temp.name,)
  352.     pr['Before'] = 'xtestx'
  353.     pr['ZAfter'] = 'ytesty'
  354.     io = StringIO.StringIO()
  355.     pr.write(io)
  356.     temp.close()
  357.  
  358.     # read it again
  359.     io.seek(0)
  360.     pr = ProblemReport()
  361.     pr.load(io)
  362.  
  363.     self.assert_(pr['File'] == data)
  364.     self.assertEqual(pr['Before'], 'xtestx')
  365.     self.assertEqual(pr['ZAfter'], 'ytesty')
  366.  
  367.     # write it again
  368.     io2 = StringIO.StringIO()
  369.     pr.write(io2)
  370.     self.assertEqual(io.getvalue(), io2.getvalue())
  371.  
  372.     def test_iter(self):
  373.     '''Test ProblemReport iteration.'''
  374.  
  375.     pr = ProblemReport()
  376.     pr['foo'] = 'bar'
  377.  
  378.     keys = []
  379.     for k in pr:
  380.         keys.append(k)
  381.     keys.sort()
  382.     self.assertEqual(' '.join(keys), 'Date ProblemType foo')
  383.  
  384.     self.assertEqual(len([k for k in pr if k != 'foo']), 2)
  385.  
  386.     def test_modify(self):
  387.     '''Test reading, modifying fields, and writing back.'''
  388.  
  389.     report = '''ProblemType: Crash
  390. Date: now!
  391. File: base64
  392.  QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=
  393. Long:
  394.  xxx
  395.  .
  396.  yyy
  397. Short: Bar
  398. '''
  399.  
  400.     pr = ProblemReport()
  401.     pr.load(StringIO.StringIO(report))
  402.  
  403.     self.assertEqual(pr['Long'], 'xxx\n.\nyyy')
  404.  
  405.     # write back unmodified
  406.     io = StringIO.StringIO()
  407.     pr.write(io)
  408.     self.assertEqual(io.getvalue(), report)
  409.  
  410.     pr['Short'] = 'aaa\nbbb'
  411.     pr['Long'] = '123'
  412.     io = StringIO.StringIO()
  413.     pr.write(io)
  414.     self.assertEqual(io.getvalue(), 
  415. '''ProblemType: Crash
  416. Date: now!
  417. File: base64
  418.  QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=
  419. Long: 123
  420. Short:
  421.  aaa
  422.  bbb
  423. ''')
  424.  
  425. if __name__ == '__main__':
  426.     unittest.main()
  427.