Source code for x84.telnet

"""
Telnet server for x84.

Limitations:

- No linemode support, character-at-a-time only.
- No out-of-band / data mark (DM) / sync supported
- No flow control (``^S``, ``^Q``)

This is a modified version of miniboa, retrieved from the svn address
http://miniboa.googlecode.com/svn/trunk/miniboa, which is meant for
MUDs. This server would not be safe for most (linemode) MUD clients.

Changes from miniboa:

- character-at-a-time input instead of linemode
- encoding option on send
- strict rejection of linemode
- terminal type detection
- environment variable support
- GA and SGA
- utf-8 safe
"""
# ------------------------------------------------------------------------------
#   miniboa/async.py
#   miniboa/telnet.py
#
#   Copyright 2009 Jim Storch
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain a
#   copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.
# ------------------------------------------------------------------------------

from __future__ import absolute_import

# std
import socket
import array
import time
import logging
import select
import errno
from telnetlib import LINEMODE, NAWS, NEW_ENVIRON, ENCRYPT, AUTHENTICATION
from telnetlib import BINARY, SGA, ECHO, STATUS, TTYPE, TSPEED, LFLOW
from telnetlib import XDISPLOC, IAC, DONT, DO, WONT, WILL, SE, NOP, DM, BRK
from telnetlib import IP, AO, AYT, EC, EL, GA, SB

# local
from x84.bbs.exception import Disconnected
from .terminal import spawn_client_session, on_naws
from .client import BaseClient, BaseConnect
from .server import BaseServer

# Telnet sub-negotiation command codes, used inside IAC SB ... IAC SE blocks.
IS = chr(0)  # Sub-negotiation IS command
SEND = chr(1)  # Sub-negotiation SEND command
# Options that are refused with IAC DONT when the client offers them (WILL).
UNSUPPORTED_WILL = (LINEMODE, LFLOW, TSPEED, ENCRYPT, AUTHENTICATION)

#---[ Telnet Notes ]-----------------------------------------------------------
# (See RFC 854 for more information)
#
# Negotiating a Local Option
# --------------------------
#
# Side A begins with:
#
#    "IAC WILL/WONT XX"   Meaning "I would like to [use|not use] option XX."
#
# Side B replies with either:
#
#    "IAC DO XX"     Meaning "OK, you may use option XX."
#    "IAC DONT XX"   Meaning "No, you cannot use option XX."
#
#
# Negotiating a Remote Option
# ----------------------------
#
# Side A begins with:
#
#    "IAC DO/DONT XX"  Meaning "I would like YOU to [use|not use] option XX."
#
# Side B replies with either:
#
#    "IAC WILL XX"   Meaning "I will begin using option XX"
#    "IAC WONT XX"   Meaning "I will not begin using option XX"
#
#
# The syntax is designed so that if both parties receive simultaneous requests
# for the same option, each will see the other's request as a positive
# acknowledgment of its own.
#
# If a party receives a request to enter a mode that it is already in, the
# request should not be acknowledged.

# Where you see DE in my comments I mean 'Distant End', e.g. the client.

# Tri-state marker for an option whose state has not been negotiated yet.
# NOTE: -1 is truthy, so option states must be compared with ``is True`` /
# ``is False`` / ``is UNKNOWN`` rather than tested with a bare ``if``.
UNKNOWN = -1

#-----------------------------------------------------------------Telnet Option


class TelnetOption(object):
    """
    Record of the negotiation state of a single Telnet option.

    Attributes and their state values:

    - ``local_option``: UNKNOWN (default), True, or False.
    - ``remote_option``: UNKNOWN (default), True, or False.
    - ``reply_pending``: True or False.
    """
    # pylint: disable=R0903
    #         Too few public methods (0/2)

    def __init__(self):
        """ Start with nothing negotiated and no reply outstanding. """
        # Are we expecting a reply?
        self.reply_pending = False
        # State of the option on the remote (client) end
        self.remote_option = UNKNOWN
        # State of the option on the local (server) end
        self.local_option = UNKNOWN
def name_option(option):
    """
    Return a printable name for a telnet command or option byte.

    The module globals are searched for every constant equal to ``option``
    (``SEND`` and ``IS`` are skipped). All matching names are joined with
    ``';?'``, because several telnet constants share the same byte value and
    the intended meaning cannot be told apart here.

    :param option: single-character telnet command or option.
    :returns: the matching constant names in sorted order, or the decimal
        ordinal of ``option`` as a string when no constant matches.
    """
    # ``items()`` takes a snapshot under Python 2 and exists on every Python
    # version; sorting keeps the result stable from one call to the next.
    names = sorted(key for key, value in globals().items()
                   if option == value and key not in ('SEND', 'IS'))
    if names:
        return ';?'.join(names)
    return str(ord(option))
def debug_option(func):
    """
    Decorator that debug-logs the call site of an option-state method.

    This helps during telnet negotiation, to understand which function sets
    or checks local or remote option states. The wrapped callable must be a
    method of an object with a ``log`` attribute, and its first positional
    argument must be the telnet option; a second positional argument (the
    state), when given, is logged as well.

    :param func: method to wrap, called as ``func(self, option[, state])``.
    :returns: wrapper with the same name and docstring as ``func``.
    """
    import functools
    import inspect
    import os

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped method
    def wrapper(self, *args):
        """ inner wrapper for debug_option """
        # stack()[0] is this wrapper; stack()[1] is whoever called it.
        caller = inspect.stack()[1]
        self.log.debug('%s:%s %s(%s%s)',
                       os.path.basename(caller[1]), caller[2],
                       func.__name__, name_option(args[0]),
                       ', %s' % (args[1],) if len(args) == 2 else '')
        return func(self, *args)
    return wrapper
#------------------------------------------------------------------------Telnet
class TelnetClient(BaseClient):
    """
    Represents a remote Telnet Client, instantiated from TelnetServer.

    Incoming bytes are fed one at a time through :meth:`_iac_sniffer`, which
    strips telnet commands (IAC sequences) and sub-negotiations from the
    stream and passes everything else on to the receive buffer.

    Attributes such as ``sock``, ``env``, ``log``, ``recv_buffer``,
    ``on_naws`` and ``TTYPE_UNDETECTED`` are not defined here; they are
    presumably provided by ``BaseClient`` -- verify against that class.
    """
    # pylint: disable=R0902,R0904
    #         Too many instance attributes
    #         Too many public methods
    kind = 'telnet'

    #: maximum size of telnet subnegotiation string, allowing for a fairly
    #: large value for NEW_ENVIRON.
    SB_MAXLEN = 65534

    def __init__(self, sock, address_pair, on_naws=None):
        """
        :param sock: connected client socket.
        :param address_pair: passed through to ``BaseClient``.
        :param on_naws: optional callback, invoked as ``on_naws(client)``
            whenever a new window size has been recorded.
        """
        super(TelnetClient, self).__init__(sock, address_pair, on_naws)
        # bytes collected between IAC SB and IAC SE
        self.telnet_sb_buffer = array.array('c')
        # State variables for interpreting incoming telnet commands
        self.telnet_got_iac = False
        self.telnet_got_cmd = None
        self.telnet_got_sb = False
        # option byte -> TelnetOption
        self.telnet_opt_dict = {}
        self.ENV_REQUESTED = False
        self.ENV_REPLIED = False

    #---[ Negotiation requests ]-----------------------------------------------

    def request_will_sga(self):
        """
        Offer to Suppress Go-Ahead on our side (IAC WILL SGA). See RFC 858.
        """
        self._iac_will(SGA)
        self._note_reply_pending(SGA, True)

    def request_will_echo(self):
        """
        Tell the DE that we would like to echo their text. See RFC 857.
        """
        self._iac_will(ECHO)
        self._note_reply_pending(ECHO, True)

    def request_will_binary(self):
        """
        Tell the DE that we would like to use binary 8-bit (utf8).
        """
        self._iac_will(BINARY)
        self._note_reply_pending(BINARY, True)

    def request_do_binary(self):
        """
        Tell the DE that we would like them to input binary 8-bit (utf8).
        """
        self._iac_do(BINARY)
        self._note_reply_pending(BINARY, True)

    def request_do_sga(self):
        """
        Request the DE to Suppress Go-Ahead (IAC DO SGA). See RFC 858.
        """
        self._iac_do(SGA)
        self._note_reply_pending(SGA, True)

    def request_do_naws(self):
        """
        Request to Negotiate About Window Size. See RFC 1073.
        """
        self._iac_do(NAWS)
        self._note_reply_pending(NAWS, True)

    def request_do_env(self):
        """
        Request the DE to enable NEW_ENVIRON. See RFC 1572.
        """
        self._iac_do(NEW_ENVIRON)
        self._note_reply_pending(NEW_ENVIRON, True)

    def request_env(self):
        """
        Request sub-negotiation NEW_ENVIRON. See RFC 1572.

        Sent at most once per connection; later calls return immediately.
        """
        if self.ENV_REQUESTED:
            return  # avoid asking twice ..
        names = ("USER TERM SHELL COLUMNS LINES C_CTYPE XTERM_LOCALE DISPLAY "
                 "SSH_CLIENT SSH_CONNECTION SSH_TTY HOME HOSTNAME PWD MAIL "
                 "LANG UID USER_ID EDITOR LOGNAME".split())
        # chr(0) introduces each well-known variable name (VAR); the trailing
        # chr(3) (USERVAR) with no name asks for all user variables.
        rstr = bytes(''.join((IAC, SB, NEW_ENVIRON, SEND, chr(0))))
        rstr += bytes(chr(0).join(names))
        rstr += bytes(''.join((chr(3), IAC, SE)))
        self.ENV_REQUESTED = True
        self.send_str(rstr)

    def request_do_ttype(self):
        """
        Begins TERMINAL-TYPE negotiation, unless the DE already agreed.
        """
        if self.check_remote_option(TTYPE) in (False, UNKNOWN):
            self._iac_do(TTYPE)
            self._note_reply_pending(TTYPE, True)

    def request_ttype(self):
        """
        Sends IAC SB TTYPE SEND IAC SE
        """
        self.send_str(bytes(''.join((IAC, SB, TTYPE, SEND, IAC, SE))))

    #---[ Socket input / output ]----------------------------------------------

    def recv_ready(self):
        """
        Returns True if data is awaiting on the telnet socket.
        """
        if not self.is_active():
            return False
        # zero timeout: poll, never block.
        readable = select.select([self.sock.fileno()], [], [], 0)[0]
        return bool(readable)

    def socket_recv(self):
        """
        Called by TelnetServer.poll() when recv data is ready.

        Read any data on socket, processing telnet commands, and buffering
        all other bytestrings to self.recv_buffer.

        :returns: number of bytes read, or 0 when the read would block.
        :raises Disconnected: if the connection is closed by the client (EOF)
            or any other socket error occurs.
        """
        try:
            data = self.sock.recv(self.BLOCKSIZE_RECV)
        except socket.error as err:
            # EAGAIN and EWOULDBLOCK both mean "nothing to read right now".
            if err.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
                return 0
            raise Disconnected('socket_recv error: {0}'.format(err))
        recv = len(data)
        if recv == 0:
            raise Disconnected('Closed by client (EOF)')

        self.bytes_received += recv
        self.last_input_time = time.time()

        # Test for telnet commands, non-telnet bytes
        # are pushed to self.recv_buffer (side-effect),
        for byte in data:
            self._iac_sniffer(byte)
        return recv

    def send_unicode(self, ucs, encoding='utf8'):
        """
        Buffer unicode string, encoded for client as 'encoding'.

        Characters that cannot be encoded are replaced.
        """
        # Must be escaped 255 (IAC + IAC) to avoid IAC interpretation.
        encoded = ucs.encode(encoding, 'replace')
        self.send_str(encoded.replace(IAC, 2 * IAC))

    def _recv_byte(self, byte):
        """
        Buffer non-telnet commands bytestrings into recv_buffer.
        """
        self.recv_buffer.fromstring(byte)

    #---[ Incoming command parser ]--------------------------------------------

    def _iac_sniffer(self, byte):
        """
        Watches incoming data for Telnet IAC sequences.

        Passes the data, if any, with the IAC commands stripped to
        _recv_byte().

        :raises Disconnected: if a sub-negotiation grows to SB_MAXLEN bytes.
        """
        # Are we not currently in an IAC sequence coming from the DE?
        if self.telnet_got_iac is False:
            if byte == IAC:
                self.telnet_got_iac = True
            # Are we currently in a sub-negotiation?
            elif self.telnet_got_sb is True:
                self.telnet_sb_buffer.fromstring(byte)
                # Sanity check on length
                if len(self.telnet_sb_buffer) >= self.SB_MAXLEN:
                    raise Disconnected('sub-negotiation buffer filled')
            else:
                # Just a normal NVT character
                self._recv_byte(byte)
            return

        # Did we get sent a second IAC?
        if byte == IAC and self.telnet_got_sb is True:
            # Must be an escaped 255 (IAC + IAC)
            self.telnet_sb_buffer.fromstring(byte)
            self.telnet_got_iac = False
        # Do we already have an IAC + CMD?
        elif self.telnet_got_cmd is not None:
            # Yes, so handle the option
            self._three_byte_cmd(byte)
        # We have IAC but no CMD
        elif byte in (DO, DONT, WILL, WONT):
            # This is the middle byte of a three-byte command
            self.telnet_got_cmd = byte
        else:
            # Nope, must be a two-byte command
            self._two_byte_cmd(byte)

    def _two_byte_cmd(self, cmd):
        """
        Handle incoming Telnet commands that are two bytes long.
        """
        # self.log.debug ('recv _two_byte_cmd %s', name_option(cmd),)
        if cmd == SB:
            # Begin capturing a sub-negotiation string
            self.telnet_got_sb = True
            self.telnet_sb_buffer = array.array('c')
        elif cmd == SE:
            # Stop capturing a sub-negotiation string
            self.telnet_got_sb = False
            self._sb_decoder()
        elif cmd == IAC:
            # IAC, IAC is used for a literal \xff character.
            self._recv_byte(IAC)
        elif cmd == IP:
            self.deactivate()
            self.log.info('{self.addrport} received (IAC, IP): closing.'
                          .format(self=self))
        elif cmd == AO:
            flushed = len(self.recv_buffer)
            self.recv_buffer = array.array('c')
            self.log.debug('Abort Output (AO); %s bytes discarded.', flushed)
        elif cmd == AYT:
            self.send_str(bytes('\b'))
            self.log.debug('Are You There (AYT); "\\b" sent.')
        elif cmd == EC:
            self.recv_buffer.fromstring('\b')
            self.log.debug('Erase Character (EC); "\\b" queued.')
        elif cmd == EL:
            self.log.warning('Erase Line (EL) received; ignored.')
        elif cmd == GA:
            self.log.warning('Go Ahead (GA) received; ignored.')
        elif cmd == NOP:
            self.log.debug('NOP ignored.')
        elif cmd == DM:
            self.log.warning('Data Mark (DM) received; ignored.')
        elif cmd == BRK:
            self.log.warning('Break (BRK) received; ignored.')
        else:
            self.log.error('_two_byte_cmd invalid: %r', cmd)
        # the command is complete either way: leave IAC state.
        self.telnet_got_iac = False
        self.telnet_got_cmd = None

    def _three_byte_cmd(self, option):
        """
        Handle incoming Telnet commands that are three bytes long.

        The middle byte was stored in ``telnet_got_cmd`` by _iac_sniffer().
        """
        cmd = self.telnet_got_cmd
        self.log.debug('recv IAC %s %s', name_option(cmd), name_option(option))
        # Incoming DO's and DONT's refer to the status of this end
        if cmd == DO:
            self._handle_do(option)
        elif cmd == DONT:
            self._handle_dont(option)
        elif cmd == WILL:
            self._handle_will(option)
        elif cmd == WONT:
            self._handle_wont(option)
        else:
            self.log.debug('{self.addrport}: unhandled _three_byte_cmd: {opt}.'
                           .format(self=self, opt=name_option(option)))
        self.telnet_got_iac = False
        self.telnet_got_cmd = None

    #---[ DO / DONT / WILL / WONT handlers ]-----------------------------------

    def _handle_do(self, option):
        """
        Process a DO command option received by DE.
        """
        # pylint: disable=R0912
        #         TelnetClient._handle_do: Too many branches (13/12)
        # if any pending WILL options were send, they have been received
        self._note_reply_pending(option, False)
        if option == ECHO:
            # DE requests us to echo their input
            if self.check_local_option(ECHO) is not True:
                self._note_local_option(ECHO, True)
                self._iac_will(ECHO)
        elif option == BINARY:
            # DE requests to recv BINARY
            if self.check_local_option(BINARY) is not True:
                self._note_local_option(BINARY, True)
                self._iac_will(BINARY)
        elif option == SGA:
            # DE wants us to supress go-ahead
            if self.check_local_option(SGA) is not True:
                self._note_local_option(SGA, True)
                # always send DO SGA after WILL SGA, requesting the DE
                # also supress their go-ahead. this order seems to be the
                # 'magic sequence' to disable linemode on certain clients
                self._iac_will(SGA)
                self._iac_do(SGA)
        elif option == LINEMODE:
            # DE wants to do linemode editing
            # denied
            if self.check_local_option(option) is not False:
                self._note_local_option(option, False)
                self._iac_wont(LINEMODE)
        elif option == ENCRYPT:
            # DE is willing to receive encrypted data
            # denied
            if self.check_local_option(option) is not False:
                self._note_local_option(option, False)
                # let DE know we refuse to send encrypted data.
                self._iac_wont(ENCRYPT)
        elif option == STATUS:
            # DE wants to know if we support STATUS,
            if self.check_local_option(option) is not True:
                self._note_local_option(option, True)
                self._iac_will(STATUS)
                self._send_status()
        elif self.check_local_option(option) is UNKNOWN:
            # anything else is refused, once.
            self._note_local_option(option, False)
            self.log.debug('{self.addrport}: unhandled do: {opt}.'
                           .format(self=self, opt=name_option(option)))
            self._iac_wont(option)

    def _send_status(self):
        """
        Process a DO STATUS sub-negotiation received by DE. (rfc859)

        Reports one WILL or DO per known option, based on the recorded state
        and on requests still awaiting a reply.
        """
        parts = [bytes(''.join((IAC, SB, STATUS, IS)))]
        for opt, status in self.telnet_opt_dict.items():
            # my_want_state_is_will
            if status.local_option is True:
                self.log.debug('send WILL %s', name_option(opt))
                parts.append(bytes(''.join((WILL, opt))))
            elif status.reply_pending is True and opt in (ECHO, SGA):
                self.log.debug('send WILL %s (want)', name_option(opt))
                parts.append(bytes(''.join((WILL, opt))))
            # his_want_state_is_will
            elif status.remote_option is True:
                self.log.debug('send DO %s', name_option(opt))
                parts.append(bytes(''.join((DO, opt))))
            elif (status.reply_pending is True
                  and opt in (NEW_ENVIRON, NAWS, TTYPE)):
                self.log.debug('send DO %s (want)', name_option(opt))
                parts.append(bytes(''.join((DO, opt))))
        parts.append(bytes(''.join((IAC, SE))))
        rstr = bytes('').join(parts)
        self.log.debug('send %s', ' '.join(name_option(opt) for opt in rstr))
        self.send_str(rstr)

    def _handle_dont(self, option):
        """
        Process a DONT command option received by DE.

        Only the recorded state changes; nothing is sent in reply.
        """
        self._note_reply_pending(option, False)
        if option == ECHO:
            # client demands we do not echo
            if self.check_local_option(ECHO) is not False:
                self._note_local_option(ECHO, False)
        elif option == BINARY:
            # client demands no binary mode
            if self.check_local_option(BINARY) is not False:
                self._note_local_option(BINARY, False)
        elif option == SGA:
            # DE demands that we start or continue transmitting
            # GAs (go-aheads) when transmitting data.
            if self.check_local_option(SGA) is not False:
                self._note_local_option(SGA, False)
        elif option == LINEMODE:
            # client demands no linemode.
            if self.check_remote_option(LINEMODE) is not False:
                self._note_remote_option(LINEMODE, False)
        else:
            self.log.debug('{self.addrport}: unhandled dont: {opt}.'
                           .format(self=self, opt=name_option(option)))

    def _handle_will(self, option):
        """
        Process a WILL command option received by DE.

        :raises Disconnected: if the client offers WILL ECHO.
        """
        # pylint: disable=R0912
        #         Too many branches (19/12)
        self._note_reply_pending(option, False)
        if option == ECHO:
            raise Disconnected(
                'Refuse WILL ECHO by client, closing connection.')
        elif option == BINARY:
            if self.check_remote_option(BINARY) is not True:
                self._note_remote_option(BINARY, True)
                # agree to use BINARY
                self._iac_do(BINARY)
        elif option == NAWS:
            if self.check_remote_option(NAWS) is not True:
                self._note_remote_option(NAWS, True)
                self._note_local_option(NAWS, True)
                # agree to use NAWS, / go ahead ?
                self._iac_do(NAWS)
        elif option == STATUS:
            if self.check_remote_option(STATUS) is not True:
                self._note_remote_option(STATUS, True)
                # go ahead
                self.send_str(bytes(''.join((
                    IAC, SB, STATUS, SEND, IAC, SE))))
        elif option in UNSUPPORTED_WILL:
            if self.check_remote_option(option) is not False:
                # let DE know we refuse to do linemode, encryption, etc.
                # The refusal is recorded so that a repeated WILL is not
                # answered again (no acknowledgment of a known state).
                self._note_remote_option(option, False)
                self._iac_dont(option)
        elif option == SGA:
            #  IAC WILL SUPPRESS-GO-AHEAD
            #
            # The sender of this command requests permission to begin
            # suppressing transmission of the TELNET GO AHEAD (GA)
            # character when transmitting data characters, or the
            # sender of this command confirms it will now begin suppressing
            # transmission of GAs with transmitted data characters.
            if self.check_remote_option(SGA) is not True:
                # sender of this command confirms that the sender of data
                # is expected to suppress transmission of GAs.
                self._iac_do(SGA)
                self._note_remote_option(SGA, True)
        elif option == NEW_ENVIRON:
            if self.check_remote_option(NEW_ENVIRON) in (False, UNKNOWN):
                self._note_remote_option(NEW_ENVIRON, True)
                self.request_env()
                self._note_local_option(NEW_ENVIRON, True)
        elif option == XDISPLOC:
            # if they want to send it, go ahead.
            # UNKNOWN is truthy, so the state is compared explicitly: only
            # accept when XDISPLOC is not already enabled.
            if self.check_remote_option(XDISPLOC) is not True:
                self._note_remote_option(XDISPLOC, True)
                self._iac_do(XDISPLOC)
                self.send_str(bytes(''.join((
                    IAC, SB, XDISPLOC, SEND, IAC, SE))))
        elif option == TTYPE:
            if self.check_remote_option(TTYPE) in (False, UNKNOWN):
                self._note_remote_option(TTYPE, True)
                self.request_ttype()
        else:
            self.log.debug('{self.addrport}: unhandled will: {opt} (ignored).'
                           .format(self=self, opt=name_option(option)))

    def _handle_wont(self, option):
        """
        Process a WONT command option received by DE.
        """
        # NOTE: the reply-pending flag is cleared before the option is
        # examined, so the handlers below cannot tell a refusal of our own
        # request from an unsolicited WONT; both are treated alike.
        self._note_reply_pending(option, False)
        if option in (ECHO, BINARY, SGA, TTYPE):
            if self.check_remote_option(option) in (True, UNKNOWN):
                self._note_remote_option(option, False)
                self._iac_dont(option)
        elif option in (NEW_ENVIRON, NAWS):
            # recorded only; no DONT is sent for these options.
            self._note_remote_option(option, False)
        else:
            self.log.debug('{self.addrport}: unhandled wont: {opt}.'
                           .format(self=self, opt=name_option(option)))
            self._note_remote_option(option, False)

    #---[ Sub-negotiation ]----------------------------------------------------

    def _sb_decoder(self):
        """
        Figures out what to do with a received sub-negotiation block.

        Dispatches on the first one or two bytes of the buffered block, then
        empties the buffer.
        """
        buf = self.telnet_sb_buffer
        if 0 == len(buf):
            self.log.error('nil SB')
            return
        self.log.debug('recv SB: %s %s', name_option(buf[0]),
                       'IS %r' % (buf[2:],)
                       if len(buf) > 1 and buf[1] == IS
                       else repr(buf[1:]))
        if 1 == len(buf) and buf[0] == chr(0):
            self.log.error('0nil SB')
        elif len(buf) < 2:
            self.log.error('SB too short')
        elif (TTYPE, IS) == (buf[0], buf[1]):
            self._sb_ttype(buf[2:].tostring())
        elif (XDISPLOC, IS) == (buf[0], buf[1]):
            self._sb_xdisploc(buf[2:].tostring())
        elif (NEW_ENVIRON, IS) == (buf[0], buf[1]):
            self._sb_env(buf[2:].tostring())
        elif NAWS == buf[0]:
            self._sb_naws(buf)
        elif (STATUS, SEND) == (buf[0], buf[1]):
            self._send_status()
        else:
            self.log.error('unsupported subnegotiation, %s: %r',
                           name_option(buf[0]), buf,)
        # keep the buffer an array, as everywhere else it is assigned.
        self.telnet_sb_buffer = array.array('c')

    def _sb_xdisploc(self, bytestring):
        """
        Process incoming sub-negotiation XDISPLOC: sets ``env['DISPLAY']``.
        """
        prev_display = self.env.get('DISPLAY', None)
        if prev_display is None:
            self.log.info("env['DISPLAY'] = %r.", bytestring)
        elif prev_display != bytestring:
            self.log.info("env['DISPLAY'] = %r by XDISPLOC was:%s.",
                          bytestring, prev_display)
        else:
            self.log.debug('XDSIPLOC ignored (DISPLAY already set).')
        self.env['DISPLAY'] = bytestring

    def _sb_ttype(self, bytestring):
        """
        Processes incoming subnegotiation TTYPE: sets ``env['TERM']`` to the
        lower-cased, stripped terminal type.
        """
        # trailing NUL bytes are removed; netrunner did this ..
        term_str = bytestring.lower().strip().rstrip('\x00')
        prev_term = self.env.get('TERM', None)
        if prev_term is None:
            self.log.debug("env['TERM'] = %r.", term_str)
        elif prev_term != term_str:
            self.log.debug("env['TERM'] = %r by TTYPE%s.", term_str,
                           ', was: %s' % (prev_term,)
                           if prev_term != self.TTYPE_UNDETECTED else '')
        else:
            self.log.debug('TTYPE ignored (TERM already set).')
        self.env['TERM'] = term_str

    def _sb_env(self, bytestring):
        """
        Processes incoming sub-negotiation NEW_ENVIRON.

        Each entry starts with chr(0) or chr(3) and holds a name, optionally
        followed by chr(1) and a value. A name without a value removes the
        variable (LINES, COLUMNS and TERM are never removed). An existing
        value is never overwritten, except TERM while it is still
        TTYPE_UNDETECTED.
        """
        breaks = [idx for (idx, byte) in enumerate(bytestring)
                  if byte in (chr(0), chr(3))]
        # The last entry is terminated by the end of the string, not by
        # another marker; without this sentinel it would be dropped.
        breaks.append(len(bytestring))
        for start, end in zip(breaks, breaks[1:]):
            pair = bytestring[start + 1:end].split(chr(1))
            if len(pair) == 1:
                if (pair[0] in self.env
                        and pair[0] not in ('LINES', 'COLUMNS', 'TERM')):
                    self.log.warning("del env[%r]", pair[0])
                    del self.env[pair[0]]
            elif len(pair) == 2:
                if pair[0] == 'TERM':
                    pair[1] = pair[1].lower()
                overwrite = (pair[0] == 'TERM' and
                             self.env.get('TERM') == self.TTYPE_UNDETECTED)
                if pair[0] not in self.env or overwrite:
                    self.log.debug('env[%r] = %r', pair[0], pair[1])
                    self.env[pair[0]] = pair[1]
                elif pair[1] == self.env[pair[0]]:
                    self.log.debug('env[%r] repeated', pair[0])
                else:
                    self.log.warning('%s=%s; conflicting value %s ignored.',
                                     pair[0], self.env[pair[0]], pair[1])
            else:
                self.log.error('client NEW_ENVIRON; invalid %r', pair)
        self.ENV_REPLIED = True

    def _sb_naws(self, charbuf):
        """
        Processes incoming subnegotiation NAWS.

        ``charbuf`` is the whole sub-negotiation block: the NAWS option byte
        followed by two 16-bit big-endian values, columns then rows. Updates
        ``env['COLUMNS']`` and ``env['LINES']`` and calls ``on_naws`` if set.
        """
        if 5 != len(charbuf):
            self.log.error('{self.addrport}: bad length in NAWS buf ({buflen})'
                           .format(self=self, buflen=len(charbuf)))
            return
        columns = (256 * ord(charbuf[1])) + ord(charbuf[2])
        rows = (256 * ord(charbuf[3])) + ord(charbuf[4])
        old_rows = self.env.get('LINES', None)
        old_columns = self.env.get('COLUMNS', None)
        if old_rows == str(rows) and old_columns == str(columns):
            self.log.debug('{self.addrport}: NAWS repeated'.format(self=self))
            return
        # a zero dimension is not usable; fall back on the previous value.
        if rows <= 0:
            self.log.debug('LINES %s ignored', rows)
            rows = old_rows
        if columns <= 0:
            self.log.debug('COLUMNS %s ignored', columns)
            columns = old_columns
        if rows is None or columns is None:
            # no previous value to fall back on: never store 'None'.
            self.log.debug('{self.addrport}: NAWS ignored'.format(self=self))
            return
        self.env['LINES'] = str(rows)
        self.env['COLUMNS'] = str(columns)
        if self.on_naws is not None:
            self.on_naws(self)

    #---[ State Juggling for Telnet Options ]----------------------------------

    def _option_state(self, option):
        """
        Return the TelnetOption record of ``option``, creating it if needed.
        """
        if option not in self.telnet_opt_dict:
            self.telnet_opt_dict[option] = TelnetOption()
        return self.telnet_opt_dict[option]

    def check_local_option(self, option):
        """
        Test the status of local negotiated Telnet options.

        :returns: True, False or UNKNOWN.
        """
        return self._option_state(option).local_option

    def _note_local_option(self, option, state):
        """
        Record the status of local negotiated Telnet options.
        """
        self._option_state(option).local_option = state

    def check_remote_option(self, option):
        """
        Test the status of remote negotiated Telnet options.

        :returns: True, False or UNKNOWN.
        """
        return self._option_state(option).remote_option

    def _note_remote_option(self, option, state):
        """
        Record the status of remote negotiated Telnet options.
        """
        self._option_state(option).remote_option = state

    def _check_reply_pending(self, option):
        """
        Test the status of requested Telnet options.
        """
        return self._option_state(option).reply_pending

    def _note_reply_pending(self, option, state):
        """
        Record the status of requested Telnet options.
        """
        self._option_state(option).reply_pending = state

    #---[ Telnet Command Shortcuts ]-------------------------------------------

    def _iac_do(self, option):
        """
        Send a Telnet IAC "DO" sequence.
        """
        self.log.debug('send IAC DO %s', name_option(option))
        self.send_str(bytes(''.join((IAC, DO, option))))

    def _iac_dont(self, option):
        """
        Send a Telnet IAC "DONT" sequence.
        """
        self.log.debug('send IAC DONT %s', name_option(option))
        self.send_str(bytes(''.join((IAC, DONT, option))))

    def _iac_will(self, option):
        """
        Send a Telnet IAC "WILL" sequence.
        """
        self.log.debug('send IAC WILL %s', name_option(option))
        self.send_str(bytes(''.join((IAC, WILL, option))))

    def _iac_wont(self, option):
        """
        Send a Telnet IAC "WONT" sequence.
        """
        self.log.debug('send IAC WONT %s', name_option(option))
        self.send_str(bytes(''.join((IAC, WONT, option))))
class ConnectTelnet(BaseConnect):
    """
    Accept new Telnet Connection and negotiate options.

    ``client``, ``log``, ``stopped`` and ``_set_socket_opts()`` are not
    defined here; presumably they come from ``BaseConnect`` -- verify
    against that class.
    """
    #: maximum time elapsed allowed to begin on-connect negotiation
    TIME_NEGOTIATE = 2.50
    #: wait upto 3500ms for all stages of negotiation to complete
    TIME_WAIT_STAGE = 3.50
    #: polling duration during negotiation
    TIME_POLL = 0.10

    def banner(self):
        """
        This method is called after the connection is initiated.

        This routine happens to communicate with a wide variety of network
        scanners when listening on the default port on a public IP address.
        """
        # According to Roger Espel Llima (espel@drakkar.ens.fr), you can
        #   have your server send a sequence of control characters:
        # (0xff 0xfb 0x01) (0xff 0xfb 0x03) (0xff 0xfd 0x03).
        #   Which translates to:
        # (IAC WILL ECHO) (IAC WILL SUPPRESS-GO-AHEAD)
        # (IAC DO SUPPRESS-GO-AHEAD).
        self.client.request_will_echo()
        self.client.request_will_sga()
        self.client.request_do_sga()
        # add DO & WILL BINARY, for utf8 input/output.
        self.client.request_do_binary()
        self.client.request_will_binary()
        # and terminal type, naws, and env,
        self.client.request_do_ttype()
        self.client.request_do_naws()
        self.client.request_do_env()
        self.client.send()  # push

    def run(self):
        """
        Negotiate and inquire about terminal type, telnet options, window
        size, and tcp socket options before spawning a new session.

        :returns: the result of ``spawn_client_session()`` when a session is
            started, otherwise None.
        """
        try:
            self._set_socket_opts()
            mrk_bytes = self.client.bytes_received
            self.banner()

            # wait at least {TIME_NEGOTIATE} for the client to speak.
            # If it doesn't, we try only TTYPE.
            # If it fails to report that, we forget the rest.
            self.log.debug('{client.addrport}: pausing for negotiation'
                           .format(client=self.client))
            st_time = time.time()
            while ((0 == self.client.bytes_received or
                    mrk_bytes == self.client.bytes_received)
                   and time.time() - st_time < self.TIME_NEGOTIATE):
                if not self.client.is_active():
                    return
                if self.client.send_ready():
                    self.client.send()
                time.sleep(self.TIME_POLL)

            if self._check_ttype(start_time=st_time):
                # if client is able to negotiate terminal type,
                # try NAWS or ENV negotiation.
                if self.client.is_active():
                    self._check_env(start_time=st_time)
                if self.client.is_active():
                    self._check_naws(start_time=st_time)
            self.set_encoding()
            if self.client.is_active():
                return spawn_client_session(client=self.client)
        except (Disconnected, socket.error) as err:
            self.log.debug('{client.addrport}: connection closed: {err}'
                           .format(client=self.client, err=err))
        except EOFError:
            self.log.debug('{client.addrport}: EOF from client'
                           .format(client=self.client))
        except Exception as err:
            # Anything else is a programming error, not a hang-up: keep the
            # connection handling best-effort, but record the traceback.
            self.log.exception('{client.addrport}: connection closed: {err}'
                               .format(client=self.client, err=err))
        finally:
            self.stopped = True
        # Reached only when no session was spawned (the successful path
        # returns above), so a client that now has a session stays active.
        self.client.deactivate()

    def set_encoding(self):
        """
        Set ``client.env['encoding']`` to ``'cp437'`` or ``'utf8'``.
        """
        # set encoding to utf8 for clients negotiating BINARY mode and
        # not beginning with TERM 'ansi'.
        #
        # This assumes a very simple dualistic model: modern unix terminal
        # (probably using bsd telnet client), or SyncTerm.
        #
        # Clients *could* negotiate CHARSET for encoding, or simply forward a
        # LANG variable -- SyncTerm does neither. So we just assume any
        # terminal that says its "ansi" is just any number of old-world DOS
        # terminal emulating clients that are incapable of comprehending
        # "encoding" (Especially multi-byte!), they only decide upon a "font"
        # or "code page" to map char 0-255 to.
        #
        self.client.env['encoding'] = 'cp437'
        local = self.client.check_local_option
        remote = self.client.check_remote_option
        term = self.client.env.get('TERM', 'ansi')

        # The un-negotiated state (UNKNOWN) is truthy, so both directions
        # are compared with ``is True``: BINARY must have been agreed.
        if (local(BINARY) is True and remote(BINARY) is True
                and not term.startswith('ansi')):
            self.client.env['encoding'] = 'utf8'

    def _timeleft(self, st_time):
        """
        Returns True when difference of current time and st_time is below
        TIME_WAIT_STAGE.
        """
        return bool(time.time() - st_time < self.TIME_WAIT_STAGE)

    def _wait_for(self, detected, start_time):
        """
        Poll until ``detected()`` is true or the stage time has run out,
        flushing pending output on every round.

        :returns: False if the client became inactive while waiting,
            True otherwise (the caller re-checks ``detected()``).
        """
        while not detected() and self._timeleft(start_time):
            if not self.client.is_active():
                return False
            if self.client.send_ready():
                self.client.send()
            time.sleep(self.TIME_POLL)
        return True

    def _check_env(self, start_time):
        """
        Check for NEW_ENVIRON negotiation.
        """
        def detected():
            """ True once the client answered about NEW_ENVIRON. """
            return (self.client.check_remote_option(NEW_ENVIRON)
                    is not UNKNOWN
                    or self.client.ENV_REPLIED)

        if not self._wait_for(detected, start_time):
            return
        if detected():
            self.log.debug('{client.addrport}: ENV={client.env!r}.'
                           .format(client=self.client))
        else:
            self.log.debug('{client.addrport}: request-do-new_environ failed.'
                           .format(client=self.client))

    def _check_naws(self, start_time):
        """
        Check for NAWS negotiation.
        """
        def detected():
            """ True once both window dimensions are known. """
            return (self.client.env.get('LINES', None) is not None
                    and self.client.env.get('COLUMNS', None) is not None)

        if not self._wait_for(detected, start_time):
            return
        if detected():
            self.log.debug('{client.addrport}: COLUMNS={client.env[COLUMNS]}, '
                           'LINES={client.env[LINES]}.'
                           .format(client=self.client))
        else:
            self.log.debug('{client.addrport}: request-do-naws failed.'
                           .format(client=self.client))

    def _check_ttype(self, start_time):
        """
        Check for TTYPE negotiation.

        :returns: True if the client reported a terminal type in time.
        """
        def detected():
            """ True once TERM differs from the undetected marker. """
            return self.client.env['TERM'] != self.client.TTYPE_UNDETECTED

        if not self._wait_for(detected, start_time):
            return False
        if detected():
            self.log.debug('{client.addrport}: TERM={client.env[TERM]}.'
                           .format(client=self.client))
            return True
        self.log.debug('{client.addrport}: request-terminal-type failed.'
                       .format(client=self.client))
        return False
class TelnetServer(BaseServer):
    """
    Poll sockets for new connections and sending/receiving data from clients.
    """
    client_factory = TelnetClient
    connect_factory = ConnectTelnet
    client_factory_kwargs = dict(on_naws=on_naws)

    # Dictionary of active clients, (file descriptor, TelnetClient,)
    # NOTE(review): defined on the class, so it is shared by every
    # TelnetServer instance -- confirm that only one server is created.
    clients = {}

    def __init__(self, config):
        """
        Create a new Telnet Server.

        :param ConfigParser.ConfigParser config: configuration section
               ``[telnet]``, with options ``'addr'``, ``'port'``
        :raises SystemExit: with status 1 if the address cannot be bound.
        """
        self.log = logging.getLogger(__name__)
        self.address = config.get('telnet', 'addr')
        self.port = config.getint('telnet', 'port')

        # bind
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(
            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            self.server_socket.bind((self.address, self.port))
            self.server_socket.listen(self.LISTEN_BACKLOG)
        except socket.error as err:
            self.log.error('Unable to bind {0}:{1}: {2}'
                           .format(self.address, self.port, err))
            # do not leak the descriptor of the unusable socket.
            self.server_socket.close()
            # ``exit`` is only injected by the ``site`` module; raising
            # SystemExit has the same effect and is always available.
            raise SystemExit(1)
        self.log.info('telnet listening on {self.address}:{self.port}/tcp'
                      .format(self=self))