Domain Events in Legacy Applications using Python and PostgreSQL

Sometimes we need to generate domain events in our application. It can be simple when you start an application from scratch, but it can be a nightmare when you have a legacy application. Today we're going to explain how to generate domain events from PostgreSQL. One option is to set up triggers on our database tables: in those triggers we can use pg_notify to emit events, and then a listener consumes them. This approach works, but we need to set up a trigger on every table that we want to generate events from. Instead, we're going to use logical replication. With this approach, we can generate events from all tables in our database without setting up triggers on each one.

First of all, we need to create a publication and a replication slot. A publication is a set of tables whose changes we want to replicate, and a replication slot tracks how far a consumer has read those changes. We can create both with the following SQL commands:
CREATE PUBLICATION pub1 FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('slot1', 'pgoutput');
When we create our replication slot, we can choose between different output plugins. In this case, we're going to use pgoutput, a plugin that is included in PostgreSQL by default and sends the changes as binary messages. After that, we can create a subscription to consume those changes. We're going to create the subscription in Python. We can do a simple subscription with the following code:
import psycopg2
import psycopg2.extras

from settings import DSN


conn = psycopg2.connect(DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()

cur.start_replication(
    slot_name='slot1', 
    decode=False,
    options={'proto_version': '1', 'publication_names': 'pub1'})


def consume(msg):
    payload = msg.payload
    print(payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cur.consume_stream(consume)
With this simple script, we're listening to all changes in our database. We can use this script to generate domain events in our application. We need to decode the payload to get the changes in our tables. There is a library to decode the payload called pypgoutput. I have had problems with this library, so I have used only one part of the library to decode the payload (decoders.py).
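
To give an idea of what decoding the payload involves, here is a minimal sketch (not the code from decoders.py). It assumes proto_version 1, where the first byte of every pgoutput message identifies its type and a BEGIN message carries the final LSN, the commit timestamp (microseconds since 2000-01-01) and the transaction id. The function names are my own, for illustration only.

```python
import struct
from datetime import datetime, timedelta, timezone

# PostgreSQL timestamps count microseconds from 2000-01-01 UTC
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)

# The first byte of each pgoutput message identifies its type
MESSAGE_TYPES = {
    b"B": "BEGIN",
    b"C": "COMMIT",
    b"R": "RELATION",
    b"I": "INSERT",
    b"U": "UPDATE",
    b"D": "DELETE",
    b"T": "TRUNCATE",
}


def message_type(payload: bytes) -> str:
    """Return a readable name for the message type byte."""
    return MESSAGE_TYPES.get(payload[:1], "UNKNOWN")


def decode_begin(payload: bytes) -> dict:
    """Decode a BEGIN message: final LSN, commit timestamp and xid."""
    # Big-endian: unsigned 64-bit LSN, signed 64-bit timestamp, unsigned 32-bit xid
    final_lsn, commit_us, xid = struct.unpack(">QqI", payload[1:21])
    return {
        "final_lsn": final_lsn,
        "commit_ts": PG_EPOCH + timedelta(microseconds=commit_us),
        "tx_id": xid,
    }
```

The other message types (RELATION, INSERT, UPDATE...) have longer layouts, which is why reusing an existing decoder is convenient.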

The main script is like this:
import logging

from lib.consumer import Consumer
from lib.models import Types, Event
from settings import DSN, PUBLICATION_NAME, SLOT_NAME

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(event: Event):
    logger.info(
        f"{event.ts} id:{event.tx_id} [{event.type}] "
        f"{event.schema_name}.{event.table_name} with values {event.values}")


consumer = Consumer(DSN)
consumer.on(Types.UPDATE, 'public.*', callback)

consumer.start(
    slot_name=SLOT_NAME,
    publication_name=PUBLICATION_NAME)
For my example, I am using a database with the following schema:
CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);
It is important to enable logical replication in the database. We can do it with the following commands (changing wal_level requires a server restart to take effect):
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
ALTER SYSTEM SET max_worker_processes = 10;
In the main script I am registering a callback for every update on the tables of the public schema (in this example, the actors table). The callback function is called with the event that contains the type of the event, the schema name, the table name, and the values of the row that has been updated. In the callback function we can do whatever we want with the event. In this case, I am just logging the event, but maybe you can emit this event to a message broker such as Kafka, RabbitMQ or an MQTT broker.
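
The registration of callbacks by event type and table pattern could be sketched as follows. This is not the actual Consumer class: the Dispatcher name, its methods and the use of fnmatch for patterns such as 'public.*' are assumptions made for illustration.

```python
from collections import defaultdict
from fnmatch import fnmatch


class Dispatcher:
    """Hypothetical registry that maps (event type, table pattern) to callbacks."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_type, pattern, callback):
        # The pattern is matched against "schema.table", e.g. "public.*"
        self._handlers[event_type].append((pattern, callback))

    def dispatch(self, event_type, schema_name, table_name, values):
        """Call every matching callback and return how many were called."""
        target = f"{schema_name}.{table_name}"
        called = 0
        for pattern, callback in self._handlers.get(event_type, []):
            if fnmatch(target, pattern):
                callback(event_type, target, values)
                called += 1
        return called


received = []
dispatcher = Dispatcher()
dispatcher.on("UPDATE", "public.*", lambda t, target, v: received.append((t, target, v)))
dispatcher.dispatch("UPDATE", "public", "actors", {"birthyear": 1956})
```

With this shape, an update on a table outside the public schema simply finds no matching handler and is ignored.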

That is the main notification part of the script. The other part is the conversion of the values of the row. The decoded values arrive as text (for example, 't' for a true boolean), so we need to convert them to Python types. The conversion is done with the following function:
def convert_value(oid, value):
    if value is None:
        return None
    python_type = OID_MAP.get(oid, str)
    try:
        if python_type == bool:
            return value == 't'
        elif python_type == datetime:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        elif python_type == date:
            return datetime.strptime(value, '%Y-%m-%d').date()
        elif python_type == dict:
            import json
            return json.loads(value)
        elif python_type == uuid.UUID:
            return uuid.UUID(value)
        else:
            return python_type(value)
    except Exception as e:
        logger.error(f"Error converting {value} with OID {oid}: {e}")
        return value


def get_event(message_type, rel, tx, payload) -> Event | None:
    current_type = Types(message_type)
    decoder_map = {
        Types.INSERT: decoders.Insert,
        Types.UPDATE: decoders.Update,
        Types.DELETE: decoders.Delete,
        Types.TRUNCATE: decoders.Truncate
    }
    data = decoder_map.get(current_type, lambda x: None)(payload)

    if data:
        if current_type == Types.TRUNCATE:
            fields = []
        else:
            fields = get_fields(rel, getattr(data, 'old_tuple', None), getattr(data, 'new_tuple', None))

        event = Event(
            type=current_type,
            tx_id=tx.tx_id,
            ts=tx.commit_ts,
            schema_name=rel.namespace,
            table_name=rel.relation_name,
            values=fields
        )
        return event
    return None


def get_fields(rel, old, new):
    fields = [
        Field(
            name=c.name,
            old=convert_value(c.type_id, old.column_data[i].col_data) if old else None,
            new=convert_value(c.type_id, new.column_data[i].col_data) if new else None,
            pkey=c.part_of_pkey == 1
        )
        for i, c in enumerate(rel.columns)
    ]
    return fields
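
The convert_value function relies on an OID_MAP that is not shown above. A minimal sketch of what such a map might look like, assuming the standard pg_type OIDs of a stock PostgreSQL installation (this is an illustration, not the map used in the repository):

```python
import uuid
from datetime import date, datetime
from decimal import Decimal

# Assumed mapping from PostgreSQL type OIDs to Python types.
# Types that are not listed fall back to str in convert_value.
OID_MAP = {
    16: bool,         # bool
    20: int,          # int8
    21: int,          # int2
    23: int,          # int4
    25: str,          # text
    114: dict,        # json
    700: float,       # float4
    701: float,       # float8
    1043: str,        # varchar
    1082: date,       # date
    1114: datetime,   # timestamp without time zone
    1700: Decimal,    # numeric
    2950: uuid.UUID,  # uuid
    3802: dict,       # jsonb
}
```

Timestamps with time zone are left out on purpose: their text form carries an offset that the strptime format used in convert_value does not parse.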
When a client is connected we can see it using a simple query:
SELECT * FROM pg_stat_replication;
And also we can see the replication slots with the following query:
SELECT
    pg_current_wal_lsn() AS current_lsn,
    slot_name,
    restart_lsn,
    confirmed_flush_lsn
FROM
    pg_replication_slots
WHERE
    slot_type = 'logical';
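
The LSN values returned by this query are two hexadecimal numbers separated by a slash. As a small sketch, we can turn them into integers in Python to estimate how far behind the consumer is (PostgreSQL can also do this on the server side with pg_wal_lsn_diff). The function names are hypothetical.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert an LSN such as '16/B374D848' to its 64-bit integer position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL generated but not yet confirmed by the consumer."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)
```

A lag that keeps growing usually means the consumer is stopped or is not sending feedback, so the server keeps retaining WAL for the slot.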
The script is just an experiment. Maybe it can be adapted to a real application, but probably it needs more work, especially in data type conversion.

In conclusion, using logical replication to generate domain events from PostgreSQL offers a powerful and flexible approach, especially for legacy systems or applications where modifying the existing database structure is challenging. This method allows us to capture changes across all tables without the need for individual triggers, potentially simplifying event sourcing and change data capture processes. However, it's important to note that this approach comes with its own set of considerations, such as performance impact on the database, handling of large volumes of events, and ensuring data consistency. As with any experimental technique, thorough testing and careful consideration of your specific use case are crucial before implementing this in a production environment. Despite these challenges, the potential for creating a robust, database-driven event system makes this an exciting area for further exploration and development.

Full source code in my github account

Transforming Natural Language to SQL Queries with Python and LangChain

LLMs are highly proficient at generating code, including SQL queries from natural language text. Today, we’re going to experiment with this capability to see how effectively we can transform natural language instructions into SQL queries. The idea is to leverage the power of natural language processing to simplify the process of writing complex SQL statements. For this experiment, I’ve downloaded a CSV file containing data from IMDB about people in the film industry, with attributes such as name, birth year, death year, professions, and the titles they are known for. By using this dataset, we can test the LLM’s ability to generate accurate and efficient SQL queries based on different natural language prompts. Here’s an example of what the data looks like:

nconst,primaryname,birthyear,deathyear,primaryprofession,knownfortitles
nm0325022,Käthe Gold,1907,1997,"actress,archive_footage","tt0026069,tt0032498,tt0436641,tt0026066"
nm0325025,Lee Gold,1919,1985,writer,"tt0034433,tt0040392,tt0048226,tt0099219"
nm0325028,Louise Gold,1956,,"actress,miscellaneous,soundtrack","tt0074028,tt0104940,tt0083791,tt2281587"
...

Now, we will create a PostgreSQL database using Docker. Docker allows us to quickly set up and manage containerized applications, making it an ideal tool for this purpose. Below is the Dockerfile we will use to set up our PostgreSQL database:

FROM postgres:16.3-alpine
COPY actors.csv /docker-entrypoint-initdb.d/actors.csv
COPY init.sql /docker-entrypoint-initdb.d/

Next, we will set up the database and import the CSV data into an ‘actors’ table using the Docker entrypoint. Below is the init.sql script that the entrypoint runs to create the table and import the CSV data:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

COPY actors FROM '/docker-entrypoint-initdb.d/actors.csv' CSV HEADER;

That’s the docker-compose file to set up the PostgreSQL database:

version: '3.6'

services:
  pg:
    build:
      context: .docker/pg
      dockerfile: Dockerfile
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

Now we can start with the Python script. We’re going to use the Click library to build the CLI script. The Python application interacts with a database to execute SQL queries generated from user input. The process begins by obtaining a MovieChain object through the get_chain function, which takes the llm as its argument. This MovieChain object is then used to generate an SQL query from the user’s input q through its get_sql method. After that we just execute the SQL query against PostgreSQL and print the results.

import click
from dbutils import get_conn, Db, get_cursor
from lib.chains.movie import get_chain
from lib.llm.groq import llm
from settings import DSN


@click.command()
@click.option('--q', required=True, help='question to ask')
def run(q):
    chain = get_chain(llm)
    sql = chain.get_sql(q)
    click.echo(f"q: {q}")
    click.echo(sql)
    click.echo('')
    if sql:
        conn = get_conn(DSN, named=True, autocommit=True)
        db = Db(get_cursor(conn=conn))
        data = db.fetch_all(sql)
        for row in data:
            print(row)

The MovieChain class interacts with an LLM (in this example, we’re using Groq).

import logging
from langchain_core.messages import SystemMessage, HumanMessage

from .prompts import PROMPT

logger = logging.getLogger(__name__)


class MovieChain:

    def __init__(self, llm):
        self.llm = llm

        self.prompt = SystemMessage(content=PROMPT)

    def get_sql(self, q: str):
        user_message = HumanMessage(content=q)
        try:
            ai_msg = self.llm.invoke([self.prompt, user_message])
            output_message = ai_msg.content if not isinstance(ai_msg, str) else ai_msg

            return output_message
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

The chain uses two prompts: the system prompt, which creates the proper context to help the LLM generate the SQL query, and the user’s question, sent as a human message. In the system prompt we’re providing the create table script.

PROMPT = """
You are an expert in generating SQL queries based on user questions.
You have access to a database with the following table schema:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

Please generate an SQL query to answer the following user question.
Ensure the query is valid, secure, and tailored to the provided schema.
Return only the SQL query without additional explanations.
Don't use quotes around the query in any case.
"""

And that’s all. With it we can ask questions about this dataset and the LLM generates the SQL for us.

python cli.py movie --q="List the living actors under 10 years old."

q: List the living actors under 10 years old.
SELECT * FROM actors WHERE deathyear IS NULL AND birthyear > (EXTRACT(YEAR FROM CURRENT_DATE) - 10);
...
python cli.py movie --q="List the living actors who were born in the same year as Mel Gibson."

q: List the living actors who were born in the same year as Mel Gibson
SELECT * FROM actors WHERE birthyear = (SELECT birthyear FROM actors WHERE primaryname = 'Mel Gibson') AND deathyear IS NULL;
...
python cli.py movie --q="List the deceased actors who were born in the same year as Mel Gibson."

q: List the deceased actors who were born in the same year as Mel Gibson.
SELECT * 
FROM actors 
WHERE deathyear IS NOT NULL 
AND birthyear = (SELECT birthyear 
                 FROM actors 
                 WHERE primaryname = 'Mel Gibson');
...
python cli.py movie --q="What is the name, date of birth, and age of the oldest living actor born in the 70s?"

q: What is the name, date of birth, and age of the oldest living actor born in the 70s?
SELECT primaryname, birthyear, (2023 - birthyear) AS age 
FROM actors 
WHERE birthyear >= 1970 AND birthyear < 1980 AND deathyear IS NULL 
ORDER BY birthyear ASC 
LIMIT 1;

{'primaryname': 'Missy Gold', 'birthyear': 1970, 'age': 53}

With projects like these, where we execute “random” SQL generated by an LLM, it’s crucial to manage user access to the database carefully. Restricting access helps mitigate potential SQL injection risks, especially depending on the prompts provided by the user when interacting with the LLM.
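
On top of database permissions (for example, a role that can only read), we could add a cheap check before executing the generated SQL. The following is a deliberately naive sketch: is_safe_select is a hypothetical helper, the keyword list is not exhaustive, and it is no substitute for restricting the database user.

```python
import re

# Keywords that should not appear in a read-only query (naive, not exhaustive)
FORBIDDEN = re.compile(
    r"\b(insert|update|delete|drop|alter|create|truncate|grant|revoke|copy)\b",
    re.IGNORECASE,
)


def is_safe_select(sql: str) -> bool:
    """Accept only a single SELECT statement without write keywords."""
    statement = sql.strip().rstrip(";").strip()
    if ";" in statement:
        # More than one statement (or a semicolon inside a literal): reject
        return False
    if not statement.lower().startswith("select"):
        return False
    return FORBIDDEN.search(statement) is None
```

The check is conservative: a harmless query that contains a word like 'update' inside a string literal is rejected too, which seems an acceptable price for an experiment like this one.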

Full source code in my github account.

Leveraging AI to perform Code Audits based on Git Commit Diff with Python and LangChain

A few days ago, I came across a project on GitHub by my friend Jordi called Commitia. Commitia is a simple command line tool that helps you write commit messages. It works by passing a git diff to an LLM model, which then suggests a commit message based on the diff. I liked the idea of using LLM models to assist in the development process by interacting with git diffs. So, I decided to create a similar tool using Python and LangChain, just for practice. I’ll use Click to create the command line interface and LangChain to interact with the LLM model. I’ll use Azure LLM, but any LLM model that supports custom functions, like Groq LLM or OpenAI, can be used.

import click

from lib.chains.git import get_chain
from lib.git_utils import get_current_diff
from lib.llm.azure import llm
from settings import BASE_DIR


@click.command()
def run():
    current_diff = get_current_diff(BASE_DIR / '..')
    chain = get_chain(llm)
    ia_response = chain.get_commit_message(current_diff)
    click.echo(ia_response)

That’s the chain

import logging

from langchain_core.messages import SystemMessage, HumanMessage

from lib.models import DiffData
from .prompts import PROMPT_COMMIT_MESSAGE

logger = logging.getLogger(__name__)


class GitChain:

    def __init__(self, llm):
        self.llm = llm
        self.prompt_commit_message = SystemMessage(content=PROMPT_COMMIT_MESSAGE)

    @staticmethod
    def _get_diff_content(diff: DiffData):
        return "\n".join((d.diff for d in diff.diffs))

    def get_commit_message(self, diff: DiffData):
        try:
            user_message = HumanMessage(content=self._get_diff_content(diff))
            messages = [self.prompt_commit_message, user_message]
            ai_msg = self.llm.invoke(messages)
            return ai_msg if isinstance(ai_msg, str) else ai_msg.content
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

I’m going to use the same prompt that Jordi uses in Commitia.

PROMPT_COMMIT_MESSAGE = """
You are an assistant to write the commit message.
The user will send you the content of the commit diff, and you will reply with the commit message.
It must be a commit message of one single line. Be concise, just write the message, do not give any explanation.
"""

To obtain the git diff I’m going to use the GitPython library.

from git.repo import Repo

from .models import Diff, DiffData, Status

def _get_file_mode(diff):
    if diff.new_file:
        return Status.CREATED
    elif diff.deleted_file:
        return Status.DELETED
    elif diff.copied_file:
        return Status.COPIED
    else:
        return Status.MODIFIED


    
def _build_diff_data(commit, diffs) -> DiffData:
    return DiffData(
        user=str(commit.author),
        date=commit.committed_datetime,
        diffs=[Diff(
            diff=str(diff),
            path=diff.b_path if diff.new_file else diff.a_path,
            status=_get_file_mode(diff)
        ) for diff in diffs]
    )

def get_current_diff(repo_path) -> DiffData:
    repo = Repo(repo_path)
    commit = repo.head.commit
    diffs = commit.diff(None, create_patch=True)

    return _build_diff_data(commit, diffs)

I’m using the following models to represent the data.

from datetime import datetime
from enum import Enum
from typing import List

from pydantic import BaseModel


class Status(str, Enum):
    # Values must be unique: a repeated value would turn COPIED into an alias of CREATED
    CREATED = 'A'
    MODIFIED = 'M'
    DELETED = 'D'
    COPIED = 'C'


class Diff(BaseModel):
    diff: str
    path: str
    status: Status


class DiffData(BaseModel):
    user: str
    date: datetime
    diffs: List[Diff]

After this experiment cloning Commitia, I’m going to do another experiment. Now the idea is to create a code review based on the git diff. I’m going to use the same structure as in the previous experiment, so I only need to change the prompt.

PROMPT_CODE_AUDIT = """
You are an experienced developer and need to perform a code review of a git diff.
You should identify potential errors, provide suggestions for improvement,
and highlight best practices in the provided code.

You should provide a global score of the code quality if you detect any issue based on the following criteria:
- NONE: 0.0
- LOW: Between 0.1 and 3.9
- MEDIUM: Between 4.0 and 6.9
- HIGH: Between 7.0 and 8.9
- CRITICAL: Between 9.0 and 10.0

Your output should use the following template:
### Score
Global score of risks: [NONE, LOW, MEDIUM, HIGH, CRITICAL]

### Diff Explanation
First you must provide a brief explanation of the diff in a single line. Be concise and do not give any explanation.
Then you should provide a detailed explanation of the changes made in the diff.

### Audit summary
Detailed explanation of the identified gaps and their potential impact, if there are any significant findings.
"""

As you can see, I’m using a prompt to analyze the code and provide a score for its quality. I also want to perform actions based on the score, such as taking specific measures if the score is critical. To achieve this, we need to use a custom function. Therefore, we need an LLM model that supports calling custom functions. I added the audit_diff method to the chain to handle this.

import logging
from enum import Enum

from langchain_core.messages import SystemMessage, HumanMessage

from lib.models import DiffData
from .prompts import PROMPT_CODE_AUDIT, PROMPT_COMMIT_MESSAGE

logger = logging.getLogger(__name__)


class Score(int, Enum):
    NONE = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    CRITICAL = 5


class GitChain:

    def __init__(self, llm, tools):
        self.llm = llm
        if hasattr(llm, 'bind_tools'):
            self.llm_with_tools = llm.bind_tools(list(tools.values()))
        else:
            logger.info("LLM does not support bind_tools method")
            self.llm_with_tools = llm
        self.prompt_code_audit = SystemMessage(content=PROMPT_CODE_AUDIT)
        self.prompt_commit_message = SystemMessage(content=PROMPT_COMMIT_MESSAGE)
        self.tools = tools

    @staticmethod
    def _get_diff_content(diff: DiffData):
        return "\n".join((d.diff for d in diff.diffs))

    def _get_status_from_message(self, message) -> Score | None:
        ai_msg = self.llm_with_tools.invoke([HumanMessage(content=message)])
        return self._get_tool_output(ai_msg)

    def get_commit_message(self, diff: DiffData):
        try:
            user_message = HumanMessage(content=self._get_diff_content(diff))
            messages = [self.prompt_commit_message, user_message]
            ai_msg = self.llm.invoke(messages)
            return ai_msg if isinstance(ai_msg, str) else ai_msg.content
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

    def audit_diff(self, diff: DiffData):
        user_message = HumanMessage(content=self._get_diff_content(diff))
        try:
            ai_msg = self.llm.invoke([self.prompt_code_audit, user_message])
            output_message = ai_msg.content if not isinstance(ai_msg, str) else ai_msg

            return self._get_status_from_message(output_message), output_message
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

    def _get_tool_output(self, ai_msg):
        status = None
        for tool_call in ai_msg.tool_calls:
            tool_output = self.tools[tool_call["name"]].invoke(tool_call["args"])
            logger.info(f"Tool: '{tool_call['name']}'")
            status = tool_output
        return status

The audit_diff method takes a DiffData object as an argument, which represents the code differences to be audited. It first creates a HumanMessage from the DiffData content by calling _get_diff_content, which joins all the diffs into a single string. Next, it invokes the LLM with the code-audit system prompt and the human message, and stores the response in ai_msg. If ai_msg is a string, it is used as is; otherwise, its content attribute is used. The method then calls _get_status_from_message with output_message as the argument. That helper invokes the tool-bound LLM with output_message as input and returns the output of the tool call. Finally, audit_diff returns both the status and output_message. In summary, audit_diff audits code differences using a Large Language Model and a set of tools, and returns the audit status and the AI message content.
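
If the model does not support function calling, one alternative (not the approach used in the chain above) is to read the score directly from the response, since the prompt forces a fixed template. A minimal sketch, where parse_score is a hypothetical helper and Score mirrors the enum of the chain:

```python
import re
from enum import Enum


class Score(int, Enum):
    NONE = 1
    LOW = 2
    MEDIUM = 3
    HIGH = 4
    CRITICAL = 5


# Matches the line requested by the prompt, e.g. "Global score of risks: HIGH"
SCORE_LINE = re.compile(
    r"Global score of risks:\s*\[?\s*(NONE|LOW|MEDIUM|HIGH|CRITICAL)\b",
    re.IGNORECASE,
)


def parse_score(response: str):
    """Return the Score named in the response, or None if the line is missing."""
    match = SCORE_LINE.search(response)
    if match is None:
        return None
    return Score[match.group(1).upper()]
```

This only works while the model respects the template, which is the reason why function calling is the more robust option when it is available.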

Now I can invoke the chain to audit the code and print the results. Also, I can use the score to perform an action.

import click
from rich.console import Console
from rich.markdown import Markdown

from lib.chains.git import get_chain
from lib.chains.git.chain import Score
from lib.git_utils import get_current_diff
from lib.llm.azure import llm
from settings import BASE_DIR


@click.command()
def run():
    current_diff = get_current_diff(BASE_DIR / '..')
    chain = get_chain(llm)
    status, ia_response = chain.audit_diff(current_diff)

    click.echo('Audit results:')
    click.echo('--------------')
    click.echo(f"{current_diff.user} at {current_diff.date} made the following changes at:")
    click.echo('')
    click.echo('Affected files:')
    for diff in current_diff.diffs:
        print(f"[{diff.status.value}] {diff.path}")
    click.echo('')

    console = Console()
    console.print(Markdown(ia_response))

    click.echo('')
    if status == Score.CRITICAL:
        click.echo('[WARNING] Critical issues found.')
    else:
        click.echo('No critical issues found.')

In conclusion, integrating AI into the development workflow can significantly enhance productivity and code quality. By using tools like LangChain and LLM models, we can automate the generation of commit messages and perform detailed code audits based on git diffs. This not only saves time but also ensures consistency and accuracy in commit messages and code reviews. As we continue to explore and implement these AI-driven solutions, we open up new possibilities for more efficient and effective software development practices. It is not magic, it is just code.

Full source code in my github.

Integrating AI Models with Function Calls using Python and LangChain

Today we’ll explore the integration of AI models with function calls using Python and LangChain. This example shows how to leverage LangChain for orchestrating AI and natural language processing tasks by integrating AI models with custom functions. While the functions used here are straightforward examples, such as basic arithmetic operations, they illustrate the foundational concepts applicable to more complex scenarios, such as invoking external APIs or more complicated processing pipelines. We need an LLM with function calling capabilities (not all models allow us to call custom functions). For this example, we’re going to use Groq, which has a free public API with function calling support. So, we need to obtain an API key here.

That’s the main script. It only obtains the chain with our llm instance.

import logging

from lib.chains.math_chain.chain import get_chain
from lib.llm.groq import llm

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    chain = get_chain(llm)

    user_prompts = [
        "How much is five times twelve?",
        "How much is five plus twelve?",
        "How much is twelve minus five?",
    ]

    for prompt in user_prompts:
        responses = chain.ask_question(prompt)
        for response in responses:
            print(f"Q: {prompt} R:{response}")

That’s the chain.

import logging

from langchain_core.messages import SystemMessage, HumanMessage

from .tools import tools

logger = logging.getLogger(__name__)


def get_chain(llm):
    return CustomMathChain(llm, tools)


class CustomMathChain:
    system_prompt_content = """
        You are a model that has various mathematical functions.
        You can only respond to questions related to functions that you know.
        """

    def __init__(self, llm, tools):
        self.llm_with_tools = llm.bind_tools(list(tools.values()))
        self.system_message = SystemMessage(content=self.system_prompt_content)
        self.tools = tools

    def ask_question(self, user_prompt):
        responses = []
        try:
            user_message = HumanMessage(content=user_prompt)
            messages = [self.system_message, user_message]
            ai_msg = self.llm_with_tools.invoke(messages)

            for tool_call in ai_msg.tool_calls:
                tool_output = self.tools[tool_call["name"]].invoke(tool_call["args"])
                logger.info(f"Tool: '{tool_call['name']}' called output: {tool_output}")
                responses.append(tool_output)

            return responses
        except Exception as e:
            logger.error(f"Error during question processing: {e}")
            # Return what was collected so the caller can always iterate the result
            return responses

This custom chain utilizes functions defined here, employing the @tool decorator. It is crucial to properly define input and output variables and provide thorough documentation for our tools. AI leverages this information to determine the appropriate function call for each scenario. Various methods exist for defining our tools; here, I’ve opted for the simplest approach. For more detailed guidance on defining custom functions, refer to this resource.

from langchain_core.tools import tool


@tool
def ia_sum(a: int, b: int) -> int:
    """ Return the sum of `a` and `b` """
    return a + b


@tool
def ia_diff(a: int, b: int) -> int:
    """ Return the difference of `a` and `b` """
    return a - b


@tool
def ia_multiply(a: int, b: int) -> int:
    """ Return the product of `a` and `b` """
    return a * b


tools = {
    "ia_sum": ia_sum,
    "ia_diff": ia_diff,
    "ia_multiply": ia_multiply
}

And that’s all! Working with our custom functions is quite straightforward. As mentioned earlier, we’re using very simple functions (add, diff, and multiply). In reality, we don’t need an LLM or AI to perform these arithmetic operations. However, imagine integrating real-world functions that access APIs and your business models. AI can handle natural language processing to interpret user input and identify the correct function to execute the task.
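
To make the dispatch step explicit, here is a sketch without LangChain. Plain functions stand in for the @tool-decorated ones, and the tool calls are represented as dictionaries with name and args keys, like the entries of ai_msg.tool_calls. The run_tool_calls helper is hypothetical.

```python
def ia_sum(a: int, b: int) -> int:
    """Return the sum of `a` and `b`."""
    return a + b


def ia_multiply(a: int, b: int) -> int:
    """Return the product of `a` and `b`."""
    return a * b


# Plain functions stand in for the @tool-decorated ones
TOOLS = {"ia_sum": ia_sum, "ia_multiply": ia_multiply}


def run_tool_calls(tool_calls, tools):
    """Execute each requested call and collect the outputs, skipping unknown names."""
    outputs = []
    for call in tool_calls:
        func = tools.get(call["name"])
        if func is None:
            continue  # the model asked for a function we do not have
        outputs.append(func(**call["args"]))
    return outputs
```

The model never executes anything by itself: it only returns the name of the function and its arguments, and our code decides whether and how to run it.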

Source code in my github account.

Building a local Plato expert AI with LLaMA3 and LangChain

Today, I’m delving into the realm of AI. My aim is to construct an AI capable of engaging in conversation about a specific document. For this experiment, I’ve chosen Plato’s ‘Apology of Socrates.’ My goal is to develop an expert on this text, allowing me to pose questions and receive insightful responses. Let’s dive in.

First, I need a LLaMA3 model locally on my computer (MBP M2 24GB). To do that we can use Ollama. It’s pretty straightforward to do that on Mac. Just follow the instructions, do

brew install ollama

and that’s all. We can start the server.

ollama start

Now we need the model. We’re going to use LLaMA3, a 4.7 GB model that we can download using:

ollama pull llama3

And that’s all. Our server is up and running, ready to receive requests. Now we’re going to create our script. We can use simple HTTP requests to interact with Ollama (using Postman, for example), but it’s simpler to use a framework to handle the communication. We’re going to use [LangChain](https://www.langchain.com/).

AI models have a limit on the number of tokens that we can use as input and output. Apology of Socrates is a book, not excessively big but big enough to exceed this limit, so we need to split it into chunks. We also need to convert those chunks into a vector store so that the passages relevant to each question can be retrieved and passed to the model. LangChain provides document loaders to read the document and helpers to create this vector store. In my example I’m using the Apology of Socrates as a txt file, so I’m going to use a TextLoader, but there are different loaders for PDFs, S3, DataFrames and many more in the LangChain SDK. With this function I obtain the vector store from a path.

import logging

from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter

logger = logging.getLogger(__name__)


def get_vector_store_from_path(file_path):
    loader = TextLoader(file_path)
    data = loader.load()

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=100)
    all_splits = text_splitter.split_documents(data)

    logger.info(f"Text divided in {len(all_splits)} splits")
    return Chroma.from_documents(
        documents=all_splits, embedding=GPT4AllEmbeddings()
    )
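
To see what chunk_size and chunk_overlap mean, here is a naive fixed-window splitter. It is only a sketch of the idea: RecursiveCharacterTextSplitter is smarter and tries to cut on separators such as paragraphs and sentences. The function name is hypothetical.

```python
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Cut text into windows of chunk_size characters sharing chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks
```

The overlap makes consecutive chunks share some text, so a sentence that falls on a boundary is not lost for retrieval.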

Now we need a chain to ask questions to our model. With this function I obtain my chain.

import logging

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

logger = logging.getLogger(__name__)


def get_chain(template, vector_store, llm):
    prompt = ChatPromptTemplate.from_template(template)
    output_parser = StrOutputParser()

    setup_and_retrieval = RunnableParallel(dict(
        context=vector_store.as_retriever(),
        question=RunnablePassthrough(),
    ))
    return setup_and_retrieval | prompt | llm | output_parser
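
The pipe syntax can look a bit magical, so here is a toy model of the idea. It is not how LangChain implements it: Step, fake_retriever and fake_llm are placeholders invented for this sketch, and each step simply passes its output to the next one.

```python
class Step:
    """Toy runnable: wraps a function and supports `a | b` composition."""

    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # Feed the output of this step into the next one
        return Step(lambda value: other.func(self.func(value)))

    def invoke(self, value):
        return self.func(value)


def fake_retriever(question):
    # Stand-in for vector_store.as_retriever(): always returns the same passage
    return "placeholder passage"


# Same spirit as RunnableParallel(context=retriever, question=passthrough)
setup = Step(lambda q: {"context": fake_retriever(q), "question": q})
prompt = Step(lambda d: f"Context: {d['context']}\nQuestion: {d['question']}")
fake_llm = Step(lambda text: text.upper())  # placeholder for a real model

chain = setup | prompt | fake_llm
```

Calling chain.invoke with a question runs the three steps in order: build the context and question dictionary, fill the template, and send the resulting text to the model.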

I’m using an Ollama LLM, running locally on my computer as I explained before. LangChain allows us to use different LLMs (Azure, OpenAI,…). We can use those models if we have an account (they aren’t free).

from langchain_community.llms.ollama import Ollama
from langchain_core.callbacks import CallbackManager, StreamingStdOutCallbackHandler
import logging
from settings import OLLAMA_MODEL

logger = logging.getLogger(__name__)

llm = Ollama(
    model=OLLAMA_MODEL,
    verbose=True,
    callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
)
logger.info(f"Model {OLLAMA_MODEL} loaded")

With those functions I can finally build my script. As you can see, I prepare a template telling the LLM what I want, and the set of questions I’m going to ask the model. Our main function first fetches the vector store (it takes several seconds). After that it builds the chain with the LLM (which also takes time). Then we iterate over the questions and print the LLM’s answers in the terminal.

import logging

from lib.llm.ollama import llm
from lib.utils import get_chain, get_vector_store_from_path
from settings import DOCUMENT_PATH

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def ask_question(chain, question):
    logger.info(f"QUESTION: {question}")
    response = chain.invoke(question)

    print(response)


def main(template, path, questions):
    vector_store = get_vector_store_from_path(path)
    chain = get_chain(
        template=template,
        vector_store=vector_store,
        llm=llm)
    for question in questions:
        ask_question(
            chain=chain,
            question=question
        )


if __name__ == "__main__":
    template = """
        Answer the question based only on the context I give you.
        Answer using quotes from the text.

        Context: {context}
        Question: {question}
        """
    # The trailing commas matter: without them Python joins the strings into one.
    questions = (
        "What are the general ideas of the text?",
        "What is Socrates' position regarding his imminent condemnation?",
        "Can you list the protagonists of the plot?",
    )
    main(template=template, path=DOCUMENT_PATH, questions=questions)

And that’s all. We have a Plato expert to chat with about one specific context (in this case Apology of Socrates). However, for a production-grade project, it’s crucial to store our vector data in a database to avoid repetitive generation.
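
One possible way of avoiding that repeated generation is to derive the storage directory from the content of the document, so the store is only rebuilt when the document changes. This is a minimal sketch using only the standard library; store_dir_for, needs_rebuild and the vector_stores folder are hypothetical names, not part of the original project.

```python
import hashlib
from pathlib import Path


def store_dir_for(document_path, base_dir="vector_stores"):
    """Return a directory whose name depends on the content of the document."""
    digest = hashlib.sha256(Path(document_path).read_bytes()).hexdigest()
    return Path(base_dir) / digest[:16]


def needs_rebuild(store_dir):
    """True when nothing has been persisted yet for this version of the document."""
    store_dir = Path(store_dir)
    return not store_dir.is_dir() or not any(store_dir.iterdir())
```

The resulting path could then be handed to the vector store as its persistence directory (Chroma accepts a persist_directory argument), and the embeddings would only be computed when needs_rebuild returns True.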

Note: In my example the questions, the template and Plato’s book are in Spanish. Plato’s book is in the public domain. Source code available on my GitHub.

Connecting Grafana to PostgreSQL with Python for Time Series Analysis

In the world of data analysis and graphs, we have three important tools: Grafana, PostgreSQL, and Python. They work together to help us look at data and track how it changes over time. In this article, we’ll learn step by step how to use Grafana with a PostgreSQL database. We’ll also discover how to use Python to record data that changes over time. By the end of this article, you’ll know how to set up these tools, and you’ll see how they can be useful for your work with data.

First, we create our table. We also create a sequence for the primary key.

CREATE TABLE MEASUREMENTLOG
(
    id              numeric(10)            NOT NULL,
    key             character varying(100) NOT NULL,
    datetime        TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    status          numeric(2) NOT NULL,

    CONSTRAINT MEASUREMENTLOG_pkey PRIMARY KEY (id)
);

create sequence SEQ_MEASUREMENTLOG
    minvalue 0
    maxvalue 999999999999999999
    start with 1
    increment by 1
    cache 1;

And a simple Python script to persist a time series.

from random import randint
from time import sleep
from datetime import datetime
import os
import logging
import pytz

from dbutils import transactional, get_conn

from settings import DSN

tz = pytz.timezone('Europe/Madrid')

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def persists(key, dt, status):
    with transactional(conn=get_conn(DSN)) as db:
        seq_log = db.fetch_all("SELECT nextval('seq_measurementlog')")[0][0]
        db.insert('measurementlog', dict(
            id=seq_log,
            key=key,
            datetime=dt,
            status=status
        ))


KEY = os.getenv('KEY')
status = 0
while True:
    now = datetime.now(tz)
    persists(
        key=KEY,
        dt=now,
        status=status
    )
    logger.info(f"[{now}] status: {status}")
    status = 1 if status == 0 else 0
    sleep(randint(5, 15))

Now we set up the PostgreSQL database and Grafana in a docker-compose.yml.

More information about the configuration of PostgreSQL and Grafana can be found in the links.

version: '3'

services:
  pg:
    image: pg
    restart: unless-stopped
    build:
      context: .docker/pg/
      dockerfile: Dockerfile
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGTZ: ${TIMEZONE}
      PGDATA: /var/lib/postgresql/data/pgdata
    ports:
      - "5432:5432"
  grafana:
    image: grafana
    restart: unless-stopped
    build:
      context: .docker/grafana/
      dockerfile: Dockerfile
    environment:
      - GF_TIMEZONE=${TIMEZONE}
      - GF_SECURITY_ADMIN_USER=${GF_SECURITY_ADMIN_USER}
      - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD}
      - GF_USERS_DEFAULT_THEME=${GF_USERS_DEFAULT_THEME}
      - GF_USERS_ALLOW_SIGN_UP=${GF_USERS_ALLOW_SIGN_UP}
      - GF_USERS_ALLOW_ORG_CREATE=${GF_USERS_ALLOW_ORG_CREATE}
      - GF_AUTH_ANONYMOUS_ENABLED=${GF_AUTH_ANONYMOUS_ENABLED}
    ports:
      - "3000:3000"
    depends_on:
      - pg

We run the stack, connect to Grafana on port 3000 and configure the data source.

After that we can create the dashboard.

We are using this query:

SELECT 
    datetime, 
    status 
FROM 
    measurementlog 
WHERE 
    key = 'id1' and 
    $__timeFilter(datetime)
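
The $__timeFilter macro is replaced by Grafana with a condition on the time range selected in the dashboard. The sketch below imitates that expansion in Python to show the kind of SQL that reaches PostgreSQL. It is an approximation for illustration only: expand_time_filter is a hypothetical helper and the exact text generated by Grafana may differ between versions.

```python
import re
from datetime import datetime, timezone

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def expand_time_filter(sql, start, end):
    """Replace $__timeFilter(column) with a BETWEEN condition on that column."""

    def replace(match):
        column = match.group(1)
        return (
            f"{column} BETWEEN '{start.strftime(TIME_FORMAT)}' "
            f"AND '{end.strftime(TIME_FORMAT)}'"
        )

    return re.sub(r"\$__timeFilter\((\w+)\)", replace, sql)


query = "SELECT datetime, status FROM measurementlog WHERE key = 'id1' AND $__timeFilter(datetime)"
start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 6, 0, tzinfo=timezone.utc)
print(expand_time_filter(query, start, end))
```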

And that’s the dashboard.

Project available in my GitHub account.

Sending logs to AWS CloudWatch with a sidecar pattern and Python

In a Docker Swarm environment, the sidecar pattern is a common architectural approach used to extend the functionality of a primary container without directly modifying it.

First there’s a primary container. This is the main application or service you want to run within a Docker Swarm service. It’s responsible for the core functionality of your application. In our case, this container will be the log generator. It will save its logs in JSON format.

The sidecar container is a secondary container that runs alongside the primary container. It’s tightly coupled with the primary container and assists it by providing additional services or functionality. In our example, the sidecar’s responsibility will be to push the logs to AWS CloudWatch. The idea is to share a Docker volume between both containers. With this technique, our primary container will not suffer any network latency when generating logs.

The idea is to generate something like this:

version: '3.6'

services:
  api:
    image: api:latest
    logging:
      options:
        max-size: 10m
    deploy:
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
      - PROCESS_ID=api
    ports:
      - 5000:5000
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000 --timeout 180

  filebeat:
    image: cw:production
    command: bash -c "python cw.py && sleep 1m"
    deploy:
      restart_policy:
        condition: any
    environment:
      - LOG_GROUP=python_logs_example
      - LOG_STREAM_PREFIX=default
    volumes:
      - logs_volume:/src/logs
volumes:
  logs_volume:

Let’s go. First, we need to set up our AWS credentials. We can use a profile or an IAM user.

if AWS_PROFILE_NAME:
    session = boto3.Session(profile_name=AWS_PROFILE_NAME, region_name=AWS_REGION)
else:
    session = boto3.Session(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION)

logs = session.client('logs')

Then we need to set up the log group and the log stream in CloudWatch. We can create them by hand, but I prefer to create them programmatically if they don’t exist.

def init_cloudwatch_stream():
    log_stream_name = f"{LOG_STREAM_PREFIX}_{datetime.now().strftime('%Y%m%d')}"

    log_groups = logs.describe_log_groups(logGroupNamePrefix=LOG_GROUP)['logGroups']
    if not any(group['logGroupName'] == LOG_GROUP for group in log_groups):
        logs.create_log_group(logGroupName=LOG_GROUP)

    log_streams = logs.describe_log_streams(
        logGroupName=LOG_GROUP,
        logStreamNamePrefix=log_stream_name
    )['logStreams']

    if not any(stream['logStreamName'] == log_stream_name for stream in log_streams):
        logs.create_log_stream(logGroupName=LOG_GROUP, logStreamName=log_stream_name)

    return log_stream_name

Now we need to upload the logs to CloudWatch with put_log_events. To send several batches we need to pass a sequenceToken (except the first time). To do that I use this trick:

function_parameters = dict(
    logGroupName=LOG_GROUP,
    logStreamName=log_stream_name
)

for f in glob(f'{LOG_PATH}/*.{LOG_EXTENSION}'):
    function_parameters['logEvents'] = get_log_events_from_file(f)
    response = logs.put_log_events(**function_parameters)
    function_parameters['sequenceToken'] = response['nextSequenceToken']
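
The loop above sends one batch per file. If a file can grow large, the events may need to be split first, because put_log_events limits each call to 10,000 events and 1,048,576 bytes (counting 26 bytes of overhead per event) and expects the events in chronological order. A minimal sketch of such a split follows; batch_log_events is a hypothetical helper, not part of the original code.

```python
MAX_EVENTS_PER_BATCH = 10_000
MAX_BYTES_PER_BATCH = 1_048_576
EVENT_OVERHEAD_BYTES = 26


def batch_log_events(events):
    """Yield lists of events that respect the put_log_events batch limits."""
    batch, batch_size = [], 0
    # CloudWatch expects the events of a batch sorted by timestamp.
    for event in sorted(events, key=lambda e: e['timestamp']):
        event_size = len(event['message'].encode('utf-8')) + EVENT_OVERHEAD_BYTES
        is_full = (
            len(batch) >= MAX_EVENTS_PER_BATCH
            or batch_size + event_size > MAX_BYTES_PER_BATCH
        )
        if batch and is_full:
            yield batch
            batch, batch_size = [], 0
        batch.append(event)
        batch_size += event_size
    if batch:
        yield batch
```

Each yielded list could be used as the logEvents parameter of one call. The sketch does not handle other restrictions, such as a single batch not spanning more than 24 hours.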

We also need to read the log files and maybe change the fields according to our needs.

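def get_log_events_from_file(file):
    exclude_fields = ('@timestamp', 'logger')
    # Each line of the file is expected to be one JSON document.
    with open(file, 'r') as f:
        records = [json.loads(line) for line in f]
    return [
        dict(
            # CloudWatch expects the timestamp in milliseconds since the epoch.
            timestamp=int(datetime.fromisoformat(d['@timestamp']).timestamp() * 1000),
            message=json.dumps({k: v for k, v in d.items() if k not in exclude_fields})
        ) for d in records]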
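
To see what this transformation produces, here is the same logic applied to a single line. The sample log line is invented for the example (the real fields depend on how the primary container formats its logs) and log_line_to_event is a hypothetical name.

```python
import json
from datetime import datetime


def log_line_to_event(line, exclude_fields=('@timestamp', 'logger')):
    """Convert one JSON log line into a CloudWatch log event dictionary."""
    record = json.loads(line)
    return dict(
        # Milliseconds since the epoch, taken from the ISO 8601 timestamp.
        timestamp=int(datetime.fromisoformat(record['@timestamp']).timestamp() * 1000),
        message=json.dumps({k: v for k, v in record.items() if k not in exclude_fields}),
    )


line = '{"@timestamp": "2023-10-01T12:00:00+00:00", "logger": "app", "level": "INFO", "message": "hello"}'
print(log_line_to_event(line))
# {'timestamp': 1696161600000, 'message': '{"level": "INFO", "message": "hello"}'}
```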

I like to keep all the settings of my application in a file called settings.py. It’s a pattern that I’ve copied from Django. In this file I also read environment variables from a dotenv file.

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

AWS_PROFILE_NAME = os.getenv('AWS_PROFILE_NAME', False)
AWS_REGION = os.getenv('AWS_REGION')

LOG_GROUP = os.getenv('LOG_GROUP', 'python_logs_example')
LOG_STREAM_PREFIX = os.getenv('LOG_STREAM_PREFIX', 'default')

LOG_EXTENSION = 'log'
LOG_PATH = os.getenv('LOG_PATH', Path(BASE_DIR).resolve())

And that’s all. Your logs are uploaded to CloudWatch in a background process, decoupled from the main process.

Source code available in my github.

Real-time Object Detection and Streaming with Python, OpenCV, AWS Kinesis, and YOLOv8

I’ve got the following idea in mind: I want to detect objects with YOLOv8 and OpenCV. Here is a simple example:

import cv2 as cv
from ultralytics import YOLO
import logging

logging.getLogger("ultralytics").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

X_RESIZE = 800
Y_RESIZE = 450

YOLO_MODEL = "yolov8m.pt"
YOLO_CLASSES_TO_DETECT = ('bottle',)
THRESHOLD = 0.80

cap = cv.VideoCapture(0)
model = YOLO(YOLO_MODEL)
while True:
    ret, original_frame = cap.read()
    if not ret:
        # No frame available (camera closed or stream ended).
        break
    frame = cv.resize(original_frame, (X_RESIZE, Y_RESIZE), interpolation=cv.INTER_LINEAR)

    results = model.predict(frame)

    color = (0, 255, 0)
    for result in results:
        for box in result.boxes:
            class_id = result.names[box.cls[0].item()]
            cords = box.xyxy[0].tolist()
            probability = round(box.conf[0].item(), 2)

            if probability > THRESHOLD and class_id in YOLO_CLASSES_TO_DETECT:
                logger.info(f"{class_id} {round(probability * 100)}%")
                x1, y1, x2, y2 = [round(x) for x in cords]
                cv.rectangle(frame, (x1, y1), (x2, y2), color, 2)

    cv.imshow("Cam", frame)

    if cv.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv.destroyAllWindows()

However, my idea is slightly different. I intend to capture the camera stream, transmit it to AWS Kinesis and then, within a separate process, ingest this Kinesis stream for object detection or any other purpose using the camera frames. To achieve this, our initial step is to publish our OpenCV frames into Kinesis. This is producer.py:

import logging

from lib.video import kinesis_producer
from settings import KINESIS_STREAM_NAME, CAM_URI, WIDTH

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == '__main__':
    kinesis_producer(
        kinesis_stream_name=KINESIS_STREAM_NAME,
        cam_uri=CAM_URI,
        width=WIDTH
    )

The kinesis_producer function, imported from lib.video, looks like this:

def kinesis_producer(kinesis_stream_name, cam_uri, width):
    logger.info(f"start emitting stream from cam={cam_uri} to kinesis")
    kinesis = aws_get_service('kinesis')
    cap = cv.VideoCapture(cam_uri)
    try:
        while True:
            ret, frame = cap.read()
            if ret:
                scale = width / frame.shape[1]
                height = int(frame.shape[0] * scale)

                scaled_frame = cv.resize(frame, (width, height))
                _, img_encoded = cv.imencode('.jpg', scaled_frame)
                kinesis.put_record(
                    StreamName=kinesis_stream_name,
                    Data=img_encoded.tobytes(),
                    PartitionKey='1'
                )

    except KeyboardInterrupt:
        cap.release()
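
The producer resizes every frame before encoding it, which keeps the JPEG small. The arithmetic can be isolated in a couple of pure functions, sketched below. The helper names are hypothetical, and the 1 MiB value is an assumption based on the per-record limit that Kinesis Data Streams has traditionally documented, so it is worth checking the current quota.

```python
# Assumed per-record limit; check the current Kinesis quotas.
MAX_RECORD_BYTES = 1024 * 1024


def scaled_size(frame_width, frame_height, target_width):
    """Return (width, height) for the target width, keeping the aspect ratio."""
    target_height = frame_height * target_width // frame_width
    return target_width, target_height


def fits_in_record(payload):
    """True when the encoded frame is small enough for a single record."""
    return len(payload) <= MAX_RECORD_BYTES


print(scaled_size(1920, 1080, 800))
# (800, 450)
```

A producer could call fits_in_record on the encoded bytes and skip or re-encode frames that are too large.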

Now our consumer.py:

import logging

from lib.video import kinesis_consumer
from settings import KINESIS_STREAM_NAME

logging.getLogger("ultralytics").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == '__main__':
    kinesis_consumer(
        kinesis_stream_name=KINESIS_STREAM_NAME
    )

To get the frames from the Kinesis stream we need to take the shards into account. This example reads only the first shard; you may need to adapt it to your needs.

def kinesis_consumer(kinesis_stream_name):
    logger.info(f"start reading kinesis stream from={kinesis_stream_name}")
    kinesis = aws_get_service('kinesis')
    model = YOLO(YOLO_MODEL)
    response = kinesis.describe_stream(StreamName=kinesis_stream_name)
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    # Ask for the iterator once; every get_records call returns the next one.
    shard_iterator = kinesis.get_shard_iterator(
        StreamName=kinesis_stream_name,
        ShardId=shard_id,
        ShardIteratorType='LATEST'
    )['ShardIterator']
    while shard_iterator:
        response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=10
        )
        shard_iterator = response.get('NextShardIterator')

        for record in response['Records']:
            image_data = record['Data']
            frame = cv.imdecode(np.frombuffer(image_data, np.uint8), cv.IMREAD_COLOR)
            results = model.predict(frame)
            if detect_id_in_results(results):
                logger.info('Detected')

def detect_id_in_results(results):
    status = False
    for result in results:
        for box in result.boxes:
            class_id = result.names[box.cls[0].item()]
            probability = round(box.conf[0].item(), 2)
            if probability > THRESHOLD and class_id in YOLO_CLASSES_TO_DETECT:
                status = True
    return status
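
When the stream has more than one shard, one possible approach is to keep one iterator per shard and follow the NextShardIterator returned by each get_records call. The sketch below shows the idea. To stay runnable without AWS it uses FakeKinesis, an invented in-memory stand-in that only mimics the three client calls involved. read_all_shards and its rounds parameter are also hypothetical.

```python
class FakeKinesis:
    """In-memory stand-in for the boto3 Kinesis client (illustration only)."""

    def __init__(self, shards):
        self.shards = shards  # {shard_id: [bytes, ...]}

    def describe_stream(self, StreamName):
        shards = [{'ShardId': shard_id} for shard_id in self.shards]
        return {'StreamDescription': {'Shards': shards}}

    def get_shard_iterator(self, StreamName, ShardId, ShardIteratorType):
        return {'ShardIterator': f'{ShardId}:0'}

    def get_records(self, ShardIterator, Limit):
        shard_id, offset = ShardIterator.rsplit(':', 1)
        offset = int(offset)
        data = self.shards[shard_id][offset:offset + Limit]
        return {
            'Records': [{'Data': item} for item in data],
            'NextShardIterator': f'{shard_id}:{offset + len(data)}',
        }


def read_all_shards(kinesis, stream_name, rounds):
    """Yield (shard_id, data) pairs, polling every shard `rounds` times."""
    description = kinesis.describe_stream(StreamName=stream_name)
    iterators = {}
    for shard in description['StreamDescription']['Shards']:
        response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard['ShardId'],
            ShardIteratorType='TRIM_HORIZON',
        )
        iterators[shard['ShardId']] = response['ShardIterator']

    for _ in range(rounds):
        for shard_id, iterator in list(iterators.items()):
            if iterator is None:
                continue  # the shard has been closed
            response = kinesis.get_records(ShardIterator=iterator, Limit=10)
            # Keep the iterator returned by the service for the next round.
            iterators[shard_id] = response.get('NextShardIterator')
            for record in response['Records']:
                yield shard_id, record['Data']


fake = FakeKinesis({'shard-0': [b'frame-a', b'frame-b'], 'shard-1': [b'frame-c']})
for shard_id, data in read_all_shards(fake, 'demo-stream', rounds=2):
    print(shard_id, data)
```

With the real client, a long list of shards may need pagination and the loop would normally run until the process is stopped, so this is only a starting point.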

And that’s all. Real-time Object Detection with Kinesis stream. However, computer vision examples are often simple and quite straightforward. On the other hand, real-world computer vision projects tend to be more intricate, involving considerations such as cameras, lighting conditions, performance optimization, and accuracy. But remember, this is just an example.

Full code in my github.

AWS SQS 2 HTTP server with Python

A simple service that listens to an SQS queue and forwards the payload to an HTTP server, sending a POST request with the SQS message body and a Bearer token.

The main script reads a .env file with the AWS credentials and the HTTP endpoint.

# AWS CREDENTIALS
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_REGION=eu-west-1
#AWS_PROFILE=xxx
# SQS
SQS_QUEUE_URL=https://sqs.eu-west-1.amazonaws.com/xxx/name
# REST ENDPOINT
HTTP_ENDPOINT=http://localhost:5555
AUTH_BEARER_TOKEN=xxx

And this is the main script:

import logging

from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())

from lib.sqs2http import loop_step, get_sqs

for library in ['botocore', 'boto3']:
    logging.getLogger(library).setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X'
)

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    logger.info("sqs2http start")
    sqs = get_sqs()
    while True:
        loop_step(sqs)

The main loop is like this:

def loop_step(sqs):
    response = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=SQS_MAX_NUMBER_OF_MESSAGES)

    if 'Messages' in response:
        for message in response.get('Messages', []):
            try:
                body = message.get('Body')
                logger.info(body)
                do_post(body)
                remove_from_sqs(sqs, message.get('ReceiptHandle'))

            except Exception as e:
                logger.exception(e)
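
The order of the calls inside the try block is what makes this loop safe: the message is removed from the queue only after the POST has succeeded. A message that is not deleted becomes visible again once its visibility timeout expires, so it can be retried. The sketch below isolates that logic with plain callables; process_messages and the sample data are hypothetical.

```python
def process_messages(messages, handler, delete):
    """Handle each message and delete it only when the handler succeeds."""
    failed = []
    for message in messages:
        try:
            handler(message['Body'])
            delete(message['ReceiptHandle'])
        except Exception:
            # Not deleted: SQS will deliver this message again later.
            failed.append(message['MessageId'])
    return failed


def handler(body):
    if body == 'bad':
        raise ValueError("cannot process this payload")


deleted = []
messages = [
    {'MessageId': 'm1', 'ReceiptHandle': 'r1', 'Body': 'ok'},
    {'MessageId': 'm2', 'ReceiptHandle': 'r2', 'Body': 'bad'},
]
print(process_messages(messages, handler, deleted.append), deleted)
# ['m2'] ['r1']
```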

To post to the http server we can use requests.

import requests
import json

from settings import HTTP_ENDPOINT, AUTH_BEARER_TOKEN

def do_post(body):
    requests.post(
        url=HTTP_ENDPOINT,
        data=json.dumps(body),
        headers={
            'Authorization': AUTH_BEARER_TOKEN,
            'Content-Type': 'application/json'
        })
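
As a variation, the request could also be built with the standard library. This sketch uses urllib instead of requests so it has no dependencies, and it rests on two assumptions that may not match the original project: the token variable holds only the token (so the "Bearer " prefix is added here), and the SQS body is already JSON text (so it is sent as is). build_request is a hypothetical name.

```python
import urllib.request


def build_request(endpoint, token, body):
    """Build (without sending) a POST request carrying the SQS body."""
    return urllib.request.Request(
        url=endpoint,
        data=body.encode('utf-8'),
        headers={
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
        },
        method='POST',
    )


request = build_request('http://localhost:5555', 'xxx', '{"id": 1}')
print(request.get_method(), request.full_url, request.get_header('Authorization'))
```

The request would be sent with urllib.request.urlopen(request), which raises an exception for HTTP error responses, so a failed delivery would not be treated as a success.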

With this simple script you can set up a web server that processes all incoming SQS messages. Source code in my GitHub.

Deploying Python Applications in a Docker Swarm Cluster with Traefik Reverse Proxy and HTTPS

In this article, we will explore the process of deploying Python applications in a Docker Swarm cluster, utilizing a Traefik reverse proxy and ensuring secure communication through HTTPS. By following this approach, we can enhance the scalability, availability, and security of our Python applications.

HTTP is not a secure protocol. When deploying a web service using HTTP, it is important to be aware that anyone can intercept the traffic between the browser and the server. An attacker simply needs to use a sniffer tool like Wireshark, for example, to view the traffic in plain text, including passwords and sensitive data. The solution to this issue is to use the HTTPS protocol. HTTPS provides two important benefits: first, it ensures that the server is who it claims to be through certificates, and second, it encrypts the traffic between the client and server. When exposing anything to the internet, the use of HTTPS is considered mandatory. However, in some cases, such as internal APIs within a local network, HTTPS may not be utilized. In this article, we will focus on enabling HTTPS for services in a Docker Swarm cluster. Let’s get started.

To achieve this, we will utilize Traefik as a reverse proxy, serving as the sole entry point for our services deployed within the Swarm cluster. Our deployed stacks will not directly expose any ports outside the cluster; instead, they will be mapped to Traefik. Traefik will then handle the task of exposing these services on specific paths. To establish this setup, both our stacks and Traefik will utilize the same external network. Therefore, the first step is to define this network within our cluster.

docker network create --driver overlay external-net

This is our Traefik service configuration:

version: "3.9"

services:
  traefik:
    image: traefik:v2.10
    command:
      - "--log.level=INFO"
      - "--api.insecure=false"
      - "--api=true"
      - "--api.dashboard=true"
      - "--providers.docker=true"
      - "--providers.docker.exposedByDefault=false"
      - "--entrypoints.web.address=:80"
      - "--entrypoints.websecure.address=:443"
      - "--entrypoints.web.http.redirections.entryPoint.to=websecure"
      - "--entrypoints.web.http.redirections.entryPoint.scheme=https"
    environment:
      - TZ=Europe/Madrid
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock:ro"

    networks:
      - external-net

networks:
  external-net:
    external: true

Now, let’s define our service. In this example, we will have three replicas of a Flask API backend behind a Nginx proxy, which is a common Python scenario.

from flask import Flask
import os

app = Flask(__name__)


@app.get("/service1")
def health():
    return dict(
        status=True,
        slot=os.getenv('SLOT')
    )

And that’s the configuration of the service:

version: '3.9'

services:
  nginx:
    image: flaskdemo_nginx:latest
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.app.tls=true"
      - "traefik.http.routers.app.rule=PathPrefix(`/service1`)"
      - "traefik.http.services.app.loadbalancer.server.port=80"
    depends_on:
      - backend
    networks:
      - external-net
      - default-net

  backend:
    image: flaskdemo:latest
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000 --timeout 180
    environment:
      SLOT: "{{.Task.Slot}}"
    deploy:
      replicas: 3
    networks:
      - default-net

networks:
  default-net:
  external-net:
    external: true

It uses two images. The first one is for the Flask backend. As we can see in the Dockerfile, we are using a Python 3.11 base image. In the Dockerfile, we set up a non-root user, configure the container, and install the dependencies using Poetry.

FROM python:3.11 AS base

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
ENV APP_HOME=/src
ENV APP_USER=appuser

RUN groupadd -r $APP_USER && \
    useradd -r -g $APP_USER -d $APP_HOME -s /sbin/nologin -c "Docker image user" $APP_USER

ENV TZ 'Europe/Madrid'
RUN echo $TZ > /etc/timezone && \
    apt-get update && apt-get install --no-install-recommends -y tzdata && \
    rm /etc/localtime && \
    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
    dpkg-reconfigure -f noninteractive tzdata && \
    apt-get clean

RUN pip install --upgrade pip poetry

FROM base

WORKDIR $APP_HOME
COPY --chown=$APP_USER:$APP_USER pyproject.toml poetry.lock ./

RUN poetry config virtualenvs.create false && \
    poetry install --no-root --no-interaction --no-ansi --no-dev

COPY --chown=$APP_USER:$APP_USER src $APP_HOME
RUN find "$APP_HOME" -name '__pycache__' -type d -exec rm -r {} +

RUN chown -R $APP_USER:$APP_USER $APP_HOME

USER $APP_USER

We also have a Nginx proxy that serves the replicas of the backend.

upstream loadbalancer {
    server backend:5000;
}

server {
    server_tokens off;
    client_max_body_size 20M;
    proxy_busy_buffers_size   512k;
    proxy_buffers   4 512k;
    proxy_buffer_size   256k;
    proxy_set_header Host $host;
    add_header X-Frame-Options SAMEORIGIN;
    real_ip_header X-Forwarded-For;
    real_ip_recursive on;
    set_real_ip_from 0.0.0.0/0;
    proxy_read_timeout 300;
    proxy_connect_timeout 300;
    proxy_send_timeout 300;

    location /service1 {
        proxy_pass http://loadbalancer;
    }

    location /service1/health {
        default_type text/html;
        access_log off;
        return 200 'Ok!';
    }
}

And this is the Dockerfile of the Nginx image:

FROM nginx:1.23.4-alpine-slim

RUN rm /etc/nginx/conf.d/default.conf
COPY nginx.conf /etc/nginx/conf.d

With those two containers we can set up the service

version: '3.8'

services:
  nginx:
    image: flaskdemo_nginx:latest
    labels:
      - "traefik.enable=true"
      - "traefik.http.routers.app.tls=true"
      - "traefik.http.routers.app.rule=PathPrefix(`/service1`)"
      - "traefik.http.services.app.loadbalancer.server.port=80"
    depends_on:
      - backend
    networks:
      - external-net
      - default-net

  backend:
    image: flaskdemo:latest
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000 --timeout 180
    environment:
      SLOT: "{{.Task.Slot}}"
    deploy:
      replicas: 3
    networks:
      - default-net

networks:
  default-net:
  external-net:
    external: true

As we can observe, the magic of Traefik lies within the labels assigned to the exposed service, which in our case is Nginx. These labels define the path that Traefik will utilize to serve the service, specified as “/service1” in our example. Additionally, we instruct Traefik to use HTTPS for this service. It is crucial to ensure that our exposed Nginx service is placed within the same external network as Traefik, as demonstrated by the “external-net” in our example.

On the other hand, the backend service, represented by our Flask application, does not necessarily need to reside in this network. In fact, it is preferable to segregate our service networks, utilizing a private non-external network, such as the “default-net,” to establish communication solely between Nginx and the backend.

Note: With this configuration, we are utilizing the default HTTPS certificate provided by Traefik. It should be noted that this certificate does not guarantee server authority, which may result in browser warnings. However, it does ensure that the traffic is encrypted. Alternatively, there are other options available such as using a self-signed certificate, purchasing a certificate from a certificate authority, or obtaining a free valid certificate from Let’s Encrypt. However, these alternatives are beyond the scope of this post.

Now we can build our containers

docker build -t flaskdemo .
docker build -t flaskdemo_nginx .docker/nginx

and deploy to our Swarm cluster (in my example at localhost)

docker stack deploy -c traefik-stack.yml traefik
docker stack deploy -c service-stack.yml service1
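
Once both stacks are deployed, the endpoint can be checked from Python. Because this setup uses the default Traefik certificate, a client that verifies certificates will reject the connection, so this sketch disables verification. That is only acceptable for a local test like this one. The function names and the https://localhost/service1 URL are assumptions for the example.

```python
import ssl
import urllib.request


def unverified_context():
    """TLS context that skips certificate checks (local testing only)."""
    context = ssl.create_default_context()
    # check_hostname has to be disabled before verify_mode can be relaxed.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context


def check_service(url, timeout=5):
    """Return the HTTP status code of the given URL."""
    with urllib.request.urlopen(url, context=unverified_context(), timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    print(check_service("https://localhost/service1"))
```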

And that’s it! Our private API is now up and running, utilizing HTTPS for secure communication. Full code in my github.