home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) """ TODO: * TODOs are specified in the classes * being more verbose in 'container'-object's __repr__: add bugnumber to each object * standardize *.changed (output: frozenset/list, property) * how to handle global attachment-related variables? * add revert()-function(s) THIS IS STILL WORK IN PROGRESS!! """ import re import os import libxml2 import exceptions from exceptions import parse_error from bugbase import Bug as BugBase from lptime import LPTime from tasksbase import LPTasks, LPTask from attachmentsbase import LPAttachments, LPAttachment from commentsbase import LPComments, LPComment from lphelper import user, product, change_obj, unicode_for_libxml2 from lpconstants import BUG, BASEURL from subscribersbase import LPSubscribers from utils import valid_lp_url def noerr(ctx, str): pass libxml2.registerErrorHandler(noerr, None) def _small_xpath(xml, expr): ''' Returns the content of the first result of a xpath expression ''' result = xml.xpathEval(expr) if not result: return False return result[0].content def get_bug(id): return Bug._container_refs[id] def _blocked(func, error = None): def f(a, *args, **kwargs): try: x = a.infotable.current except AttributeError: e = None error = '%s %s' % (f.error, e) raise AttributeError, error return func(a, *args, **kwargs) if not error: pass f.error = 'Unable to get current InfoTable row.' return f def _attr_ext(i, s): if i.startswith('__'): return '_%s%s' % (s.__class__.__name__, i) return i def _gen_getter(attr): ''' Returns a function to return the value of an attribute Example: get_example = _gen_getter("x.y") is like: def get_example(self): if not x.parsed: x.parse() return x.y ''' def func(s): attributes = attr.split('.') attributes = (map,)((lambda a: _attr_ext(a, s)), attributes) x = getattr(s, attributes[0]) attributes.insert(0, s) if not x.parsed: x.parse() return reduce(getattr, attributes) return func def _gen_setter(attr): ''' Returns a function to set the value of an attribute Example: set_example = _gen_setter("x.y") is like: def set_example(self, value): if not x.parsed: x.parse() x.y = value ''' def func(s, value): attributes = attr.split('.') attributes = (map,)((lambda a: _attr_ext(a, s)), attributes) x = getattr(s, attributes[0]) attributes.insert(0, s) if not x.parsed: x.parse() setattr(reduce(getattr, attributes[:-1]), attributes[-1], value) return func def create_project_task(project): x = [ None] * 14 task = Info(product(project), *x) task._type = '%s.product' % project return task def create_distro_task(distro = None, sourcepackage = None, url = None): x = [ None] * 14 if distro is None: distro = 'Ubuntu' if sourcepackage is not None: affects = product('%s (%s)' % (sourcepackage, distro.title())) else: affects = product(distro.title()) task = Info(affects, *x) task._type = '%s.sourcepackagename' % affects task._remote = url return task class Info(LPTask): """ The 'Info'-object represents one row of the 'InfoTable' of a bugreport * editable attributes: .sourcepackage: lp-name of a package/project .status: valid lp-status .importance: valid lp-importance (if the user is not permitted to change 'importance' an 'IOError' will be raised .assignee: lp-login of an user/group .milestone: value must be in '.valid_milestones' * read-only attributes: .affects, .target, .valid_milestones TODO: * rename 'Info' into 'Task' """ def __init__(self, affects, status, importance, assignee, current, editurl, type, milestone, available_milestone, lock_importance, targeted_to, remote, editlock, edit_fields, connection): data_dict = { 'affects': affects, 'status': status, 'importance': importance, 'assignee': assignee, 'current': current, 'editurl': editurl, 'type': type, 'milestone': milestone, 'available_milestone': available_milestone, 'lock_importance': lock_importance, 'targeted_to': targeted_to, 'remote': remote, 'editlock': editlock, 'edit_fields': edit_fields, 'connection': connection } LPTask.__init__(self, data_dict) self._cache = { 'sourcepackage': self._sourcepackage, 'status': self._status, 'importance': self._importance, 'assignee': self._assignee, 'milestone': self._milestone } def set_sourcepackage(self, package): if self._editlock: raise IOError, "The sourcepackage of this bug can't be edited, maybe because this bug is a duplicate of an other one" self._editlock self._sourcepackage = package def set_assignee(self, lplogin): if self._editlock: raise IOError, "The assignee of this bug can't be edited, maybe because this bug is a duplicate of an other one" self._editlock if self._remote: raise IOError, 'This task is linked to a remote-bug system, please change the assignee there' self._remote self._assignee = lplogin def set_status(self, status): if self._editlock: raise IOError, "The status of this bug can't be edited, maybe because this bug is a duplicate of an other one" self._editlock if self._remote: raise IOError, 'This task is linked to a remote-bug system, please change the status there' self._remote if status not in BUG.STATUS.values(): raise ValueError, "Unknown status '%s', status must be one of these: %s" % (status, BUG.STATUS.values()) status not in BUG.STATUS.values() self._status = status def set_importance(self, importance): if self._editlock: raise IOError, "The importance of this bug can't be edited, maybe because this bug is a duplicate of an other one" self._editlock if self._remote: raise IOError, 'This task is linked to a remote-bug system, please change the importance there' self._remote if self._lock_importance: raise IOError, "'Importance' changeable only by a project maintainer or bug contact" self._lock_importance if importance not in BUG.IMPORTANCE.values(): raise ValueError, "Unknown importance '%s', importance must be one of these: %s" % (importance, BUG.IMPORTANCE.values()) importance not in BUG.IMPORTANCE.values() self._importance = importance def set_milestone(self, milestone): if self._editlock: raise IOError, "The milestone of this bug can't be edited, maybe because this bug is a duplicate of an other one" self._editlock if not self._Info__available_milestone: raise ValeError, 'No milestones defined for this product' self._Info__available_milestone if milestone not in self._available_milestone: raise ValueError, 'Unknown milestone, milestone must be one of these: %s' % self._available_milestone milestone not in self._available_milestone self._milestone = milestone def commit(self, force_changes = False, ignore_lp_errors = True): """ Commits the local changes to launchpad.net * force_changes: general argument, has not effect in this case * ignore_lp_errors: if the user tries to commit invalid data to launchpad, launchpad returns an error-page. If 'ignore_lp_errors=False' Info.commit() will raise an 'ValueError' in this case, otherwise ignore this and leave the bugreport in launchpad unchanged (default=True) """ changed = self.changed if changed: if self._type: full_sourcepackage = self._type[:self._type.rfind('.')] else: full_sourcepackage = '%s_%s' % (self.targeted_to, str(self._affects)) s = self.sourcepackage if s == 'ubuntu': s = '' args = { '%s.actions.save' % full_sourcepackage: '1', '%s.comment_on_change' % full_sourcepackage: '' } if self._type: args[self._type] = s if '.status' in self._edit_fields: args['%s.status-empty-marker' % full_sourcepackage] = '1' args['%s.status' % full_sourcepackage] = self.status if '.importance' in self._edit_fields: args['%s.importance' % full_sourcepackage] = self.importance args['%s.importance-empty-marker' % full_sourcepackage] = '1' if '.milestone' in self._edit_fields: args['%s.mlestone' % full_sourcepackage] = '' args['%s.milestone-empty-marker' % full_sourcepackage] = '1' args['%s.assignee.option' % full_sourcepackage] = '%s.assignee.assign_to' % full_sourcepackage if not self.assignee: pass args['%s.assignee' % full_sourcepackage] = '' result = self._connection.post(self._editurl, args) if result.url.endswith('+editstatus') and not ignore_lp_errors: raise exceptions.PythonLaunchpadBugsValueError({ 'arguments': args }, self._editurl, 'one or more arguments might be wrong') not ignore_lp_errors class InfoTable(LPTasks): """ The 'InfoTable'-object represents the tasks at the top of a bugreport * read-only attributes: .current: returns the highlighted Info-object of the bugreport TODO: * rename 'InfoTable' into 'TaskTable' * allow adding of tasks (Also affects upstream/Also affects distribution) * does current/tracked work as expected? * remote: parse editable values """ def __init__(self, connection, xml, url): LPTasks.__init__(self, { 'connection': connection, 'url': url, 'xml': xml }) self._InfoTable__url = url self._InfoTable__xml = xml def parse(self): """ Parsing the info-table * format: 'Affects'|'Status'|'Importance'|'Assigned To' TODO: * working on 'tracked in...' - currently there is only one 'tracked in' entry per bugreport supported * REMOTE BUG!!! """ if self.parsed: return True rows = self._InfoTable__xml[0].xpathEval('tbody/tr[not(@style="display: none") and not(@class="secondary")]') parse_error(self._InfoTable__xml[0], 'InfoTable.rows', xml = self._InfoTable__xml, url = self._InfoTable__url) highl_target = None temp_status = BUG.STATUS.copy() temp_status['statusUNKNOWN'] = 'Unknown' temp_importance = BUG.IMPORTANCE.copy() temp_importance['importanceUNKNOWN'] = 'Unknown' tracked = False affects = product(None) for row in rows: edit_fields = set() tmp_affects = affects current = False remote = None if row.prop('class') == 'highlight': current = True row_cells = row.xpathEval('td') row_cells = _[1] affects = product(affects_lpname, affects_longname, affects_type) tracked = False targeted_to = None if row_cells[0].xpathEval('img[@alt="Targeted to"]'): targeted_to = row_cells[0].xpathEval('a')[0].content affects = tmp_affects if highl_target: if targeted_to.lower() == highl_target.lower(): current = True xmledit = row.xpathEval('following-sibling::tr[@style="display: none"][1]') type = None milestone = None available_milestone = { } editurl = None editlock = False lock_importance = False if xmledit: xmledit = xmledit[0] editurl = xmledit.xpathEval('td/form') parse_error(editurl, 'InfoTable.editurl', xml = xmledit, url = self._InfoTable__url) editurl = valid_lp_url(editurl[0].prop('action'), BASEURL.BUG) if xmledit.xpathEval('descendant::label[contains(@for,"bugwatch")]'): x = xmledit.xpathEval('descendant::label[contains(@for,"bugwatch")]/a') if x: remote = x[0].prop('href') else: remote = True if not remote: for i in [ 'product', 'sourcepackagename']: x = xmledit.xpathEval('td/form/div//table//input[contains(@id,".%s")]' % i) if x: type = x[0].prop('id') continue if not type: if not row.xpathEval('td[2]//img[contains(@src,"milestone")]'): parse_error(False, 'InfoTable.type.milestone', xml = xmledit, url = self._InfoTable__url) m = xmledit.xpathEval('descendant::select[contains(@id,".milestone")]//option') if m: for i in m: available_milestone[i.prop('value')] = i.content if i.prop('selected'): milestone = i.content continue m = xmledit.xpathEval('descendant::td[contains(@title, "Changeable only by a project maintainer") and count(span)=0]') if len(m) == 1 and not milestone: milestone = m[0].content.strip('\n ') if not xmledit.xpathEval('descendant::select[contains(@id,".importance")]'): lock_importance = True m = set([ '.sourcepackagename', '.product', '.status', '.status-empty-marker', '.importance', '.importance-empty-marker', '.milestone', '.milestone-empty-marker']) for i in m: x = xmledit.xpathEval('td/form//input[contains(@name,"%s")]' % i) y = xmledit.xpathEval('td/form//select[contains(@name,"%s")]' % i) if x or y: edit_fields.add(i) continue else: editlock = True parse_error(row_cells[1], 'InfoTable.status.1', xml = row_cells, url = self._InfoTable__url) parse_error(row_cells[1].prop('class') in temp_status, 'InfoTable.status.2', msg = "unknown bugstatus '%s' in InfoTable.parse()" % row_cells[1].prop('class'), error_type = exceptions.VALUEERROR, url = self._InfoTable__url) status = temp_status[row_cells[1].prop('class')] parse_error(row_cells[2], 'InfoTable.importance.1', xml = row_cells, url = self._InfoTable__url) parse_error(row_cells[2].prop('class') in temp_importance, 'InfoTable.importance.2', msg = "unknown bugimportance '%s' in InfoTable.parse()" % row_cells[2].prop('class'), error_type = exceptions.VALUEERROR, url = self._InfoTable__url) importance = temp_importance[row_cells[2].prop('class')] assignee = row_cells[3].xpathEval('a') if assignee: assignee = assignee[0] if remote: assignee = [](_[2]) else: assignee = user.parse_html_user(assignee) else: assignee = user(None) if current: self._current = len(self) if not editurl: pass self.append(Info(affects, status, importance, assignee, current, self._url, type, milestone, available_milestone, lock_importance, targeted_to, remote, editlock, edit_fields, connection = self._connection)) self.parsed = True return True def _LP_create_task(self, task, force_changes, ignore_lp_errors): if not task.component._type: pass tsk = '' if tsk.endswith('.product'): url = '%s/+choose-affected-product' % self._url args = { 'field.visited_steps': 'choose_product, specify_remote_bug_url', 'field.product': str(task.component.affects), 'field.actions.continue': 'Add to Bug Report' } result = self._connection.post(url, args) if result.url == url: raise exceptions.choose_pylpbugsError(error_type = exceptions.VALUEERROR, text = result.text, url = url) result.url == url elif tsk.endswith('.sourcepackagename'): url = '%s/+distrotask' % self._url if not task.component.target: pass if not task.component.sourcepackage: pass if not task.component.remote: pass args = { 'field.distribution': 'ubuntu', 'field.distribution-empty-marker': 1, 'field.sourcepackagename': '', 'field.visited_steps': 'specify_remote_bug_url', 'field.bug_url': '', 'field.actions.continue': 'Continue' } result = self._connection.post(url, args) if result.url == url: raise exceptions.choose_pylpbugsError(error_type = exceptions.VALUEERROR, text = result.text, url = url) result.url == url else: raise NotImplementedError return tsk.endswith('.product') class BugReport(object): """ The 'BugReport'-object is the report itself * editable attributes: .description: any text .title/.summary: any text .tags: list, use list operations to add/remove tags .nickname * read-only attributes: .target: e.g. 'upstream' .sourcepackage: 'None' if not package specified .reporter, .date """ def __init__(self, connection, xml, url): (self._BugReport__title, self._BugReport__description, self._BugReport__tags, self._BugReport__nickname, self.target, self.sourcepackage, self.reporter, self.date) = [ None] * 8 self._BugReport__cache = { } self.parsed = False self._BugReport__connection = connection self._BugReport__xml = xml self._BugReport__url = url self._BugReport__description_raw = None def __repr__(self): return '<BugReport>' def parse(self): if self.parsed: return True description = self._BugReport__xml.xpathEval('//body//div[@class="report"]/div[@id="bug-description"]') parse_error(description, 'BugReport.description', xml = self._BugReport__xml, url = self._BugReport__url) p = description[0].xpathEval('p') description = '' for i in p[:-1]: description = ''.join([ description, i.content, '\n\n']) description = ''.join([ description, p[-1:].pop().content]) self._BugReport__description = description title = self._BugReport__xml.xpathEval('//title') parse_error(title, 'BugReport.title', self._BugReport__xml, self._BugReport__url) titleFilter = 'Bug #[0-9]* in ([^:]*?): (.*)' title = re.findall(titleFilter, title[0].content) parse_error(title, 'BugReport.__title', url = self._BugReport__url) self._BugReport__title = title[0][1].rstrip('\xe2\x80\x9d').lstrip('\xe2\x80\x9c') target = title[0][0].split(' ') if len(target) == 2: self.target = target[1].lstrip('(').rstrip(')') self.sourcepackage = target[0] if self.sourcepackage == 'Ubuntu': self.sourcepackage = None tags = self._BugReport__xml.xpathEval('//body//div[@class="report"]//div[@id="bug-tags"]//a') self._BugReport__tags = [ i.content for i in tags ] m = self._BugReport__xml.xpathEval('//span[@class="object identifier"]') if not m: pass m = self._BugReport__xml.xpathEval('//div[@class="object identifier"]') parse_error(m, 'BugReport.__nickname', xml = self._BugReport__xml, url = self._BugReport__url) r = re.search('\\(([^\\)]+)\\)', m[0].content) if not r or r.group(1): pass self._BugReport__nickname = None d = self._BugReport__xml.xpathEval('//span[@class="object timestamp"]/span') if not d: pass d = self._BugReport__xml.xpathEval('//p[@class="object timestamp"]/span') parse_error(d, 'BugReport.date', xml = m[0], url = self._BugReport__url) self.date = LPTime(d[0].prop('title')) d = self._BugReport__xml.xpathEval('//span[@class="object timestamp"]/a') if not d: pass d = self._BugReport__xml.xpathEval('//p[@class="object timestamp"]/a') parse_error(d, 'BugReport.reporter', xml = m[0], url = self._BugReport__url) self.reporter = user.parse_html_user(d[0]) self._BugReport__cache = { 'title': self._BugReport__title, 'description': self._BugReport__description, 'tags': self._BugReport__tags[:], 'nickname': self._BugReport__nickname } self.parsed = True return True def get_title(self): return self._BugReport__title def set_title(self, title): self._BugReport__title = title title = property(get_title, set_title, doc = 'title of a bugreport') def get_description(self): return self._BugReport__description def set_description(self, description): self._BugReport__description = description description = property(get_description, set_description, doc = 'description of a bugreport') def tags(self): return self._BugReport__tags tags = property(tags) def get_nickname(self): return self._BugReport__nickname def set_nickname(self, nickname): self._BugReport__nickname = nickname nickname = property(get_nickname, set_nickname, doc = 'nickname of a bugreport') def changed(self): changed = set() for k in self._BugReport__cache: if self._BugReport__cache[k] != getattr(self, k): changed.add(change_obj(k)) continue return frozenset(changed) changed = property(changed) def description_raw(self): if not self._BugReport__description_raw: url = '%s/+edit' % self._BugReport__url result = self._BugReport__connection.get(url) xmldoc = libxml2.htmlParseDoc(unicode_for_libxml2(result.text), 'UTF-8') x = xmldoc.xpathEval('//textarea[@name="field.description"]') parse_error(x, 'BugReport.description_raw', xml = xmldoc, url = url) self._BugReport__description_raw = x[0].content return self._BugReport__description_raw description_raw = property(description_raw) def commit(self, force_changes = False, ignore_lp_errors = True): """ Commits the local changes to launchpad.net * force_changes: if a user adds a tag which has not been used before and force_changes is True then commit() tries to create a new tag for this package; if this fails or force_changes=False commit will raise a 'ValueError' * ignore_lp_errors: if the user tries to commit invalid data to launchpad, launchpad returns an error-page. If 'ignore_lp_errors=False' Info.commit() will raise an 'ValueError' in this case, otherwise ignore this and leave the bugreport in launchpad unchanged (default=True) """ if self.changed: if not self.title and description: raise exceptions.PythonLaunchpadBugsValueError(msg = "To change a bugreport 'description' and 'title' don't have to be empty", url = self._BugReport__url) description args = { 'field.actions.change': '1', 'field.title': self.title, 'field.description': description, 'field.tags': ' '.join(self.tags), 'field.name': nickname } url = '%s/+edit' % self._BugReport__url result = self._BugReport__connection.post(url, args) if result.url == url: pass None if [] not in [ i.component for i in self.changed ] else 'description' if [] not in [ i.component for i in self.changed ] else 'nickname' if not ignore_lp_errors or force_changes else force_changes class Attachment(LPAttachment): """ Returns an 'Attachment'-object * editable attributes: .description: any text .contenttype: any text .is_patch: True/False * read-only attributes: .id: hash(local_filename) for local files, launchpadlibrarian-id for files uploaded to launchpadlibrarian.net .is_down: True if a file is downloaded to ATTACHMENT_PATH .is_up: True if file is uploaded to launchpadlibrarian.net ... TODO: work on docstring """ def __init__(self, connection, url = None, localfilename = None, localfileobject = None, description = None, is_patch = None, contenttype = None, comment = None): LPAttachment.__init__(self, connection, url, localfilename, localfileobject, description, is_patch, contenttype, comment) def get_bugnumber(self): if self.is_up: return self.edit_url.split('/')[-3] def get_sourcepackage(self): if self.is_up: return self.edit_url.split('/')[-5] def get_edit_url(self): if self.is_up: return valid_lp_url(self._edit, BASEURL.BUG) class Attachments(LPAttachments): def __init__(self, comments, connection, xml): LPAttachments.__init__(self, comments = comments) self._Attachments__xml = xml self._Attachments__connection = connection self._Attachments__comments = comments def parse(self): super(Attachments, self).parse() if self._Attachments__xml: attachments = self._Attachments__xml[0].xpathEval('li[@class="download"]') all_att = { } for a in attachments: url = a.xpathEval('a')[0].prop('href') edit = a.xpathEval('small/a')[0].prop('href') all_att[url] = edit for i in self: i._edit = all_att.get(i.url, None) if not (i._edit) and i.is_up: parse_error(False, 'Attachments.edit.1', msg = "There is an attachment (id=%s) which is added to a comment but does not appear in the sidepanel ('%s')" % (i.id, self._Attachments__comments._url), error_type = exceptions.RUNTIMEERROR) continue else: parse_error(not (self._current), 'Attachments.edit.2', msg = "Unable to parse the 'attachments' sidepanel although there are files attached to comments ('%s')" % self._Attachments__comments._url, error_type = exceptions.RUNTIMEERROR) def commit(self, force_changes = True, ignore_lp_errors = False, com_subject = None): ''' when adding a new attachment, this attachment is added as a new comment. this new comment has no subject but a subject is required. setting force_changes=True and ignore_lp_errors=False results in adding a subject like: "Re: <bug summary>" ''' def _lp_edit(attachment): if not attachment.is_patch or 'on': pass if not attachment.contenttype: pass args = { 'field.actions.change': '1', 'field.title': attachment.description, 'field.patch': 'off', 'field.contenttype': 'text/plain' } self._Attachments__connection.post('%s/+edit' % attachment.edit_url, args) def _lp_delete(attachment): args = { 'field.actions.delete': '1', 'field.title': attachment.description, 'field.patch': 'off', 'field.contenttype': 'text/plain' } self._Attachments__connection.post('%s/+edit' % attachment.edit_url, args) def _lp_add(attachment): ''' delegated to comments ''' if not isinstance(attachment, Attachment): raise AssertionError, "<attachment> has to be an instance of 'Attachment'" c = Comment(attachment = (attachment,)) self._Attachments__comments._lp_add_comment(c, force_changes, ignore_lp_errors, com_subject) changed = set(self.changed) for i in changed: if i.action == 'added': _lp_add(i.component) continue (None, None, None, ((None,),)) if i.action == 'deleted': _lp_delete(i.component) continue if i.action == 'changed': _lp_edit(i.component) continue raise AttributeError, "Unknown action '%s' in Attachments.commit()" % i.component class Comment(LPComment): def __init__(self, subject = None, text = None, attachment = None): LPComment.__init__(self, subject, text, attachment) class Comments(LPComments): def __init__(self, connection, xml, url): LPComments.__init__(self, url = url) self._Comments__xml = xml self._Comments__connection = connection self._Comments__url = url def parse(self): for com in self._Comments__xml: m = com.xpathEval('div[@class="boardCommentDetails"]/a[1]') parse_error(m, 'Comments.user', xml = self._Comments__xml, url = self._Comments__url) com_user = user.parse_html_user(m[0]) m = com.xpathEval('div[@class="boardCommentDetails"]/a[2]') parse_error(m, 'Comments.nr', xml = self._Comments__xml, url = self._Comments__url) com_nr = m[0].prop('href').split('/')[-1] m = com.xpathEval('div[@class="boardCommentDetails"]/span') parse_error(m, 'Comments.date', xml = self._Comments__xml, url = self._Comments__url) com_date = LPTime(m[0].prop('title')) m = com.xpathEval('div[@class="boardCommentDetails"]/a[2]/strong') if m: com_subject = m[0].content else: com_subject = None m = com.xpathEval('div[@class="boardCommentBody"]/div') parse_error(m, 'Comments.text', xml = self._Comments__xml, url = self._Comments__url) com_text = m[0].content com_attachments = set() m = com.xpathEval('div[@class="boardCommentBody"]/ul/li') for a in m: a_url = a.xpathEval('a').pop() a = re.search(',\\n +(\\S+)(;|\\))', a.content) parse_error(a, 'Comments.attachment.re.%s' % a_url.prop('href'), xml = com, url = self._Comments__url) a_contenttype = a.group(1) com_attachments.add(Attachment(self._Comments__connection, url = a_url.prop('href'), description = a_url.content, comment = com_nr, contenttype = a_contenttype)) c = Comment(com_subject, com_text, com_attachments) c.set_attr(nr = com_nr, user = com_user, date = com_date) self.add(c) self._cache = self[:] self.parsed = True return True def new(self, subject = None, text = None, attachment = None): return Comment(subject, text, attachment, all_attachments = self._attachments) def _url(self): return self._Comments__url _url = property(_url) def _lp_add_comment(self, comment, force_changes, ignore_lp_errors, com_subject): if not isinstance(comment, Comment): raise AssertionError if not comment.subject and com_subject: pass if not comment.text: pass args = { 'field.subject': 'Re:', 'field.comment': '', 'field.actions.save': '1', 'field.filecontent.used': '', 'field.email_me.used': '' } url = self._Comments__url + '/+addcomment' result = self._Comments__connection.post(url, args) return result def commit(self, force_changes = False, ignore_lp_errors = True, com_subject = None): for i in self.changed: if i.action == 'added': self._lp_add_comment(i.component, force_changes, ignore_lp_errors, com_subject) continue raise AttributeError, "Unknown action '%s' in Comments.commit()" % i.component class Duplicates(object): def __init__(self, connection, xml, url): self.parsed = False self._Duplicates__cache = None self._Duplicates__connection = connection self._Duplicates__xml = xml self._Duplicates__url = url (self._Duplicates__duplicate_of, self._Duplicates__duplicates) = [ None] * 2 def __repr__(self): return '<Duplicates>' def parse(self): if self.parsed: return True nodes = self._Duplicates__xml.xpathEval('//body//a[@id="duplicate-of"]') if len(nodes) > 0: self._Duplicates__duplicate_of = int(nodes[0].prop('href').split('/').pop()) result = self._Duplicates__xml.xpathEval('//body//div[@class="portlet"]/h2[contains(.,"Duplicates of this bug")]/../div[@class="portletBody"]/div/ul//li/a') self._Duplicates__duplicates = []([ int(i.prop('href').split('/')[-1]) for i in result ]) self._Duplicates__cache = self._Duplicates__duplicate_of self.parsed = True return True def get_duplicates(self): return self._Duplicates__duplicates duplicates = property(get_duplicates, doc = 'get a list of duplicates') def get_duplicate_of(self): return self._Duplicates__duplicate_of def set_duplicate_of(self, bugnumber): if bugnumber == None: self._Duplicates__duplicate_of = None else: self._Duplicates__duplicate_of = int(bugnumber) duplicate_of = property(get_duplicate_of, set_duplicate_of, doc = 'this bug report is duplicate of') def changed(self): _Duplicates__changed = set() if self._Duplicates__cache != self._Duplicates__duplicate_of: _Duplicates__changed.add('duplicate_of') return frozenset(_Duplicates__changed) changed = property(changed) def commit(self, force_changes = False, ignore_lp_errors = True): if self.changed: if not self._Duplicates__duplicate_of: pass args = { 'field.actions.change': '1', 'field.duplicateof': '' } url = '%s/+duplicate' % self._Duplicates__url result = self._Duplicates__connection.post(url, args) if result.url == url and not ignore_lp_errors: x = libxml2.htmlParseDoc(result.text, 'UTF-8') y = x.xpathEval('//p[@class="error message"]') if y: raise ValueError, 'launchpad.net error: %s' % y[0].content y class Secrecy(object): def __init__(self, connection, xml, url): self.parsed = False self._Secrecy__cache = set() self._Secrecy__connection = connection self._Secrecy__xml = xml self._Secrecy__url = url (self._Secrecy__security, self._Secrecy__private) = [ False] * 2 def __repr__(self): return '<Secrecy>' def parse(self): if self.parsed: return True stable_xml = self._Secrecy__xml.xpathEval('//body//div[@id="big-badges"]') if stable_xml: if stable_xml[0].xpathEval('img[@alt="(Security vulnerability)"]'): self._Secrecy__security = True if stable_xml[0].xpathEval('img[@alt="(Private)"]'): self._Secrecy__private = True else: self._Secrecy__private = bool(self._Secrecy__xml.xpathEval('//a[contains(@href, "+secrecy")]/strong')) self._Secrecy__security = bool(self._Secrecy__xml.xpathEval('//div[contains(@style, "/@@/security")]')) self._Secrecy__cache = { 'security': self._Secrecy__security, 'private': self._Secrecy__private } self.parsed = True return True def _editlock(self): return bool(get_bug(id(self)).duplicate_of) def get_security(self): if not self.parsed: raise AssertionError, 'parse first' return self._Secrecy__security def set_security(self, security): self._Secrecy__security = bool(security) security = property(get_security, set_security, doc = 'security status') def get_private(self): if not self.parsed: raise AssertionError, 'parse first' return self._Secrecy__private def set_private(self, private): self._Secrecy__private = bool(private) private = property(get_private, set_private, doc = 'private status') def get_changed(self): _Secrecy__changed = set() for k in self._Secrecy__cache: if self._Secrecy__cache[k] != getattr(self, k): _Secrecy__changed.add(k) continue return frozenset(_Secrecy__changed) changed = property(get_changed, doc = 'get a list of changed attributes') def commit(self, force_changes = False, ignore_lp_errors = True): _Secrecy__url = '%s/+secrecy' % self._Secrecy__url status = [ 'off', 'on'] if self.changed: _Secrecy__args = { 'field.private': status[int(self.private)], 'field.security_related': status[int(self.security)], 'field.actions.change': 'Change' } _Secrecy__result = self._Secrecy__connection.post(_Secrecy__url, _Secrecy__args) class Subscribers(LPSubscribers): ''' TODO: * change structure: use three different sets instead of one big one ''' def __init__(self, connection, xml, url): self.parsed = False self._Subscribers__connection = connection self._Subscribers__xml = xml self._Subscribers__url = url LPSubscribers.__init__(self, ('directly', 'notified', 'duplicates')) def parse(self): if self.parsed: return True parse_error(self._Subscribers__xml, 'Subscribers.__xml', xml = self._Subscribers__xml, url = self._Subscribers__url) xml = self._Subscribers__xml[0].xpathEval('div[(@class="section" or @class="Section") and @id]') xml_YUI = self._Subscribers__xml[0].xpathEval('script[@type="text/javascript"]') if xml_YUI and not xml: bugnumber = int(self._Subscribers__url.split('/')[-1]) url = 'https://launchpad.net/bugs/%i/+bug-portlet-subscribers-content' % bugnumber page = self._Subscribers__connection.get(url) ctx = libxml2.htmlParseDoc(unicode_for_libxml2(page.text), 'UTF-8') xml = ctx.xpathEval('//div[(@class="section" or @class="Section") and @id]') if xml: sections_map = { 'subscribers-direct': 'directly', 'subscribers-indirect': 'notified', 'subscribers-from-duplicates': 'duplicates' } for s in xml: kind = sections_map[s.prop('id')] nodes = s.xpathEval('div/a') for i in nodes: self[kind].add(user.parse_html_user(i)) elif xml_YUI: n = '.YUI' else: n = '' parse_error(False, 'Subscribers.__xml.edge.stable%s' % n, xml = self._Subscribers__xml, url = self._Subscribers__url) self.parsed = True return True def commit(self, force_changes = False, ignore_lp_errors = True): x = self.changed def _add(self, lplogin): '''Add a subscriber to a bug.''' url = '%s/+addsubscriber' % self._Subscribers__url args = { 'field.person': lplogin, 'field.actions.add': 'Add' } result = self._Subscribers__connection.post(url, args) if result.url == url: x = libxml2.htmlParseDoc(result.text, 'UTF-8') if x.xpathEval('//div[@class="message" and contains(.,"Invalid value")]'): raise ValueError, 'Unknown Launchpad ID. You can only subscribe someone who has a Launchpad account.' x.xpathEval('//div[@class="message" and contains(.,"Invalid value")]') raise ValueError, 'Unknown error while subscribe %s to %s' % (lplogin, url) result.url == url return result def _remove(self, lplogin): '''Remove a subscriber from a bug.''' url = '%s/' % self._Subscribers__url args = { 'field.subscription': lplogin, 'unsubscribe': 'Continue' } result = self._Subscribers__connection.post(url, args) return result class ActivityWhat(str): def __new__(cls, what, url = None): obj = super(ActivityWhat, cls).__new__(ActivityWhat, what) obj._ActivityWhat__task = None obj._ActivityWhat__attribute = None x = what.split(':') if len(x) == 2: obj._ActivityWhat__task = x[0] obj._ActivityWhat__attribute = x[1].strip() elif len(x) == 1: obj._ActivityWhat__attribute = x[0] else: raise ValueError return len(x) == 2 def task(self): return self._ActivityWhat__task task = property(task) def attribute(self): return self._ActivityWhat__attribute attribute = property(attribute) class Activity(object): def __init__(self, date, user, what, old_value, new_value, message): self._Activity__date = date self._Activity__user = user self._Activity__what = what self._Activity__old_value = old_value self._Activity__new_value = new_value self._Activity__message = message def __repr__(self): return "<%s %s '%s'>" % (self.user, self.date, self.what) def date(self): return self._Activity__date date = property(date) def user(self): return self._Activity__user user = property(user) def what(self): return self._Activity__what what = property(what) def old_value(self): return self._Activity__old_value old_value = property(old_value) def new_value(self): return self._Activity__new_value new_value = property(new_value) def message(self): return self._Activity__message message = property(message) class ActivityLog(object): ''' TODO: there is nor clear relation between an entry in the activity log and a certain task, this is why the result of when(), completed(), assigned() and started_work() may differ from the grey infobox added to each task. Maybe we should also parse this box. ''' def __init__(self, connection, url): self.parsed = False self._ActivityLog__connection = connection self._ActivityLog__activity = [] self._ActivityLog__url = url def __repr__(self): return '<activity log>' def __str__(self): return str(self._ActivityLog__activity) def __iter__(self): for i in self.activity: yield i def __getitem__(self, key): return self.activity[key] def __len__(self): return len(self.activity) def activity(self): if not self.parsed: self.parse() return self._ActivityLog__activity activity = property(activity) def _activity_rev(self): if not self.parsed: self.parse() return self._ActivityLog__activity_rev def parse(self): if self.parsed: return True page = self._ActivityLog__connection.get('%s/+activity' % self._ActivityLog__url) self._ActivityLog__xmldoc = libxml2.htmlParseDoc(unicode_for_libxml2(page.text), 'UTF-8') table = self._ActivityLog__xmldoc.xpathEval('//body//table[@class="listing"][1]//tbody//tr') parse_error(table, 'ActivityLog.__table', xml = self._ActivityLog__xmldoc, url = self._ActivityLog__url) for row in table: r = row.xpathEval('td') parse_error(len(r) == 6, 'ActivityLog.len(td)', xml = row, url = self._ActivityLog__url) date = LPTime(r[0].content) x = r[1].xpathEval('a') parse_error(x, 'ActivityLog.lp_user', xml = row, url = self._ActivityLog__url) lp_user = user.parse_html_user(x[0]) try: what = ActivityWhat(r[2].content) except ValueError: self.parsed self.parsed parse_error(False, 'ActivityLog.ActivityWhat', xml = self._ActivityLog__xmldoc, url = '%s/+activity' % self._ActivityLog__url) except: self.parsed old_value = r[3].content new_value = r[4].content message = r[5].content self._ActivityLog__activity.append(Activity(date, lp_user, what, old_value, new_value, message)) self._ActivityLog__activity_rev = self._ActivityLog__activity[::-1] self.parsed = True return True def assigned(self, task): for i in self._activity_rev(): if i.what.task == task and i.what.attribute == 'assignee': return i.date def completed(self, task): for i in self._activity_rev(): if i.what.task == task and i.what.attribute == 'status' and i.new_value in ('Invalid', 'Fix Released'): return i.date def when(self, task): for i in self._activity_rev(): if i.what == 'bug' and i.message.startswith('assigned to') and i.message.count(task): return i.date def started_work(self, task): for i in self._activity_rev(): if i.what.task == task and i.what.attribute == 'status' and i.new_value in ('In Progress', 'Fix Committed'): return i.date class Mentoring(object): def __init__(self, connection, xml, url): self.parsed = False self._Mentoring__cache = set() self._Mentoring__connection = connection self._Mentoring__xml = xml self._Mentoring__url = url self._Mentoring__mentor = set() def __repr__(self): return '<Mentor for #%s>' % self._Mentoring__url def parse(self): if self.parsed: return True for i in self._Mentoring__xml: self._Mentoring__mentor.add(user.parse_html_user(i)) self._Mentoring__cache = self._Mentoring__mentor.copy() self.parsed = True return True def mentor(self): return self._Mentoring__mentor mentor = property(mentor) def changed(self): '''get a list of changed attributes currently read-only ''' return set() changed = property(changed) def commit(self, force_changes = False, ignore_lp_errors = True): raise NotImplementedError, 'this method is not implemented ATM' class BzrBranch(object): def __init__(self, title, url, status): self._BzrBranch__title = title self._BzrBranch__url = url self._BzrBranch__status = status def __repr__(self): return 'BzrBranch(%s, %s, %s)' % (self.title, self.url, self.status) __str__ = __repr__ def title(self): return self._BzrBranch__title title = property(title) def url(self): return self._BzrBranch__url url = property(url) def status(self): return self._BzrBranch__status status = property(status) class Branches(set): def __init__(self, connection, xml, url): self.parsed = False set.__init__(self) self._Branches__url = url self._Branches__xml = xml self._Branches__connection = connection def parse(self): if self.parsed: return True for i in self._Branches__xml: m = i.xpathEval('a[1]') if not m: raise AssertionError title = m[0].prop('title') url = m[0].prop('href') m = i.xpathEval('span') if not m: raise AssertionError status = m[0].content self.add(BzrBranch(title, url, status)) self.parsed = True def changed(self): '''get a list of changed attributes currently read-only ''' return set() changed = property(changed) def commit(self, force_changes = False, ignore_lp_errors = True): raise NotImplementedError, 'this method is not implemented ATM' class Bug(BugBase): _container_refs = { } def __init__(self, bug = None, url = None, connection = None): BugBase.__init__(self, bug, url, connection) bugpage = self._Bug__connection.get(self._Bug__url) self._Bug__text = bugpage.text self._Bug__url = bugpage.url self.xmldoc = libxml2.htmlParseDoc(unicode_for_libxml2(self._Bug__text), 'UTF-8') self._Bug__bugreport = BugReport(connection = self._Bug__connection, xml = self.xmldoc, url = self._Bug__url) self._Bug__infotable = InfoTable(connection = self._Bug__connection, xml = self.xmldoc.xpathEval('//body//table[@class="listing" or @class="duplicate listing"][1]'), url = self._Bug__url) self._Bug__comments = Comments(connection = self._Bug__connection, xml = self.xmldoc.xpathEval('//body//div[normalize-space(@class)="boardComment"]'), url = self._Bug__url) self._Bug__attachments = Attachments(self._Bug__comments, self._Bug__connection, self.xmldoc.xpathEval('//body//div[@id="portlet-attachments"]/div/div/ul')) self._Bug__duplicates = Duplicates(connection = self._Bug__connection, xml = self.xmldoc, url = self._Bug__url) self._Bug__secrecy = Secrecy(connection = self._Bug__connection, xml = self.xmldoc, url = self._Bug__url) self._Bug__subscribers = Subscribers(connection = self._Bug__connection, xml = self.xmldoc.xpathEval('//body//div[@id="portlet-subscribers"]'), url = self._Bug__url) self._Bug__mentor = Mentoring(connection = self._Bug__connection, xml = self.xmldoc.xpathEval('//body//img [@src="/@@/mentoring"]/parent::p//a'), url = self._Bug__url) self._Bug__activity = ActivityLog(connection = self._Bug__connection, url = self._Bug__url) self._Bug__branches = Branches(self._Bug__connection, self.xmldoc.xpathEval('//body//div[@class="bug-branch-summary"]'), self._Bug__url) Bug._container_refs[id(self._Bug__attachments)] = self Bug._container_refs[id(self._Bug__comments)] = self Bug._container_refs[id(self._Bug__secrecy)] = self def __del__(self): '''run self.xmldoc.freeDoc() to clear memory''' if hasattr(self, 'xmldoc'): self.xmldoc.freeDoc() def changed(self): _Bug__result = [] for i in filter((lambda a: a.startswith('_Bug__')), dir(self)): try: a = getattr(self.__dict__[i], 'changed') except AttributeError: continue if a: _Bug__result.append(change_obj(self.__dict__[i])) continue return _Bug__result changed = property(changed) def commit(self, force_changes = False, ignore_lp_errors = True): for i in self.changed: if isinstance(i.component, Comments): result = i.component.commit(force_changes, ignore_lp_errors, 'Re: %s' % self.title) continue result = i.component.commit(force_changes, ignore_lp_errors) def revert(self): ''' need a function to revert changes ''' pass get_url = _gen_getter('__url') get_bugnumber = _gen_getter('__bugnumber') get_reporter = _gen_getter('__bugreport.reporter') get_date = _gen_getter('__bugreport.date') get_duplicates = _gen_getter('__duplicates.duplicates') get_description_raw = _gen_getter('__bugreport.description_raw') get_activity = _gen_getter('__activity') def get_text(self): return '\n'.join % ([], []([ c.text for c in self.comments ])) get_title = _gen_getter('__bugreport.title') get_description = _gen_getter('__bugreport.description') set_description = _gen_setter('__bugreport.description') get_tags = _gen_getter('__bugreport.tags') get_nickname = _gen_getter('__bugreport.nickname') get_mentors = _gen_getter('__mentor.mentor') get_infotable = _gen_getter('__infotable') get_info = _blocked(_gen_getter('__infotable.current'), "No 'current' available.") get_target = _blocked(_gen_getter('__infotable.current.target'), "Can't get 'target'.") get_importance = _blocked(_gen_getter('__infotable.current.importance'), "Can't get 'importance'.") set_importance = _blocked(_gen_setter('__infotable.current.importance'), "Can't set 'importance'.") get_status = _blocked(_gen_getter('__infotable.current.status'), "Can't get 'status'.") set_status = _blocked(_gen_setter('__infotable.current.status'), "Can't set 'status'.") get_assignee = _blocked(_gen_getter('__infotable.current.assignee'), "Can't get 'assignee'.") set_assignee = _blocked(_gen_setter('__infotable.current.assignee'), "Can't set 'assignee'.") get_milestone = _blocked(_gen_getter('__infotable.current.milestone'), "Can't get 'milestone'.") set_milestone = _blocked(_gen_setter('__infotable.current.milestone'), "Can't set 'milestone'.") get_sourcepackage = _blocked(_gen_getter('__infotable.current.sourcepackage'), "Can't get 'sourcepackage'.") set_sourcepackage = _blocked(_gen_setter('__infotable.current.sourcepackage'), "Can't set 'sourcepackage'.") get_affects = _blocked(_gen_getter('__infotable.current.affects'), "Can't get 'affects'.") def has_target(self, target): if not self._Bug__infotable.parsed: self._Bug__infotable.parse() return self._Bug__infotable.has_target(target) get_duplicates = _gen_getter('__duplicates.duplicates') get_duplicate = _gen_getter('__duplicates.duplicate_of') set_duplicate = _gen_setter('__duplicates.duplicate_of') get_security = _gen_getter('__secrecy.security') set_security = _gen_setter('__secrecy.security') get_private = _gen_getter('__secrecy.private') set_private = _gen_setter('__secrecy.private') get_subscriptions = _gen_getter('__subscribers') get_attachments = _gen_getter('__attachments') def get_comments(self): x = self.attachments if not self._Bug__comments.parsed: self.comments.parse() return self._Bug__comments def get_subscriptions_category(self, type): ''' get subscriptions for a given category, possible categories are "directly", "notified", "duplicates" ''' if not self._Bug__subscribers.parsed: self._Bug__subscribers.parse() return self._Bug__subscribers.get_subscriptions(type) get_branches = _gen_getter('__branches') def create_new_bugreport(product, summary, description, connection, tags = [], security_related = False): ''' creates a new bugreport and returns its bug object product keys: "name", "target" (optional) tags: list of tags ''' args = { 'field.title': summary, 'field.comment': description, 'field.actions.submit_bug': 1 } if tags: args['field.tags'] = ' '.join(tags) if security_related: args['field.security_related'] = 'on' if product.has_key('target'): url = 'https://bugs.launchpad.net/%s/+source/%s/+filebug-advanced' % (product['target'], product['name']) args['field.packagename'] = product['name'] else: url = 'https://bugs.launchpad.net/%s/+filebug-advanced' % product['name'] try: result = connection.post(url, args) except exceptions.LaunchpadError: e = None try: x = connection.needs_login() except exceptions.LaunchpadError: x = False if x: raise exceptions.LaunchpadLoginError(url) x if isinstance(e, exceptions.LaunchpadURLError): if 'Page not found' in e.msg: m = "Maybe there is no product '%s' in " % product['name'] if product.has_key('target'): m += "the distribution '%s'" % product['target'] else: m += 'launchpad.net' raise exceptions.PythonLaunchpadBugsValueError(msg = m) 'Page not found' in e.msg else: raise isinstance(e, exceptions.LaunchpadURLError) if not result.url.endswith('+filebug-advanced'): return Bug(url = result.url, connection = connection) raise exceptions.choose_pylpbugsError(error_type = exceptions.VALUEERROR, text = result.text, url = url)