Source code for x84.msgpoll

#!/usr/bin/env python2.7
""" x84net message poll for x/84. """

# std imports
import logging
import hashlib
import time
import json
import os

# local
from . import cmdline

# 3rd-party
import requests


def get_token(network):
    """
    Build the authentication token sent in the ``Auth-X84net`` header.

    The token is the SHA-256 hex digest of the network's shared ``token``
    concatenated with the current unix time (whole seconds).

    :param dict network: network configuration; the keys ``token`` and
        ``board_id`` are read.
    :returns: ``'<board_id>|<sha256 hexdigest>|<unix time>'``
    :rtype: str
    """
    tm_value = int(time.time())
    secret = '{0}{1}'.format(network['token'], tm_value)
    if not isinstance(secret, bytes):
        # hashlib only accepts bytes.  Where ``str`` already is bytes
        # (Python 2) this branch is skipped and the input is unchanged.
        secret = secret.encode('utf-8')
    digest = hashlib.sha256(secret).hexdigest()
    return '{0}|{1}|{2}'.format(network['board_id'], digest, tm_value)
def prepare_message(msg, network, parent):
    """
    Turn a Msg object into a plain dict suitable for transfer.

    :param msg: message to serialize; its ``author``, ``subject``,
        ``recipient``, ``tags``, ``body`` and ``ctime`` attributes are read.
    :param dict network: network configuration; only ``name`` is read.
    :param parent: value stored as-is under the ``parent`` key.
    :returns: dict with the keys ``author``, ``subject``, ``recipient``,
        ``parent``, ``tags``, ``body`` and ``ctime``.
    :rtype: dict
    """
    from x84.bbs.msgbase import format_origin_line, to_utctime

    # the tag naming the network itself is not sent along
    outbound_tags = [tag for tag in msg.tags if tag != network['name']]

    # an origin line is appended to the transferred copy of the body
    outbound_body = u''.join((msg.body, format_origin_line()))

    return {
        'author': msg.author,
        'subject': msg.subject,
        'recipient': msg.recipient,
        'parent': parent,
        'tags': outbound_tags,
        'body': outbound_body,
        'ctime': to_utctime(msg.ctime)
    }
def pull_rest(net, last_msg_id):
    """
    Pull messages for a given network newer than the 'last' message idx.

    :param dict net: network configuration; ``url_base``, ``name`` and
        ``verify`` are read here (and whatever :func:`get_token` reads).
    :param last_msg_id: id placed last in the request URL.
    :returns: the ``messages`` value of the server reply, an empty list when
        the reply's ``response`` value is falsy, or ``False`` on connection
        failure, non-200 status, or an unparsable/incomplete reply.
    """
    log = logging.getLogger(__name__)
    url = '%smessages/%s/%s' % (net['url_base'], net['name'], last_msg_id)

    try:
        req = requests.get(url,
                           headers={'Auth-X84net': get_token(net)},
                           verify=net['verify'])
    except requests.ConnectionError as err:
        # an unreachable server is routine: warn without a traceback
        log.warning('[{net[name]}] ConnectionError in pull_rest: {err}'
                    .format(net=net, err=err))
        return False
    except Exception as err:
        log.exception('[{net[name]}] exception in pull_rest: {err}'
                      .format(net=net, err=err))
        return False

    if req.status_code != 200:
        log.error('[{net[name]}] HTTP error, code={req.status_code}'
                  .format(net=net, req=req))
        return False

    try:
        response = json.loads(req.text)
        # key lookups stay inside the guard: a reply missing 'response' or
        # 'messages' is reported the same way as undecodable JSON.
        if not response['response']:
            return []
        return response['messages']
    except Exception as err:
        log.exception('[{net[name]}] JSON error: {err}'
                      .format(net=net, err=err))
        return False
def push_rest(net, msg, parent):
    """
    Push message for a given network and append an origin line.

    :param dict net: network configuration; ``url_base``, ``name`` and
        ``verify`` are read here.
    :param msg: message object handed to :func:`prepare_message`.
    :param parent: parent id handed to :func:`prepare_message`.
    :returns: the ``id`` value of the server reply on success, otherwise
        ``False`` (request failure, status other than 200/201, undecodable
        reply, or a reply that does not report success with an ``id``).
    """
    log = logging.getLogger(__name__)

    msg_data = prepare_message(msg, net, parent)
    url = '{net[url_base]}messages/{net[name]}/'.format(net=net)
    data = {'message': json.dumps(msg_data)}

    try:
        req = requests.put(url,
                           headers={'Auth-X84net': get_token(net)},
                           data=data,
                           verify=net['verify'])
    except Exception as err:
        log.exception('[{net[name]}] exception in push_rest: {err}'
                      .format(net=net, err=err))
        return False

    if req.status_code not in (200, 201):
        # network name is bracketed like every other message of this module
        log.error('[{net[name]}] HTTP error, code={req.status_code}'
                  .format(net=net, req=req))
        return False

    try:
        response = json.loads(req.text)
    except Exception as err:
        log.exception('[{net[name]}] JSON error: {err}'
                      .format(net=net, err=err))
        return False

    # A reply that is valid JSON but not an object, or that lacks the
    # 'response' key, is treated as a failed post rather than being allowed
    # to raise TypeError/KeyError into the caller's loop.
    if (isinstance(response, dict)
            and response.get('response')
            and 'id' in response):
        return response['id']
    return False
def get_networks():
    """
    Get list of configured message networks.

    Network names come from option ``network_tags`` of ini section ``[msg]``;
    each network is configured by its own section ``[msgnet_<name>]``.
    Networks missing any required option are logged and left out.

    :returns: one dict per fully configured network, with the keys ``name``,
        ``url_base``, ``token``, ``board_id``, ``last_file`` and ``verify``.
    :rtype: list
    """
    from x84.bbs import get_ini

    log = logging.getLogger(__name__)

    # pull list of network-associated tags
    network_list = get_ini(section='msg', key='network_tags', split=True)

    # expected configuration options
    net_options = ('url_base', 'token', 'board_id')

    networks = list()
    for net_name in network_list:
        net = {'name': net_name}
        section = 'msgnet_{0}'.format(net_name)

        configured = True
        for option in net_options:
            # look each option up once; every missing option is reported
            # before the network is skipped.
            value = get_ini(section=section, key=option)
            if not value:
                log.error('[{net_name}] Missing configuration, '
                          'section=[{section}], option={option}.'
                          .format(net_name=net_name,
                                  section=section,
                                  option=option))
                configured = False
            net[option] = value
        if not configured:
            continue

        # make last_file an absolute path, relative to `datapath`
        net['last_file'] = os.path.join(
            os.path.expanduser(get_ini(section='system', key='datapath')),
            '{net[name]}_last'.format(net=net))

        # `verify` is passed to requests: True unless option ``ca_path``
        # names an existing file, in which case it is that path.
        net['verify'] = True
        ca_path = get_ini(section=section, key='ca_path')
        if ca_path:
            ca_path = os.path.expanduser(ca_path)
            if not os.path.isfile(ca_path):
                log.warning("File not found for Config section [{section}], "
                            "option {key}, value={ca_path}. default ca_verify "
                            "will be used. ".format(section=section,
                                                    key='ca_path',
                                                    ca_path=ca_path))
            else:
                net['verify'] = ca_path

        networks.append(net)
    return networks
def get_last_msg_id(last_file):
    """
    Get the "last message id" by data file ``last_file``.

    When the file cannot be read, or its content is not an integer, it is
    (re)written holding ``-1`` and ``-1`` is returned.

    :param str last_file: path of the file holding the id as text.
    :returns: the stored id, or ``-1`` when none could be read.
    :rtype: int
    :raises IOError: (or ``OSError``) when the file can be neither read nor
        written; handled by the caller.
    """
    # TODO(jquast): This should have been done internally (and far
    # more easily!) by a DBProxy database.
    last_msg_id = -1
    log = logging.getLogger(__name__)

    try:
        # May raise IOError (File not Found)
        with open(last_file, 'r') as last_fp:
            return int(last_fp.read().strip())
    except IOError:
        corrupt = False
    except ValueError:
        # Empty or garbled content (for instance a write that was cut
        # short).  The caller only handles OSError/IOError, so start over
        # from -1 instead of letting ValueError escape.
        corrupt = True

    # So, create it; but this too, may raise an
    # OSError (Permission Denied), handled by caller.
    with open(last_file, 'w') as last_fp:
        last_fp.write(str(last_msg_id))

    if corrupt:
        log.warning('last_file reset, content was not a message id: {0}'
                    .format(last_file))
    else:
        log.info('last_file created: {0}'.format(last_file))
    return last_msg_id
def poll_network_for_messages(net):
    """
    Poll for new messages of network, ``net``.

    Messages newer than the id recorded in ``net['last_file']`` are pulled,
    stored locally, and their network-id to local-index mapping is recorded
    in the ``<name>trans`` database.  The highest id seen is kept in
    ``net['last']`` and written back to ``net['last_file']``.

    :param dict net: network configuration, as built by :func:`get_networks`.
    :rtype: None
    """
    from x84.bbs import Msg, DBProxy
    from x84.bbs.msgbase import to_localtime

    log = logging.getLogger(__name__)

    log.debug(u'[{net[name]}] Polling for new messages.'.format(net=net))

    try:
        last_msg_id = get_last_msg_id(net['last_file'])
    except (OSError, IOError) as err:
        log.error('[{net[name]}] skipping network: {err}'
                  .format(net=net, err=err))
        return

    msgs = pull_rest(net=net, last_msg_id=last_msg_id)

    # pull_rest() gives False on failure and [] when nothing is new
    if not msgs:
        log.debug('[{net[name]}] No messages.'.format(net=net))
        return
    log.info('[{net[name]}] Retrieved {num} messages.'
             .format(net=net, num=len(msgs)))

    transdb = DBProxy('{0}trans'.format(net['name']), use_session=False)
    # NOTE(review): .append() below assumes keys() returns a list -- confirm.
    transkeys = transdb.keys()

    # process in ascending order of network message id
    msgs = sorted(msgs, key=lambda item: int(item['id']))

    # store messages locally, saving their translated IDs to the transdb
    for msg in msgs:
        store_msg = Msg()
        store_msg.recipient = msg['recipient']
        store_msg.author = msg['author']
        store_msg.subject = msg['subject']
        store_msg.body = msg['body']
        store_msg.tags = set(msg['tags'])
        # joining the name's characters yields the name as a unicode string
        store_msg.tags.add(u''.join(net['name']))

        if msg['recipient'] is None and u'public' not in msg['tags']:
            log.warning("[{net[name]}] No recipient (msg_id={msg[id]}), "
                        "adding 'public' tag".format(net=net, msg=msg))
            store_msg.tags.add(u'public')

        if (msg['parent'] is not None
                and str(msg['parent']) not in transkeys):
            # unknown parent: the stored message simply gets no parent
            log.warning('[{net[name]}] No such parent message ({msg[parent]}, '
                        'msg_id={msg[id]}), removing reference.'
                        .format(net=net, msg=msg))
        elif msg['parent'] is not None:
            store_msg.parent = int(transdb[msg['parent']])

        if msg['id'] in transkeys:
            log.warning('[{net[name]}] dupe (msg_id={msg[id]}) discarded.'
                        .format(net=net, msg=msg))
        else:
            # do not save this message to network, we already received
            # it from the network, set send_net=False
            store_msg.save(send_net=False, ctime=to_localtime(msg['ctime']))
            with transdb:
                transdb[msg['id']] = store_msg.idx
            transkeys.append(msg['id'])
            log.info('[{net[name]}] Processed (msg_id={msg[id]}) => {new_id}'
                     .format(net=net, msg=msg, new_id=store_msg.idx))

        # track the highest id seen, duplicates included
        if 'last' not in net or int(net['last']) < int(msg['id']):
            net['last'] = msg['id']

    if 'last' in net:
        with open(net['last_file'], 'w') as last_fp:
            last_fp.write(str(net['last']))

    return
def publish_network_messages(net):
    """
    Push messages to network, ``net``.

    Every message id queued in the ``<name>queues`` database is pushed to the
    network.  On success the returned network id is recorded in the
    ``<name>trans`` database, an origin line is appended to the locally
    stored body, and the id is removed from the queue.  A failed push leaves
    the id queued.

    :param dict net: network configuration, as built by :func:`get_networks`.
    :rtype: None
    """
    from x84.bbs import DBProxy
    from x84.bbs.msgbase import format_origin_line, MSGDB

    log = logging.getLogger(__name__)

    log.debug(u'[{net[name]}] publishing new messages.'.format(net=net))

    queuedb = DBProxy('{0}queues'.format(net['name']), use_session=False)
    transdb = DBProxy('{0}trans'.format(net['name']), use_session=False)
    msgdb = DBProxy(MSGDB, use_session=False)

    # publish each message, in ascending numeric order of its id
    for msg_id in sorted(queuedb.keys(), key=int):
        if msg_id not in msgdb:
            log.warning('[{net[name]}] No such message (msg_id={msg_id})'
                        .format(net=net, msg_id=msg_id))
            # NOTE(review): other deletions below are wrapped in
            # ``with queuedb:`` -- confirm whether this one should be too.
            del queuedb[msg_id]
            continue

        msg = msgdb[msg_id]

        # translate the local parent index back to its network id, if known
        trans_parent = None
        if msg.parent is not None:
            matches = [key for key, data in transdb.items()
                       if int(data) == msg.parent]
            if matches:
                trans_parent = matches[0]
            else:
                log.warning('[{net[name]}] Parent ID {msg.parent} '
                            'not in translation-DB (msg_id={msg_id})'
                            .format(net=net, msg=msg, msg_id=msg_id))

        trans_id = push_rest(net=net, msg=msg, parent=trans_parent)
        if trans_id is False:
            # left in the queue: it is pushed again on the next call
            log.error('[{net[name]}] Message not posted (msg_id={msg_id})'
                      .format(net=net, msg_id=msg_id))
            continue

        if trans_id in transdb.keys():
            log.error('[{net[name]}] trans_id={trans_id} conflicts with '
                      '(msg_id={msg_id})'
                      .format(net=net, trans_id=trans_id, msg_id=msg_id))
            with queuedb:
                del queuedb[msg_id]
            continue

        # transform, and possibly duplicate(?) message ..
        with transdb, msgdb, queuedb:
            transdb[trans_id] = msg_id
            msg.body = u''.join((msg.body, format_origin_line()))
            msgdb[msg_id] = msg
            del queuedb[msg_id]
        log.info('[{net[name]}] Published (msg_id={msg_id}) => {trans_id}'
                 .format(net=net, msg_id=msg_id, trans_id=trans_id))
def poller(poll_interval):
    """
    Blocking function periodically polls configured message networks.

    Returns only when no networks are configured; otherwise loops forever.

    :param poll_interval: seconds to sleep between polling rounds.
    """
    log = logging.getLogger(__name__)

    # the network list is read once, before the loop begins
    configured_networks = get_networks()

    if not configured_networks:
        log.error(u'No networks configured for poll/publish.')
        return

    while True:
        do_poll(configured_networks)
        time.sleep(poll_interval)
def main(background_daemon=True):
    """
    Entry point to configure and begin network message polling.

    Called by x84/engine.py, function main() as unmanaged thread.

    The polling interval is read from option ``poll_interval`` of ini
    section ``[msg]``; 1984 seconds is used when that value is falsy.

    :param bool background_daemon: When True (default), this function returns
                                   and polling is done in an unmanaged,
                                   background (daemon) thread.  Otherwise,
                                   function call to ``main()`` is blocking.
    :rtype: None
    """
    from threading import Thread
    from x84.bbs.ini import get_ini

    log = logging.getLogger(__name__)

    configured_interval = get_ini(section='msg',
                                  key='poll_interval',
                                  getter='getint')
    poll_interval = configured_interval or 1984

    if not background_daemon:
        # foreground: blocks for as long as poller() runs
        poller(poll_interval)
        return

    worker = Thread(target=poller, args=(poll_interval,))
    worker.daemon = True
    log.info('msgpoll at {0}s intervals.'.format(poll_interval))
    worker.start()
def do_poll(networks):
    """
    Message polling process.

    Function is called periodically by :func:`poller`.  Every network is
    polled before any network is published to.

    :param networks: iterable of network configuration dicts.
    :rtype: None
    """
    # Explicit loops rather than map(): the calls are made only for their
    # side effects, and a lazily evaluated map() would never make them.

    # pull-from all networks
    for net in networks:
        poll_network_for_messages(net)

    # publish-to all networks
    for net in networks:
        publish_network_messages(net)
if __name__ == '__main__':
    # Executed directly: run only the message polling module.
    #
    # Outside of the 'engine' context nothing has loaded the .ini
    # configuration yet, so initialize it here from the command-line
    # arguments before any option is read.
    import x84.bbs.ini

    cli_args = cmdline.parse_args()
    x84.bbs.ini.init(*cli_args)

    # run in the foreground: this call blocks instead of spawning a
    # background thread.
    main(background_daemon=False)