Source code for x84.bbs.userbase

""" Userbase record database and utility functions for x/84. """
import logging
from x84.bbs.dbproxy import DBProxy

# Cached password digest callable; ``None`` until get_digestpw() resolves it.
FN_PASSWORD_DIGEST = None
# Database name passed to DBProxy for group records.
GROUPDB = 'groupbase'
# Database name passed to DBProxy for user records (and their 'attrs' table).
USERDB = 'userbase'


def list_users():
    """
    Return the handle of every user in the user database.

    :rtype: list
    :returns: list of user handles, each decoded from utf8.
    """
    handles = []
    for raw_handle in DBProxy(USERDB).keys():
        handles.append(raw_handle.decode('utf8'))
    return handles
def get_user(handle):
    """
    Look up and return the user record stored under ``handle``.

    :param handle: database key of the user record.
    :rtype: User
    :returns: instance of :class:`User`
    """
    userdb = DBProxy(USERDB)
    return userdb[handle]
def find_user(handle):
    """
    Find the stored user key matching ``handle``, ignoring case.

    :param handle: user handle to search for.
    :returns: the matching database key exactly as stored, or None if no
              key matches.
    :rtype: None or str.
    """
    # lazily scan the keys; next() stops at the first case-insensitive match
    matches = (
        db_key for db_key in DBProxy(USERDB).keys()
        if handle.lower() == db_key.decode('utf8').lower()
    )
    return next(matches, None)
class Group(object):

    """
    A simple group record object.

    Holds a group name and the set of user handles belonging to it.
    """

    def __init__(self, name, members=()):
        """
        Class initializer.

        :param name: name of the group, also used as its database key.
        :param members: iterable of user handles, copied into a set.
        """
        self._name = name
        self._members = set(members)

    @property
    def name(self):
        """ Name of this group. """
        return self._name

    @name.setter
    def name(self, value):
        # pylint: disable=C0111
        #         Missing docstring
        self._name = value

    @property
    def members(self):
        """ Members of this group as user handles. """
        return self._members

    def add(self, handle):
        """ Add user to group (in memory only; call :meth:`save`). """
        log = logging.getLogger(__name__)
        log.info("Group({!r}).add({!r})".format(self.name, handle))
        self._members.add(handle)

    def remove(self, handle):
        """
        Remove user from group (in memory only; call :meth:`save`).

        :raises KeyError: if ``handle`` is not a member.
        """
        log = logging.getLogger(__name__)
        log.info("Group({!r}).remove({!r})".format(self.name, handle))
        self._members.remove(handle)

    def save(self):
        """ Save group record to database, keyed by group name. """
        DBProxy(GROUPDB)[self.name] = self

    def delete(self):
        """
        Delete group record, enforces referential integrity with Users.

        Every member that still lists this group has it removed and is
        saved before the group record itself is deleted.  Member handles
        without a user record are skipped, so that a dangling reference
        cannot prevent the group from being deleted.
        """
        udb = DBProxy(USERDB)
        # iterate a snapshot: user.save() re-syncs group membership and
        # may touch this group's member set while we are looping.
        for chk_user in tuple(self.members):
            if chk_user not in udb:
                # dangling member handle, nothing to update
                continue
            user = udb[chk_user]
            if self.name in user.groups:
                user.group_del(self.name)
                user.save()
        del DBProxy(GROUPDB)[self.name]
class User(object):

    """
    A simple user record.

    Besides the record fields exposed as properties, a user behaves like
    a small mapping (``user[key]``, ``user.get(key)``, ``del user[key]``)
    whose values are kept in the separate 'attrs' table of the user
    database, keyed by handle.
    """

    def __init__(self, handle=u'anonymous'):
        """
        Class initializer.

        :param handle: user handle, also the database key.
        """
        self._handle = handle
        self._password = (None, None)
        self._location = u''
        self._email = u''
        self._groups = set()
        self._calls = 0
        self._lastcall = 0

    @property
    def handle(self):
        """ User handle, also the database key. """
        return self._handle

    @handle.setter
    def handle(self, value):
        # pylint: disable=C0111
        #         Missing docstring
        self._handle = value

    @property
    def password(self):
        """
        Password in encrypted form as tuple (salt, hash).

        Not generally used directly, but by :meth:`auth`.

        The ``setter`` of this property is provided a password in
        plain-text and encrypts it as given.

        If a password has not yet been set, it is (None, None).
        """
        return self._password

    @password.setter
    def password(self, value):
        # pylint: disable=C0111
        #         Missing docstring
        log = logging.getLogger(__name__)
        from x84.bbs import ini
        if ini.CFG.getboolean('system', 'pass_ucase'):
            # facebook and mystic storage style, i wouldn't
            # recommend it though.
            value = value.upper()
        self._password = get_digestpw()(value)
        log.info("set password for user {!r}.".format(self.handle))

    def auth(self, try_pass):
        """
        Authenticate user with given password, ``try_pass``.

        The candidate is digested with the salt of the stored password;
        when ini option ``pass_ucase`` is enabled, its upper-cased form
        is accepted as well.

        :param try_pass: non-empty plain-text password (unicode).
        :rtype: bool
        :returns: whether the password is correct.
        """
        from x84.bbs import ini
        pass_ucase = ini.CFG.getboolean('system', 'pass_ucase')
        assert isinstance(try_pass, unicode)
        assert len(try_pass) > 0
        assert self.password != (None, None), ('account is without password')
        salt = self.password[0]
        digestpw = get_digestpw()
        if self.password == digestpw(try_pass, salt):
            return True
        return (pass_ucase and
                self.password == digestpw(try_pass.upper(), salt))

    def __setitem__(self, key, value):
        # pylint: disable=C0111,
        #         Missing docstring
        log = logging.getLogger(__name__)
        if self.handle == 'anonymous':
            # the default 'anonymous' handle never gets attributes stored
            log.debug("set attr {!r} not possible for 'anonymous'".format(key))
            return
        adb = DBProxy(USERDB, 'attrs')
        with adb:
            # read-modify-write of the whole attribute dict of this user
            attrs = adb.get(self.handle, {})
            attrs[key] = value
            adb[self.handle] = attrs
        log.debug("set attr {!r} for user {!r}.".format(key, self.handle))
    __setitem__.__doc__ = dict.__setitem__.__doc__

    def get(self, key, default=None):
        # pylint: disable=C0111,
        #         Missing docstring
        from x84.bbs import ini
        log = logging.getLogger(__name__)
        # a user without any attribute record yields an empty dict, so a
        # single lookup covers both "no record" and "no such key".
        attrs = DBProxy(USERDB, 'attrs').get(self.handle, {})
        tap_db = ini.CFG.getboolean('session', 'tap_db')
        if key not in attrs:
            if tap_db:
                log.debug('User({!r}).get(key={!r}) returns default={!r}'
                          .format(self.handle, key, default))
            return default
        if tap_db:
            log.debug('User({!r}).get(key={!r}) returns value.'
                      .format(self.handle, key))
        return attrs[key]
    get.__doc__ = dict.get.__doc__

    def __getitem__(self, key):
        # pylint: disable=C0111,
        #         Missing docstring
        return DBProxy(USERDB, 'attrs')[self.handle][key]
    __getitem__.__doc__ = dict.__getitem__.__doc__

    def __delitem__(self, key):
        # pylint: disable=C0111,
        #         Missing docstring
        log = logging.getLogger(__name__)
        uadb = DBProxy(USERDB, 'attrs')
        with uadb:
            # retrieve attributes from uadb,
            attrs = uadb.get(self.handle, {})
            # delete attribute if exists; a missing key is silently ignored
            if key in attrs:
                del attrs[key]
                uadb[self.handle] = attrs
                log.info("User({!r}) delete attr {!r}."
                         .format(self.handle, key))
    __delitem__.__doc__ = dict.__delitem__.__doc__

    @property
    def groups(self):
        """ Set of groups user is a member of (set of strings). """
        return self._groups

    def group_add(self, group):
        """ Add user to group (in memory only; call :meth:`save`). """
        return self._groups.add(group)

    def group_del(self, group):
        """
        Remove user from group (in memory only; call :meth:`save`).

        :raises KeyError: if the user is not a member of ``group``.
        """
        return self._groups.remove(group)

    def save(self):
        """
        Save user record to database.

        The first user ever saved is added to the 'sysop' group.  An
        empty attribute record is created when none exists, and group
        records are brought in line with this user's groups.
        """
        log = logging.getLogger(__name__)
        assert isinstance(self._handle, unicode), ('handle must be unicode')
        assert len(self._handle) > 0, ('handle must be non-zero length')
        assert (None, None) != self._password, ('password must be set')
        assert self._handle != u'anonymous', ('anonymous may not be saved.')
        udb = DBProxy(USERDB)
        with udb:
            if len(udb) == 0 and not self.is_sysop:
                log.warning('{!r}: First new user becomes sysop.'
                            .format(self.handle))
                self.group_add(u'sysop')
            is_new = self.handle not in udb
            udb[self.handle] = self
        if is_new:
            log.info("saved new user '%s'.", self.handle)
        adb = DBProxy(USERDB, 'attrs')
        with adb:
            if self.handle not in adb:
                adb[self.handle] = dict()
        self._apply_groups()

    def delete(self):
        """
        Remove user from user and group databases.

        Groups named by this user that have no group record are skipped,
        so that a dangling reference cannot prevent the user record from
        being deleted.
        """
        log = logging.getLogger(__name__)
        gdb = DBProxy(GROUPDB)
        with gdb:
            for gname in self._groups:
                if gname not in gdb:
                    # dangling group name, nothing to update
                    continue
                group = gdb[gname]
                if self.handle in group.members:
                    group.remove(self.handle)
                    group.save()
        udb = DBProxy(USERDB)
        with udb:
            del udb[self.handle]
        log.info("deleted user '%s'.", self.handle)

    @property
    def is_sysop(self):
        """ Whether the user is in the 'sysop' group. """
        return u'sysop' in self._groups

    @property
    def lastcall(self):
        """ Time last called, ``time.time()`` epoch-formatted (float). """
        return self._lastcall

    @lastcall.setter
    def lastcall(self, value):
        # pylint: disable=C0111
        #         Missing docstring
        self._lastcall = value

    @property
    def calls(self):
        """Legacy, number of times user has 'called' this board."""
        return self._calls

    @calls.setter
    def calls(self, value):
        # pylint: disable=C0111
        #         Missing docstring
        self._calls = value

    @property
    def location(self):
        """ Legacy, used as a geographical location, group names, etc. """
        return self._location

    @location.setter
    def location(self, value):
        # pylint: disable=C0111
        #         Missing docstring
        self._location = value

    @property
    def email(self):
        """ E-mail address. May be used for password resets. """
        return self._email

    @email.setter
    def email(self, value):
        # pylint: disable=C0111
        #         Missing docstring
        self._email = value

    def _apply_groups(self):
        """
        Enforce referential integrity of user's groups.

        Creates any group record that does not yet exist, adds this user
        to each group it names, and removes the user from every other
        group that still lists it as a member.
        """
        log = logging.getLogger(__name__)
        gdb = DBProxy(GROUPDB)
        with gdb:
            for chk_grp in self._groups:
                if chk_grp not in gdb:
                    gdb[chk_grp] = Group(chk_grp, set([self.handle]))
                    log.info("created group {!r} for user {!r}."
                             .format(chk_grp, self.handle))
                # ensure membership in existing groups
                group = gdb[chk_grp]
                if self.handle not in group.members:
                    group.add(self.handle)
                    group.save()
            # drop stale membership in groups this user no longer names
            for gname, group in gdb.items():
                if gname not in self._groups and self.handle in group.members:
                    group.remove(self.handle)
                    group.save()
def _digestpw_bcrypt(password, salt=None):
    """
    Password digest using bcrypt (optional-preferred).

    :param password: plain-text password; text is encoded as utf8.
    :param salt: bcrypt salt; generated when not given.
    :returns: tuple of (salt, hash).
    """
    import bcrypt
    if not salt:
        salt = bcrypt.gensalt()
    if not isinstance(password, bytes):
        password = password.encode('utf8')
    return salt, bcrypt.hashpw(password, salt)


def _digestpw_internal(password, salt=None):
    """
    Password digest using regular python libs (slow).

    The salt and password are joined and run through 100000 chained
    rounds of sha256, each round hashing the hex digest of the last.

    :param password: plain-text password; text is encoded as utf8, so
                     non-ascii passwords digest instead of failing on
                     an implicit ascii conversion.
    :param salt: salt of a previous digest; a random base64-encoded
                 salt is generated when not given.
    :returns: tuple of (salt, hexdigest); salt is returned as received.
    """
    import hashlib
    import base64
    import os
    if not salt:
        salt = base64.b64encode(os.urandom(32))
    if not isinstance(password, bytes):
        password = password.encode('utf8')
    # digest over bytes only; the caller's salt object is returned as-is
    # so that it compares equal to the stored one.
    salt_bytes = salt if isinstance(salt, bytes) else salt.encode('utf8')
    digest = salt_bytes + password
    hexdigest = None
    for _ in range(100000):
        # pylint: disable=E1101
        #         Module 'hashlib' has no 'sha256'
        hexdigest = hashlib.sha256(digest).hexdigest()
        digest = hexdigest.encode('ascii')
    return salt, hexdigest


def _digestpw_plaintext(password, salt=None):
    """
    No password digest, just store the passwords in plain text.

    :returns: tuple of (salt, password); salt is 'none' when not given.
    """
    if not salt:
        salt = 'none'
    return salt, password
def get_digestpw():
    """
    Returns singleton to password digest routine.

    The routine is selected by ini option ``password_digest`` of section
    ``system`` on first call and cached in ``FN_PASSWORD_DIGEST``.

    :returns: callable of signature ``(password, salt=None)`` returning
              tuple (salt, hash).
    :raises ValueError: if the configured digest name is not recognized.
    """
    global FN_PASSWORD_DIGEST
    if FN_PASSWORD_DIGEST is not None:
        return FN_PASSWORD_DIGEST

    from x84.bbs.ini import get_ini
    routines = {
        'bcrypt': _digestpw_bcrypt,
        'internal': _digestpw_internal,
        'plaintext': _digestpw_plaintext,
    }
    digest_name = get_ini('system', 'password_digest')
    if digest_name not in routines:
        # fail here with a clear message, rather than handing None to
        # callers that immediately try to call it.
        raise ValueError(
            'Unknown password_digest {0!r} in section [system], '
            'expected one of: {1}'
            .format(digest_name, ', '.join(sorted(routines))))
    FN_PASSWORD_DIGEST = routines[digest_name]
    return FN_PASSWORD_DIGEST
def check_new_user(username):
    """
    Tell whether ``username`` is a new-user command and applying is allowed.

    Reads ``newcmds`` of ini section ``matrix`` and ``allow_apply`` of
    section ``nua``.
    """
    from x84.bbs import get_ini
    newcmds = get_ini(section='matrix', key='newcmds', split=True)
    may_apply = get_ini(section='nua', key='allow_apply',
                        getter='getboolean')
    if not may_apply:
        return may_apply
    return username in newcmds
def check_bye_user(username):
    """
    Boolean return when username matches ``byecmds`` in ini cfg.

    :param username: name entered at the login prompt.
    :rtype: bool
    :returns: True if ``username`` is one of the ``byecmds`` of ini
              section ``matrix``; False otherwise, including when no
              bye commands are configured.
    """
    from x84.bbs import get_ini
    matching = get_ini(section='matrix', key='byecmds', split=True)
    # coerce, so that an empty setting yields False and not the raw value
    return bool(matching) and username in matching
def check_anonymous_user(username):
    """
    Tell whether ``username`` is an anonymous command and is allowed.

    Reads ``anoncmds`` and ``enable_anonymous`` of ini section ``matrix``.
    """
    from x84.bbs import get_ini
    anoncmds = get_ini(section='matrix', key='anoncmds', split=True)
    anon_enabled = get_ini(section='matrix', key='enable_anonymous',
                           getter='getboolean', split=False)
    if not anon_enabled:
        return anon_enabled
    return username in anoncmds
def check_user_password(username, password):
    """
    Boolean return when username and password match user record.

    :param username: handle to look up, matched case-insensitively.
    :param password: plain-text password to try.
    :rtype: bool
    :returns: False when the user is not found or ``password`` is
              empty, otherwise the result of :meth:`User.auth`.
    """
    from x84.bbs import find_user, get_user
    handle = find_user(username)
    if handle is None:
        return False
    user = get_user(handle)
    if user is None:
        return False
    # coerce, so that an empty password yields False and not itself
    return bool(password) and user.auth(password)
def parse_public_key(user_pubkey):
    """
    Return paramiko key class instance of a user's public key text.

    Accepted forms are ``<type> <data> <comment>``, ``<type> <data>``,
    or ``<data>`` alone, in which case an rsa key is assumed.

    :param user_pubkey: public key text, fields separated by whitespace.
    :returns: instance of ``paramiko.RSAKey``, ``paramiko.DSSKey`` or
              ``paramiko.ECDSAKey``, by key type.
    :raises ValueError: on an unexpected number of fields, key data that
                        is not ascii, or an unsupported key type.
    """
    fields = user_pubkey.split()
    if len(fields) == 3:
        key_msg, key_data, _ = fields
    elif len(fields) == 2:
        key_msg, key_data = fields
    elif len(fields) == 1:
        # when no key-type is specified, assume rsa
        key_msg, key_data = 'ssh-rsa', user_pubkey
    else:
        raise ValueError('Malformed public key format: {0!r}'
                         .format(user_pubkey))

    # imported only once the cheap format check has passed
    import paramiko

    try:
        key_bytes = key_data.decode('ascii')
    except UnicodeError:
        # UnicodeError covers both the decode and the implicit encode
        # failure of non-ascii key data.
        raise ValueError('Malformed public key encoding: {0!r}'
                         .format(key_data))

    decoded_keybytes = paramiko.py3compat.decodebytes(key_bytes)

    key_classes = {
        'ssh-rsa': paramiko.RSAKey,
        'ssh-dss': paramiko.DSSKey,
        'ecdsa-sha2-nistp256': paramiko.ECDSAKey,
    }
    # index rather than .get(): an unknown type must raise KeyError here,
    # and only the lookup is guarded, not the key constructor.
    try:
        key_class = key_classes[key_msg]
    except KeyError:
        raise ValueError('Malformed public key_msg: {0!r}'
                         .format(key_msg))
    return key_class(data=decoded_keybytes)
def check_user_pubkey(username, public_key):
    """
    Boolean return when public_key matches user record.

    :param username: handle to look up, matched case-insensitively.
    :param public_key: key offered by the client, compared for equality
                       with the parsed key stored as user attribute
                       ``pubkey``.
    :rtype: bool
    :returns: False when the user is not found, has no public key on
              record, or the stored key cannot be parsed; otherwise
              whether the keys are equal.
    """
    from x84.bbs import find_user, get_user
    log = logging.getLogger(__name__)
    handle = find_user(username)
    if handle is None:
        return False
    user_pubkey = get_user(handle).get('pubkey', False)
    if not user_pubkey:
        log.debug('pubkey authentication by {0!r} but no '
                  'public key on record for the user.'
                  .format(username))
        return False
    try:
        stored_pubkey = parse_public_key(user_pubkey)
    except Exception as err:
        # deliberate best-effort: any failure to parse the stored key
        # is logged and treated as a failed authentication.
        log.debug('{0} for stored public key of user {1!r}: '
                  '{2}'.format(type(err), username, err))
        return False
    return stored_pubkey == public_key