Viewing file: Postgres.py (14.85 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
########################################################################
# $Header: /var/local/cvsroot/4Suite/Ft/Rdf/Drivers/Postgres.py,v 1.33 2005/02/27 04:17:30 jkloth Exp $
"""
A persistent RDF model driver using the PyGreSQL adapter to PostgreSQL

See http://www.postgresql.org/

Copyright 2005 Fourthought, Inc. (USA).
Detailed license and copyright information: http://4suite.org/COPYRIGHT
Project home, documentation, distributions: http://4suite.org/
"""
import re, os, sys
from Ft.Rdf import Model from Ft.Rdf import Statement from Ft.Rdf.Drivers import PROPERTIES
from SQL import Commands, SqlAdapter
# Version tag of the table layout used by this driver.  CreateDb() stores it
# in the ftrdf_version table and CheckVersion() compares the stored value
# against it.
VERSION = "0.2"
def InitializeModule():
    """
    Post-import hook that binds the `_pg` extension module as a module
    global.

    The import is deferred so that this module can be imported without
    `_pg` being available; the module-level functions that talk to the
    database need this hook to have run first.
    """
    global _pg
    import _pg
def GetDb(dbName, modelName='default'):
    """
    Return a DbAdapter for the model `modelName`.

    dbName is handed to DbAdapter as its connect string; nothing is
    connected to or created here.
    """
    adapter = DbAdapter(dbName, modelName)
    return adapter
def SplitDbConnectString(connString):
    """
    Split a connect string into its components.

    Two forms are accepted:

        rdf:userName/password@hostName:port:dbName
        dbName

    In the first form everything except dbName is optional: the
    "userName/password@" part may be left out or may carry only a user
    name, and the network part may be "hostName:port:dbName",
    "hostName:dbName" or just "dbName".  An empty port
    ("hostName::dbName") leaves the port at its default.

    A string that does not start with "rdf:" is returned unchanged as the
    database name.

    Returns the tuple (dbName, hostname, port, username, passwd).  Missing
    components are None, except for port, which is -1.

    Raises ValueError if the string has too many "@", "/" or ":"
    separators, or if the port is not an integer.
    """
    username = None
    passwd = None
    hostname = None
    port = -1

    if not connString.startswith('rdf:'):
        # Plain form: the whole string is the database name
        return (connString, hostname, port, username, passwd)

    fields = connString[4:].split('@')
    if len(fields) > 2:
        raise ValueError("Invalid Connect String: %s.  Too many @" % connString)

    if len(fields) == 2:
        # "credentials@network": break out username and password
        networkString = fields[1]
        credentials = fields[0].split('/')
        if len(credentials) > 2:
            raise ValueError("Invalid Connect String: %s.  Too many /" % connString)
        username = credentials[0]
        if len(credentials) == 2:
            passwd = credentials[1]
    else:
        networkString = fields[0]

    fields = networkString.split(':')
    if len(fields) > 3:
        raise ValueError("Invalid Connect String: %s.  Too many :" % connString)

    # The database name is always the last field, the host (if any) the first
    dbName = fields[-1]
    if len(fields) >= 2:
        hostname = fields[0]
    if len(fields) == 3 and fields[1]:
        try:
            port = int(fields[1])
        except ValueError:
            # Report a bad port the same way as every other malformed string
            raise ValueError("Invalid Connect String: %s.  Port must be an integer"
                             % connString)

    return (dbName, hostname, port, username, passwd)
def CreateDb(connString, modelName='default'):
    """
    Create the PostgreSQL database (if needed) and the tables and indices
    for the RDF model `modelName`, then return a DbAdapter for that model.

    connString is parsed by SplitDbConnectString; the PostgreSQL database
    name is "ft__" plus the lower-cased database name from that string.

    Errors raised by the driver while creating the model tables or indices
    propagate to the caller.
    """
    (dbName, hostname, port, username, passwd) = SplitDbConnectString(connString)
    pg_db_name = "ft__" + dbName.lower()

    # The database is looked up and created through a connection to template1
    tmpl_db = _pg.connect(dbname='template1',
                          host=hostname,
                          port=port,
                          user=username,
                          passwd=passwd,
                          )
    try:
        res = tmpl_db.query("select datname from pg_database where datname = '%s'" % pg_db_name).getresult()
        if not res:
            tmpl_db.query("CREATE DATABASE " + pg_db_name + " WITH ENCODING = 'SQL_ASCII'")
    finally:
        tmpl_db.close()

    pg_db = _pg.connect(dbname=pg_db_name,
                        host=hostname,
                        port=port,
                        user=username,
                        passwd=passwd,
                        )
    try:
        # The version table has no model name in it, so it may already exist
        # from an earlier CreateDb call; that failure is deliberately ignored.
        pg_db.query('BEGIN')
        try:
            pg_db.query("CREATE TABLE ftrdf_version (version text)")
            pg_db.query("INSERT INTO ftrdf_version (version) VALUES ('%s')"%VERSION)
        except _pg.error:
            pass
        pg_db.query('COMMIT')

        # Per-model tables and indices are created in one transaction
        pg_db.query('BEGIN')
        pg_db.query(CREATE_STATEMENT_TABLE%modelName)
        pg_db.query(CREATE_BOUND_TABLE%modelName)
        for (index, (table, columns)) in INDICES.items():
            cols = ','.join(columns)
            table = table%modelName
            query = 'CREATE INDEX %s_%s_idx ON %s (%s)' % (index,
                                                           modelName,
                                                           table,
                                                           cols)
            pg_db.query(query)
        pg_db.query('COMMIT')
    finally:
        # Release the setup connection whether or not creation succeeded
        pg_db.close()

    # Hand back an adapter for the model that was just created, not 'default'
    return DbAdapter(connString, modelName)
def DestroyDb(connString, modelName='default'):
    """
    Drop the indices and tables of the RDF model `modelName`, and the
    ftrdf_version table.

    The PostgreSQL database itself is left in place.  Failures to drop an
    index or the version table are ignored; a failure to drop one of the
    model tables is reported on stderr.  Returns None.
    """
    (dbName, hostname, port, username, passwd) = SplitDbConnectString(connString)
    pg_db_name = "ft__" + dbName.lower()

    pg_db = _pg.connect(dbname=pg_db_name,
                        host=hostname,
                        port=port,
                        user=username,
                        passwd=passwd,
                        )
    try:
        for index in INDICES.keys():
            try:
                pg_db.query('DROP INDEX %s_%s_idx' % (index, modelName))
            except _pg.error:
                pass

        # NOTE(review): ftrdf_version carries no model name, so it looks
        # shared by every model in this database -- confirm dropping it here
        # is intended when other models remain.
        try:
            pg_db.query('DROP TABLE ftrdf_version')
        except _pg.error:
            pass

        for tn in ['ftrdf_%s_statement'%modelName,
                   'ftrdf_%s_bound'%modelName,
                   ]:
            try:
                pg_db.query('DROP TABLE %s' % tn)
            except _pg.error:
                sys.stderr.write("Unable to drop table %s\n" % tn)
    finally:
        # Always release the connection, even if a drop fails unexpectedly
        pg_db.close()
    return
def CheckVersion(db, connString, modelName):
    """
    Verify that the database was created by this version of the driver.

    db is an open connection; the version is read from the first row of
    its ftrdf_version table.  If that cannot be read (no table, no rows),
    the database is treated as version "0.1".  connString and modelName
    are accepted but not used.

    Raises TypeError if the stored version differs from VERSION.
    """
    try:
        res = db.query("SELECT version FROM ftrdf_version").getresult()
        stored = res[0][0]
    except Exception:
        # Best effort: any failure to read the version marks the db as 0.1
        stored = "0.1"

    # Compare explicitly, so that an empty or NULL stored version is
    # reported as obsolete instead of silently passing
    if stored != VERSION:
        raise TypeError('This RDF Database is obsolete (%s), and will not '
                        'work with this version of 4Suite\'s Postgres driver '
                        '(%s).  You must use an older version of 4Suite to '
                        'export its contents, drop the database (usually '
                        '"dropdb ft__xmlserver"), and re-run 4ss_manager '
                        'init.' % (stored, VERSION))
def ExistsDb(connString, modelName='default'):
    """
    Report whether the RDF model `modelName` exists in the database named
    by connString.

    Returns 1 when the PostgreSQL database exists and contains every index
    listed in INDICES and both model tables; returns 0 otherwise.

    Raises TypeError (from CheckVersion) when the model is present but the
    database was created by a different version of the driver.
    """
    (dbName, hostname, port, username, passwd) = SplitDbConnectString(connString)
    pg_db_name = "ft__" + dbName.lower()

    # Look the database up through template1
    db = _pg.connect(dbname='template1',
                     host=hostname,
                     port=port,
                     user=username,
                     passwd=passwd,
                     )
    try:
        rt = db.query("SELECT datname FROM pg_database WHERE datname='%s'" % pg_db_name)
        found = rt and rt.ntuples() > 0
    finally:
        db.close()
    if not found:
        return 0

    pg_db = _pg.connect(dbname=pg_db_name,
                        host=hostname,
                        port=port,
                        user=username,
                        passwd=passwd,
                        )
    try:
        for index in INDICES.keys():
            if not pg_db.query("select indexname from pg_indexes where indexname = '%s_%s_idx'" % (index, modelName)).getresult():
                return 0
        for tn in ['ftrdf_%s_statement'%modelName,
                   'ftrdf_%s_bound' % modelName,
                   ]:
            if not pg_db.query("select tablename from pg_tables where tablename = '%s'" % tn).getresult():
                return 0
        CheckVersion(pg_db, connString, modelName)
    finally:
        # Release the connection on every exit path, including early returns
        pg_db.close()
    return 1
class DbAdapter(SqlAdapter):
    """
    SqlAdapter bound to one PostgreSQL database and one RDF model.

    A connection is opened by begin() and closed again by commit() or
    rollback().
    """

    def __init__(self, connString, modelName='default', db=None):
        """
        Remember the connection settings parsed from connString.

        No connection is opened here.  The db argument is accepted but
        not used.
        """
        SqlAdapter.__init__(self, _commands, _comparisons, modelName)
        (name, host, port, user, password) = SplitDbConnectString(connString)
        self._dbName = name
        self._hostname = host
        self._port = port
        self._username = user
        self._passwd = password
        self._pgDbName = 'ft__' + name.lower()
        self.props = {PROPERTIES.OBJECT_TYPE_SUPPORTED: 1}

    def begin(self):
        """Open a connection to the database and start a transaction."""
        self._db = _pg.connect(dbname=self._pgDbName,
                               host=self._hostname,
                               port=self._port,
                               user=self._username,
                               passwd=self._passwd,
                               )
        self._db.query('BEGIN')

    def commit(self):
        """Commit the current transaction and close the connection."""
        connection = self._db
        connection.query('COMMIT')
        connection.close()
        self._db = None

    def rollback(self):
        """Close the connection without issuing a COMMIT."""
        connection = self._db
        connection.close()
        self._db = None
# Statements for use in the SqlAdapter
class _Cmd:
    """
    A SQL command template, held as a unicode string.

    query() and execute() fill the template with their keyword arguments
    (when any are given), encode the result as UTF-8 and pass it to the
    query() method of the supplied connection.
    """

    def __init__(self, command):
        # Byte strings must be plain ASCII; unicode is stored as given
        if not isinstance(command, unicode):
            command = unicode(command, 'ascii')
        self._cmd = command

    def _send(self, db, args):
        """Format, encode and run the command; return what db.query() returns."""
        try:
            sql = self._cmd
            if args:
                sql = sql % args
            return db.query(sql.encode("utf-8"))
        except UnicodeError:
            raise UnicodeError('query arguments must be unicode')

    def query(self, db, **args):
        """
        Run the command and return its rows.

        A false result from the connection is returned as is.  Otherwise
        the value of the result's getresult() is returned, or None when
        the result has no such method.
        """
        result = self._send(db, args)
        if not result:
            return result
        try:
            return result.getresult()
        except AttributeError:
            return None

    def execute(self, db, **args):
        """Run the command and return the connection's raw result."""
        return self._send(db, args)
def BuildSubjectsFromPredAndObjs(predicate, objects, modelName, scope=None):
    """
    Build the command selecting the distinct subjects of statements that
    have the given predicate and any one of the given objects.

    When scope is given (and true), only statements whose domain equals
    scope are considered.  An empty `objects` sequence yields a command
    that matches no rows.

    Returns a _Cmd.
    """
    # NOTE(review): values are interpolated into the SQL verbatim;
    # presumably the caller has already escaped quotes -- confirm.
    qstr = "SELECT DISTINCT subject from ftrdf_%s_statement where predicate = '%s' AND " % (modelName, predicate)
    if scope:
        qstr += "domain = '%s' AND " % scope

    alternatives = ["object = '%s'" % o for o in objects]
    if not alternatives:
        # Nothing to match against: keep the SQL valid and select no rows
        alternatives = ["1 = 0"]
    return _Cmd(qstr + '(' + ' OR '.join(alternatives) + ')')
def BuildSubjectsFromPredsAndObj(predicates, object, modelName, scope=None):
    """
    Build the command selecting the distinct subjects of statements that
    have the given object and any one of the given predicates.

    When scope is given (and true), only statements whose domain equals
    scope are considered.  An empty `predicates` sequence yields a command
    that matches no rows.

    Returns a _Cmd.
    """
    # NOTE(review): values are interpolated into the SQL verbatim;
    # presumably the caller has already escaped quotes -- confirm.
    qstr = "SELECT DISTINCT subject from ftrdf_%s_statement where object = '%s' AND " % (modelName, object)
    if scope:
        qstr += "domain = '%s' AND " % scope

    alternatives = ["predicate = '%s'" % p for p in predicates]
    if not alternatives:
        # Nothing to match against: keep the SQL valid and select no rows
        alternatives = ["1 = 0"]
    return _Cmd(qstr + '(' + ' OR '.join(alternatives) + ')')
def BuildObjectsFromSubAndPreds(subject, predicates, modelName, scope=None):
    """
    Build the command selecting the distinct (object, otype) pairs of
    statements that have the given subject and any one of the given
    predicates.

    When scope is given (and true), only statements whose domain equals
    scope are considered.  An empty `predicates` sequence yields a command
    that matches no rows.

    Returns a _Cmd.
    """
    # NOTE(review): values are interpolated into the SQL verbatim;
    # presumably the caller has already escaped quotes -- confirm.
    qstr = "SELECT DISTINCT object, otype from ftrdf_%s_statement where subject = '%s' AND " % (modelName, subject)
    if scope:
        qstr += "domain = '%s' AND " % scope

    alternatives = ["predicate = '%s'" % p for p in predicates]
    if not alternatives:
        # Nothing to match against: keep the SQL valid and select no rows
        alternatives = ["1 = 0"]
    return _Cmd(qstr + '(' + ' OR '.join(alternatives) + ')')
from Ft.Rdf import OBJECT_TYPE_RESOURCE, OBJECT_TYPE_LITERAL
# Command table handed to SqlAdapter by DbAdapter.__init__.  Each key is a
# Commands identifier; each value is either a ready-made _Cmd whose
# %(name)s slots are filled in when the command is run, or (for the BUILD_*
# entries) a function that builds a _Cmd from its arguments.
_commands = {
             # Insert one statement row; the values follow the column order
             # of CREATE_STATEMENT_TABLE, with scope stored in "domain"
             Commands.ADD : _Cmd("INSERT INTO ftrdf_%(modelName)s_statement VALUES ('%(subject)s', '%(predicate)s', '%(object)s', '%(statementUri)s', '%(scope)s', '%(otype)s')"),

             # Statement counts, for one scope and for the whole model
             Commands.SIZE : _Cmd("SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement WHERE domain='%(scope)s'"),
             Commands.SIZE_ALL : _Cmd("SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement"),

             ##
             # RIL Expressions
             # (these operate on the ftrdf_<model>_bound table)
             Commands.BIND : _Cmd("INSERT INTO ftrdf_%(modelName)s_bound VALUES ('%(name)s', '%(object)s', '%(scope)s')"),
             Commands.UNBIND : _Cmd("DELETE FROM ftrdf_%(modelName)s_bound WHERE name='%(name)s' AND domain='%(scope)s'"),
             Commands.LOOKUP : _Cmd("SELECT object FROM ftrdf_%(modelName)s_bound WHERE name='%(name)s' AND domain='%(scope)s'"),
             Commands.KEYS : _Cmd("SELECT name FROM ftrdf_%(modelName)s_bound WHERE domain='%(scope)s'"),
             Commands.HAS_KEY : _Cmd("SELECT COUNT(object) FROM ftrdf_%(modelName)s_bound WHERE name='%(name)s' AND domain='%(scope)s'"),

             # Distinct subjects / predicates, optionally limited to a scope
             Commands.SUBJECT_LIST : _Cmd("SELECT DISTINCT subject from ftrdf_%(modelName)s_statement"),
             Commands.SUBJECT_LIST_SCOPED : _Cmd("SELECT DISTINCT subject from ftrdf_%(modelName)s_statement WHERE domain='%(scope)s'"),
             Commands.PREDICATE_LIST : _Cmd("SELECT DISTINCT predicate from ftrdf_%(modelName)s_statement"),
             Commands.PREDICATE_LIST_SCOPED : _Cmd("SELECT DISTINCT predicate from ftrdf_%(modelName)s_statement WHERE domain='%(scope)s'"),

             # OBJECT_TYPE_RESOURCE is substituted right here, at import time;
             # the doubled %% keeps the modelName/scope slots for run time
             Commands.RESOURCE_OBJECT_LIST : _Cmd("SELECT DISTINCT object from ftrdf_%%(modelName)s_statement WHERE otype='%s'"%(OBJECT_TYPE_RESOURCE)),
             Commands.RESOURCE_OBJECT_LIST_SCOPED : _Cmd("SELECT DISTINCT object from ftrdf_%%(modelName)s_statement WHERE otype='%s' AND domain='%%(scope)s'"%(OBJECT_TYPE_RESOURCE)),

             Commands.BUILD_SUBJS_FROM_PREDS_AND_OBJ : BuildSubjectsFromPredsAndObj,
             Commands.OBJECT_LIST : _Cmd("SELECT DISTINCT object, otype from ftrdf_%(modelName)s_statement"),
             Commands.BUILD_OBJS_FROM_SUB_AND_PREDS : BuildObjectsFromSubAndPreds,

             # Counts the statements that have the given value as subject
             Commands.IS_RESOURCE : _Cmd("SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement WHERE subject='%(subject)s'"),
             Commands.BUILD_SUBJS_FROM_PRED_AND_OBJS : BuildSubjectsFromPredAndObjs,
             }
# For the complex pattern commands: complete, remove, contains and changeAcl _comparisons = {None : '=', Model.NORMAL : '=', Model.IGNORE_CASE : '~*', Model.REGEX : '~', Model.IGNORE_CASE + Model.REGEX : '~*', }
# Register a CONTAINS, COMPLETE and REMOVE command for each of the 32
# combinations of constrained columns.  Bit 16 stands for subject, 8 for
# predicate, 4 for object, 2 for statementUri and 1 for scope (the "domain"
# column).
for bits in range(32):
    # One WHERE term per bit that is set, most significant bit first
    parts = []
    for (mask, term) in ((16, "subject%(subjectOp)s'%(subject)s'"),
                         (8, "predicate%(predicateOp)s'%(predicate)s'"),
                         (4, "object%(objectOp)s'%(object)s'"),
                         (2, "statementUri%(statementUriOp)s'%(statementUri)s'"),
                         (1, "domain%(scopeOp)s'%(scope)s'"),
                         ):
        if bits & mask:
            parts.append(term)

    contains = 'SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement'
    complete = 'SELECT subject, predicate, object, statementUri, domain, otype FROM ftrdf_%(modelName)s_statement '
    remove = 'DELETE FROM ftrdf_%(modelName)s_statement '

    if parts:
        where = ' AND '.join(parts)
        contains = contains + ' WHERE ' + where
        complete = complete + ' WHERE ' + where
        remove = remove + ' WHERE ' + where

    # Key layout: four comparison results followed by the raw scope bit
    key = tuple([bits & mask > 0 for mask in (16, 8, 4, 2)]) + (bits & 1,)

    for (command, sql) in ((Commands.CONTAINS, contains),
                           (Commands.COMPLETE, complete),
                           (Commands.REMOVE, remove),
                           ):
        _commands[(command,) + key] = _Cmd(sql)
# PostgreSQL commands to create necessary tables
#
# Both templates take the model name as their single %s argument (see
# CreateDb).  The scope of a statement or binding is kept in the "domain"
# column.

# One row per RDF statement
CREATE_STATEMENT_TABLE = """ CREATE TABLE ftrdf_%s_statement ( subject text, predicate text, object text, statementUri text, domain text, otype text) """

# One row per binding made through Commands.BIND
CREATE_BOUND_TABLE = """ CREATE TABLE ftrdf_%s_bound ( name text, object varchar, domain text) """
#Avoid indices on object. Indices on large TEXT fields in PG aren't much use #Besides, it leads to the error described in this thread: http://groups.google.com/groups?hl=en&lr=&ie=UTF-8&th=a8f50c690804c2b5&rnum=1 INDICES = { #'full' : ('ftrdf_%s_statement', ['subject', 'predicate', 'object', # 'statementUri', 'domain']), 'sp' : ('ftrdf_%s_statement', ['subject', 'predicate']), # 'so' : ('ftrdf_%s_statement', ['predicate','object']), # 'subject' : ('ftrdf_statement', ['subject']), # 'predicate' : ('ftrdf_statement', ['predicate']), # 'object' : ('ftrdf_statement', ['object']), # 'statement' : ('ftrdf_statement', ['statementUri']), 'source' : ('ftrdf_%s_statement', ['domain']), # 'binding' : ('ftrdf_%s_bound', ['name']), }
|