Viewing file: Catalog.py (13.47 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Source: /var/local/cvsroot/4Suite/Ft/Xml/Catalog.py,v $ $Revision: 1.32 $ $Date: 2005/04/07 19:59:10 $ """ Classes and functions that help implement OASIS XML and TR9401 Catalogs. Resolution with Catalogs is handled via the Ft.Xml.InputSource module.
Based on a contribution to PyXML from Tarn Weisner Burton <twburton@users.sf.net>. See http://sourceforge.net/tracker/index.php?func=detail&aid=490069&group_id=6473&atid=306473
Copyright 2005 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
import os, re, sys, warnings, cStringIO from xml.sax import xmlreader
from Ft import FtWarning, GetConfigVar from Ft.Lib import Uri, UriException from Ft.Xml import XML_NAMESPACE
# Public names of this module.
__all__ = ['Catalog', 'GetDefaultCatalog']

# Matches one entry of a TR9401 (text) catalog.
#
# An entry is recognized at the start of a line (re.M), after optional
# whitespace, as a keyword (case-insensitive, re.I) followed by one
# double-quoted string and, optionally, a second double-quoted string.
# Inside the quotes a backslash escapes the following character; the
# escapes are captured verbatim, not decoded.
#
# Capture groups:
#   1 - the keyword: BASE, CATALOG, DELEGATE, PUBLIC, SYSTEM, or
#       OVERRIDE followed by YES or NO (separated by any whitespace)
#   2 - contents of the first quoted string
#   3 - contents of the second quoted string
#
# Because the pattern has three groups, findall() always yields 3-tuples;
# a missing second string shows up as '' in the third position.
#
# NOTE(review): the first quoted string is mandatory for every keyword, so
# a bare "OVERRIDE YES" / "OVERRIDE NO" that is not followed by a quoted
# string does not match at all -- confirm whether that is intended.
TR9401 = re.compile(r'^\s*(BASE|CATALOG|DELEGATE|PUBLIC|SYSTEM|OVERRIDE\s+YES|OVERRIDE\s+NO)\s+"((?:[^"\\]|\\.)*)"(?:\s+"((?:[^"\\]|\\.)*)")?', re.M | re.I)
def IsXml(bytes):
    """
    Guess whether the given byte string is a (possibly) well-formed XML
    document, as opposed to a TR9401 text catalog.

    The check follows the encoding auto-detection hints of the XML
    recommendation (see http://www.w3.org/TR/REC-xml/#sec-guessing):
    an XML declaration in a recognizable encoding means "XML"; otherwise
    the first non-whitespace character must be '<'.

    bytes - the raw, undecoded document content (the parameter name is
            kept for backward compatibility even though it shadows a
            builtin on newer interpreters)

    Returns True if the data looks like XML, False otherwise.  Empty or
    whitespace-only data, and UCS-4 encoded data, are reported as False.

    Raises UnicodeError if the data starts with a UTF-16 or UTF-8 byte
    order mark but cannot be decoded with that encoding.
    """
    prefix = bytes[:4]

    # An XML declaration in a recognizable encoding.
    if prefix in ('\x3C\x3F\x78\x6D',    # '<?xm'
                  '\x00\x3C\x00\x3F',    # UTF16-BE '<?'
                  '\x3C\x00\x3F\x00',    # UTF16-LE '<?'
                  '\x4C\x6F\xA7\x94'):   # EBCDIC '<?xm'
        # Definitely not an SGML document, assume XML document
        return True

    # UCS-4 signatures.  These are tested before the UTF-16 BOMs because
    # two of the UCS-4 BOMs begin with the same two bytes as a UTF-16 BOM.
    if prefix in ('\x00\x00\xFE\xFF',    # big-endian (1234) BOM
                  '\xFF\xFE\x00\x00',    # little-endian (4321) BOM
                  '\x00\x00\xFF\xFE',    # unusual encoding (2143) BOM
                  '\xFE\xFF\x00\x00',    # unusual encoding (3412) BOM
                  '\x00\x00\x00\x3C',    # big-endian (1234) '<'
                  '\x3C\x00\x00\x00',    # little-endian (4321) '<'
                  '\x00\x00\x3C\x00',    # unusual encoding (2143) '<'
                  '\x00\x3C\x00\x00'):   # unusual encoding (3412) '<'
        # UCS-4 encoded; Python doesn't have this codec so return False
        # as the parser will complain anyway.
        return False

    if bytes[:2] in ('\xFE\xFF', '\xFF\xFE'):
        # UTF-16 BOM; the BOM plus two characters occupy six bytes
        if bytes[:6].decode('UTF-16') == '<?':
            # Definitely not an SGML document, assume XML document
            return True
        characters = bytes.decode('UTF-16')
    elif bytes[:3] == '\xEF\xBB\xBF':
        # UTF-8 BOM
        if bytes[3:5] == '<?':
            # Definitely not an SGML document, assume XML document
            return True
        characters = bytes[3:].decode('UTF-8')
    else:
        characters = bytes

    # The first non-whitespace character in a well-formed XML document
    # must be a '<'.  Iterating (rather than indexing) keeps empty and
    # whitespace-only input from running off the end of the data.
    for character in characters:
        if character not in '\x20\x09\x0D\x0A':
            return character == '<'
    return False
class Catalog:
    """
    Reads and provides access to a catalog, providing mappings of public
    and system IDs to URIs, etc.

    It is implemented as a SAX ContentHandler and is able to read
    OASIS TR 9401 Catalogs <http://www.oasis-open.org/specs/a401.htm>
    and OASIS XML Catalogs
    <http://www.oasis-open.org/committees/entity/spec.html>

    Attributes filled in while the catalog is read:

      systemIds       - dict: system ID -> absolute URI
      publicIds       - dict: public ID -> (absolute URI, prefer_public)
      uris            - dict: URI name -> absolute URI
      systemRewrites  - list of (start string, rewrite prefix)
      uriRewrites     - list of (start string, rewrite prefix)
      systemDelegates - list of (start string, Catalog)
      publicDelegates - list of (start string, Catalog, prefer_public)
      uriDelegates    - list of (start string, Catalog)
      catalogs        - list of Catalog instances to consult next
      uri             - the URI this catalog was loaded from
      quiet           - when false, diagnostics are written to stderr
    """

    def __init__(self, uri, quiet=True):
        """
        Fetch the catalog at the given URI and parse it immediately.

        uri   - location of the catalog; should be absolute
        quiet - when false, the parsed contents and any malformed
                entries are reported on stderr

        Whatever the URI resolver or the underlying parser raises is
        propagated to the caller.
        """
        self.systemIds = {}
        self.publicIds = {}
        self.uris = {}
        self.publicDelegates = []
        self.systemDelegates = []
        self.uriDelegates = []
        self.systemRewrites = []
        self.uriRewrites = []
        self.catalogs = []
        self.uri = uri
        self.quiet = quiet

        if not Uri.IsAbsolute(uri):
            # Using a relative URI here makes it hard to reliably
            # locate the catalog. Also, if the catalog doesn't set
            # its own base URI with xml:base, then we won't be able
            # to resolve relative URI references within the catalog.
            # So we should warn that this situation is undesirable.
            warnings.warn("Catalog URI '%s' is not absolute." % uri,
                          FtWarning, 2)

        stream = Uri.BASIC_RESOLVER.resolve(uri)
        try:
            data = stream.read()
        finally:
            # release the resource even if reading fails
            stream.close()

        if IsXml(data):
            # cannot be a TR 9401 document, assume an XML Catalog
            self._parseXmlCat(data)
        else:
            # cannot be an XML Catalog, assume a TR 9401 file
            self._parseTr9401(data)

        if not quiet:
            sys.stderr.write('Catalog contents:\n')
            for key, value in self.__dict__.items():
                sys.stderr.write(' %s = %r\n' % (key, value))
            sys.stderr.flush()
        return

    def _parseXmlCat(self, data):
        """
        Parse an XML Catalog, as specified in
        http://www.oasis-open.org/committees/entity/spec-2001-08-06.html.
        Partially implemented.

        data - the raw bytes of the catalog document
        """
        # Parse-time stacks, pushed and popped by the element handlers.
        self.prefer_public = [True]
        self.base = [self.uri]
        try:
            # Since we have the catalog data already, parse it.
            source = xmlreader.InputSource(self.uri)
            source.setByteStream(cStringIO.StringIO(data))

            from Ft.Xml.Sax import CreateParser
            parser = CreateParser()
            parser.setContentHandler(self)
            parser.parse(source)
        finally:
            # The stacks are only meaningful during parsing; removing
            # them keeps them off the instance (and out of the debug
            # dump written by the constructor), even if parsing fails.
            del self.prefer_public
            del self.base
        return

    def _parseTr9401(self, data):
        """
        Parse a TR9401 Catalog, as specified in
        <http://www.oasis-open.org/specs/a401.htm>.
        Partially implemented.

        data - the raw text of the catalog

        Entries are recognized by the module-level TR9401 pattern, which
        always produces (keyword, first, second) with '' standing in for
        a missing second string; presence is therefore tested on the
        value rather than on the tuple length.  Backslash escapes inside
        the quoted strings are used as written, not decoded.
        """
        prefer_public = True
        base = self.uri
        for keyword, first, second in TR9401.findall(data):
            token = keyword.upper()
            if token == 'PUBLIC':
                if second:
                    self.publicIds[first] = (Uri.Absolutize(second, base),
                                             prefer_public)
            elif token == 'SYSTEM':
                if second:
                    self.systemIds[first] = Uri.Absolutize(second, base)
            elif token == 'BASE':
                base = first
            elif token[:8] == 'OVERRIDE':
                prefer_public = token[8:].strip() == 'YES'
            elif token == 'DELEGATE':
                if second:
                    # Same entry shape as the XML delegatePublic element.
                    # NOTE(review): the consumers of publicDelegates live
                    # outside this module; confirm the expected shape.
                    delegate = Catalog(Uri.Absolutize(second, base),
                                       self.quiet)
                    self.publicDelegates.append((first, delegate,
                                                 prefer_public))
            elif token == 'CATALOG':
                if not second:
                    catalog = Catalog(Uri.Absolutize(first, base),
                                      self.quiet)
                    self.catalogs.append(catalog)
        return

    # methods used by the XML parser

    def startElementNS(self, expandedName, qualifiedName, attrs):
        """
        Handle an element start event for the XML parser.
        This is a SAX ContentHandler method.

        expandedName  - (namespace, local name) pair of the element
        qualifiedName - the element name as written; not used
        attrs         - mapping of (namespace, local name) -> value

        NOTE(review): elements are dispatched on their local name only;
        the namespace is not checked.
        """
        namespace, name = expandedName

        # update current base URI; xml:base on the rewrite elements
        # themselves is not applied
        base = self.base[-1]
        if name not in ('rewriteSystem', 'rewriteURI'):
            base = attrs.get((XML_NAMESPACE, 'base'), base)
        self.base.append(base)

        if name == 'public':
            # a publicId lookup
            if self.__ensure_attrs(name, attrs, 'publicId', 'uri'):
                # save the state of prefer_public also
                publicId = attrs[(None, 'publicId')]
                uri = Uri.Absolutize(attrs[(None, 'uri')], base)
                self.publicIds[publicId] = (uri, self.prefer_public[-1])
        elif name == 'system':
            # a systemId lookup
            if self.__ensure_attrs(name, attrs, 'systemId', 'uri'):
                systemId = attrs[(None, 'systemId')]
                uri = Uri.Absolutize(attrs[(None, 'uri')], base)
                self.systemIds[systemId] = uri
        elif name == 'uri':
            # a URI lookup
            if self.__ensure_attrs(name, attrs, 'name', 'uri'):
                uriName = attrs[(None, 'name')]
                uri = Uri.Absolutize(attrs[(None, 'uri')], base)
                self.uris[uriName] = uri
        elif name == 'rewriteURI':
            # a URI rewrite
            if self.__ensure_attrs(name, attrs, 'uriStartString',
                                   'rewritePrefix'):
                startString = attrs[(None, 'uriStartString')]
                rewritePrefix = Uri.Absolutize(
                    attrs[(None, 'rewritePrefix')], base)
                self.uriRewrites.append((startString, rewritePrefix))
        elif name == 'rewriteSystem':
            # a systemId rewrite
            if self.__ensure_attrs(name, attrs, 'systemIdStartString',
                                   'rewritePrefix'):
                startString = attrs[(None, 'systemIdStartString')]
                rewritePrefix = Uri.Absolutize(
                    attrs[(None, 'rewritePrefix')], base)
                self.systemRewrites.append((startString, rewritePrefix))
        elif name == 'delegateSystem':
            # delegate systemId to specific catalog
            if self.__ensure_attrs(name, attrs, 'systemIdStartString',
                                   'catalog'):
                startString = attrs[(None, 'systemIdStartString')]
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                delegate = Catalog(catalog, self.quiet)
                self.systemDelegates.append((startString, delegate))
        elif name == 'delegatePublic':
            # delegate publicId to specific catalog
            if self.__ensure_attrs(name, attrs, 'publicIdStartString',
                                   'catalog'):
                # save the state of prefer_public also
                startString = attrs[(None, 'publicIdStartString')]
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                delegate = Catalog(catalog, self.quiet)
                self.publicDelegates.append((startString, delegate,
                                             self.prefer_public[-1]))
        elif name == 'delegateURI':
            # delegate URI to specific catalog
            if self.__ensure_attrs(name, attrs, 'uriStartString',
                                   'catalog'):
                startString = attrs[(None, 'uriStartString')]
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                delegate = Catalog(catalog, self.quiet)
                self.uriDelegates.append((startString, delegate))
        elif name == 'nextCatalog':
            # the next catalog in a chain
            if self.__ensure_attrs(name, attrs, 'catalog'):
                catalog = Uri.Absolutize(attrs[(None, 'catalog')], base)
                self.catalogs.append(Catalog(catalog, self.quiet))
        elif name in ('catalog', 'group'):
            # look for prefer attribute and update the stack; without
            # the attribute the enclosing preference is inherited
            if self.prefer_public[-1]:
                inherited = 'public'
            else:
                inherited = 'system'
            prefer = attrs.get((None, 'prefer'), inherited) == 'public'
            self.prefer_public.append(prefer)
        return

    def __ensure_attrs(self, name, attrs, *attr_names):
        """
        Ensure that the right attributes exist just in case the parser
        is a non-validating one.

        name       - local name of the element, for the diagnostic
        attrs      - mapping of (namespace, local name) -> value
        attr_names - local names of the required un-namespaced attributes

        Returns True when every attribute is present.  On the first
        missing one, returns False after reporting it on stderr (unless
        the catalog is quiet).
        """
        for attr_name in attr_names:
            # attribute values are strings, so None can only mean "absent"
            if attrs.get((None, attr_name), None) is None:
                if not self.quiet:
                    sys.stderr.write(
                        '%s: Malformed %s element, missing %s attribute\n'
                        % (self.uri, name, attr_name))
                return False
        return True

    def endElementNS(self, expandedName, qualifiedName):
        """
        Handle an element end event for the XML parser.
        This is a SAX ContentHandler method.

        expandedName  - (namespace, local name) pair of the element
        qualifiedName - the element name as written; not used
        """
        namespace, name = expandedName
        # every start event pushed a base URI
        self.base.pop()
        if name in ('catalog', 'group'):
            # pop the stack
            self.prefer_public.pop()
        return
def GetDefaultCatalog(basename='default.cat'):
    """
    Load the default catalog file(s).

    The catalogs are taken, in this order, from:

      1. the XML_CATALOGS environment variable, an os.pathsep separated
         list of pathnames (original 4Suite convention);
      2. the XML_CATALOG_FILES environment variable, a whitespace
         separated list of pathnames or URLs (libxml2 convention);
      3. the file named by basename in the 4Suite data directory.

    The first catalog that loads becomes the result; each later one is
    appended to its list of catalogs.  A catalog whose resource cannot
    be retrieved (UriException) is skipped with an FtWarning.

    basename - file name of the default catalog in the data directory

    Returns the resulting Catalog, or None if none could be loaded.
    Setting XML_DEBUG_CATALOG in the environment enables progress
    messages on stderr.
    """
    quiet = 'XML_DEBUG_CATALOG' not in os.environ

    uris = []
    # original 4Suite XML Catalog support
    if 'XML_CATALOGS' in os.environ:
        # os.pathsep separated list of pathnames
        for path in os.environ['XML_CATALOGS'].split(os.pathsep):
            # An empty value, or a doubled or trailing separator, yields
            # empty entries; skip them, as the whitespace split used for
            # XML_CATALOG_FILES below already does.
            if path:
                uris.append(Uri.OsPathToUri(path))

    # libxml2 XML Catalog support
    if 'XML_CATALOG_FILES' in os.environ:
        # whitespace-separated list of pathnames or URLs (ick!)
        for path in os.environ['XML_CATALOG_FILES'].split():
            # if it's not already a URL, make it one
            if not Uri.IsAbsolute(path):
                uris.append(Uri.OsPathToUri(path))
            else:
                uris.append(path)

    # add the default 4Suite catalog
    path = os.path.join(GetConfigVar('DATADIR'), basename)
    uris.append(Uri.OsPathToUri(path))

    if not quiet:
        sys.stderr.write("Uris: %s\n" % uris)

    catalog = None
    for uri in uris:
        if not quiet:
            sys.stderr.write('Reading %s\n' % uri)
            sys.stderr.flush()
        try:
            # FIXME: Use dict merging rather than this inefficient cascading
            if catalog is None:
                if not quiet:
                    sys.stderr.write('Creating catalog from %s\n' % uri)
                    sys.stderr.flush()
                catalog = Catalog(uri, quiet)
            else:
                if not quiet:
                    sys.stderr.write('Appending %s\n' % uri)
                    sys.stderr.flush()
                catalog.catalogs.append(Catalog(uri, quiet))
        except UriException:
            # fetched via exc_info so the clause reads the same on
            # interpreters that disagree on the except-binding syntax
            error = sys.exc_info()[1]
            warnings.warn("Catalog resource (%s) disabled: %s"
                          % (uri, error.message), FtWarning)

    if not quiet:
        sys.stderr.write('Done. Result is %r\n' % catalog)
        sys.stderr.flush()

    return catalog
|