Viewing file: Model.py (12.03 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
########################################################################
# $Header: /var/local/cvsroot/4Suite/Ft/Rdf/Model.py,v 1.22 2005/03/05 06:31:05 mbrown Exp $
"""
The model container for RDF meta-data: represents a directed graph

Copyright 2005 Fourthought, Inc. (USA).
Detailed license and copyright information: http://4suite.org/COPYRIGHT
Project home, documentation, distributions: http://4suite.org/
"""
# Public names exported by "from Ft.Rdf.Model import *": the three
# pattern-match flag constants and the Model class itself.
__all__ = [
    'NORMAL',
    'IGNORE_CASE',
    'REGEX',
    'Model',
]
from Ft.Lib import Uuid from Ft.Lib.Uri import MatchesUriSyntax from Ft.Rdf import BNODE_BASE, BNODE_BASE_LEN from Ft.Rdf import RDF_MS_BASE, RDF_SCHEMA_BASE from Ft.Rdf import OBJECT_TYPE_RESOURCE, OBJECT_TYPE_LITERAL from Ft.Rdf import Container, RdfException from Ft.Rdf.Statement import Statement from Ft.Rdf.Resource import Resource
# Exact types that Model.add() and Model.remove() accept as "a collection
# of statements"; an argument of any other type is wrapped in a 1-tuple.
_SequenceTypes = [list, tuple]

# Bit flags for pattern matches
NORMAL = 0x00       # search terms exactly matched
IGNORE_CASE = 0x01  # search terms matched regardless of case
REGEX = 0x02        # search terms treated and matched as regular expressions
class Model(Resource):
    """
    Represents a universe of RDF statements, which at the abstract level
    represents a set of graphs that make up an RDF model.

    Storage is delegated to the driver object given to the initializer.
    An optional schema handler is consulted when statements are added,
    when the model is queried through complete(), and when consistency
    is checked.
    """

    def __init__(self, driver, schemaHandler=None, baseUri=''):
        """
        Initializer for an RDF model instance.

        driver is an instance of the driver used to store the statements.

        schemaHandler is a reference to an object that can be used to
        govern operations on the model, as typically formalized in a
        schema language such as RDFS or DAML.  This object can be None,
        disabling all such specialized processing.

        baseUri is handed to the Resource initializer.
        """
        Resource.__init__(self, baseUri)
        self._driver = driver
        self._schemaHandler = schemaHandler
        if schemaHandler:
            schemaHandler.initModel(self)
        #Expose the schema handler for relevant operations
        self.s = self._schemaHandler
        return

    def suspendSchema(self):
        """
        Ask the schema handler to suspend itself and return its result.
        Returns None when the model has no schema handler.
        """
        if self._schemaHandler:
            return self._schemaHandler.suspend()
        return None

    def resumeSchema(self):
        """
        Ask the schema handler to resume and return its result.
        Returns None when the model has no schema handler.
        """
        if self._schemaHandler:
            return self._schemaHandler.resume()
        return None

    def setSchemaActivity(self, value):
        """
        Forward value to the schema handler's setActivity() and return
        its result.  Returns None when the model has no schema handler.
        """
        if self._schemaHandler:
            return self._schemaHandler.setActivity(value)
        return None

    def checkConsistency(self):
        """
        Check the consistency of the entire model.

        All statements in the model are handed to the schema handler's
        checkConstraints().  Does nothing when there is no schema
        handler.  Always returns None.
        """
        if self._schemaHandler:
            stmts = self.statements()
            self._schemaHandler.checkConstraints(stmts)
        return None

    #-------------------------------------------------------------------
    # Primitive access and manipulation
    #-------------------------------------------------------------------
    def add(self, statements):
        """
        Add statement(s) to the model.

        statements is either a single statement object or a list/tuple
        of them.  If a schema handler is present it is given the new
        statements (processNewStatements) before they reach the driver.
        """
        # Exact type test, not isinstance: anything that is not literally
        # a list or tuple is treated as one statement.
        if type(statements) not in _SequenceTypes:
            statements = (statements,)
        if self._schemaHandler:
            self._schemaHandler.processNewStatements(statements)
        self._driver.add(self._unmapStatements(statements))

    def remove(self, statements):
        """
        Remove statement(s) from the model.

        statements is either a single statement object or a list/tuple
        of them.  The schema handler is not consulted.
        """
        if type(statements) not in _SequenceTypes:
            statements = (statements,)
        self._driver.remove(self._unmapStatements(statements))
        return

    def removePattern(self, subject, predicate, object,
                      statementUri=None, scope=None, **flags):
        """
        Remove all statements from the model that match the specified
        pattern.

        See complete() for a description of the flags keyword argument.
        Raises RdfException (INVALID_FLAG) for an unknown flag name.
        """
        self._validateFlags(flags)
        self._driver.removePattern(subject, predicate, object,
                                   statementUri, scope, flags)
        return

    def contains(self, statement):
        """
        Check if a statement is in the model.

        The statement is converted to its tuple form and the first five
        slots are passed to the driver with no match flags.
        """
        s = self._unmapStatements([statement])[0]
        return self._driver.contains(s[0], s[1], s[2], s[3], s[4], {})

    def containsPattern(self, subject, predicate, object,
                        statementUri=None, scope=None, **flags):
        """
        Determine if any statement matching the given criteria is in
        the model.

        See complete() for a description of the flags keyword argument.
        Raises RdfException (INVALID_FLAG) for an unknown flag name.
        """
        self._validateFlags(flags)
        return self._driver.contains(subject, predicate, object,
                                     statementUri, scope, flags)

    def statements(self, scope=None):
        """
        Returns all the statements in the model, i.e. the result of
        complete() with every slot except scope left as a wildcard.
        """
        return self.complete(None, None, None, None, scope)

    def size(self, scope=None):
        """Returns the number of statements in the model, as reported
        by the driver for the given scope."""
        return self._driver.size(scope)

    def complete(self, subject, predicate, object,
                 statementUri=None, scope=None, **flags):
        """
        Return all the statements in the model that match the given
        pattern.

        Any combination of s, p and o can be None, and any None slot is
        treated as a wildcard that matches any value in the model.  A
        list is returned comprising all the statement objects that
        match the arguments.

        flags keyword arguments map to the listed arguments with
        'Flags' appended to the name (subjectFlags, predicateFlags,
        objectFlags, statementUriFlags, scopeFlags), where the value is
        any combination of IGNORE_CASE and REGEX.  Any other keyword
        raises RdfException (INVALID_FLAG).

        When a schema handler is present the query is answered by the
        handler's _complete(); otherwise by this model's _complete().

        http://www.w3.org/TR/rdf-mt/#RDFRules [7.3 RDFS Entailment Rules]
        An RDFS entailment rule (rdfs7) can be applied to 'enhance'
        and/or optimize this calculation
        """
        if flags:
            self._validateFlags(flags)
        if self._schemaHandler:
            return self._schemaHandler._complete(subject, predicate, object,
                                                 statementUri, scope, flags)
        else:
            return self._complete(subject, predicate, object,
                                  statementUri, scope, flags)

    def _complete(self, subject, predicate, object, statementUri, scope,
                  flags):
        """
        Query the driver directly (no schema handler involvement) and
        wrap the resulting tuples as statement objects.  flags is a
        plain dictionary here, not keyword arguments.
        """
        statements = self._driver.complete(subject, predicate, object,
                                           statementUri, scope, flags)
        return self._mapStatements(statements)

    def exclude(self, subject, predicate, object,
                statementUri=None, scope=None, **flags):
        """
        Return all the statements in the model that DO NOT match the
        given pattern.

        See complete() for a description of the flags keyword argument.
        Raises RdfException (INVALID_FLAG) for an unknown flag name.
        """
        self._validateFlags(flags)
        statements = self._driver.exclude(subject, predicate, object,
                                          statementUri, scope, flags)
        return self._mapStatements(statements)

    #-------------------------------------------------------------------
    #Container access and manipulation
    #-------------------------------------------------------------------
    def addContainer(self, container, scope=None):
        """
        Add a container to the model.  This involves adding a statement
        for the container type, and a container statement for each
        item in the collection.

        container must provide a uri attribute, a className attribute
        and be iterable over its items.  If container.uri is empty a
        URI is generated with generateUri().

        The statements are written straight to the driver as tuples;
        the schema handler is not consulted.

        Returns the URI under which the container was stored.
        """
        # FIXME - update to new interface 3/30/01
        uri = container.uri
        if not uri:
            #We need to generate one
            uri = self.generateUri()

        ctype = container.className
        # NOTE(review): going by the positional order used in contains(),
        # slot 4 of a tuple is the statement URI and slot 5 the scope.
        # The type statement below puts the container URI in the
        # statement-URI slot and honours `scope`, while the member
        # statements leave the statement URI empty and use the container
        # URI as their scope.  This looks inconsistent but is preserved
        # as-is because it determines what gets stored -- confirm intent
        # before changing.
        stmts = [(uri, RDF_MS_BASE + 'type', RDF_MS_BASE + ctype, uri,
                  scope or '', OBJECT_TYPE_RESOURCE)]
        idx = 1
        for item in container:
            # Explicit branch rather than "x and A or B", which would
            # pick the wrong value if OBJECT_TYPE_RESOURCE were falsy.
            if MatchesUriSyntax(item):
                obj_type = OBJECT_TYPE_RESOURCE
            else:
                obj_type = OBJECT_TYPE_LITERAL
            stmts.append((uri, RDF_MS_BASE + '_%d' % idx, item, "",
                          uri, obj_type))
            idx += 1

        self._driver.add(stmts)

        return uri

    def extractContainer(self, uri, scope=None):
        """
        Build a container object from the container item relationships
        in the model (_1, _2, etc.).

        Returns a Container.Bag, Container.Sequence or
        Container.Alternative depending on the stored type of uri, or
        None if the model holds no type statement for uri.

        Raises RdfException (INVALID_CONTAINER_TYPE) when the stored
        type is not one of Bag, Seq or Alt.

        NOTE(review): the scope argument is accepted but not used by
        the queries below; left that way because addContainer() stores
        member statements under the container URI as scope.
        """

        #Get type of the bag
        rt = self.complete(uri, RDF_MS_BASE + 'type', None)

        if not len(rt):
            return None

        # Only the first type statement found is considered.
        oCtype = rt[0].object
        ctype = oCtype[len(RDF_MS_BASE):]

        #Get all of the statements about the bag
        contents = self.complete(uri, RDF_MS_BASE + '_.*', None,
                                 predicateFlags=REGEX)

        #Make sure there is only one copy of each _n predicate; the
        #first statement seen for a predicate wins.  A dict gives a
        #constant-time membership test.
        seen = {}
        unique = []
        for stmt in contents:
            if stmt.predicate not in seen:
                seen[stmt.predicate] = True
                unique.append(stmt)
        contents = unique

        if ctype == 'Seq':
            # Order members by the integer after the "_" of the predicate.
            # Remove the leading underscore also
            strip_len = len(RDF_MS_BASE) + 1
            contents.sort(key=lambda stmt: int(stmt.predicate[strip_len:]))

        contents = [item.object for item in contents]

        cont = None
        if ctype == 'Bag':
            cont = Container.Bag(uri, contents)
        elif ctype == 'Seq':
            cont = Container.Sequence(uri, contents)
        elif ctype == 'Alt':
            cont = Container.Alternative(uri, contents)
        else:
            raise RdfException(RdfException.INVALID_CONTAINER_TYPE,
                               (oCtype,))
        return cont

    #-------------------------------------------------------------------
    #Utility
    #-------------------------------------------------------------------
    def generateUri(self):
        """
        Generates URIs on the fly, e.g. for reified statements.
        Do *not* use this to generate anonymous resources.
        Use generateBnode instead.

        The default method is to generate a UUID URN, but this can be
        easily overridden.
        """
        return 'urn:uuid:' + Uuid.UuidAsString(Uuid.GenerateUuid())

    def generateBnode(self):
        """
        Generates blank nodes (bnodes), AKA anonymous resources.
        The label is BNODE_BASE followed by a freshly generated UUID.
        """
        return BNODE_BASE + Uuid.UuidAsString(Uuid.GenerateUuid())

    def isBnodeLabel(self, label):
        """
        Determines whether a label is a blank node, i.e. whether it
        begins with BNODE_BASE.
        """
        return label[:BNODE_BASE_LEN] == BNODE_BASE

    def bind(self, object, name, scope=None):
        """Bind a Compiled Inference Object to a specific name in the
        Model"""
        self._driver.bind(object, name, scope)

    def unbind(self, name, scope=None):
        """Remove a binding from the model"""
        self._driver.unbind(name, scope)

    def lookup(self, name, scope=None):
        """Retrieve a previously bound Inference Object from the model"""
        return self._driver.lookup(name, scope)

    def has_key(self, name, scope=None):
        """See if the specified name is bound to the model"""
        return self._driver.has_key(name, scope)

    def keys(self, scope=None):
        """Get a list of bound names in the model"""
        return self._driver.keys(scope)

    def compileRil(self, expression):
        """Compile a Ril expression into a compiled object"""
        # Imported here rather than at module level, as in the original.
        from Ft.Rdf.Parsers.Ril import CompiledRilExpression
        return CompiledRilExpression.CompiledRilExpression(expression)

    def _dump(self, scope=None):
        """
        Print all statements in the model to stdout.  When scope is
        given, only statements from that scope are printed.
        """
        # The scope argument used to be silently ignored.
        for statement in self.statements(scope):
            # Parenthesised single argument: same output under the
            # Python 2 print statement, and valid as a function call.
            print(statement)

    def _mapStatements(self, tuples):
        """Map a list of tuples to a list of statement objects.  Only
        the first six slots of each tuple are used."""
        return [Statement(*tup[:6]) for tup in tuples]

    def _unmapStatements(self, statements):
        """Map a list of statement objects into a list of tuples"""
        return [s.asTuple(encoding='unicode') for s in statements]

    def _validateFlags(self, flags):
        """
        Validate that the set of kw args passed into the pattern
        functions are what is allowed.

        flags is the dictionary of keyword arguments.  Raises
        RdfException (INVALID_FLAG) for the first name that is not one
        of the allowed flag keywords.
        """
        allowedFlags = ['subjectFlags', 'predicateFlags', 'objectFlags',
                        'scopeFlags', 'statementUriFlags']

        for k in flags.keys():
            if k not in allowedFlags:
                raise RdfException(RdfException.INVALID_FLAG, (k,))
|