Implementing a Kafka Producer and Consumer in Python

Today we’re going to play with Kafka. We’ll implement a simple producer and consumer in Python using the kafka-python library. The project consists of two main components: a producer, which uses a dedicated class to send messages to a Kafka topic, and a consumer, which listens to a Kafka topic, processes the messages it receives, and commits their offsets. Communication with Kafka is handled by a helper module that encapsulates the producer and consumer configurations. The setup uses Docker Compose to manage the Kafka broker and supporting services such as Zookeeper.

Below is a simplified Producer class and corresponding function:

import json
import logging
from typing import Any

from jsonencoder import DefaultEncoder
from kafka import KafkaProducer

from settings import KAFKA_BOOTSTRAP_SERVERS

logger = logging.getLogger(__name__)


def get_producer():
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda data: json.dumps(data, cls=DefaultEncoder).encode('utf-8')
    )


class Producer:
    def __init__(self):
        self.producer = get_producer()

    def send(self, topic: str, message: Any):
        try:
            self.producer.send(topic, value=message)
            self.producer.flush()
            logger.info(f"Message sent to topic: {topic}: {message}")
        except Exception as e:
            logger.error(f"Error sending message to {topic}: {str(e)}")
            raise
        finally:
            if self.producer:
                self.producer.close()


def send_message(topic: str, message: Any):
    producer = Producer()
    producer.send(topic, message)

We’re using click to build the command line interface.

import click
from datetime import datetime
from lib.kafka_broker import send_message


@click.command()
@click.option('--topic', required=True, help='topic')
@click.option('--message', required=True, help='message')
def run(topic, message):
    send_message(topic, dict(
        timestamp=datetime.now().isoformat(),
        body=message
    ))

The consumer reads messages from a Kafka topic. When a message is received, it is logged and the consumer commits its offset, so the message is not re-delivered after a restart. The consumer logic is implemented in a callback that is passed as a parameter to the topic-consumption function.

Below is the consumer’s function definition and command setup:

import logging
import click
from kafka import KafkaConsumer
from kafka.protocol.message import Message
from lib.kafka_broker import consume_topic

logger = logging.getLogger(__name__)

def process_message(message: Message, consumer: KafkaConsumer) -> None:
    logger.info(f"received message: {message.value}")
    consumer.commit()

@click.command()
@click.option('--topic', required=True, help='topic')
def run(topic):
    consume_topic(topic, process_message)

The consume_topic function (from lib/kafka_broker.py) configures the Kafka consumer to listen to a specific topic. On receipt of each message, the process_message callback handles it by logging the information and committing the consumer’s current offset.

import json
import logging
from typing import Protocol

from kafka import KafkaConsumer
from kafka.protocol.message import Message

from settings import KAFKA_BOOTSTRAP_SERVERS

logger = logging.getLogger(__name__)

EARLIEST = 'earliest'  # Automatically reset the offset to the earliest offset.
LATEST = 'latest'  # Automatically reset the offset to the latest offset.
NONE = 'none'  # You must set the partition and offset manually.


def get_consumer(topic, *,
                 auto_commit=False,
                 group_id=None,
                 auto_offset_reset=EARLIEST) -> KafkaConsumer:
    return KafkaConsumer(
        topic,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=auto_commit,
        group_id=group_id,
        value_deserializer=lambda data: json.loads(data.decode('utf-8'))
    )


class MessageProcessorProtocol(Protocol):
    def __call__(self, message: Message, consumer: KafkaConsumer) -> None:
        ...


def consume_topic(topic, callback: MessageProcessorProtocol, stop_event=None):
    logger.info(f"Listening to topic: {topic}")
    consumer = get_consumer(topic, group_id=topic)
    try:
        while stop_event is None or not stop_event.is_set():
            messages = consumer.poll(timeout_ms=1000)
            for tp, msgs in messages.items():
                for message in msgs:
                    logger.info(f"Received message: {message.value} "
                                f"Partition: {message.partition}, "
                                f"Offset: {message.offset}")
                    callback(message, consumer)
    finally:
        consumer.close()

The project relies on Docker Compose to run the required Kafka and Zookeeper containers. This setup allows the application to interact with a local Kafka broker without needing complex installation processes. A simplified excerpt of the docker-compose.yml file is shown below:


version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - kafka-net

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
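
For reference, the settings module imported by the producer and consumer only needs the bootstrap servers. A minimal sketch (the default value matches the PLAINTEXT_HOST listener exposed above) could look like this:

# settings.py - minimal sketch; only KAFKA_BOOTSTRAP_SERVERS is used by the
# snippets above, and the default matches the PLAINTEXT_HOST listener on port 9092.
import os

KAFKA_BOOTSTRAP_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')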

And that’s all. The source code is available in my GitHub account.

Implementing OAuth2 with a Vue Frontend and Python Backend using Nginx as a Reverse Proxy

We’ve seen in other posts that we can use OAuth2-proxy to provide OAuth2 authentication in our application. Today we will protect a Vue application, but instead of using oauth2-proxy, we will implement the functionality it provides directly in Python.

Our Vue application is very simple: it has only one button that shows some information. The entire Vue application, as well as the backend that serves this information (developed with Flask), will be protected and authenticated with OAuth2. For this example, we will use GitHub as the authentication provider.

<script setup lang="ts">
import {ref} from "vue";

defineProps<{
  msg: string
}>()

const data = ref(null);
const showModal = ref(false);

const fetchData = async () => {
  try {
    const response = await fetch("/app/api/userinfo", { redirect: "manual" });

    const logoutStatuses = [401, 403, 302, 303];
    if (response.type === "opaqueredirect" || logoutStatuses.includes(response.status)) {
      window.location.href = "/app/oauth2/logout";
    } else {
      data.value = await response.json();
      showModal.value = true;
    }
  } catch (error) {
    console.error("Error fetching data", error);
  }
};

</script>

<template>
  <div class="greetings">
    <h1 class="green">{{ msg }}</h1>
    <h3>
      You’ve successfully created a project with
      <button @click="fetchData" class="bg-blue-500 text-white p-2 rounded">
        Load data
      </button>
    </h3>
    <div v-if="showModal" class="fixed inset-0 flex items-center justify-center bg-gray-800 bg-opacity-50">
      <div class="bg-white p-6 rounded shadow-lg w-1/3">
        <h2 class="text-xl font-bold mb-4">Backend Data</h2>
        <pre class="bg-gray-100 p-3 rounded text-sm">{{ data }}</pre>
        <button @click="showModal = false" class="mt-4 bg-red-500 text-white p-2 rounded">Close</button>
      </div>
    </div>
  </div>
</template>

<style scoped>
h1 {
  font-weight: 500;
  font-size: 2.6rem;
  position: relative;
  top: -10px;
}

h3 {
  font-size: 1.2rem;
}

.greetings h1,
.greetings h3 {
  text-align: center;
}

@media (min-width: 1024px) {
  .greetings h1,
  .greetings h3 {
    text-align: left;
  }
}
</style>

The Flask backend is shown below; it returns user data on a protected route and also exposes a public route.

import logging
from datetime import datetime

from flask import Flask, session
from flask_compress import Compress

from core.oauth_proxy import setup_oauth
from settings import APP_PATH, SECRET, SESSION, DEBUG, OAUTH

app = Flask(__name__)
app.debug = DEBUG
app.secret_key = SECRET
app.config.update(SESSION)
Compress(app)
for logger_name in ['werkzeug', ]:
    logging.getLogger(logger_name).setLevel(logging.WARNING)

setup_oauth(app, OAUTH, APP_PATH)


@app.get(f"/{APP_PATH}/api/userinfo")
def protected_route():
    now = datetime.now().isoformat()
    return dict(
        session=session['user'],
        now=now
    )


@app.get(f"/{APP_PATH}/api/no_auth")
def public_route():
    return "public route!"

To protect both the frontend and the backend behind the same authentication layer, we use a reverse proxy such as Nginx with the auth_request directive. The Nginx configuration file is shown below.

upstream app {
    server host.docker.internal:5000;
}

upstream front {
    server host.docker.internal:5173;
}


server {
    listen 8000;
    server_tokens off;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    proxy_set_header X-Forwarded-Host $host:$server_port;

    location / {
        auth_request /app/oauth2/auth;
        error_page 401 = @error401;
        auth_request_set $auth_cookie $upstream_http_set_cookie;
        try_files $uri @proxy_to_front;
    }

    location /app/api/ {
        auth_request /app/oauth2/auth;
        error_page 401 = @error401;
        auth_request_set $auth_cookie $upstream_http_set_cookie;
        try_files $uri @proxy_to_app;
    }

    location /app/api/no_auth {
        try_files $uri @proxy_to_app;
    }

    location /app/oauth2/ {
        proxy_set_header X-Real-IP               $remote_addr;
        proxy_set_header X-Auth-Request-Redirect $request_uri;
        proxy_pass http://app;
    }

    location @proxy_to_app {
        proxy_pass http://app;
    }

    location @proxy_to_front {
        proxy_pass http://front;
    }

    location @error401 {
        auth_request_set $auth_cookie $upstream_http_set_cookie;
        add_header Set-Cookie $auth_cookie;
        return 302 /app/oauth2/sign_in;
    }
}

The authentication flow is managed as follows, using a small blueprint that handles OAuth2 redirections, token exchange, and user session storage.

import logging
import secrets
from urllib.parse import urlencode

import requests
from flask import Blueprint, jsonify
from flask import request, session, redirect

logger = logging.getLogger(__name__)


def _clean_session():
    session.pop('user', default=None)
    session.pop('state', default=None)
    session.pop('referer', default=None)


def get_oauth_proxy_blueprint(oauth_conf, app_path, *, sub_path="oauth2", callback_url='/callback',
                              signin_url='/sign_in', auth_url='/auth', logout_url='/logout'):
    blueprint = Blueprint('oauth_proxy', __name__, url_prefix=f'/{app_path}/{sub_path}')

    @blueprint.get(callback_url)
    def callback():
        referer = session.get('referer')
        state = request.args.get('state')
        session_state = session.get('state')

        if 'state' not in session:
            return redirect(f"{referer}")
        if state == session_state:
            authorization_code = request.args.get('code')
            token_data = {
                'grant_type': oauth_conf.get('GRANT_TYPE', 'authorization_code'),
                'code': authorization_code,
                'redirect_uri': oauth_conf['REDIRECT_URL'],
                'client_id': oauth_conf['CLIENT_ID'],
                'client_secret': oauth_conf['CLIENT_SECRET']
            }
            response = requests.post(oauth_conf['TOKEN_URL'],
                                     data=token_data,
                                     headers={'Accept': 'application/json'})
            response_data = response.json()
            headers = {
                "Authorization": f"Bearer {response_data.get('access_token')}",
                'Accept': 'application/json'
            }
            user_response = requests.get(oauth_conf['USER_URL'],
                                         data=token_data,
                                         headers=headers)
            if user_response.ok:
                user_data = user_response.json()
                session['user'] = dict(
                    username=user_data['login'],
                    name=user_data['name'],
                    email=user_data['email']
                )
                session.pop('state', default=None)
                session.pop('referer', default=None)
            else:
                _clean_session()
            return redirect(referer)
        # State mismatch: clear the session and send the user back
        _clean_session()
        return redirect(referer or '/')

    @blueprint.get(signin_url)
    def sign_in():
        state = secrets.token_urlsafe(32)
        session['state'] = state
        authorize = oauth_conf['AUTHORIZE_URL']
        query_string = urlencode({
            'scope': oauth_conf.get('SCOPE', 'read write'),
            'prompt': oauth_conf.get('PROMPT', 'login'),
            'approval_prompt': oauth_conf.get('APPROVAL_PROMPT', 'auto'),
            'state': state,
            'response_type': oauth_conf.get('RESPONSE_TYPE', 'code'),
            'redirect_uri': oauth_conf['REDIRECT_URL'],
            'client_id': oauth_conf['CLIENT_ID']
        })
        return redirect(f"{authorize}?{query_string}")

    @blueprint.get(auth_url)
    def auth():
        if not session.get("user"):
            referer = request.headers.get('X-Auth-Request-Redirect')
            session['referer'] = referer
            return redirect(f"oauth2/sign_in", 401)
        else:
            return jsonify(dict(error='OK')), 200

    @blueprint.get(logout_url)
    def logout():
        _clean_session()
        return redirect(logout_url)

    return blueprint


def setup_oauth(app, oauth_conf, app_path, *, sub_path="oauth2", callback_url='/callback',
                signin_url='/sign_in', auth_url='/auth', logout_url='/logout'):
    app.register_blueprint(get_oauth_proxy_blueprint(oauth_conf, app_path, sub_path=sub_path, callback_url=callback_url,
                                                     signin_url=signin_url, auth_url=auth_url, logout_url=logout_url))
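
The OAUTH dictionary passed to setup_oauth is not shown above. A minimal sketch of the keys the blueprint actually reads (using GitHub endpoints, since that is the provider used in this example; the values are placeholders and would normally come from environment variables) could be:

# settings.py - hedged sketch of the OAuth2 configuration consumed by the blueprint.
APP_PATH = 'app'

OAUTH = dict(
    CLIENT_ID='my_client_id',
    CLIENT_SECRET='my_client_secret',
    AUTHORIZE_URL='https://github.com/login/oauth/authorize',
    TOKEN_URL='https://github.com/login/oauth/access_token',
    USER_URL='https://api.github.com/user',
    REDIRECT_URL='http://localhost:8000/app/oauth2/callback',
    SCOPE='read:user',
)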

You can see the full code of the project in my GitHub account.

OAuth2 Authentication in Streamlit Applications with Nginx and OAuth2-Proxy

Normally, when I want to add authentication to a service, I use OAuth2. There are libraries to integrate this authentication mechanism into a web application, but sometimes we cannot do this easily because the application is a third-party service over which we have no control. The third-party service may support OAuth2 natively, but sometimes it doesn’t, or wiring it up is too complicated. In these cases, a solution is to put a proxy in front of it that handles the authentication and forwards requests to the service. In this example, we will use a Streamlit application as if it were such a third-party application.

import streamlit as st

st.set_page_config(
    page_title="Home",
    page_icon="👋",
)
st.write("# Welcome to Streamlit! 👋")
st.markdown(
    """
    Streamlit is an open-source app framework built specifically for
    Machine Learning and Data Science projects.
    **👈 Select a demo from the sidebar** to see some examples
    of what Streamlit can do!
    ### Want to learn more?
    - Check out [streamlit.io](https://streamlit.io)
    - Jump into our [documentation](https://docs.streamlit.io)
    - Ask a question in our [community
        forums](https://discuss.streamlit.io)
    ### See more complex demos
    - Use a neural net to [analyze the Udacity Self-driving Car Image
        Dataset](https://github.com/streamlit/demo-self-driving)
    - Explore a [New York City rideshare dataset](https://github.com/streamlit/demo-uber-nyc-pickups)
"""
)

st.sidebar.success("Select a demo above.")

Our Streamlit application also has a demo page:

from random import randint

import streamlit as st

st.set_page_config(
    page_title="Hello",
    page_icon="👋",
)

st.markdown("# Plotting Demo")
st.sidebar.header("Plotting Demo")
st.write("This demo illustrates a combination of plotting with Streamlit. Enjoy!")

data = [dict(name=f"name{i}", value=randint(1, 1000)) for i in range(1, 101)]

progress_bar = st.sidebar.progress(0)
status_text = st.sidebar.empty()
chart = st.line_chart([item['value'] for item in data])

progress_bar.empty()

st.button("Re-run")

To use OAuth authentication in the Streamlit application, we are using Nginx as a reverse proxy with the auth_request directive to route authentication checks to an OAuth2-proxy service deployed in our stack. OAuth2-proxy can be configured against any OAuth2 server compatible with OpenID Connect. In my example, I am using GitHub, but you can use Active Directory, Google, Keycloak, or even your own OAuth2 server. This is my Nginx configuration:

upstream app {
    server streamlit:8501;
}

upstream oauth2 {
    server oauth2-proxy:4180;
}

server {
    listen 8000;

    location / {
        auth_request /oauth2/auth;
        error_page 401 = @error401;
        try_files $uri @proxy_to_app;
    }

    location /_stcore/stream {
        auth_request /oauth2/auth;
        error_page 401 = @error401;
        proxy_pass http://app/_stcore/stream;
        proxy_http_version 1.1;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 86400;
    }

    location @error401 {
        return 302 /oauth2/sign_in;
    }

    location /oauth2/ {
        try_files $uri @proxy_to_oauth2;
    }

    location @proxy_to_oauth2 {
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_pass http://oauth2;
    }

    location @proxy_to_app {
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Url-Scheme $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_pass http://app;
    }
}

The complete stack can be seen in the docker-compose.yml:

version: '3.9'

services:
  streamlit:
    build: .
    environment:
      - ENVIRONMENT=docker
    command: ["streamlit", "run", "st.py", "--server.port=8501", "--server.address=0.0.0.0"]

  nginx:
    build: .docker/nginx
    ports:
      - "8000:8000"

  oauth2-proxy:
    image: quay.io/oauth2-proxy/oauth2-proxy:v7.8.1
    env_file:
      - .env
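
The oauth2-proxy container is configured entirely through the .env file referenced above. A minimal sketch for GitHub (the variable names are the standard oauth2-proxy ones; the client credentials and cookie secret are placeholders) could be:

OAUTH2_PROXY_PROVIDER=github
OAUTH2_PROXY_CLIENT_ID=my_client_id
OAUTH2_PROXY_CLIENT_SECRET=my_client_secret
OAUTH2_PROXY_COOKIE_SECRET=generate_a_32_byte_base64_secret
OAUTH2_PROXY_COOKIE_SECURE=false
OAUTH2_PROXY_EMAIL_DOMAINS=*
OAUTH2_PROXY_HTTP_ADDRESS=0.0.0.0:4180
OAUTH2_PROXY_REDIRECT_URL=http://localhost:8000/oauth2/callback
OAUTH2_PROXY_REVERSE_PROXY=true
OAUTH2_PROXY_UPSTREAMS=static://202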

And that’s all. The advantage of using oauth2-proxy is that we don’t need to do anything within the Streamlit application to have OAuth2 authentication. This greatly simplifies the integration process, as all the authentication logic is handled outside the main application. Additionally, oauth2-proxy is compatible with any OAuth2 server that complies with OpenID Connect, giving us the flexibility to use different authentication providers. By using Nginx as a reverse proxy, we can efficiently redirect and manage authentication requests, ensuring that only authenticated users can access our Streamlit application.

Full code available in my GitHub account.

Implementing Industrial OPC UA Communication with Python and Asyncio

Today we’re going to work with an industrial protocol called OPC UA. We’ll be using the opcua-asyncio library to create a simple OPC UA server and client, and the `asyncio` library to handle the asynchronous communication between them. The idea is to build an OPC UA server that exposes a variable and a client that reads that variable.

To simulate a changing variable, I’ve created a simple script that changes one variable every second with the value of the current time and persists it to a Redis database.

import logging
import time

import redis

from settings import REDIS_HOST, REDIS_PORT

logger = logging.getLogger(__name__)


def update_redis_variable_loop():
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    while True:
        timestamp_ms = int(time.time() * 1_000)
        r.set('ts', timestamp_ms)
        logger.info(f"Updated variable: {timestamp_ms}")
        time.sleep(1)

The server will have an authentication mechanism using a username and password, and it will also have a self-signed certificate and a private key to encrypt the communication. To generate the self-signed certificate and private key, you can use the following commands:

openssl genpkey -algorithm RSA -out private_key.pem
openssl req -new -key private_key.pem -out certificate.csr
openssl x509 -req -days 365 -in certificate.csr -signkey private_key.pem -out certificate.pem

This OPC UA server will expose the variable that we’re updating in the Redis database.

import asyncio
import logging

# Imports are not shown in the original snippet; these are assumptions based on
# the calls below (redis asyncio client and the asyncua user classes).
import redis.asyncio as redis
from asyncua import Server, ua
from asyncua.server.users import User, UserRole

from settings import (OPC_CERTIFICATE, OPC_ENDPOINT, OPC_NAMESPACE,
                      OPC_PRIVATE_KEY, OPC_USERS_DB, REDIS_HOST, REDIS_PORT)

logger = logging.getLogger(__name__)


class UserManager:
    def get_user(self, iserver, username=None, password=None, certificate=None):
        if certificate and OPC_USERS_DB.get(username, False) == password:
            logger.info(f"User '{username}' authenticated")
            return User(role=UserRole.User)
        return None


async def main():
    server = Server(user_manager=UserManager())
    await server.init()
    server.set_endpoint(OPC_ENDPOINT)

    await server.load_certificate(OPC_CERTIFICATE)
    await server.load_private_key(OPC_PRIVATE_KEY)
    server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

    namespace_idx = await server.register_namespace(OPC_NAMESPACE)
    obj = await server.nodes.objects.add_object(namespace_idx, "Gonzalo")
    var = await obj.add_variable(namespace_idx, "T", 0, datatype=ua.VariantType.Int32)
    await var.set_writable(False)

    redis_client = await redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    logger.info(f"Starting server on {OPC_ENDPOINT}")

    async with server:
        while True:
            await asyncio.sleep(1)
            value = await redis_client.get('ts')
            if value is not None:
                value = int(value)
                logger.info(f"Set value of {var} to {value}")
                await var.write_value(value)


def server(debug: bool = False):
    asyncio.run(main(), debug=debug)

And now we create an OPC UA client that reads the variable from the server and logs it to the console.

import asyncio
import logging

from asyncua import Client

from settings import OPC_ENDPOINT, OPC_CERTIFICATE, OPC_PRIVATE_KEY, OPC_USERNAME, OPC_PASSWORD

logger = logging.getLogger(__name__)


async def main():
    c = Client(url=OPC_ENDPOINT)
    c.set_user(OPC_USERNAME)
    c.set_password(OPC_PASSWORD)
    await c.set_security_string(f"Basic256Sha256,SignAndEncrypt,{OPC_CERTIFICATE},{OPC_PRIVATE_KEY}")

    async with c:
        node = c.get_node("ns=2;i=2")
        value = await node.read_value()
        logger.info(f"Value: {value}")


def client(debug: bool = False):
    asyncio.run(main(), debug=debug)

In our example, we are using click to create a CLI to run the backend, the server, and the client.
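
The cli.py entry point is not shown in the snippets above. A minimal sketch wiring the functions defined earlier into click commands (the module names backend, opc_server and opc_client are assumptions about the project layout) could be:

import logging

import click

# Assumed module names; the functions themselves were defined in the snippets above.
from backend import update_redis_variable_loop
from opc_server import server as run_server
from opc_client import client as run_client

logging.basicConfig(level=logging.INFO)


@click.group()
def cli():
    pass


@cli.command()
def backend():
    """Update the 'ts' variable in Redis every second."""
    update_redis_variable_loop()


@cli.command()
@click.option('--debug', is_flag=True, default=False)
def server(debug):
    """Start the OPC UA server."""
    run_server(debug=debug)


@cli.command()
@click.option('--debug', is_flag=True, default=False)
def client(debug):
    """Read the variable once with the OPC UA client."""
    run_client(debug=debug)


if __name__ == '__main__':
    cli()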

# Start Redis server
docker-compose up

# Start the process that updates the variable in Redis
python cli.py backend

# Run the server
python cli.py server

# Run the client
python cli.py client

Full code available in my GitHub account.

Creating a Standalone WebSocket Server with FastAPI and JWT Authentication in Python

In this post, I will show you how to create a WebSocket server in Python that uses JWT tokens for authentication. The server is designed to be independent of the main process, making it easy to integrate into existing applications. The client-side JavaScript will handle reconnections incrementally.

The WebSocket server will be created using FastAPI, the web framework built on top of Starlette. This is the entrypoint:

import logging

from fastapi import FastAPI

from asgi_ws import setup_app

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)
SECRET_KEY = "your_secret_key"

app = FastAPI()

app = setup_app(
    app=app,
    base_path='/ws',
    jwt_secret_key=SECRET_KEY,
)

The `setup_app` function is defined in the `lib.websockets` module (imported above from the asgi_ws package). This function sets up the WebSocket server and the necessary routes:

def setup_app(app, jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = get_ws_router(
        jwt_secret_key=jwt_secret_key,
        jwt_algorithm=jwt_algorithm,
        base_path=base_path
    )
    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"

    app.mount("/js", StaticFiles(directory=static_dir), name="js")
    app.include_router(ws_router)

    return app

The `get_ws_router` function is defined in the same module. This function will create the WebSocket router and the necessary routes:

def get_ws_router(jwt_secret_key: str, base_path='ws', jwt_algorithm: str = "HS256"):
    ws_router = APIRouter()

    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"
    ws_router.mount(f"{base_path}/js", StaticFiles(directory=static_dir), name="js")

    manager = ConnectionManager(jwt_secret_key=jwt_secret_key, jwt_algorithm=jwt_algorithm)

    @ws_router.post(f"{base_path}/emmit")
    async def emmit_endpoint(request: Request):
        payload = await request.json()
        await manager.broadcast(payload["channel"], payload["payload"])
        return True

    @ws_router.websocket(f"{base_path}/")
    async def websocket_endpoint(websocket: WebSocket):
        token = websocket.query_params.get("token")
        if not token:
            await websocket.close(code=1008)
            raise HTTPException(status_code=401, detail="Token required")

        await manager.connect(websocket, token)
        try:
            while True:
                message: Message = await websocket.receive()
                if message["type"] == "websocket.disconnect":
                    manager.disconnect(websocket)
                    break
        except WebSocketDisconnect:
            manager.disconnect(websocket)

    return ws_router

WebSockets are bidirectional communication channels that allow real-time data transfer between clients and servers, but here I prefer to avoid sending messages from the client to the server over the socket. When a client wants to send a message, it makes an HTTP POST request to an `/emit` endpoint exposed by the main process, which relays it to the WebSocket server; the server then broadcasts the message to all connected clients, so clients only receive messages over the socket. Because of that, we need a main ASGI process using FastAPI (or another web framework) to handle the HTTP requests.
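
The `/token` and `/emit/<channel>` routes used by the HTML page below are not part of the library and are not shown in the post. A minimal sketch of what they could look like in the main FastAPI process, assuming it simply relays messages to the library's `/ws/emmit` endpoint, is:

# Hedged sketch of the "main" process routes; the token claims, the relay URL and
# the use of PyJWT/requests are assumptions, not part of the original code.
from datetime import datetime, timedelta, timezone

import jwt        # PyJWT
import requests
from fastapi import FastAPI

SECRET_KEY = "your_secret_key"                  # must match the WebSocket server's key
WS_EMIT_URL = "http://localhost:8000/ws/emmit"  # the endpoint exposed by asgi_ws

app = FastAPI()


@app.get("/token")
def token():
    # Issue a short-lived token for the browser; the claims are illustrative.
    payload = {"sub": "anonymous", "exp": datetime.now(timezone.utc) + timedelta(minutes=15)}
    return {"token": jwt.encode(payload, SECRET_KEY, algorithm="HS256")}


@app.post("/emit/{channel}")
def emit(channel: str, body: dict):
    # Relay the message to the WebSocket server, which broadcasts it to all clients.
    requests.post(WS_EMIT_URL, json={"channel": channel, "payload": body.get("payload")})
    return {"status": True}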

This is an example with FastAPI:
<!DOCTYPE html>
<html>
<head>
    <title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>

<input type="text" id="messageText" autocomplete="off"/>
<button onclick="sendMessage()">Send</button>

<ul id='messages'>
</ul>
<script src="//localhost:8000/js/websockets.js"></script>
<script>
    async function sendMessage() {
        const channel = 'chat';
        const url = `/emit/${channel}`;
        const input = document.getElementById("messageText");
        const message = input.value;
        input.value = '';
        const body = JSON.stringify({channel: 'chat1', payload: message});
        const headers = {'Content-Type': 'application/json'};

        try {
            const response = await fetch(url, {method: 'POST', headers: headers, body: body});
        } catch (error) {
            console.error('Error:', error);
        }
    }

    (async function () {
        const getToken = async () => {
            const response = await fetch('/token');
            const {token} = await response.json();
            return token;
        };

        const messageCallback = (event) => {
            const messages = document.getElementById('messages');
            const message = document.createElement('li');
            message.textContent = event.data;
            messages.appendChild(message);
        };

        const wsManager = new WebSocketManager('ws://localhost:8000/ws/', getToken, messageCallback);
        await wsManager.connect();
    })();

</script>
</body>
</html>

The library is available on PyPI:

poetry add asgi_ws
pip install asgi_ws

Full code available in my GitHub account.

Creating a Real-Time Flask Application with Flask-SocketIO and Redis

Today, we’re going to create a simple Flask application with real-time communication using websockets and the SocketIO library. We’ll leverage the Flask-SocketIO extension for integration.

Here’s the plan: while websockets support bidirectional communication, we’ll use them exclusively for server-to-client messages. For client-to-server interactions, we’ll stick with traditional HTTP communication.

Our application will include session-based authentication. To simulate login, we’ve created a route called /login that establishes a session. This session-based authentication will also apply to our websocket connections.

A key objective of this tutorial is to enable sending websocket messages from outside the web application. For instance, you might want to send messages from a cron job or an external service. To achieve this, we’ll use a message queue to facilitate communication between the SocketIO server and the client application. We’ll utilize Redis as our message queue.

That’s the main application

from flask import Flask, render_template, session, request

from lib.ws import register_ws, emit_event, EmitWebsocketRequest
from settings import REDIS_HOST, WS_PATH

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

register_ws(app=app, socketio_path=WS_PATH, redis_host=REDIS_HOST)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/login')
def login():
    session['user'] = 'Gonzalo'
    return dict(name=session['user'])


@app.post('/api/')
def api():
    data = EmitWebsocketRequest(**request.json)
    emit_event(data.channel, data.body)

    return dict(status=True)
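
The snippet above does not show how the application is started. With Flask-SocketIO the app is usually run through the SocketIO instance; since register_ws returns it, a minimal sketch (host and port are just examples) could be:

# Hedged sketch: instead of discarding the return value of register_ws above,
# capture the SocketIO instance and use it to run the app instead of app.run().
socketio = register_ws(app=app, socketio_path=WS_PATH, redis_host=REDIS_HOST)

if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5000)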

That’s the HTML template:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Flask-SocketIO Websocket Example</title>
    <script src="//cdn.socket.io/4.0.0/socket.io.min.js"></script>
</head>
<body>

<h1>Flask-SocketIO Websocket Example</h1>
<label for="message">Message:</label>
<input type="text" id="message" placeholder="type a message...">

<button onclick="sendMessage()">Send</button>
<ul id="messages"></ul>

<script>
    document.addEventListener("DOMContentLoaded", function () {
        let host = location.protocol + '//' + location.hostname + ':' + location.port
        let socket = io.connect(host, {
            path: '/ws/socket.io',
            reconnection: true,
            reconnectionDelayMax: 5000,
            reconnectionDelay: 1000
        });

        socket.on('connect', function () {
            console.log('Connected to ws');
        });

        socket.on('disconnect', function () {
            console.log('Disconnected from ws');
        });

        socket.on('message', function (msg) {
            let messages = document.getElementById('messages');
            let messageItem = document.createElement('li');
            messageItem.textContent = msg;
            messages.appendChild(messageItem);
        });

        window.sendMessage = async function () {
            const url = '/api/';
            const payload = {"channel": "message", "body": this.message.value};

            try {
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify(payload)
                });

                if (!response.ok) {
                    console.error('Error: ' + response.statusText);
                }

                await response.json();
            } catch (error) {
                console.error('Error:', error);
            }
        };
    });
</script>
</body>
</html>

The register_ws function binds SocketIO to our Flask server. To enable sending messages from outside our Flask application, we need to instantiate SocketIO in two different ways. For this purpose, I’ve created a ws.py file. Note: I’m using Pydantic to validate the HTTP requests.

import logging
from typing import Dict, Any, Union

from flask import session
from flask_socketio import SocketIO
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Conf:
    def __init__(self, socketio=None):
        self._socketio = socketio

    @property
    def socketio(self):
        return self._socketio

    @socketio.setter
    def socketio(self, value):
        self._socketio = value


conf = Conf()


def emit_event(channel, body):
    conf.socketio.emit(channel, body)


class EmitWebsocketRequest(BaseModel):
    channel: str
    body: Union[Dict[str, Any], str]


def setup_ws(redis_host, redis_port=6379):
    conf.socketio = SocketIO(message_queue=f'redis://{redis_host}:{redis_port}')


def register_ws(
        app,
        redis_host,
        socketio_path='/ws/socket.io',
        redis_port=6379
):
    redis_url = f'redis://{redis_host}:{redis_port}' if redis_host else None
    conf.socketio = SocketIO(app, path=socketio_path, message_queue=redis_url)

    @conf.socketio.on('connect')
    def handle_connect():
        if not session.get("user"):
            raise ConnectionRefusedError('unauthorized!')
        logger.debug(f'Client connected: {session["user"]}')

    @conf.socketio.on('disconnect')
    def handle_disconnect():
        logger.debug('Client disconnected')

    return conf.socketio

Now, we can emit an event from outside the Flask application.

from lib.ws import emit_event, setup_ws
from settings import REDIS_HOST

setup_ws(redis_host=REDIS_HOST)
emit_event('message', 'Hi')

The application needs a Redis server. I set it up using Docker:

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Source code available in my GitHub account.

Python Flask and OAuth2: Building a Secure Authentication System

I typically use Flask for APIs and Django for web applications that utilize sessions and OAuth authentication. However, do I truly need Django for these functionalities? The answer is no. While Django provides pre-built components, similar capabilities are also accessible in Flask, and implementing them is quite straightforward. Additionally, I am a strong advocate of microframeworks. Today, we’ll demonstrate how to employ OAuth2 authentication using Flask. Let’s begin.

OAuth2 encompasses various flows, but today, we’ll focus on the most common one for web applications. The concept involves checking for a valid session. If one exists, great, but if not, the application will generate a session with a state (a randomly generated string) and then redirect to the OAuth2 server login page. Subsequently, the user will perform the login on the login server. Following that, the OAuth2 server will redirect to a validated callback URL with an authorization code (while also returning the provided state). The callback URL will then verify whether the state provided by the OAuth2 server matches the one in the session. Next, the callback route on your server, utilizing the authorization code, will obtain an access token (via a POST request to the OAuth2 server). With this access token, you can retrieve user information from the OAuth2 server and establish a valid session.

First, we create a Flask application with sessions:

from flask import Flask
from flask_session import Session

from settings import SECRET, SESSION


app = Flask(__name__)
app.secret_key = SECRET
app.config.update(SESSION)
Session(app)

Session configuration:

SESSION = dict(
    SESSION_PERMANENT=False,
    SESSION_TYPE="filesystem",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_DOMAIN=False,
)

I like to use blueprints to organize the Flask application, so let’s register our home blueprint:

from modules.home.app import blueprint as home

...
app.register_blueprint(home, url_prefix=f'/')

I set up the blueprint in an __init__.py file:

from pathlib import Path

from flask import Blueprint

from lib.oauth import check_session

base = Path(__file__).resolve().parent
blueprint = Blueprint(
    'front_home', __name__,
    template_folder=base.joinpath('templates')
)


@blueprint.before_request
def auth():
    return check_session()

You can see that we’re using a before_request middleware to check the session in every route of the blueprint.

import secrets
from urllib.parse import urlencode

from flask import redirect, session

from settings import OAUTH


def check_session():
    if not session.get("user"):
        state = secrets.token_urlsafe(32)
        session['state'] = state
        authorize = OAUTH['AUTHORIZE_URL']
        query_string = urlencode({
            'scope': OAUTH.get('SCOPE', 'read write'),
            'prompt': OAUTH.get('PROMPT', 'login'),
            'approval_prompt': OAUTH.get('APPROVAL_PROMPT', 'auto'),
            'state': state,
            'response_type': OAUTH.get('RESPONSE_TYPE', 'code'),
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID']
        })
        return redirect(f"{authorize}?{query_string}")

And the routes of the blueprint:

from flask import render_template, session

from modules.home import blueprint


@blueprint.get(f"/")
def home():
    username = session['user']['username']
    return render_template('index.html',
                           username=username)

To complete the login we also need to code our callback route. We will add a blueprint for that:

from lib.oauth import blueprint as oauth

...
app.register_blueprint(oauth)

That’s the OAuth2 callback:

import logging

import requests
from flask import Blueprint, abort
from flask import request, session, redirect

from settings import OAUTH

logger = logging.getLogger(__name__)

blueprint = Blueprint('oauth', __name__, url_prefix=f'/oauth')


@blueprint.get('/callback')
def callback():
    # Obtain the state from the request
    state = request.args.get('state')
    if 'state' not in session:
        return redirect(f"/")
    # Check if the provided state matches the one saved in the session
    if state == session['state']:
        # Obtain the authorization code from the request
        authorization_code = request.args.get('code')
        token_data = {
            'grant_type': OAUTH.get('GRANT_TYPE', 'authorization_code'),
            'code': authorization_code,
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID'],
            'client_secret': OAUTH['CLIENT_SECRET']
        }
        # POST to OAuth2 server to obtain the access_token
        response = requests.post(OAUTH['TOKEN_URL'],
                                 data=token_data,
                                 headers={'Accept': 'application/json'})
        response_data = response.json()
        headers = {
            "Authorization": f"Bearer {response_data.get('access_token')}",
            'Accept': 'application/json'
        }
        # With the access_token you can obtain the user information
        user_response = requests.get(OAUTH['USER_URL'],
                                     data=token_data,
                                     headers=headers)
        if user_response.ok:
            # Now you are able to create the session 
            user_data = user_response.json()
            session['user'] = dict(
                username=user_data['login'],
                name=user_data['name'],
                email=user_data['email']
            )
            session.pop('state', default=None)
        else:
            abort(401)
        return redirect(f"/")
    else:
        abort(401)

Mainly that’s all. In this example we’re using GitHub’s OAuth2 server. You can use different providers, or even your own OAuth2 server. Depending on the server, the way to obtain the user data can be different, and you should adapt it to your needs.

In my example I’m saving my OAuth2 credentials in a .env file. With this technique I can use different configurations depending on my environment (production, staging, …)

CLIENT_ID=my_client_id
CLIENT_SECRET=my_client_secret
TOKEN_URL=https://github.com/login/oauth/access_token
AUTHORIZE_URL=https://github.com/login/oauth/authorize
USER_URL=https://api.github.com/user
REDIRECT_URL=http://localhost:5000/oauth/callback

And I load this configuration in my settings.py:

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

OAUTH = dict(
    CLIENT_ID=os.getenv('CLIENT_ID'),
    CLIENT_SECRET=os.getenv('CLIENT_SECRET'),
    TOKEN_URL=os.getenv('TOKEN_URL'),
    AUTHORIZE_URL=os.getenv('AUTHORIZE_URL'),
    USER_URL=os.getenv('USER_URL'),
    REDIRECT_URL=os.getenv('REDIRECT_URL'),
)

And that’s all. Full code in my GitHub account.

Domain Events in Legacy Applications using Python and PostgreSQL

Sometimes we need to generate domain events in our application. It can be simple when you start an application from scratch, but it can be a nightmare when you have a legacy application. Today we're going to explain how to generate domain events from PostgreSQL. One option is to set up triggers on our database tables and use pg_notify inside those triggers to emit events, with a listener consuming them. This approach works, but we need to set up a trigger on every table we want to generate events from. Instead, we're going to use logical replication: with this approach we can generate events from all the tables in our database without setting up triggers on each one.

First of all, we need to create a publication. A publication is a set of tables that we want to replicate. We create it, together with a logical replication slot, with the following SQL commands:

CREATE PUBLICATION pub1 FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('slot1', 'pgoutput');

When we create the replication slot, we can choose between different output plugins. In this case we're going to use pgoutput, a plugin included in PostgreSQL by default that sends the change information in bytea format. After that, we can create a subscription to consume those changes. We're going to create the subscription in Python; a simple one looks like this:

import psycopg2.extras

from settings import DSN


conn = psycopg2.connect(DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()

cur.start_replication(
    slot_name='slot1', 
    decode=False,
    options={'proto_version': '1', 'publication_names': 'pub1'})


def consume(msg):
    payload = msg.payload
    print(payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cur.consume_stream(consume)

With this simple script we're listening to all changes in our database, and we can use them to generate domain events in our application. We need to decode the payload to get at the actual changes. There is a library to decode the payload called pypgoutput; I have had problems with it, so I have used only the part of the library that decodes the payload (decoders.py).

The main script looks like this:

import logging

from lib.consumer import Consumer
from lib.models import Types, Event
from settings import DSN, PUBLICATION_NAME, SLOT_NAME

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(event: Event):
    logger.info(
        f"{event.ts} id:{event.tx_id} [{event.type}] "
        f"{event.schema_name}.{event.table_name} with values {event.values}")


consumer = Consumer(DSN)
consumer.on(Types.UPDATE, 'public.*', callback)

consumer.start(
    slot_name=SLOT_NAME,
    publication_name=PUBLICATION_NAME)

For my example, I am using a database with the following schema:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

It is important to activate logical replication in the database. We can do it with the following commands (these parameters require a PostgreSQL restart to take effect):

ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
ALTER SYSTEM SET max_worker_processes = 10;

I am registering a callback on every UPDATE of the actors table. The callback function is called with an event that contains the type of the event, the schema name, the table name, and the values of the row that has been updated. In the callback function we can do whatever we want with the event. In this case I am just logging it, but you could emit it to a message broker such as Kafka, RabbitMQ, or an MQTT broker.

That is the main notification part of the script. The other part is the conversion of the row values. The values come out of the bytea payload as text, so we need to convert them to Python types. The conversion is done with the following functions:

def convert_value(oid, value):
    if value is None:
        return None
    python_type = OID_MAP.get(oid, str)
    try:
        if python_type == bool:
            return value == 't'
        elif python_type == datetime:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        elif python_type == date:
            return datetime.strptime(value, '%Y-%m-%d').date()
        elif python_type == dict:
            import json
            return json.loads(value)
        elif python_type == uuid.UUID:
            return uuid.UUID(value)
        else:
            return python_type(value)
    except Exception as e:
        logger.error(f"Error converting {value} with OID {oid}: {e}")
        return value


def get_event(message_type, rel, tx, payload) -> Event | None:
    current_type = Types(message_type)
    decoder_map = {
        Types.INSERT: decoders.Insert,
        Types.UPDATE: decoders.Update,
        Types.DELETE: decoders.Delete,
        Types.TRUNCATE: decoders.Truncate
    }
    data = decoder_map.get(current_type, lambda x: None)(payload)

    if data:
        if current_type == Types.TRUNCATE:
            fields = []
        else:
            fields = get_fields(rel, getattr(data, 'old_tuple', None), getattr(data, 'new_tuple', None))

        event = Event(
            type=current_type,
            tx_id=tx.tx_id,
            ts=tx.commit_ts,
            schema_name=rel.namespace,
            table_name=rel.relation_name,
            values=fields
        )
        return event
    return None


def get_fields(rel, old, new):
    fields = [
        Field(
            name=c.name,
            old=convert_value(c.type_id, old.column_data[i].col_data) if old else None,
            new=convert_value(c.type_id, new.column_data[i].col_data) if new else None,
            pkey=c.part_of_pkey == 1
        )
        for i, c in enumerate(rel.columns)
    ]
    return fields
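
The Event, Field and Types models referenced above (from lib.models) are not shown in the post. A minimal sketch consistent with how they are used could be:

# Hedged sketch of lib/models.py, inferred from how Event, Field and Types are
# used above; the actual implementation in the project may differ.
from datetime import datetime
from enum import Enum
from typing import Any, List, Optional

from pydantic import BaseModel


class Types(str, Enum):
    # pgoutput identifies each change with a single-byte message type.
    INSERT = 'I'
    UPDATE = 'U'
    DELETE = 'D'
    TRUNCATE = 'T'


class Field(BaseModel):
    name: str
    old: Optional[Any] = None
    new: Optional[Any] = None
    pkey: bool = False


class Event(BaseModel):
    type: Types
    tx_id: int
    ts: datetime
    schema_name: str
    table_name: str
    values: List[Field]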

When a client is connected, we can see it using a simple query:

SELECT * FROM pg_stat_replication;

We can also see the replication slots with the following query:

SELECT
    pg_current_wal_lsn() AS current_lsn,
    slot_name,
    restart_lsn,
    confirmed_flush_lsn
FROM
    pg_replication_slots
WHERE
    slot_type = 'logical';

The script is just an experiment. Maybe it can be adapted to a real application, but it probably needs more work, especially in the data type conversion.

In conclusion, using logical replication to generate domain events from PostgreSQL offers a powerful and flexible approach, especially for legacy systems or applications where modifying the existing database structure is challenging. This method allows us to capture changes across all tables without the need for individual triggers, potentially simplifying event sourcing and change data capture processes. However, it's important to note that this approach comes with its own set of considerations, such as performance impact on the database, handling of large volumes of events, and ensuring data consistency. As with any experimental technique, thorough testing and careful consideration of your specific use case are crucial before implementing this in a production environment. Despite these challenges, the potential for creating a robust, database-driven event system makes this an exciting area for further exploration and development.

Full source code in my GitHub account.

Transforming Natural Language to SQL Queries with Python and LangChain

LLMs are highly proficient at generating code, including SQL queries from natural language text. Today, we’re going to experiment with this capability to see how effectively we can transform natural language instructions into SQL queries. The idea is to leverage the power of natural language processing to simplify the process of writing complex SQL statements. For this experiment, I’ve downloaded a CSV file containing data from IMDb about people in the film industry: names, birth and death years, primary professions, and the titles they are known for. By using this dataset, we can test the LLM’s ability to generate accurate and efficient SQL queries from different natural language prompts. Here’s an example of what the data looks like:

nconst,primaryname,birthyear,deathyear,primaryprofession,knownfortitles
nm0325022,Käthe Gold,1907,1997,"actress,archive_footage","tt0026069,tt0032498,tt0436641,tt0026066"
nm0325025,Lee Gold,1919,1985,writer,"tt0034433,tt0040392,tt0048226,tt0099219"
nm0325028,Louise Gold,1956,,"actress,miscellaneous,soundtrack","tt0074028,tt0104940,tt0083791,tt2281587"
...

Now, we will create a PostgreSQL database using Docker. Docker allows us to quickly set up and manage containerized applications, making it an ideal tool for this purpose. Below is the Dockerfile we will use to set up our PostgreSQL database:

FROM postgres:16.3-alpine
COPY actors.csv /docker-entrypoint-initdb.d/actors.csv
COPY init.sql /docker-entrypoint-initdb.d/

Next, we set up the database and import the CSV data into an ‘actors’ table using the init.sql script copied into the Docker entrypoint directory:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

COPY actors FROM '/docker-entrypoint-initdb.d/actors.csv' CSV HEADER;

That’s the docker-compose file to set up the PostgreSQL database:

version: '3.6'

services:
  pg:
    build:
      context: .docker/pg
      dockerfile: Dockerfile
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

Now we can start with the Python script. We’re going to use the click library to build the CLI. The Python application interacts with the database to execute SQL queries generated from user input. The process begins by obtaining a MovieChain object through the get_chain function, which takes an llm argument. This MovieChain object is then used to generate an SQL query from the user’s input q through its get_sql method. After that, we just execute the SQL query against PostgreSQL and print the results.

import click
from dbutils import get_conn, Db, get_cursor
from lib.chains.movie import get_chain
from lib.llm.groq import llm
from settings import DSN


@click.command()
@click.option('--q', required=True, help='question to ask')
def run(q):
    chain = get_chain(llm)
    sql = chain.get_sql(q)
    click.echo(f"q: {q}")
    click.echo(sql)
    click.echo('')
    if sql:
        conn = get_conn(DSN, named=True, autocommit=True)
        db = Db(get_cursor(conn=conn))
        data = db.fetch_all(sql)
        for row in data:
            print(row)

The MovieChain class interacts with an LLM (in this example, we’re using Groq).

import logging
from langchain_core.messages import SystemMessage, HumanMessage

from .prompts import PROMPT

logger = logging.getLogger(__name__)


class MovieChain:

    def __init__(self, llm):
        self.llm = llm

        self.prompt = SystemMessage(content=PROMPT)

    def get_sql(self, q: str):
        user_message = HumanMessage(content=q)
        try:
            ai_msg = self.llm.invoke([self.prompt, user_message])
            output_message = ai_msg.content if not isinstance(ai_msg, str) else ai_msg

            return output_message
        except Exception as e:
            logger.error(f"Error during question processing: {e}")
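
The llm object and the get_chain helper imported by the CLI are not shown in the post. A minimal sketch using the langchain-groq integration (the model name is just an example, and a GROQ_API_KEY environment variable is assumed) could be:

# lib/llm/groq.py - hedged sketch of the LLM client used by the chain.
from langchain_groq import ChatGroq

llm = ChatGroq(model="llama-3.1-70b-versatile", temperature=0)


# lib/chains/movie.py - the factory used by the CLI simply wraps MovieChain.
def get_chain(llm):
    return MovieChain(llm)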

The chain sends two messages to the LLM: a system prompt that creates the proper context for generating the SQL query (we provide the CREATE TABLE script), and the user’s question as a human message.

PROMPT = """
You are an expert in generating SQL queries based on user questions.
You have access to a database with the following table schema:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

Please generate an SQL query to answer the following user question.
Ensure the query is valid, secure, and tailored to the provided schema.
Return only the SQL query without additional explanations.
Don't use quotes around the query in any case.
"""

And that’s all. With it we can ask questions about this dataset and the LLM generates the SQL for us.

python cli.py movie --q="List the living actors under 10 years old."

q: List the living actors under 10 years old.
SELECT * FROM actors WHERE deathyear IS NULL AND birthyear > (EXTRACT(YEAR FROM CURRENT_DATE) - 10);
...
python cli.py movie --q="List the living actors who were born in the same year as Mel Gibson."

q: List the living actors who were born in the same year as Mel Gibson
SELECT * FROM actors WHERE birthyear = (SELECT birthyear FROM actors WHERE primaryname = 'Mel Gibson') AND deathyear IS NULL;
...
python cli.py movie --q="List the deceased actors who were born in the same year as Mel Gibson."

q: List the deceased actors who were born in the same year as Mel Gibson.
SELECT * 
FROM actors 
WHERE deathyear IS NOT NULL 
AND birthyear = (SELECT birthyear 
                 FROM actors 
                 WHERE primaryname = 'Mel Gibson');
...
python cli.py movie --q="What is the name, date of birth, and age of the oldest living actor born in the 70s?"

q: What is the name, date of birth, and age of the oldest living actor born in the 70s?
SELECT primaryname, birthyear, (2023 - birthyear) AS age 
FROM actors 
WHERE birthyear >= 1970 AND birthyear < 1980 AND deathyear IS NULL 
ORDER BY birthyear ASC 
LIMIT 1;

{'primaryname': 'Missy Gold', 'birthyear': 1970, 'age': 53}

With projects like these, where we execute “random” SQL generated by an LLM, it’s crucial to manage user access to the database carefully. Restricting access helps mitigate potential SQL injection risks, especially depending on the prompts provided by the user when interacting with the LLM.

Full source code in my GitHub account.

Leveraging AI to perform Code Audits based on Git Commit Diff with Python and LangChain

A few days ago, I came across a project on GitHub by my friend Jordi called Commitia. Commitia is a simple command line tool that helps you write commit messages. It works by passing a git diff to an LLM model, which then suggests a commit message based on the diff. I liked the idea of using LLM models to assist in the development process by interacting with git diffs. So, I decided to create a similar tool using Python and LangChain, just for practice. I’ll use Click to create the command line interface and LangChain to interact with the LLM model. I’ll use Azure LLM, but any LLM model that supports custom functions, like Groq LLM or OpenAI, can be used.

import click

from lib.chains.git import get_chain
from lib.git_utils import get_current_diff
from lib.llm.azure import llm
from settings import BASE_DIR


@click.command()
def run():
    current_diff = get_current_diff(BASE_DIR / '..')
    chain = get_chain(llm)
    ia_response = chain.get_commit_message(current_diff)
    click.echo(ia_response)
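
The llm object imported from lib.llm.azure isn’t shown in the post. A minimal sketch of what it could contain, assuming LangChain’s AzureChatOpenAI with the endpoint and API key taken from the environment (the deployment name and API version below are placeholders):

from langchain_openai import AzureChatOpenAI

# Hypothetical lib/llm/azure.py. Assumes AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY
# are set in the environment; deployment name and API version are placeholders.
llm = AzureChatOpenAI(
    azure_deployment="gpt-4o",
    api_version="2024-02-01",
)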

This is the chain:

import logging

from langchain_core.messages import SystemMessage, HumanMessage

from lib.models import DiffData
from .prompts import PROMPT_COMMIT_MESSAGE

logger = logging.getLogger(__name__)


class GitChain:

    def __init__(self, llm):
        self.llm = llm
        self.prompt_commit_message = SystemMessage(content=PROMPT_COMMIT_MESSAGE)

    @staticmethod
    def _get_diff_content(diff: DiffData):
        return "\n".join((d.diff for d in diff.diffs))

    def get_commit_message(self, diff: DiffData):
        try:
            user_message = HumanMessage(content=self._get_diff_content(diff))
            messages = [self.prompt_commit_message, user_message]
            ai_msg = self.llm.invoke(messages)
            return ai_msg if isinstance(ai_msg, str) else ai_msg.content
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

I’m going to use the same prompt that Jordi uses in Commitia.

PROMPT_COMMIT_MESSAGE = """
You are an assistant to write the commit message.
The user will send you the content of the commit diff, and you will reply with the commit message.
It must be a commit message of one single line. Be concise, just write the message, do not give any explanation.
"""

To obtain the git diff, I’m going to use the GitPython library.

from git.repo import Repo

from .models import Diff, DiffData, Status

def _get_file_mode(diff):
    if diff.new_file:
        return Status.CREATED
    elif diff.deleted_file:
        return Status.DELETED
    elif diff.copied_file:
        return Status.COPIED
    else:
        return Status.MODIFIED


def _build_diff_data(commit, diffs) -> DiffData:
    return DiffData(
        user=str(commit.author),
        date=commit.committed_datetime,
        diffs=[Diff(
            diff=str(diff),
            path=diff.b_path if diff.new_file else diff.a_path,
            status=_get_file_mode(diff)
        ) for diff in diffs]
    )

def get_current_diff(repo_path) -> DiffData:
    repo = Repo(repo_path)
    commit = repo.head.commit
    diffs = commit.diff(None, create_patch=True)

    return _build_diff_data(commit, diffs)
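
As a quick usage sketch (the repository path is a placeholder), calling get_current_diff on a repository with uncommitted changes returns a DiffData object describing the working-tree diff:

repo_diff = get_current_diff('/path/to/repo')  # placeholder path

print(repo_diff.user, repo_diff.date)
for d in repo_diff.diffs:
    print(f"[{d.status.value}] {d.path}")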

I’m using the following models to represent the data.

from datetime import datetime
from enum import Enum
from typing import List

from pydantic import BaseModel


class Status(str, Enum):
    # Note: CREATED uses 'A' (added) so it doesn't share a value with COPIED;
    # duplicate enum values would make COPIED an alias of CREATED.
    CREATED = 'A'
    MODIFIED = 'M'
    DELETED = 'D'
    COPIED = 'C'


class Diff(BaseModel):
    diff: str
    path: str
    status: Status


class DiffData(BaseModel):
    user: str
    date: datetime
    diffs: List[Diff]

After this experiment cloning Commitia, I’m going to do another one. Now the idea is to create a code review based on the git diff. I’m going to use the same structure as in the previous experiment; I only need to change the prompt.

PROMPT_CODE_AUDIT = """
You are an experienced developer and need to perform a code review of a git diff.
You should identify potential errors, provide suggestions for improvement,
and highlight best practices in the provided code.

You should provide a global score of the code quality if you detect any issue based on the following criteria:
- NONE: 0.0
- LOW: Between 0.1 and 3.9
- MEDIUM: Between 4.0 and 6.9
- HIGH: Between 7.0 and 8.9
- CRITICAL: Between 9.0 and 10.0

Your output should use the following template:
### Score
Global score of risks: [NONE, LOW, MEDIUM, HIGH, CRITICAL]

### Diff Explanation
First you must provide a brief explanation of the diff in a single line. Be concise and do not give any explanation.
Then you should provide a detailed explanation of the changes made in the diff.

### Audit summary
Detailed explanation of the identified gaps and their potential impact, if there are any significant findings.
"""

As you can see, I’m using a prompt to analyze the code and provide a score for its quality. I also want to perform actions based on the score, such as taking specific measures if the score is critical. To achieve this, we need to use a custom function. Therefore, we need an LLM model that supports calling custom functions. I added the audit_diff method to the chain to handle this.

import logging
from enum import Enum

from langchain_core.messages import SystemMessage, HumanMessage

from lib.models import DiffData
from .prompts import PROMPT_CODE_AUDIT, PROMPT_COMMIT_MESSAGE

logger = logging.getLogger(__name__)


class Score(int, Enum):
    NONE = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    CRITICAL = 5


class GitChain:

    def __init__(self, llm, tools):
        self.llm = llm
        if hasattr(llm, 'bind_tools'):
            self.llm_with_tools = llm.bind_tools(list(tools.values()))
        else:
            logger.info("LLM does not support bind_tools method")
            self.llm_with_tools = llm
        self.prompt_code_audit = SystemMessage(content=PROMPT_CODE_AUDIT)
        self.prompt_commit_message = SystemMessage(content=PROMPT_COMMIT_MESSAGE)
        self.tools = tools

    @staticmethod
    def _get_diff_content(diff: DiffData):
        return "\n".join((d.diff for d in diff.diffs))

    def _get_status_from_message(self, message) -> Score | None:
        ai_msg = self.llm_with_tools.invoke([HumanMessage(content=message)])
        return self._get_tool_output(ai_msg)

    def get_commit_message(self, diff: DiffData):
        try:
            user_message = HumanMessage(content=self._get_diff_content(diff))
            messages = [self.prompt_commit_message, user_message]
            ai_msg = self.llm.invoke(messages)
            return ai_msg if isinstance(ai_msg, str) else ai_msg.content
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

    def audit_diff(self, diff: DiffData):
        user_message = HumanMessage(content=self._get_diff_content(diff))
        try:
            ai_msg = self.llm.invoke([self.prompt_code_audit, user_message])
            output_message = ai_msg.content if not isinstance(ai_msg, str) else ai_msg

            return self._get_status_from_message(output_message), output_message
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

    def _get_tool_output(self, ai_msg):
        status = None
        for tool_call in ai_msg.tool_calls:
            tool_output = self.tools[tool_call["name"]].invoke(tool_call["args"])
            logger.info(f"Tool: '{tool_call['name']}'")
            status = tool_output
        return status

The audit_diff method takes a DiffData object representing the code differences to be audited. It first builds a HumanMessage from the diff content via _get_diff_content, which joins all the individual diffs into a single string. It then invokes the LLM with the code-audit system prompt and that human message; if the response is a plain string it is used as is, otherwise its content attribute is taken as the output message. Finally, the method passes the output message to _get_status_from_message, which invokes the LLM with the bound tools and returns the tool output. In summary, audit_diff audits the code differences using a large language model and a set of tools, and returns both the audit status and the AI message content.
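
The post doesn’t show how the tools dict passed to GitChain is built. A minimal sketch, assuming LangChain’s @tool decorator and a single hypothetical report_risk_score tool that maps the textual score emitted by the model to the Score enum, could look like this:

from langchain_core.tools import tool

from lib.chains.git.chain import Score


@tool
def report_risk_score(score: str) -> Score:
    """Report the global risk score of the audit (NONE, LOW, MEDIUM, HIGH or CRITICAL)."""
    # Map the textual score emitted by the LLM to the Score enum member.
    return Score[score.upper()]


# Keyed by tool name, as expected by GitChain._get_tool_output.
tools = {report_risk_score.name: report_risk_score}

With something like this bound via bind_tools, the model can emit a tool call whose arguments are routed to report_risk_score, and the returned Score is what audit_diff surfaces as the status.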

Now I can invoke the chain to audit the code and print the results. Also, I can use the score to perform an action.

import click
from rich.console import Console
from rich.markdown import Markdown

from lib.chains.git import get_chain
from lib.chains.git.chain import Score
from lib.git_utils import get_current_diff
from lib.llm.azure import llm
from settings import BASE_DIR


@click.command()
def run():
    current_diff = get_current_diff(BASE_DIR / '..')
    chain = get_chain(llm)
    status, ia_response = chain.audit_diff(current_diff)

    click.echo('Audit results:')
    click.echo('--------------')
    click.echo(f"{current_diff.user} at {current_diff.date} made the following changes:")
    click.echo('')
    click.echo('Affected files:')
    for diff in current_diff.diffs:
        click.echo(f"[{diff.status.value}] {diff.path}")
    click.echo('')

    console = Console()
    console.print(Markdown(ia_response))

    click.echo('')
    if status == Score.CRITICAL:
        click.echo('[WARNING] Critical issues found.')
    else:
        click.echo('No critical issues found.')

In conclusion, integrating AI into the development workflow can significantly enhance productivity and code quality. By using tools like LangChain and LLM models, we can automate the generation of commit messages and perform detailed code audits based on git diffs. This not only saves time but also ensures consistency and accuracy in commit messages and code reviews. As we continue to explore and implement these AI-driven solutions, we open up new possibilities for more efficient and effective software development practices. It is not magic, it is just code.

Full source code in my GitHub account.