home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.4)
-
- '''Store, load, and handle problem reports.
-
- Copyright (C) 2006 Canonical Ltd.
- Author: Martin Pitt <martin.pitt@ubuntu.com>
-
- This program is free software; you can redistribute it and/or modify it
- under the terms of the GNU General Public License as published by the
- Free Software Foundation; either version 2 of the License, or (at your
- option) any later version. See http://www.gnu.org/copyleft/gpl.html for
- the full text of the license.
- '''
- import bz2
- import base64
- import time
- import UserDict
-
- class ProblemReport(UserDict.IterableUserDict):
-
- def __init__(self, type = 'Crash', date = None):
- """Initialize a fresh problem report.
- \t
- \ttype can be 'Crash', 'Packaging', or 'Kernel'. date is the desired
- \tdate/time string; if None (default), the current local time is used. """
- if date == None:
- date = time.asctime()
-
- self.data = {
- 'ProblemType': type,
- 'Date': date }
-
-
- def load(self, file, binary = True):
- '''Initialize problem report from a file-like object, using Debian
- \tcontrol file format.
- \t
- \tif binary is False, binary data is not loaded; the dictionary key is
- \tcreated, but its value will be an empty string.'''
- self.data.clear()
- key = None
- value = None
- b64_block = False
- for line in file:
- if line.startswith(' '):
- if b64_block and not binary:
- continue
-
- if b64_block:
- value += bd.decompress(base64.b64decode(line))
- elif len(value) > 0:
- value += '\n'
-
- value += line[1:-1]
- continue
- if b64_block:
- b64_block = False
- bd = None
-
- if key:
- self.data[key] = value
-
- (key, value) = line.split(':', 1)
- value = value.strip()
- if value == 'base64':
- value = ''
- b64_block = True
- if binary:
- bd = bz2.BZ2Decompressor()
-
- binary
-
- if key != None:
- self.data[key] = value
-
-
-
- def has_removed_fields(self):
- '''Check whether the report has any keys which were not loaded in load()
- \tdue to being compressed binary.'''
- return '' in self.itervalues()
-
-
- def _is_binary(self, string):
- '''Check if the given strings contains binary data.'''
- for c in string:
- if c < ' ' and not c.isspace():
- return True
- continue
-
- return False
-
-
- def write(self, file):
- """Write information into the given file-like object, using Debian
- \tcontrol file format.
-
- \tIf a value is a string, it is written directly. Otherwise it must be a
- \ttuple containing a string, and an optional boolean value (in that
- \torder); the first argument is interpreted as a file name, which will be
- \tread and its content will become the value of this key.
-
- \tThe second argument specifies whether the contents will be
- \tbzip2'ed and base64-encoded (this defaults to True).
- \t"""
- keys = self.data.keys()
- keys.remove('ProblemType')
- keys.sort()
- keys.insert(0, 'ProblemType')
- for k in keys:
- v = self.data[k]
- if hasattr(v, 'find'):
- if self._is_binary(v):
- file.write(k + ': base64\n ')
- bc = bz2.BZ2Compressor(9)
- outblock = bc.compress(v)
- if outblock:
- file.write(base64.b64encode(outblock))
- file.write('\n ')
-
- file.write(base64.b64encode(bc.flush()))
- file.write('\n')
- elif v.find('\n') >= 0:
- print >>file, k + ':'
- print >>file, '', v.replace('\n', '\n ')
- else:
- print >>file, k + ':', v
- self._is_binary(v)
- f = open(v[0])
- if len(v) >= 2 and not v[1]:
- v = f.read()
- if v.find('\n') >= 0:
- print >>file, k + ':'
- print >>file, '', v.replace('\n', '\n ')
- else:
- print >>file, k + ':', v
- v.find('\n') >= 0
- print >>file, k + ': base64'
- file.write(' ')
- bc = bz2.BZ2Compressor(9)
- while True:
- block = f.read(512 * 1024)
- if block:
- outblock = bc.compress(block)
- if outblock:
- file.write(base64.b64encode(outblock))
- file.write('\n ')
-
- outblock
- file.write(base64.b64encode(bc.flush()))
- file.write('\n')
- break
-
-
-
- def __setitem__(self, k, v):
- return self.data.__setitem__(k, v)
-
-
- import unittest
- import StringIO
- import tempfile
- import os
-
- class _ProblemReportTest(unittest.TestCase):
-
- def test_basic_operations(self):
- '''Test basic creation and operation.'''
- pr = ProblemReport()
- pr['foo'] = 'bar'
- pr['bar'] = ' foo bar\nbaz\n blip '
- self.assertEqual(pr['foo'], 'bar')
- self.assertEqual(pr['bar'], ' foo bar\nbaz\n blip ')
- self.assertEqual(pr['ProblemType'], 'Crash')
- self.assert_(time.strptime(pr['Date']))
-
-
- def test_ctor_arguments(self):
- '''Test non-default constructor arguments.'''
- pr = ProblemReport('Kernel')
- self.assertEqual(pr['ProblemType'], 'Kernel')
- pr = ProblemReport(date = '19801224 12:34')
- self.assertEqual(pr['Date'], '19801224 12:34')
-
-
- def test_sanity_checks(self):
- '''Test various error conditions.'''
- pr = ProblemReport()
- self.assertRaises(AssertionError, pr.__setitem__, 'a b', '1')
- self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
- self.assertRaises(AssertionError, pr.__setitem__, 'a', 1)
- self.assertRaises(AssertionError, pr.__setitem__, 'a', (1,))
- self.assertRaises(AssertionError, pr.__setitem__, 'a', ('/tmp/nonexistant', ''))
- self.assertRaises(KeyError, pr.__getitem__, 'Nonexistant')
-
-
- def test_write(self):
- '''Test write() and proper formatting.'''
- pr = ProblemReport(date = 'now!')
- pr['Simple'] = 'bar'
- pr['WhiteSpace'] = ' foo bar\nbaz\n blip '
- io = StringIO.StringIO()
- pr.write(io)
- self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nSimple: bar\nWhiteSpace:\n foo bar\n baz\n blip \n')
-
-
- def test_load(self):
- '''Test load() with various formatting.'''
- pr = ProblemReport()
- pr.load(StringIO.StringIO('ProblemType: Crash\nDate: now!\nSimple: bar\nWhiteSpace:\n foo bar\n baz\n blip \n'))
- self.assertEqual(pr['ProblemType'], 'Crash')
- self.assertEqual(pr['Date'], 'now!')
- self.assertEqual(pr['Simple'], 'bar')
- self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n blip ')
- pr.load(StringIO.StringIO('ProblemType: Crash\nDate: now!\nSimple: bar\nWhiteSpace:\n foo bar\n baz\n blip \n \n'))
- self.assertEqual(pr['ProblemType'], 'Crash')
- self.assertEqual(pr['Date'], 'now!')
- self.assertEqual(pr['Simple'], 'bar')
- self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n blip \n')
- pr = ProblemReport()
- pr.load(StringIO.StringIO('ProblemType: Crash\nWhiteSpace:\n foo bar\n baz\n blip \nLast: foo\n'))
- self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n blip ')
- self.assertEqual(pr['Last'], 'foo')
- pr.load(StringIO.StringIO('ProblemType: Crash\nWhiteSpace:\n foo bar\n baz\n blip \nLast: foo\n \n'))
- self.assertEqual(pr['WhiteSpace'], ' foo bar\nbaz\n blip ')
- self.assertEqual(pr['Last'], 'foo\n')
- pr.load(StringIO.StringIO('ProblemType: Crash'))
- self.assertEqual(pr.keys(), [
- 'ProblemType'])
-
-
- def test_write_file(self):
- '''Test writing a report with binary file data.'''
- temp = tempfile.NamedTemporaryFile()
- temp.write('AB' * 10 + '\x00' * 10 + 'Z')
- temp.flush()
- pr = ProblemReport(date = 'now!')
- pr['File'] = (temp.name,)
- io = StringIO.StringIO()
- pr.write(io)
- temp.close()
- self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\n')
- temp = tempfile.NamedTemporaryFile()
- temp.write('foo\x00bar')
- temp.flush()
- pr = ProblemReport(date = 'now!')
- pr['File'] = (temp.name, False)
- io = StringIO.StringIO()
- pr.write(io)
- self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: foo\x00bar\n')
- pr['File'] = (temp.name, True)
- io = StringIO.StringIO()
- pr.write(io)
- self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWQ7a+J8AAAHBgEAAMQCQACAAIhhoMAsZAwu5IpwoSAdtfE+A\n')
- temp.close()
-
-
- def test_read_file(self):
- '''Test reading a report with binary data.'''
- bin_report = 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\nFoo: Bar\n'
- pr = ProblemReport()
- pr.load(StringIO.StringIO(bin_report))
- self.assertEqual(pr['File'], 'AB' * 10 + '\x00' * 10 + 'Z')
- self.assertEqual(pr.has_removed_fields(), False)
- pr.load(StringIO.StringIO(bin_report), binary = False)
- self.assertEqual(pr['File'], '')
- self.assertEqual(pr.has_removed_fields(), True)
-
-
- def test_big_file(self):
- '''Test writing and re-decoding a big random file.'''
- temp = tempfile.NamedTemporaryFile()
- data = os.urandom(1048576)
- temp.write(data)
- temp.flush()
- pr = ProblemReport()
- pr['File'] = (temp.name,)
- pr['Before'] = 'xtestx'
- pr['ZAfter'] = 'ytesty'
- io = StringIO.StringIO()
- pr.write(io)
- temp.close()
- io.seek(0)
- pr = ProblemReport()
- pr.load(io)
- self.assert_(pr['File'] == data)
- self.assertEqual(pr['Before'], 'xtestx')
- self.assertEqual(pr['ZAfter'], 'ytesty')
- io2 = StringIO.StringIO()
- pr.write(io2)
- self.assertEqual(io.getvalue(), io2.getvalue())
-
-
- def test_iter(self):
- '''Test ProblemReport iteration.'''
- pr = ProblemReport()
- pr['foo'] = 'bar'
- keys = []
- for k in pr:
- keys.append(k)
-
- keys.sort()
- self.assertEqual(' '.join(keys), 'Date ProblemType foo')
- []([](_[1]), 2)
-
-
- def test_modify(self):
- '''Test reading, modifying fields, and writing back.'''
- report = 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\nLong:\n xxx\n .\n yyy\nShort: Bar\n'
- pr = ProblemReport()
- pr.load(StringIO.StringIO(report))
- self.assertEqual(pr['Long'], 'xxx\n.\nyyy')
- io = StringIO.StringIO()
- pr.write(io)
- self.assertEqual(io.getvalue(), report)
- pr['Short'] = 'aaa\nbbb'
- pr['Long'] = '123'
- io = StringIO.StringIO()
- pr.write(io)
- self.assertEqual(io.getvalue(), 'ProblemType: Crash\nDate: now!\nFile: base64\n QlpoOTFBWSZTWc5ays4AAAdGAEEAMAAAECAAMM0AkR6fQsBSDhdyRThQkM5ays4=\nLong: 123\nShort:\n aaa\n bbb\n')
-
-
- if __name__ == '__main__':
- unittest.main()
-
-