Implementing a Kafka Producer and Consumer in Python

Today we’re going to play with Kafka. We’ll implement a simple producer and consumer in Python using the kafka-python library. The project consists of two main components: a producer, which uses a dedicated class to send messages to a Kafka topic, and a consumer, which listens to a Kafka topic, processes the messages it receives, and commits their offsets. Communication with Kafka is handled by a helper module that encapsulates the producer and consumer configurations. The setup uses Docker Compose to manage the Kafka broker and supporting services such as Zookeeper.

Below is a simplified Producer class and corresponding function:

import json
import logging
from typing import Any

from jsonencoder import DefaultEncoder
from kafka import KafkaProducer

from settings import KAFKA_BOOTSTRAP_SERVERS

logger = logging.getLogger(__name__)


def get_producer():
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda data: json.dumps(data, cls=DefaultEncoder).encode('utf-8')
    )


class Producer:
    def __init__(self):
        self.producer = get_producer()

    def send(self, topic: str, message: Any):
        try:
            self.producer.send(topic, value=message)
            self.producer.flush()
            logger.info(f"Message sent to topic {topic}: {message}")
        except Exception as e:
            logger.error(f"Error sending message to {topic}: {str(e)}")
            raise

    def close(self):
        # Close the producer once we're done with it, not after every send,
        # so a Producer instance can be reused for several messages.
        if self.producer:
            self.producer.close()
            self.producer = None


def send_message(topic: str, message: Any):
    producer = Producer()
    try:
        producer.send(topic, message)
    finally:
        producer.close()
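The DefaultEncoder imported from the local jsonencoder module isn’t shown in the post. A minimal sketch (an assumption, not the actual implementation) would extend json.JSONEncoder to handle types that json.dumps rejects by default, such as datetime:

```python
import json
from datetime import datetime, date
from decimal import Decimal


class DefaultEncoder(json.JSONEncoder):
    """Fallback encoder for types json.dumps can't serialize natively."""

    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()  # serialize dates as ISO-8601 strings
        if isinstance(obj, Decimal):
            return float(obj)
        return super().default(obj)


# A payload containing a datetime now serializes cleanly.
payload = {'body': 'hello', 'timestamp': datetime(2024, 1, 1, 12, 0)}
encoded = json.dumps(payload, cls=DefaultEncoder)
```

Without a hook like this, the lambda in get_producer would raise a TypeError as soon as a message carried a datetime.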

We’re using click to build the command line interface.

import click
from datetime import datetime
from lib.kafka_broker import send_message


@click.command()
@click.option('--topic', required=True, help='topic')
@click.option('--message', required=True, help='message')
def run(topic, message):
    send_message(topic, dict(
        timestamp=datetime.now().isoformat(),
        body=message
    ))

The consumer processes messages by consuming them from a Kafka topic. When a message is received, it gets logged and the consumer commits the offset, so that already-processed messages are not delivered again after a restart. Note that this gives at-least-once semantics: a crash between processing and committing can still cause a message to be reprocessed. The consumer logic is implemented in a callback that is passed as a parameter to the topic consumption function.

Below is the consumer’s function definition and command setup:

import logging
import click
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from lib.kafka_broker import consume_topic

logger = logging.getLogger(__name__)

def process_message(message: ConsumerRecord, consumer: KafkaConsumer) -> None:
    logger.info(f"received message: {message.value}")
    consumer.commit()

@click.command()
@click.option('--topic', required=True, help='topic')
def run(topic):
    consume_topic(topic, process_message)

The consume_topic function (from lib/kafka_broker.py) configures the Kafka consumer to listen to a specific topic. On receipt of each message, the process_message callback handles it by logging the message and committing the consumer’s current offset.

import json
import logging
from typing import Protocol

from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord

from settings import KAFKA_BOOTSTRAP_SERVERS

logger = logging.getLogger(__name__)

EARLIEST = 'earliest'  # Automatically reset the offset to the earliest offset.
LATEST = 'latest'  # Automatically reset the offset to the latest offset.
NONE = 'none'  # You must set the partition and offset manually.


def get_consumer(topic, *,
                 auto_commit=False,
                 group_id=None,
                 auto_offset_reset=EARLIEST) -> KafkaConsumer:
    return KafkaConsumer(
        topic,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=auto_commit,
        group_id=group_id,
        value_deserializer=lambda data: json.loads(data.decode('utf-8'))
    )


class MessageProcessorProtocol(Protocol):
    def __call__(self, message: ConsumerRecord, consumer: KafkaConsumer) -> None:
        ...


def consume_topic(topic, callback: MessageProcessorProtocol, stop_event=None):
    logger.info(f"Listening to topic: {topic}")
    consumer = get_consumer(topic, group_id=topic)
    try:
        while stop_event is None or not stop_event.is_set():
            messages = consumer.poll(timeout_ms=1000)
            for tp, msgs in messages.items():
                for message in msgs:
                    logger.info(f"Received message: {message.value} "
                                f"Partition: {message.partition}, "
                                f"Offset: {message.offset}")
                    callback(message, consumer)
    finally:
        consumer.close()
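The optional stop_event parameter lets another thread request a graceful shutdown instead of killing the process. The pattern is sketched below with a stand-in loop (run_until_stopped is a hypothetical helper standing in for consume_topic) so it runs without a broker:

```python
import threading
import time


def run_until_stopped(stop_event, poll_interval=0.1):
    """Stand-in for consume_topic's loop: keep polling until the event is set."""
    polls = 0
    while not stop_event.is_set():
        polls += 1                # here consume_topic would call consumer.poll(...)
        time.sleep(poll_interval)
    return polls                  # the finally block would close the consumer here


stop_event = threading.Event()
worker = threading.Thread(target=run_until_stopped, args=(stop_event,))
worker.start()

time.sleep(0.5)       # let the "consumer" run for a while
stop_event.set()      # signal shutdown from the main thread
worker.join(timeout=2)
```

The same Event object passed to consume_topic would make the poll loop exit cleanly, letting the finally block close the consumer and leave the consumer group gracefully.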

The project relies on Docker Compose to run the required Kafka and Zookeeper containers. This setup allows the application to interact with a local Kafka broker without needing complex installation processes. A simplified excerpt of the docker-compose.yml file is shown below:


version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-net

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
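The settings module imported by the helper code isn’t listed in the post. Given the PLAINTEXT_HOST listener exposed on port 9092 above, it presumably boils down to something like this (the variable name comes from the imports; the value is an assumption matching the compose file):

```python
# settings.py -- assumed contents. A client running on the host machine
# reaches the broker through the PLAINTEXT_HOST listener on port 9092;
# a client inside the Docker network would use kafka:29092 instead.
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
```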

And that’s all. The source code is available in my GitHub account.

Sending logs to a remote server using RabbitMQ

Some time ago I wrote an article showing how to send Silex logs to a remote server. Today I want to use a message queue to do it. Normally, when I need queues, I use Gearman, but today I want to play with RabbitMQ.

When we work with web applications it’s important to have, in one way or another, a way to decouple operations from the main request. Message queues are great tools for performing those operations. They even allow us to write our workers in a different language than the main request. These days, for example, I’m working with Modbus devices. The whole Modbus logic is written in Python and I want to build the frontend with PHP. I could rewrite the Modbus logic in PHP (there are PHP libraries to connect to Modbus devices), but I’m not that crazy. Queues are our friends.

The idea in this post is the same as in the previous one. We’ll use the event dispatcher to emit events and send them to a RabbitMQ queue, through a service provider called LoggerServiceProvider.

<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use RabbitLogger\LoggerServiceProvider;
use Silex\Application;
use Symfony\Component\HttpKernel\Event;
use Symfony\Component\HttpKernel\KernelEvents;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

$app = new Application(['debug' => true]);
$app->register(new LoggerServiceProvider($connection, $channel));

$app->on(KernelEvents::TERMINATE, function (Event\PostResponseEvent $event) use ($app) {
    $app['rabbit.logger']->info('TERMINATE');
});

$app->on(KernelEvents::CONTROLLER, function (Event\FilterControllerEvent $event) use ($app) {
    $app['rabbit.logger']->info('CONTROLLER');
});

$app->on(KernelEvents::EXCEPTION, function (Event\GetResponseForExceptionEvent $event) use ($app) {
    $app['rabbit.logger']->info('EXCEPTION');
});

$app->on(KernelEvents::FINISH_REQUEST, function (Event\FinishRequestEvent $event) use ($app) {
    $app['rabbit.logger']->info('FINISH_REQUEST');
});

$app->on(KernelEvents::RESPONSE, function (Event\FilterResponseEvent $event) use ($app) {
    $app['rabbit.logger']->info('RESPONSE');
});

$app->on(KernelEvents::REQUEST, function (Event\GetResponseEvent $event) use ($app) {
    $app['rabbit.logger']->info('REQUEST');
});

$app->on(KernelEvents::VIEW, function (Event\GetResponseForControllerResultEvent $event) use ($app) {
    $app['rabbit.logger']->info('VIEW');
});

$app->get('/', function (Application $app) {
    $app['rabbit.logger']->info('inside route');
    return "HELLO";
});

$app->run();

Here we can see the service provider:

<?php
namespace RabbitLogger;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Silex\Application;
use Silex\ServiceProviderInterface;

class LoggerServiceProvider implements ServiceProviderInterface
{
    private $connection;
    private $channel;

    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel)
    {
        $this->connection = $connection;
        $this->channel    = $channel;
    }

    public function register(Application $app)
    {
        $app['rabbit.logger'] = $app->share(
            function () use ($app) {
                $channelName = isset($app['logger.channel.name']) ? $app['logger.channel.name'] : 'logger.channel';
                return new Logger($this->connection, $this->channel, $channelName);
            }
        );
    }

    public function boot(Application $app)
    {
    }
}

And here the logger:

<?php
namespace RabbitLogger;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Silex\Application;

class Logger implements LoggerInterface
{
    private $connection;
    private $channel;
    private $queueName;

    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel, $queueName = 'logger')
    {
        $this->connection = $connection;
        $this->channel    = $channel;
        $this->queueName  = $queueName;
        $this->channel->queue_declare($queueName, false, false, false, false);
    }

    function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }

    public function emergency($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::EMERGENCY);
    }

    public function alert($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::ALERT);
    }

    public function critical($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::CRITICAL);
    }

    public function error($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::ERROR);
    }

    public function warning($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::WARNING);
    }

    public function notice($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::NOTICE);
    }

    public function info($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::INFO);
    }

    public function debug($message, array $context = [])
    {
        $this->sendLog($message, $context, LogLevel::DEBUG);
    }
    public function log($level, $message, array $context = [])
    {
        $this->sendLog($message, $context, $level);
    }

    private function sendLog($message, array $context = [], $level = LogLevel::INFO)
    {
        $msg = new AMQPMessage(json_encode([$message, $context, $level]), ['delivery_mode' => 2]);
        $this->channel->basic_publish($msg, '', $this->queueName);
    }
}

And finally the RabbitMQ Worker to process our logs

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('logger.channel', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    // basic_consume is called with no_ack = false, so we must acknowledge
    // each message; otherwise RabbitMQ keeps it and redelivers it later.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//$channel->basic_qos(null, 1, null);
$channel->basic_consume('logger.channel', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();

To run the example we must:

Start RabbitMQ server

rabbitmq-server

Start the Silex server

php -S 0.0.0.0:8080 -t www

Start the worker

php worker/worker.php

You can see the whole project in my GitHub account.