Viewing file: MetaKit.py (17.59 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
########################################################################
# $Header: /var/local/cvsroot/4Suite/Ft/Rdf/Drivers/MetaKit.py,v 1.6 2005/03/29 00:30:48 mbrown Exp $
"""
A persistent RDF model driver using MetaKit database files

MetaKit is an efficient embedded database library with a small footprint.
See http://www.equi4.com/metakit/

This driver was originally donated by Alexander Smishlajev:
http://lists.fourthought.com/pipermail/4suite-dev/2002-August/000621.html
with copyright transferred to Fourthought.

Copyright 2005 Fourthought, Inc. (USA).
Detailed license and copyright information: http://4suite.org/COPYRIGHT
Project home, documentation, distributions: http://4suite.org/
"""
import os, re, codecs import cPickle
from Ft.Rdf import Model from Ft.Rdf.Drivers import DataBaseExceptions from Ft.Rdf.Drivers import PROPERTIES
# Version string written into every database file by CreateDb and compared
# against by CheckVersion / ExistsDb.
# See revision 1.1 in 4Suite CVS for Alexander's revision history.
VERSION = "0.1"

# MetaKit view descriptions.  The "%s" placeholder in the statement and
# bound views is filled with the model name.
VERSION_VIEW = "ftrdf_version[version:S]"
STATEMENT_VIEW = (
    "ftrdf_%s_statement"
    "[subject:S,predicate:S,object:S,statementUri:S,scope:S,otype:S]"
)
BOUND_VIEW = "ftrdf_%s_bound[name:S,object:B,scope:S]"

# Queryable statement attributes, in statement-tuple order.
# Note: no "otype" attribute
STATEMENT_ATTRS = ("subject", "predicate", "object", "statementUri", "scope")
# Names of the per-attribute flag keys ("subjectFlags", ...), same order.
STATEMENT_FLAGS = map(lambda attr: attr + "Flags", STATEMENT_ATTRS)

# MetaKit writable mode:
#   1 - single user read/write; cannot be shared
#   2 - commit-extend mode. supports multiple readers and a single writer;
#       datafiles will grow on each commit, and need to be cleaned up
#       periodically to avoid filling the disk.
MODE_RW = 1
def InitializeModule():
    """
    Post-import hook that sets up the module's runtime state.

    Imports the ``metakit`` extension and computes ``DATABASE_DIR``.  Neither
    is required at import time, but both are needed before the module-level
    functions are called.

    The database directory defaults to ``<LOCALSTATEDIR>/MetaKit`` (taken
    from ``Ft.GetConfigVar``) and may be overridden through the
    ``FT_DATABASE_DIR`` environment variable.

    Raises ValueError when the resulting directory does not exist.
    """
    global metakit
    global DATABASE_DIR

    import metakit
    from Ft import GetConfigVar

    # The configured default is always computed; the environment variable,
    # when present, takes precedence over it.
    default_dir = os.path.join(GetConfigVar('LOCALSTATEDIR'), 'MetaKit')
    chosen_dir = os.environ.get('FT_DATABASE_DIR', default_dir)

    # Normalize path, remove unnecessary slashes
    DATABASE_DIR = os.path.abspath(chosen_dir)

    if os.path.isdir(DATABASE_DIR):
        return

    # FIXME: l10n
    raise ValueError("MetaKit database directory %s does not exist;"
                     " create it or check FT_DATABASE_DIR" % DATABASE_DIR)
def CreateFileName(dbName):
    """Return the path of the MetaKit file that backs database *dbName*.

    The file lives directly in DATABASE_DIR and is named ``ft__<dbName>.mk``.
    """
    base_name = "ft__" + str(dbName) + ".mk"
    return os.path.join(DATABASE_DIR, base_name)
def CreateDb(dbName, modelName='default'):
    """Create a fresh database file for *dbName* and return its adapter.

    Any existing file of the same name is deleted first.  The new file gets
    a version view holding a single row with the current VERSION.

    Returns a DbAdapter for (dbName, modelName).
    """
    path = CreateFileName(dbName)

    # Make sure the containing directory exists.
    parent = os.path.dirname(path)
    if parent and not os.path.isdir(parent):
        os.makedirs(parent)

    # Start from scratch: drop whatever was stored under this name before.
    if os.path.exists(path):
        os.unlink(path)

    storage = metakit.storage(path, 1)
    # database version
    storage.getas(VERSION_VIEW).append(version=VERSION)
    storage.commit()
    return DbAdapter(dbName, modelName)
def GetDb(dbName, modelName='default'):
    """Return a DbAdapter for database *dbName* and model *modelName*.

    The adapter only records names; the file is not opened until a
    transaction is begun on the adapter.
    """
    adapter = DbAdapter(dbName, modelName)
    return adapter
def DestroyDb(dbName, modelName='default'):
    """Delete the file backing database *dbName*, if there is one.

    *modelName* is accepted for interface symmetry and is not used: the
    whole file is removed.
    """
    target = CreateFileName(dbName)
    if not os.path.exists(target):
        # nothing to destroy
        return
    os.unlink(target)
def CheckVersion(dbName):
    """Verify that database *dbName* was written by this driver version.

    Reads the first row of the version view and compares it with VERSION.

    Raises TypeError, with a message naming the stored version, the
    expected version and the database file, when the two differ.
    """
    fName = CreateFileName(dbName)
    # Mode 0: presumably MetaKit's read-only mode (this file only documents
    # modes 1 and 2) -- TODO confirm.
    db = metakit.storage(fName, 0)
    # database version
    vw = db.getas(VERSION_VIEW)
    _version = vw[0].version
    if _version != VERSION:
        # The whole message must be parenthesized before applying "%":
        # "%" binds tighter than "+", so formatting only the last fragment
        # with three arguments fails with "not all arguments converted".
        raise TypeError(
            ("This RDF Database is obsolete (%s),"
             " and will not work with this version"
             " of 4Suite's MetaKit driver (%s)."
             " You must use an older version of 4Suite"
             " to export its contents, delete the database file"
             " (%s), and re-run 4ss_manager init.")
            % (_version, VERSION, fName))
def ExistsDb(dbName, modelName='default'):
    """Tell whether a usable database file exists for *dbName*.

    Returns 0 when the file is missing.  Otherwise returns a true value only
    if the version view has at least one row and its first row matches
    VERSION.  *modelName* is not consulted.
    """
    path = CreateFileName(dbName)
    if os.path.exists(path):
        storage = metakit.storage(path, 1)
        versions = storage.getas(VERSION_VIEW)
        return ((len(versions) > 0) and (versions[0].version == VERSION))
    return 0
# UTF-8 codec entry points: encoder, decoder, stream reader, stream writer.
enc, dec, srdr, swtr = codecs.lookup('utf-8')


def dec_utf8(s):
    """Decode UTF-8 data *s* to unicode; any false value yields u""."""
    if not s:
        return u""
    # the codec returns (text, bytes_consumed); only the text is wanted
    return dec(s)[0] or u""


def enc_utf8(u):
    """Encode unicode *u* as UTF-8; any false value yields ""."""
    if not u:
        return ""
    # the codec returns (data, chars_consumed); only the data is wanted
    return enc(u)[0] or ""
def _filter_function(rowref, filter={}): """apply each function in filter to corresp. rowref attrs; return truth
parameters: rowref - parameter passed to the filtering function by MetaKit view: row refference filter - maps row attribute names to filters.
return True if all of attribute filtering functions returned True """ for (_name, _filter) in filter.items(): if not _filter(getattr(rowref, _name)): return 0 return 1
class DbAdapter:
    """RDF model driver storing statements and bindings in a MetaKit file.

    One adapter serves one (database file, model name) pair.  The storage is
    opened by begin() and released by commit() / rollback(); every method
    that touches a view requires an open transaction.

    All text written to the views is UTF-8 encoded (enc_utf8) and decoded
    again (dec_utf8) on the way out.
    """

    def __init__(self, name, modelName='default'):
        """remember file and view names; the file itself is not opened"""
        self._fName = CreateFileName(name)
        # storage handle; None whenever no transaction is open
        self._db = None
        self._modelName = modelName
        self._statementView = "ftrdf_%s_statement" % modelName
        self._boundView = "ftrdf_%s_bound" % modelName
        self.props = {PROPERTIES.OBJECT_TYPE_SUPPORTED: 1}
        return

    def require_transaction(self):
        """raise an exception if transaction was not opened"""
        if not self._db:
            raise DataBaseExceptions.NoTransaction

    def mk_bound_view(self):
        """return the MetaKit view containing RDF bindings"""
        self.require_transaction()
        return self._db.getas(BOUND_VIEW % self._modelName)

    def mk_statement_view(self):
        """return the MetaKit view containing RDF statements"""
        self.require_transaction()
        return self._db.getas(STATEMENT_VIEW % self._modelName)

    def _mk_compile_regex(self, pattern, re_flags=0):
        """internal function: compile pattern or raise RdfException

        re.error is translated to RdfException(INVALID_REGEX_STATEMENT),
        carrying the pattern and the text of the original error.
        """
        try:
            return re.compile(pattern, re_flags)
        except re.error:
            # RdfException was referenced but never imported by this module,
            # so an invalid pattern used to end in NameError.  Both imports
            # are local to keep the module's import block untouched.
            import sys
            from Ft.Rdf import RdfException
            # sys.exc_info() instead of "except ..., e" / "except ... as e":
            # it is spelled the same way in every Python version.
            _err = sys.exc_info()[1]
            raise RdfException(RdfException.INVALID_REGEX_STATEMENT,
                               pattern, str(_err))

    def _mk_query_statement(self, statement, flags):
        """internal function: return (query, filter) pair

        parameters:
            statement - sequence of (subject, predicate, object,
                statementUri, scope) patterns; empty patterns are
                wildcards and are left out of both results
            flags - dictionary mapping "<attr>Flags" to Model flags

        return value:
            query - {attribute: encoded value} for exact matches
                (flag missing, None or Model.NORMAL)
            filter - {attribute: callable} for case-insensitive
                and regular expression matches
        """
        # MetaKit condition: values with Model.NORMAL flag
        _query = {}
        # additional filter functions:
        _filter = {}
        # zip() stops after the five known attributes; elements beyond
        # them (e.g. an otype) are ignored instead of becoming a query
        # entry keyed by None, as happened with map(None, ...) padding.
        for (_name, _flag_key, _value) in zip(STATEMENT_ATTRS,
                                              STATEMENT_FLAGS,
                                              statement):
            _pattern = enc_utf8(_value)
            if not _pattern:
                continue
            _flag = flags.get(_flag_key, None)
            if _flag in (None, Model.NORMAL):
                _query[_name] = _pattern
            elif _flag == Model.IGNORE_CASE:
                _filter[_name] = lambda x, s=_pattern.lower(): x.lower() == s
            elif _flag == Model.REGEX:
                _filter[_name] = self._mk_compile_regex(_pattern).match
            elif _flag == Model.REGEX + Model.IGNORE_CASE:
                _filter[_name] = self._mk_compile_regex(
                    _pattern, re.IGNORECASE).match
            else:
                # this couldn't happen
                _query[_name] = _pattern
        return (_query, _filter)

    def mk_select(self, statement, flags):
        """complete the statement; return MetaKit view

        parameters:
            statement - 5-element sequence
                (subject, predicate, object, statementUri, scope)
            flags - dictionary with following recognized keys:
                "subjectFlags", "predicateFlags", "objectFlags",
                "statementUriFlags", "scopeFlags".
                dictionary values are any combination
                of IGNORE_CASE and REGEX
        """
        (_query, _filter) = self._mk_query_statement(statement, flags)
        # get the view
        _vw = self.mk_statement_view()
        # initial filtering (by exact match patterns)
        if _query:
            _vw = _vw.select(_query)
        # additional filtering
        if _filter:
            _vi = _vw.filter(lambda x, f=_filter: _filter_function(x, f))
            _vw = _vw.remapwith(_vi)
        return _vw

    def mk_versa_list(self, view, sel_col, sel_val):
        """Versa query utility: return rownums for all sel_col in sel_val

        Parameters:
            view - MetaKit view used for lookups.
            sel_col - name of the filter column.
                Lookup is performed on this column
                for values in sel_val
            sel_val - sequence of the lookup values.
                An empty or None sequence matches every row.
        Return value:
            list of matching row numbers
        """
        _wanted = [enc_utf8(_val) for _val in (sel_val or ())]
        if not _wanted:
            return list(range(len(view)))
        return [_ii for _ii in range(len(view))
                if getattr(view[_ii], sel_col) in _wanted]

    def mk_get_subjects(self, view, indices):
        """return distinct subjects from listed rows

        Parameters:
            view - RDF Model (MetaKit view)
            indices - list of row indexes
        Return value:
            list of unique subjects
        """
        # dictionary keys are used as a set
        _seen = {}
        for _ii in indices:
            _seen[dec_utf8(view[_ii].subject)] = 1
        return list(_seen)

    def mk_get_objects(self, view, indices):
        """return distinct (object, otype) pairs from listed rows

        Parameters:
            view - RDF Model (MetaKit view)
            indices - list of row indexes
        Return value:
            list of unique (object, otype) pairs
        """
        # dictionary keys are used as a set
        _seen = {}
        for _ii in indices:
            _row = view[_ii]
            _seen[(dec_utf8(_row.object), dec_utf8(_row.otype))] = 1
        return list(_seen)

    ### Transactional Interface ###

    def begin(self):
        """open the storage file in MODE_RW; starts a transaction"""
        self._db = metakit.storage(self._fName, MODE_RW)
        return

    def commit(self):
        """commit pending changes and close the transaction

        raises DataBaseExceptions.NoTransaction if none is open
        """
        self.require_transaction()
        self._db.commit()
        self._db = None
        return

    def rollback(self):
        """discard pending changes and close the transaction

        raises DataBaseExceptions.NoTransaction if none is open
        (previously this failed with AttributeError on None)
        """
        self.require_transaction()
        self._db.rollback()
        self._db = None
        return

    ### Operations ###

    def add(self, statements):
        """append statements to the model

        Takes a list of tuples [(s, p, o, stmtUri, srcUri, otype), ...]
        """
        _vw = self.mk_statement_view()
        for s in statements:
            # XXX Ft/Share/Tests/Rdf/Drivers/test_interface.py",
            #   line 38, in Test, passes 4-element tuple (without otype).
            # XXX What should be default otype?
            if len(s) < 6:
                _otype = ''
            else:
                _otype = enc_utf8(s[5])
            _vw.append(
                subject=enc_utf8(s[0]),
                predicate=enc_utf8(s[1]),
                object=enc_utf8(s[2]),
                statementUri=enc_utf8(s[3]),
                scope=enc_utf8(s[4]),
                otype=_otype
            )
        return

    def remove(self, statements):
        """remove every statement matching one of the given tuples exactly"""
        for s in statements:
            self.removePattern(s[0], s[1], s[2], s[3], s[4], {})
        return

    def removePattern(self, subject, predicate, object,
                      statementUri, scope, flags):
        """remove all statements matching the pattern

        Empty pattern elements are wildcards; an all-empty pattern
        removes every statement of the model.
        """
        # prepare query
        (_query, _filter) = self._mk_query_statement(
            (subject, predicate, object, statementUri, scope), flags)
        # get the view
        _vw = self.mk_statement_view()
        if not (_query or _filter):
            # empty condition; remove all
            _vw[:] = []
        elif not _filter:
            # simple condition: remove statements matching _query
            _vi = _vw.indices(_vw.select(_query))
            _vw.remove(_vi)
        else:
            # augment _filter with _query, so a single filter() pass
            # yields indices relative to the full statement view
            for _name, _val in _query.items():
                _filter[_name] = lambda x, s=_val: x == s
            # remove rows matching _filter
            _vi = _vw.filter(lambda x, f=_filter: _filter_function(x, f))
            _vw.remove(_vi)
        return

    ### Queries

    def properties(self):
        """return the list of distinct predicates in the model"""
        _vw = self.mk_statement_view()
        # group by predicate (get a count in the second column; discard it)
        _vw = _vw.counts(_vw.structure()[1], "cnt")
        return [dec_utf8(_row.predicate) for _row in _vw]

    def resources(self):
        """return the list of distinct subjects and objects in the model"""
        _vw = self.mk_statement_view()
        # get distinct set of objects (renamed to subjects for union)
        _v1 = _vw.counts(_vw.structure()[2], "cnt").rename("object", "subject")
        # get distinct set of subjects augmented with the set of objects
        _vw = _v1.union(_vw.counts(_vw.structure()[0], "cnt"))
        # group by resource (get a count in the second column; discard it)
        _vw = _vw.counts(_vw.structure()[0], "cnt")
        return [dec_utf8(_row.subject) for _row in _vw]

    def complete(self, subject, predicate, object, statementUri, scope,
                 flags):
        """return matching statements as a list of tuples

        Each tuple holds one value per column of the selected view;
        string ("S") columns are decoded from UTF-8.
        """
        # complete the model
        _vw = self.mk_select(
            (subject, predicate, object, statementUri, scope), flags)
        # compose list-of-lists
        _props = _vw.structure()
        _lol = []
        for _rowref in _vw:
            _ll = []
            for _property in _props:
                _val = getattr(_rowref, _property.name)
                if _property.type == "S":
                    _val = dec_utf8(_val)
                _ll.append(_val)
            _lol.append(tuple(_ll))
        return _lol

    def size(self, scope):
        """return the number of statements, optionally limited to scope"""
        _vw = self.mk_statement_view()
        if scope:
            # scopes are stored UTF-8 encoded (see add), so the lookup
            # value must be encoded as well
            _vw = _vw.select({"scope": enc_utf8(scope)})
        return len(_vw)

    def contains(self, subject, predicate, object, statementUri, scope,
                 flags):
        """return true if at least one statement matches the pattern"""
        # complete the model
        _vw = self.mk_select(
            (subject, predicate, object, statementUri, scope), flags)
        # return True if we've got any rows
        return len(_vw) > 0

    def bind(self, object, name, scope):
        """store a pickled object under (name, scope), replacing any
        existing binding"""
        _vw = self.mk_bound_view()
        # encode name and scope
        _name = enc_utf8(name)
        _scope = enc_utf8(scope)
        _v1 = _vw.select({"name": _name, "scope": _scope})
        _obj = cPickle.dumps(object)
        if len(_v1):
            # name already bound; replace object
            _v1[0].object = _obj
        else:
            # new binding
            _vw.append(name=_name, scope=_scope, object=_obj)
        return

    def unbind(self, name, scope):
        """delete the binding stored under (name, scope), if any"""
        _vw = self.mk_bound_view()
        # encode name and scope
        _name = enc_utf8(name)
        _scope = enc_utf8(scope)
        _v1 = _vw.select({"name": _name, "scope": _scope})
        if len(_v1):
            # binding found; delete.  remove() is handed row indices, as
            # in removePattern, not the selected rows themselves.
            _vw.remove(_vw.indices(_v1))
        return

    def lookup(self, name, scope):
        """return the object bound under (name, scope), or None"""
        _vw = self.mk_bound_view()
        # encode name and scope
        _name = enc_utf8(name)
        _scope = enc_utf8(scope)
        _v1 = _vw.select({"name": _name, "scope": _scope})
        if len(_v1):
            # NOTE(review): unpickling executes whatever the database file
            # contains; only open database files from trusted sources.
            return cPickle.loads(_v1[0].object)
        else:
            return None

    def keys(self, scope):
        """return the bound names, optionally limited to scope"""
        _vw = self.mk_bound_view()
        if scope:
            # the encoded scope was computed but the raw value was used
            # for the lookup; select on the encoded form
            _vw = _vw.select({"scope": enc_utf8(scope)})
        return [dec_utf8(_row.name) for _row in _vw]

    def has_key(self, name, scope):
        """return true if name is bound (in scope, when scope is given)"""
        _vw = self.mk_bound_view()
        _query = {"name": enc_utf8(name)}
        if scope:
            _query["scope"] = enc_utf8(scope)
        return (len(_vw.select(_query)) > 0)

    ## Utilities for performance, primarily in Versa ##

    def subjectsFromPredAndObjs(self, predicate, objects, scope=None):
        """Get a list of subjects with the given predicate and objects"""
        #FIXME: support scope
        _vw = self.mk_statement_view()
        if predicate:
            _vw = _vw.select(predicate=enc_utf8(predicate))
        if len(_vw) < 1:
            return []
        _idx = self.mk_versa_list(_vw, "object", objects)
        return self.mk_get_subjects(_vw, _idx)

    def subjectsFromPredsAndObj(self, predicates, object, scope=None):
        """Get a list of subjects with the given predicates and object"""
        #FIXME: support scope
        _vw = self.mk_statement_view()
        if object:
            _vw = _vw.select(object=enc_utf8(object))
        if len(_vw) < 1:
            return []
        _idx = self.mk_versa_list(_vw, "predicate", predicates)
        return self.mk_get_subjects(_vw, _idx)

    def objectsFromSubAndPreds(self, subject, predicates, scope=None):
        """Get a list of objects with the given predicates and subject"""
        #FIXME: support scope
        _vw = self.mk_statement_view()
        if subject:
            _vw = _vw.select(subject=enc_utf8(subject))
        if len(_vw) < 1:
            return []
        _idx = self.mk_versa_list(_vw, "predicate", predicates)
        return self.mk_get_objects(_vw, _idx)

    def isResource(self, res):
        """return true if res occurs as the subject of any statement"""
        _vw = self.mk_statement_view()
        # XXX what is "resource"?
        #   SQL-based drivers perform only subject lookup,
        #   Memory-based drivers - subject and predicate??
        #   i thought that both subjects and objects are "resources".
        #   this implementation looks only for subjects to pass the tests
        #
        # 23-aug-2002 [als] as far as i understood http://www-106.ibm.com
        #   /developerworks/xml/library/x-think10/index.html
        #   resources are all subjects and predicates and objects
        #   with otype=="R".  OTOH resource objects should be subjects
        #   in other statements, shouldn't they?
        #
        #if _vw.find(object=enc_utf8(res)) >= 0: return 1
        return (_vw.find(subject=enc_utf8(res)) >= 0)
# vim: set ts=4 et :
|