""" Database proxy helper for x/84. """
# std imports
import logging
# local
from x84.bbs.ini import get_ini
from x84.db import (
get_db_filepath,
get_database,
get_db_func,
get_db_lock,
log_db_cmd,
)
class DBProxy(object):
    """
    Provide dictionary-like object interface to shared database.

    A database call, such as __len__() or keys() is issued as a command
    to the main engine when ``use_session`` is True, which spawns a thread
    to acquire a lock on the database and return the results via IPC pipe
    transfer.

    Instances may also be used as a context manager: entering calls
    :meth:`acquire` and leaving calls :meth:`release`.
    """

    def __init__(self, schema, table='unnamed', use_session=True):
        """
        Class initializer.

        :param str schema: database key, becomes basename of .sqlite3 file.
        :param str table: optional database table.
        :param bool use_session: Whether iterable returns should be sent over
                                 an IPC pipe (client is a
                                 :class:`x84.bbs.session.Session` instance),
                                 or returned directly (such as used by the
                                 main thread engine components.)
        """
        self.log = logging.getLogger(__name__)
        self.schema = schema
        self.table = table
        # NOTE(review): the option is spelled 'tab_db' while the attribute is
        # '_tap_db' -- confirm the spelling against the ini file defaults.
        self._tap_db = get_ini('session', 'tab_db', getter='getboolean')

        # Imported at call time rather than at module level; presumably to
        # avoid a circular import -- TODO confirm.
        from x84.bbs.session import getsession

        # Falsy when use_session is False or when getsession() returns a
        # falsy value; the proxy_* dispatchers test this for truthiness.
        self._session = use_session and getsession()

    def proxy_iter_session(self, method, *args):
        """
        Proxy for iterable-return method calls over session IPC pipe.

        This is a generator: nothing is sent until iteration begins.

        :param str method: name of the dictionary method to call.
        :param args: positional arguments forwarded with the method name.
        :raises AssertionError: if the first reply is not the
                                ``(None, 'StartIteration')`` marker.
        """
        event = 'db={0}'.format(self.schema)
        # discard any stale replies before issuing a new request.
        self._session.flush_event(event)
        self._session.send_event(event, (self.table, method, args))

        data = self._session.read_event(event)
        # Raised explicitly (rather than with an ``assert`` statement) so the
        # protocol check is still performed when running under ``-O``.
        if not data == (None, 'StartIteration'):
            raise AssertionError(
                'iterable proxy used on non-iterable, {0!r}'.format(data))

        # yield each reply until the end-of-iteration marker is read.
        data = self._session.read_event(event)
        while data != (None, StopIteration):
            yield data
            data = self._session.read_event(event)
        self._session.flush_event(event)

    def proxy_method_direct(self, method, *args):
        """
        Proxy for direct dictionary method calls.

        Opens the database for this schema and table, calls ``method`` with
        ``args``, and always closes the database again.

        :param str method: name of the dictionary method to call.
        :param args: positional arguments passed to the method.
        :returns: whatever the underlying method returns.
        """
        dictdb = get_database(filepath=get_db_filepath(self.schema),
                              table=self.table)
        try:
            func = get_db_func(dictdb, method)
            if self._tap_db:
                log_db_cmd(self.log, self.schema, method, args)
            # NOTE(review): if func returns a lazy iterator, the database is
            # closed below before the caller consumes it -- confirm that
            # get_db_func or the database object tolerates this.
            return func(*args)
        finally:
            dictdb.close()

    def proxy_iter(self, method, *args):
        """ Proxy for iterable dictionary method calls. """
        if self._session:
            return self.proxy_iter_session(method, *args)
        return self.proxy_method_direct(method, *args)

    def proxy_method(self, method, *args):
        """ Proxy for dictionary method calls. """
        if self._session:
            return self.proxy_method_session(method, *args)
        return self.proxy_method_direct(method, *args)

    def proxy_method_session(self, method, *args):
        """
        Proxy for dictionary method calls over IPC pipe.

        :param str method: name of the dictionary method to call.
        :param args: positional arguments forwarded with the method name.
        :returns: the single reply read back for the request.
        """
        # note the '-' separator: iterable requests use 'db=' instead.
        event = 'db-{0}'.format(self.schema)
        self._session.send_event(event, (self.table, method, args))
        return self._session.read_event(event)

    def acquire(self):
        """ Acquire system-wide lock on database. """
        lock = get_db_lock(schema=self.schema, table=self.table)
        if self._tap_db:
            self.log.debug('lock acquire schema=%s, table=%s',
                           self.schema, self.table)
        lock.acquire()

    def release(self):
        """ Release system-wide lock on database. """
        lock = get_db_lock(schema=self.schema, table=self.table)
        if self._tap_db:
            self.log.debug('lock release schema=%s, table=%s',
                           self.schema, self.table)
        lock.release()

    def __enter__(self):
        """ Acquire the database lock and return this proxy. """
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """ Release the database lock; exceptions are not suppressed. """
        self.release()

    # pylint: disable=C0111
    #         Missing docstring
    #
    # The wrappers below borrow their docstrings from ``dict``.  Methods that
    # exist only on the Python 2 ``dict`` are looked up with a fallback so
    # that defining this class never raises AttributeError.

    def __contains__(self, key):
        return self.proxy_method('__contains__', key)
    __contains__.__doc__ = dict.__contains__.__doc__

    def __getitem__(self, key):
        return self.proxy_method('__getitem__', key)
    __getitem__.__doc__ = dict.__getitem__.__doc__

    def __setitem__(self, key, value):
        return self.proxy_method('__setitem__', key, value)
    __setitem__.__doc__ = dict.__setitem__.__doc__

    def __delitem__(self, key):
        return self.proxy_method('__delitem__', key)
    __delitem__.__doc__ = dict.__delitem__.__doc__

    def get(self, key, default=None):
        return self.proxy_method('get', key, default)
    get.__doc__ = dict.get.__doc__

    def has_key(self, key):
        return self.proxy_method('has_key', key)
    has_key.__doc__ = getattr(dict, 'has_key', dict.__contains__).__doc__

    def setdefault(self, key, value=None):
        # ``value`` defaults to None, matching dict.setdefault.
        return self.proxy_method('setdefault', key, value)
    setdefault.__doc__ = dict.setdefault.__doc__

    def update(self, *args):
        return self.proxy_method('update', *args)
    update.__doc__ = dict.update.__doc__

    def __len__(self):
        return self.proxy_method('__len__')
    __len__.__doc__ = dict.__len__.__doc__

    def values(self):
        return self.proxy_method('values')
    values.__doc__ = dict.values.__doc__

    def items(self):
        return self.proxy_method('items')
    items.__doc__ = dict.items.__doc__

    def iteritems(self):
        return self.proxy_iter('iteritems')
    iteritems.__doc__ = getattr(dict, 'iteritems', dict.items).__doc__

    def iterkeys(self):
        return self.proxy_iter('iterkeys')
    iterkeys.__doc__ = getattr(dict, 'iterkeys', dict.keys).__doc__

    def itervalues(self):
        return self.proxy_iter('itervalues')
    itervalues.__doc__ = getattr(dict, 'itervalues', dict.values).__doc__

    def keys(self):
        return self.proxy_method('keys')
    keys.__doc__ = dict.keys.__doc__

    def pop(self, *args):
        # dict.pop requires a key and accepts an optional default; forward
        # whatever the caller supplied instead of always sending no arguments.
        return self.proxy_method('pop', *args)
    pop.__doc__ = dict.pop.__doc__

    def popitem(self):
        return self.proxy_method('popitem')
    popitem.__doc__ = dict.popitem.__doc__

    def copy(self):
        # https://github.com/piskvorky/sqlitedict/issues/20
        # @jquast: should sqlitedict have a .copy() method? "no."
        return dict(self.proxy_method('items'))
    copy.__doc__ = dict.copy.__doc__