Source code for x84.fail2ban

"""
fail2ban module for x/84.

To enable, add to default.ini::

    [fail2ban]
    enabled = yes

The following options are available, but not required:

- ``ip_blacklist``: space-separated list of IPs on permanent blacklist.
- ``ip_whitelist``: space-separated list of IPs to always allow.
- ``max_attempted_logins``: max no. of logins allowed for given time window
- ``max_attempted_logins_window``: the length (in seconds) of the time window
  for which logins will be tracked (sliding scale).
- ``initial_ban_length``: ban length (in seconds) when an IP is blacklisted.
- ``ban_increment_length``: amount of time (in seconds) to add to a ban on
  subsequent login attempts
"""

# std imports
import logging
import time

# globals
BANNED_IP_LIST, ATTEMPTED_LOGINS = dict(), dict()


[docs]def get_fail2ban_function(): """ Return a function used to ban aggressively-connecting clients. This is analogous to the 'fail2ban' utility, for example, telnet or ssh connect scanners. Returns a function which may be passed an IP address, returning True if the connection from address ``ip`` should be accepted. :return: function accepting ip address, returning boolean :rtype: callable """ # local imports from x84.bbs import get_ini if not get_ini(section='fail2ban', key='enabled', getter='getboolean'): return lambda ip: True # configuration ip_blacklist = get_ini(section='fail2ban', key='ip_blacklist', split=True) ip_whitelist = get_ini(section='fail2ban', key='ip_whitelist', split=True) max_attempted_logins = get_ini( section='fail2ban', key='max_attempted_logins', getter='getint' ) or 3 max_attempted_logins_window = get_ini( section='fail2ban', key='max_attempted_logins_window', getter='getint' ) or 30 initial_ban_length = get_ini( section='fail2ban', key='initial_ban_length', getter='getint' ) or 360 ban_increment_length = get_ini( section='fail2ban', key='ban_increment_length', getter='getint' ) or 360 def wrapper(ip): """ Inner wrapper function. """ log = logging.getLogger(__name__) # pylint: disable=W0602 # Using global for 'BANNED_IP_LIST' but no assignment is done global BANNED_IP_LIST, ATTEMPTED_LOGINS now = int(time.time()) # check to see if IP is blacklisted if ip in ip_blacklist: log.debug('Blacklisted IP rejected: {ip}'.format(ip=ip)) return False # check to see if IP is banned elif ip in BANNED_IP_LIST: # expired? if now > BANNED_IP_LIST[ip]: # expired ban; remove it del BANNED_IP_LIST[ip] ATTEMPTED_LOGINS[ip] = { 'attempts': 1, 'expiry': now + max_attempted_logins_window } log.debug('Banned IP expired: {ip}'.format(ip=ip)) else: # increase the expiry and kick them out BANNED_IP_LIST[ip] += ban_increment_length log.debug('Banned IP rejected: {ip}'.format(ip=ip)) return False # check num of attempts, ban if exceeded max elif ip in ATTEMPTED_LOGINS: if now > ATTEMPTED_LOGINS[ip]['expiry']: # window closed; start over record = ATTEMPTED_LOGINS[ip] record['attempts'] = 1 record['expiry'] = now + max_attempted_logins_window ATTEMPTED_LOGINS[ip] = record log.debug('Attempt outside of expiry window') elif ATTEMPTED_LOGINS[ip]['attempts'] > max_attempted_logins: # max # of attempts reached del ATTEMPTED_LOGINS[ip] BANNED_IP_LIST[ip] = now + initial_ban_length log.warn('Exceeded maximum attempts; banning {ip}' .format(ip=ip)) return False else: # extend window record = ATTEMPTED_LOGINS[ip] record['attempts'] += 1 record['expiry'] += max_attempted_logins_window ATTEMPTED_LOGINS[ip] = record log.debug('Window extended') # log attempted login elif ip not in ip_whitelist: log.debug('First attempted login for this window') ATTEMPTED_LOGINS[ip] = { 'attempts': 1, 'expiry': now + max_attempted_logins_window, } return True return wrapper