home *** CD-ROM | disk | FTP | other *** search
/ Mac Easy 2010 May / Mac Life Ubuntu.iso / casper / filesystem.squashfs / usr / share / pyshared / DistUpgrade / sourceslist.py < prev    next >
Encoding:
Python Source  |  2009-03-30  |  16.0 KB  |  450 lines

  1. #  aptsource.py - Provide an abstraction of the sources.list
  2. #
  3. #  Copyright (c) 2004-2007 Canonical Ltd.
  4. #                2004 Michiel Sikkes
  5. #                2006-2007 Sebastian Heinlein
  6. #
  7. #  Author: Michiel Sikkes <michiel@eyesopened.nl>
  8. #          Michael Vogt <mvo@debian.org>
  9. #          Sebastian Heinlein <glatzor@ubuntu.com>
  10. #
  11. #  This program is free software; you can redistribute it and/or
  12. #  modify it under the terms of the GNU General Public License as
  13. #  published by the Free Software Foundation; either version 2 of the
  14. #  License, or (at your option) any later version.
  15. #
  16. #  This program is distributed in the hope that it will be useful,
  17. #  but WITHOUT ANY WARRANTY; without even the implied warranty of
  18. #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  19. #  GNU General Public License for more details.
  20. #
  21. #  You should have received a copy of the GNU General Public License
  22. #  along with this program; if not, write to the Free Software
  23. #  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
  24. #  USA
  25.  
  26. import gettext
  27. import glob
  28. import os.path
  29. import re
  30. import shutil
  31. import sys
  32. import time
  33.  
  34. import apt_pkg
  35. from aptsources.distinfo import DistInfo
  36.  
  37.  
  38. # some global helpers
  39.  
  40.  
  41. def is_mirror(master_uri, compare_uri):
  42.     """ check if the given add_url is idential or a mirror of orig_uri
  43.         e.g. master_uri = archive.ubuntu.com
  44.             compare_uri = de.archive.ubuntu.com
  45.             -> True
  46.     """
  47.     # remove traling spaces and "/"
  48.     compare_uri = compare_uri.rstrip("/ ")
  49.     master_uri = master_uri.rstrip("/ ")
  50.     # uri is identical
  51.     if compare_uri == master_uri:
  52.         #print "Identical"
  53.         return True
  54.     # add uri is a master site and orig_uri has the from "XX.mastersite"
  55.     # (e.g. de.archive.ubuntu.com)
  56.     try:
  57.         compare_srv = compare_uri.split("//")[1]
  58.         master_srv = master_uri.split("//")[1]
  59.         #print "%s == %s " % (add_srv, orig_srv)
  60.     except IndexError: # ok, somethings wrong here
  61.         #print "IndexError"
  62.         return False
  63.     # remove the leading "<country>." (if any) and see if that helps
  64.     if "." in compare_srv and \
  65.            compare_srv[compare_srv.index(".")+1:] == master_srv:
  66.         #print "Mirror"
  67.         return True
  68.     return False
  69.  
  70.  
  71. def uniq(s):
  72.     """ simple and efficient way to return uniq list """
  73.     return list(set(s))
  74.  
  75.  
  76. class SourceEntry:
  77.     """ single sources.list entry """
  78.  
  79.     def __init__(self, line, file=None):
  80.         self.invalid = False            # is the source entry valid
  81.         self.disabled = False           # is it disabled ('#' in front)
  82.         self.type = ""                  # what type (deb, deb-src)
  83.         self.uri = ""                   # base-uri
  84.         self.dist = ""                  # distribution (dapper, edgy, etc)
  85.         self.comps = []                 # list of available componetns
  86.                                         # (may empty)
  87.         self.comment = ""               # (optional) comment
  88.         self.line = line                # the original sources.list line
  89.         if file is None:
  90.             file = apt_pkg.Config.FindDir(
  91.                 "Dir::Etc")+apt_pkg.Config.Find("Dir::Etc::sourcelist")
  92.         self.file = file               # the file that the entry is located in
  93.         self.parse(line)
  94.         self.template = None           # type DistInfo.Suite
  95.         self.children = []
  96.  
  97.     def __eq__(self, other):
  98.         """ equal operator for two sources.list entries """
  99.         return (self.disabled == other.disabled and
  100.                 self.type == other.type and
  101.                 self.uri == other.uri and
  102.                 self.dist == other.dist and
  103.                 self.comps == other.comps)
  104.  
  105.     def mysplit(self, line):
  106.         """ a split() implementation that understands the sources.list
  107.             format better and takes [] into account (for e.g. cdroms) """
  108.         line = line.strip()
  109.         pieces = []
  110.         tmp = ""
  111.         # we are inside a [..] block
  112.         p_found = False
  113.         space_found = False
  114.         for i in range(len(line)):
  115.             if line[i] == "[":
  116.                 p_found=True
  117.                 tmp += line[i]
  118.             elif line[i] == "]":
  119.                 p_found=False
  120.                 tmp += line[i]
  121.             elif space_found and not line[i].isspace():
  122.                 # we skip one or more space
  123.                 space_found = False
  124.                 pieces.append(tmp)
  125.                 tmp = line[i]
  126.             elif line[i].isspace() and not p_found:
  127.                 # found a whitespace
  128.                 space_found = True
  129.             else:
  130.                 tmp += line[i]
  131.         # append last piece
  132.         if len(tmp) > 0:
  133.             pieces.append(tmp)
  134.         return pieces
  135.  
  136.     def parse(self, line):
  137.         """ parse a given sources.list (textual) line and break it up
  138.             into the field we have """
  139.         line = self.line.strip()
  140.         #print line
  141.         # check if the source is enabled/disabled
  142.         if line == "" or line == "#": # empty line
  143.             self.invalid = True
  144.             return
  145.         if line[0] == "#":
  146.             self.disabled = True
  147.             pieces = line[1:].strip().split()
  148.             # if it looks not like a disabled deb line return
  149.             if not pieces[0] in ("rpm", "rpm-src", "deb", "deb-src"):
  150.                 self.invalid = True
  151.                 return
  152.             else:
  153.                 line = line[1:]
  154.         # check for another "#" in the line (this is treated as a comment)
  155.         i = line.find("#")
  156.         if i > 0:
  157.             self.comment = line[i+1:]
  158.             line = line[:i]
  159.         # source is ok, split it and see what we have
  160.         pieces = self.mysplit(line)
  161.         # Sanity check
  162.         if len(pieces) < 3:
  163.             self.invalid = True
  164.             return
  165.         # Type, deb or deb-src
  166.         self.type = pieces[0].strip()
  167.         # Sanity check
  168.         if self.type not in ("deb", "deb-src", "rpm", "rpm-src"):
  169.             self.invalid = True
  170.             return
  171.         # URI
  172.         self.uri = pieces[1].strip()
  173.         if len(self.uri) < 1:
  174.             self.invalid = True
  175.         # distro and components (optional)
  176.         # Directory or distro
  177.         self.dist = pieces[2].strip()
  178.         if len(pieces) > 3:
  179.             # List of components
  180.             self.comps = pieces[3:]
  181.         else:
  182.             self.comps = []
  183.  
  184.     def set_enabled(self, new_value):
  185.         """ set a line to enabled or disabled """
  186.         self.disabled = not new_value
  187.         # enable, remove all "#" from the start of the line
  188.         if new_value:
  189.             self.line = self.line.lstrip().lstrip('#')
  190.         else:
  191.             # disabled, add a "#"
  192.             if self.line.strip()[0] != "#":
  193.                 self.line = "#" + self.line
  194.  
  195.     def __str__(self):
  196.         """ debug helper """
  197.         return self.str().strip()
  198.  
  199.     def str(self):
  200.         """ return the current line as string """
  201.         if self.invalid:
  202.             return self.line
  203.         line = ""
  204.         if self.disabled:
  205.             line = "# "
  206.         line += "%s %s %s" % (self.type, self.uri, self.dist)
  207.         if len(self.comps) > 0:
  208.             line += " " + " ".join(self.comps)
  209.         if self.comment != "":
  210.             line += " #"+self.comment
  211.         line += "\n"
  212.         return line
  213.  
  214.  
  215. class NullMatcher(object):
  216.     """ a Matcher that does nothing """
  217.  
  218.     def match(self, s):
  219.         return True
  220.  
  221.  
  222. class SourcesList:
  223.     """ represents the full sources.list + sources.list.d file """
  224.  
  225.     def __init__(self,
  226.                  withMatcher=True,
  227.                  matcherPath="/usr/share/python-apt/templates/"):
  228.         self.list = []          # the actual SourceEntries Type
  229.         if withMatcher:
  230.             self.matcher = SourceEntryMatcher(matcherPath)
  231.         else:
  232.             self.matcher = NullMatcher()
  233.         self.refresh()
  234.  
  235.     def refresh(self):
  236.         """ update the list of known entries """
  237.         self.list = []
  238.         # read sources.list
  239.         dir = apt_pkg.Config.FindDir("Dir::Etc")
  240.         file = apt_pkg.Config.Find("Dir::Etc::sourcelist")
  241.         self.load(dir+file)
  242.         # read sources.list.d
  243.         partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
  244.         for file in glob.glob("%s/*.list" % partsdir):
  245.             self.load(file)
  246.         # check if the source item fits a predefined template
  247.         for source in self.list:
  248.             if not source.invalid:
  249.                 self.matcher.match(source)
  250.  
  251.     def __iter__(self):
  252.         """ simple iterator to go over self.list, returns SourceEntry
  253.             types """
  254.         for entry in self.list:
  255.             yield entry
  256.         raise StopIteration
  257.  
  258.     def add(self, type, uri, dist, orig_comps, comment="", pos=-1, file=None):
  259.         """
  260.         Add a new source to the sources.list.
  261.         The method will search for existing matching repos and will try to
  262.         reuse them as far as possible
  263.         """
  264.         # create a working copy of the component list so that
  265.         # we can modify it later
  266.         comps = orig_comps[:]
  267.         # check if we have this source already in the sources.list
  268.         for source in self.list:
  269.             if not source.disabled and not source.invalid and \
  270.                source.type == type and uri == source.uri and \
  271.                source.dist == dist:
  272.                 for new_comp in comps:
  273.                     if new_comp in source.comps:
  274.                         # we have this component already, delete it
  275.                         # from the new_comps list
  276.                         del comps[comps.index(new_comp)]
  277.                         if len(comps) == 0:
  278.                             return source
  279.         for source in self.list:
  280.             # if there is a repo with the same (type, uri, dist) just add the
  281.             # components
  282.             if not source.disabled and not source.invalid and \
  283.                source.type == type and uri == source.uri and \
  284.                source.dist == dist:
  285.                 comps = uniq(source.comps + comps)
  286.                 source.comps = comps
  287.                 return source
  288.             # if there is a corresponding repo which is disabled, enable it
  289.             elif source.disabled and not source.invalid and \
  290.                  source.type == type and uri == source.uri and \
  291.                  source.dist == dist and \
  292.                  len(set(source.comps) & set(comps)) == len(comps):
  293.                 source.disabled = False
  294.                 return source
  295.         # there isn't any matching source, so create a new line and parse it
  296.         line = "%s %s %s" % (type, uri, dist)
  297.         for c in comps:
  298.             line = line + " " + c
  299.         if comment != "":
  300.             line = "%s #%s\n" % (line, comment)
  301.         line = line + "\n"
  302.         new_entry = SourceEntry(line)
  303.         if file is not None:
  304.             new_entry.file = file
  305.         self.matcher.match(new_entry)
  306.         self.list.insert(pos, new_entry)
  307.         return new_entry
  308.  
  309.     def remove(self, source_entry):
  310.         """ remove the specified entry from the sources.list """
  311.         self.list.remove(source_entry)
  312.  
  313.     def restoreBackup(self, backup_ext):
  314.         " restore sources.list files based on the backup extension "
  315.         dir = apt_pkg.Config.FindDir("Dir::Etc")
  316.         file = apt_pkg.Config.Find("Dir::Etc::sourcelist")
  317.         if os.path.exists(dir+file+backup_ext) and \
  318.             os.path.exists(dir+file):
  319.             shutil.copy(dir+file+backup_ext, dir+file)
  320.         # now sources.list.d
  321.         partsdir = apt_pkg.Config.FindDir("Dir::Etc::sourceparts")
  322.         for file in glob.glob("%s/*.list" % partsdir):
  323.             if os.path.exists(file+backup_ext):
  324.                 shutil.copy(file+backup_ext, file)
  325.  
  326.     def backup(self, backup_ext=None):
  327.         """ make a backup of the current source files, if no backup extension
  328.             is given, the current date/time is used (and returned) """
  329.         already_backuped = set()
  330.         if backup_ext is None:
  331.             backup_ext = time.strftime("%y%m%d.%H%M")
  332.         for source in self.list:
  333.             if not source.file in already_backuped \
  334.                 and os.path.exists(source.file):
  335.                 shutil.copy(source.file, "%s%s" % (source.file, backup_ext))
  336.         return backup_ext
  337.  
  338.     def load(self, file):
  339.         """ (re)load the current sources """
  340.         try:
  341.             f = open(file, "r")
  342.             lines = f.readlines()
  343.             for line in lines:
  344.                 source = SourceEntry(line, file)
  345.                 self.list.append(source)
  346.         except:
  347.             print "could not open file '%s'" % file
  348.         else:
  349.             f.close()
  350.  
  351.     def save(self):
  352.         """ save the current sources """
  353.         files = {}
  354.         # write an empty default config file if there aren't any sources
  355.         if len(self.list) == 0:
  356.             path = "%s%s" % (apt_pkg.Config.FindDir("Dir::Etc"),
  357.                              apt_pkg.Config.Find("Dir::Etc::sourcelist"))
  358.             header = (
  359.                 "## See sources.list(5) for more information, especialy\n"
  360.                 "# Remember that you can only use http, ftp or file URIs\n"
  361.                 "# CDROMs are managed through the apt-cdrom tool.\n")
  362.             open(path, "w").write(header)
  363.             return
  364.         for source in self.list:
  365.             if source.file not in files:
  366.                 files[source.file] = open(source.file, "w")
  367.             files[source.file].write(source.str())
  368.         for f in files:
  369.             files[f].close()
  370.  
  371.     def check_for_relations(self, sources_list):
  372.         """get all parent and child channels in the sources list"""
  373.         parents = []
  374.         used_child_templates = {}
  375.         for source in sources_list:
  376.             # try to avoid checking uninterressting sources
  377.             if source.template is None:
  378.                 continue
  379.             # set up a dict with all used child templates and corresponding
  380.             # source entries
  381.             if source.template.child:
  382.                 key = source.template
  383.                 if key not in used_child_templates:
  384.                     used_child_templates[key] = []
  385.                 temp = used_child_templates[key]
  386.                 temp.append(source)
  387.             else:
  388.                 # store each source with children aka. a parent :)
  389.                 if len(source.template.children) > 0:
  390.                     parents.append(source)
  391.         #print self.used_child_templates
  392.         #print self.parents
  393.         return (parents, used_child_templates)
  394.  
  395.  
  396. class SourceEntryMatcher:
  397.     """ matcher class to make a source entry look nice
  398.         lots of predefined matchers to make it i18n/gettext friendly
  399.         """
  400.  
  401.     def __init__(self, matcherPath):
  402.         self.templates = []
  403.         # Get the human readable channel and comp names from the channel .infos
  404.         spec_files = glob.glob("%s/*.info" % matcherPath)
  405.         for f in spec_files:
  406.             f = os.path.basename(f)
  407.             i = f.find(".info")
  408.             f = f[0:i]
  409.             dist = DistInfo(f, base_dir=matcherPath)
  410.             for template in dist.templates:
  411.                 if template.match_uri is not None:
  412.                     self.templates.append(template)
  413.         return
  414.  
  415.     def match(self, source):
  416.         """Add a matching template to the source"""
  417.         _ = gettext.gettext
  418.         found = False
  419.         for template in self.templates:
  420.             if (re.search(template.match_uri, source.uri) and
  421.                 re.match(template.match_name, source.dist)):
  422.                 found = True
  423.                 source.template = template
  424.                 break
  425.             elif (template.is_mirror(source.uri) and
  426.                 re.match(template.match_name, source.dist)):
  427.                 found = True
  428.                 source.template = template
  429.                 break
  430.         return found
  431.  
  432.  
  433. # some simple tests
  434. if __name__ == "__main__":
  435.     apt_pkg.InitConfig()
  436.     sources = SourcesList()
  437.  
  438.     for entry in sources:
  439.         print entry.str()
  440.         #print entry.uri
  441.  
  442.     mirror = is_mirror("http://archive.ubuntu.com/ubuntu/",
  443.                        "http://de.archive.ubuntu.com/ubuntu/")
  444.     print "is_mirror(): %s" % mirror
  445.  
  446.     print is_mirror("http://archive.ubuntu.com/ubuntu",
  447.                     "http://de.archive.ubuntu.com/ubuntu/")
  448.     print is_mirror("http://archive.ubuntu.com/ubuntu/",
  449.                     "http://de.archive.ubuntu.com/ubuntu")
  450.