home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) '''Crash database implementation for Launchpad. Copyright (C) 2007 Canonical Ltd. Author: Martin Pitt <martin.pitt@ubuntu.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. See http://www.gnu.org/copyleft/gpl.html for the full text of the license. ''' import urllib import tempfile import shutil import os.path as os import re import gzip import sys from cStringIO import StringIO import launchpadbugs.storeblob as launchpadbugs import launchpadbugs.connector as Connector import apport.crashdb as apport import apport Bug = Connector.ConnectBug() BugList = Connector.ConnectBugList() def get_source_version(distro, package, hostname): """Return the version of given source package in the latest release of given distribution. If 'distro' is None, we will look for a launchpad project . """ if distro: result = urllib.urlopen('https://%s/%s/+source/%s' % (hostname, distro, package)).read() m = re.search('href="/%s/\\w+/\\+source/%s/([^"]+)"' % (distro, re.escape(package)), result) if not m: raise ValueError, 'source package %s does not exist in %s' % (package, distro) m else: result = urllib.urlopen('https://%s/%s/+series' % (hostname, package)).read() m = re.search('href="/%s/([^"]+)"' % re.escape(package), result) if not m: raise ValueError, 'Series for %s does not exist in Launchpad' % package m return m.group(1) class CrashDatabase(apport.crashdb.CrashDatabase): '''Launchpad implementation of crash database interface.''' def __init__(self, cookie_file, bugpattern_baseurl, options): '''Initialize Launchpad crash database connection. You need to specify a Mozilla-style cookie file for download() and update(). For upload() and get_comment_url() you can use None.''' apport.crashdb.CrashDatabase.__init__(self, cookie_file, bugpattern_baseurl, options) self.distro = options.get('distro') self.arch_tag = 'need-%s-retrace' % apport.packaging.get_system_architecture() self.options = options self.cookie_file = cookie_file if self.options.get('staging', False): HTTPCONNECTION = HTTPCONNECTION import launchpadbugs.lpconstants Bug.set_connection_mode(HTTPCONNECTION.MODE.STAGING) BugList.set_connection_mode(HTTPCONNECTION.MODE.STAGING) self.hostname = 'staging.launchpad.net' else: self.hostname = 'launchpad.net' if self.cookie_file: Bug.authentication = self.cookie_file BugList.authentication = self.cookie_file def upload(self, report, progress_callback = None): '''Upload given problem report return a handle for it. This should happen noninteractively. If the implementation supports it, and a function progress_callback is passed, that is called repeatedly with two arguments: the number of bytes already sent, and the total number of bytes to send. This can be used to provide a proper upload progress indication on frontends.''' hdr = { } hdr['Tags'] = 'apport-%s' % report['ProblemType'].lower() if report.has_key('Tags'): hdr['Tags'] += ' ' + report['Tags'] a = report.get('PackageArchitecture') if not a or a == 'all': a = report.get('Architecture') if a: hdr['Tags'] += ' ' + a if 'CoreDump' in report and a: hdr['Tags'] += ' need-%s-retrace' % a if report['DistroRelease'].split()[0] == 'Ubuntu': hdr['Private'] = 'yes' hdr['Subscribers'] = 'apport' elif report.has_key('Traceback'): hdr['Tags'] += ' need-duplicate-check' if report['DistroRelease'].split()[0] == 'Ubuntu': hdr['Private'] = 'yes' hdr['Subscribers'] = 'apport' mime = tempfile.TemporaryFile() report.write_mime(mime, extra_headers = hdr, skip_keys = [ 'Date']) mime.flush() mime.seek(0) ticket = launchpadbugs.storeblob.upload(mime, progress_callback, staging = self.options.get('staging', False)) if not ticket: raise AssertionError return ticket def get_comment_url(self, report, handle): '''Return an URL that should be opened after report has been uploaded and upload() returned handle. Should return None if no URL should be opened (anonymous filing without user comments); in that case this function should do whichever interactive steps it wants to perform.''' args = { } title = report.standard_title() if title: args['field.title'] = title project = self.options.get('project') if 'ThirdParty' in report: project = report['SourcePackage'] if not project: if report.has_key('SourcePackage'): return 'https://bugs.%s/%s/+source/%s/+filebug/%s?%s' % (self.hostname, self.distro, report['SourcePackage'], handle, urllib.urlencode(args)) return 'https://bugs.%s/%s/+filebug/%s?%s' % (self.hostname, self.distro, handle, urllib.urlencode(args)) project return 'https://bugs.%s/%s/+filebug/%s?%s' % (self.hostname, project, handle, urllib.urlencode(args)) def download(self, id): '''Download the problem report from given ID and return a Report.''' report = apport.Report() attachment_path = tempfile.mkdtemp() Bug.content_types.append('application/x-gzip') try: b = Bug(id) m = re.search('(ProblemType:.*)$', b.description_raw, re.S) if not m: m = re.search('^--- \\r?$[\\r\\n]*(.*)', b.description_raw, re.M | re.S) if not m: raise AssertionError, 'bug description must contain standard apport format data' description = m.group(1).replace('\xc2\xa0', ' ') if '\r\n\r\n' in description: if 'Uname:' in description: (part1, part2) = description.split('Uname:', 1) description = part1.replace('\r\n\r\n', '\r\n') + 'Uname:' + part2.split('\r\n\r\n', 1)[0] else: description = description.replace('\r\n\r\n', '\r\n') report.load(StringIO(description)) report['Date'] = b.date.ctime() if 'ProblemType' not in report: if 'apport-bug' in b.tags: report['ProblemType'] = 'Bug' elif 'apport-crash' in b.tags: report['ProblemType'] = 'Crash' elif 'apport-kernelcrash' in b.tags: report['ProblemType'] = 'KernelCrash' elif 'apport-package' in b.tags: report['ProblemType'] = 'Package' else: raise ValueError, 'cannot determine ProblemType from tags: ' + str(b.tags) 'apport-bug' in b.tags for att in b.attachments.filter((lambda a: re.match('Dependencies.txt|CoreDump.gz|ProcMaps.txt|Traceback.txt|DpkgTerminalLog.txt', a.lp_filename))): key = os.path.splitext(att.lp_filename)[0] att.download(os.path.join(attachment_path, att.lp_filename)) if att.lp_filename.endswith('.txt'): report[key] = att.text continue if att.lp_filename.endswith('.gz'): report[key] = gzip.GzipFile(fileobj = StringIO(att.text)).read() continue raise Exception, 'Unknown attachment type: ' + att.lp_filename return report finally: shutil.rmtree(attachment_path) def update(self, id, report, comment = ''): '''Update the given report ID with the retraced results from the report (Stacktrace, ThreadStacktrace, StacktraceTop; also Disassembly if desired) and an optional comment.''' bug = Bug(id) comment += '\n\nStacktraceTop:' + report['StacktraceTop'].decode('utf-8', 'replace').encode('utf-8') tmpdir = tempfile.mkdtemp() t = { } try: t[0] = open(os.path.join(tmpdir, 'Stacktrace.txt'), 'w+') t[0].write(report['Stacktrace']) t[0].flush() t[0].seek(0) att = Bug.NewAttachment(localfileobject = t[0], description = 'Stacktrace.txt (retraced)') new_comment = Bug.NewComment(subject = 'Symbolic stack trace', text = comment, attachment = att) bug.comments.add(new_comment) t[1] = open(os.path.join(tmpdir, 'ThreadStacktrace.txt'), 'w+') t[1].write(report['ThreadStacktrace']) t[1].flush() t[1].seek(0) att = Bug.NewAttachment(localfileobject = t[1], description = 'ThreadStacktrace.txt (retraced)') new_comment = Bug.NewComment(subject = 'Symbolic threaded stack trace', attachment = att) bug.comments.add(new_comment) if report.has_key('StacktraceSource'): t[2] = open(os.path.join(tmpdir, 'StacktraceSource.txt'), 'w+') t[2].write(report['StacktraceSource']) t[2].flush() t[2].seek(0) att = Bug.NewAttachment(localfileobject = t[2], description = 'StacktraceSource.txt') new_comment = Bug.NewComment(subject = 'Stack trace with source code', attachment = att) bug.comments.add(new_comment) if report.has_key('SourcePackage') and bug.sourcepackage == 'ubuntu': bug.set_sourcepackage(report['SourcePackage']) finally: shutil.rmtree(tmpdir) if report.has_useful_stacktrace(): bug.attachments.remove(func = (lambda a: if not a.lp_filename: passre.match('^CoreDump.gz$', a.description))) bug.commit() try: bug.importance = 'Medium' bug.commit() except IOError: pass except: None<EXCEPTION MATCH>IOError None<EXCEPTION MATCH>IOError bug.commit() for x in t.itervalues(): x.close() self._subscribe_triaging_team(bug, report) def get_distro_release(self, id): """Get 'DistroRelease: <release>' from the given report ID and return it.""" bug = Bug(url = 'https://%s/bugs/%s' % (self.hostname, str(id))) m = re.search('DistroRelease: ([-a-zA-Z0-9.+/ ]+)', bug.description) if m: return m.group(1) raise ValueError, 'URL does not contain DistroRelease: field' def get_unretraced(self): '''Return an ID set of all crashes which have not been retraced yet and which happened on the current host architecture.''' bugs = BugList('https://bugs.%s/ubuntu/+bugs?field.tag=%s' % (self.hostname, self.arch_tag)) return set((lambda .0: for i in .0: int(i))(bugs)) def get_dup_unchecked(self): '''Return an ID set of all crashes which have not been checked for being a duplicate. This is mainly useful for crashes of scripting languages such as Python, since they do not need to be retraced. It should not return bugs that are covered by get_unretraced().''' bugs = BugList('https://bugs.%s/ubuntu/+bugs?field.tag=need-duplicate-check&batch=300' % self.hostname) return set((lambda .0: for i in .0: int(i))(bugs)) def get_unfixed(self): '''Return an ID set of all crashes which are not yet fixed. The list must not contain bugs which were rejected or duplicate. This function should make sure that the returned list is correct. If there are any errors with connecting to the crash database, it should raise an exception (preferably IOError).''' bugs = BugList('https://bugs.%s/ubuntu/+bugs?field.tag=apport-crash&batch=300' % self.hostname) return set((lambda .0: for i in .0: int(i))(bugs)) def get_fixed_version(self, id): """Return the package version that fixes a given crash. Return None if the crash is not yet fixed, or an empty string if the crash is fixed, but it cannot be determined by which version. Return 'invalid' if the crash report got invalidated, such as closed a duplicate or rejected. This function should make sure that the returned result is correct. If there are any errors with connecting to the crash database, it should raise an exception (preferably IOError).""" try: b = Bug(id) except Bug.Error.LPUrlError: e = None if e.value.startswith('Page not found'): return 'invalid' raise except: e.value.startswith('Page not found') if b.status == 'Fix Released': if b.sourcepackage: try: return get_source_version(self.distro, b.sourcepackage, self.hostname) except ValueError: e.value.startswith('Page not found') e.value.startswith('Page not found') return '' e.value.startswith('Page not found')<EXCEPTION MATCH>ValueError return '' if b.status == 'Invalid' or b.duplicate_of: return 'invalid' def duplicate_of(self, id): '''Return master ID for a duplicate bug. If the bug is not a duplicate, return None. ''' b = Bug(id) return b.duplicate_of def close_duplicate(self, id, master): '''Mark a crash id as duplicate of given master ID. If master is None, id gets un-duplicated. ''' bug = Bug(id) if master: m = Bug(master) if m.duplicate_of: master = m.duplicate_of bug.attachments.remove(func = (lambda a: re.match('^(CoreDump.gz$|Stacktrace.txt|ThreadStacktrace.txt|Dependencies.txt$|ProcMaps.txt$|ProcStatus.txt$|Registers.txt$|Disassembly.txt$)', a.lp_filename))) if bug.private: bug.private = None bug.commit() bug = Bug(id) bug.duplicate_of = int(master) else: bug.duplicate_of = None bug.commit() def mark_regression(self, id, master): """Mark a crash id as reintroducing an earlier crash which is already marked as fixed (having ID 'master').""" bug = Bug(id) comment = Bug.NewComment(subject = 'Possible regression detected', text = 'This crash has the same stack trace characteristics as bug #%i. However, the latter was already fixed in an earlier package version than the one in this report. This might be a regression or because the problem is in a dependent package.' % master) bug.comments.add(comment) bug.tags.append('regression-retracer') bug.commit() def mark_retraced(self, id): '''Mark crash id as retraced.''' b = Bug(id) if self.arch_tag in b.tags: b.tags.remove(self.arch_tag) b.commit() def mark_retrace_failed(self, id, invalid_msg = None): """Mark crash id as 'failed to retrace'.""" b = Bug(id) if invalid_msg: comment = Bug.NewComment(subject = 'Crash report cannot be processed', text = invalid_msg) b.comments.add(comment) b.status = 'Invalid' b.attachments.remove(func = (lambda a: re.match('^(CoreDump.gz$|Stacktrace.txt|ThreadStacktrace.txt|Dependencies.txt$|ProcMaps.txt$|ProcStatus.txt$|Registers.txt$|Disassembly.txt$)', a.lp_filename))) elif 'apport-failed-retrace' not in b.tags: b.tags.append('apport-failed-retrace') b.commit() def _mark_dup_checked(self, id, report): '''Mark crash id as checked for being a duplicate.''' b = Bug(id) if 'need-duplicate-check' in b.tags: b.tags.remove('need-duplicate-check') self._subscribe_triaging_team(b, report) b.commit() def _subscribe_triaging_team(self, bug, report): '''Subscribe the right triaging team to the bug.''' if report['DistroRelease'].split()[0] != 'Ubuntu': return None try: bug.subscriptions.add('ubuntu-crashes-universe') except ValueError: report['DistroRelease'].split()[0] != 'Ubuntu' report['DistroRelease'].split()[0] != 'Ubuntu' except: report['DistroRelease'].split()[0] != 'Ubuntu' if __name__ == '__main__': import unittest import urllib2 import cookielib crashdb = None segv_report = None python_report = None class _Tests(unittest.TestCase): test_package = 'coreutils' test_srcpackage = 'coreutils' known_test_id = 89040 known_test_id2 = 302779 def setUp(self): global crashdb if not crashdb: crashdb = self._get_instance() self.crashdb = crashdb self.ref_report = apport.Report() self.ref_report.add_os_info() self.ref_report.add_user_info() def _file_segv_report(self): '''File a SEGV crash report. Return crash ID. ''' r = apport.report._ApportReportTest._generate_sigsegv_report() r.add_package_info(self.test_package) r.add_os_info() r.add_gdb_info() r.add_user_info() self.assertEqual(r.standard_title(), 'crash crashed with SIGSEGV in f()') handle = self.crashdb.upload(r) self.assert_(handle) url = self.crashdb.get_comment_url(r, handle) self.assert_(url) id = self._fill_bug_form(url) self.assert_(id > 0) return id def test_1_report_segv(self): '''upload() and get_comment_url() for SEGV crash This needs to run first, since it sets segv_report. ''' global segv_report id = self._file_segv_report() segv_report = id print >>sys.stderr, '(https://staging.launchpad.net/bugs/%i) ' % id, def test_1_report_python(self): '''upload() and get_comment_url() for Python crash This needs to run early, since it sets python_report. ''' global python_report r = apport.Report('Crash') r['ExecutablePath'] = '/bin/foo' r['Traceback'] = 'Traceback (most recent call last):\n File "/bin/foo", line 67, in fuzz\n print weird\nNameError: global name \'weird\' is not defined' r.add_package_info(self.test_package) r.add_os_info() r.add_user_info() self.assertEqual(r.standard_title(), 'foo crashed with NameError in fuzz()') handle = self.crashdb.upload(r) self.assert_(handle) url = self.crashdb.get_comment_url(r, handle) self.assert_(url) id = self._fill_bug_form(url) self.assert_(id > 0) python_report = id print >>sys.stderr, '(https://staging.launchpad.net/bugs/%i) ' % id, def test_2_download(self): '''download()''' r = self.crashdb.download(segv_report) self.assertEqual(r['ProblemType'], 'Crash') self.assertEqual(r['DistroRelease'], self.ref_report['DistroRelease']) self.assertEqual(r['Architecture'], self.ref_report['Architecture']) self.assertEqual(r['Uname'], self.ref_report['Uname']) self.assertEqual(r.get('NonfreeKernelModules'), self.ref_report.get('NonfreeKernelModules')) self.assertEqual(r.get('UserGroups'), self.ref_report.get('UserGroups')) self.assertEqual(r['Signal'], '11') self.assert_(r['ExecutablePath'].endswith('/crash')) self.assertEqual(r['SourcePackage'], self.test_srcpackage) self.assert_(r['Package'].startswith(self.test_package + ' ')) self.assert_('f (x=42)' in r['Stacktrace']) self.assert_('f (x=42)' in r['StacktraceTop']) self.assert_(len(r['CoreDump']) > 1000) self.assert_('Dependencies' in r) def test_3_update(self): '''update()''' r = self.crashdb.download(segv_report) r['StacktraceTop'] = '?? ()' r['Stacktrace'] = 'long\ntrace' r['ThreadStacktrace'] = 'thread\neven longer\ntrace' self.crashdb.update(segv_report, r, 'I can has a better retrace?') r = self.crashdb.download(segv_report) self.assert_('CoreDump' in r) r['StacktraceTop'] = 'read () from /lib/libc.6.so\nfoo (i=1) from /usr/lib/libfoo.so' r['Stacktrace'] = 'long\ntrace' r['ThreadStacktrace'] = 'thread\neven longer\ntrace' self.crashdb.update(segv_report, r, 'good retrace!') r = self.crashdb.download(segv_report) self.failIf('CoreDump' in r) def test_get_distro_release(self): '''get_distro_release()''' self.assertEqual(self.crashdb.get_distro_release(segv_report), self.ref_report['DistroRelease']) def test_duplicates(self): '''duplicate handling''' self.assertEqual(self.crashdb.duplicate_of(segv_report), None) self.assertEqual(self.crashdb.get_fixed_version(segv_report), None) self.crashdb.close_duplicate(segv_report, self.known_test_id) self.assertEqual(self.crashdb.duplicate_of(segv_report), self.known_test_id) self.assertEqual(self.crashdb.get_fixed_version(segv_report), 'invalid') self.crashdb.close_duplicate(segv_report, None) self.assertEqual(self.crashdb.duplicate_of(segv_report), None) self.assertEqual(self.crashdb.get_fixed_version(segv_report), None) r = self.crashdb.download(segv_report) self.failIf('CoreDump' in r) self.crashdb.close_duplicate(self.known_test_id, self.known_test_id2) self.crashdb.close_duplicate(segv_report, self.known_test_id) self.assertEqual(self.crashdb.duplicate_of(segv_report), self.known_test_id2) self.crashdb.close_duplicate(self.known_test_id, None) self.crashdb.close_duplicate(self.known_test_id2, None) self.crashdb.close_duplicate(segv_report, None) def test_marking_segv(self): '''processing status markings for signal crashes''' unretraced_before = self.crashdb.get_unretraced() self.assert_(segv_report in unretraced_before) self.failIf(python_report in unretraced_before) self.crashdb.mark_retraced(segv_report) unretraced_after = self.crashdb.get_unretraced() self.failIf(segv_report in unretraced_after) self.assertEqual(unretraced_before, unretraced_after.union(set([ segv_report]))) self.assertEqual(self.crashdb.get_fixed_version(segv_report), None) self._mark_needs_retrace(segv_report) self.crashdb.mark_retraced(segv_report) self.crashdb.mark_retrace_failed(segv_report) unretraced_after = self.crashdb.get_unretraced() self.failIf(segv_report in unretraced_after) self.assertEqual(unretraced_before, unretraced_after.union(set([ segv_report]))) self.assertEqual(self.crashdb.get_fixed_version(segv_report), None) self._mark_needs_retrace(segv_report) self.crashdb.mark_retraced(segv_report) self.crashdb.mark_retrace_failed(segv_report, "I don't like you") unretraced_after = self.crashdb.get_unretraced() self.failIf(segv_report in unretraced_after) self.assertEqual(unretraced_before, unretraced_after.union(set([ segv_report]))) self.assertEqual(self.crashdb.get_fixed_version(segv_report), 'invalid') def test_marking_python(self): '''processing status markings for interpreter crashes''' unchecked_before = self.crashdb.get_dup_unchecked() self.assert_(python_report in unchecked_before) self.failIf(segv_report in unchecked_before) self.crashdb._mark_dup_checked(python_report, self.ref_report) unchecked_after = self.crashdb.get_dup_unchecked() self.failIf(python_report in unchecked_after) self.assertEqual(unchecked_before, unchecked_after.union(set([ python_report]))) self.assertEqual(self.crashdb.get_fixed_version(python_report), None) def test_update_invalid(self): '''updating a invalid crash This simulates a race condition where a crash being processed gets invalidated by marking it as a duplicate. ''' id = self._file_segv_report() print >>sys.stderr, '(https://staging.launchpad.net/bugs/%i) ' % id, r = self.crashdb.download(id) self.crashdb.close_duplicate(id, segv_report) r['StacktraceTop'] = 'read () from /lib/libc.6.so\nfoo (i=1) from /usr/lib/libfoo.so' r['Stacktrace'] = 'long\ntrace' r['ThreadStacktrace'] = 'thread\neven longer\ntrace' self.crashdb.update(id, r, 'good retrace!') r = self.crashdb.download(id) self.failIf('CoreDump' in r) def _get_instance(klass): '''Create a CrashDB instance''' return CrashDatabase(os.path.expanduser('~/.lpcookie.txt'), '', { 'distro': 'ubuntu', 'staging': True }) _get_instance = classmethod(_get_instance) def _fill_bug_form(self, url): '''Fill form for a distro bug and commit the bug. Return the report ID. ''' cj = cookielib.MozillaCookieJar() cj.load(self.crashdb.cookie_file) opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) re_pkg = re.compile('<input type="text" value="([^"]+)" id="field.packagename"') re_title = re.compile('<input.*id="field.title".*value="([^"]+)"') re_tags = re.compile('<input.*id="field.tags".*value="([^"]+)"') url = url.replace('+filebug/', '+filebug-advanced/') res = opener.open(url) self.assertEqual(res.getcode(), 200) content = res.read() m_pkg = re_pkg.search(content) m_title = re_title.search(content) m_tags = re_tags.search(content) url = url.split('?')[0] args = { 'packagename_option': 'choose', 'field.packagename': m_pkg.group(1), 'field.title': m_title.group(1), 'field.tags': m_tags.group(1), 'field.comment': 'ZOMG!', 'field.actions.submit_bug': '1' } res = opener.open(url, data = urllib.urlencode(args)) self.assertEqual(res.getcode(), 200) self.assert_('+source/%s/+bug/' % m_pkg.group(1) in res.geturl()) id = res.geturl().split('/')[-1] return int(id) def _fill_bug_form_project(self, url): '''Fill form for a project bug and commit the bug. Return the report ID. ''' cj = cookielib.MozillaCookieJar() cj.load(self.crashdb.cookie_file) opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) m = re.search('launchpad.net/([^/]+)/\\+filebug', url) if not m: raise AssertionError project = m.group(1) re_title = re.compile('<input.*id="field.title".*value="([^"]+)"') re_tags = re.compile('<input.*id="field.tags".*value="([^"]+)"') url = url.replace('+filebug/', '+filebug-advanced/') res = opener.open(url) self.assertEqual(res.getcode(), 200) content = res.read() m_title = re_title.search(content) m_tags = re_tags.search(content) url = url.split('?')[0] args = { 'field.title': m_title.group(1), 'field.tags': m_tags.group(1), 'field.comment': 'ZOMG!', 'field.actions.submit_bug': '1' } res = opener.open(url, data = urllib.urlencode(args)) self.assertEqual(res.getcode(), 200) self.assert_('launchpad.net/%s/+bug' % project in res.geturl()) id = res.geturl().split('/')[-1] return int(id) def _mark_needs_retrace(self, id): '''Mark a report ID as needing retrace.''' b = Bug(id) if self.crashdb.arch_tag not in b.tags: b.tags.append(self.crashdb.arch_tag) b.commit() def _mark_needs_dupcheck(self, id): '''Mark a report ID as needing duplicate check.''' b = Bug(id) if 'need-duplicate-check' not in b.tags: b.tags.append('need-duplicate-check') b.commit() def test_project(self): '''reporting crashes against a project instead of a distro''' crashdb = CrashDatabase(os.path.expanduser('~/.lpcookie.txt'), '', { 'project': 'langpack-o-matic', 'staging': True }) self.assertEqual(crashdb.distro, None) r = apport.Report('Crash') r['ExecutablePath'] = '/bin/foo' r['Traceback'] = 'Traceback (most recent call last):\n File "/bin/foo", line 67, in fuzz\n print weird\nNameError: global name \'weird\' is not defined' r.add_os_info() r.add_user_info() self.assertEqual(r.standard_title(), 'foo crashed with NameError in fuzz()') handle = crashdb.upload(r) self.assert_(handle) url = crashdb.get_comment_url(r, handle) self.assert_('launchpad.net/langpack-o-matic/+filebug' in url) id = self._fill_bug_form_project(url) self.assert_(id > 0) print >>sys.stderr, '(https://staging.launchpad.net/bugs/%i) ' % id, r = crashdb.download(id) r['StacktraceTop'] = 'read () from /lib/libc.6.so\nfoo (i=1) from /usr/lib/libfoo.so' r['Stacktrace'] = 'long\ntrace' r['ThreadStacktrace'] = 'thread\neven longer\ntrace' crashdb.update(id, r, 'good retrace!') r = crashdb.download(id) self.assertEqual(crashdb.get_fixed_version(id), None) crashdb.close_duplicate(id, self.known_test_id) self.assertEqual(crashdb.duplicate_of(id), self.known_test_id) self.assertEqual(crashdb.get_fixed_version(id), 'invalid') crashdb.close_duplicate(id, None) self.assertEqual(crashdb.duplicate_of(id), None) self.assertEqual(crashdb.get_fixed_version(id), None) unittest.main()