Viewing file: XUpdate.py (22.52 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Xml/XUpdate.py,v 1.57 2005/03/22 09:02:20 mbrown Exp $ """ XUpdate request processing
XUpdate is specified (poorly) at http://xmldb-org.sourceforge.net/xupdate/xupdate-wd.html
Copyright 2005 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
__all__ = ['XUPDATE_NS', 'XUpdateException', 'g_errorMessages', 'StringWriter', 'SUPPORTED_VERSIONS', 'Processor', 'Reader', 'ApplyXUpdate']
XUPDATE_NS = 'http://www.xmldb.org/xupdate'
import string from xml.dom import Node
from Ft import FtException from Ft import TranslateMessage as _ from Ft.Xml import EMPTY_NAMESPACE, XML_NAMESPACE, Domlette from Ft.Xml.Lib.XmlString import SplitQName, XmlStrStrip, IsXmlSpace from Ft.Xml.XPath import g_parser as parser from Ft.Xml.XPath import Context, Conversions, Types from Ft.Xml.Xslt import NullWriter, DomWriter, AttributeValueTemplate from Ft.Xml.Xslt.CopyOfElement import CopyNode
class XUpdateException(FtException): """ Exception class for errors specific to XUpdate processing """ SYNTAX_ERROR = 1 UNRECOGNIZED_INSTRUCTION = 2 NO_VERSION = 10 NO_SELECT = 11 NO_TEST = 12 INVALID_SELECT = 13 UNSUPPORTED_VERSION = 14 INVALID_DOM_NODE = 100 UNKNOWN_NODE_TYPE = 101
def __init__(self, errorCode, *args, **kwargs): FtException.__init__(self, errorCode, g_errorMessages, args, **kwargs)
g_errorMessages = { XUpdateException.SYNTAX_ERROR: _('Syntax error in expression %(expr)r: %(err)s'), XUpdateException.UNRECOGNIZED_INSTRUCTION: _('Unrecognized instruction in XUpdate namespace: %(name)r'), XUpdateException.NO_VERSION: _('Missing required version attribute'), XUpdateException.NO_SELECT: _('Missing required select attribute'), XUpdateException.NO_TEST: _('Missing required "test" attribute'), XUpdateException.INVALID_SELECT: _('select expression "%(expr)s" must evaluate to a non-empty node-set'), XUpdateException.UNSUPPORTED_VERSION: _('XUpdate version %(version)s unsupported by this implementation'), XUpdateException.INVALID_DOM_NODE: _('Invalid DOM node %(node)r'), XUpdateException.UNKNOWN_NODE_TYPE: _('Unknown node type %(nodetype)r'), }
SUPPORTED_VERSIONS = ('1.0',)
class StringWriter(NullWriter.NullWriter): def __init__(self): self._result = []
def getResult(self): return u''.join(self._result)
def text(self, data): self._result.append(data) return
class Processor: def __init__(self, reader=None): self.writers = [NullWriter.NullWriter(None)] return
#needed by CopyNode def output(self): """Returns the current output writer""" return self.writers[-1]
def pushDomResult(self, ownerDocument): self.writers.append(DomWriter.DomWriter(ownerDocument)) return
def pushStringResult(self): self.writers.append(StringWriter()) return
def popResult(self): return self.writers.pop().getResult()
def execute(self, node, xupdate, variables=None, processorNss=None): if variables is None: variables = {} if processorNss is None: processorNss = {}
context = Context.Context(node, varBindings=variables, processorNss=processorNss) self.visit(context, xupdate, 0) return node
def visit(self, context, node, preserveSpace): try: node_type = node.nodeType except AttributeError: raise XUpdateException(XUpdateException.INVALID_DOM_NODE, node=node)
try: visit = self._dispatch_node[node_type] except KeyError: # unknown node type, try and get a "pretty" name for the error node_types = {} for name in dir(Node): if name.endswith('_NODE'): node_types[getattr(Node, name)] = name node_type = node_types.get(node_type, node_type) raise XUpdateException(XUpdateException.INVALID_DOM_NODE, nodetype=node_type) else: visit(self, context, node, preserveSpace) return
# -- NodeType Dispatch --------------------------------------------- _dispatch_node = {}
def _visit_document(self, context, node, preserveSpace): element = node.documentElement if element is not None and \ element.namespaceURI == XUPDATE_NS and \ element.localName == 'modifications': version = element.getAttributeNS(EMPTY_NAMESPACE, 'version') if not version: raise XUpdateException(XUpdateException.NO_VERSION) if version not in SUPPORTED_VERSIONS: raise XUpdateException(XUpdateException.UNSUPPORTED_VERSION, version=version) for node in element.childNodes: self.visit(context, node, preserveSpace) return _dispatch_node[Node.DOCUMENT_NODE] = _visit_document
def _visit_text(self, context, node, preserveSpace): if preserveSpace or not IsXmlSpace(node.data): self.writers[-1].text(node.data) return _dispatch_node[Node.TEXT_NODE] = _visit_text
def _visit_element(self, context, element, preserveSpace): xml_space = element.getAttributeNS(XML_NAMESPACE, 'space') if xml_space == 'preserve': preserveSpace = 1 elif xml_space == 'default': preserveSpace = 0 # else, no change
if element.namespaceURI != XUPDATE_NS: self.writers[-1].startElement(element.nodeName, element.namespaceURI)
# Process the attributes for attr in element.attributes.values(): self.writers[-1].attribute(attr.nodeName, attr.value, attr.namespaceURI)
# Now the children for node in element.childNodes: self.visit(context, node, preserveSpace)
self.writers[-1].endElement(element.nodeName, element.namespaceURI) else: try: visit = self._dispatch_xupdate[element.localName] except KeyError: # unknown/unsupported local-name raise XUpdateException( XUpdateException.UNRECOGNIZED_INSTRUCTION, name=element.localName) else: visit(self, context, element, preserveSpace) return _dispatch_node[Node.ELEMENT_NODE] = _visit_element
def _visit_ignorable(self, context, node, preserveSpace): return _dispatch_node[Node.COMMENT_NODE] = _visit_ignorable _dispatch_node[Node.PROCESSING_INSTRUCTION_NODE] = _visit_ignorable
# -- XUpdate Dispatch ---------------------------------------------- _dispatch_xupdate = {}
### Commands ###
def _xu_remove(self, context, element, preserveSpace): select = element.getAttributeNS(EMPTY_NAMESPACE, u'select') if not select: raise XUpdateException(XUpdateException.NO_SELECT)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element) nodeset = self.evaluateExpression(context, select) if nodeset: refnode = nodeset[0] if refnode.nodeType == Node.ATTRIBUTE_NODE: parent = refnode.ownerElement parent.removeAttributeNode(refnode) else: parent = refnode.parentNode if parent is None: parent = refnode.ownerDocument parent.removeChild(nodeset[0]) context.processorNss = oldNss return _dispatch_xupdate['remove'] = _xu_remove
def _xu_append(self, context, element, preserveSpace): select = element.getAttributeNS(EMPTY_NAMESPACE, u'select') if not select: raise XUpdateException(XUpdateException.NO_SELECT)
child = element.getAttributeNS(EMPTY_NAMESPACE, u'child') or u'last()'
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element)
nodeset = self.evaluateExpression(context, select) if not isinstance(nodeset, Types.NodesetType) or not nodeset: # Error if not a node-set or empty node-set raise XUpdateException(XUpdateException.INVALID_SELECT, expr=select)
for refnode in nodeset: self.pushDomResult(refnode.ownerDocument) # A wrapper element is used in case attributes are being added wrapper_localName = 'wrapper' wrapper_namespace = EMPTY_NAMESPACE try: self.writers[-1].startElement(wrapper_localName, wrapper_namespace) for node in element.childNodes: self.visit(context, node, preserveSpace) finally: self.writers[-1].endElement(wrapper_localName, wrapper_namespace) result = self.popResult()
size = len(refnode.childNodes) con = Context.Context(refnode, 1, size, processorNss={'xupdate': XUPDATE_NS}) # Python lists is 0-indexed counting, node-sets 1-indexed position = self.evaluateExpression(con, child) position = int(Conversions.NumberValue(position))
wrapper = result.childNodes[0] if wrapper.attributes and hasattr(refnode, 'setAttributeNodeNS'): for attr in wrapper.attributes.values(): refnode.setAttributeNodeNS(attr)
# we operate on a shallow copy of the child nodes here to avoid # modifying the membership of the sequence we're interating over. for node in tuple(wrapper.childNodes): if position >= size: refnode.appendChild(node) else: refnode.insertBefore(node, refnode.childNodes[position])
context.processorNss = oldNss return _dispatch_xupdate['append'] = _xu_append
def _xu_insert(self, context, element, preserveSpace): select = element.getAttributeNS(EMPTY_NAMESPACE, u'select') if not select: raise XUpdateException(XUpdateException.NO_SELECT)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element)
nodeset = self.evaluateExpression(context, select) if not nodeset: raise XUpdateException(XUpdateException.INVALID_SELECT, expr=select)
for refnode in nodeset: self.pushDomResult(refnode.ownerDocument) try: for child in element.childNodes: self.visit(context, child, preserveSpace) finally: result = self.popResult()
if element.localName == 'insert-before': refnode.parentNode.insertBefore(result, refnode) elif element.localName == 'insert-after': # if arg 2 is None, insertBefore behaves like appendChild refnode.parentNode.insertBefore(result, refnode.nextSibling) context.processorNss = oldNss return _dispatch_xupdate['insert-after'] = _xu_insert _dispatch_xupdate['insert-before'] = _xu_insert
def _xu_update(self, context, element, preserveSpace): select = element.getAttributeNS(EMPTY_NAMESPACE, u'select') if not select: raise XUpdateException(XUpdateException.NO_SELECT)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element)
nodeset = self.evaluateExpression(context, select) if not nodeset: raise XUpdateException(XUpdateException.INVALID_SELECT, expr=select) refnode = nodeset[0]
if refnode.nodeType == Node.ATTRIBUTE_NODE: self.pushStringResult() try: for child in element.childNodes: self.visit(context, child, preserveSpace) finally: result = self.popResult() refnode.value = result else: self.pushDomResult(refnode.ownerDocument) try: for child in element.childNodes: self.visit(context, child, preserveSpace) finally: result = self.popResult()
while refnode.firstChild: refnode.removeChild(refnode.firstChild)
refnode.appendChild(result)
context.processorNss = oldNss return _dispatch_xupdate['update'] = _xu_update
def _xu_rename(self, context, element, preserveSpace): select = element.getAttributeNS(EMPTY_NAMESPACE, u'select') if not select: raise XUpdateException(XUpdateException.NO_SELECT)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element)
nodeset = self.evaluateExpression(context, select) if not nodeset: raise XUpdateException(XUpdateException.INVALID_SELECT, expr=select)
new_name = XmlStrStrip(element.firstChild.data) (prefix, local) = SplitQName(new_name) if prefix: namespace = context.processorNss[prefix] else: namespace = EMPTY_NAMESPACE
for refnode in nodeset: if refnode.nodeType == Node.ATTRIBUTE_NODE: parent = refnode.ownerElement parent.removeAttributeNode(refnode) parent.setAttributeNS(namespace, new_name, refnode.value) else: parent = refnode.parentNode if parent is None: parent = refnode.ownerDocument new_elem = refnode.ownerDocument.createElementNS(namespace, new_name) parent.replaceChild(new_elem, refnode) # Copy any existing attributes to the newly created element if refnode.attributes: for attr in refnode.attributes.values(): new_elem.setAttributeNodeNS(attr) # Now copy any children as well while refnode.firstChild: new_elem.appendChild(refnode.firstChild)
context.processorNss = oldNss return _dispatch_xupdate['rename'] = _xu_rename
# Conditional statements are not part of the XUpdate spec, # though it has provisions for them because the spec is # not so much use without them # xupdate:if is a common-sense 4Suite extension def _xu_if(self, context, element, preserveSpace): test = element.getAttributeNS(EMPTY_NAMESPACE, u'test') if not test: raise XUpdateException(XUpdateException.NO_TEST)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element)
result = self.evaluateExpression(context, test) if Conversions.BooleanValue(result): for node in element.childNodes: self.visit(context, node, preserveSpace)
context.processorNss = oldNss return _dispatch_xupdate['if'] = _xu_if
def _xu_variable(self, context, element, preserveSpace): name = element.getAttributeNS(EMPTY_NAMESPACE, 'name') if not name: raise XUpdateException(XUpdateException.NO_NAME)
select = element.getAttributeNS(EMPTY_NAMESPACE, u'select') if select: oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element) result = self.evaluateExpression(context, select) context.processorNss = oldNss else: result = Conversions.StringValue(element)
(prefix, local) = SplitQName(name) if prefix: namespace = context.processorNss[prefix] else: namespace = EMPTY_NAMESPACE
context.varBindings[(namespace, local)] = result return _dispatch_xupdate['variable'] = _xu_variable
### Instructions ###
def _xu_element(self, context, element, preserveSpace): name = element.getAttributeNS(EMPTY_NAMESPACE, 'name') if not name: raise XUpdateException(XUpdateException.NO_NAME) _name = self.parseAVT(name)
namespace = element.getAttributeNS(EMPTY_NAMESPACE, 'namespace') _namespace = self.parseAVT(namespace)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element) name = _name.evaluate(context)
namespace = _namespace and _namespace.evaluate(context)
(prefix, local) = SplitQName(name) if not namespace: if prefix: namespace = context.processorNss[prefix] else: namespace = EMPTY_NAMESPACE
self.writers[-1].startElement(name, namespace) for child in element.childNodes: self.visit(context, child, preserveSpace) self.writers[-1].endElement(name, namespace) context.processorNss = oldNss return _dispatch_xupdate['element'] = _xu_element
def _xu_attribute(self, context, node, preserveSpace): name = node.getAttributeNS(EMPTY_NAMESPACE, 'name') if not name: raise XUpdateException(XUpdateException.NO_NAME) _name = self.parseAVT(name)
namespace = node.getAttributeNS(EMPTY_NAMESPACE, 'namespace') _namespace = self.parseAVT(namespace)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(node) name = _name.evaluate(context) namespace = _namespace and _namespace.evaluate(context)
(prefix, local) = SplitQName(name) if not namespace: if prefix: namespace = context.processorNss[prefix] else: namespace = EMPTY_NAMESPACE self.pushStringResult() try: for child in node.childNodes: self.visit(context, child, preserveSpace) finally: result = self.popResult()
self.writers[-1].attribute(name, result, namespace) context.processorNss = oldNss return _dispatch_xupdate['attribute'] = _xu_attribute
def _xu_text(self, context, element, preserveSpace): self.pushStringResult() try: for node in element.childNodes: self.visit(context, node, 1) finally: result = self.popResult()
self.writers[-1].text(result) return _dispatch_xupdate['text'] = _xu_text
def _xu_processing_instruction(self, context, element, preserveSpace): name = element.getAttributeNS(EMPTY_NAMESPACE, 'name') if not name: raise XUpdateException(XUpdateException.NO_NAME) _name = self.parseAVT(name)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(node) name = _name.evaluate(context)
self.pushStringResult() try: for node in element.childNodes: self.visit(context, node, preserveSpace) finally: result = self.popResult()
self.writers[-1].processingInstruction(name, result) context.processorNss = oldNss return _dispatch_xupdate['processing-instruction'] = _xu_processing_instruction
def _xu_comment(self, context, element, preserveSpace): self.pushStringResult() try: for node in element.childNodes: self.visit(context, node, preserveSpace) finally: result = self.popResult()
self.writers[-1].comment(result) return _dispatch_xupdate['comment'] = _xu_comment
def _xu_value_of(self, context, element, preserveSpace): select = element.getAttributeNS(EMPTY_NAMESPACE, u'select') if not select: raise XUpdateException(XUpdateException.NO_SELECT)
oldNss = context.processorNss context.processorNss = Domlette.GetAllNs(element)
result = self.evaluateExpression(context, select) if isinstance(result, Types.NodesetType): for node in result: # should be OK to pass self as processor; # CopyNode only needs to access its .writers[-1] CopyNode(self, node) else: # a string, number or boolean if not isinstance(result, Types.StringType): result = Conversions.StringValue(result) self.writers[-1].text(result) context.processorNss = oldNss return _dispatch_xupdate['value-of'] = _xu_value_of
def evaluateExpression(self, context, expression): try: parsed_expr = parser.new().parse(expression) except SyntaxError, e: raise XUpdateException(XUpdateException.SYNTAX_ERROR, expr=expression, err=str(e)) else: return parsed_expr.evaluate(context)
def parseAVT(self, avt): if avt is None: return None return AttributeValueTemplate.AttributeValueTemplate(avt)
class Reader(Domlette.NonvalidatingReaderBase): """ A reader of XUpdate documents. Must contain a fromSrc() method that takes an Ft.Xml.InputSource and returns a Domlette document. It does not need to detect XUpdate syntax errors. """ fromSrc = Domlette.NonvalidatingReaderBase.parse
# -- XUpdate user API -------------------------------------------------
def ApplyXUpdate(doc, xup): """ Takes 2 InputSources, one for the source document and one for the XUpdate instructions. It returns a DOM node representing the result of applying the XUpdate to the source document (the document is modified in-place). """ reader = Domlette.NonvalidatingReader xureader = Reader() processor = Processor() source = reader.parse(doc) xupdate = xureader.fromSrc(xup) processor.execute(source, xupdate) #The source has been updated in place return source
def ApplyXupdate(doc, xup): """ Deprecated. Use ApplyXUpdate (only the name changed). """ import warnings warnings.warn('Deprecated function ApplyXupdate called. Please' ' use ApplyXUpdate (with a capital "U") instead.', DeprecationWarning, 2) return ApplyXUpdate(doc, xup)
|