| Viewing file:  miscutils.py (10.54 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
#!/usr/bin/python -tt
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 # the Free Software Foundation; either version 2 of the License, or
 # (at your option) any later version.
 #
 # This program is distributed in the hope that it will be useful,
 # but WITHOUT ANY WARRANTY; without even the implied warranty of
 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 # GNU Library General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 # Copyright 2003 Duke University
 
 import string
 import rpm
 import types
 import gzip
 import os
 import sys
 import locale
 
 import rpmUtils.transaction
 
 def rpmOutToStr(arg):
     """Return arg unchanged if it is already a str, otherwise str(arg).

     Note that None is not special-cased: it comes back as the string
     'None'.
     """
     # isinstance() replaces the type()/types.StringType comparison, which
     # depended on a name that only exists in Python 2.
     if not isinstance(arg, str):
         arg = str(arg)
     return arg
 
 
 def compareEVR(evr1, evr2):
     """Compare two (epoch, version, release) tuples with rpm.labelCompare.

     Every field of both tuples is passed through rpmOutToStr() before the
     comparison.

     Returns the result of rpm.labelCompare:
        1: evr1 is newer than evr2
        0: evr1 and evr2 are the same version
       -1: evr2 is newer than evr1
     """
     # The tuples are unpacked here rather than in the signature: tuple
     # parameters are Python 2-only syntax (removed by PEP 3113).  Callers,
     # which could only ever pass the two tuples positionally, are unaffected.
     (e1, v1, r1) = evr1
     (e2, v2, r2) = evr2
     first = (rpmOutToStr(e1), rpmOutToStr(v1), rpmOutToStr(r1))
     second = (rpmOutToStr(e2), rpmOutToStr(v2), rpmOutToStr(r2))
     return rpm.labelCompare(first, second)
 
 def checkSig(ts, package):
     """Check the signatures of the package file using transaction set ts.

     Returns:
       0 if they are all fine
       1 if the gpg key can't be found
       2 if the header is in some way damaged
       3 if the key is not trusted
       4 if the pkg is not gpg or pgp signed

     The transaction set's verification flags are set to 0 for the duration
     of the check and are put back afterwards, even when opening or reading
     the package raises.  Errors from os.open() propagate to the caller.
     """
     # rpm.error message -> return code.  Both spellings of "available" are
     # matched on purpose.
     # NOTE(review): any other rpm.error message leaves the result at 0
     # (i.e. reported as "all fine"), exactly as before -- confirm that this
     # is intended.
     codes = {
         "public key not availaiable": 1,
         "public key not available": 1,
         "public key not trusted": 3,
         "error reading package header": 2,
     }

     value = 0
     currentflags = ts.setVSFlags(0)
     try:
         fdno = os.open(package, os.O_RDONLY)
         try:
             try:
                 hdr = ts.hdrFromFdno(fdno)
             except rpm.error:
                 # sys.exc_info() keeps this valid on old and new Pythons.
                 value = codes.get(str(sys.exc_info()[1]), 0)
             else:
                 error, siginfo = getSigInfo(hdr)
                 del hdr
                 if error == 101:
                     value = 4
         finally:
             # Close the descriptor exactly once, on every path.
             os.close(fdno)
     finally:
         ts.setVSFlags(currentflags)  # put things back like they were before
     return value
 
 def getSigInfo(hdr):
     """Pull signature information out of a package header.

     Returns (error, (sigtype, sigdate, sigid)).  error is 0 when the header
     query yields signature details, and 101 when it yields '(none)', in
     which case the tuple is ('MD5', 'None', 'None').
     """
     # Side effect: this switches the process-wide locale to 'C'.
     locale.setlocale(locale.LC_ALL, 'C')

     # Take the first of DSAHEADER, RSAHEADER, SIGGPG, SIGPGP that is
     # present, formatted as pgpsig; fall back to the literal '(none)'.
     query = '%|DSAHEADER?{%{DSAHEADER:pgpsig}}:{%|RSAHEADER?{%{RSAHEADER:pgpsig}}:{%|SIGGPG?{%{SIGGPG:pgpsig}}:{%|SIGPGP?{%{SIGPGP:pgpsig}}:{(none)}|}|}|}|'
     answer = hdr.sprintf(query)

     if answer == '(none)':
         return 101, ('MD5', 'None', 'None')

     # Exactly three comma-separated fields are expected; any other count
     # raises ValueError from the unpacking.
     kind, when, keyid = answer.split(',')
     return 0, (kind, when, keyid)
 
 def pkgTupleFromHeader(hdr):
     """Build a (name, arch, epoch, version, release) tuple from a header.

     The fields are looked up by key on hdr.  An epoch of None is reported
     as the string '0'.
     """
     n, a, v, r = hdr['name'], hdr['arch'], hdr['version'], hdr['release']
     e = hdr['epoch']
     if e is None:
         e = '0'
     return (n, a, e, v, r)
 
 
 def rangeCheck(reqtuple, pkgtuple):
     """Tell whether a package's epoch-version-release satisfies the range
     asked for by a versioned requirement, e.g. "foo >= 2.1-1".

     reqtuple is (name, flag, (epoch, version, release)) and pkgtuple is
     (name, arch, epoch, version, release).  The flag is matched against
     the strings 'EQ', 'LT', 'LE', 'GT', 'GE' and against the numeric
     values 2, 4, 8, 10 and 12.

     Returns 1 when the requirement is satisfied and 0 otherwise.
     """
     # We only get here for a versioned requirement; a name-only one should
     # never reach this function.
     reqn, reqf, (reqe, reqv, reqr) = reqtuple
     n, a, e, v, r = pkgtuple

     # Different package name: nothing to compare.
     if reqn != n:
         return 0

     # Any field the request leaves out is blanked on the package side as
     # well, so that e.g. a request for "EQ foo 1:3.0.0" matches an
     # installed foo 1:3.0.0-15.  (The original author notes that a None
     # version is expected to crash the comparison.)
     trimmed = []
     for mine, wanted in ((e, reqe), (v, reqv), (r, reqr)):
         if wanted is None:
             trimmed.append(None)
         else:
             trimmed.append(mine)

     rc = compareEVR(tuple(trimmed), (reqe, reqv, reqr))

     # Choose the flags that accept this comparison outcome.
     if rc >= 1:
         accepted = ('GT', 'GE', 4, 12)
     elif rc == 0:
         accepted = ('GE', 'LE', 'EQ', 8, 10, 12)
     elif rc <= -1:
         accepted = ('LT', 'LE', 2, 10)
     else:
         return 0

     if reqf in accepted:
         return 1
     return 0
 
 ###########
 # Title: Remove duplicates from a sequence
 # Submitter: Tim Peters
 # From: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/52560
 
 def unique(s):
     """Return a list of the elements in s, but without duplicates.

     For example, unique([1,2,3,1,2,3]) is some permutation of [1,2,3],
     unique("abcabc") some permutation of ["a", "b", "c"], and
     unique(([1, 2], [2, 3], [1, 2])) some permutation of
     [[2, 3], [1, 2]].

     For best speed, all sequence elements should be hashable.  Then
     unique() will usually work in linear time.

     If not possible, the sequence elements should enjoy a total
     ordering, and if list(s).sort() doesn't raise TypeError it's
     assumed that they do enjoy a total ordering.  Then unique() will
     usually work in O(N*log2(N)) time.

     If that's not possible either, the sequence elements must support
     equality-testing.  Then unique() will usually work in quadratic
     time.
     """
     if len(s) == 0:
         return []

     # Try a dict first: it is the fastest approach and usually works.  If
     # an element turns out to be unhashable this tends to fail quickly.
     seen = {}
     try:
         for x in s:
             seen[x] = 1
     except TypeError:
         pass  # move on to the next method
     else:
         # list() guarantees the documented list result even where dict
         # keys are a view rather than a list.
         return list(seen)

     # Not everything is hashable.  Next best is to sort, which brings
     # equal elements together so duplicates can be dropped in one pass.
     try:
         ordered = list(s)
         ordered.sort()
     except TypeError:
         # Nothing to clean up here: if list(s) itself failed, 'ordered'
         # was never bound, so deleting it would raise in its own right.
         pass
     else:
         result = [ordered[0]]
         for x in ordered[1:]:
             if x != result[-1]:
                 result.append(x)
         return result

     # Brute force is all that's left.
     result = []
     for x in s:
         if x not in result:
             result.append(x)
     return result
 
 
 def splitFilename(filename):
     """Split a standard-style rpm file name into its components.

     Returns (name, version, release, epoch, arch); epoch is '' when the
     file name carries none.  For example:
         foo-1.0-1.i386.rpm    -> ('foo', '1.0', '1', '', 'i386')
         1:bar-9-123a.ia64.rpm -> ('bar', '9', '123a', '1', 'ia64')
     """
     # Strip a trailing '.rpm', if there is one.
     if filename.endswith('.rpm'):
         filename = filename[:-4]

     # Work from the right: arch follows the last '.', release the last
     # '-' before that, and version the '-' before the release.
     dot = filename.rfind('.')
     rel_dash = filename.rfind('-', 0, dot)
     ver_dash = filename.rfind('-', 0, rel_dash)

     # Anything before the first ':' is the epoch.
     colon = filename.find(':')
     epoch = ''
     if colon != -1:
         epoch = filename[:colon]

     arch = filename[dot + 1:]
     rel = filename[rel_dash + 1:dot]
     ver = filename[ver_dash + 1:rel_dash]
     name = filename[colon + 1:ver_dash]
     return name, ver, rel, epoch, arch
 
 
 def rpm2cpio(fdno, out=sys.stdout, bufsize=2048):
     """Perform roughly the equivalent of rpm2cpio(8).

     The package header is read from the file descriptor fdno; the
     descriptor is then wrapped in a gzip reader whose output is copied to
     out in pieces of bufsize bytes.

     Raises rpmUtils.RpmUtilsError when the header names a payload
     compressor other than gzip (a missing value is treated as gzip).
     """
     ts = rpmUtils.transaction.initReadOnlyTransaction()
     hdr = ts.hdrFromFdno(fdno)
     del ts

     compressor = hdr[rpm.RPMTAG_PAYLOADCOMPRESSOR]
     if not compressor:
         compressor = 'gzip'

     # XXX FIXME: bzip2 payloads are not implemented yet.
     if compressor != 'gzip':
         raise rpmUtils.RpmUtilsError(
             'Unsupported payload compressor: "%s"' % compressor)

     payload = gzip.GzipFile(None, 'rb', None, os.fdopen(fdno, 'rb', bufsize))
     chunk = payload.read(bufsize)
     while chunk != "":
         out.write(chunk)
         chunk = payload.read(bufsize)
     payload.close()
 
 def formatRequire(name, version, flags):
     """Render a requirement as readable text, e.g. "foo >= 2.1".

     name is returned unchanged unless flags carries at least one of the
     rpm.RPMSENSE_LESS / RPMSENSE_GREATER / RPMSENSE_EQUAL bits.  Otherwise
     a space and the matching '<', '>' and '=' characters are appended,
     followed by a space and version when version is non-empty.
     """
     # No flags at all: nothing to add.
     if not flags:
         return name

     comparison_bits = (rpm.RPMSENSE_LESS | rpm.RPMSENSE_GREATER |
                        rpm.RPMSENSE_EQUAL)
     if not flags & comparison_bits:
         return name

     pieces = [name, " "]
     for bit, symbol in ((rpm.RPMSENSE_LESS, "<"),
                         (rpm.RPMSENSE_GREATER, ">"),
                         (rpm.RPMSENSE_EQUAL, "=")):
         if flags & bit:
             pieces.append(symbol)
     text = "".join(pieces)

     if version:
         text = "%s %s" % (text, version)
     return text
 
 def stringToVersion(verstring):
     """Split an "epoch:version-release" string into its three parts.

     Returns (epoch, version, release):
       - (None, None, None) when verstring is None;
       - epoch is the integer before the first ':'; it is the string '0'
         when there is no ':' or when that prefix is not a valid integer;
       - version is the text between the ':' (or the start) and the first
         '-' (or the end), or None when that text is empty;
       - release is everything after the first '-', or None without one.
     """
     if verstring is None:
         return (None, None, None)

     # str methods and int() replace the deprecated string.find() and
     # string.atol() module functions.
     colon = verstring.find(':')
     if colon == -1:
         epoch = '0'
     else:
         try:
             epoch = int(verstring[:colon])
         except ValueError:
             # garbage in the epoch field: fall back to '0'
             epoch = '0'

     # With no ':' present colon is -1, so colon + 1 is the string start.
     dash = verstring.find('-')
     if dash == -1:
         version = verstring[colon + 1:]
         release = None
     else:
         version = verstring[colon + 1:dash]
         release = verstring[dash + 1:]

     if version == '':
         version = None
     return (epoch, version, release)
 
 def hdrFromPackage(ts, package):
     """Hand back the rpm header of the package file, read through ts.

     Raises rpmUtils.RpmUtilsError when the file cannot be opened, when rpm
     reports an error while reading it, or when what comes back is not an
     rpm.hdr.
     """
     try:
         fdno = os.open(package, os.O_RDONLY)
     except OSError:
         raise rpmUtils.RpmUtilsError('Unable to open file')

     # A single finally closes the descriptor on every path, including
     # exceptions other than rpm.error, which used to leak it.
     try:
         try:
             hdr = ts.hdrFromFdno(fdno)
         except rpm.error:
             raise rpmUtils.RpmUtilsError("RPM Error opening Package")
         if type(hdr) != rpm.hdr:
             raise rpmUtils.RpmUtilsError("RPM Error opening Package")
     finally:
         os.close(fdno)
     return hdr
 
 |