Viewing file: cDomlette.py (22.31 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Xml/cDomlette.py,v 1.38 2005/04/12 06:28:23 mbrown Exp $ """ cDomlette implementation: a very fast DOM-like library tailored for use in XPath/XSLT
Copyright 2005 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """ import re, warnings
from Ft.Xml import XML_NAMESPACE, XMLNS_NAMESPACE
# DOM stuff from cDomlettec import implementation from cDomlettec import NonvalParse, Parse, CreateParser
# cDomlette optimized NS functions from cDomlettec import GetAllNs, SeekNss
# Functions used for testing from cDomlettec import TestTree, TestRefCounts, StartNodeCounting, GetNodeCount
# -- XInclude support --------------------------------------------------
from cDomlettec import XPTR_START_STATE from cDomlettec import XPTR_ELEMENT_MATCH as ELEMENT_MATCH from cDomlettec import XPTR_ELEMENT_COUNT as ELEMENT_COUNT from cDomlettec import XPTR_ATTRIBUTE_MATCH as ATTRIBUTE_MATCH from cDomlettec import PARSE_STREAM_STATE as INITIAL_STATE
__all__ = ['implementation', 'NonvalParse', 'Parse', 'CreateParser', 'GetAllNs', 'SeekNss', 'ValParse']
class FragmentSpec:
    """
    Mutable accumulator for the parser state transitions derived from an
    XPointer fragment.

    Attributes:
      states           -- list of transition entries (HandleStep appends
                          one tuple per location step)
      fromState        -- state id the next transition starts from;
                          begins at INITIAL_STATE
      nextAvailStateId -- next unused state id; begins at XPTR_START_STATE
    """

    def __init__(self):
        # Nothing has been recorded yet: start from the initial state and
        # hand out new state ids beginning at XPTR_START_STATE.
        self.fromState, self.nextAvailStateId = INITIAL_STATE, XPTR_START_STATE
        self.states = []
def ProcessFragment(frag):
    """
    Take an XPointer fragment and return a structure suitable for the
    cDomlette parser to update state tables.

    Example fragment:
      #xmlns(x=http://uche.ogbuji.net/eg) xpointer(/x:spam/x:eggs)

    frag -- the fragment string; it must contain a '#' followed (anywhere
            later) by an xpointer(...) part, and may contain any number of
            xmlns(prefix=uri) parts.

    Returns a FragmentSpec populated by HandleStep, or None when the
    fragment holds no xpointer(...) expression.

    Only the first xpointer(...) part is used.  The patterns are non-greedy
    up to the first ')', so an expression that itself contains a ')' is cut
    short at that point.
    """
    from Ft.Xml.XPath import Compile
    from Ft.Xml.XPath.ParsedAbsoluteLocationPath import ParsedAbsoluteLocationPath

    # Raw strings: "\(" in a plain string literal is an invalid escape
    # sequence (the compiled pattern is unchanged).
    XPTR_EXPR = re.compile(r"#.*xpointer\((?P<xptr>.*?)\)")
    NS_EXPR = re.compile(r"xmlns\((?P<nsdata>.*?)\)")

    xptr = XPTR_EXPR.findall(frag)
    #There must be an xptr expression
    if not xptr:
        return None
    xptr = xptr[0]

    ns_mappings = {u'xml': XML_NAMESPACE}
    for nsdata in NS_EXPR.findall(frag):
        try:
            # Split on the first '=' only: the namespace URI may legitimately
            # contain '=' characters (e.g. in a query string).
            prefix, ns = nsdata.split('=', 1)
        except ValueError:
            # FIXME: if we get here, then nsdata doesn't conform to
            # the xmlns() scheme syntax (no '=').  Raise exception?
            # Kept as a deliberate best-effort skip.
            continue
        ns_mappings[prefix] = ns

    expr = Compile(xptr)
    #Assume context = root, which means in effect that the absolute = its
    #relative "child"
    if isinstance(expr, ParsedAbsoluteLocationPath):
        #Extract the relative part (which looks like one form of parsed step)
        expr = expr._child

    result = FragmentSpec()
    HandleStep(expr, result, ns_mappings)
    return result
def HandleStep(expr, spec, nss):
    """
    Translate one parsed XPath location path/step into state transitions
    on spec, recursing through the left-hand side of multi-step paths so
    that steps are recorded left to right.

    expr -- parsed XPath object: something with _left/_right (a multi-step
            path), something with _child, or a single step exposing
            _nodeTest and _predicates
    spec -- FragmentSpec (or compatible object) whose states, fromState and
            nextAvailStateId are updated in place
    nss  -- dict mapping namespace prefixes to namespace URIs

    Raises NotImplementedError when the step's node test is neither a
    LocalNameTest nor a QualifiedNameTest.  A prefixed attribute test whose
    prefix is missing from nss raises KeyError.

    Only the first predicate of a step is examined: a numeric literal
    becomes an ELEMENT_COUNT criterion, and an "@attr = literal" equality
    becomes an ATTRIBUTE_MATCH criterion.  Any other predicate is ignored.
    """
    from Ft.Xml.XPath.ParsedExpr import ParsedNLiteralExpr, ParsedEqualityExpr
    from Ft.Xml.XPath.ParsedNodeTest import LocalNameTest, QualifiedNameTest

    if hasattr(expr, "_left"):
        # Multi-step path: emit the transitions for everything to the left
        # first, then handle the right-most step here.
        curr_step = expr._right
        HandleStep(expr._left, spec, nss)
    elif hasattr(expr, "_child"):
        # This branch previously re-tested "_left" and so could never run.
        #Down to the "last" recursive step
        curr_step = expr._child
    else:
        #Down to the "last" recursive step
        curr_step = expr

    #Set criteria by expanded name
    node_test = curr_step._nodeTest
    if not isinstance(node_test, (LocalNameTest, QualifiedNameTest)):
        raise NotImplementedError(str(node_test))
    element_exp_name = node_test.getQuickKey(nss)[1]
    criteria = [(ELEMENT_MATCH,) + element_exp_name]

    #Set criteria from predicates
    if curr_step._predicates:
        pred = curr_step._predicates._predicates[0]
        if isinstance(pred, ParsedNLiteralExpr):
            #The third item is a counter used to count elements during the
            #parsing
            criteria.append((ELEMENT_COUNT, int(pred._literal), [1]))
        elif isinstance(pred, ParsedEqualityExpr) and pred._op == u'=':
            if hasattr(pred._left, '_axis') and \
               pred._left._axis._axis == 'attribute':
                attr_test = pred._left._nodeTest
                # criteria code
                criterion = [ATTRIBUTE_MATCH]

                # Add the expanded name
                if hasattr(attr_test, '_localName'):
                    criterion.append(nss[attr_test._prefix])
                    criterion.append(attr_test._localName)
                else:
                    criterion.append(None)
                    criterion.append(attr_test._name)

                # Add the expected value
                criterion.append(pred._right._literal)

                # Add this information to the criteria
                criteria.append(tuple(criterion))

    # Add state transitions for the current step.  Each step consumes three
    # consecutive state ids starting at nextAvailStateId.
    state_id = spec.nextAvailStateId
    spec.states.append((spec.fromState,
                        state_id,
                        state_id + 1,
                        state_id + 2,
                        criteria))
    spec.fromState = state_id
    spec.nextAvailStateId = state_id + 3
    return
# -- Validation support ------------------------------------------------
from xml.sax import xmlreader from Ft import FtWarning from Ft.Xml import ReaderException, EMPTY_NAMESPACE from Ft.Xml.XInclude import XINCLUDE_NAMESPACE from Ft.Xml.Lib.XmlString import SplitQName, IsXmlSpace
class _ValReader:
### Reader Interface ###
def fromSrc(self, source, parseParamEntities=1): # DTD validation requires parameter entity parsing self.initHandler(source, 1) self.prepareParser()
# Parse it self.parse(source)
self.reset() return self.getRootNode()
def prepareParser(self): # We do this here due to the long import time from xml.parsers.xmlproc import xmlval, xmlproc
# Replace xmlproc's default uri resolution. We handle it # in the resolve_*_pubid() methods. from xml.parsers.xmlproc import xmlutils, dtdparser xmlutils.join_sysids = lambda base, url: url xmlproc.join_sysids = lambda base, url: url dtdparser.join_sysids = lambda base, url: url
self.parser = xmlval.XMLValidator()
self.parser.set_application(self) self.parser.set_error_handler(self) self.parser.set_pubid_resolver(self) self.parser.set_inputsource_factory(self) self.parser.set_dtd_listener(self)
# For validation self.parser.set_read_external_subset(1) return
def parse(self, source): self.parser.parse_resource(source)
def reset(self): self.parser = None return
def initHandler(self, inputSource, parseParamEntities): self.inputSource = inputSource self.parseParamEntities = parseParamEntities
# Init our root node document = implementation.createRootNode(inputSource.uri) self._rootNode = self._ownerDoc = document
# Setup the stack which keeps track of the nesting of DOM nodes. self._nodeStack = [self._rootNode] self._namespaces = [{u'xml': XML_NAMESPACE, u'xmlns':XMLNS_NAMESPACE}] self._currText = u''
self._preserveStateStack = [1] #initialize to preserve self._stripElements = inputSource.stripElements
# Setup our include depth self._includeDepth = 0
self._xmlBaseStack = [inputSource.uri]
# For XIncludes self._visited_hrefs = [] self._ignore_top_level_whitespace = [1]
# For disabling comments and processing instructions while # processing DTD self._inDTD = False return
def getRootNode(self): self._completeTextNode() return self._rootNode
### XMLPROC INTERFACES ###
# --- InputSourceFactory methods
def create_input_source(self, systemId): # systemId as returned from resolve_*_pubid() return systemId.stream
# --- PubIdResolver methods
def resolve_pe_pubid(self, publicId, systemId): #FIXME: this is a quick hack to get around a bug in this class. #the parser's InputSource is not kept up to date as entities #are resolved. For some reason the locator's is if isinstance(self._locator.get_current_sysid(), type("")): resolver = self.parser.get_current_sysid() else: resolver = self._locator.get_current_sysid() #resolver = self._locator.get_current_sysid() return resolver.resolve(systemId, publicId, 'PARAMETER_ENTITY', 1)
def resolve_doctype_pubid(self, publicId, systemId): #FIXME: this is a quick hack to get around a bug in this class. #the parser's InputSource is not kept up to date as entities #are resolved. For some reason the locator's is if isinstance(self._locator.get_current_sysid(), type("")): resolver = self.parser.get_current_sysid() else: resolver = self._locator.get_current_sysid() #resolver = self._locator.get_current_sysid() return resolver.resolve(systemId, publicId, 'DOCTYPE_DECL', 1)
def resolve_entity_pubid(self, publicId, systemId): #FIXME: this is a quick hack to get around a bug in this class. #the parser's InputSource is not kept up to date as entities #are resolved. For some reason the locator's is if isinstance(self._locator.get_current_sysid(), type("")): resolver = self.parser.get_current_sysid() else: resolver = self._locator.get_current_sysid() #resolver = self._locator.get_current_sysid() return resolver.resolve(systemId, publicId, 'EXTERNAL_ENTITY')
# --- DTDConsumer methods
def dtd_start(self): "Called when DTD parsing starts." self._inDTD = True return
def dtd_end(self): "Called when the DTD is completely parsed." self._inDTD = False return
def new_general_entity(self,name,val): "Receives internal general entity declarations." pass
def new_external_entity(self,ent_name,pub_id,sys_id,ndata): """Receives external general entity declarations. 'ndata' is the empty string if the entity is parsed.""" if ndata: resolver = self.inputSource.getUriResolver() uri = resolver.normalize(sys_id, self.inputSource.uri) self._ownerDoc.unparsedEntities[ent_name] = uri return
def new_parameter_entity(self,name,val): "Receives internal parameter entity declarations." pass
def new_external_pe(self,name,pubid,sysid): "Receives external parameter entity declarations." pass
def new_notation(self,name,pubid,sysid): "Receives notation declarations." pass
def new_element_type(self,elem_name,elem_cont): "Receives the declaration of an element type." pass
def new_attribute(self,elem,attr,a_type,a_decl,a_def): "Receives the declaration of a new attribute." pass
# --- Application methods
def set_locator(self, locator): self._locator = locator return
def doc_start(self): pass
def doc_end(self): pass
def handle_start_tag(self, name, attribs): """Signals the start of an element in non-namespace mode.
The name parameter contains the raw XML 1.0 name of the element type as a string and the attrs parameter holds an instance of the Attributes class containing the attributes of the element."""
self._completeTextNode()
self._ignore_top_level_whitespace.append(0)
# Create a copy of our parents namespaces self._namespaces.append(self._namespaces[-1].copy())
#Read our attributes for curr_attrib_key in attribs.keys(): if type(curr_attrib_key) is tuple: local = curr_attrib_key[1] prefix = None if curr_attrib_key[0]: raise RuntimeError("Namespaces in validating docs not supported") else: (prefix, local) = SplitQName(curr_attrib_key) if not local: raise ReaderException(ReaderException.INVALID_XMLNS, curr_attrib_key) if prefix == 'xmlns': self._namespaces[-1][local] = attribs[curr_attrib_key] or EMPTY_NAMESPACE elif prefix is None and local == 'xmlns': if not attribs[curr_attrib_key]: if None in self._namespaces[-1]: del self._namespaces[-1][None] else: self._namespaces[-1][None] = attribs[curr_attrib_key]
if isinstance(name, tuple): local = name[1] prefix = None if name[0]: raise RuntimeError("Namespaces in validating docs not supported") else: (prefix, local) = SplitQName(name)
# For consistency with cDomlette if prefix and prefix not in self._namespaces[-1]: self.error('unbound prefix')
if prefix in self._namespaces[-1]: namespace = self._namespaces[-1][prefix] else: namespace = EMPTY_NAMESPACE
attrs = {} qnames = {} for curr_attrib_key in attribs: if type(curr_attrib_key) is tuple: a_local = curr_attrib_key[1] a_prefix = None if curr_attrib_key[0]: raise RuntimeError("Namespaces in validating docs not supported") else: (a_prefix, a_local) = SplitQName(curr_attrib_key) if a_prefix: if a_prefix in self._namespaces[-1]: ns = self._namespaces[-1][a_prefix] else: # For consistency with cDomlette self.error('unbound prefix') else: ns = EMPTY_NAMESPACE
# For consistency with cDomlette if (ns, a_local) in attrs: self.error('duplicate attribute')
attrs[(ns, a_local)] = attribs[curr_attrib_key] qnames[(ns, a_local)] = curr_attrib_key
nsattribs = xmlreader.AttributesNSImpl(attrs, qnames)
#Adjust local variables (name, qname) = ((namespace or None, local), name)
#See if we need to handle XInclude
if self.inputSource.processIncludes and name == (XINCLUDE_NAMESPACE, 'include'):
if self._includeDepth: self._includeDepth += 1 else: #Looks like it is a GO!! href = nsattribs.get((EMPTY_NAMESPACE, u'href')) if not href: raise XIncludeException(XIncludeException.MISSING_HREF) source = self.inputSource.resolve(href, '', 'XINCLUDE') if source.uri in self._visited_hrefs: raise XIncludeException( XIncludeException.CIRCULAR_INCLUDE_ERROR, href=href)
parse = nsattribs.get((EMPTY_NAMESPACE, u'parse')) if not parse or parse == 'xml':
self._visited_hrefs.append(source.uri) self._ignore_top_level_whitespace.append(1)
orig_parser = self.parser orig_source = self.inputSource
self.prepareParser() self.inputSource = source
self.parse(source)
self.parser = orig_parser self.inputSource = orig_source
del self._visited_hrefs[-1] del self._ignore_top_level_whitespace[-1] else: self._currText = self._currText + source.stream.read() source.stream.close() self._includeDepth = 1 else: #end of the check for XInclude processing #Create the new Element new_element = self._ownerDoc.createElementNS(namespace, qname)
nextBase = self._xmlBaseStack[-1]
for attr_qname in nsattribs.getQNames():
attr_ns = nsattribs.getNameByQName(attr_qname)[0] (attr_prefix, attr_local) = SplitQName(attr_qname)
if attr_prefix is None and attr_local == 'xmlns': attr_ns = XMLNS_NAMESPACE attr_key = (attr_ns, None) else: attr_key = (attr_ns, attr_local)
new_element.setAttributeNS(attr_ns, attr_qname, nsattribs.getValueByQName(attr_qname))
#Look for a change in xml:base if (XML_NAMESPACE,'base') == attr_key: nextBase = nsattribs.getValueByQName(attr_qname)
#FIXME - store XML Base (cDomlette Nodes have this readonly) #new_element.xmlBase = new_element.baseURI = nextBase self._xmlBaseStack.append(nextBase)
new_pstate = self._preserveStateStack[-1] for (uri, local, strip) in self._stripElements: if (uri, local) in [(new_element.namespaceURI, new_element.localName), (EMPTY_NAMESPACE, u'*'), (new_element.namespaceURI, u'*') ]: new_pstate = not strip break self._preserveStateStack.append(new_pstate)
self._nodeStack.append(new_element) return
def handle_end_tag(self, name): """Signals the end of an element in non-namespace mode.
The name parameter contains the name of the element type, just as with the startElement event."""
del self._namespaces[-1]
if self._includeDepth: self._includeDepth = self._includeDepth - 1 del self._ignore_top_level_whitespace[-1] return
self._completeTextNode() del self._ignore_top_level_whitespace[-1]
new_element = self._nodeStack[-1]
del self._preserveStateStack[-1]
del self._xmlBaseStack[-1]
del self._nodeStack[-1] self._nodeStack[-1].appendChild(new_element) return
def handle_data(self, data, start, end): """Receive notification of character data.
The Parser will call this method to report each chunk of character data. SAX parsers may return all contiguous character data in a single chunk, or they may split it into several chunks; however, all of the characters in any single event must come from the same external entity so that the Locator provides useful information.""" if self._includeDepth: return self._currText = self._currText + data[start:end] return handle_ignorable_data = handle_data
def handle_pi(self, target, data): """Receive notification of a processing instruction.
The Parser will invoke this method once for each processing instruction found: note that processing instructions may occur before or after the main document element.
A SAX parser should never report an XML declaration (XML 1.0, section 2.8) or a text declaration (XML 1.0, section 4.3.1) using this method.""" if self._inDTD or self._includeDepth: return self._completeTextNode() pi = self._ownerDoc.createProcessingInstruction(target, data) pi.xmlBase = pi.baseURI = self._xmlBaseStack[-1] self._nodeStack[-1].appendChild(pi) return
def handle_comment(self, data): """Reports a comment anywhere in the document (including the DTD and outside the document element).
content is a string that holds the contents of the comment.""" if self._inDTD or self._includeDepth: return self._completeTextNode() comment = self._ownerDoc.createComment(data) comment.xmlBase = comment.baseURI = self._xmlBaseStack[-1] self._nodeStack[-1].appendChild(comment) return
def handle_doctype(self, name, publicId, systemId): """Report the start of the DTD declarations, if the document has an associated DTD.
A startEntity event will be reported before declaration events from the external DTD subset are reported, and this can be used to infer from which subset DTD declarations derive.
name is the name of the document element type, publicId the public identifier of the DTD (or None if none were supplied) and systemId the system identfier of the external subset (or None if none were supplied).""" self._ownerDoc.publicId = publicId self._ownerDoc.systemId = systemId return
def set_entity_info(self, xmlver, enc, sddecl): pass
# --- ErrorHandler methods
def get_locator(self): return self._locator
def fatal(self, message): raise ReaderException(ReaderException.XML_PARSE_ERROR, self._locator.get_current_sysid().uri, self._locator.get_line(), self._locator.get_column(), message) error = fatal
def warning(self, message): uri = self._locator.get_current_sysid().uri, line = self._locator.get_line(), col = self._locator.get_column(), # FIXME: l10n msg = "in %s at line %s, column %s: %s" % (uri[0], line[0], col[0], message) warnings.warn(msg, FtWarning, 2) return
# --- internal methods
def _completeTextNode(self): if not self._currText: return
data = self._currText self._currText = u''
# Ignore any top-level whitespace whitespace_only = IsXmlSpace(data) if whitespace_only and self._ignore_top_level_whitespace[-1]: return
if self._preserveStateStack[-1] or not whitespace_only: # Create a new text node text = self._ownerDoc.createTextNode(data) #FIXME - store XML Base (cDomlette Nodes have this readonly) #text.xmlBase = text.baseURI = self._xmlBaseStack[-1] top_node = self._nodeStack[-1] top_node.appendChild(text) return
try:
    # Probe for the xmlproc package that the validating reader depends on.
    from xml.parsers import xmlproc
except ImportError:
    def ValParse(isrc, readExtDtd=True):
        """
        Stand-in used when xml.parsers.xmlproc cannot be imported.

        Hands off to Ft.Xml.CheckVersion('a validating parser') and returns
        nothing itself.
        NOTE(review): presumably CheckVersion reports or raises that the
        feature is unavailable -- confirm.
        """
        from Ft.Xml import CheckVersion
        CheckVersion('a validating parser')
else:
    def ValParse(isrc, readExtDtd=True):
        """
        Parse the input source isrc with DTD validation and return the
        root node of the resulting tree.

        readExtDtd is forwarded as the second argument of
        _ValReader.fromSrc().
        """
        reader = _ValReader()
        return reader.fromSrc(isrc, readExtDtd)
|