Wednesday, August 14, 2013

Centralized logging for distributed applications with pyzmq

Simpler distributed applications can take advantage of centralized logging. PyZMQ, a Python bindings for ØMQ provides log handlers for the python logging module and can be easily used for this purpose. Log handlers utilizes ØMQ Pub/Sub pattern and broadcasts log messages through a PUB socket. It is quite easy to construct the message collector and write messages to a central location.
         +-------------+                        |
         |Machine1:App2|                 +---------------+
         +-------------+                          |
                   +-------------+                |

Client Application
To start with, we will need pyzmq library and support for logging library.
import logging
import random
import time
import zmq
from zmq.log.handlers import PUBHandler
Useful format that identifies where the logs are emanating from.

LOG_LEVELS = (logging.DEBUG, logging.INFO, logging.WARN, logging.ERROR, logging.CRITICAL)

formatters = {
        logging.DEBUG: logging.Formatter("%(filename)s:%(lineno)d | %(message)s\n"),
        logging.INFO: logging.Formatter("%(filename)s:%(lineno)d | %(message)s\n"),
        logging.WARN: logging.Formatter("%(filename)s:%(lineno)d | %(message)s\n"),
        logging.ERROR: logging.Formatter("%(filename)s:%(lineno)d | %(message)s\n"),
        logging.CRITICAL: logging.Formatter("%(filename)s:%(lineno)d | %(message)s\n")
interval = 1
port = 5558
And finally the log handler that allows publication of messages over a PUB zmq socket.
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.connect('tcp://' % port)
logger = logging.getLogger("clientapp1")
handler = PUBHandler(pub)
handler.formatters = formatters
while True:
        level = random.choice(LOG_LEVELS)
        logger.log(level, "subtopic.subsub::Hello from %i" % os.getpid())

You may have also notice the use of specific style of message that helps you provide a specific subtopic which is useful for logging structure. Finally, we will implement the centralized logger.

Centralized logger
import as zmq
import logging
import logging.handlers

              'INFO': logging.INFO,
              'WARN': logging.WARN,
              'ERROR': logging.ERROR,
              'CRITICAL': logging.CRITICAL
port = 5558

The centralized logger implements the SUB pattern (of PUB/SUB) to subscribe to published messages and log the messages to a file. The published messages could emanate from different applications on different machines and provides for centralized logging.
logger = logging.getLogger()
context = zmq.Context(context)
socket_fd = context.socket(zmq.SUB)
socket_fd.bind("tcp://localhost:%s" % port)
socket_fd.setsockopt(zmq.SUBSCRIBE, "")
filehandler = logging.handlers.TimedRotatingFileHandler('log file', 'midnight',1)
formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
while True:
        topic, message = socket_fd.recv_multipart()
        pos = topic.find('.')
        level = topic
        if pos > 0: level = topic[:pos]
        if message.endswith('\n'): message = message[:-1]
        log_msg = getattr(logging, level.lower())
        if pos > 0: message = topic[pos+1:] + " | " + message