home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import os
- import sys
- import apt_pkg
- from apt import Package
- import apt.progress as apt
-
class FetchCancelledException(IOError):
    """Raised when a fetch run ends because the user cancelled it."""
-
-
class FetchFailedException(IOError):
    """Raised when one or more items could not be fetched."""
-
-
class LockFailedException(IOError):
    """Raised when a lock file could not be acquired."""
-
-
class Cache(object):
    """Dictionary-like package cache.

    Every package that has at least one version is stored under its name and
    can be reached through the usual mapping protocol: ``cache[name]``,
    ``name in cache``, ``len(cache)`` and ``cache.keys()``.  Iterating over
    the cache yields the package objects themselves.
    """

    def __init__(self, progress=None, rootdir=None, memonly=False):
        """Configure apt_pkg and open the cache.

        progress -- optional progress reporter, passed on to open()
        rootdir  -- optional alternative root directory; its
                    etc/apt/apt.conf and etc/apt/apt.conf.d are read when
                    they exist, and 'Dir' and 'Dir::State::status' are
                    pointed below it
        memonly  -- when true, 'Dir::Cache::pkgcache' is set to ''
        """
        self._callbacks = {}
        if memonly:
            apt_pkg.Config.Set('Dir::Cache::pkgcache', '')

        if rootdir:
            conf_file = rootdir + '/etc/apt/apt.conf'
            conf_dir = rootdir + '/etc/apt/apt.conf.d'
            if os.path.exists(conf_file):
                apt_pkg.ReadConfigFile(apt_pkg.Config, conf_file)
            if os.path.isdir(conf_dir):
                apt_pkg.ReadConfigDir(apt_pkg.Config, conf_dir)
            apt_pkg.Config.Set('Dir', rootdir)
            apt_pkg.Config.Set('Dir::State::status',
                               rootdir + '/var/lib/dpkg/status')

        self.open(progress)

    def _runCallbacks(self, name):
        """Call every callback registered for the signal *name*."""
        for callback in self._callbacks.get(name, ()):
            callback()

    def open(self, progress):
        """Open the package cache; afterwards it can be used like a dict.

        Fires 'cache_pre_open' before and 'cache_post_open' after the
        internal structures have been (re)built.  *progress* may be None.
        """
        self._runCallbacks('cache_pre_open')
        self._cache = apt_pkg.GetCache(progress)
        self._depcache = apt_pkg.GetDepCache(self._cache)
        self._records = apt_pkg.GetPkgRecords(self._cache)
        self._list = apt_pkg.GetPkgSourceList()
        self._list.ReadMainList()
        self._dict = {}
        if progress is not None:
            progress.Op = 'Building data structures'

        last = 0
        size = len(self._cache.Packages)
        for i, pkg in enumerate(self._cache.Packages):
            # Report only when more than 100 packages were handled since
            # the previous report.
            if progress is not None and last + 100 < i:
                progress.update((i / float(size)) * 100)
                last = i
            # Packages without any version are not exposed.
            if len(pkg.VersionList) > 0:
                self._dict[pkg.Name] = Package(self._cache, self._depcache,
                                               self._records, self._list,
                                               self, pkg)

        if progress is not None:
            progress.done()

        self._runCallbacks('cache_post_open')

    def __getitem__(self, key):
        """Return the package stored under the name *key*."""
        return self._dict[key]

    def __iter__(self):
        """Yield every package object held by the cache."""
        # The generator simply runs off its end; an explicit
        # ``raise StopIteration`` here is turned into RuntimeError by
        # PEP 479 on newer interpreters.
        for pkgname in self._dict.keys():
            yield self._dict[pkgname]

    def has_key(self, key):
        """Return whether a package named *key* is in the cache."""
        return key in self._dict

    def __contains__(self, key):
        """Return whether a package named *key* is in the cache."""
        return key in self._dict

    def __len__(self):
        """Return the number of packages in the cache."""
        return len(self._dict)

    def keys(self):
        """Return the names of all packages in the cache."""
        return self._dict.keys()

    def getChanges(self):
        """Return the list of packages that carry any change mark.

        A package is included when it is marked for upgrade, install,
        delete, downgrade or reinstall.
        """
        changes = []
        for name in self._dict.keys():
            p = self._dict[name]
            # Any single mark is enough; the marks must be or-ed together.
            if (p.markedUpgrade or p.markedInstall or p.markedDelete
                    or p.markedDowngrade or p.markedReinstall):
                changes.append(p)
        return changes

    def upgrade(self, distUpgrade=False):
        """Mark all packages for upgrade.

        *distUpgrade* is handed to the dependency cache's Upgrade(); the
        original documentation states that a dist-upgrade will also install
        new dependencies.  The pre/post change signals are fired around it.
        """
        self.cachePreChange()
        self._depcache.Upgrade(distUpgrade)
        self.cachePostChange()

    @property
    def requiredDownload(self):
        """Get the size of the packages that are required to download."""
        pm = apt_pkg.GetPackageManager(self._depcache)
        fetcher = apt_pkg.GetAcquire()
        pm.GetArchives(fetcher, self._list, self._records)
        return fetcher.FetchNeeded

    @property
    def additionalRequiredSpace(self):
        """Get the size of the additional required space on the fs."""
        return self._depcache.UsrSize

    @property
    def reqReinstallPkgs(self):
        """Return the names of non-downloadable packages in reqreinst state.

        The result is a set of the names of all packages whose candidate is
        not downloadable and whose install state is InstStateReInstReq or
        InstStateHoldReInstReq.
        """
        reinst_states = (apt_pkg.InstStateReInstReq,
                         apt_pkg.InstStateHoldReInstReq)
        reqreinst = set()
        for pkg in self:
            if (not pkg.candidateDownloadable
                    and pkg._pkg.InstState in reinst_states):
                reqreinst.add(pkg.name)
        return reqreinst

    def _runFetcher(self, fetcher):
        """Run *fetcher* and turn its outcome into a result or an exception.

        Returns the value of fetcher.Run().  Raises FetchCancelledException
        when that value equals fetcher.ResultCancelled, otherwise
        FetchFailedException when an item is neither done nor idle.  The
        exception message lists the failed items.
        """
        res = fetcher.Run()
        failures = []
        for item in fetcher.Items:
            if item.Status == item.StatDone:
                continue
            # Compare the item's status with the constant.  Testing the
            # constant alone for truthiness does not depend on the item.
            if item.Status == item.StatIdle:
                continue
            failures.append('Failed to fetch %s %s\n'
                            % (item.DescURI, item.ErrorText))

        errMsg = ''.join(failures)
        if res == fetcher.ResultCancelled:
            raise FetchCancelledException(errMsg)
        if failures:
            raise FetchFailedException(errMsg)
        return res

    def _fetchArchives(self, fetcher, pm):
        """Fetch the needed archives while holding the archives lock.

        Returns False when pm.GetArchives() reports failure, otherwise the
        result of _runFetcher().  Raises LockFailedException when the lock
        file below 'Dir::Cache::Archives' cannot be acquired.
        """
        lockfile = apt_pkg.Config.FindDir('Dir::Cache::Archives') + 'lock'
        lock = apt_pkg.GetLock(lockfile)
        if lock < 0:
            raise LockFailedException('Failed to lock %s' % lockfile)

        try:
            if not pm.GetArchives(fetcher, self._list, self._records):
                return False
            return self._runFetcher(fetcher)
        finally:
            # The lock is a file descriptor; release it on every exit path.
            os.close(lock)

    def isVirtualPackage(self, pkgname):
        """Return whether the package is a virtual package.

        A package counts as virtual when it is provided by something
        (non-empty ProvidesList) and has no version of its own.
        """
        pkg = self._cache[pkgname]
        return bool(pkg.ProvidesList and not pkg.VersionList)

    def getProvidingPackages(self, virtual):
        """Return the packages which provide the virtual package *virtual*.

        The list is empty when *virtual* is unknown to the cache or has
        versions of its own.
        """
        providers = []
        try:
            vp = self._cache[virtual]
        except KeyError:
            return providers
        if len(vp.VersionList) != 0:
            return providers

        for pkg in self:
            v = self._depcache.GetCandidateVer(pkg._pkg)
            if v is None:
                continue
            for p in v.ProvidesList:
                if virtual == p[0]:
                    providers.append(pkg)

        return providers

    def update(self, fetchProgress=None):
        """Run the equivalent of apt-get update.

        A default apt.progress.FetchProgress is used when *fetchProgress* is
        None.  Raises LockFailedException when the lock file below
        'Dir::State::Lists' cannot be acquired.
        """
        lockfile = apt_pkg.Config.FindDir('Dir::State::Lists') + 'lock'
        lock = apt_pkg.GetLock(lockfile)
        if lock < 0:
            raise LockFailedException('Failed to lock %s' % lockfile)

        try:
            if fetchProgress is None:
                fetchProgress = apt.progress.FetchProgress()
            return self._cache.Update(fetchProgress, self._list)
        finally:
            os.close(lock)

    def installArchives(self, pm, installProgress):
        """Run the package manager *pm* through *installProgress*.

        Returns whatever installProgress.run(pm) returns.
        """
        installProgress.startUpdate()
        res = installProgress.run(pm)
        installProgress.finishUpdate()
        return res

    def commit(self, fetchProgress=None, installProgress=None):
        """Apply the marked changes to the cache.

        Fetches and installs in a loop until the install step reports
        pm.ResultCompleted.  Raises SystemError when it reports
        pm.ResultFailed.  Default progress objects are created for
        arguments left as None.
        """
        if fetchProgress is None:
            fetchProgress = apt.progress.FetchProgress()
        if installProgress is None:
            installProgress = apt.progress.InstallProgress()

        pm = apt_pkg.GetPackageManager(self._depcache)
        fetcher = apt_pkg.GetAcquire(fetchProgress)
        while True:
            # Fetch first, then install.
            self._fetchArchives(fetcher, pm)
            res = self.installArchives(pm, installProgress)
            if res == pm.ResultCompleted:
                break
            if res == pm.ResultFailed:
                raise SystemError('installArchives() failed')
            # Any other result: shut the fetcher down and go round again.
            fetcher.Shutdown()
        return res == pm.ResultCompleted

    def clear(self):
        """Unmark all changes."""
        self._depcache.Init()

    def cachePostChange(self):
        """Called internally after the cache has changed; fires
        'cache_post_change'."""
        self._runCallbacks('cache_post_change')

    def cachePreChange(self):
        """Called internally before the cache changes; fires
        'cache_pre_change'."""
        self._runCallbacks('cache_pre_change')

    def connect(self, name, callback):
        """Register *callback* (called without arguments) for signal *name*.

        Signals fired by this class: 'cache_pre_open', 'cache_post_open',
        'cache_pre_change' and 'cache_post_change'.
        """
        self._callbacks.setdefault(name, []).append(callback)
-
-
-
class Filter(object):
    """Base class for package filters."""

    def apply(self, pkg):
        """Tell whether *pkg* matches the filter criteria.

        This base implementation accepts every package; it always returns
        True.
        """
        return True
-
-
-
class MarkedChangesFilter(Filter):
    """Filter that accepts packages marked for install, delete or upgrade."""

    def apply(self, pkg):
        """Return True when *pkg* carries any of the three marks.

        Any single mark is enough, so the marks are or-ed together.
        """
        return bool(pkg.markedInstall or pkg.markedDelete
                    or pkg.markedUpgrade)
-
-
-
class FilteredCache(object):
    """Filtered view on a package cache.

    Wraps the cache handed in, or creates a new Cache when none is given,
    and keeps track of the package names accepted by the active filter.
    """

    def __init__(self, cache=None, progress=None):
        """Wrap *cache* (or a new Cache built with *progress*) and subscribe
        to its 'cache_post_change' and 'cache_post_open' signals."""
        self.cache = Cache(progress) if cache is None else cache
        for signal in ('cache_post_change', 'cache_post_open'):
            self.cache.connect(signal, self.filterCachePostChange)
        self._filtered = {}
        self._filters = []

    def __len__(self):
        """Return how many packages passed the filter."""
        return len(self._filtered)

    def __getitem__(self, key):
        """Return the package named *key* straight from the wrapped cache."""
        return self.cache._dict[key]

    def __iter__(self):
        """Yield the package object for every name that passed the filter."""
        for accepted in self._filtered:
            yield self.cache[accepted]

    def keys(self):
        """Return the names of the packages that passed the filter."""
        return self._filtered.keys()

    def has_key(self, key):
        """Return whether the name *key* passed the filter."""
        return key in self._filtered

    def __contains__(self, key):
        """Return whether the name *key* passed the filter."""
        return key in self._filtered

    def _reapplyFilter(self):
        """Rebuild the set of accepted names from the wrapped cache."""
        self._filtered = {}
        for name, candidate in self.cache._dict.items():
            # One accepting filter is enough; later filters are not asked.
            if any(flt.apply(candidate) for flt in self._filters):
                self._filtered[name] = 1

    def setFilter(self, filter):
        """Make *filter* the only active filter and trigger the wrapped
        cache's post-change notification."""
        self._filters = [filter]
        self.cache.cachePostChange()

    def filterCachePostChange(self):
        """Callback for the wrapped cache's signals; re-runs the filters."""
        self._reapplyFilter()

    def __getattr__(self, key):
        """Forward unknown attributes to the wrapped cache."""
        return getattr(self.cache, key)
-
-
-
def cache_pre_changed():
    """Self-test callback: announce the pre-change signal on stdout."""
    print('cache pre changed')
-
-
def cache_post_changed():
    """Self-test callback: announce the post-change signal on stdout."""
    print('cache post changed')
-
if __name__ == '__main__':
    # Manual self test: exercises Cache and FilteredCache against the apt
    # configuration of the machine it runs on.
    print('Cache self test')
    apt_pkg.init()
    c = Cache(apt.progress.OpTextProgress())
    c.connect('cache_pre_change', cache_pre_changed)
    c.connect('cache_post_change', cache_post_changed)
    print('aptitude' in c)
    p = c['aptitude']
    print(p.name)
    print(len(c))
    # Touch every package once to check that lookups work.
    for pkg in c.keys():
        x = c[pkg].name

    c.upgrade()
    changes = c.getChanges()
    print(len(changes))
    for p in changes:
        x = p.name

    # Scratch archive directory used by the fetch test below.
    for d in ['/tmp/pytest', '/tmp/pytest/partial']:
        if not os.path.exists(d):
            os.mkdir(d)

    apt_pkg.Config.Set('Dir::Cache::Archives', '/tmp/pytest')
    pm = apt_pkg.GetPackageManager(c._depcache)
    fetcher = apt_pkg.GetAcquire(apt.progress.TextFetchProgress())
    c._fetchArchives(fetcher, pm)

    print('Testing filtered cache (argument is old cache)')
    f = FilteredCache(c)
    f.cache.connect('cache_pre_change', cache_pre_changed)
    f.cache.connect('cache_post_change', cache_post_changed)
    f.cache.upgrade()
    f.setFilter(MarkedChangesFilter())
    print(len(f))
    for pkg in f.keys():
        x = f[pkg].name

    print(len(f))

    print('Testing filtered cache (no argument)')
    # Use the same qualified name as the Cache() call above; the bare
    # OpTextProgress name is never imported in this module.
    f = FilteredCache(progress=apt.progress.OpTextProgress())
    f.cache.connect('cache_pre_change', cache_pre_changed)
    f.cache.connect('cache_post_change', cache_post_changed)
    f.cache.upgrade()
    f.setFilter(MarkedChangesFilter())
    print(len(f))
    for pkg in f.keys():
        x = f[pkg].name

    print(len(f))
-
-