# BerkeleyDBTxnTable.py
#A transactional equivalent of bsddb.dbtables
import re
import sys
import copy
import xdrlib
import random
from types import ListType, StringType
import cPickle as pickle

try:
    # For Pythons w/distutils pybsddb
    from bsddb3.db import *
except ImportError:
    # For Python 2.3
    from bsddb.db import *

class TableDBError(StandardError):
    pass

class TableAlreadyExists(TableDBError):
    pass

class Cond:
    """This condition matches everything"""
    def __call__(self, s):
        return 1

class ExactCond(Cond):
    """Acts as an exact match condition function"""
    def __init__(self, strtomatch):
        self.strtomatch = strtomatch
    def __call__(self, s):
        return s == self.strtomatch

class PrefixCond(Cond):
    """Acts as a condition function for matching a string prefix"""
    def __init__(self, prefix):
        self.prefix = prefix
    def __call__(self, s):
        return s[:len(self.prefix)] == self.prefix

class PostfixCond(Cond):
    """Acts as a condition function for matching a string postfix"""
    def __init__(self, postfix):
        self.postfix = postfix
    def __call__(self, s):
        return s[-len(self.postfix):] == self.postfix

class LikeCond(Cond):
    """
    Acts as a function that will match using an SQL 'LIKE' style string.
    Case insensitive and % signs are wild cards.
    This isn't perfect but it should work for the simple common cases.
    """
    def __init__(self, likestr, re_flags=re.IGNORECASE):
        # escape python re characters
        chars_to_escape = '.*+()[]?'
        for char in chars_to_escape:
            likestr = likestr.replace(char, '\\'+char)
        # convert %s to wildcards
        self.likestr = likestr.replace('%', '.*')
        self.re = re.compile('^'+self.likestr+'$', re_flags)
    def __call__(self, s):
        return self.re.match(s)

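# Illustrative only (not part of the original module): the condition
# classes above are simple callables applied to a column's data string,
# for example:
#
#   ExactCond('smith')('smith')    ->  true   (exact match)
#   PrefixCond('sm')('smith')      ->  true   (starts with 'sm')
#   PostfixCond('th')('smith')     ->  true   (ends with 'th')
#   LikeCond('%mit%')('Smith')     ->  match  ('%' is a wildcard,
#                                              case-insensitive)
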
#
# keys used to store database metadata
#
_table_names_key = '__TABLE_NAMES__'  # list of the tables in this db
_columns = '._COLUMNS__'    # table_name+this key contains a list of columns

def _columns_key(table):
    return table + _columns

#
# these keys are found within table sub databases
#
_data = '._DATA_.'     # this+column+this+rowid key contains table data
_rowid = '._ROWID_.'   # this+rowid+this key contains a unique entry for each
                       # row in the table.  (no data is stored)
_rowid_str_len = 8     # length in bytes of the unique rowid strings

def _data_key(table, col, rowid):
    return table + _data + col + _data + rowid

def _search_col_data_key(table, col):
    return table + _data + col + _data

def _search_all_data_key(table):
    return table + _data

def _rowid_key(table, rowid):
    return table + _rowid + rowid + _rowid

def _search_rowid_key(table):
    return table + _rowid

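# Illustrative only (not part of the original module): with a table named
# 'users', a column named 'name' and an 8-byte rowid <rowid>, the helpers
# above compose flat BTree keys such as:
#
#   _columns_key('users')                  -> 'users._COLUMNS__'
#   _data_key('users', 'name', <rowid>)    -> 'users._DATA_.name._DATA_.<rowid>'
#   _rowid_key('users', <rowid>)           -> 'users._ROWID_.<rowid>._ROWID_.'
#   _search_col_data_key('users', 'name')  -> 'users._DATA_.name._DATA_.'
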
def contains_metastrings(s):
    """Verify that the given string does not contain any metadata strings
    that might interfere with dbtables database operation.
    """
    if (s.find(_table_names_key) >= 0 or
        s.find(_columns) >= 0 or
        s.find(_data) >= 0 or
        s.find(_rowid) >= 0):
        # Then
        return 1
    else:
        return 0

class bsdTableDB:
    def __init__(self, filename, dbhome, create=0, truncate=0, mode=0600,
                 recover=0, dbflags=0):
        """bsdTableDB.open(filename, dbhome, create=0, truncate=0, mode=0600)
        Open database name in the dbhome BerkeleyDB directory.
        Use keyword arguments when calling this constructor.
        """
        self.db = None
        myflags = DB_THREAD
        if create:
            myflags |= DB_CREATE
        flagsforenv = (DB_INIT_MPOOL | DB_INIT_LOCK | DB_INIT_TXN |
                       DB_TXN_SYNC | dbflags)
        if recover:
            flagsforenv = flagsforenv | DB_RECOVER
        self.env = DBEnv()
        # enable auto deadlock avoidance
        self.env.set_lk_detect(DB_LOCK_DEFAULT)
        self.env.open(dbhome, myflags | flagsforenv)
        self.db = DB(self.env)
        # this code relies on DBCursor.set* methods to raise exceptions
        # rather than returning None
        self.db.set_get_returns_none(1)
        # allow duplicate entries [warning: be careful w/ metadata]
        self.db.set_flags(DB_DUP)
        self.txn = self.env.txn_begin()
        self.db.open(filename, DB_BTREE, dbflags | myflags, mode,
                     txn=self.txn)
        self.dbfilename = filename
        # Initialize the table names list if this is a new database
        if not self.db.has_key(_table_names_key, self.txn):
            self.db.put(_table_names_key, pickle.dumps([], 1), txn=self.txn)
        # TODO verify more of the database's metadata?
        self._tablecolumns = {}

    def __del__(self):
        pass

    def close(self):
        if self.db is not None:
            self.db.close()
            self.db = None
        if self.env is not None:
            self.env.close()
            self.env = None

    def checkpoint(self, mins=0):
        try:
            self.env.txn_checkpoint(mins)
        except DBIncompleteError:
            pass

    def sync(self):
        try:
            self.db.sync()
        except DBIncompleteError:
            pass

    def _db_print(self):
        """Print the database to stdout for debugging"""
        print "******** Printing raw database for debugging ********"
        cur = self.db.cursor(self.txn)
        try:
            key, data = cur.first()
            while 1:
                print repr({key: data})
                next = cur.next()
                if next:
                    key, data = next
                else:
                    cur.close()
                    return
        except DBNotFoundError:
            cur.close()

    def CreateTable(self, table, columns):
        """CreateTable(table, columns) - Create a new table in the database
        raises TableDBError if it already exists or for other DB errors.
        """
        assert isinstance(columns, ListType)
        try:
            # checking sanity of the table and column names here on
            # table creation will prevent problems elsewhere.
            if contains_metastrings(table):
                raise ValueError(
                    "bad table name: contains reserved metastrings")
            for column in columns:
                if contains_metastrings(column):
                    raise ValueError(
                        "bad column name: contains reserved metastrings")

            columnlist_key = _columns_key(table)
            if self.db.has_key(columnlist_key):
                raise TableAlreadyExists, "table already exists"

            # store the table's column info
            self.db.put(columnlist_key, pickle.dumps(columns, 1),
                        txn=self.txn)

            # add the table name to the tablelist
            tablelist = pickle.loads(self.db.get(_table_names_key,
                                                 txn=self.txn, flags=DB_RMW))
            tablelist.append(table)
            # delete 1st, in case we opened with DB_DUP
            self.db.delete(_table_names_key, self.txn)
            self.db.put(_table_names_key, pickle.dumps(tablelist, 1),
                        txn=self.txn)

        except DBError, dberror:
            raise TableDBError, dberror[1]

    def __load_column_info(self, table):
        """initialize the self._tablecolumns dict"""
        # check the column names
        try:
            tcolpickles = self.db.get(_columns_key(table), txn=self.txn)
        except DBNotFoundError:
            raise TableDBError, "unknown table: %r" % (table,)
        if not tcolpickles:
            raise TableDBError, "unknown table: %r" % (table,)
        self._tablecolumns[table] = pickle.loads(tcolpickles)

    def __new_rowid(self, table, txn):
        """Create a new unique row identifier"""
        unique = 0
        while not unique:
            # Generate a random 64-bit row ID string
            # (note: this code has <64 bits of randomness
            # but it's plenty for our database id needs!)
            p = xdrlib.Packer()
            p.pack_int(int(random.random()*2147483647))
            p.pack_int(int(random.random()*2147483647))
            newid = p.get_buffer()

            # Guarantee uniqueness by adding this key to the database
            try:
                self.db.put(_rowid_key(table, newid), None, txn=txn,
                            flags=DB_NOOVERWRITE)
            except DBKeyExistError:
                pass
            else:
                unique = 1

        return newid

    def Insert(self, table, rowdict):
        """Insert(table, rowdict) - Insert a new row into the table
        using the keys+values from rowdict as the column values.
        """
        try:
            if not self._tablecolumns.has_key(table):
                self.__load_column_info(table)
            for column in rowdict.keys():
                if not self._tablecolumns[table].count(column):
                    raise TableDBError, "unknown column: %r" % (column,)

            # get a unique row identifier for this row
            rowid = self.__new_rowid(table, txn=self.txn)

            # insert the row values into the table database
            for column, dataitem in rowdict.items():
                # store the value
                self.db.put(_data_key(table, column, rowid), dataitem,
                            txn=self.txn)

        except DBError, dberror:
            # WIBNI we could just abort the txn and re-raise the exception?
            # But no, because TableDBError is not related to DBError via
            # inheritance, so it would be backwards incompatible.  Do the
            # next best thing.
            info = sys.exc_info()
            raise TableDBError, dberror[1], info[2]

    def Delete(self, table, conditions={}):
        """Delete(table, conditions) - Delete items matching the given
        conditions from the table.
        * conditions is a dictionary keyed on column names containing
          condition functions expecting the data string as an
          argument and returning a boolean.
        """
        try:
            matching_rowids = self.__Select(table, [], conditions)

            # delete row data from all columns
            columns = self._tablecolumns[table]
            for rowid in matching_rowids.keys():
                try:
                    for column in columns:
                        # delete the data key
                        try:
                            self.db.delete(_data_key(table, column, rowid),
                                           self.txn)
                        except DBNotFoundError:
                            # XXXXXXX column may not exist, assume no error
                            pass

                    try:
                        self.db.delete(_rowid_key(table, rowid), self.txn)
                    except DBNotFoundError:
                        # XXXXXXX row key somehow didn't exist,
                        # assume no error
                        pass
                    #txn.commit()
                    #txn = None
                except DBError, dberror:
                    raise
        except DBError, dberror:
            raise TableDBError, dberror[1]

    def Select(self, table, columns, conditions={}):
        """Select(table, columns, conditions) - retrieve specific row data
        Returns a list of row column->value mapping dictionaries.
        * columns is a list of which column data to return.  If
          columns is None, all columns will be returned.
        * conditions is a dictionary keyed on column names containing
          callable conditions expecting the data string as an argument
          and returning a boolean.
        """
        try:
            if not self._tablecolumns.has_key(table):
                self.__load_column_info(table)
            if columns is None:
                columns = self._tablecolumns[table]
            matching_rowids = self.__Select(table, columns, conditions)
        except DBError, dberror:
            raise TableDBError, dberror[1]
        # return the matches as a list of dictionaries
        return matching_rowids.values()

    def __Select(self, table, columns, conditions):
        """__Select() - Used to implement Select and Delete (above)
        Returns a dictionary keyed on rowids containing dicts holding
        the row data for columns listed in the columns param that match
        the given conditions.
        * conditions is a dictionary keyed on column names containing
          callable conditions expecting the data string as an argument
          and returning a boolean.
        """
        # check the validity of each column name
        if not self._tablecolumns.has_key(table):
            self.__load_column_info(table)
        if columns is None:
            columns = self._tablecolumns[table]
        for column in (columns + conditions.keys()):
            if not self._tablecolumns[table].count(column):
                raise TableDBError, "unknown column: %r" % (column,)

        # keyed on rows that match so far, containing dicts keyed on
        # column names containing the data for that row and column.
        matching_rowids = {}
        # keys are rowids that do not match
        rejected_rowids = {}

        # attempt to sort the conditions in such a way as to minimize full
        # column lookups
        def cmp_conditions(atuple, btuple):
            a = atuple[1]
            b = btuple[1]
            if type(a) is type(b):
                if isinstance(a, PrefixCond) and isinstance(b, PrefixCond):
                    # longest prefix first
                    return cmp(len(b.prefix), len(a.prefix))
                if isinstance(a, LikeCond) and isinstance(b, LikeCond):
                    # longest likestr first
                    return cmp(len(b.likestr), len(a.likestr))
                return 0
            if isinstance(a, ExactCond):
                return -1
            if isinstance(b, ExactCond):
                return 1
            if isinstance(a, PrefixCond):
                return -1
            if isinstance(b, PrefixCond):
                return 1
            # leave all unknown condition callables alone as equals
            return 0

        conditionlist = conditions.items()
        conditionlist.sort(cmp_conditions)

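        # Illustrative only (not part of the original module): given
        # conditions such as
        #   {'name': PrefixCond('al'), 'id': ExactCond('42'),
        #    'email': LikeCond('%example%')}
        # the sort above orders the ExactCond first, then the PrefixCond,
        # then everything else, so the most selective conditions are
        # applied first.
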
        # Apply conditions to column data to find what we want
        cur = self.db.cursor(self.txn)
        column_num = -1
        for column, condition in conditionlist:
            column_num = column_num + 1
            searchkey = _search_col_data_key(table, column)
            # speedup: don't linear search columns within loop
            if column in columns:
                savethiscolumndata = 1  # save the data for return
            else:
                savethiscolumndata = 0  # data only used for selection

            try:
                key, data = cur.set_range(searchkey)
                while key[:len(searchkey)] == searchkey:
                    # extract the rowid from the key
                    rowid = key[-_rowid_str_len:]

                    if not rejected_rowids.has_key(rowid):
                        # if no condition was specified or the condition
                        # succeeds, add row to our match list.
                        if not condition or condition(data):
                            if not matching_rowids.has_key(rowid):
                                matching_rowids[rowid] = {}
                            if savethiscolumndata:
                                matching_rowids[rowid][column] = data
                        else:
                            if matching_rowids.has_key(rowid):
                                del matching_rowids[rowid]
                            rejected_rowids[rowid] = rowid

                    key, data = cur.next()

            except DBError, dberror:
                if dberror[0] != DB_NOTFOUND:
                    raise
                continue

        cur.close()

        # we're done selecting rows, garbage collect the reject list
        del rejected_rowids

        # extract any remaining desired column data from the
        # database for the matching rows.
        if len(columns) > 0:
            for rowid, rowdata in matching_rowids.items():
                for column in columns:
                    if rowdata.has_key(column):
                        continue
                    try:
                        rowdata[column] = self.db.get(
                            _data_key(table, column, rowid), txn=self.txn)
                    except DBError, dberror:
                        if dberror[0] != DB_NOTFOUND:
                            raise
                        rowdata[column] = None

        # return the matches
        return matching_rowids

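
# ----------------------------------------------------------------------
# Minimal usage sketch, illustrative only and not part of the original
# module.  It assumes a writable './dbhome' environment directory already
# exists, and it commits the long-lived self.txn before close() -- the
# module itself never commits that transaction, so treat that call as an
# assumption about how the class is meant to be driven.
if __name__ == '__main__':
    db = bsdTableDB('tabledb.db', dbhome='./dbhome', create=1)
    try:
        db.CreateTable('users', ['name', 'email'])
    except TableAlreadyExists:
        pass                        # table left over from an earlier run
    db.Insert('users', {'name': 'alice', 'email': 'alice@example.com'})
    # expect something like [{'name': 'alice'}]
    print db.Select('users', ['name'],
                    conditions={'name': PrefixCond('al')})
    db.Delete('users', conditions={'name': ExactCond('alice')})
    db.txn.commit()                 # assumption: make the changes durable
    db.close()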