Implementing Industrial OPC UA Communication with Python and Asyncio

Today we’re going to work with an industrial protocol called OPC UA. We’ll use the opcua-asyncio library (imported as `asyncua`) to create a simple OPC UA server and client, and the `asyncio` library to handle the asynchronous communication between them. The idea is to build an OPC UA server that exposes a variable and a client that reads that variable.

To simulate a changing variable, I’ve created a simple script that changes one variable every second with the value of the current time and persists it to a Redis database.

import logging
import time

import redis

from settings import REDIS_HOST, REDIS_PORT

logger = logging.getLogger(__name__)


def update_redis_variable_loop():
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    while True:
        timestamp_ms = int(time.time() * 1_000)
        r.set('ts', timestamp_ms)
        logger.info(f"Updated variable: {timestamp_ms}")
        time.sleep(1)

The server will have an authentication mechanism using a username and password, and it will also have a self-signed certificate and a private key to encrypt the communication. To generate the self-signed certificate and private key, you can use the following commands:

openssl genpkey -algorithm RSA -out private_key.pem
openssl req -new -key private_key.pem -out certificate.csr
openssl x509 -req -days 365 -in certificate.csr -signkey private_key.pem -out certificate.pem

This OPC UA server will expose the variable that we’re updating in the Redis database.

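import asyncio
import logging

import redis.asyncio as redis
from asyncua import Server, ua
from asyncua.server.users import User, UserRole

# Assumption: every constant used below, including OPC_USERS_DB, lives in settings.py
from settings import (
    OPC_CERTIFICATE, OPC_ENDPOINT, OPC_NAMESPACE, OPC_PRIVATE_KEY,
    OPC_USERS_DB, REDIS_HOST, REDIS_PORT,
)

logger = logging.getLogger(__name__)


class UserManager: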
    def get_user(self, iserver, username=None, password=None, certificate=None):
        if certificate and OPC_USERS_DB.get(username, False) == password:
            logger.info(f"User '{username}' authenticated")
            return User(role=UserRole.User)
        return None


async def main():
    server = Server(user_manager=UserManager())
    await server.init()
    server.set_endpoint(OPC_ENDPOINT)

    await server.load_certificate(OPC_CERTIFICATE)
    await server.load_private_key(OPC_PRIVATE_KEY)
    server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

    namespace_idx = await server.register_namespace(OPC_NAMESPACE)
    obj = await server.nodes.objects.add_object(namespace_idx, "Gonzalo")
    # Int64 rather than Int32: a millisecond timestamp does not fit in 32 bits
    var = await obj.add_variable(namespace_idx, "T", 0, datatype=ua.VariantType.Int64)
    await var.set_writable(False)

    redis_client = await redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    logger.info(f"Starting server on {OPC_ENDPOINT}")

    async with server:
        while True:
            await asyncio.sleep(1)
            value = await redis_client.get('ts')
            if value is not None:
                value = int(value)
                logger.info(f"Set value of {var} to {value}")
                await var.write_value(value)


def server(debug: bool = False):
    asyncio.run(main(), debug=debug)
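
A note on the data type: the value stored in Redis is a Unix timestamp in milliseconds, which is far larger than a signed 32-bit integer can hold. The quick check below is plain Python with no OPC UA involved (the helper name is mine), and it suggests that a 64-bit type such as `ua.VariantType.Int64` is the safer choice for this variable.

```python
import time

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def fits_int32(value: int) -> bool:
    """Return True when value can be stored in a signed 32-bit integer."""
    return INT32_MIN <= value <= INT32_MAX


timestamp_ms = int(time.time() * 1_000)
print(fits_int32(timestamp_ms))  # False: milliseconds since 1970 need more than 32 bits
```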

And now we create an OPC UA client that reads the variable from the server and logs it.

import asyncio
import logging

from asyncua import Client

from settings import OPC_ENDPOINT, OPC_CERTIFICATE, OPC_PRIVATE_KEY, OPC_USERNAME, OPC_PASSWORD

logger = logging.getLogger(__name__)


async def main():
    c = Client(url=OPC_ENDPOINT)
    c.set_user(OPC_USERNAME)
    c.set_password(OPC_PASSWORD)
    await c.set_security_string(f"Basic256Sha256,SignAndEncrypt,{OPC_CERTIFICATE},{OPC_PRIVATE_KEY}")

    async with c:
        node = c.get_node("ns=2;i=2")
        value = await node.read_value()
        logger.info(f"Value: {value}")


def client(debug: bool = False):
    asyncio.run(main(), debug=debug)
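
The string passed to `get_node` uses OPC UA's textual NodeId notation: `ns` is the namespace index and `i` marks a numeric identifier (`s` would mark a string identifier). The hard-coded `ns=2;i=2` depends on the order in which the server created its nodes, so it is worth knowing how to read it. The helper below is only an illustration of the notation. It is hypothetical, not part of asyncua, and it handles only the common `ns=...;x=...` form.

```python
from typing import NamedTuple


class ParsedNodeId(NamedTuple):
    namespace: int
    kind: str        # "i" numeric, "s" string, "g" GUID, "b" opaque byte string
    identifier: str


def parse_node_id(text: str) -> ParsedNodeId:
    """Split a NodeId string such as 'ns=2;i=2' into its parts."""
    namespace = 0  # the 'ns=' prefix is optional and defaults to namespace 0
    rest = text
    if rest.startswith("ns="):
        ns_part, _, rest = rest.partition(";")
        namespace = int(ns_part[3:])
    kind, separator, identifier = rest.partition("=")
    if not separator or kind not in {"i", "s", "g", "b"}:
        raise ValueError(f"no identifier found in {text!r}")
    return ParsedNodeId(namespace, kind, identifier)


print(parse_node_id("ns=2;i=2"))
```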

In our example we are using click to create a command-line interface to run the backend updater, the server, and the client.

# Start Redis server
docker-compose up

# Start the process that updates the variable in Redis
python cli.py backend

# Run the server
python cli.py server

# Run the client
python cli.py client

Full code available in my GitHub account.

Creating a standalone WebSocket Server with FastAPI and JWT Authentication in Python

In this post, I will show you how to create a WebSocket server in Python that uses JWT tokens for authentication. The server is designed to be independent of the main process, making it easy to integrate into existing applications. The client-side JavaScript will handle reconnections incrementally.

The WebSocket server will be created using FastAPI, the web framework built on top of Starlette. This is the entrypoint:

import logging

from fastapi import FastAPI

from asgi_ws import setup_app

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)
SECRET_KEY = "your_secret_key"

app = FastAPI()

app = setup_app(
    app=app,
    base_path='/ws',
    jwt_secret_key=SECRET_KEY,
)
The `setup_app` function is defined in the `lib.websockets` module. This function will set up the WebSocket server and the necessary routes.
def setup_app(app, jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = get_ws_router(
        jwt_secret_key=jwt_secret_key,
        jwt_algorithm=jwt_algorithm,
        base_path=base_path
    )
    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"

    app.mount("/js", StaticFiles(directory=static_dir), name="js")
    app.include_router(ws_router)

    return app
The `get_ws_router` function is defined in the same module. This function will create the WebSocket router and the necessary routes.
def get_ws_router(jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = APIRouter()

    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"
    ws_router.mount(f"{base_path}/js", StaticFiles(directory=static_dir), name="js")

    manager = ConnectionManager(jwt_secret_key=jwt_secret_key, jwt_algorithm=jwt_algorithm)

    @ws_router.post(f"{base_path}/emmit")
    async def emmit_endpoint(request: Request):
        payload = await request.json()
        await manager.broadcast(payload["channel"], payload["payload"])
        return True

    @ws_router.websocket(f"{base_path}/")
    async def websocket_endpoint(websocket: WebSocket):
        token = websocket.query_params.get("token")
        if not token:
            # 1008 = policy violation: reject the connection when no token is supplied
            await websocket.close(code=1008)
            return

        await manager.connect(websocket, token)
        try:
            while True:
                message: Message = await websocket.receive()
                if message["type"] == "websocket.disconnect":
                    manager.disconnect(websocket)
                    break
        except WebSocketDisconnect:
            manager.disconnect(websocket)

    return ws_router
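
`ConnectionManager` is not shown in this post. To make the token check less of a black box, here is a minimal sketch of what validating an HS256 JWT involves, using only the standard library. The function names are mine, and a real application would normally rely on a maintained library such as PyJWT rather than hand-rolled code.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, as JWT requires."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the padding that JWT strips."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload' with the shared secret."""
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()


def encode_hs256(payload: dict, secret: str) -> str:
    """Build a signed token; only used here to produce test input."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url_encode(_sign(signing_input, secret))}"


def decode_hs256(token: str, secret: str) -> dict:
    """Return the payload if signature and expiry check out, else raise ValueError."""
    header_b64, payload_b64, signature_b64 = token.split(".")  # ValueError unless 3 parts
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if "exp" in payload and payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


token = encode_hs256({"sub": "gonzalo", "exp": int(time.time()) + 60}, "your_secret_key")
print(decode_hs256(token, "your_secret_key")["sub"])  # gonzalo
```

Checking the `alg` header before verifying matters: accepting whatever algorithm the token declares is a classic JWT pitfall.
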
WebSockets are bidirectional channels that allow real-time data transfer between clients and servers, but I prefer to avoid client-to-server communication over them. When a client wants to send a message, it sends an HTTP POST request to the `/emit` endpoint of the main process, which passes it on to the WebSocket server. The WebSocket server then broadcasts the message to all connected clients, so clients only ever receive messages over the socket. Because of that, we need a main web process, using FastAPI or another web framework, to handle the HTTP requests.

This is an example of the client page, served from a FastAPI application:
<!DOCTYPE html>
<html>
<head>
    <title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>

<input type="text" id="messageText" autocomplete="off"/>
<button onclick="sendMessage()">Send</button>

<ul id='messages'>
</ul>
<script src="//localhost:8000/js/websockets.js"></script>
<script>
    async function sendMessage() {
        const channel = 'chat';
        const url = `/emit/${channel}`;
        const input = document.getElementById("messageText");
        const message = input.value;
        input.value = '';
        const body = JSON.stringify({channel: 'chat1', payload: message});
        const headers = {'Content-Type': 'application/json'};

        try {
            const response = await fetch(url, {method: 'POST', headers: headers, body: body});
        } catch (error) {
            console.error('Error:', error);
        }
    }

    (async function () {
        const getToken = async () => {
            const response = await fetch('/token');
            const {token} = await response.json();
            return token;
        };

        const messageCallback = (event) => {
            const messages = document.getElementById('messages');
            const message = document.createElement('li');
            message.textContent = event.data;
            messages.appendChild(message);
        };

        const wsManager = new WebSocketManager('ws://localhost:8000/ws/', getToken, messageCallback);
        await wsManager.connect();
    })();

</script>
</body>
</html>
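
The introduction mentions that the JavaScript client handles reconnections incrementally, but the source of `WebSocketManager` is not shown here. As a sketch of the idea, written in Python for consistency with the rest of the post, the generator below yields a growing delay between attempts with an upper bound. Whether `websockets.js` grows the delay linearly or exponentially is not stated; this assumes exponential growth, and the parameter names are mine.

```python
import itertools
from typing import Iterator


def reconnect_delays(base: float = 1.0, factor: float = 2.0, max_delay: float = 30.0) -> Iterator[float]:
    """Yield the wait (in seconds) before each reconnection attempt, capped at max_delay."""
    delay = base
    while True:
        yield min(delay, max_delay)
        delay = min(delay * factor, max_delay)


print(list(itertools.islice(reconnect_delays(), 7)))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

After a successful connection the client would start again from a fresh generator, so a brief outage does not leave it stuck on the longest delay.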

The library is available on PyPI:

poetry add asgi_ws
pip install asgi_ws

Full code available in my github account.

Creating a Real-Time Flask Application with Flask-SocketIO and Redis

Today, we’re going to create a simple Flask application with real-time communication using websockets and the SocketIO library. We’ll leverage the Flask-SocketIO extension for integration.

Here’s the plan: while websockets support bidirectional communication, we’ll use them exclusively for server-to-client messages. For client-to-server interactions, we’ll stick with traditional HTTP communication.

Our application will include session-based authentication. To simulate login, we’ve created a route called /login that establishes a session. This session-based authentication will also apply to our websocket connections.

A key objective of this tutorial is to enable sending websocket messages from outside the web application. For instance, you might want to send messages from a cron job or an external service. To achieve this, we’ll use a message queue to facilitate communication between the SocketIO server and the client application. We’ll utilize Redis as our message queue.

This is the main application:

from flask import Flask, render_template, session, request

from lib.ws import register_ws, emit_event, EmitWebsocketRequest
from settings import REDIS_HOST, WS_PATH

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

register_ws(app=app, socketio_path=WS_PATH, redis_host=REDIS_HOST)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/login')
def login():
    session['user'] = 'Gonzalo'
    return dict(name=session['user'])


@app.post('/api/')
def api():
    data = EmitWebsocketRequest(**request.json)
    emit_event(data.channel, data.body)

    return dict(status=True)

This is the HTML template:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Flask-SocketIO Websocket Example</title>
    <script src="//cdn.socket.io/4.0.0/socket.io.min.js"></script>
</head>
<body>

<h1>Flask-SocketIO Websocket Example</h1>
<label for="message">Message:</label>
<input type="text" id="message" placeholder="type a message...">

<button onclick="sendMessage()">Send</button>
<ul id="messages"></ul>

<script>
    document.addEventListener("DOMContentLoaded", function () {
        let host = location.protocol + '//' + location.hostname + ':' + location.port
        let socket = io.connect(host, {
            path: '/ws/socket.io',
            reconnection: true,
            reconnectionDelayMax: 5000,
            reconnectionDelay: 1000
        });

        socket.on('connect', function () {
            console.log('Connected to ws');
        });

        socket.on('disconnect', function () {
            console.log('Disconnected from ws');
        });

        socket.on('message', function (msg) {
            let messages = document.getElementById('messages');
            let messageItem = document.createElement('li');
            messageItem.textContent = msg;
            messages.appendChild(messageItem);
        });

        window.sendMessage = async function () {
            const url = '/api/';
            const payload = {"channel": "message", "body": this.message.value};

            try {
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify(payload)
                });

                if (!response.ok) {
                    console.error('Error: ' + response.statusText);
                }

                await response.json();
            } catch (error) {
                console.error('Error:', error);
            }
        };
    });
</script>
</body>
</html>

The register_ws function binds SocketIO to our Flask server. To enable sending messages from outside our Flask application, we need to instantiate SocketIO in two different ways. For this purpose, I’ve created a ws.py file. Note: I’m using Pydantic to validate the HTTP requests.

import logging
from typing import Dict, Any, Union

from flask import session
from flask_socketio import SocketIO
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Conf:
    def __init__(self, socketio=None):
        self._socketio = socketio

    @property
    def socketio(self):
        return self._socketio

    @socketio.setter
    def socketio(self, value):
        self._socketio = value


conf = Conf()


def emit_event(channel, body):
    conf.socketio.emit(channel, body)


class EmitWebsocketRequest(BaseModel):
    channel: str
    body: Union[Dict[str, Any], str]


def setup_ws(redis_host, redis_port=6379):
    conf.socketio = SocketIO(message_queue=f'redis://{redis_host}:{redis_port}')


def register_ws(
        app,
        redis_host,
        socketio_path='/ws/socket.io',
        redis_port=6379
):
    redis_url = f'redis://{redis_host}:{redis_port}' if redis_host else None
    conf.socketio = SocketIO(app, path=socketio_path, message_queue=redis_url)

    @conf.socketio.on('connect')
    def handle_connect():
        if not session.get("user"):
            raise ConnectionRefusedError('unauthorized!')
        logger.debug(f'Client connected: {session["user"]}')

    @conf.socketio.on('disconnect')
    def handle_disconnect():
        logger.debug('Client disconnected')

    return conf.socketio

Now, we can emit an event from outside the Flask application.

from lib.ws import emit_event, setup_ws
from settings import REDIS_HOST

setup_ws(redis_host=REDIS_HOST)
emit_event('message', 'Hi')

The application needs a Redis server. I set it up using Docker Compose:

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Source code available in my github.

Python Flask and OAuth2: Building a Secure Authentication System

I typically use Flask for APIs and Django for web applications that utilize sessions and OAuth authentication. However, do I truly need Django for these functionalities? The answer is no. While Django provides pre-built components, similar capabilities are also accessible in Flask, and implementing them is quite straightforward. Additionally, I am a strong advocate of microframeworks. Today, we’ll demonstrate how to employ OAuth2 authentication using Flask. Let’s begin.

OAuth2 encompasses various flows, but today, we’ll focus on the most common one for web applications. The concept involves checking for a valid session. If one exists, great, but if not, the application will generate a session with a state (a randomly generated string) and then redirect to the OAuth2 server login page. Subsequently, the user will perform the login on the login server. Following that, the OAuth2 server will redirect to a validated callback URL with an authorization code (while also returning the provided state). The callback URL will then verify whether the state provided by the OAuth2 server matches the one in the session. Next, the callback route on your server, utilizing the authorization code, will obtain an access token (via a POST request to the OAuth2 server). With this access token, you can retrieve user information from the OAuth2 server and establish a valid session.

First we create a Flask application with sessions

from flask import Flask
from flask_session import Session

from settings import SECRET, SESSION


app = Flask(__name__)
app.secret_key = SECRET
app.config.update(SESSION)
Session(app)

Session configuration:

SESSION = dict(
    SESSION_PERMANENT=False,
    SESSION_TYPE="filesystem",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_DOMAIN=False,
)

I like to use blueprints to organize a Flask application, so let’s register ours:

from modules.home.app import blueprint as home

...
app.register_blueprint(home, url_prefix=f'/')

I set up the blueprint in an __init__.py file:

from pathlib import Path

from flask import Blueprint

from lib.oauth import check_session

base = Path(__file__).resolve().parent
blueprint = Blueprint(
    'front_home', __name__,
    template_folder=base.joinpath('templates')
)


@blueprint.before_request
def auth():
    return check_session()

You can see that we’re using a before_request middleware to check the session in every route of the blueprint.

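# Imports needed by check_session, which lives in lib/oauth.py
import secrets
from urllib.parse import urlencode

from flask import redirect, session

from settings import OAUTH


def check_session():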
    if not session.get("user"):
        state = secrets.token_urlsafe(32)
        session['state'] = state
        authorize = OAUTH['AUTHORIZE_URL']
        query_string = urlencode({
            'scope': OAUTH.get('SCOPE', 'read write'),
            'prompt': OAUTH.get('PROMPT', 'login'),
            'approval_prompt': OAUTH.get('APPROVAL_PROMPT', 'auto'),
            'state': state,
            'response_type': OAUTH.get('RESPONSE_TYPE', 'code'),
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID']
        })
        return redirect(f"{authorize}?{query_string}")

And the routes of the blueprint:

from flask import render_template, session

from modules.home import blueprint


@blueprint.get(f"/")
def home():
    username = session['user']['username']
    return render_template('index.html',
                           username=username)

To complete the login we also need to code our callback route. We will add a blueprint for that.

from lib.oauth import blueprint as oauth

...
app.register_blueprint(oauth)

That’s the OAuth2 callback:

import logging

import requests
from flask import Blueprint, abort
from flask import request, session, redirect

from settings import OAUTH

logger = logging.getLogger(__name__)

blueprint = Blueprint('oauth', __name__, url_prefix=f'/oauth')


@blueprint.get('/callback')
def callback():
    # Obtain the state from the request
    state = request.args.get('state')
    if 'state' not in session:
        return redirect(f"/")
    # Check that the provided state matches the one saved in the session
    if state == session['state']:
        # Obtain the authorization code from the request
        authorization_code = request.args.get('code')
        token_data = {
            'grant_type': OAUTH.get('GRANT_TYPE', 'authorization_code'),
            'code': authorization_code,
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID'],
            'client_secret': OAUTH['CLIENT_SECRET']
        }
        # POST to OAuth2 server to obtain the access_token
        response = requests.post(OAUTH['TOKEN_URL'],
                                 data=token_data,
                                 headers={'Accept': 'application/json'})
        response_data = response.json()
        headers = {
            "Authorization": f"Bearer {response_data.get('access_token')}",
            'Accept': 'application/json'
        }
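        # With the access_token you can obtain the user information.
        # Only the Authorization header is needed here; the token request
        # body (which contains the client secret) should not be re-sent.
        user_response = requests.get(OAUTH['USER_URL'],
                                     headers=headers)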
        if user_response.ok:
            # Now you are able to create the session 
            user_data = user_response.json()
            session['user'] = dict(
                username=user_data['login'],
                name=user_data['name'],
                email=user_data['email']
            )
            session.pop('state', default=None)
        else:
            abort(401)
        return redirect(f"/")
    else:
        abort(401)
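
The `state` parameter is what protects this flow against cross-site request forgery, so it is worth isolating. Below is a minimal sketch of the two operations involved, using only the standard library. The helper names are hypothetical, and the constant-time comparison is my suggestion rather than what the callback above does (it uses a plain `==`).

```python
import secrets
from typing import Optional


def new_state() -> str:
    """Generate an unguessable value to store in the session before redirecting."""
    return secrets.token_urlsafe(32)


def state_matches(expected: Optional[str], received: Optional[str]) -> bool:
    """Compare the session state with the one echoed back by the OAuth2 server."""
    if not expected or not received:
        return False
    # compare_digest runs in constant time; encode so non-ASCII input cannot raise
    return secrets.compare_digest(expected.encode("utf-8"), received.encode("utf-8"))


state = new_state()
print(state_matches(state, state))       # True
print(state_matches(state, "tampered"))  # False
```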

That’s mainly all. In this example we’re using GitHub’s OAuth2 server. You can use a different one, including your own OAuth2 server. Depending on the server, the way to obtain the user_data may differ, so you should adapt it to your needs.

In my example I’m saving my OAuth2 credentials in a .env file. With this technique I can use different configurations depending on my environment (production, staging, …)

CLIENT_ID=my_client_id
CLIENT_SECRET=my_client_secret
TOKEN_URL=https://github.com/login/oauth/access_token
AUTHORIZE_URL=https://github.com/login/oauth/authorize
USER_URL=https://api.github.com/user
REDIRECT_URL=http://localhost:5000/oauth/callback

And I load this conf in my settings.py

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

OAUTH = dict(
    CLIENT_ID=os.getenv('CLIENT_ID'),
    CLIENT_SECRET=os.getenv('CLIENT_SECRET'),
    TOKEN_URL=os.getenv('TOKEN_URL'),
    AUTHORIZE_URL=os.getenv('AUTHORIZE_URL'),
    USER_URL=os.getenv('USER_URL'),
    REDIRECT_URL=os.getenv('REDIRECT_URL'),
)
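
One caveat with `os.getenv` is that a missing variable silently becomes `None`, and the error only surfaces later, in the middle of a login. A small check at startup makes that failure explicit. This is a sketch with a hypothetical helper name; the example dictionary stands in for the `OAUTH` settings.

```python
def missing_settings(config: dict) -> list:
    """Return the names of settings that are unset or empty."""
    return sorted(key for key, value in config.items() if not value)


# Stand-in for the OAUTH dict built from the environment
example = dict(CLIENT_ID="my_client_id", CLIENT_SECRET=None, TOKEN_URL="")
print(missing_settings(example))  # ['CLIENT_SECRET', 'TOKEN_URL']
```

In settings.py this could be followed by raising an error when the returned list is not empty.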

And that’s all. Full code in my github account.

Domain Events in Legacy Applications using Python and PostgreSQL

Sometimes we need to generate domain events in our application. That can be simple when you start an application from scratch, but it can be a nightmare in a legacy application. One option is to set up triggers on our database tables: in those triggers we can use pg_notify to emit events, and then use a listener to consume them. This approach works, but we need to set up a trigger on every table that we want to generate events from. Today we're going to use PostgreSQL's logical replication instead. With this approach, we can generate events from all the tables in our database without setting up triggers on each one.

First of all, we need to create a publication and a replication slot. A publication is the set of tables whose changes we want to replicate. We can create both with the following SQL commands:
CREATE PUBLICATION pub1 FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('slot1', 'pgoutput');
When we create our replication slot, we can choose between different output plugins. In this case, we're going to use pgoutput, a plugin that is included in PostgreSQL by default and sends the changes in a binary format. After that, we can create a subscription to consume those changes. We're going to create the subscription in Python. We can do a simple subscription with the following code:
import psycopg2.extras

from settings import DSN


conn = psycopg2.connect(DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()

cur.start_replication(
    slot_name='slot1', 
    decode=False,
    options={'proto_version': '1', 'publication_names': 'pub1'})


def consume(msg):
    payload = msg.payload
    print(payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cur.consume_stream(consume)
With this simple script, we're listening to all changes in our database. We can use this script to generate domain events in our application. We need to decode the payload to get the changes in our tables. There is a library to decode the payload called pypgoutput. I have had problems with this library, so I have used only one part of the library to decode the payload (decoders.py).
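
To give an idea of what decoding involves: in version 1 of the pgoutput protocol, the first byte of each payload identifies the message, and the rest is a fixed binary layout. The sketch below classifies a payload and decodes a Begin message (final LSN, commit timestamp in microseconds since 2000-01-01, transaction id). The names are mine and this is not the code from pypgoutput.

```python
import struct
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional

PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)  # PostgreSQL timestamps count from here


class MessageType(Enum):
    BEGIN = "B"
    COMMIT = "C"
    RELATION = "R"
    INSERT = "I"
    UPDATE = "U"
    DELETE = "D"
    TRUNCATE = "T"


def message_type(payload: bytes) -> Optional[MessageType]:
    """Classify a payload by its first byte; None for kinds not listed above."""
    try:
        return MessageType(payload[:1].decode("ascii"))
    except ValueError:
        return None


def decode_begin(payload: bytes) -> dict:
    """Decode a Begin message: Byte1('B'), Int64 LSN, Int64 timestamp, Int32 xid."""
    _, final_lsn, commit_us, tx_id = struct.unpack("!cQqI", payload[:21])
    return {
        "final_lsn": final_lsn,
        "commit_ts": PG_EPOCH + timedelta(microseconds=commit_us),
        "tx_id": tx_id,
    }


# A hand-built Begin message, only to exercise the decoder
sample = struct.pack("!cQqI", b"B", 100, 1_000_000, 7)
print(message_type(sample), decode_begin(sample))
```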

The main script is like this:
import logging

from lib.consumer import Consumer
from lib.models import Types, Event
from settings import DSN, PUBLICATION_NAME, SLOT_NAME

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(event: Event):
    logger.info(
        f"{event.ts} id:{event.tx_id} [{event.type}] "
        f"{event.schema_name}.{event.table_name} with values {event.values}")


consumer = Consumer(DSN)
consumer.on(Types.UPDATE, 'public.*', callback)

consumer.start(
    slot_name=SLOT_NAME,
    publication_name=PUBLICATION_NAME)
For my example, I am using a database with the following schema:
CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);
It is important to enable logical replication in the database. We can do it with the following commands (changing wal_level requires a server restart to take effect):
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
ALTER SYSTEM SET max_worker_processes = 10;
And I am registering an event on every update of the actors table with the callback function. The callback function is called with the event, which contains the type of the event, the schema name, the table name, and the values of the row that has been updated. In the callback function we can do whatever we want with the event. In this case, I am just logging the event, but you could emit it to a message broker such as Kafka, RabbitMQ or an MQTT broker.
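
The `Consumer` class is not shown, so how it matches a pattern like `'public.*'` is an assumption on my part. If it follows shell-style globbing on `schema.table`, the routing could be sketched like this (the `Router` class is hypothetical and exists only for illustration):

```python
from collections import defaultdict
from fnmatch import fnmatchcase


class Router:
    """Keep (pattern, callback) pairs per event type and dispatch by schema.table."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_type, pattern, callback):
        self._handlers[event_type].append((pattern, callback))

    def dispatch(self, event_type, schema, table, event) -> int:
        """Call every matching callback and return how many were called."""
        target = f"{schema}.{table}"
        called = 0
        for pattern, callback in self._handlers.get(event_type, []):
            if fnmatchcase(target, pattern):  # case-sensitive glob match
                callback(event)
                called += 1
        return called


router = Router()
router.on("UPDATE", "public.*", print)
router.dispatch("UPDATE", "public", "actors", {"nconst": "nm0325022"})
```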

That is the main notification part of the script. The other part is the conversion of the row values. The decoder gives us the values as text, so we need to convert them to Python types. The conversion is done with the following function:
def convert_value(oid, value):
    if value is None:
        return None
    python_type = OID_MAP.get(oid, str)
    try:
        if python_type == bool:
            return value == 't'
        elif python_type == datetime:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        elif python_type == date:
            return datetime.strptime(value, '%Y-%m-%d').date()
        elif python_type == dict:
            import json
            return json.loads(value)
        elif python_type == uuid.UUID:
            return uuid.UUID(value)
        else:
            return python_type(value)
    except Exception as e:
        logger.error(f"Error converting {value} with OID {oid}: {e}")
        return value


def get_event(message_type, rel, tx, payload) -> Event | None:
    current_type = Types(message_type)
    decoder_map = {
        Types.INSERT: decoders.Insert,
        Types.UPDATE: decoders.Update,
        Types.DELETE: decoders.Delete,
        Types.TRUNCATE: decoders.Truncate
    }
    data = decoder_map.get(current_type, lambda x: None)(payload)

    if data:
        if current_type == Types.TRUNCATE:
            fields = []
        else:
            fields = get_fields(rel, getattr(data, 'old_tuple', None), getattr(data, 'new_tuple', None))

        event = Event(
            type=current_type,
            tx_id=tx.tx_id,
            ts=tx.commit_ts,
            schema_name=rel.namespace,
            table_name=rel.relation_name,
            values=fields
        )
        return event
    return None


def get_fields(rel, old, new):
    fields = [
        Field(
            name=c.name,
            old=convert_value(c.type_id, old.column_data[i].col_data) if old else None,
            new=convert_value(c.type_id, new.column_data[i].col_data) if new else None,
            pkey=c.part_of_pkey == 1
        )
        for i, c in enumerate(rel.columns)
    ]
    return fields
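
`OID_MAP` is not shown above. It maps the PostgreSQL type OIDs reported in the relation metadata to Python types. A minimal sketch could look like the following. The selection of types is my assumption, although the OID numbers themselves are the fixed built-in ones from the `pg_type` catalog.

```python
import uuid
from datetime import date, datetime
from decimal import Decimal

OID_MAP = {
    16: bool,         # bool
    20: int,          # int8
    21: int,          # int2
    23: int,          # int4
    25: str,          # text
    114: dict,        # json
    700: float,       # float4
    701: float,       # float8
    1043: str,        # varchar
    1082: date,       # date
    1114: datetime,   # timestamp without time zone
    1700: Decimal,    # numeric
    2950: uuid.UUID,  # uuid
    3802: dict,       # jsonb
}

# Unknown OIDs fall back to str, mirroring OID_MAP.get(oid, str) in convert_value
print(OID_MAP.get(23, str)("1907"))  # 1907
print(OID_MAP.get(999999, str))      # <class 'str'>
```
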
When a client is connected we can see it using a simple query:
SELECT * FROM pg_stat_replication;
And also we can see the replication slots with the following query:
SELECT
    pg_current_wal_lsn() AS current_lsn,
    slot_name,
    restart_lsn,
    confirmed_flush_lsn
FROM
    pg_replication_slots
WHERE
    slot_type = 'logical';
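
The LSN values returned by this query are written as two hexadecimal numbers separated by a slash. To estimate how far behind a consumer is, they can be converted to integers and subtracted, which is the same arithmetic as PostgreSQL's `pg_wal_lsn_diff`. A small sketch, with helper names of my own:

```python
def parse_lsn(text: str) -> int:
    """Convert an LSN such as '16/B374D848' to a byte position in the WAL."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the consumer has not confirmed yet."""
    return parse_lsn(current_lsn) - parse_lsn(confirmed_flush_lsn)


print(lag_bytes("0/3000060", "0/3000000"))  # 96
```
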
The script is just an experiment. Maybe it can be adapted to a real application, but probably it needs more work, especially in data type conversion.

In conclusion, using logical replication to generate domain events from PostgreSQL offers a powerful and flexible approach, especially for legacy systems or applications where modifying the existing database structure is challenging. This method allows us to capture changes across all tables without the need for individual triggers, potentially simplifying event sourcing and change data capture processes. However, it's important to note that this approach comes with its own set of considerations, such as performance impact on the database, handling of large volumes of events, and ensuring data consistency. As with any experimental technique, thorough testing and careful consideration of your specific use case are crucial before implementing this in a production environment. Despite these challenges, the potential for creating a robust, database-driven event system makes this an exciting area for further exploration and development.

Full source code in my GitHub account.

Transforming Natural Language to SQL Queries with Python and LangChain

LLMs are highly proficient at generating code, including SQL queries from natural language text. Today, we’re going to experiment with this capability to see how effectively we can transform natural language instructions into SQL queries. The idea is to leverage the power of natural language processing to simplify the process of writing complex SQL statements. For this experiment, I’ve downloaded a CSV file containing data from IMDb about people in the film industry, with attributes such as names, birth and death years, professions, and the titles they are known for. By using this dataset, we can test the LLM’s ability to generate accurate and efficient SQL queries based on different natural language prompts. Here’s an example of what the data looks like:

nconst,primaryname,birthyear,deathyear,primaryprofession,knownfortitles
nm0325022,Käthe Gold,1907,1997,"actress,archive_footage","tt0026069,tt0032498,tt0436641,tt0026066"
nm0325025,Lee Gold,1919,1985,writer,"tt0034433,tt0040392,tt0048226,tt0099219"
nm0325028,Louise Gold,1956,,"actress,miscellaneous,soundtrack","tt0074028,tt0104940,tt0083791,tt2281587"
...

Now, we will create a PostgreSQL database using Docker. Docker allows us to quickly set up and manage containerized applications, making it an ideal tool for this purpose. Below is the Dockerfile we will use to set up our PostgreSQL database:

FROM postgres:16.3-alpine
COPY actors.csv /docker-entrypoint-initdb.d/actors.csv
COPY init.sql /docker-entrypoint-initdb.d/

Next, we will set up the database and import the CSV data into an ‘actors’ table using the Docker entrypoint. The PostgreSQL image runs the SQL files it finds in /docker-entrypoint-initdb.d when the database is first initialized, so this init.sql creates the table and imports the CSV data:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

COPY actors FROM '/docker-entrypoint-initdb.d/actors.csv' CSV HEADER;

This is the docker-compose file to set up the PostgreSQL database:

version: '3.6'

services:
  pg:
    build:
      context: .docker/pg
      dockerfile: Dockerfile
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

Now we can start with the Python script. We’re going to use the Click library to build the CLI script. The Python application interacts with the database to execute SQL queries generated from user input. The process begins by obtaining a MovieChain object through the get_chain function, which takes the llm as its argument. This MovieChain object then generates an SQL query from the user’s input q through its get_sql method. After that, we just execute the SQL query against PostgreSQL and print the results.

import click
from dbutils import get_conn, Db, get_cursor
from lib.chains.movie import get_chain
from lib.llm.groq import llm
from settings import DSN


@click.command()
@click.option('--q', required=True, help='question to ask')
def run(q):
    chain = get_chain(llm)
    sql = chain.get_sql(q)
    click.echo(f"q: {q}")
    click.echo(sql)
    click.echo('')
    if sql:
        conn = get_conn(DSN, named=True, autocommit=True)
        db = Db(get_cursor(conn=conn))
        data = db.fetch_all(sql)
        for row in data:
            print(row)

The MovieChain class interacts with an LLM (in this example, we’re using Groq).

import logging
from langchain_core.messages import SystemMessage, HumanMessage

from .prompts import PROMPT

logger = logging.getLogger(__name__)


class MovieChain:

    def __init__(self, llm):
        self.llm = llm

        self.prompt = SystemMessage(content=PROMPT)

    def get_sql(self, q: str):
        user_message = HumanMessage(content=q)
        try:
            ai_msg = self.llm.invoke([self.prompt, user_message])
            output_message = ai_msg.content if not isinstance(ai_msg, str) else ai_msg

            return output_message
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

The chain sends two messages to the LLM: the system prompt, which creates the proper context to assist the LLM in generating the SQL query, and the user’s question. In the system prompt we’re providing the create table script.

PROMPT = """
You are an expert in generating SQL queries based on user questions.
You have access to a database with the following table schema:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

Please generate an SQL query to answer the following user question.
Ensure the query is valid, secure, and tailored to the provided schema.
Return only the SQL query without additional explanations.
Don't use quotes around the query in any case.
"""
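
Even with the instruction not to wrap the query, models sometimes return the SQL inside a Markdown code fence or between backticks. A minimal defensive sketch could normalise the output before executing it. The `clean_sql` helper is hypothetical and not part of the original project:

```python
import re

TICKS = "`" * 3  # the Markdown code-fence marker

# Matches an answer that is entirely wrapped in a fence, with an optional
# language tag (for example "sql") after the opening marker.
_FENCED = re.compile(rf"^{TICKS}[a-zA-Z]*\s*\n(.*?)\n?{TICKS}$", re.DOTALL)


def clean_sql(raw: str) -> str:
    """Remove a surrounding code fence or stray backticks from the LLM output."""
    text = raw.strip()
    match = _FENCED.match(text)
    if match:
        text = match.group(1).strip()
    return text.strip("`").strip()


print(clean_sql(TICKS + "sql\nSELECT * FROM actors;\n" + TICKS))
```

In the CLI, the call would sit between `chain.get_sql(q)` and the database query, assuming that is where the raw answer is available.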

And that’s all. With it we can ask questions about this dataset and the LLM generates the SQL for us.

python cli.py movie --q="List the living actors under 10 years old."

q: List the living actors under 10 years old.
SELECT * FROM actors WHERE deathyear IS NULL AND birthyear > (EXTRACT(YEAR FROM CURRENT_DATE) - 10);
...
python cli.py movie --q="List the living actors who were born in the same year as Mel Gibson."

q: List the living actors who were born in the same year as Mel Gibson
SELECT * FROM actors WHERE birthyear = (SELECT birthyear FROM actors WHERE primaryname = 'Mel Gibson') AND deathyear IS NULL;
...
python cli.py movie --q="List the deceased actors who were born in the same year as Mel Gibson."

q: List the deceased actors who were born in the same year as Mel Gibson.
SELECT * 
FROM actors 
WHERE deathyear IS NOT NULL 
AND birthyear = (SELECT birthyear 
                 FROM actors 
                 WHERE primaryname = 'Mel Gibson');
...
python cli.py movie --q="What is the name, date of birth, and age of the oldest living actor born in the 70s?"

q: What is the name, date of birth, and age of the oldest living actor born in the 70s?
SELECT primaryname, birthyear, (2023 - birthyear) AS age 
FROM actors 
WHERE birthyear >= 1970 AND birthyear < 1980 AND deathyear IS NULL 
ORDER BY birthyear ASC 
LIMIT 1;

{'primaryname': 'Missy Gold', 'birthyear': 1970, 'age': 53}

With projects like these, where we execute “random” SQL generated by an LLM, it’s crucial to manage user access to the database carefully. Restricting access helps mitigate potential SQL injection risks, especially depending on the prompts provided by the user when interacting with the LLM.
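
Database permissions are the real safeguard here (for example, a role that can only read the table). As an extra, cheap check on the application side, a sketch like the following could reject anything that is not a single read-only statement. The `is_single_select` helper and its keyword list are assumptions for illustration. A keyword filter like this is easy to get around and can also reject valid queries (for instance, a string literal that contains a semicolon), so it does not replace restricted credentials.

```python
import re

# Keywords that should not appear in a read-only query (illustrative list).
FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|create|truncate|grant|revoke|copy)\b",
    re.IGNORECASE,
)


def is_single_select(sql: str) -> bool:
    """Return True only for one statement that starts with SELECT or WITH."""
    statement = sql.strip().rstrip(";").strip()
    if not statement or ";" in statement:
        return False  # empty, or more than one statement
    if not re.match(r"(select|with)\b", statement, re.IGNORECASE):
        return False
    return FORBIDDEN.search(statement) is None


print(is_single_select("SELECT * FROM actors WHERE deathyear IS NULL;"))
print(is_single_select("SELECT 1; DROP TABLE actors"))
```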

Full source code in my github account.

Leveraging AI to perform Code Audits based on Git Commit Diff with Python and LangChain

A few days ago, I came across a project on GitHub by my friend Jordi called Commitia. Commitia is a simple command line tool that helps you write commit messages. It works by passing a git diff to an LLM model, which then suggests a commit message based on the diff. I liked the idea of using LLM models to assist in the development process by interacting with git diffs. So, I decided to create a similar tool using Python and LangChain, just for practice. I’ll use Click to create the command line interface and LangChain to interact with the LLM model. I’ll use Azure LLM, but any LLM model that supports custom functions, like Groq LLM or OpenAI, can be used.

import click

from lib.chains.git import get_chain
from lib.git_utils import get_current_diff
from lib.llm.azure import llm
from settings import BASE_DIR


@click.command()
def run():
    current_diff = get_current_diff(BASE_DIR / '..')
    chain = get_chain(llm)
    ia_response = chain.get_commit_message(current_diff)
    click.echo(ia_response)

That’s the chain

import logging

from langchain_core.messages import SystemMessage, HumanMessage

from lib.models import DiffData
from .prompts import PROMPT_COMMIT_MESSAGE

logger = logging.getLogger(__name__)


class GitChain:

    def __init__(self, llm):
        self.llm = llm
        self.prompt_commit_message = SystemMessage(content=PROMPT_COMMIT_MESSAGE)

    @staticmethod
    def _get_diff_content(diff: DiffData):
        return "\n".join((d.diff for d in diff.diffs))

    def get_commit_message(self, diff: DiffData):
        try:
            user_message = HumanMessage(content=self._get_diff_content(diff))
            messages = [self.prompt_commit_message, user_message]
            ai_msg = self.llm.invoke(messages)
            return ai_msg if isinstance(ai_msg, str) else ai_msg.content
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

I’m going to use the same prompt that Jordi uses in Commitia.

PROMPT_COMMIT_MESSAGE = """
You are an assistant to write the commit message.
The user will send you the content of the commit diff, and you will reply with the commit message.
It must be a commit message of one single line. Be concise, just write the message, do not give any explanation.
"""

To obtain the git diff I’m going to use gitpython library.

from git.repo import Repo

from .models import Diff, DiffData, Status

def _get_file_mode(diff):
    if diff.new_file:
        return Status.CREATED
    elif diff.deleted_file:
        return Status.DELETED
    elif diff.copied_file:
        return Status.COPIED
    else:
        return Status.MODIFIED


    
def _build_diff_data(commit, diffs) -> DiffData:
    return DiffData(
        user=str(commit.author),
        date=commit.committed_datetime,
        diffs=[Diff(
            diff=str(diff),
            path=diff.b_path if diff.new_file else diff.a_path,
            status=_get_file_mode(diff)
        ) for diff in diffs]
    )

def get_current_diff(repo_path) -> DiffData:
    repo = Repo(repo_path)
    commit = repo.head.commit
    diffs = commit.diff(None, create_patch=True)

    return _build_diff_data(commit, diffs)

I’m using the following models to represent the data.

from datetime import datetime
from enum import Enum
from typing import List

from pydantic import BaseModel


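class Status(str, Enum):
    # Enum values must be unique, otherwise the later member becomes an alias
    # of the earlier one. 'A' (added) follows git's own status letters.
    CREATED = 'A'
    MODIFIED = 'M'
    DELETED = 'D'
    COPIED = 'C'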


class Diff(BaseModel):
    diff: str
    path: str
    status: Status


class DiffData(BaseModel):
    user: str
    date: datetime
    diffs: List[Diff]

After this experiment cloning Commitia, I’m going to do another one. Now the idea is to create a code review based on the git diff. I’m going to use the same structure as in the previous experiment, so I only need to change the prompt.

PROMPT_CODE_AUDIT = """
You are an experienced developer and need to perform a code review of a git diff.
You should identify potential errors, provide suggestions for improvement,
and highlight best practices in the provided code.

You should provide a global score of the code quality if you detect any issue based on the following criteria:
- NONE: 0.0
- LOW: Between 0.1 and 3.9
- MEDIUM: Between 4.0 and 6.9
- HIGH: Between 7.0 and 8.9
- CRITICAL: Between 9.0 and 10.0

Your output should use the following template:
### Score
Global score of risks: [NONE, LOW, MEDIUM, HIGH, CRITICAL]

### Diff Explanation
First you must provide a brief explanation of the diff in a single line. Be concise and do not give any explanation.
Then you should provide a detailed explanation of the changes made in the diff.

### Audit summary
Detailed explanation of the identified gaps and their potential impact, if there are any significant findings.
"""

As you can see, I’m using a prompt to analyze the code and provide a score for its quality. I also want to perform actions based on the score, such as taking specific measures if the score is critical. To achieve this, we need to use a custom function. Therefore, we need an LLM model that supports calling custom functions. I added the audit_diff method to the chain to handle this.

import logging
from enum import Enum

from langchain_core.messages import SystemMessage, HumanMessage

from lib.models import DiffData
from .prompts import PROMPT_CODE_AUDIT, PROMPT_COMMIT_MESSAGE

logger = logging.getLogger(__name__)


class Score(int, Enum):
    NONE = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    CRITICAL = 5


class GitChain:

    def __init__(self, llm, tools):
        self.llm = llm
        if hasattr(llm, 'bind_tools'):
            self.llm_with_tools = llm.bind_tools(list(tools.values()))
        else:
            logger.info("LLM does not support bind_tools method")
            self.llm_with_tools = llm
        self.prompt_code_audit = SystemMessage(content=PROMPT_CODE_AUDIT)
        self.prompt_commit_message = SystemMessage(content=PROMPT_COMMIT_MESSAGE)
        self.tools = tools

    @staticmethod
    def _get_diff_content(diff: DiffData):
        return "\n".join((d.diff for d in diff.diffs))

    def _get_status_from_message(self, message) -> Score | None:
        ai_msg = self.llm_with_tools.invoke([HumanMessage(content=message)])
        return self._get_tool_output(ai_msg)

    def get_commit_message(self, diff: DiffData):
        try:
            user_message = HumanMessage(content=self._get_diff_content(diff))
            messages = [self.prompt_commit_message, user_message]
            ai_msg = self.llm.invoke(messages)
            return ai_msg if isinstance(ai_msg, str) else ai_msg.content
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

    def audit_diff(self, diff: DiffData):
        user_message = HumanMessage(content=self._get_diff_content(diff))
        try:
            ai_msg = self.llm.invoke([self.prompt_code_audit, user_message])
            output_message = ai_msg.content if not isinstance(ai_msg, str) else ai_msg

            return self._get_status_from_message(output_message), output_message
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

    def _get_tool_output(self, ai_msg):
        status = None
        for tool_call in ai_msg.tool_calls:
            tool_output = self.tools[tool_call["name"]].invoke(tool_call["args"])
            logger.info(f"Tool: '{tool_call['name']}'")
            status = tool_output
        return status

The audit_diff method takes a DiffData object as an argument, which represents the code changes that need to be audited. The first line inside the method creates a HumanMessage from the DiffData object by calling _get_diff_content, which joins all the diffs into a single string. Next, the method invokes the LLM with the code audit system prompt and the human message, and stores the response in ai_msg. If ai_msg is a string, it is used as is; otherwise, its content attribute is used. The method then calls _get_status_from_message with output_message as the argument. That helper invokes the LLM with the tools bound, using output_message as input, and returns the output of the tool that the model chose to call. Finally, audit_diff returns both the status and output_message. In summary, audit_diff audits code differences using a Large Language Model and a set of tools, and returns the audit status together with the text of the review.
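
Because the audit prompt fixes the output template, the score label can also be read straight from the text. The following is a sketch of that alternative, not what the chain above does: `parse_score` is a hypothetical helper, and the `Score` enum is repeated here only to keep the snippet self-contained. It could serve as a fallback when the LLM does not support tool calling.

```python
import re
from enum import Enum


class Score(int, Enum):
    NONE = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    CRITICAL = 5


# Looks for the line "Global score of risks: <LABEL>" from the prompt template.
_SCORE_LINE = re.compile(
    r"Global score of risks:\s*\[?\s*(NONE|LOW|MEDIUM|HIGH|CRITICAL)\b",
    re.IGNORECASE,
)


def parse_score(audit_text: str):
    """Return the Score named in the audit text, or None if it is missing."""
    match = _SCORE_LINE.search(audit_text)
    if match is None:
        return None
    return Score[match.group(1).upper()]


print(parse_score("### Score\nGlobal score of risks: HIGH\n"))
```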

Now I can invoke the chain to audit the code and print the results. Also, I can use the score to perform an action.

import click
from rich.console import Console
from rich.markdown import Markdown

from lib.chains.git import get_chain
from lib.chains.git.chain import Score
from lib.git_utils import get_current_diff
from lib.llm.azure import llm
from settings import BASE_DIR


@click.command()
def run():
    current_diff = get_current_diff(BASE_DIR / '..')
    chain = get_chain(llm)
    status, ia_response = chain.audit_diff(current_diff)

    click.echo('Audit results:')
    click.echo('--------------')
    click.echo(f"{current_diff.user} at {current_diff.date} made the following changes at:")
    click.echo('')
    click.echo('Affected files:')
    for diff in current_diff.diffs:
        print(f"[{diff.status.value}] {diff.path}")
    click.echo('')

    console = Console()
    console.print(Markdown(ia_response))

    click.echo('')
    if status == Score.CRITICAL:
        click.echo('[WARNING] Critical issues found.')
    else:
        click.echo('No critical issues found.')
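
Since `Score` mixes in `int`, its members can be compared, which makes it easy to react to a threshold rather than to a single value. A small sketch, assuming a hypothetical `exit_code` helper that a CI job or a git hook could use (the enum is repeated to keep the snippet runnable on its own):

```python
from enum import Enum


class Score(int, Enum):
    NONE = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    CRITICAL = 5


def exit_code(status, threshold=Score.HIGH) -> int:
    """Return 1 (fail) when the audit score reaches the threshold, else 0."""
    if status is None:
        return 0  # no score could be extracted, so do not block
    return 1 if status >= threshold else 0


print(exit_code(Score.CRITICAL), exit_code(Score.MEDIUM))
```

In the command above, something like `sys.exit(exit_code(status))` at the end of `run` would turn the audit into a gate. Whether a missing score should pass or fail is a policy decision, and this sketch lets it pass.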

In conclusion, integrating AI into the development workflow can significantly enhance productivity and code quality. By using tools like LangChain and LLM models, we can automate the generation of commit messages and perform detailed code audits based on git diffs. This not only saves time but also ensures consistency and accuracy in commit messages and code reviews. As we continue to explore and implement these AI-driven solutions, we open up new possibilities for more efficient and effective software development practices. It is not magic, it is just code.

Full source code in my github.

Integrating AI Models with Function Calls using Python and LangChain

Today we’ll explore the integration of AI models with function calls using Python and LangChain. This example shows how to leverage LangChain for orchestrating AI and natural language processing tasks, and how to integrate AI models seamlessly with custom functions. While the functions used here are straightforward examples, such as basic arithmetic operations, they illustrate the foundational concepts applicable to more complex scenarios, such as invoking external APIs or more complicated processing pipelines. We need an LLM with function calling capabilities (not all models allow us to call custom functions). For this example, we’re going to use the Groq LLM, which has a free public API with function calling support. So, we need to obtain an API key here.

That’s the main script. It only obtains the chain with our llm instance.


import logging

from lib.chains.math_chain.chain import get_chain
from lib.llm.groq import llm

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    chain = get_chain(llm)

    user_prompts = [
        "How much is five times twelve?",
        "How much is five plus twelve?",
        "How much is twelve minus five?",
    ]

    for prompt in user_prompts:
        responses = chain.ask_question(prompt)
        for response in responses:
            print(f"Q: {prompt} R:{response}")

That’s the chain.

import logging

from langchain_core.messages import SystemMessage, HumanMessage

from .tools import tools

logger = logging.getLogger(__name__)


def get_chain(llm):
    return CustomMathChain(llm, tools)


class CustomMathChain:
    system_prompt_content = """
        You are a model that has various mathematical functions.
        You can only respond to questions related to functions that you know.
        """

    def __init__(self, llm, tools):
        self.llm_with_tools = llm.bind_tools(list(tools.values()))
        self.system_message = SystemMessage(content=self.system_prompt_content)
        self.tools = tools

    def ask_question(self, user_prompt):
        responses = []
        try:
            user_message = HumanMessage(content=user_prompt)
            messages = [self.system_message, user_message]
            ai_msg = self.llm_with_tools.invoke(messages)

            for tool_call in ai_msg.tool_calls:
                tool_output = self.tools[tool_call["name"]].invoke(tool_call["args"])
                logger.info(f"Tool: '{tool_call['name']}' called output: {tool_output}")
                responses.append(tool_output)

            return responses
        except Exception as e:
            logger.error(f"Error during question processing: {e}")
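
The dispatch loop in `ask_question` is the core of the idea: the model does not run anything itself, it only returns the name of a tool and its arguments. The sketch below reproduces that step without an LLM, using plain functions and a hand-written list shaped like LangChain's `tool_calls` (dicts with `name` and `args`). The function names and the sample calls are made up for illustration.

```python
def add(a: int, b: int) -> int:
    return a + b


def multiply(a: int, b: int) -> int:
    return a * b


# Registry that maps the tool name chosen by the model to a callable.
registry = {"add": add, "multiply": multiply}

# Hand-written stand-in for ai_msg.tool_calls.
fake_tool_calls = [
    {"name": "multiply", "args": {"a": 5, "b": 12}},
    {"name": "add", "args": {"a": 5, "b": 12}},
]


def dispatch(tool_calls, registry):
    """Run every requested tool and collect the outputs in order."""
    results = []
    for call in tool_calls:
        func = registry.get(call["name"])
        if func is None:
            continue  # the model asked for a tool we do not expose
        results.append(func(**call["args"]))
    return results


print(dispatch(fake_tool_calls, registry))  # [60, 17]
```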

This custom chain utilizes functions defined here, employing the @tool decorator. It is crucial to properly define input and output variables and provide thorough documentation for our tools. AI leverages this information to determine the appropriate function call for each scenario. Various methods exist for defining our tools; here, I’ve opted for the simplest approach. For more detailed guidance on defining custom functions, refer to this resource.

from langchain_core.tools import tool


@tool
def ia_sum(a: int, b: int) -> int:
    """ Return the sum of `a` and `b` """
    return a + b


@tool
def ia_diff(a: int, b: int) -> int:
    """ Return the difference of `a` and `b` """
    return a - b


@tool
def ia_multiply(a: int, b: int) -> int:
    """ Return the product of `a` and `b` """
    return a * b


tools = {
    "ia_sum": ia_sum,
    "ia_diff": ia_diff,
    "ia_multiply": ia_multiply
}
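
To get a feeling for what the model can work with, the sketch below uses only the standard library to collect a function's name, docstring, and parameter types. This is an approximation for illustration, with a hypothetical `describe` helper; it is not how LangChain builds its tool schema internally.

```python
import inspect


def ia_sum(a: int, b: int) -> int:
    """ Return the sum of `a` and `b` """
    return a + b


def describe(func):
    """Collect the metadata a tool description is typically built from."""
    signature = inspect.signature(func)
    return {
        "name": func.__name__,
        "description": (inspect.getdoc(func) or "").strip(),
        "parameters": {
            name: param.annotation.__name__
            for name, param in signature.parameters.items()
        },
    }


print(describe(ia_sum))
```

A vague docstring or missing type hints would leave the model with little to go on, which is why the documentation of each tool matters.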

And that’s all! Working with our custom functions is quite straightforward. As mentioned earlier, we’re using very simple functions (add, diff, and multiply). In reality, we don’t need an LLM or AI to perform these arithmetic operations. However, imagine integrating real-world functions that access APIs and your business models. AI can handle natural language processing to interpret user input and identify the correct function to execute the task.

Source code in my github account.

Building a local Plato expert AI with LLaMA3 and LangChain

Today, I’m delving into the realm of AI. My aim is to construct an AI capable of engaging in conversation about a specific document. For this experiment, I’ve chosen Plato’s ‘Apology of Socrates.’ My goal is to develop an expert on this text, allowing me to pose questions and receive insightful responses. Let’s dive in.

First, I need a LLaMA3 model locally on my computer (MBP M2 24GB). To do that we can use Ollama. It’s pretty straightforward to do that on Mac. Just follow the instructions, do

brew install ollama

and that’s all. We can start the server.

ollama start

Now we need the model. We’re going to use LLaMA3, a 4.7 GB model that we can download using:

ollama pull llama3

And that’s all. Our server is up and running ready to receive requests. Now we’re going to create our script. We can
use simple HTTP requests to interact with Ollama using postman, for example, but it’s simpler to use a framework
to handle the communications. We’re going to use [LangChain](https://www.langchain.com/).

AI models limit the number of tokens that we can use as input and output. Apology of Socrates is a book: not excessively big, but big enough to exceed this limit, so we need to split it into chunks. We also need to convert those chunks into a vector store, so that the chunks relevant to each question can be retrieved and passed to the model. LangChain provides document loaders to read the document and to create this vector store. In my example I’m using the Apology of Socrates as a txt file, so I’m going to use a TextLoader, but there are loaders for PDFs, S3, DataFrames and much more in the LangChain SDK. With this function I obtain the vector store from a path.

import logging

from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter

logger = logging.getLogger(__name__)


def get_vector_store_from_path(file_path):
    loader = TextLoader(file_path)
    data = loader.load()

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=100)
    all_splits = text_splitter.split_documents(data)

    logger.info(f"Text divided in {len(all_splits)} splits")
    return Chroma.from_documents(
        documents=all_splits, embedding=GPT4AllEmbeddings()
    )
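
To make `chunk_size` and `chunk_overlap` concrete, here is a simplified stand-in written with plain string slicing. `RecursiveCharacterTextSplitter` is smarter than this (it tries to break on separators such as paragraphs and lines first), so treat the hypothetical `split_with_overlap` function below as an illustration of the overlap idea only.

```python
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int):
    """Cut text into windows of chunk_size that share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


chunks = split_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij']
```

Each chunk repeats the tail of the previous one, so a sentence cut at a boundary still appears whole in at least one chunk when the overlap is large enough.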

Now we need a chain to ask questions to our model. With this function I obtain my chain.

import logging

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

logger = logging.getLogger(__name__)


def get_chain(template, vector_store, llm):
    prompt = ChatPromptTemplate.from_template(template)
    output_parser = StrOutputParser()

    setup_and_retrieval = RunnableParallel(dict(
        context=vector_store.as_retriever(),
        question=RunnablePassthrough(),
    ))
    return setup_and_retrieval | prompt | llm | output_parser
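
The pipe expression reads left to right: the question is sent both to the retriever (to fetch context) and straight through, the two values fill the prompt template, the LLM answers, and the parser returns a string. The sketch below mimics that data flow with plain functions. The retriever and the model are fakes invented for the example; nothing here calls LangChain.

```python
def fake_retriever(question: str) -> str:
    # Stand-in for vector_store.as_retriever(): returns the "relevant" text.
    return "Socrates defends himself before the Athenian jury."


def fake_llm(prompt: str) -> str:
    # Stand-in for the model: reports how much prompt text it received.
    return f"(answer based on {len(prompt)} characters of prompt)"


def run_chain(question: str, template: str) -> str:
    # 1. RunnableParallel: build both inputs from the same question.
    inputs = {"context": fake_retriever(question), "question": question}
    # 2. Prompt template: fill in the placeholders.
    prompt = template.format(**inputs)
    # 3. LLM, then 4. output parser (here: just make sure it is a string).
    return str(fake_llm(prompt))


TEMPLATE = "Context: {context}\nQuestion: {question}"
print(run_chain("Who is on trial?", TEMPLATE))
```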

I’m using an Ollama LLM, running locally on my computer as I explained before. LangChain allows us to use
different LLMs (Azure, OpenAI,…). We can use those models if we have an account (they aren’t free).

from langchain_community.llms.ollama import Ollama
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler
import logging
from settings import OLLAMA_MODEL

logger = logging.getLogger(__name__)

llm = Ollama(
    model=OLLAMA_MODEL,
    verbose=True,
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
)
logger.info(f"Model {OLLAMA_MODEL} loaded")

With those functions I can finally build my script. As you can see, I prepare a template telling the LLM what I want, and the set of questions I’m going to ask the model. Our main function first builds the vector store (it takes several seconds). After that it builds the chain around the LLM (this also takes time). Then we iterate over the questions and print the LLM’s answers in the terminal.

import logging

from lib.llm.ollama import llm
from lib.utils import get_chain, get_vector_store_from_path
from settings import DOCUMENT_PATH

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def ask_question(chain, question):
    logger.info(f"QUESTION: {question}")
    response = chain.invoke(question)

    print(response)


def main(template, path, questions):
    vector_store = get_vector_store_from_path(path)
    chain = get_chain(
        template=template,
        vector_store=vector_store,
        llm=llm)
    for question in questions:
        ask_question(
            chain=chain,
            question=question
        )


if __name__ == "__main__":
    template = """
        Answer the question based only on the context I give you.
        Answer using quotes from the text.

        Context: {context}
        Question: {question}
        """
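    # Each question needs a trailing comma; without them Python concatenates
    # the adjacent string literals into one single string.
    questions = (
        "What are the general ideas of the text?",
        "What is Socrates' position regarding his imminent condemnation?",
        "Can you list the protagonists of the plot?",
    )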
    main(template=template, path=DOCUMENT_PATH, questions=questions)

And that’s all. We have a Plato expert to chat with about one specific context (in this case Apology of Socrates). However, for a production-grade project, it’s crucial to store our vector data in a database to avoid repetitive generation.

Note: In my example the questions, the template, and Plato’s book are in Spanish. Plato’s book is in the public domain. Source code available on my github.

Connecting Grafana to PostgreSQL with Python for Time Series Analysis

In the world of data analysis and graphs, we have three important tools: Grafana, PostgreSQL, and Python. They work together to help us look at data and track how it changes over time. In this article, we’ll learn step by step how to use Grafana with a PostgreSQL database. We’ll also discover how to use Python to record data that changes over time. By the end of this article, you’ll know how to set up these tools, and you’ll see how they can be useful for your work with data.

First, we create our table. We also create a sequence for the primary key.

CREATE TABLE MEASUREMENTLOG
(
    id              numeric(10)            NOT NULL,
    key             character varying(100) NOT NULL,
    datetime        TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    status          numeric(2) NOT NULL,

    CONSTRAINT MEASUREMENTLOG_pkey PRIMARY KEY (id)
);

create sequence SEQ_MEASUREMENTLOG
    minvalue 0
    maxvalue 999999999999999999
    start with 1
    increment by 1
    cache 1;

And a simple Python script to persist a time series.

from random import randint
from time import sleep
from datetime import datetime
import os
import logging
import pytz

from dbutils import transactional, get_conn

from settings import DSN

tz = pytz.timezone('Europe/Madrid')

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def persists(key, dt, status):
    with transactional(conn=get_conn(DSN)) as db:
        seq_log = db.fetch_all("SELECT nextval('seq_measurementlog')")[0][0]
        db.insert('measurementlog', dict(
            id=seq_log,
            key=key,
            datetime=dt,
            status=status
        ))


KEY = os.getenv('KEY')
status = 0
while True:
    now = datetime.now(tz)
    persists(
        key=KEY,
        dt=now,
        status=status
    )
    logger.info(f"[{now}] status: {status}")
    status = 1 if status == 0 else 0
    sleep(randint(5, 15))

Now we set up PostgreSQL database and Grafana in a docker-compose.yml.

More information about the configuration of postgresql and grafana here in the links

version: '3'

services:
  pg:
    image: pg
    restart: unless-stopped
    build:
      context: .docker/pg/
      dockerfile: Dockerfile
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGTZ: ${TIMEZONE}
      PGDATA: /var/lib/postgresql/data/pgdata
    ports:
      - "5432:5432"
  grafana:
    image: grafana
    restart: unless-stopped
    build:
      context: .docker/grafana/
      dockerfile: Dockerfile
    environment:
      - GF_TIMEZONE=${TIMEZONE}
      - GF_SECURITY_ADMIN_USER=${GF_SECURITY_ADMIN_USER}
      - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD}
      - GF_USERS_DEFAULT_THEME=${GF_USERS_DEFAULT_THEME}
      - GF_USERS_ALLOW_SIGN_UP=${GF_USERS_ALLOW_SIGN_UP}
      - GF_USERS_ALLOW_ORG_CREATE=${GF_USERS_ALLOW_ORG_CREATE}
      - GF_AUTH_ANONYMOUS_ENABLED=${GF_AUTH_ANONYMOUS_ENABLED}
    ports:
      - "3000:3000"
    depends_on:
      - pg

We run the stack, connect to Grafana on port 3000, and configure the data source.

After that we can create the dashboard

We are using this query

SELECT 
    datetime, 
    status 
FROM 
    measurementlog 
WHERE 
    key = 'id1' and 
    $__timeFilter(datetime)
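
`$__timeFilter(datetime)` is a Grafana macro: before the query is sent to PostgreSQL, Grafana replaces it with a range condition for the time window selected in the dashboard. The Python sketch below imitates that substitution to show the kind of SQL the database ends up receiving. The helper name, the sample dates, and the exact timestamp format are assumptions for illustration; Grafana performs the real expansion itself.

```python
from datetime import datetime, timezone


def expand_time_filter(query: str, column: str, start: datetime, end: datetime) -> str:
    """Replace $__timeFilter(column) with an explicit BETWEEN condition."""
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    condition = f"{column} BETWEEN '{start.strftime(fmt)}' AND '{end.strftime(fmt)}'"
    return query.replace(f"$__timeFilter({column})", condition)


query = (
    "SELECT datetime, status FROM measurementlog "
    "WHERE key = 'id1' AND $__timeFilter(datetime)"
)
# Arbitrary six-hour window, standing in for the dashboard's time picker.
start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)
expanded = expand_time_filter(query, "datetime", start, end)
print(expanded)
```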

And that’s the dashboard

Project available in my github account.