Source code for x84.engine

#!/usr/bin/env python
""" Command-line launcher and event loop for x/84. """
# Place ALL metadata in setup.py, except where not suitable, place here.
# For any contributions, feel free to tag __author__ etc. at top of such file.

# Module-level metadata: plain strings (and one list) describing authorship,
# project home, copyright and license.  Nothing in this module reads them.
__author__ = "Johannes Lundberg (jojo), Jeff Quast (dingo)"
__url__ = u'https://github.com/jquast/x84/'
__copyright__ = "Copyright 2003"
__credits__ = [
    # use 'scene' names unless preferred or unavailable.
    "zipe",
    "jojo",
    "maze",
    "dingo",
    "spidy",
    "beardy",
    "haliphax",
    "megagumbo",
    "hellbeard",
    "Mercyful Fate",
]
__license__ = 'ISC'

# std
import logging
import select
import socket
import time
import sys

# local
__import__('encodings')  # provides alternate encodings
from x84 import cmdline
from x84.db import DBHandler
from x84.terminal import get_terminals, kill_session, find_tty
from x84.fail2ban import get_fail2ban_function


def main():
    """
    x84 main entry point: start configured services, then run the engine.

    Command line arguments to engine.py (parsed by ``cmdline.parse_args``):

    - ``--config=`` location of alternate configuration file
    - ``--logger=`` location of alternate logging.ini file

    The managed servers are handed to the event loop; the optional web
    server and message-network poller are started beforehand when enabled.
    Returns ``0`` after the event loop has been interrupted by ^C and every
    client session was asked to shut down.
    """
    # read the existing .ini files, or have default ones created.
    import x84.bbs.ini
    x84.bbs.ini.init(*cmdline.parse_args())

    from x84.bbs import get_ini
    from x84.bbs.ini import CFG

    def section_enabled(name):
        # a section is enabled when it exists and does not carry an
        # ``enabled`` option that evaluates false.
        if not CFG.has_section(name):
            return False
        if not CFG.has_option(name, 'enabled'):
            return True
        return CFG.getboolean(name, 'enabled')

    narrow_unicode = (sys.maxunicode == 65535)
    if narrow_unicode:
        # apple is the only known variant built this way, presumably for
        # memory/speed savings (UCS-2 strings are faster than UCS-4).
        # Python 3 sizes strings by their widest content, so the situation
        # does not arise there.
        import warnings
        warnings.warn('This python is built without wide unicode support. '
                      'some internationalized languages will not be possible.')

    # managed servers: these are polled by the event loop.
    servers = get_servers(CFG)

    # unmanaged servers: these look after themselves once started.
    if section_enabled('web'):
        # https server for one or more web modules.
        from x84 import webserve
        webserve.main()

    if get_ini(section='msg', key='network_tags'):
        # background timer polling the message networks
        # we may be a member of for new messages.
        from x84 import msgpoll
        msgpoll.main()

    try:
        # the event loop only ever leaves by exception.
        _loop(servers)
    except KeyboardInterrupt:
        # ^C: stop negotiation threads and kill any client sessions.
        for server in servers:
            for thread in list(server.threads):
                if not thread.stopped:
                    thread.stopped = True
                server.threads.remove(thread)
            for key, client in list(server.clients.items()):
                kill_session(client, 'server shutdown')
                del server.clients[key]
    return 0
def get_servers(CFG):
    """
    Instantiate and return enabled servers by configuration ``CFG``.

    A server kind (``telnet``, ``ssh``, ``rlogin``) is enabled when its
    section exists in ``CFG`` and that section either has no ``enabled``
    option, or the option evaluates true.  A missing section always means
    "disabled" and is never queried for options.

    :param CFG: configuration object providing ``has_section``,
                ``has_option`` and ``getboolean``.
    :returns: list of server instances, ordered telnet, ssh, rlogin.
    """
    def enabled(section):
        # the inner parentheses matter: without them ``and`` binds tighter
        # than ``or`` and ``getboolean`` is evaluated for absent sections,
        # raising NoSectionError.
        return (CFG.has_section(section) and
                (not CFG.has_option(section, 'enabled') or
                 CFG.getboolean(section, 'enabled')))

    servers = []

    if enabled('telnet'):
        # start telnet server instance
        from x84.telnet import TelnetServer
        servers.append(TelnetServer(config=CFG))

    if enabled('ssh'):
        # start ssh server instance
        #
        # may raise an ImportError for systems where pyOpenSSL and etc. could
        # not be installed (due to any issues with missing python-dev, libffi,
        # cc, etc.).  Allow it to raise naturally, the curious user should
        # either discover and resolve the root issue, or disable ssh if it
        # cannot be resolved.
        from x84.ssh import SshServer
        servers.append(SshServer(config=CFG))

    if enabled('rlogin'):
        # start rlogin server instance
        from x84.rlogin import RLoginServer
        servers.append(RLoginServer(config=CFG))

    return servers
def find_server(servers, fd):
    """
    Return the server whose listening socket has file descriptor ``fd``.

    Servers are examined in order and the first whose
    ``server_socket.fileno()`` equals ``fd`` is returned; ``None`` is
    returned when no server matches.
    """
    matching = (candidate for candidate in servers
                if fd == candidate.server_socket.fileno())
    return next(matching, None)
def _shutdown_and_close(sock):
    """ Shut down both directions of ``sock`` (best effort), then close. """
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except socket.error:
        # deliberately ignored: the peer may already have gone away, and
        # the socket is closed immediately below regardless.
        pass
    sock.close()


def accept(log, server, check_ban):
    """
    Accept new connection from server, spawning an unmanaged thread.

    The connection is accepted from ``server.server_socket``.  It is
    refused (shut down and closed) when ``server.client_count()`` exceeds
    ``server.MAX_CONNECTIONS``, or when ``check_ban(ip)`` returns ``False``.

    Otherwise a client is created by ``server.client_factory`` using the
    optional keyword arguments ``server.client_factory_kwargs``, registered
    in the dictionary ``server.clients`` by its socket's file descriptor,
    and a thread created by ``server.connect_factory`` (with the optional
    keyword arguments ``server.connect_factory_kwargs``) is appended to
    ``server.threads`` and started.  Either ``*_kwargs`` attribute may be a
    callable, in which case it is called with ``server`` to produce the
    keyword arguments.

    :param log: logger receiving refusal, connection and error messages.
    :param server: server instance owning the listening socket.
    :param check_ban: callable given the connecting IP address; a return
                      value of ``False`` refuses the connection.
    :raises NotImplementedError: when the server defines no client or
                                 connect factory.

    Socket errors raised while accepting or setting up the client are
    logged, not propagated.
    """
    if None in (server.client_factory, server.connect_factory):
        raise NotImplementedError(
            "No accept for server class {server.__class__.__name__}"
            .format(server=server))

    client_factory_kwargs = server.client_factory_kwargs
    if callable(server.client_factory_kwargs):
        client_factory_kwargs = server.client_factory_kwargs(server)

    connect_factory_kwargs = server.connect_factory_kwargs
    if callable(server.connect_factory_kwargs):
        connect_factory_kwargs = server.connect_factory_kwargs(server)

    try:
        sock, address_pair = server.server_socket.accept()

        # busy signal
        if server.client_count() > server.MAX_CONNECTIONS:
            _shutdown_and_close(sock)
            log.error('{addr}: refused, maximum connections reached.'
                      .format(addr=address_pair[0]))
            return

        # connecting IP is banned
        if check_ban(address_pair[0]) is False:
            log.debug('{addr}: refused, banned.'.format(addr=address_pair[0]))
            _shutdown_and_close(sock)
            return

        # instantiate a client of this type
        client = server.client_factory(sock, address_pair,
                                       **client_factory_kwargs)

        # spawn on-connect negotiation thread.  When successful,
        # a new sub-process is spawned and registered as a session tty.
        server.clients[client.sock.fileno()] = client
        thread = server.connect_factory(client, **connect_factory_kwargs)
        log.info('{client.kind} connection from {client.addrport} '
                 '(*{thread.name}).'.format(client=client, thread=thread))
        server.threads.append(thread)
        thread.start()

    except socket.error as err:
        # socket errors do not always carry an (errno, message) pair -- a
        # timeout has a single argument -- so join whatever arguments exist
        # instead of unpacking exactly two.  An (errno, message) error still
        # renders as "errno:message".
        detail = ':'.join('{0}'.format(arg) for arg in err.args)
        log.error('accept error {0}'.format(detail or err))
def get_session_output_fds(servers):
    """
    Return file descriptors of all ``tty.master_read`` pipes.

    Every client of every server is looked up with ``find_tty``; clients
    for which no tty is found (``None``) contribute nothing.
    """
    ttys = (find_tty(client)
            for server in servers
            for client in server.clients.values())
    return [tty.master_read.fileno() for tty in ttys if tty is not None]
def client_recv(servers, ready_fds, log):
    """
    Receive pending socket data for every client that is ready.

    Each server reports its ready clients through
    ``server.clients_ready(ready_fds)``; ``client.socket_recv()`` is then
    called on each of them, buffering the data for the session which is
    exhausted by :func:`session_send`.  A client raising ``Disconnected``
    is logged and its session killed.
    """
    from x84.bbs.exception import Disconnected

    readable = (client
                for server in servers
                for client in server.clients_ready(ready_fds))
    for client in readable:
        try:
            client.socket_recv()
        except Disconnected as reason:
            log.debug('{client.addrport}: disconnect on recv: {err}'
                      .format(client=client, err=reason))
            kill_session(client, 'disconnected: {err}'.format(err=reason))
def client_send(terminals, log):
    """
    Flush pending output of every registered terminal to its tcp client.

    For each ``(sid, tty)`` pair whose ``tty.client.send_ready()`` is true,
    ``tty.client.send()`` is called; this is data sent from the session to
    the tcp client.  A client raising ``Disconnected`` is logged and its
    session killed.
    """
    from x84.bbs.exception import Disconnected

    # only registered ttys are given, there is nothing to send before then.
    for _, tty in terminals:
        if not tty.client.send_ready():
            continue
        try:
            tty.client.send()
        except Disconnected as reason:
            log.debug('{client.addrport}: disconnect on send: {err}'
                      .format(client=tty.client, err=reason))
            kill_session(tty.client, 'disconnected: {err}'.format(err=reason))
def session_send(terminals):
    """
    Forward buffered client input to each tty session, and enforce timeouts.

    When ``tty.client.input_ready()`` is true, tcp data has been buffered
    for the session: it is sent to the tty input queue
    (``tty.master_write``) as an ``('input', data)`` event.  Terminals
    without pending input are instead tested for idle timeout, and their
    session is killed once ``tty.client.idle()`` exceeds a non-zero
    ``tty.timeout``.
    """
    for _, tty in terminals:
        if not tty.client.input_ready():
            # nothing to deliver: poll about and kick off idle users
            if tty.timeout and tty.client.idle() > tty.timeout:
                kill_session(tty.client, 'timeout')
            continue

        try:
            tty.master_write.send(('input', tty.client.get_input()))
        except IOError:
            # this may happen if a sub-process crashes, or more often,
            # because the subprocess has logged off, but the user kept
            # banging the keyboard before we have had the opportunity
            # to close their telnet socket.
            kill_session(tty.client, 'no tty for socket data')
def handle_lock(locks, tty, event, data, tap_events, log):
    """
    Handle locking event of ``(lock-key, (method, stale))``.

    :param locks: dictionary of held locks, mapping the lock key to a
                  ``(time acquired, session id of holder)`` tuple; it is
                  modified in place.
    :param tty: terminal of the requesting session; the answer to an
                ``acquire`` is sent to ``tty.master_write`` as
                ``(event, True)`` when granted or ``(event, False)`` when
                refused.  A ``release`` is not answered.
    :param event: the lock key.
    :param data: tuple ``(method, stale)``; method is ``'acquire'`` or
                 ``'release'`` (any other method is ignored), and stale is
                 ``None`` or the number of seconds after which a held lock
                 may be taken over.
    :param tap_events: when true, granted and released locks are logged.
    :param log: logger instance.
    """
    # pylint: disable=R0913
    #         Too many arguments (6/5)
    method, stale = data

    if method == 'release':
        if event not in locks:
            log.error('[{tty.sid}] {event} lock failed to release, '
                      'not acquired.'.format(tty=tty, event=event))
        else:
            del locks[event]
            if tap_events:
                log.debug('[{tty.sid}] {event} released lock.'
                          .format(tty=tty, event=event))
        return

    if method != 'acquire':
        return

    # this lock is already held: decide whether the claim is still valid.
    if event in locks:
        holder = locks[event][1]
        if holder == tty.sid:
            # acquire the lock from ourselves!  We'll allow it (this is
            # termed, "re-entrant locking"): drop the old claim so that it
            # is granted anew, below.
            log.debug('[{tty.sid}] {event} is re-acquired!'
                      .format(tty=tty, event=event))
            del locks[event]
        elif any(_sid == holder for _sid, _ in get_terminals()):
            # held by another session that is still active: the claim
            # stands, subject only to the `stale' check further below.
            log.debug('[{tty.sid}] {event} not acquired, '
                      'held by active session: {holder}'
                      .format(tty=tty, event=event, holder=holder))
        else:
            # lock is held by a now-defunct session, re-acquired.
            log.debug('[{tty.sid}] {event} re-acquiring stale lock, '
                      'previously held by session no longer active: '
                      '{holder}'
                      .format(tty=tty, event=event, holder=holder))
            del locks[event]

    # lock is not held, or was released by the previous block
    if event not in locks:
        # acknowledge its requirement,
        locks[event] = (time.time(), tty.sid)
        tty.master_write.send((event, True,))
        if tap_events:
            log.debug('[{tty.sid}] {event} granted lock.'
                      .format(tty=tty, event=event))
        return

    # lock is held by another active session,
    holder = locks[event][1]
    elapsed = time.time() - locks[event][0]
    if stale is not None and elapsed > stale:
        # caller has decreed that this lock may be acquired even if
        # it already held, if it has been held longer than length of
        # time `stale`.  This is simply to prevent a global freeze
        # when the programmer knows the holder may fail to release,
        # though this is not currently used in the demonstration
        # system.
        locks[event] = (time.time(), tty.sid)
        log.warning('[{tty.sid}] {event} re-acquiring stale lock, '
                    'previously held active session {holder} after '
                    '{elapsed}s elapsed (stale={stale})'
                    .format(tty=tty, event=event, holder=holder,
                            elapsed=elapsed, stale=stale))
        tty.master_write.send((event, True,))
    else:
        # signal busy with matching event, data=False
        log.debug('[{tty.sid}] {event} lock rejected; already held '
                  'by active session {holder} for {elapsed} seconds '
                  '(stale={stale})'
                  .format(tty=tty, event=event, holder=holder,
                          elapsed=elapsed, stale=stale))
        tty.master_write.send((event, False,))
def session_recv(locks, terminals, log, tap_events):
    """
    Receive data waiting for terminal sessions.

    All data received from subprocess is handled here: each terminal's
    ``master_read`` pipe is drained of ``(event, data)`` tuples, which are
    dispatched by event name:

    - ``exit``: kill the session of this terminal.
    - ``logger``: prefix the log record's message and hand it to ``log``.
    - ``output``: buffer output for the tcp client.
    - ``remote-disconnect``: kill the session whose id is ``data[0]``.
    - ``route``: pass ``(data[1], data[2:])`` to the session ``data[0]``.
    - ``global``: broadcast to every other session.
    - ``set-timeout``: set ``tty.timeout``.
    - ``db*``: start a ``DBHandler`` for the request.
    - ``lock*``: delegate to :func:`handle_lock`.

    :param locks: dictionary of held locks, passed on to handle_lock.
    :param terminals: sequence of ``(session id, tty)`` pairs; it is
                      iterated more than once, so must not be an iterator.
    :param log: logger instance.
    :param tap_events: when true, routed events are logged for debugging.
    """
    for sid, tty in terminals:
        while tty.master_read.poll():
            try:
                event, data = tty.master_read.recv()
            except (EOFError, IOError) as err:
                # sub-process unexpectedly closed
                log.exception('master_read pipe: {0}'.format(err))
                kill_session(tty.client, 'master_read pipe: {0}'.format(err))
                break
            except TypeError as err:
                log.exception('unpickling error: {0}'.format(err))
                break

            # 'exit' event, unregisters client
            if event == 'exit':
                kill_session(tty.client, 'client exit')
                break

            # 'logger' event, prefix log message with handle and session id
            elif event == 'logger':
                data.msg = ('{data.handle}[{tty.sid}] {data.msg}'
                            .format(data=data, tty=tty))
                log.handle(data)

            # 'output' event, buffer for tcp socket
            elif event == 'output':
                tty.client.send_unicode(ucs=data[0], encoding=data[1])

            # 'remote-disconnect' event, hunt and destroy
            elif event == 'remote-disconnect':
                for _sid, _tty in terminals:
                    # data[0] is 'send-to' address.
                    if data[0] == _sid:
                        # kill the *target* session (_tty); `sid' is only
                        # the session that requested the disconnect.
                        kill_session(
                            _tty.client,
                            'remote-disconnect by {0}'.format(sid))
                        break

            # 'route': message passing directly from one session to another
            elif event == 'route':
                if tap_events:
                    log.debug('route {0!r}'.format(data))
                tgt_sid, send_event, send_val = data[0], data[1], data[2:]
                for _sid, _tty in terminals:
                    if tgt_sid == _sid:
                        _tty.master_write.send((send_event, send_val))
                        break

            # 'global': message broadcasting to all sessions
            elif event == 'global':
                if tap_events:
                    log.debug('broadcast: {data!r}'.format(data=data))
                for _sid, _tty in terminals:
                    if sid != _sid:
                        _tty.master_write.send((event, data,))

            # 'set-timeout': set user-preferred timeout
            elif event == 'set-timeout':
                if tap_events:
                    log.debug('[{tty.sid}] set-timeout {data}'
                              .format(tty=tty, data=data))
                tty.timeout = data

            # 'db*': access DBProxy API for shared sqlitedict
            elif event.startswith('db'):
                DBHandler(tty.master_write, event, data).start()

            # 'lock': access fine-grained bbs-global locking
            elif event.startswith('lock'):
                handle_lock(locks, tty, event, data, tap_events, log)

            else:
                log.error('[{tty.sid}] unhandled event, data: '
                          '({event}, {data})'
                          .format(tty=tty, event=event, data=data))
def _loop(servers):
    """
    Main event loop. Never returns.

    Each pass of the loop, in order:

    1. kills the session of, and forgets, every client that is no longer
       active, and forgets on-connect threads that have stopped;
    2. waits up to ``SELECT_POLL`` seconds in ``select.select`` for the
       listening sockets, the client sockets and (except on win32) the
       session output pipes to become readable;
    3. accepts a connection for each ready listening socket;
    4. receives tcp client data, processes session events, sends pending
       output to tcp clients, and forwards client input to the sessions,
       which also enforces idle timeouts.

    :param servers: non-empty sequence of server instances.
    :raises ValueError: when ``servers`` is empty.
    """
    # pylint: disable=R0912,R0914,R0915
    #         Too many local variables (24/15)
    from x84.bbs.ini import CFG
    SELECT_POLL = 0.02  # polling time is 20ms

    # WIN32 has no session_fds (multiprocess queues are not polled using
    # select), use a persistently empty set; for WIN32, sessions are always
    # polled for data at every loop.
    WIN32 = sys.platform.lower().startswith('win32')
    session_fds = set()

    log = logging.getLogger('x84.engine')

    if not len(servers):
        raise ValueError("No servers configured for event loop! (ssh, telnet)")

    # loop-invariant state: event tapping flag, the ban-check callable
    # handed to accept(), and the table of bbs-global locks that is
    # maintained by handle_lock() by way of session_recv().
    tap_events = CFG.getboolean('session', 'tap_events')
    check_ban = get_fail2ban_function()
    locks = dict()

    while True:
        # shutdown, close & delete inactive clients,
        for server in servers:
            # bbs sessions that are no longer active on the socket
            # level -- send them a 'kill signal'
            #
            # NOTE(review): slicing ``items()`` yields the copy that makes
            # deleting while iterating safe, but works only where
            # ``items()`` returns a list (Python 2).
            for key, client in server.clients.items()[:]:
                if not client.is_active():
                    kill_session(client, 'socket shutdown')
                    del server.clients[key]
            # on-connect negotiations that have completed or failed.
            # delete their thread instance from further evaluation
            for thread in [_thread for _thread in server.threads
                           if _thread.stopped][:]:
                server.threads.remove(thread)

        # gather every file descriptor to be tested for readability:
        # listening sockets, client sockets, and session output pipes.
        check_r = list()
        for server in servers:
            check_r.append(server.server_socket.fileno())
            check_r.extend(server.client_fds())
        if not WIN32:
            # WIN32's IPC is not done using sockets, so it
            # is not possible to use select.select() on them
            session_fds = get_session_output_fds(servers)
            check_r.extend(session_fds)

        # We'd like to use timeout 'None', but the registration of
        # a new client in terminal.start_process surprises us with new
        # file descriptors for the session i/o. Unless we loop for
        # additional `session_fds', a connecting client would block.
        try:
            ready_r, _, _ = select.select(check_r, [], [], SELECT_POLL)
        except select.error as err:
            # more than likely EBADF (9, 'Bad file descriptor'), it would seem
            # the socket we've just decided to poll has just gone bad.
            # Start the next pass, which rebuilds the descriptor list.
            log.debug('continue after select.error: {0}'.format(err))
            continue

        for fd in ready_r:
            # see if any new tcp connections were made; find_server()
            # returns None for descriptors that are not a listening socket.
            server = find_server(servers, fd)
            if server is not None:
                accept(log, server, check_ban)

        # receive new data from tcp clients.
        client_recv(servers, ready_r, log)
        terms = get_terminals()

        # receive new data from session terminals: always on WIN32,
        # otherwise only when a session output pipe was reported readable.
        if WIN32 or set(session_fds) & set(ready_r):
            try:
                session_recv(locks, terms, log, tap_events)
            except IOError as err:
                # if the ipc closes while we poll, warn and continue
                log.warn(err)

        # send tcp data to clients
        client_send(terms, log)

        # send session data, poll for user-timeout and disconnect them
        session_send(terms)


if __name__ == '__main__':
    # run as a script: the return value of main() is the exit status.
    exit(main())