home *** CD-ROM | disk | FTP | other *** search
- # Copyright 2001 (C) ActiveState Tool Corp., All Rights Reserved.
- #
- __version__ = '0.1.0.0'
-
- import string, sys, cmd, os, getopt, re, time
- from PPM import ppmconfig
- import xml.dom.minidom
- import soaptalk
- import soaplib
-
- #XXX
- #XXX
-
- from types import *
- # exceptions, errorreporter objects are imported from this module
- from ppmerrors import * # XXX
- from installer import isurl, isfile, isdir
- from errormsg import * # the error messages are defined here
-
def CheckPackage(pacname, packages):
    """Tell whether `pacname` matches an entry of `packages`.

    When `pacname` is a URL or a file path, only the text after the last
    '/' is compared.  The comparison is case-insensitive and is made
    against both the `name` and the `primname` of every entry.

    Returns 1 (after reporting the installed version through
    `errorreporter.Error`) on the first match, otherwise 0.
    """
    if isurl(pacname) or isfile(pacname):
        # keep only the trailing path component
        pacname = pacname[pacname.rfind('/') + 1:]
    for installed in packages:
        same_name = installed.name.lower() == pacname.lower()
        if not (same_name or installed.primname.lower() == pacname.lower()):
            continue
        # stored versions may be comma separated; show them dotted
        dotted = installed.version.replace(',', '.')
        errorreporter.Error("Version %s of %s is already installed. " \
            "Use 'verify --upgrade %s (with or without --force).\n" % (dotted,
            installed.name, installed.name))
        return 1
    return 0
-
def getMaxAttrLength(PacList, Attr):
    """Return the length of the longest `Attr` value among `PacList`.

    `Attr` is an attribute name looked up with `getattr` on every item.
    Returns 0 when `PacList` is empty.
    """
    widest = 0
    for entry in PacList:
        widest = max(widest, len(getattr(entry, Attr)))
    return widest
-
def AnalyzeArgs(args, cnfs):
    """Parse the arguments of the `search` / `query` commands.

    args -- a command-line string or an already split list of words.
    cnfs -- configuration object; `cnfs.options` is read as a mapping.

    Recognised long options: --case, --nocase, --abstract, --author.
    The remaining words, joined by single spaces, form the regular
    expression; an empty pattern or '*' becomes '.*'.

    Returns (search, pattern, case):
      search  -- 'auth', 'abs' or '' (which field to match),
      pattern -- the regular expression text,
      case    -- 1 for a case-sensitive match, 0 otherwise.

    Raises getopt.GetoptError for unknown options and InvalidRegx when
    the pattern does not compile.
    """
    if isinstance(args, str):
        args = args.split()
    opts, words = getopt.getopt(args, '', ['case', 'nocase', 'abstract', 'author'])
    pattern = ' '.join(words)
    if not pattern or pattern == '*':
        pattern = '.*'

    # The option is stored as a mapping entry, so it has to be looked up by
    # key (hasattr() never finds it).  'ignorecase' and 'case' have opposite
    # polarity: ignoring case means the match is NOT case sensitive.
    try:
        ignorecase = int(cnfs.options['ignorecase'])
    except (KeyError, TypeError, ValueError):
        ignorecase = 1          # historical default: case-insensitive
    if ignorecase:
        case = 0
    else:
        case = 1

    search = ''
    for o, v in opts:
        if o == '--case':
            case = 1
        elif o == '--nocase':
            case = 0
        elif o == '--author':
            search = 'auth'
        elif o == '--abstract':
            search = 'abs'

    # Compile only to validate the pattern; callers compile it again.
    try:
        if case:
            re.compile(pattern)
        else:
            re.compile(pattern, re.IGNORECASE)
    except re.error:
        raise InvalidRegx(pattern)

    return search, pattern, case
-
def AnalyzeSetArgs(args, options):
    """Parse the arguments of the `set` command.

    args    -- a command-line string or an already split list of words.
    options -- mapping of the known option names (only the keys are used).

    Every option name is accepted as `--name=value`, and its first letter
    as `-x value` (the alphabetically first name wins when several names
    share a letter).  In addition `--save` / `-s` request saving, `-a`
    sets 'tracefile' and `-o` sets 'confirm'.

    Returns a dict mapping option name to the raw string value ('save'
    maps to '').  Raises getopt.GetoptError on unknown options.
    """
    if isinstance(args, str):
        args = args.split()

    names = sorted(options.keys())
    longopts = [name + '=' for name in names] + ['save']
    shortopts = []
    for name in names:
        letter = name[0] + ':'
        if letter not in shortopts:
            shortopts.append(letter)
    shortopts.append('s')   # short option for save
    shortopts.append('a:')  # short option for tracefile
    shortopts.append('o:')  # short option for confirm

    # getopt expects the short-option spec as one string without separators.
    opts, _rest = getopt.getopt(args, ''.join(shortopts), longopts)

    known = names + ['save']
    first_letters = [name[0] for name in known]
    result = {}
    for o, v in opts:
        long_name = o[2:]    # text after '--' (empty for short options)
        short_name = o[1:]   # text after '-'
        if long_name in known:
            result[long_name] = v
        elif short_name == 's':
            result['save'] = ''
        elif short_name == 'a':
            result['tracefile'] = v
        elif short_name == 'o':
            result['confirm'] = v
        elif short_name in first_letters:
            result[known[first_letters.index(short_name)]] = v
    return result
-
def ServerSearch(rep, pattern='.*', tag='', case=0):
    """Run a package search against the SOAP server of repository `rep`.

    rep     -- repository object; its `uri` and `namespace` are used.
    pattern -- regular expression to search for.
    tag     -- field selector passed through to the server ('' by default).
    case    -- when false, '(?i)' is prepended to make the pattern
               case-insensitive.

    Returns whatever `search_ppds` returns.  Raises RuntimeError when the
    current platform has no known architecture name (previously this
    surfaced as an unbound-variable error).
    """
    if os.name == 'posix' and sys.platform.find('sun') >= 0:
        archname = 'sun4-solaris'
    elif os.name == 'posix' and sys.platform.find('linux') >= 0:
        archname = 'i686-linux'
    elif os.name == 'nt' or os.name == 'dos':
        archname = 'MSWin32'
    else:
        raise RuntimeError('unsupported platform for repository search: '
                           '%s/%s' % (os.name, sys.platform))
    if not case:
        pattern = '(?i)' + pattern
    server = soaptalk.soaptalk(rep.uri, rep.namespace)
    return server.search_ppds(archname, pattern, tag)
-
def GetMePackages(PPDs):
    """Turn PPD text into a list of `ppmconfig.Package` objects.

    One Package is built for every SOFTPKG element of the parsed document.

    Raises BadPPD when `PPDs` is a tuple or cannot be parsed as XML, and
    PackageNotFound when it is empty or contains '404' or 'Not Found'.
    """
    if type(PPDs) is tuple:
        raise BadPPD(str(PPDs))
    if not PPDs:
        errorreporter.Error("package not found!")
        raise PackageNotFound
    if PPDs.find('404') >= 0 or PPDs.find('Not Found') >= 0:
        raise PackageNotFound

    text = str(PPDs)
    try:
        document = xml.dom.minidom.parseString(text)
    except Exception as e:
        raise BadPPD(e)
    found = []
    for node in document.getElementsByTagName("SOFTPKG"):
        found.append(ppmconfig.Package(node))
    document.unlink()
    return found
-
def DoWeHaveIt(pacname, packages, ignore_case=1):
    """Return 1 when `pacname` matches the `name` or `primname` of an entry
    of `packages`, otherwise 0.

    For a URL or file path only the text after the last '/' is compared.
    With a true `ignore_case` both sides are lower-cased first.
    """
    if isurl(pacname) or isfile(pacname):
        pacname = pacname[pacname.rfind('/') + 1:]

    if ignore_case:
        fold = lambda text: text.lower()
    else:
        fold = lambda text: text

    for entry in packages:
        if fold(entry.name) == fold(pacname) or \
           fold(entry.primname) == fold(pacname):
            return 1
    return 0
-
# A package may be given together with its location, as a URL or a file
# path (e.g. http://www.something/pacname).  ExtractLocation returns that
# location part, if there is one.
def ExtractLocation(pacname):
    """Return everything before the last '/' of a URL or file path.

    Returns '' when `pacname` is neither a URL nor a file, or when it
    contains no '/' at all (a bare file name has no location part).
    """
    if not (isurl(pacname) or isfile(pacname)):
        return ''
    cut = pacname.rfind('/')
    if cut < 0:
        # Without this guard the slice below would silently drop the
        # last character of the name.
        return ''
    return pacname[:cut]
-
def GetPPD(location, pac, install):
    """Fetch the PPD of package `pac` through `install.fetch_ppd`.

    location -- '' / empty (returns ''), a string (a trailing '/' is
                added when missing and the PPD is fetched from
                location + pac), or a list of repositories.
    install  -- object providing `fetch_ppd(repository, name)`.

    Returns None when `location` is of any other type.
    """
    if not location:
        return ''
    kind = type(location)
    if kind is str:
        # attempt to verify packages from this location
        if location[-1] != '/':
            location += '/'
        errorreporter.Msg('Fetching package PPDs from: %s' % location)
        return install.fetch_ppd('', location + pac)
    if kind is list:
        # NOTE(review): returns on the first iteration, so only the first
        # repository is ever consulted -- confirm whether that is intended.
        for repository in location:
            return install.fetch_ppd(repository, pac)
    return None
-
# Compare two software version numbers.
# Returns 1 if v1 is greater than v2, -1 if v2 is greater, otherwise 0.
def Compare(v1, v2):
    """Compare version strings component by component.

    A version is expected to look like n1.n2.n3.n4 with integer
    components.  Commas are accepted as separators too, since stored
    versions may be comma separated (CheckPackage converts them the same
    way).  A non-numeric component (alpha, beta, ...) counts as 0, and a
    missing trailing component counts as 0, so '1.2' equals '1.2.0'.

    Components are compared from the most significant one on the left;
    the first difference decides the result.
    """
    def _numbers(version):
        parts = []
        for piece in str(version).replace(',', '.').split('.'):
            try:
                parts.append(int(piece))
            except ValueError:
                parts.append(0)
        return parts

    left = _numbers(v1)
    right = _numbers(v2)
    # Pad the shorter list so both have the same number of components.
    width = max(len(left), len(right))
    left.extend([0] * (width - len(left)))
    right.extend([0] * (width - len(right)))

    for a, b in zip(left, right):
        if a > b:
            return 1
        if a < b:
            return -1
    return 0
-
def GetVerifyOpts(args):
    """Parse the arguments of the `verify` command.

    Accepts -l/--location <value>, -f/--force and -u/--upgrade.
    `args` may be a string (split on whitespace) or a list of words.

    Returns (location, force, upgrade, packages), where the flags are
    0 or 1 and `packages` holds the remaining words.
    Raises getopt.GetoptError on unknown options.
    """
    if type(args) is str:
        args = args.split()
    parsed, packages = getopt.getopt(args, 'ful:',
                                     ['location=', 'upgrade', 'force'])
    location, force, upgrade = '', 0, 0
    for flag, value in parsed:
        if flag == '--location' or flag == '-l':
            location = value
        elif flag == '--force' or flag == '-f':
            force = 1
        elif flag == '--upgrade' or flag == '-u':
            upgrade = 1
    return location, force, upgrade, packages
-
def UpGrade(pkg, rmPkg, location, cnfs, install):
    """Install the new version `pkg` and replace the record of `rmPkg`.

    pkg      -- package to install; its `ppd` and `name` are used.
    rmPkg    -- installed-package record removed after a successful install.
    location -- non-empty string: the package is recorded with this
                location.  Anything else: the repositories returned by
                `cnfs.get_repository_info()` are tried in order.
    cnfs     -- configuration object (options and package bookkeeping).
    install  -- installer providing `InstallPackage` and `softpkg`.

    Returns None on success and 0 after a reported failure.  When every
    repository raises PackageNotFound the function also returns None.
    """
    errorreporter.Msg("Upgrading package '%s'..." % pkg.name)

    if location and isinstance(location, str):
        try:
            install.InstallPackage(pkg.ppd, cnfs, cnfs.options['clean'],
                                   upgrade=1)
            cnfs.remove_package(rmPkg)
            cnfs.add_package(install.softpkg, location)
        except FailInstall as e:
            errorreporter.Error(e)
            return 0
        except PackageNotFound:
            errorreporter.Error('Package (or PPD) not found at %s location.' % location)
            return 0
        except BadPPD as e:
            errorreporter.Error('An error occured during installation: %s' % e)
            return 0
        except AlreadyInstalled as e:
            errorreporter.Error(e)
            return 0
        return None

    locations = cnfs.get_repository_info()
    remaining = len(locations)
    for loco in locations:
        try:
            install.InstallPackage(pkg.ppd, cnfs, cnfs.options['clean'],
                                   upgrade=1)
            cnfs.remove_package(rmPkg)
            cnfs.add_package(install.softpkg, loco)
            return None
        except FailInstall as e:
            errorreporter.Error(e)
            return 0
        except BadPPD as e:
            errorreporter.Error(e)
            return 0
        except PackageNotFound:
            errorreporter.Error('Package %s not found in the repository %s: %s' % \
                                (pkg.name, loco.name, loco.uri))
            # Only announce a retry while another repository is left.
            if remaining > 1:
                errorreporter.Msg('Trying the next repository...')
            remaining -= 1
        except AlreadyInstalled as e:
            errorreporter.Error(e)
            return 0
    return None
def doMore():
    """Ask the user whether to continue a paged listing.

    Returns 1 when the user typed 'q' (or input ended), otherwise prints
    an empty line and returns 0.
    """
    try:
        reply = raw_input("[press 'q' to quit or 'Enter' to continue...']")
    except EOFError:
        reply = 'q'
    if reply == 'q':
        return 1
    sys.stdout.write("\n")
    return 0
-
class PyPPM:
    """Command implementations of the PPM package manager.

    Every `do_<name>` method implements the command <name>; the matching
    `help_<name>` method pages its help text.  Return-value convention used
    by `__init__` when a command is run from the command line: 1 or None
    means success (exit status 0), 0 means failure (exit status 1).
    """

    def __init__(self, largs):
        """Run the command given in `largs`, or prepare interactive use.

        largs -- list of command-line words.  A leading `-db <name>` pair
                 selects the configuration database and is removed from the
                 list (the caller's list is modified in place).

        When a command word is found it is executed and the process exits.
        Otherwise the configuration is loaded and the instance is flagged
        as interactive.
        """
        def doit(*arguments):
            if arguments:
                return ppmconfig.PPMConfig(arguments[0])
            return ppmconfig.PPMConfig()

        self.interactive = 0
        # checks to see if we should be interactive or not
        if largs:
            index = 0
            while index <= len(largs) - 1:
                dd = largs[index]
                if dd == sys.argv[0] or not dd:
                    index += 1
                    continue

                if dd == '-db':
                    db_index = largs.index(dd)
                    try:
                        self.cnfs = doit(largs[db_index + 1])
                    except IndexError:
                        errorreporter.Error("You have to specify an argument to -db option."\
                                            " Example: pyppm -db database_name")
                        self.exit(1)  # XXX
                    except BadConfigFile as e:
                        errorreporter.Error(e)
                        self.exit(1)  # XXX
                    if len(largs) > db_index + 2:
                        # drop '-db <name>' and look at the same index again
                        largs[db_index:db_index + 2] = []
                        continue
                    break

                if not hasattr(self, 'cnfs'):
                    self.cnfs = doit()
                try:
                    func = getattr(self, 'do_' + str(dd))
                except AttributeError:
                    errorreporter.Error("No such command!: '%s'. "\
                                        "Try 'pyppm help'." % dd)
                    # __init__ must not return a value; a bare return keeps
                    # direct instantiation legal.
                    return

                if str(dd) == 'help':
                    if len(largs) > 1:
                        funcargs = largs[1]
                    else:
                        funcargs = "help"
                else:
                    funcargs = largs[1:]
                # XXX all of the arg parsing should be
                # XXX centralized here.
                rc = func(funcargs)
                # remember DOS/Unix convention is 0 is success
                if rc in (1, None):
                    self.exit(0)
                elif rc == 0:
                    self.exit(1)
                else:
                    raise TypeError("Unknown return code %s" % rc)

        if not hasattr(self, 'cnfs'):
            self.cnfs = doit()

        self.interactive = 1
        errorreporter.formatter.interactive = 1

    # XXX we do not have this in this version. There is
    # no standard way of removing python packages
    # def do_remove(self,args):
    #     remove a module

    def do_version(self, args):
        """Page the program version."""
        self.page_text(__version__)

    def do_querydb(self, args):
        "prints the path to the configuration file."
        self.page_text(os.path.abspath(self.cnfs.config_file_name))

    def do_verify(self, args):
        "verify the version of the packages installed."
        try:
            location, force, upgrade, packages = GetVerifyOpts(args)
        except getopt.GetoptError as e:
            errorreporter.Error(e)
            return 0
        return self.real_verify(location, force, upgrade, packages)

    def real_verify(self, location, force, upgrade, packages):
        """Check installed packages against the repository versions.

        location -- string location, or '' to use the configured repositories
        force    -- boolean; treat every package as upgradeable
        upgrade  -- boolean; actually perform the upgrades
        packages -- list of package names ([] means all installed packages)

        Returns 0 when a package PPD could not be obtained, otherwise None.
        """
        # The module-level import only brings in isurl/isfile/isdir.
        import installer

        PacObj = self.cnfs.get_installed_packages()
        ignorecase = self.cnfs.options['ignorecase']
        if packages:
            for pac in packages:
                if not DoWeHaveIt(pac, PacObj, ignore_case=ignorecase):
                    errorreporter.Error("package %s is not installed. Try 'install' command." % pac)
                    return
        self.cnfs.get_repository_info()
        install = installer.Installer(self.cnfs.options['verbose'],
                                      self.page_text)

        if ignorecase:
            pkgs = [p.name.lower() for p in PacObj]
        else:
            pkgs = [p.name for p in PacObj]
        PkgTable = {}
        for name, installed in zip(pkgs, PacObj):
            PkgTable[name] = str(installed.version)

        if len(packages) == 0:
            packages = pkgs
        if not location:
            location = self.cnfs.get_repository_info()
        if ignorecase:
            packages = [name.lower() for name in packages]

        UpGradeable = []
        for pac in packages:
            ppd = GetPPD(location, pac, install)
            try:
                pkg = GetMePackages(ppd)[0]
            except Exception as e:
                errorreporter.Error('did not get a valid response for package %s: %s' % (pac, e))
                return 0
            if force:
                UpGradeable.append(pkg)
            elif Compare(PkgTable[pac], pkg.version) < 0:
                # only a strictly newer repository version is an upgrade
                UpGradeable.append(pkg)
                self.page_text("An upgrade is available for %s." % pkg.name)
            else:
                self.page_text("%s is up to date." % pkg.name)

        if not UpGradeable:
            return None

        answer = 'y'
        if upgrade and self.interactive and self.cnfs.options['confirm'] == 1:
            arguments = ''.join([p.name + ' ' for p in UpGradeable])
            answer = raw_input("Upgrade package %s? (Y/n): " % arguments)
            if not answer:
                answer = 'y'

        if answer.lower() == 'y' and upgrade:
            for p in UpGradeable:
                # find the installed record that is being replaced
                rmPkg = None
                for candidate in PacObj:
                    if candidate.name == p.name:
                        rmPkg = candidate
                        break
                try:
                    UpGrade(p, rmPkg, location, self.cnfs, install)
                except Exception as e:
                    errorreporter.Error('An error occured during installation: %s' % e)

    def do_search(self, args):
        """Search the repositories for packages matching a pattern."""
        try:
            search, pattern, case = AnalyzeArgs(args, self.cnfs)
        except getopt.GetoptError as e:
            errorreporter.Error(invalid_args % e)
            return 0
        except InvalidRegx as e:
            errorreporter.Error(invalid_regular_exp % e)
            return 0

        return self.real_search(search, pattern, case)

    def real_search(self, s, p, c):
        """Query every configured repository and page the matches.

        s -- field selector: 'abs' (abstract), 'auth' (author) or ''
        p -- regular expression
        c -- case sensitive (1) or not (0)
        """
        if s == 'abs':
            tag = 'ABSTRACT'
        elif s == 'auth':
            tag = 'AUTHOR'
        else:
            tag = ''

        self.cnfs.get_repository_info()
        for loco in self.cnfs.get_repository_info():
            errorreporter.Msg('searching %s repository -- URL= %s' % (loco.name, loco.uri))
            try:
                PPD = ServerSearch(loco, pattern=p, case=c, tag=tag)
            except soaplib.ProtocolError as e:
                # must come before the catch-all handler to be reachable
                errorreporter.Error('can not access server: %s' % e)
                return
            except Exception as e:
                errorreporter.Error(e)
                return
            try:
                PacList = GetMePackages(PPD)
            except Exception as e:
                errorreporter.Error(e)
                return
            errorreporter.formatter.maxname = getMaxAttrLength(PacList, 'name')
            errorreporter.formatter.maxversion = getMaxAttrLength(PacList, 'version')
            errorreporter.formatter.TotalLines = 0
            self.page_text([repr(pac) for pac in PacList])

    def page_text(self, lines):
        """Show `lines` (a string or a list of strings) to the user.

        Interactive sessions go through a pydoc pager; otherwise the text
        is written to stdout with trailing whitespace replaced by a single
        newline.
        """
        if isinstance(lines, list):
            data = "\n".join(lines)
        else:
            data = lines
        if self.interactive:
            import pydoc
            if sys.platform.startswith('win'):
                pydoc.pager(data)
            else:
                pydoc.ttypager(data)
        else:
            sys.stdout.write(data.rstrip() + "\n")

    def do_query(self, args):
        """Query the installed packages."""
        try:
            search, pattern, case = AnalyzeArgs(args, self.cnfs)
        except getopt.GetoptError as e:
            errorreporter.Error(invalid_args % e)
            return
        except InvalidRegx as e:
            errorreporter.Error(invalid_regular_exp % e)
            return

        return self.real_query(search, pattern, case)

    def real_query(self, search, pattern, case):
        """Page the installed packages whose selected field matches.

        search  -- 'abs' matches the abstract, 'auth' the author, anything
                   else the package name
        pattern -- regular expression text
        case    -- true for a case-sensitive match
        """
        if case:
            compiled_patt = re.compile(pattern)
        else:
            compiled_patt = re.compile(pattern, re.IGNORECASE)

        if search == 'abs':
            field = 'abstract'
        elif search == 'auth':
            field = 'author'
        else:
            field = 'name'

        installed_packages = self.cnfs.get_installed_packages()
        errorreporter.formatter.maxname = getMaxAttrLength(installed_packages, 'name')
        errorreporter.formatter.maxversion = getMaxAttrLength(installed_packages, 'version')
        errorreporter.formatter.TotalLines = 0
        lines = [repr(p) for p in installed_packages
                 if compiled_patt.search(getattr(p, field))]
        self.page_text(lines)

    def do_genconfig(self, args):
        """Page a freshly generated configuration file."""
        import PyPMHelps
        # NOTE(review): the original also computed an OS label ('MSWin32'
        # on nt, os.name elsewhere) but never used it; os.name is what gets
        # written.  Confirm which value the configuration should carry.
        if os.name == 'nt':
            tempvar = '%TEMP%'
        else:
            tempvar = '$TEMP'
        # fall back to the literal variable reference when TEMP is unset
        tempdir = os.environ.get('TEMP', tempvar)
        self.page_text(PyPMHelps.configstr % (os.name, tempdir, ''))

    def do_set(self, args):
        """Show the options (no arguments) or change them."""
        self.cnfs.get_repository_info()
        if not args:
            errorreporter.formatter.formatOptions(self.cnfs.options, self.page_text)
            errorreporter.formatter.formatOptions(self.cnfs.get_repository_info(), self.page_text)
            return
        try:
            opts = AnalyzeSetArgs(args, self.cnfs.options)
        except getopt.GetoptError as e:
            errorreporter.Error(e)
            return

        return self.real_set(opts)

    def real_set(self, result):
        """Apply the option values in the dictionary `result`.

        Values are converted to the type of the current option value (int
        or str).  A 'save' key requests writing the options to the
        configuration file.
        """
        SaveIt = 0
        options = self.cnfs.options
        for key in result.keys():
            if key in options.keys():
                errorreporter.Msg('Setting the ''%s'' option...' % key)
                if type(options[key]) is int:
                    try:
                        options[key] = int(result[key])
                    except (AttributeError, ValueError):
                        errorreporter.Error("The '%s' option expects an integer value." % key)
                elif type(options[key]) is str:
                    options[key] = str(result[key])
            elif key == 'save':
                SaveIt = 1
        # reflecting changes on the tracer object
        errorreporter.tracer.verbose = options['verbose']
        if SaveIt:
            errorreporter.Msg('Saving the options to config file...')
            try:
                self.cnfs.save_options()
            except Exception:
                errorreporter.Error('An error occured while saving the configuration to the file.')

    def do_exit(self, args):
        """Leave the program with exit status 0."""
        sys.exit(0)

    def do_quit(self, args):
        """Alias of `do_exit`."""
        self.do_exit(args)

    def do_EOF(self, args):
        """Alias of `do_exit`."""
        self.do_exit(args)

    def exit(self, code):
        """Exit with status `code`, waiting for a key press first when the
        instance has a true `pause` attribute."""
        if getattr(self, 'pause', 0):
            raw_input('Press any key to continue ...')
        sys.exit(code)

    # XXX
    def do_install(self, args):
        """Install one or more packages (`--location=` is optional)."""
        location = ''
        if isinstance(args, str):
            args = args.split()
        try:
            opts, packages = getopt.getopt(args, '', ['location='])
        except getopt.GetoptError as e:
            errorreporter.Error(e)
            return
        for o, v in opts:
            if o == '--location':
                location = v
            else:
                errorreporter.Error('unrecognized option to install command: %s' % o)
                return
        if not packages:
            errorreporter.Error("No package was specified. Try 'help install'.")
            return
        if location and location[-1] != '/':
            location += '/'

        return self.real_install(location, packages)

    def _fetch_ppd(self, install, repository, target):
        """Fetch a PPD and report any failure; returns (ok, ppd)."""
        try:
            return 1, install.fetch_ppd(repository, target)
        except ProtocolError as e:
            errorreporter.Error(e)
        except soaplib.ProtocolError as e:
            errorreporter.Error('can not reach the server: %s' % e)
        except Exception as e:
            errorreporter.Error(e)
        return 0, None

    def real_install(self, location, packages):
        """Install every package of the list `packages`.

        location -- location of the PPDs ('' to use the configured
                    repositories, unless the package itself is a URL/file)
        packages -- list of package names

        Packages that are already installed are skipped.  Returns 0 as soon
        as an installation fails.
        """
        # The module-level import only brings in isurl/isfile/isdir.
        import installer

        self.cnfs.get_installed_packages()
        install = installer.Installer(self.cnfs.options['verbose'],
                                      self.page_text)

        for pac in packages:
            if CheckPackage(pac, self.cnfs.InstPackages):
                continue

            answer = 'y'
            if self.interactive and self.cnfs.options['confirm'] == 1:
                answer = raw_input("Install package %s? (Y/n): " % pac)
                if not answer:
                    answer = 'y'
            if answer.lower() != 'y':
                continue

            self.page_text("Installing package '%s'..." % pac)
            if location or isurl(pac) or isfile(pac):
                ok, ppd = self._fetch_ppd(install, '', location + pac)
                if not ok:
                    return 0
                try:
                    install.InstallPackage(ppd, self.cnfs, self.cnfs.options['clean'])
                    if location:
                        self.cnfs.add_package(install.softpkg, location)
                    else:
                        self.cnfs.add_package(install.softpkg, pac)
                except FailInstall as e:
                    errorreporter.Error(e)
                    return 0
                except PackageNotFound:
                    errorreporter.Error('Package (or PPD) not found at %s location.' % location)
                    return 0
                except BadPPD as e:
                    errorreporter.Error('Bad PPD file: %s' % e)
                    return 0
                except AlreadyInstalled as e:
                    errorreporter.Error(e)
                    return 0
            else:
                remaining = len(self.cnfs.get_repository_info())
                for loco in self.cnfs.get_repository_info():
                    errorreporter.Msg('Connecting to SOAP server %s...' % loco.name)
                    ok, ppd = self._fetch_ppd(install, loco, pac)
                    if not ok:
                        return 0
                    try:
                        install.InstallPackage(ppd, self.cnfs, self.cnfs.options['clean'])
                        self.cnfs.add_package(install.softpkg, loco)
                        break
                    except FailInstall as e:
                        errorreporter.Error(e)
                        return 0
                    except BadPPD as e:
                        errorreporter.Error(e)
                        return 0
                    except PackageNotFound:
                        errorreporter.Error('Package %s not found in the repository %s: %s' % \
                                            (pac, loco.name, loco.uri))
                        # Only announce a retry while a repository is left.
                        if remaining > 1:
                            errorreporter.Msg('Trying the next repository...')
                        remaining -= 1
                    except AlreadyInstalled as e:
                        errorreporter.Error(e)
                        return 0
        # NOTE(review): the source this was recovered from had lost its
        # indentation, so the exact position of this final 'return 0' is
        # uncertain.  At function level it makes the command report failure
        # even after a successful install; the original tracked an unused
        # 'success' flag that may have been the intended result -- confirm.
        return 0

    def _show_help(self, attribute):
        """Page the help text stored as `attribute` of the PyPMHelps module."""
        import PyPMHelps
        self.page_text(getattr(PyPMHelps, attribute))

    def help_querydb(self):
        self._show_help('helpquerydb')

    def help_install(self):
        self._show_help('helpinstall')

    def help_query(self):
        self._show_help('helpquery')

    def help_search(self):
        self._show_help('helpsearch')

    def help_set(self):
        self._show_help('helpset')

    def help_version(self):
        self._show_help('helpversion')

    def help_genconfig(self):
        self._show_help('helpgenconfig')

    def help_help(self):
        self._show_help('helphelp')

    def help_verify(self):
        self._show_help('helpverify')
-
-
-
-
class Pyshell(cmd.Cmd, PyPPM):
    """Interactive PPM shell: the PyPPM commands driven by `cmd.Cmd`."""

    def __init__(self, args, pause):
        """Set up the shell.

        args  -- list of command-line words handed to `PyPPM.__init__`;
                 anything that is not a non-empty list is treated as "no
                 arguments".
        pause -- stored as `self.pause`; read by `PyPPM.exit`.
        """
        # pause must be set first: PyPPM.__init__ may call self.exit().
        self.pause = pause
        self.prompt = 'PPM>'
        # Check the type before the length so that values without a
        # length (e.g. None) fall back to the empty argument list.
        if isinstance(args, list) and len(args) > 0:
            PyPPM.__init__(self, args)
        else:
            PyPPM.__init__(self, [])
        cmd.Cmd.__init__(self)

    def emptyline(self):
        """Do nothing on an empty input line."""
        pass

    def preloop(self):
        """Page the banner before the command loop starts."""
        self.page_text("""
Copyright (C) 2000-2001 ActiveState Tool Corp., All rights reserved.
ActivePython PPM interactive shell (%s). Type 'help' for available commands.""" % (self.cnfs.version))
-
-