""" Terminal handler for x/84 """
import contextlib
import logging
import codecs
import sys
from blessed import Terminal as BlessedTerminal

TERMINALS = dict()

[docs]class Terminal(BlessedTerminal): """ A thin wrapper over :class:`blessed.Terminal`. """ _session = None def __init__(self, kind, stream, rows, columns): """ Class initializer. """ self._rows = rows self._columns = columns BlessedTerminal.__init__(self, kind, stream) # *PATCH* against 'is None' check in method Terminal.inkey() # that raises RuntimeError to "prevent indefinite blocking # timeout without a keyboard attached" -- which is often our # intention. self._keyboard_fd = 'defunc' if sys.platform.lower().startswith('win32'): self._normal = '\x1b[m' @property def session(self): """ Session associated with this terminal. """ if self._session is None: from x84.bbs.session import getsession self._session = getsession() return self._session @property def number_of_colors(self): return 1 << 24 @number_of_colors.setter def number_of_colors(self, value): pass
[docs] def inkey(self, timeout=None, esc_delay=0.35, *_): # pylint: disable=C0111 # Missing docstring try: return BlessedTerminal.inkey(self, timeout, esc_delay=0.35) except UnicodeDecodeError as err: log = logging.getLogger(__name__) log.warn('UnicodeDecodeError: {0}'.format(err)) return u'?'
[docs] def set_keyboard_decoder(self, encoding): """ Set or change incremental decoder for keyboard input. """ log = logging.getLogger(__name__) try: self._keyboard_decoder = codecs.getincrementaldecoder(encoding)() self._encoding = encoding log.debug('keyboard encoding is {!r}'.format(encoding)) except Exception as err: log.exception(err)
[docs] def kbhit(self, timeout=0, *_): # pylint: disable=C0111 # Missing docstring # pull a value off the input buffer if available, val = self.session.read_event('input', timeout) # if available, place back into buffer and return True, if val is not None: self.session.buffer_input(val, pushback=True) return True # no value available within timeout. return False
[docs] def getch(self): # pylint: disable=C0111 # Missing docstring val = self.session.read_event('input') return self._keyboard_decoder.decode(val, final=False)
def _height_and_width(self): # pylint: disable=C0111 # Missing docstring from blessed.terminal import WINSZ return WINSZ(ws_row=self._rows, ws_col=self._columns, ws_xpixel=None, ws_ypixel=None)
[docs] @contextlib.contextmanager def raw(self): """ Dummy method yields nothing for blessed compatibility. """ yield
[docs] @contextlib.contextmanager def cbreak(self): """ Dummy method yields nothing for blessed compatibility. """ yield
@property def is_a_tty(self): """ Dummy property always returns True. """ return True
[docs]def translate_ttype(ttype): """ Return preferred terminal type given the session-negotiation ttype. This provides a kind of coercion; we know some terminals, such as SyncTerm report a terminal type of 'ansi' -- however, the author publishes a termcap database for 'ansi-bbs' which he instructs should be used! So an ``[system]`` configuration item of ``termcap-ansi`` may be set to ``'ansi-bbs'`` to coerce such terminals for Syncterm-centric telnet servers -- though I would not recommend it. Furthermore, if the ttype is (literally) 'unknown', then a system-wide default terminal type may be returned, also by ``[system]`` configuration option ``termcap-unknown``. """ from x84.bbs import get_ini log = logging.getLogger(__name__) termcap_unknown = get_ini('system', 'termcap-unknown') or 'ansi' termcap_ansi = get_ini('system', 'termcap-ansi') or 'ansi' if termcap_unknown != 'no' and ttype == 'unknown': log.debug("terminal-type {0!r} => {1!r}" .format(ttype, termcap_unknown)) return termcap_unknown elif (termcap_ansi != 'no' and ttype.lower().startswith('ansi') and ttype != termcap_ansi): log.debug("terminal-type {0!r} => {1!r}" .format(ttype, termcap_ansi)) return termcap_ansi return ttype
[docs]def determine_encoding(env): """ Determine and return preferred encoding given session env. """ from x84.bbs import get_ini default_encoding = get_ini( section='session', key='default_encoding' ) or 'utf8' fallback_encoding = { 'ansi': 'cp437', 'ansi-bbs': 'cp437', }.get(env['TERM'], default_encoding) return env.get('encoding', fallback_encoding)
[docs]def init_term(writer, env): """ Determine the final TERM and encoding and return a Terminal. curses is initialized using the value of 'TERM' of dictionary env, as well as a starting window size of 'LINES' and 'COLUMNS'. If the terminal-type is of 'ansi' or 'ansi-bbs', then the cp437 encoding is assumed; otherwise 'utf8'. A blessed-abstracted curses terminal is returned. """ from x84.bbs.ipc import IPCStream from x84.bbs import get_ini log = logging.getLogger(__name__) env['TERM'] = translate_ttype(env.get('TERM', 'unknown')) env['encoding'] = determine_encoding(env) term = Terminal(kind=env['TERM'], stream=IPCStream(writer=writer), rows=int(env.get('LINES', '24')), columns=int(env.get('COLUMNS', '80'))) if term.kind is None: # the given environment's TERM failed curses initialization # because, more than likely, the TERM type was not found. termcap_unknown = get_ini('system', 'termcap-unknown') or 'ansi' log.debug('terminal-type {0} failed, using {1} instead.' .format(env['TERM'], termcap_unknown)) term = Terminal(kind=termcap_unknown, stream=IPCStream(writer=writer), rows=int(env.get('LINES', '24')), columns=int(env.get('COLUMNS', '80')))"terminal type is {0!r}".format(term.kind)) return term
[docs]class TerminalProcess(object): """ Class record for tracking "terminals". Probably of most interest, is that a ``TerminalProcess`` is an abstract association with a multiprocessing.Process sub-process, and its i/o queues (``master_pipes``). This is not a really tty, or even a pseudo-tty (pty)! No termios, fnctl, or any terminal driver i/o is performed, it is all virtual. An instance of this class is stored using :func:`register_tty` and removed by :func:`unregister_tty`, and discovered using :func:`get_terminals`. """ def __init__(self, client, sid, master_pipes): """ Class constructor. """ from x84.bbs import get_ini self.client = client self.sid = sid (self.master_write, self.master_read) = master_pipes self.timeout = get_ini('system', 'timeout') or 0
[docs]def flush_queue(queue): """ Flush all data awaiting on the ipc queue. Seeks any remaining events in queue, used before closing to prevent zombie processes with IPC waiting to be picked up. """ log = logging.getLogger(__name__) try: while queue.poll(): event, data = queue.recv() if event == 'logger': log.handle(data) except (EOFError, IOError) as err: log.debug(err)
[docs]def register_tty(tty): """ Register a :class:`TerminalProcess` instance. """ log = logging.getLogger(__name__) log.debug('[{tty.sid}] registered tty'.format(tty=tty)) TERMINALS[tty.sid] = tty
[docs]def unregister_tty(tty): """ Unregister a :class:`TerminalProcess` instance. """ try: flush_queue(tty.master_read) tty.master_read.close() tty.master_write.close() except (EOFError, IOError) as err: log = logging.getLogger(__name__) log.exception(err) if # signal tcp socket to close tty.client.deactivate() del TERMINALS[tty.sid]
[docs]def get_terminals(): """ Returns a list of all terminals as tuples (session-id, ttys). """ return TERMINALS.items()
[docs]def find_tty(client): """ Given a client, return a matching tty, or None if not registered. """ try: return next(tty for _, tty in get_terminals() if client == tty.client) except StopIteration: pass
[docs]def kill_session(client, reason='killed'): """ Given a client, shutdown its socket and signal subprocess exit. """ from x84.bbs.exception import Disconnected client.shutdown() log = logging.getLogger(__name__) tty = find_tty(client) if tty is not None: try: tty.master_write.send(('exception', Disconnected(reason),)) except (EOFError, IOError): pass'[{tty.sid}] goodbye: {reason}' .format(tty=tty, reason=reason)) unregister_tty(tty)
[docs]def start_process(sid, env, CFG, child_pipes, kind, addrport, matrix_args=None, matrix_kwargs=None): """ A ``multiprocessing.Process`` target. :param str sid: string describing session source (IP address & port). :param dict env: dictionary of client environment variables (must contain at least ``'TERM'``). :param ConfigParser.ConfigParser CFG: bbs configuration :param tuple child_pipes: tuple of ``(writer, reader)`` for engine IPC. :param str kind: what kind of connection as string, ``'telnet'``, ``'ssh'``, etc. :param tuple addrport: ``(client-ip, client-port)`` as string and integer. :param tuple matrix_args: optional positional arguments to pass to matrix script. :param dict matrix_kwargs: optional keyward arguments to pass to matrix script. """ # pylint: disable=R0913,R0914 # Too many arguments (8/5) # Too many local variables (16/15) import x84.bbs.ini from x84.bbs.ipc import make_root_logger from x84.bbs.session import Session from x84.bbs.exception import Disconnected # CFG must be pickled and sent to child process; on windows systems, # fork() does not duplicate that it has been initialized, and requires # sending to child process x84.bbs.ini.CFG = CFG (writer, _) = child_pipes # remove any existing log handlers in child process and replace # with a new root log handler that sends to x84.bbs.engine over IPC. make_root_logger(writer) # instantiate and create a new terminal instance given the value # of env[TERM], negotiated by protocol. May modify the value of # env[TERM] by function translate_ttype terminal = init_term(writer=writer, env=env) try: # instantiate and run session kwargs = { 'terminal': terminal, 'sid': sid, 'env': env, 'child_pipes': child_pipes, 'kind': kind, 'addrport': addrport, 'matrix_args': matrix_args or (), 'matrix_kwargs': matrix_kwargs or {}, } Session(**kwargs).run() except Disconnected as err: log = logging.getLogger(__name__) finally: # signal exit to engine try: writer.send(('exit', None)) except IOError as err: # ignore [Errno 232] The pipe is being closed, # only occurs on win32 platform after early exit if err.errno != 232: raise
[docs]def spawn_client_session(client, matrix_kwargs=None): """ Spawn sub-process for connecting client. Optional """ from multiprocessing import Process, Pipe import x84.bbs.ini child_read, master_write = Pipe(duplex=False) master_read, child_write = Pipe(duplex=False) session_id = '{client.kind}-{client.addrport}'.format(client=client) # start sub-process, which will initialize the terminal and # begins the 'session' for the connecting client. Process(target=start_process, kwargs={ 'sid': session_id, 'env': client.env, 'CFG': x84.bbs.ini.CFG, 'child_pipes': (child_write, child_read), 'kind': client.kind, 'addrport': client.addrport, 'matrix_kwargs': matrix_kwargs, }).start() # and register its tty and master-side pipes for polling by x84.engine register_tty(TerminalProcess(client=client, sid=session_id, master_pipes=(master_write, master_read)))
[docs]def on_naws(client): """ Callback for telnet NAWS negotiation. On a Telnet NAWS sub-negotiation, check if client is yet registered in registry, and if so, send a 'refresh' event down the event queue. This is ultimately handled by :meth:`x84.bbs.session.Session.buffer_event`. """ for _, tty in get_terminals(): if client == tty.client: columns = int(client.env['COLUMNS']) rows = int(client.env['LINES']) tty.master_write.send(('refresh', ('resize', (columns, rows),))) break return True