Today we’re going to play with Kafka. We’ll implement a simple producer and consumer in Python using the kafka-python library. The project consists of two main components: the producer, which uses a dedicated class to send messages to a Kafka topic, and the consumer, which listens to a Kafka topic, processes the messages it receives, and commits their offsets. Communication with Kafka is handled by a helper module that encapsulates the producer and consumer configurations. The setup uses Docker Compose to manage the Kafka broker and supporting services such as Zookeeper.
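Based on the imports that appear throughout the post, the project layout looks roughly like this (the file names for the two click commands are my assumption, not something stated in the post):

.
├── docker-compose.yml
├── settings.py          # Kafka connection settings (e.g. KAFKA_BOOTSTRAP_SERVERS)
├── jsonencoder.py       # DefaultEncoder used to serialize messages to JSON
├── lib/
│   └── kafka_broker.py  # producer/consumer helpers (send_message, consume_topic)
├── producer.py          # click command that sends a message (assumed name)
└── consumer.py          # click command that consumes a topic (assumed name)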
Below is a simplified Producer class and corresponding function:
import json
import logging
from typing import Any

from kafka import KafkaProducer

from jsonencoder import DefaultEncoder
from settings import KAFKA_BOOTSTRAP_SERVERS

logger = logging.getLogger(__name__)


def get_producer():
    # JSON-encode every value before sending it to the broker.
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda data: json.dumps(data, cls=DefaultEncoder).encode('utf-8')
    )


class Producer:
    def __init__(self):
        self.producer = get_producer()

    def send(self, topic: str, message: Any):
        try:
            self.producer.send(topic, value=message)
            self.producer.flush()
            logger.info(f"Message sent to topic: {topic}: {message}")
        except Exception as e:
            logger.error(f"Error sending message to {topic}: {str(e)}")
            raise
        finally:
            # The producer is closed after every send, so each Producer instance
            # is single-use (send_message below creates a fresh one per call).
            if self.producer:
                self.producer.close()


def send_message(topic: str, message: Any):
    producer = Producer()
    producer.send(topic, message)
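The producer imports KAFKA_BOOTSTRAP_SERVERS from settings and DefaultEncoder from jsonencoder, neither of which is shown in the post. A minimal sketch of what they could contain (the actual contents are an assumption on my part); the encoder simply makes datetimes and other awkward objects JSON-serializable:

# settings.py -- hypothetical contents; adjust to your environment.
# localhost:9092 matches the PLAINTEXT_HOST listener in the Docker Compose file shown later.
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'


# jsonencoder.py -- hypothetical DefaultEncoder: fall back to ISO strings for
# datetimes and to str() for anything else json.dumps cannot handle natively.
import json
from datetime import date, datetime


class DefaultEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        return str(obj)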
We’re using click to build the command line interface.
from datetime import datetime

import click

from lib.kafka_broker import send_message


@click.command()
@click.option('--topic', required=True, help='topic')
@click.option('--message', required=True, help='message')
def run(topic, message):
    send_message(topic, dict(
        timestamp=datetime.now().isoformat(),
        body=message
    ))
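Assuming the command above lives in a file such as producer.py and ends with the usual if __name__ == '__main__': run() guard (both assumptions on my part), sending a message looks like this:

python producer.py --topic demo --message "hello kafka"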
The consumer processes messages by consuming them from a Kafka topic. When a message is received, it gets logged and the consumer commits the offset, so that already-processed messages are not re-delivered after a restart (this gives at-least-once semantics rather than a strict exactly-once guarantee). The consumer functionality is implemented in a callback that is passed as a parameter to the topic consumption function.
Below is the consumer’s function definition and command setup:
import logging

import click
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord

from lib.kafka_broker import consume_topic

logger = logging.getLogger(__name__)


def process_message(message: ConsumerRecord, consumer: KafkaConsumer) -> None:
    # Log the deserialized payload and commit the offset of the processed message.
    logger.info(f"received message: {message.value}")
    consumer.commit()


@click.command()
@click.option('--topic', required=True, help='topic')
def run(topic):
    consume_topic(topic, process_message)
The consume_topic function (from lib/kafka_broker.py) configures the Kafka consumer to listen to a specific topic. On receipt of each message, the process_message callback handles it by logging information and committing the consumer’s current offset.
import json
import logging
from typing import Protocol

from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord

from settings import KAFKA_BOOTSTRAP_SERVERS

logger = logging.getLogger(__name__)

EARLIEST = 'earliest'  # Automatically reset the offset to the earliest offset.
LATEST = 'latest'      # Automatically reset the offset to the latest offset.
NONE = 'none'          # Raise an error if no previous offset exists for the consumer group.


def get_consumer(topic, *,
                 auto_commit=False,
                 group_id=None,
                 auto_offset_reset=EARLIEST) -> KafkaConsumer:
    return KafkaConsumer(
        topic,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=auto_commit,
        group_id=group_id,
        value_deserializer=lambda data: json.loads(data.decode('utf-8'))
    )


class MessageProcessorProtocol(Protocol):
    def __call__(self, message: ConsumerRecord, consumer: KafkaConsumer) -> None:
        ...


def consume_topic(topic, callback: MessageProcessorProtocol, stop_event=None):
    logger.info(f"Listening to topic: {topic}")
    consumer = get_consumer(topic, group_id=topic)
    try:
        while stop_event is None or not stop_event.is_set():
            # poll() returns a dict mapping TopicPartition to a list of ConsumerRecord objects.
            messages = consumer.poll(timeout_ms=1000)
            for tp, records in messages.items():
                for record in records:
                    logger.info(f"Received message: {record.value} "
                                f"Partition: {record.partition}, "
                                f"Offset: {record.offset}")
                    callback(record, consumer)
    finally:
        consumer.close()
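The stop_event parameter is not used by the click command shown earlier, but it gives callers a way to stop the polling loop cleanly. A minimal sketch of how it could be wired up with a threading.Event (the handle callback and the topic name are placeholders of mine, not part of the original code):

import threading

from lib.kafka_broker import consume_topic


def handle(message, consumer):
    # Placeholder callback: print the payload and commit the offset.
    print(message.value)
    consumer.commit()


stop_event = threading.Event()
worker = threading.Thread(target=consume_topic, args=('demo', handle, stop_event))
worker.start()

# ... later, e.g. when shutting the application down:
stop_event.set()
worker.join()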
The project relies on Docker Compose to run the required Kafka and Zookeeper containers. This setup allows the application to interact with a local Kafka broker without needing complex installation processes. A simplified excerpt of the docker-compose.yml file is shown below:
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-net

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # A single-broker setup needs the offsets topic replication factor lowered to 1,
      # otherwise consumer groups cannot store their offsets.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
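With the file saved as docker-compose.yml, the whole stack starts with docker compose up -d and stops with docker compose down. Once the broker is up, the two click commands can be exercised end to end, again assuming they are saved as consumer.py and producer.py with a standard entry-point guard:

docker compose up -d
python consumer.py --topic demo
python producer.py --topic demo --message "hello kafka"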
And that’s all. The source code is available in my GitHub account.