Viewing file: MySQL.py (16.5 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
######################################################################## # $Header: /var/local/cvsroot/4Suite/Ft/Rdf/Drivers/MySQL.py,v 1.7 2005/04/11 03:17:51 cogbuji Exp $ """ A persistent RDF model driver using MySQL with InnoDB
Copyright 2005 Fourthought, Inc. (USA). Detailed license and copyright information: http://4suite.org/COPYRIGHT Project home, documentation, distributions: http://4suite.org/ """
import sys, types
from Ft.Rdf import Model from Ft.Rdf import Statement from Ft.Rdf import OBJECT_TYPE_RESOURCE, OBJECT_TYPE_LITERAL from Ft.Rdf.Drivers import PROPERTIES
from SQL import Commands, SqlAdapter
# Schema version string.  CreateDb() stores it in the ftrdf_version table
# and CheckVersion() compares the stored value against it.
VERSION = "0.1"
def InitializeModule():
    """
    Post-import hook: bind the MySQLdb driver module to the module-level
    name ``MySQLdb``.

    The driver is not needed at import time, so it is loaded lazily here;
    this must be called before any module-level function that opens a
    database connection.
    """
    global MySQLdb
    import MySQLdb as driver_module
    MySQLdb = driver_module
    return
def GetDb(dbName, modelName='default'):
    """
    Return a DbAdapter for the given connect string and model name.

    dbName    - connect string, passed unchanged to DbAdapter
    modelName - model name, passed unchanged to DbAdapter
    """
    adapter = DbAdapter(dbName, modelName)
    return adapter
def SplitDbConnectString(connString):
    """
    Split a connect string into its components.

    Accepted forms:
        rdf:userName/password@hostName:port:dbName
        dbName            (no 'rdf:' prefix: the whole string is the name)

    In the 'rdf:' form the "userName/password@" part, the password, the
    host name and the port are all optional.

    Returns the tuple (dbName, hostname, port, username, passwd).  Missing
    string components are None and a missing port is -1.

    Raises ValueError when the string contains too many '@', '/' or ':'
    separators, or when the port is not an integer.
    """
    prefix = 'rdf:'
    if connString[:len(prefix)] != prefix:
        # A bare string is taken to be just the database name.
        return (connString, None, -1, None, None)

    remainder = connString[len(prefix):]

    # Optional "userName[/password]@" part.
    username = None
    passwd = None
    at_parts = remainder.split('@')
    if len(at_parts) > 2:
        raise ValueError("Invalid Connect String: %s. Too many @" % connString)
    if len(at_parts) == 2:
        credentials = at_parts[0].split('/')
        if len(credentials) > 2:
            raise ValueError("Invalid Connect String: %s. Too many /" % connString)
        username = credentials[0]
        if len(credentials) == 2:
            passwd = credentials[1]
        network = at_parts[1]
    else:
        network = at_parts[0]

    # "[hostName:[port:]]dbName" part; the database name always comes last.
    hostname = None
    port = -1
    net_parts = network.split(':')
    if len(net_parts) > 3:
        raise ValueError("Invalid Connect String: %s. Too many :" % connString)
    if len(net_parts) >= 2:
        hostname = net_parts[0]
    if len(net_parts) == 3 and net_parts[1]:
        # An empty port field ("host::db") keeps the -1 placeholder.
        port = int(net_parts[1])
    dbName = net_parts[-1]

    return (dbName, hostname, port, username, passwd)
def CreateDb(connString, modelName='default'):
    """
    Create the tables for the model `modelName` and return a DbAdapter
    for it.

    The database itself is ASSUMED TO EXIST; only the ftrdf_version table
    (if not already present) and the per-model statement and bound tables
    are created.

    connString - connect string as accepted by SplitDbConnectString
    modelName  - model name used to form the table and index names

    Errors from creating the per-model tables (for example, the tables
    already exist) propagate to the caller.
    """
    (dbName, hostname, port, username, passwd) = SplitDbConnectString(connString)
    db_name = dbName.lower()

    # Only pass the connection settings that were actually given, so the
    # driver's own defaults apply instead of the None / -1 placeholders.
    params = {'db': db_name}
    if username is not None:
        params['user'] = username
    if passwd is not None:
        params['passwd'] = passwd
    if hostname is not None:
        params['host'] = hostname
    if port > 0:
        params['port'] = port

    msql_db = MySQLdb.connect(**params)
    try:
        c = msql_db.cursor()
        c.execute("""SET AUTOCOMMIT=0""")

        try:
            c.execute("""CREATE TABLE ftrdf_version (version text)""")
            c.execute("""INSERT INTO ftrdf_version (version) VALUES ('%s')""" % (VERSION,))
            msql_db.commit()
        except (MySQLdb.Error, MySQLdb.Warning):
            # Best effort: the version table is not per-model, so it may
            # already have been created for another model.
            pass

        c.execute(CREATE_STATEMENT_TABLE % (modelName, modelName, modelName, modelName))
        c.execute(CREATE_BOUND_TABLE % modelName)
        msql_db.commit()
    finally:
        # Close the connection even when table creation fails.
        msql_db.close()
    return DbAdapter(connString, modelName)
def DestroyDb(connString, modelName='default'):
    """
    Drop the ftrdf_version table and the statement and bound tables of
    the model `modelName`.

    connString - connect string as accepted by SplitDbConnectString
    modelName  - model name used to form the table names

    Dropping is best effort: a failure to drop the version table is
    ignored, and a failure to drop a model table is reported on stderr.
    NOTE(review): ftrdf_version is not per-model, so this removes it for
    every model stored in the same database -- confirm that is intended.
    """
    (dbName, hostname, port, username, passwd) = SplitDbConnectString(connString)
    db_name = dbName.lower()

    # Only pass the connection settings that were actually given, so the
    # driver's own defaults apply instead of the None / -1 placeholders.
    params = {'db': db_name}
    if username is not None:
        params['user'] = username
    if passwd is not None:
        params['passwd'] = passwd
    if hostname is not None:
        params['host'] = hostname
    if port > 0:
        params['port'] = port

    msql_db = MySQLdb.connect(**params)
    try:
        c = msql_db.cursor()
        c.execute("""SET AUTOCOMMIT=0""")

        try:
            c.execute('DROP TABLE ftrdf_version')
        except (MySQLdb.Error, MySQLdb.Warning):
            pass

        for tn in ['ftrdf_%s_statement' % modelName,
                   'ftrdf_%s_bound' % modelName,
                   ]:
            try:
                c.execute('DROP TABLE %s' % tn)
            except (MySQLdb.Error, MySQLdb.Warning):
                sys.stderr.write("Unable to drop table %s\n" % tn)

        msql_db.commit()
    finally:
        # Close the connection even if something unexpected is raised.
        msql_db.close()
    return
def CheckVersion(db, connString, modelName):
    """
    Compare the version stored in the ftrdf_version table with VERSION.

    db         - an open database connection (only its cursor() is used)
    connString - unused here; kept for interface compatibility
    modelName  - unused here; kept for interface compatibility

    Returns 0 when the stored version equals VERSION, otherwise the
    stored version.  When the version cannot be read at all (for example
    the table is missing or empty), "0.1" is returned.

    Previously the computed value was discarded and the function always
    returned None; callers that ignore the result are unaffected.
    """
    version = 0
    c = db.cursor()
    try:
        c.execute("SELECT version FROM ftrdf_version")
        res = c.fetchone()
        if res[0] != VERSION:
            version = res[0]
    except Exception:
        # Best effort: an unreadable version table is treated as "0.1".
        version = "0.1"
    return version
def ExistsDb(connString, modelName='default'):
    """
    Report whether the tables of the model `modelName` exist.

    The database itself is ASSUMED TO EXIST; a failure to connect
    propagates as the driver's exception.

    connString - connect string as accepted by SplitDbConnectString
    modelName  - model name used to form the table names

    Returns 1 when both the statement and the bound table are present,
    0 when either is missing.  (The original trailing "return -1" could
    never be reached and has been dropped.)
    """
    (dbName, hostname, port, username, passwd) = SplitDbConnectString(connString)
    db_name = dbName.lower()

    # Only pass the connection settings that were actually given, so the
    # driver's own defaults apply instead of the None / -1 placeholders.
    params = {'db': db_name}
    if username is not None:
        params['user'] = username
    if passwd is not None:
        params['passwd'] = passwd
    if hostname is not None:
        params['host'] = hostname
    if port > 0:
        params['port'] = port

    msql_db = MySQLdb.connect(**params)
    try:
        c = msql_db.cursor()
        c.execute("""SET AUTOCOMMIT=0""")

        for tn in ['ftrdf_%s_statement' % modelName,
                   'ftrdf_%s_bound' % modelName,
                   ]:
            c.execute("""show tables like '%s'""" % (tn,))
            if not c.fetchall():
                return 0
        CheckVersion(msql_db, connString, modelName)
        msql_db.rollback()
    finally:
        # Runs on every exit path, including the early "return 0", which
        # previously left the connection open.
        msql_db.close()
    return 1
class DbAdapter(SqlAdapter):
    """
    SqlAdapter bound to a MySQL database.

    A connection is opened by begin() and closed again by commit() or
    rollback(); between transactions self._db is None.
    """

    def __init__(self, connString, modelName='default', db=None):
        """
        connString - connect string as accepted by SplitDbConnectString
        modelName  - model name, passed on to SqlAdapter
        db         - accepted for interface compatibility; not used here
        """
        SqlAdapter.__init__(self, _commands, _comparisons, modelName)
        (self._dbName,
         self._hostname,
         self._port,
         self._username,
         self._passwd) = SplitDbConnectString(connString)
        # The lower-cased name is the one used when connecting.
        self._DbName = self._dbName.lower()
        self.props = {PROPERTIES.OBJECT_TYPE_SUPPORTED: 1}
        return

    def begin(self):
        """Open a connection and switch autocommit off."""
        # Only pass the connection settings that were actually given, so
        # the driver's own defaults apply instead of None / -1 placeholders.
        params = {'db': self._DbName}
        if self._username is not None:
            params['user'] = self._username
        if self._passwd is not None:
            params['passwd'] = self._passwd
        if self._hostname is not None:
            params['host'] = self._hostname
        if self._port > 0:
            params['port'] = self._port

        self._db = MySQLdb.connect(**params)
        c = self._db.cursor()
        c.execute("""SET AUTOCOMMIT=0""")
        return

    def commit(self):
        """Commit the current transaction and close the connection."""
        try:
            self._db.commit()
        finally:
            # Release the connection even when the commit itself fails.
            self._db.close()
            self._db = None
        return

    def rollback(self):
        """Roll back the current transaction and close the connection."""
        try:
            self._db.rollback()
        finally:
            # Release the connection even when the rollback itself fails.
            self._db.close()
            self._db = None
        return
# Statements for use in the SqlAdapter class _Cmd: def __init__(self, command): import types if (isinstance(command, types.UnicodeType)): self._cmd = command else: self._cmd = unicode(command, 'ascii') return
def query(self, db, **args): c=db.cursor() c.execute("""SET AUTOCOMMIT=0""") try: if args: c.execute((self._cmd % args).encode("utf-8")) rt=c.fetchall() else: c.execute((self._cmd).encode("utf-8")) rt=c.fetchall() except UnicodeError: raise UnicodeError('query arguments must be unicode') return rt
def execute(self, db, **args): c=db.cursor() c.execute("""SET AUTOCOMMIT=0""") try: if args: c.execute((self._cmd % args).encode("utf-8")) else: c.execute((self._cmd).encode("utf-8")) except UnicodeError: raise UnicodeError('query arguments must be unicode')
#Functions copied from Postgres driver verbatim
def BuildSubjectsFromPredAndObjs(predicate, objects, modelName, scope=None):
    """
    Build the command selecting the distinct subjects of statements that
    have the given predicate and any one of the given objects.

    predicate - predicate value to match exactly
    objects   - sequence of object values; a statement matches when its
                object equals any of them.  When empty, no object
                constraint is emitted.
    modelName - model name used to form the statement table name
    scope     - optional; when true, also require domain = scope

    Returns a _Cmd wrapping the SQL text.  Values are interpolated as
    given, so any quoting/escaping is the caller's responsibility.
    """
    conditions = ["predicate = '%s'" % (predicate,)]
    if scope:
        conditions.append("domain = '%s'" % (scope,))
    alternatives = ["object = '%s'" % o for o in objects]
    if alternatives:
        conditions.append("(%s)" % " OR ".join(alternatives))
    # With an empty list the parenthesised group is simply left out; the
    # earlier string slicing produced an unbalanced ")" in that case.
    qstr = "SELECT DISTINCT subject from ftrdf_%s_statement where %s" % (
        modelName, " AND ".join(conditions))
    return _Cmd(qstr)
def BuildSubjectsFromPredsAndObj(predicates, object, modelName, scope=None):
    """
    Build the command selecting the distinct subjects of statements that
    have the given object and any one of the given predicates.

    predicates - sequence of predicate values; a statement matches when
                 its predicate equals any of them.  When empty, no
                 predicate constraint is emitted.
    object     - object value to match exactly
    modelName  - model name used to form the statement table name
    scope      - optional; when true, also require domain = scope

    Returns a _Cmd wrapping the SQL text.  Values are interpolated as
    given, so any quoting/escaping is the caller's responsibility.
    """
    conditions = ["object = '%s'" % (object,)]
    if scope:
        conditions.append("domain = '%s'" % (scope,))
    alternatives = ["predicate = '%s'" % p for p in predicates]
    if alternatives:
        conditions.append("(%s)" % " OR ".join(alternatives))
    # With an empty list the parenthesised group is simply left out; the
    # earlier string slicing produced an unbalanced ")" in that case.
    qstr = "SELECT DISTINCT subject from ftrdf_%s_statement where %s" % (
        modelName, " AND ".join(conditions))
    return _Cmd(qstr)
def BuildObjectsFromSubAndPreds(subject, predicates, modelName, scope=None):
    """
    Build the command selecting the distinct (object, otype) pairs of
    statements that have the given subject and any one of the given
    predicates.

    subject    - subject value to match exactly
    predicates - sequence of predicate values; a statement matches when
                 its predicate equals any of them.  When empty, no
                 predicate constraint is emitted.
    modelName  - model name used to form the statement table name
    scope      - optional; when true, also require domain = scope

    Returns a _Cmd wrapping the SQL text.  Values are interpolated as
    given, so any quoting/escaping is the caller's responsibility.
    """
    conditions = ["subject = '%s'" % (subject,)]
    if scope:
        conditions.append("domain = '%s'" % (scope,))
    alternatives = ["predicate = '%s'" % p for p in predicates]
    if alternatives:
        conditions.append("(%s)" % " OR ".join(alternatives))
    # With an empty list the parenthesised group is simply left out; the
    # earlier string slicing produced an unbalanced ")" in that case.
    qstr = "SELECT DISTINCT object, otype from ftrdf_%s_statement where %s" % (
        modelName, " AND ".join(conditions))
    return _Cmd(qstr)
def BuildObjectsFromSubsAndPred(subjects, predicate, modelName, scope=None):
    """
    Build the command selecting the distinct (object, otype) pairs of
    statements that have the given predicate and any one of the given
    subjects.

    subjects  - sequence of subject values; a statement matches when its
                subject equals any of them.  When empty, no subject
                constraint is emitted.
    predicate - predicate value to match exactly
    modelName - model name used to form the statement table name
    scope     - optional; when true, also require domain = scope

    Returns a _Cmd wrapping the SQL text.  Values are interpolated as
    given, so any quoting/escaping is the caller's responsibility.
    """
    conditions = ["predicate = '%s'" % (predicate,)]
    if scope:
        conditions.append("domain = '%s'" % (scope,))
    alternatives = ["subject = '%s'" % s for s in subjects]
    if alternatives:
        conditions.append("(%s)" % " OR ".join(alternatives))
    # With an empty list the parenthesised group is simply left out; the
    # earlier string slicing produced an unbalanced ")" in that case.
    qstr = "SELECT DISTINCT object, otype from ftrdf_%s_statement where %s" % (
        modelName, " AND ".join(conditions))
    return _Cmd(qstr)
def BuildNonDistinctObjectsFromSubsAndPred(subjects, predicate, modelName, scope=None):
    """
    Build the command selecting the (object, otype) pairs -- duplicates
    included -- of statements that have the given predicate and any one
    of the given subjects.

    subjects  - sequence of subject values; a statement matches when its
                subject equals any of them.  When empty, no subject
                constraint is emitted.
    predicate - predicate value to match exactly
    modelName - model name used to form the statement table name
    scope     - optional; when true, also require domain = scope

    Returns a _Cmd wrapping the SQL text.  Values are interpolated as
    given, so any quoting/escaping is the caller's responsibility.
    """
    conditions = ["predicate = '%s'" % (predicate,)]
    if scope:
        conditions.append("domain = '%s'" % (scope,))
    alternatives = ["subject = '%s'" % s for s in subjects]
    if alternatives:
        conditions.append("(%s)" % " OR ".join(alternatives))
    # With an empty list the parenthesised group is simply left out; the
    # earlier string slicing produced an unbalanced ")" in that case.
    qstr = "SELECT object, otype from ftrdf_%s_statement where %s" % (
        modelName, " AND ".join(conditions))
    return _Cmd(qstr)
_commands = { Commands.ADD : _Cmd("INSERT INTO ftrdf_%(modelName)s_statement VALUES ('%(subject)s', '%(predicate)s', '%(object)s', '%(statementUri)s', '%(scope)s', '%(otype)s')"),
Commands.SIZE : _Cmd("SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement WHERE domain='%(scope)s'"), Commands.SIZE_ALL : _Cmd("SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement"),
## # RIL Expressions Commands.BIND : _Cmd("INSERT INTO ftrdf_%(modelName)s_bound VALUES ('%(name)s', '%(object)s', '%(scope)s')"), Commands.UNBIND : _Cmd("DELETE FROM ftrdf_%(modelName)s_bound WHERE name='%(name)s' AND domain='%(scope)s'"), Commands.LOOKUP : _Cmd("SELECT object FROM ftrdf_%(modelName)s_bound WHERE name='%(name)s' AND domain='%(scope)s'"), Commands.KEYS : _Cmd("SELECT name FROM ftrdf_%(modelName)s_bound WHERE domain='%(scope)s'"), Commands.HAS_KEY : _Cmd("SELECT COUNT(object) FROM ftrdf_%(modelName)s_bound WHERE name='%(name)s' AND domain='%(scope)s'"), Commands.SUBJECT_LIST : _Cmd("SELECT DISTINCT subject from ftrdf_%(modelName)s_statement"), Commands.SUBJECT_LIST_SCOPED : _Cmd("SELECT DISTINCT subject from ftrdf_%(modelName)s_statement WHERE domain='%(scope)s'"), Commands.PREDICATE_LIST : _Cmd("SELECT DISTINCT predicate from ftrdf_%(modelName)s_statement"), Commands.PREDICATE_LIST_SCOPED : _Cmd("SELECT DISTINCT predicate from ftrdf_%(modelName)s_statement WHERE domain='%(scope)s'"), Commands.RESOURCE_OBJECT_LIST : _Cmd("SELECT DISTINCT object from ftrdf_%%(modelName)s_statement WHERE otype='%s'"%(OBJECT_TYPE_RESOURCE)), Commands.RESOURCE_OBJECT_LIST_SCOPED : _Cmd("SELECT DISTINCT object from ftrdf_%%(modelName)s_statement WHERE otype='%s' AND domain='%%(scope)s'"%(OBJECT_TYPE_RESOURCE)), Commands.BUILD_SUBJS_FROM_PREDS_AND_OBJ : BuildSubjectsFromPredsAndObj, Commands.OBJECT_LIST : _Cmd("SELECT DISTINCT object, otype from ftrdf_%(modelName)s_statement"), Commands.BUILD_OBJS_FROM_SUB_AND_PREDS : BuildObjectsFromSubAndPreds, Commands.BUILD_OBJS_FROM_SUB_AND_PREDS_SCOPED : BuildObjectsFromSubAndPreds, Commands.BUILD_NON_DISTINCT_OBJS_FROM_SUB_AND_PREDS: BuildNonDistinctObjectsFromSubsAndPred, Commands.BUILD_OBJS_FROM_SUBS_AND_PRED : BuildObjectsFromSubsAndPred, Commands.BUILD_OBJS_FROM_SUBS_AND_PRED_SCOPED : BuildObjectsFromSubsAndPred, Commands.IS_RESOURCE : _Cmd("SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement WHERE subject='%(subject)s'"), 
Commands.BUILD_SUBJS_FROM_PRED_AND_OBJS : BuildSubjectsFromPredAndObjs, Commands.BUILD_SUBJS_FROM_PRED_AND_OBJS_SCOPED : BuildSubjectsFromPredAndObjs }
# For the complex pattern commands: complete, remove, contains and changeAcl
#
# Maps a Model match flag to the SQL comparison operator that is
# substituted between a column name and a quoted value (for example
# "subject%(subjectOp)s'%(subject)s'" in the pattern commands built below).
#
# The PostgreSQL operators '~' and '~*' inherited from the Postgres driver
# are not valid MySQL comparison syntax, so MySQL's REGEXP is used instead.
# The surrounding spaces are required because REGEXP is a keyword.
# NOTE(review): whether plain REGEXP ignores case depends on the column
# collation, and REGEXP BINARY may be rejected by newer servers for
# multi-byte character sets -- confirm against the target MySQL version.
_comparisons = {None : '=',
                Model.NORMAL : '=',
                Model.IGNORE_CASE : ' REGEXP ',
                Model.REGEX : ' REGEXP BINARY ',
                Model.IGNORE_CASE + Model.REGEX : ' REGEXP ',
                }
# Register the CONTAINS, COMPLETE and REMOVE commands for every combination
# of the five optional constraints (subject, predicate, object,
# statementUri, scope).  Each bit of `bits` switches one constraint on.
for bits in range(32):
    parts = []
    for mask, clause in ((16, "subject%(subjectOp)s'%(subject)s'"),
                         (8, "predicate%(predicateOp)s'%(predicate)s'"),
                         (4, "object%(objectOp)s'%(object)s'"),
                         (2, "statementUri%(statementUriOp)s'%(statementUri)s'"),
                         (1, "domain%(scopeOp)s'%(scope)s'"),
                         ):
        if bits & mask:
            parts.append(clause)

    contains = 'SELECT COUNT(subject) FROM ftrdf_%(modelName)s_statement'
    complete = 'SELECT subject, predicate, object, statementUri, domain, otype FROM ftrdf_%(modelName)s_statement '
    remove = 'DELETE FROM ftrdf_%(modelName)s_statement '

    if parts:
        # All active constraints must hold at once.
        where = ' AND '.join(parts)
        contains = contains + ' WHERE ' + where
        complete = complete + ' WHERE ' + where
        remove = remove + ' WHERE ' + where

    # The command key is the command id followed by one flag per constraint.
    key = (bits & 16 > 0,
           bits & 8 > 0,
           bits & 4 > 0,
           bits & 2 > 0,
           bits & 1)

    _commands.update({
        (Commands.CONTAINS,) + key: _Cmd(contains),
        (Commands.COMPLETE,) + key: _Cmd(complete),
        (Commands.REMOVE,) + key: _Cmd(remove),
        })
#Avoid indices on object.  Indices on large TEXT fields in PG aren't much use
#Besides, it leads to the error described in this thread: http://groups.google.com/groups?hl=en&lr=&ie=UTF-8&th=a8f50c690804c2b5&rnum=1
#FIXME: Is this a problem in MySQL and/or in an RDF db where most objects are uri's?

# Index descriptions: index name -> (table name template, [column names]).
# The '%s' in the table name template stands for the model name.  Only
# 'spo' and 'source' are active; the other entries are commented out.
# NOTE(review): nothing in the visible part of this file reads INDICES --
# CREATE_STATEMENT_TABLE declares its own indexes -- so confirm whether
# this table is still used anywhere.
INDICES = {
    #'full' : ('ftrdf_%s_statement', ['subject', 'predicate', 'object',
    #                                 'statementUri', 'domain']),
    'spo' : ('ftrdf_%s_statement', ['subject', 'predicate','object']),
    # 'so' : ('ftrdf_%s_statement', ['predicate','object']),
    # 'subject' : ('ftrdf_statement', ['subject']),
    # 'predicate' : ('ftrdf_statement', ['predicate']),
    # 'object' : ('ftrdf_statement', ['object']),
    # 'statement' : ('ftrdf_statement', ['statementUri']),
    'source' : ('ftrdf_%s_statement', ['domain']),
    # 'binding' : ('ftrdf_%s_bound', ['name']),
    }
# DDL template for a model's statement table.  It takes four '%s'
# substitutions, all of them the model name: one for the table name and one
# for each of the three index names.
# The storage engine is selected with ENGINE=; the older TYPE= spelling of
# this table option is no longer accepted by current MySQL servers.
CREATE_STATEMENT_TABLE = """
CREATE TABLE ftrdf_%s_statement (
    subject       varchar(150) not NULL,
    predicate     varchar(150) not NULL,
    object        text,
    statementUri  varchar(45) not NULL,
    domain        text,
    otype         varchar(5) not NULL,
    INDEX %s_stmt_spd_index (subject(100),predicate(100),domain(50)),
    INDEX %s_stmt_pod_index (predicate(100),object(50),domain(50)),
    INDEX %s_stmt_d_index (domain(10))) ENGINE=InnoDB"""
# DDL template for a model's bound table.  It takes a single '%s'
# substitution, the model name.
# The storage engine is selected with ENGINE=; the older TYPE= spelling of
# this table option is no longer accepted by current MySQL servers.
CREATE_BOUND_TABLE = """
CREATE TABLE ftrdf_%s_bound (
    name          text,
    object        text,
    domain        text) ENGINE=InnoDB
"""
|