Viewing file: BerkeleyDB.py (12.83 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Rdf/Drivers/BerkeleyDB.py,v 1.12 2005/04/06 22:00:57 jkloth Exp $ """ A persistent RDF model driver using Berkeley (BSD) database files
This driver implements 4RDF in the same structural format as SQL.py, but applies this interface on top of the embedded Berkeley DB.
Requires pybsddb (http://pybsddb.sourceforge.net/bsddb3.html), which is the standard bsddb module in Python 2.3 and up. However, even if you are using Python 2.3 up, it is possible that you won't have a functioning bsddb at all, or it may be wrapping the db185 Berkeley DB C libraries.
If necessary, update your C libs (db4+ preferred for compatibility with Berkeley DB XML). And if possible, upgrade or install pybsddb (this is mandatory on Python 2.2, if you want this driver to work).
Copyright 2005 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
import os, re from Ft.Rdf.Drivers import PROPERTIES from Ft.Lib.DbUtil import EscapeQuotes from Ft.Lib.Set import Unique from Ft.Rdf import Statement, Model, OBJECT_TYPE_UNKNOWN from Ft.Rdf.Drivers import DataBaseExceptions from Ft import GetConfigVar
STATEMENTS_TABLE_NAME = 'ftrdf_statements'
STATEMENT_PARTS = ['subject', 'predicate', 'object', 'statementUri', 'scope', 'otype']
VERSION = "0.1"
def InitializeModule(): """ Post-import hook to initialize module's runtime variables that are not required at import time, but will be needed before the module-level functions are called. """ global db, DATABASE_DIR, BerkeleyDBTxnTable
try: # pybsddb installed w/distutils is preferable from bsddb3 import db except ImportError: # standard bsddb in Python 2.3 up is OK try: from bsddb import db except ImportError: import sys if sys.version_info[:2] < (2,3): msg = "On this version of Python, the BerkeleyDB driver" \ " requires pybsddb (bsddb3)." else: msg = "On this version of Python, the BerkeleyDB driver" \ " requires either pybsddb (bsddb3) or the standard" \ " bsddb module. Neither appears to be installed." raise ImportError(msg)
import BerkeleyDBTxnTable
dbdir = os.path.join(GetConfigVar('LOCALSTATEDIR'), 'BsdDb') dbdir = os.environ.get('FT_DATABASE_DIR', dbdir) DATABASE_DIR = os.path.abspath(dbdir)
# FIXME: l10n if not os.path.isdir(DATABASE_DIR): raise ValueError("BerkeleyDB database directory %s does not exist;" " create it or check FT_DATABASE_DIR" % DATABASE_DIR) return
def GetDb(modelName='default'): newdb = BerkeleyDBTxnTable.bsdTableDB( '%s.frdf'%(modelName), dbhome=DATABASE_DIR, create=0) return DbAdapter(modelName,newdb)
def CreateDb(modelName='default'): newdb = BerkeleyDBTxnTable.bsdTableDB( '%s.frdf'%(modelName), dbhome=DATABASE_DIR, create=1) newdb.CreateTable(STATEMENTS_TABLE_NAME, STATEMENT_PARTS) newdb.txn.commit() newdb.close() del newdb.txn del newdb.env del newdb
def ExistsDb(modelName='default'): return os.path.exists(os.path.join(DATABASE_DIR, '%s.frdf'%(modelName)))
def DestroyDb(modelName='default'): os.remove(os.path.join(DATABASE_DIR, '%s.frdf'%(modelName)))
class DbAdapter: def __init__(self, modelName, db=None): self._ignoreBegin = False if db: self._ignoreBegin = True self._db = db self.props = {PROPERTIES.OBJECT_TYPE_SUPPORTED: 1} self._modelName = modelName return
def begin(self): #begin is ignored if called after an instance is setup with an existing db object if not self._ignoreBegin: _db = BerkeleyDBTxnTable.bsdTableDB( '%s.frdf'%(self._modelName), dbhome=DATABASE_DIR, create=0) self._db = _db
def commit(self): self._db.txn.commit() self._db.close()
def rollback(self): self._db.txn.abort() self._db.close()
def add(self, statements): # Takes a list of tuples [(s, p, o, stmtUri, srcUri, otype), ...] if not self._db: raise DataBaseExceptions.NoTransaction for subject, predicate, object_, statementUri, scope, otype in statements: self._db.Insert(STATEMENTS_TABLE_NAME, {'subject': subject, 'predicate': predicate, 'object':object_, 'statementUri':statementUri, 'scope':scope, 'otype':otype}) return
def remove(self, statements): for s in statements: self.removePattern(s[0], s[1], s[2], s[3], s[4], {})
def removePattern(self, subject, predicate, object_, statementUri, scope, flags): if not self._db: raise DataBaseExceptions.NoTransaction conditions = {} addStatementPartCondition(conditions, 'subject', subject, flags) addStatementPartCondition(conditions, 'object', object_, flags) addStatementPartCondition(conditions, 'predicate', predicate, flags) addStatementPartCondition(conditions, 'statementUri', statementUri, flags) addStatementPartCondition(conditions, 'scope', scope, flags)
self._db.Delete(STATEMENTS_TABLE_NAME, conditions=conditions)
def complete(self, subject, predicate, object_, statementUri, scope, flags): if not self._db: raise DataBaseExceptions.NoTransaction
conditions = {} addStatementPartCondition(conditions, 'subject', subject, flags) addStatementPartCondition(conditions, 'object', object_, flags) addStatementPartCondition(conditions, 'predicate', predicate, flags) addStatementPartCondition(conditions, 'statementUri', statementUri, flags) addStatementPartCondition(conditions, 'scope', scope, flags)
result = self._db.Select(STATEMENTS_TABLE_NAME, None, conditions=conditions) return [ (x['subject'], x['predicate'], x['object'], x['statementUri'], x['scope'], x['otype']) for x in result]
def size(self, scope): if not self._db: raise DataBaseExceptions.NoTransaction if scope: conditions={} addStatementPartCondition(conditions, 'scope', scope) return len(self.complete(None,None,None,None,scope,{})) else: return len(self.complete(None,None,None,None,None,None))
def contains(self, subject, predicate, object_, statementUri, scope, flags): if not self._db: raise DataBaseExceptions.NoTransaction
conditions = {} addStatementPartCondition(conditions, 'subject', subject, flags) addStatementPartCondition(conditions, 'object', object_, flags) addStatementPartCondition(conditions, 'predicate', predicate, flags) addStatementPartCondition(conditions, 'statementUri', statementUri, flags) addStatementPartCondition(conditions, 'scope', scope, flags)
return len(self._db.Select(STATEMENTS_TABLE_NAME, [STATEMENT_PARTS[0]], conditions=conditions)) > 0
def bind(self, object_, name, scope): raise NotImplementedError('Not Implemented')
def unbind(self, name, scope): raise NotImplementedError('Not Implemented')
def lookup(self, name, scope): raise NotImplementedError('Not Implemented')
def keys(self, scope): raise NotImplementedError('Not Implemented')
def has_key(self, name, scope): raise NotImplementedError('Not Implemented')
def subjectsFromPredsAndObj(self, predicates, object_, scope): conditions = {} if scope: addStatementPartCondition(conditions, 'scope', scope) if object_: addStatementPartCondition(conditions, 'object', object_)
if predicates: conditions['predicate'] = lambda x: x in predicates else: conditions['predicate'] = lambda x: True
rt=[stmt['subject'] for stmt in self._db.Select(STATEMENTS_TABLE_NAME, ['subject'], conditions=conditions)] return Unique(rt)
def subjectsFromPredAndObjs(self, predicate, objects, scope): """ Get a list of resources with the given predicate and object """
conditions = {} if scope: addStatementPartCondition(conditions, 'scope', scope) if predicate: addStatementPartCondition(conditions, 'predicate', predicate)
if objects: conditions['object'] = lambda x: x in objects else: conditions['object'] = lambda x: True
rt=[stmt['subject'] for stmt in self._db.Select(STATEMENTS_TABLE_NAME, ['subject'], conditions=conditions)] return Unique(rt)
def objectsFromSubsAndPred(self, subjects, predicate, scope): """ Get a list of obejcts with the given predicate and subjects """
conditions = {} if scope: addStatementPartCondition(conditions, 'scope', scope) if predicate: addStatementPartCondition(conditions, 'predicate', predicate)
if subjects: conditions['subject'] = lambda x: x in subjects else: conditions['subject'] = lambda x: True
rt=[(stmt['object'], stmt['otype']) for stmt in self._db.Select(STATEMENTS_TABLE_NAME, ['object', 'otype'], conditions=conditions) ] return Unique(rt)
def objectsFromSubsAndPredNonDistinct(self, subjects, predicate, scope): """ Get a list of *non-distinct* objects with the given predicate and subjects """ conditions = {} if scope: addStatementPartCondition(conditions, 'scope', scope) if predicate: addStatementPartCondition(conditions, 'predicate', predicate)
if subjects: conditions['subject'] = lambda x: x in subjects else: conditions['subject'] = lambda x: True
return [(stmt['object'], stmt['otype']) for stmt in self._db.Select(STATEMENTS_TABLE_NAME, ['object', 'otype'], conditions=conditions).values()]
def objectsFromSubAndPreds(self, subject, predicates, scope): """ Get a list of obejcts with the given predicates and subject """ conditions = {} if scope: addStatementPartCondition(conditions, 'scope', scope) if subject: addStatementPartCondition(conditions, 'subject', subject)
if predicates: conditions['predicate'] = lambda x: x in predicates else: conditions['predicate'] = lambda x: True
rt=[(stmt['object'], stmt['otype']) for stmt in self._db.Select(STATEMENTS_TABLE_NAME, ['object', 'otype'], conditions=conditions)] return Unique(rt)
def isResource(self, res): conditions={} addStatementPartCondition(conditions, 'subject', res) rt = self._db.Select(STATEMENTS_TABLE_NAME, ['subject'], conditions=conditions) return len(rt) > 0
def resources(self, scope): pred_conditions={} subj_conditions={} if scope: addStatementPartCondition(pred_conditions, 'scope', scope) addStatementPartCondition(subj_conditions, 'scope', scope) else: addStatementPartCondition(subj_conditions, 'subject', None) results = Unique([stmt['subject'] for stmt in self._db.Select(STATEMENTS_TABLE_NAME, ['subject'], conditions=subj_conditions)]) pred_res = Unique( [stmt['predicate'] for stmt in self._db.Select(STATEMENTS_TABLE_NAME, ['predicate'], conditions=pred_conditions).values() ] ) for p in pred_res: if p not in results: results.append(p) return results
def addStatementPartCondition(conditions, stmtPart, value, flags=None): if value: if not(flags) or flags.get(stmtPart+'Flags') == Model.NORMAL: conditions[stmtPart] = lambda x: x == value elif flags.get(stmtPart+'Flags') == Model.IGNORE_CASE: conditions[stmtPart] = lambda x: x.lower() == value.lower() elif flags.get(stmtPart+'Flags') == Model.REGEX: conditions[stmtPart] = lambda x: re.match(value, x) is not None elif flags.get(stmtPart+'Flags') == Model.IGNORE_CASE + Model.REGEX : conditions[stmtPart] = lambda x: re.match(value, x, re.I) is not None else: conditions[stmtPart] = lambda x: True
|