Building Production-Ready AI Agents with Strands-Agents and Python

Today we’re going to build an AI agent that can predict the weather using Strands-Agents framework and Python. This project is designed to show how to integrate external data sources, advanced computational tools, and AI capabilities into a cohesive system. For this experiment we’re going to use Strands-Agents framework, which provides a robust foundation for building intelligent agents that can interact with various tools and APIs. Strands-Agents comes with built-in tools that allow us to create agents that can perform complex tasks by orchestrating multiple tools and APIs. For this project we’re going to use the following tools:

  • calculator: for performing mathematical and financial calculations.
  • think: for reflecting on data and generating ideas.
  • file_write: for saving results and analyses to files.
  • python_repl: for executing Python code and performing advanced analyses.

The last one is particularly useful for overcoming the limitations of large language models (LLMs) when it comes to deterministic calculations. By using a Python REPL, we can ensure that our agent can perform precise computations without relying solely on the LLM’s probabilistic outputs. We have Pandas and Scikit-learn for statistical analysis, which allows us to perform advanced data manipulation and machine learning tasks, and the agent will be able to use these libraries to analyze weather data and generate forecasts. Also, I’ve created a custom tool to fetch hourly weather data from the Open-Meteo API, which provides real-time weather information for specific locations.

import logging
from datetime import datetime, date
from typing import List

import requests
from strands import tool

from modules.weather.models import (
    TemperatureReading, HumidityReading, ApparentTemperatureReading,
    PrecipitationReading, EvapotranspirationReading, SurfacePressureReading, MeteoData)

logger = logging.getLogger(__name__)


class Tools:
    def __init__(self, latitude: float, longitude: float):
        self.latitude = latitude
        self.longitude = longitude

    def get_tools(self) -> List[tool]:
        @tool
        def get_hourly_weather_data(from_date: date, to_date: date) -> MeteoData:
            """
            Get hourly weather data for a specific date range.
            Notes:
                - The response is a MeteoData object containing lists of readings for temperature, humidity,
                  apparent temperature, precipitation, evapotranspiration, and surface pressure.
                - Each reading has a timestamp and a value.

            Returns:
                MeteoData: Object containing weather readings for the specified date range
            """

            start_date = from_date.strftime('%Y-%m-%d')
            end_date = to_date.strftime('%Y-%m-%d')
            url = (f"https://api.open-meteo.com/v1/forecast?"
                   f"latitude={self.latitude}&"
                   f"longitude={self.longitude}&"
                   f"hourly=temperature_2m,relative_humidity_2m,apparent_temperature,precipitation,evapotranspiration,surface_pressure&"
                   f"start_date={start_date}&"
                   f"end_date={end_date}")
            response = requests.get(url)

            meteo = MeteoData(
                temperature=[],
                humidity=[],
                apparent_temperature=[],
                precipitation=[],
                evapotranspiration=[],
                surface_pressure=[]
            )
            data = response.json()

            weather_data_time = data['hourly']['time']

            logger.info(f"[get_hourly_weather_data] Fetched weather data from {start_date} to {end_date}. {len(weather_data_time)} records found.")
            for iso in weather_data_time:
                time = datetime.fromisoformat(iso)
                meteo.temperature.append(TemperatureReading(
                    time=time,
                    value=data['hourly']['temperature_2m'][data['hourly']['time'].index(iso)]))
                meteo.humidity.append(HumidityReading(
                    time=time,
                    value=data['hourly']['relative_humidity_2m'][data['hourly']['time'].index(iso)]))
                meteo.apparent_temperature.append(ApparentTemperatureReading(
                    time=time,
                    value=data['hourly']['apparent_temperature'][data['hourly']['time'].index(iso)]))
                meteo.precipitation.append(PrecipitationReading(
                    time=time,
                    value=data['hourly']['precipitation'][data['hourly']['time'].index(iso)]))
                meteo.evapotranspiration.append(EvapotranspirationReading(
                    time=time,
                    value=data['hourly']['evapotranspiration'][data['hourly']['time'].index(iso)]))
                meteo.surface_pressure.append(SurfacePressureReading(
                    time=time,
                    value=data['hourly']['surface_pressure'][data['hourly']['time'].index(iso)]))
            return meteo

        return [get_hourly_weather_data, ]

To allow the LLM to interact with this tool, we define a Pydantic model that describes the expected input and output formats. This ensures that the agent can correctly interpret the data it receives from the API and use it effectively in its analyses.

from datetime import datetime

from pydantic import BaseModel, Field


class TemperatureReading(BaseModel):
    """Temperature reading at 2 meters"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(description="Temperature in °C")


class HumidityReading(BaseModel):
    """Relative humidity reading at 2 meters"""
    time: datetime = Field(..., description="Timestamp")
    value: int = Field(..., ge=0, le=100, description="Relative humidity in %")


class ApparentTemperatureReading(BaseModel):
    """Apparent temperature reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., description="Apparent temperature in °C")


class PrecipitationReading(BaseModel):
    """Precipitation reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., ge=0, description="Precipitation in mm")


class EvapotranspirationReading(BaseModel):
    """Evapotranspiration reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., description="Evapotranspiration in mm")


class SurfacePressureReading(BaseModel):
    """Surface pressure reading"""
    time: datetime = Field(..., description="Timestamp")
    value: float = Field(..., gt=0, description="Surface pressure in hPa")


class MeteoData(BaseModel):
    """Model to store meteorological data"""
    temperature: list[TemperatureReading] = Field(..., description="List of temperature readings")
    humidity: list[HumidityReading] = Field(..., description="List of humidity readings")
    apparent_temperature: list[ApparentTemperatureReading] = Field(..., description="List of apparent temperature readings")
    precipitation: list[PrecipitationReading] = Field(..., description="List of precipitation readings")
    evapotranspiration: list[EvapotranspirationReading] = Field(..., description="List of evapotranspiration readings")
    surface_pressure: list[SurfacePressureReading] = Field(..., description="List of surface pressure readings")

The use of Strands-Agents is very simple. I’ve encapsulated the agent logic in a single function that initializes the agent with the necessary tools and prompts. The agent can then be used to generate weather forecasts or answer specific weather-related questions.

_ = ai(
    system_prompt=SYSTEM_PROMPT,
    user_prompt="What will the weather be like tomorrow?")

If I run this code, agent will use the provided tools to generate the answer. In the output of the command, you will see the agent’s reasoning, the tools it used, and the final answer. First it uses the current time tool to get the current date and time (using current_time tool), then it uses the get_hourly_weather_data tool to fetch the weather data, after that uses think tool to analyze the data, and finally it uses the python_repl tool to generate the needed calculations, using Pandas, and produce the final weather forecast. Here’s an example of the output you might see when running the agent:

12/07/2025 15:28:18 [INFO] Check agent weather.
12/07/2025 15:28:18 [INFO] Found credentials in shared credentials file: ~/.aws/credentials
12/07/2025 15:28:18 [INFO] Creating Strands MetricsClient
I'll help you get the weather forecast for tomorrow. First, let me get the current time to determine what "tomorrow" is, and then I'll fetch the weather data for you.
Tool #1: current_time
Now I'll get the hourly weather data for tomorrow (July 13, 2025):
Tool #2: get_hourly_weather_data
12/07/2025 15:28:24 [INFO] [get_hourly_weather_data] Fetched weather data from 2025-07-13 to 2025-07-13. 24 records found.
Let me analyze this weather data to provide you with a comprehensive forecast for tomorrow (July 13, 2025):
Tool #3: think
Let me analyze the weather data directly to provide you with tomorrow's forecast:
Tool #4: python_repl
WEATHER FORECAST FOR TOMORROW (July 13, 2025)
==================================================
Temperature Range: 20.5°C to 23.6°C
Average Temperature: 22.2°C
Feels Like Temperature: Up to 24.6°C

Humidity Range: 70% to 83%
Average Humidity: 77%

Total Precipitation: 1.0mm
Precipitation Timing: Brief shower 4-5 PM

Pressure: Rising from 1012.1 to 1019.9 hPa
Pressure Change: +7.8 hPa (improving conditions)

Tool #5: python_repl

DETAILED HOURLY BREAKDOWN
==============================

🌅 MORNING (6 AM - 12 PM):
Temperature: 20.6°C to 23.0°C
Humidity: 70% to 82%
Conditions: Clear, comfortable temperatures

☀️ AFTERNOON (12 PM - 6 PM):
Temperature: 22.6°C to 23.6°C
Humidity: 71% to 81%
Precipitation: 1.0mm (brief shower 4-5 PM)
Conditions: Warmest part of day, brief afternoon shower

🌙 EVENING (6 PM - 12 AM):
Temperature: 22.0°C to 22.4°C
Humidity: 75% to 82%
Conditions: Pleasant evening temperatures, increasing humidity

📋 SUMMARY & RECOMMENDATIONS:
• Mild and pleasant day with comfortable temperatures
• Brief shower expected between 4-5 PM (carry umbrella)
• Rising atmospheric pressure indicates stable weather
• High humidity in morning and evening - may feel muggy
• Good day for outdoor activities, except during afternoon shower
• Light layers recommended due to temperature variation
## Weather Forecast for Tomorrow (July 13, 2025)

**Overall Conditions:** A pleasant, mild day with mostly clear skies and a brief afternoon shower.

### 🌡️ **Temperature**
- **Range:** 20.5°C to 23.6°C (69°F to 74°F)
- **Feels like:** Up to 24.6°C (76°F)
- **Coolest:** Early morning around 2-6 AM
- **Warmest:** Mid-afternoon around 2 PM

### 🌧️ **Precipitation**
- **Total:** 1.0mm of rain
- **Timing:** Brief shower between 4-5 PM
- **Duration:** Short-lived, about 1 hour
- **Recommendation:** Carry an umbrella for the afternoon

### 💨 **Atmospheric Conditions**
- **Humidity:** 70-83% (higher in morning/evening)
- **Pressure:** Rising from 1012 to 1020 hPa (+7.8 hPa increase)
- **Trend:** Improving and stabilizing weather conditions

### ⏰ **Time-Based Breakdown**

**Morning (6 AM - 12 PM):** Clear and comfortable, 20.6-23°C
**Afternoon (12 PM - 6 PM):** Warmest period with brief shower, 22.6-23.6°C  
**Evening (6 PM - 12 AM):** Pleasant and stable, 22-22.4°C

### 📝 **Recommendations**
- **Clothing:** Light layers, as temperatures vary throughout the day
- **Activities:** Great for outdoor plans, just avoid 4-5 PM for the shower
- **Comfort:** May feel slightly muggy due to higher humidity
- **Overall:** A very pleasant summer day with stable, improving weather conditions

The rising atmospheric pressure indicates this will be a stable weather day with good conditions for most outdoor activities!
Process finished with exit code 0

Here you can see the ai function.

import logging

from botocore.config import Config
from strands import Agent
from strands.agent import AgentResult
from strands.models import BedrockModel
from strands_tools import calculator, file_write, current_time, think, python_repl

from core.aws import get_aws_session
from modules.weather.tools import Tools
from settings import (
    IA_MODEL, IA_TEMPERATURE, LLM_READ_TIMEOUT, LLM_CONNECT_TIMEOUT,
    LLM_MAX_ATTEMPTS, MY_LATITUDE, MY_LONGITUDE, )

logger = logging.getLogger(__name__)


def get_agent(
        system_prompt: str,
        read_timeout: int = LLM_READ_TIMEOUT,
        connect_timeout: int = LLM_CONNECT_TIMEOUT,
        max_attempts: int = LLM_MAX_ATTEMPTS) -> Agent:
    config = Config(
        read_timeout=read_timeout,
        connect_timeout=connect_timeout,
        retries={'max_attempts': max_attempts}
    )
    session = get_aws_session()

    base_tools = [calculator, think, python_repl, file_write, current_time]
    custom_tools = Tools(latitude=MY_LATITUDE, longitude=MY_LONGITUDE).get_tools()
    all_tools = base_tools + custom_tools

    bedrock_model = BedrockModel(
        model_id=IA_MODEL,
        temperature=IA_TEMPERATURE,
        boto_session=session,
        boto_client_config=config,
    )
    return Agent(
        model=bedrock_model,
        tools=all_tools,
        system_prompt=system_prompt
    )


def ai(
        system_prompt: str,
        user_prompt: str,
        read_timeout: int = 300,
        connect_timeout: int = 60,
        max_attempts: int = 5) -> AgentResult:
    agent = get_agent(
        system_prompt=system_prompt,
        read_timeout=read_timeout,
        connect_timeout=connect_timeout,
        max_attempts=max_attempts)

    return agent(user_prompt)

As you can see, the agent is only a few lines of code. The magic is in the prompts and the tools that it uses. The agent can be used to generate weather forecasts, analyze historical weather data, and provide practical recommendations based on the weather conditions. This is the main prompt:

FORECAST_PROMPT = f"""
## Instructions for the weather forecast
Your mission is to analyze weather data and provide accurate and useful forecasts for the next {{days}} days.
You have access to a tool called `get_hourly_weather_data` that allows you to obtain hourly weather data.
As a meteorology expert, you must thoroughly analyze the data and provide accurate and useful forecasts.

Take into account possible extreme heat days, especially in summer.
Remember that extreme heat is considered when maximum and minimum temperatures exceed local temperature thresholds for several consecutive days,
often during a heatwave. These temperatures, along with humidity, can be harmful to health, especially for vulnerable groups.

## Report style
All reports must be written in English.
The report must be clear, concise, and easy to understand.
It should include:
- A summary of current weather conditions.
- A detailed forecast for the coming days, including temperature, precipitation, wind, and any other relevant data.
- Practical recommendations based on the forecast, such as precautions to take or recommended activities.
- Be creative and innovative in your approach, using advanced data visualization techniques to enhance the report.

## Data visualization
The report, in markdown, must be visually appealing and innovative.
You will use tables, lists, and other formatting elements to enhance readability.

### Graph format
- Generate the graph configuration in JSON format, compatible with the Vegalite library.
- Ensure the JSON is valid and compatible with the Vegalite library.
- The graphs must be innovative, leveraging the library's potential. Do not limit yourself to simple bar or line charts. Aim for a wow effect.

- Required JSON structure:
    * title: main title of the graph, at the top of the graph. The title must be brief and descriptive.
    * the title must be in the layout.title.text directive
    * layout.showlegend will be true/false, to show the graph legend. Some graphs do not need a legend, such as simple line charts.
- After each graph, generate a blockquote briefly explaining what the graph shows and its context.

...

For the visualization I’m using MkDocs , a simple static site generator for Markdown files. To have more advanced visualizations, I’m using the Vega-Lite library, which allows you to create interactive and visually appealing charts. The agent generates the JSON configuration for the graphs in a format compatible with Vega-Lite, which can then be rendered in the Markdown reports.

For AI, I’m using Claude 3.5 Sonnet, provided by Amazon Bedrock. For the experiment it’s enough, but if you create a cron job to run the agent every day, you’ll have your 5-day forecasting system ready to go. The project tries to show how to use AI agents to solve real-world problems, and how to integrate them with external data sources and tools. The agent can be extended to include more advanced features, such as integrating with other APIs or using more complex machine learning models for weather prediction.

Full code in my github account.

Building an Agentic AI with Python, LangChain, AWS Bedrock and Claude 4 Sonnet

Today we are going to build an agent with IA. It is just an example of how to build a agent with LangChain and AWS Bedrock and Claude 4 Sonnet. The agent will be a “mathematical expert” capable of performing complex calculations and providing detailed explanations of its reasoning process. The idea is to provide the agent with the ability to perform mathematical operations like addition, subtraction. In fact, with additions and subtractions, we can perform all the mathematical operations, like multiplication, division, exponentiation, square root, etc. The agent will be able to perform these operations step by step, providing a detailed explanation of its reasoning process. I know that we don’t need to use AI to perform these operations, but the idea is to show how to build an agent with LangChain and AWS Bedrock and Claude 4 Sonnet.

The mathematical agent implements the tool-calling pattern, allowing the LLM to dynamically select and execute mathematical operations:

import logging

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate

from core.llm.aws import get_llm, Models
from modules.prompts import AGENT_SYSTEM_PROMPT
from modules.tools import MathTools
from settings import MAX_TOKENS

logger = logging.getLogger(__name__)


def run(question: str, model: Models = Models.CLAUDE_4):
    prompt = ChatPromptTemplate.from_messages([
        ("system", AGENT_SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}")
    ])
    math_tools = MathTools()
    tools = math_tools.get_tools()

    llm = get_llm(model=model, max_tokens=MAX_TOKENS)
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=10
    )

    response = agent_executor.invoke({
        "input": question
    })

    logger.info(f"Agent response: {response['output']}")

Tools are defined using LangChain’s @tool decorator, providing automatic schema generation and type validation. Really we don’t need to create a class for the tools, but I have done it because I want to add an extra feature to the agent: the ability to keep a history of the operations performed. This will allow the agent to provide a detailed explanation of its reasoning process, showing the steps taken to arrive at the final result.

import logging
from typing import List

from langchain.tools import tool

logger = logging.getLogger(__name__)


class MathTools:

    def __init__(self):
        self.history = []

    def _diff_values(self, a: int, b: int) -> int:
        result = a - b
        self.history.append(f"{a} - {b} = {result}")
        return result

    def _sum_values(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result

    def _get_history(self) -> str:
        if not self.history:
            return "No previous operations"
        return "\n".join(self.history[-5:])  # Last 5

    def get_tools(self) -> List:
        @tool
        def diff_values(a: int, b: int) -> int:
            """Calculates the difference between two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: difference of a - b
            """
            logger.info(f"Calculating difference: {a} - {b}")
            return self._diff_values(a, b)

        @tool
        def sum_values(a: int, b: int) -> int:
            """Sums two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: sum of a + b
            """
            logger.info(f"Calculating sum: {a} + {b}")
            return self._sum_values(a, b)

        @tool
        def get_history() -> str:
            """Gets the operation history
            Returns:
                str: last operations
            """
            logger.info("Retrieving operation history")
            return self._get_history()

        return [diff_values, sum_values, get_history]

The system prompt is carefully crafted to guide the agent’s behavior and establish clear operational boundaries:

AGENT_SYSTEM_PROMPT = """
You are an expert mathematical agent specialized in calculations.

You have access to the following tools:
- diff_values: Calculates the difference between two numbers
- sum_values: Sums two numbers
- get_history: Gets the operation history

Guidelines:
1. Only answer questions related to mathematical operations.
2. For complex operations, use step-by-step calculations:
   - Multiplication: Repeated addition
   - Division: Repeated subtraction
   - Exponentiation: Repeated multiplication
   - Square root: Use methods like Babylonian method or prime factorization.
"""

Now we can invoke our agent by asking questions such as ‘What’s the square root of 16 divided by two, squared?’. The agent will iterate using only the provided tools to obtain the result.

And that’s all. This project demonstrates how to build a production-ready AI agent using LangChain and AWS Bedrock. It’s just a boilerplate, but it can be extended to create more complex agents with additional capabilities and understand how AI agents work.

Full code in my GitHub account.

OAuth2 Authentication in Streamlit Applications with Nginx and OAuth2-Proxy

Normally, when I want to provide authentication to a service, I use OAuth2. There are libraries to integrate this authentication mechanism into a web application, but sometimes we cannot do this easily because it is a third-party service over which we have no control. In these cases, it is possible that this third-party service has support for OAuth2 and can also log in with OAuth2. But sometimes this is not possible, or it is too complicated. In these cases, a solution is to use a proxy that handles the authentication and communicates with the third-party service. In this example, we will use a Streamlit application as if it were a third-party application.

import streamlit as st

st.set_page_config(
    page_title="Home",
    page_icon="👋",
)
st.write("# Welcome to Streamlit! 👋")
st.markdown(
    """
    Streamlit is an open-source app framework built specifically for
    Machine Learning and Data Science projects.
    **👈 Select a demo from the sidebar** to see some examples
    of what Streamlit can do!
    ### Want to learn more?
    - Check out [streamlit.io](https://streamlit.io)
    - Jump into our [documentation](https://docs.streamlit.io)
    - Ask a question in our [community
        forums](https://discuss.streamlit.io)
    ### See more complex demos
    - Use a neural net to [analyze the Udacity Self-driving Car Image
        Dataset](https://github.com/streamlit/demo-self-driving)
    - Explore a [New York City rideshare dataset](https://github.com/streamlit/demo-uber-nyc-pickups)
"""
)

st.sidebar.success("Select a demo above.")

Our Streamlit application has a page.

from random import randint

import streamlit as st

st.set_page_config(
    page_title="Hello",
    page_icon="👋",
)

st.markdown("# Plotting Demo")
st.sidebar.header("Plotting Demo")
st.write("This demo illustrates a combination of plotting with Streamlit. Enjoy!")

data = [dict(name=f"name{i}", value=randint(1, 1000)) for i in range(1, 101)]

progress_bar = st.sidebar.progress(0)
status_text = st.sidebar.empty()
chart = st.line_chart([item['value'] for item in data])

progress_bar.empty()

st.button("Re-run")

To use OAuth authentication in the Streamlit application, we are using Nginx as a reverse proxy with the auth_request directive to direct requests to an OAuth2-proxy service deployed in our stack. OAuth2-proxy can be configured to authenticate any OAuth2 server compatible with OpenID. In my example, I am using GitHub, but you can use ActiveDirectory, Google, Keycloak, or even your own OAuth2 server. This is my Nginx configuration:

This is my Nginx configuration:

upstream app {
    server streamlit:8501;
}

upstream oauth2 {
    server oauth2-proxy:4180;
}

server {
    listen 8000;

    location / {
        auth_request /oauth2/auth;
        error_page 401 = @error401;
        try_files $uri @proxy_to_app;
    }

    location /_stcore/stream {
        auth_request /oauth2/auth;
        error_page 401 = @error401;
        proxy_pass http://app/_stcore/stream;
        proxy_http_version 1.1;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 86400;
    }

    location @error401 {
        return 302 /oauth2/sign_in;
    }

    location /oauth2/ {
        try_files $uri @proxy_to_oauth2;
    }

    location @proxy_to_oauth2 {
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_pass http://oauth2;
    }

    location @proxy_to_app {
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Url-Scheme $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_pass http://app;
    }
}

The complete stack can be seen in the docker-compose.yml:

version: '3.9'

services:
  streamlit:
    build: .
    environment:
      - ENVIRONMENT=docker
    command: ["streamlit", "run", "st.py", "--server.port=8501", "--server.address=0.0.0.0"]

  nginx:
    build: .docker/nginx
    ports:
      - "8000:8000"

  oauth2-proxy:
    image: quay.io/oauth2-proxy/oauth2-proxy:v7.8.1
    env_file:
      - .env

And that’s all. The advantage of using oauth2-proxy is that we don’t need to do anything within the Streamlit application to have OAuth2 authentication. This greatly simplifies the integration process, as all the authentication logic is handled outside the main application. Additionally, oauth2-proxy is compatible with any OAuth2 server that complies with OpenID, giving us the flexibility to use different authentication providers. By using Nginx as a reverse proxy, we can efficiently redirect and manage authentication requests, ensuring that only authenticated users can access our Streamlit application.

Full code available in my github account.

Real-time Data Visualization with R Shiny and External APIs

Today we will create a simple R Shiny frontend application that fetches real-time data from an external API. First, we have a backend API that provides a simple JSON response protected by a bearer token.

import random
from functools import wraps

from flask import Flask, jsonify, request

app = Flask(__name__)

VALID_TOKEN = "api-secret-token"


def token_required(valid_token=VALID_TOKEN):
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            token = None
            auth_header = request.headers.get('Authorization')
            if auth_header and auth_header.startswith('Bearer '):
                token = auth_header.split(' ')[1]

            if not token or token != valid_token:
                return jsonify({'message': 'Not valid token'}), 401

            return f(*args, **kwargs)

        return decorated

    return decorator


def generate_phone_number():
    area_code = random.randint(200, 999)
    prefix = random.randint(100, 999)
    line = random.randint(1000, 9999)
    return f"+1 {area_code}-{prefix}-{line}"


@app.get('/data')
@token_required(valid_token=VALID_TOKEN)
def get_data():
    return jsonify([
        {"phone": generate_phone_number(), "name": "Juan Pérez", "email": "juan@example.com"},
        {"phone": generate_phone_number(), "name": "María García", "email": "maria@example.com"},
        {"phone": generate_phone_number(), "name": "Carlos López", "email": "carlos@example.com"},
        {"phone": generate_phone_number(), "name": "Ana Martínez", "email": "ana@example.com"},
        {"phone": generate_phone_number(), "name": "Pablo Sánchez", "email": "pablo@example.com"},
        {"phone": generate_phone_number(), "name": "Laura Rodríguez", "email": "laura@example.com"},
        {"phone": generate_phone_number(), "name": "Diego Fernández", "email": "diego@example.com"},
        {"phone": generate_phone_number(), "name": "Carmen Gómez", "email": "carmen@example.com"},
        {"phone": generate_phone_number(), "name": "Javier Díaz", "email": "javier@example.com"},
        {"phone": generate_phone_number(), "name": "Sofía Ruiz", "email": "sofia@example.com"},
        {"phone": generate_phone_number(), "name": "Miguel Álvarez", "email": "miguel@example.com"},
        {"phone": generate_phone_number(), "name": "Lucía Jiménez", "email": "lucia@example.com"},
        {"phone": generate_phone_number(), "name": "Alejandro Moreno", "email": "alejandro@example.com"},
        {"phone": generate_phone_number(), "name": "Elena Muñoz", "email": "elena@example.com"},
        {"phone": generate_phone_number(), "name": "David Alonso", "email": "david@example.com"},
        {"phone": generate_phone_number(), "name": "Natalia Torres", "email": "natalia@example.com"},
        {"phone": generate_phone_number(), "name": "Roberto Gutiérrez", "email": "roberto@example.com"},
        {"phone": generate_phone_number(), "name": "Cristina Navarro", "email": "cristina@example.com"},
        {"phone": generate_phone_number(), "name": "Antonio Ramos", "email": "antonio@example.com"},
        {"phone": generate_phone_number(), "name": "Isabel Ortega", "email": "isabel@example.com"}
    ])

The frontend application is a simple R Shiny app that fetches data from the API and displays it in a table. The app also includes a button to refresh the data.

We’ll use renv to manage the R package dependencies. To set up the environment, run the following commands in your R console:

install.packages("renv")
renv::init()

Then, install the required packages:

install.packages("shiny")
install.packages("readxl")
install.packages("dplyr")
install.packages("qcc")
install.packages("ggplot2")
install.packages("shinyWidgets")
install.packages("dotenv")
install.packages("DT")
install.packages("httr")
install.packages("jsonlite")

renv::snapshot()

That’s the main.R file.

library(shiny)

source("api_client.R")

args <- commandArgs(trailingOnly = TRUE)
port <- if (length(args) >= 1) as.numeric(args[1]) else 3838
host <- if (length(args) >= 2) args[2] else "0.0.0.0"
launch_browser <- if (length(args) >= 3) as.logical(args[3]) else TRUE

runApp("app.R", port = port, host = host, launch.browser = launch_browser)

And the shiny app is in the app.R file.

library(shiny)

library(ggplot2)
library(dplyr)
library(shinyWidgets)
library(dotenv)
library(DT)

df <- data.frame()
load_dot_env()

config <- list(
  api_url = Sys.getenv("API_URL"),
  api_token = Sys.getenv("API_KEY")
)

ui <- fluidPage(
  titlePanel("R api call example"),

  fluidRow(
    column(12,
           actionButton("refresh", "Refresh data", icon = icon("refresh"), class = "btn-primary")
    )
  ),

  mainPanel(
    DTOutput("table")
  )
)

server <- function(input, output, session) {
  data <- reactiveVal(df)

  refreshData(data, config$api_url, config$api_token)

  observeEvent(input$refrescar, {
    refreshData(data, config$api_url, config$api_token)
  })

  output$table <- renderDT({
    data()
  })
}

shinyApp(ui = ui, server = server)

The api_client.R file contains the function to fetch data from the API.

get_data <- function(uri, token = NULL) {
  library(httr)
  library(jsonlite)

  showNotification("Updating data...", type = "message")

  headers <- c(`Content-Type` = "application/json", `Accept` = "application/json")
  if (!is.null(token)) {
    headers <- c(headers, Authorization = paste("Bearer", token))
  }

  response <- GET(url = uri, add_headers(.headers = headers))

  if (http_error(response)) {
    stop(sprintf("Error en la petición: %s", status_code(response)))
  }

  content_text <- content(response, "text", encoding = "UTF-8")
  df <- fromJSON(content_text, flatten = TRUE)

  if (is.list(df) && "data" %in% names(df)) {
    df <- df$data
  }

  if (!is.data.frame(df)) {
    df <- as.data.frame(df)
  }

  return(df)
}

We’re also using dotenv to manage environment variables. Create a .env file in the root of your project with the following content:

API_URL=http://localhost:5000/data
API_KEY=api-secret-token

Full code in my github account.

Implementing Industrial OPC UA Communication with Python and Asyncio

Today we’re going to work with an industrial protocol called OPC UA. We’ll be using the opcua-asyncio library to create a simple OPC UA server and client. We’ll also be using the `asyncio` library to handle the asynchronous communication between the server and the client. The idea es build a OPC UA server that exposes a variable and a client that reads and writes to that variable.

To simulate a changing variable, I’ve created a simple script that changes one variable every second with the value of the current time and persists it to a Redis database.

import logging
import time

import redis

from settings import REDIS_HOST, REDIS_PORT

logger = logging.getLogger(__name__)


def update_redis_variable_loop():
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    while True:
        timestamp_ms = int(time.time() * 1_000)
        r.set('ts', timestamp_ms)
        logger.info(f"Updated variable: {timestamp_ms}")
        time.sleep(1)

The server will have an authentication mechanism using a username and password, and it will also have a self-signed certificate and a private key to encrypt the communication. To generate the self-signed certificate and private key, you can use the following commands:

openssl genpkey -algorithm RSA -out private_key.pem
openssl req -new -key private_key.pem -out certificate.csr
openssl x509 -req -days 365 -in certificate.csr -signkey private_key.pem -out certificate.pem

This OPC UA server will expose the variable that we’re updating in the Redis database.

class UserManager:
    def get_user(self, iserver, username=None, password=None, certificate=None):
        if certificate and OPC_USERS_DB.get(username, False) == password:
            logger.info(f"User '{username}' authenticated")
            return User(role=UserRole.User)
        return None


async def main():
    server = Server(user_manager=UserManager())
    await server.init()
    server.set_endpoint(OPC_ENDPOINT)

    await server.load_certificate(OPC_CERTIFICATE)
    await server.load_private_key(OPC_PRIVATE_KEY)
    server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

    namespace_idx = await server.register_namespace(OPC_NAMESPACE)
    obj = await server.nodes.objects.add_object(namespace_idx, "Gonzalo")
    var = await obj.add_variable(namespace_idx, "T", 0, datatype=ua.VariantType.Int32)
    await var.set_writable(False)

    redis_client = await redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    logger.info(f"Starting server on {OPC_ENDPOINT}")

    async with server:
        while True:
            await asyncio.sleep(1)
            value = await redis_client.get('ts')
            if value is not None:
                value = int(value)
                logger.info(f"Set value of {var} to {value}")
                await var.write_value(value)


def server(debug: bool = False):
    asyncio.run(main(), debug=debug)

And now we create a OPC UA client that reads the variable from the server and prints it to the console.

import asyncio
import logging

from asyncua import Client

from settings import OPC_ENDPOINT, OPC_CERTIFICATE, OPC_PRIVATE_KEY, OPC_USERNAME, OPC_PASSWORD

logger = logging.getLogger(__name__)


async def main():
    c = Client(url=OPC_ENDPOINT)
    c.set_user(OPC_USERNAME)
    c.set_password(OPC_PASSWORD)
    await c.set_security_string(f"Basic256Sha256,SignAndEncrypt,{OPC_CERTIFICATE},{OPC_PRIVATE_KEY}")

    async with c:
        node = c.get_node("ns=2;i=2")
        value = await node.read_value()
        logger.info(f"Value: {value}")


def client(debug: bool = False):
    asyncio.run(main(), debug=debug)

In our example we are using click to create a CLI interface to run the server and the client.

# Start Redis server
docker-compose up

# Start the process that updates the variable in Redis
python cli.py backend

# Run the server
python cli.py server

# Run the client
python cli.py client

Full code available in my github account

Creating a standalone WebSocket Server with FastApi and JWT Authentication in Python

In this post, I will show you how to create a WebSocket server in Python that uses JWT tokens for authentication. The server is designed to be independent of the main process, making it easy to integrate into existing applications. The client-side JavaScript will handle reconnections incrementally.

The WebSocket server will be created using FastApi, the web framework built on top of Starlette. This is the entrypoint.
import logging

from fastapi import FastAPI

from asgi_ws import setup_app

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)
SECRET_KEY = "your_secret_key"

app = FastAPI()

app = setup_app(
    app=app,
    base_path='/ws',
    jwt_secret_key=SECRET_KEY,
)
The `setup_app` function is defined in the `lib.websockets` module. This function will set up the WebSocket server and the necessary routes.
def setup_app(app, jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = get_ws_router(
        jwt_secret_key=jwt_secret_key,
        jwt_algorithm=jwt_algorithm,
        base_path=base_path
    )
    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"

    app.mount("/js", StaticFiles(directory=static_dir), name="js")
    app.include_router(ws_router)

    return app
The `get_ws_router` function is defined in the same module. This function will create the WebSocket router and the necessary routes.
def get_ws_router(jwt_secret_key: str, base_path='ws', jwt_algorithm: str = "HS256"):
    ws_router = APIRouter()

    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"
    ws_router.mount(f"{base_path}/js", StaticFiles(directory=static_dir), name="js")

    manager = ConnectionManager(jwt_secret_key=jwt_secret_key, jwt_algorithm=jwt_algorithm)

    @ws_router.post(f"{base_path}/emmit")
    async def emmit_endpoint(request: Request):
        payload = await request.json()
        await manager.broadcast(payload["channel"], payload["payload"])
        return True

    @ws_router.websocket(f"{base_path}/")
    async def websocket_endpoint(websocket: WebSocket):
        token = websocket.query_params.get("token")
        if not token:
            await websocket.close(code=1008)
            raise HTTPException(status_code=401, detail="Token required")

        await manager.connect(websocket, token)
        try:
            while True:
                message: Message = await websocket.receive()
                if message["type"] == "websocket.disconnect":
                    manager.disconnect(websocket)
                    break
        except WebSocketDisconnect:
            manager.disconnect(websocket)

    return ws_router
Websockets are bidirectional communication channels that allow real-time data transfer between clients and servers, but I prefer to avoid the communication from the client to the server. When a client wants to send a message to the server, it will send an HTTP POST request to the `/emit` endpoint (via the main process). The server will then broadcast the message to all connected clients. The client will only receive messages from the server. Because of that we need a main wsgi process using FastApi or another web framework to handle the HTTP requests. 

This an example with FastApi:
<!DOCTYPE html>
<html>
<head>
    <title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>

<input type="text" id="messageText" autocomplete="off"/>
<button onclick="sendMessage()">Send</button>

<ul id='messages'>
</ul>
//localhost:8000/js/websockets.js
<script>
    async function sendMessage() {
        const channel = 'chat';
        const url = `/emit/${channel}`;
        const input = document.getElementById("messageText");
        const message = input.value;
        input.value = '';
        const body = JSON.stringify({channel: 'chat1', payload: message});
        const headers = {'Content-Type': 'application/json'};

        try {
            const response = await fetch(url, {method: 'POST', headers: headers, body: body});
        } catch (error) {
            console.error('Error:', error);
        }
    }

    (async function () {
        const getToken = async () => {
            const response = await fetch('/token');
            const {token} = await response.json();
            return token;
        };

        const messageCallback = (event) => {
            const messages = document.getElementById('messages');
            const message = document.createElement('li');
            message.textContent = event.data;
            messages.appendChild(message);
        };

        const wsManager = new WebSocketManager('ws://localhost:8000/ws/', getToken, messageCallback);
        await wsManager.connect();
    })();

</script>
</body>
</html>

Library is available at pypi

poetry add asgi_ws
pip install asgi_ws

Full code available in my github account.

Creating a Real-Time Flask Application with Flask-SocketIO and Redis

Today, we’re going to create a simple Flask application with real-time communication using websockets and the SocketIO library. We’ll leverage the Flask-SocketIO extension for integration.

Here’s the plan: while websockets support bidirectional communication, we’ll use them exclusively for server-to-client messages. For client-to-server interactions, we’ll stick with traditional HTTP communication.

Our application will include session-based authentication. To simulate login, we’ve created a route called /login that establishes a session. This session-based authentication will also apply to our websocket connections.

A key objective of this tutorial is to enable sending websocket messages from outside the web application. For instance, you might want to send messages from a cron job or an external service. To achieve this, we’ll use a message queue to facilitate communication between the SocketIO server and the client application. We’ll utilize Redis as our message queue.

That’s the main application

from flask import Flask, render_template, session, request

from lib.ws import register_ws, emit_event, EmitWebsocketRequest
from settings import REDIS_HOST, WS_PATH

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

register_ws(app=app, socketio_path=WS_PATH, redis_host=REDIS_HOST)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/login')
def login():
    session['user'] = 'Gonzalo'
    return dict(name=session['user'])


@app.post('/api/')
def api():
    data = EmitWebsocketRequest(**request.json)
    emit_event(data.channel, data.body)

    return dict(status=True)

That’s the html template

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Flask-SocketIO Websocket Example</title>
    //cdn.socket.io/4.0.0/socket.io.min.js
</head>
<body>

<h1>Flask-SocketIO Websocket Example</h1>
<label for="message">Message:</label>
<input type="text" id="message" placeholder="type a message...">

<button onclick="sendMessage()">Send</button>
<ul id="messages"></ul>

<script>
    document.addEventListener("DOMContentLoaded", function () {
        let host = location.protocol + '//' + location.hostname + ':' + location.port
        let socket = io.connect(host, {
            path: '/ws/socket.io',
            reconnection: true,
            reconnectionDelayMax: 5000,
            reconnectionDelay: 1000
        });

        socket.on('connect', function () {
            console.log('Connected to ws');
        });

        socket.on('disconnect', function () {
            console.log('Disconnected from ws');
        });

        socket.on('message', function (msg) {
            let messages = document.getElementById('messages');
            let messageItem = document.createElement('li');
            messageItem.textContent = msg;
            messages.appendChild(messageItem);
        });

        window.sendMessage = async function () {
            const url = '/api/';
            const payload = {"channel": "message", "body": this.message.value};

            try {
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify(payload)
                });

                if (!response.ok) {
                    console.error('Error: ' + response.statusText);
                }

                await response.json();
            } catch (error) {
                console.error('Error:', error);
            }
        };
    });
</script>
</body>
</html>

The register_ws function binds SocketIO to our Flask server. To enable sending messages from outside our Flask application, we need to instantiate SocketIO in two different ways. For this purpose, I’ve created a ws.py file. Note: I’m using Pydantic to validate the HTTP requests.

import logging
from typing import Dict, Any, Union

from flask import session
from flask_socketio import SocketIO
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Conf:
    def __init__(self, socketio=None):
        self._socketio = socketio

    @property
    def socketio(self):
        return self._socketio

    @socketio.setter
    def socketio(self, value):
        self._socketio = value


conf = Conf()


def emit_event(channel, body):
    conf.socketio.emit(channel, body)


class EmitWebsocketRequest(BaseModel):
    channel: str
    body: Union[Dict[str, Any], str]


def setup_ws(redis_host, redis_port=6379):
    conf.socketio = SocketIO(message_queue=f'redis://{redis_host}:{redis_port}')


def register_ws(
        app,
        redis_host,
        socketio_path='/ws/socket.io',
        redis_port=6379
):
    redis_url = f'redis://{redis_host}:{redis_port}' if redis_host else None
    conf.socketio = SocketIO(app, path=socketio_path, message_queue=redis_url)

    @conf.socketio.on('connect')
    def handle_connect():
        if not session.get("user"):
            raise ConnectionRefusedError('unauthorized!')
        logger.debug(f'Client connected: {session["user"]}')

    @conf.socketio.on('disconnect')
    def handle_disconnect():
        logger.debug('Client disconnected')

    return conf.socketio

Now, we can emit an event from outside the Flask application.

from lib.ws import emit_event, setup_ws
from settings import REDIS_HOST

setup_ws(redis_host=REDIS_HOST)
emit_event('message', 'Hi')

The application needs a Redis server. I set up the server using docker.

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Source code available in my github.

Python Flask and OAuth2: Building a Secure Authentication System

I typically use Flask for APIs and Django for web applications that utilize sessions and OAuth authentication. However, do I truly need Django for these functionalities? The answer is no. While Django provides pre-built components, similar capabilities are also accessible in Flask, and implementing them is quite straightforward. Additionally, I am a strong advocate of microframeworks. Today, we’ll demonstrate how to employ OAuth2 authentication using Flask. Let’s begin.

OAuth2 encompasses various flows, but today, we’ll focus on the most common one for web applications. The concept involves checking for a valid session. If one exists, great, but if not, the application will generate a session with a state (a randomly generated string) and then redirect to the OAuth2 server login page. Subsequently, the user will perform the login on the login server. Following that, the OAuth2 server will redirect to a validated callback URL with an authorization code (while also returning the provided state). The callback URL will then verify whether the state provided by the OAuth2 server matches the one in the session. Next, the callback route on your server, utilizing the authorization code, will obtain an access token (via a POST request to the OAuth2 server). With this access token, you can retrieve user information from the OAuth2 server and establish a valid session.

First we create a Flask application with sessions

from flask import Flask
from flask_session import Session

from settings import SECRET, SESSION


app = Flask(__name__)
app.secret_key = SECRET
app.config.update(SESSION)
Session(app)

Session configuration:

SESSION = dict(
    SESSION_PERMANENT=False,
    SESSION_TYPE="filesystem",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_DOMAIN=False,
)

I like to use blueprints to manage the Flask, so let’s add our application:

from modules.home.app import blueprint as home

...
app.register_blueprint(home, url_prefix=f'/')

I set up the blueprint in a init.py file

from pathlib import Path

from flask import Blueprint

from lib.oauth import check_session

base = Path(__file__).resolve().parent
blueprint = Blueprint(
    'front_home', __name__,
    template_folder=base.joinpath('templates')
)


@blueprint.before_request
def auth():
    return check_session()

You can see that we’re using a before_request middleware to check the session in every route of the blueprint.

def check_session():
    if not session.get("user"):
        state = secrets.token_urlsafe(32)
        session['state'] = state
        authorize = OAUTH['AUTHORIZE_URL']
        query_string = urlencode({
            'scope': OAUTH.get('SCOPE', 'read write'),
            'prompt': OAUTH.get('PROMPT', 'login'),
            'approval_prompt': OAUTH.get('APPROVAL_PROMPT', 'auto'),
            'state': state,
            'response_type': OAUTH.get('RESPONSE_TYPE', 'code'),
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID']
        })
        return redirect(f"{authorize}?{query_string}")

And the routes of the blueprint:

from flask import render_template, session

from modules.home import blueprint


@blueprint.get(f"/")
def home():
    username = session['user']['username']
    return render_template('index.html',
                           username=username)

To do the login we need also to code our callback route. We will add a blueprint for that.

from lib.oauth import blueprint as oauth

...
app.register_blueprint(oauth)

That’s the OAuth2 callback:

import logging

import requests
from flask import Blueprint, abort
from flask import request, session, redirect

from settings import OAUTH

logger = logging.getLogger(__name__)

blueprint = Blueprint('oauth', __name__, url_prefix=f'/oauth')


@blueprint.get('/callback')
def callback():
    # Obtain the state from the request
    state = request.args.get('state')
    if 'state' not in session:
        return redirect(f"/")
    # Check if provided state match wht the session saved one
    if state == session['state']:
        # Obtain the authorization code from the request
        authorization_code = request.args.get('code')
        token_data = {
            'grant_type': OAUTH.get('GRANT_TYPE', 'authorization_code'),
            'code': authorization_code,
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID'],
            'client_secret': OAUTH['CLIENT_SECRET']
        }
        # POST to OAuth2 server to obtain the access_token
        response = requests.post(OAUTH['TOKEN_URL'],
                                 data=token_data,
                                 headers={'Accept': 'application/json'})
        response_data = response.json()
        headers = {
            "Authorization": f"Bearer {response_data.get('access_token')}",
            'Accept': 'application/json'
        }
        # With the access_token you can obtain the user information
        user_response = requests.get(OAUTH['USER_URL'],
                                     data=token_data,
                                     headers=headers)
        if user_response.ok:
            # Now you are able to create the session 
            user_data = user_response.json()
            session['user'] = dict(
                username=user_data['login'],
                name=user_data['name'],
                email=user_data['email']
            )
            session.pop('state', default=None)
        else:
            abort(401)
        return redirect(f"/")
    else:
        abort(401)

Mainly that’s all. In this example we’re using Github’s OAuth2 server. You can use different ones, and also with your own OAuth2 server. Maybe, depending on the server, they way to obtain the user_data, can be different, and you should adapt it to your needs.

In my example I’m saving my OAuth2 credentials in a .env file. With this technique I can use different configurations depending on my environment (production, staging, …)

CLIENT_ID=my_client_id
CLIENT_SECRET=my_client_secret
TOKEN_URL=https://github.com/login/oauth/access_token
AUTHORIZE_URL=https://github.com/login/oauth/authorize
USER_URL=https://api.github.com/user
REDIRECT_URL=http://localhost:5000/oauth/callback

And I load this conf in my settings.py

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

OAUTH = dict(
    CLIENT_ID=os.getenv('CLIENT_ID'),
    CLIENT_SECRET=os.getenv('CLIENT_SECRET'),
    TOKEN_URL=os.getenv('TOKEN_URL'),
    AUTHORIZE_URL=os.getenv('AUTHORIZE_URL'),
    USER_URL=os.getenv('USER_URL'),
    REDIRECT_URL=os.getenv('REDIRECT_URL'),
)

And that’s all. Full code in my github account.

Domain Events in Legacy Applications using Python and PostgreSQL

Sometimes we need to generate domain events in our application. It can be simple when you start an application from scratch, but it can be a nightmare when you have a legacy application. Today we're going to explain how to generate domain events from PostgreSQL. We can set up triggers within our database tables to generate domain events. In those triggers, we can use pg_notify to emit events and after that, we can use a listener to consume those events. This approach works, but we need to set up those triggers in every table that we want to generate events from. Today we're going to use logical replication to generate domain events. With this approach, we can generate events from all tables in our database without the need to set up triggers in every table.

First of all, we need to create a publication. A publication is a set of tables that we want to replicate. We can create a publication with the following SQL command:
CREATE PUBLICATION pub1 FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('slot1', 'pgoutput');
When we create our replication slot, we can choose between different plugins. In this case, we're going to use pgoutput. We can use pgoutput to get the changes in our tables. After that, we can create a subscription to consume those changes. pgoutput is a plugin that is included in PostgreSQL by default that sends the information in bytea format. We can use pgoutput to get the changes in our tables. After that, we can create a subscription to consume those changes. We're going to create the subscription in Python. We can do a simple subscription with the following code:
import psycopg2.extras

from settings import DSN


conn = psycopg2.connect(DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()

cur.start_replication(
    slot_name='slot1', 
    decode=False,
    options={'proto_version': '1', 'publication_names': 'pub1'})


def consume(msg):
    payload = msg.payload
    print(payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cur.consume_stream(consume)
With this simple script, we're listening to all changes in our database. We can use this script to generate domain events in our application. We need to decode the payload to get the changes in our tables. There is a library to decode the payload called pypgoutput. I have had problems with this library, so I have used only one part of the library to decode the payload (decoders.py).

The main script is like this:
import logging

from lib.consumer import Consumer
from lib.models import Types, Event
from settings import DSN, PUBLICATION_NAME, SLOT_NAME

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(event: Event):
    logger.info(
        f"{event.ts} id:{event.tx_id} [{event.type}] "
        f"{event.schema_name}.{event.table_name} with values {event.values}")


consumer = Consumer(DSN)
consumer.on(Types.UPDATE, 'public.*', callback)

consumer.start(
    slot_name=SLOT_NAME,
    publication_name=PUBLICATION_NAME)
For my example, I am using a database with the following schema:
CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);
It is important to activate the logical replication in the database. We can do it with the following command:
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
ALTER SYSTEM SET max_worker_processes = 10;
And I am registering an event on every update of the actors table with the callback function. The callback function is called with the event that contains the type of the event, the schema name, the table name, and the values of the row that has been updated. In callback function we can do wathever we want with the event. In this case, I am just logging the event, but maybe you can emit this event to a message broker such as Kafka, RabbitMQ or a MQTT broker.

That is the main notification part of the script. The other part is the conversion of the values of the row. The values are in bytea format, so we need to convert them to Python types. The conversion is done with the following function:
def convert_value(oid, value):
    if value is None:
        return None
    python_type = OID_MAP.get(oid, str)
    try:
        if python_type == bool:
            return value == 't'
        elif python_type == datetime:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        elif python_type == date:
            return datetime.strptime(value, '%Y-%m-%d').date()
        elif python_type == dict:
            import json
            return json.loads(value)
        elif python_type == uuid.UUID:
            return uuid.UUID(value)
        else:
            return python_type(value)
    except Exception as e:
        logger.error(f"Error converting {value} with OID {oid}: {e}")
        return value


def get_event(message_type, rel, tx, payload) -> Event | None:
    current_type = Types(message_type)
    decoder_map = {
        Types.INSERT: decoders.Insert,
        Types.UPDATE: decoders.Update,
        Types.DELETE: decoders.Delete,
        Types.TRUNCATE: decoders.Truncate
    }
    data = decoder_map.get(current_type, lambda x: None)(payload)

    if data:
        if current_type == Types.TRUNCATE:
            fields = []
        else:
            fields = get_fields(rel, getattr(data, 'old_tuple', None), getattr(data, 'new_tuple', None))

        event = Event(
            type=current_type,
            tx_id=tx.tx_id,
            ts=tx.commit_ts,
            schema_name=rel.namespace,
            table_name=rel.relation_name,
            values=fields
        )
        return event
    return None


def get_fields(rel, old, new):
    fields = [
        Field(
            name=c.name,
            old=convert_value(c.type_id, old.column_data[i].col_data) if old else None,
            new=convert_value(c.type_id, new.column_data[i].col_data) if new else None,
            pkey=c.part_of_pkey == 1
        )
        for i, c in enumerate(rel.columns)
    ]
    return fields
When a client is connected we can see it using a simple query:
SELECT * FROM pg_stat_replication;
And also we can see the replication slots with the following query:
SELECT
    pg_current_wal_lsn() AS current_lsn,
    slot_name,
    restart_lsn,
    confirmed_flush_lsn
FROM
    pg_replication_slots
WHERE
    slot_type = 'logical';
The script is just an experiment. Maybe it can be adapted to a real application, but probably it needs more work, especially in data type conversion.

In conclusion, using logical replication to generate domain events from PostgreSQL offers a powerful and flexible approach, especially for legacy systems or applications where modifying the existing database structure is challenging. This method allows us to capture changes across all tables without the need for individual triggers, potentially simplifying event sourcing and change data capture processes. However, it's important to note that this approach comes with its own set of considerations, such as performance impact on the database, handling of large volumes of events, and ensuring data consistency. As with any experimental technique, thorough testing and careful consideration of your specific use case are crucial before implementing this in a production environment. Despite these challenges, the potential for creating a robust, database-driven event system makes this an exciting area for further exploration and development.

Full source code in my github account

Transforming Natural Language to SQL Queries with Python and LangChain

LLMs are highly proficient at generating code, including SQL queries from natural language text. Today, we’re going to experiment with this capability to see how effectively we can transform natural language instructions into SQL queries. The idea is to leverage the power of natural language processing to simplify the process of writing complex SQL statements. For this experiment, I’ve downloaded a CSV file containing data from IMDB, which includes various attributes related to movies, such as titles, release years, genres, and ratings. By using this dataset, we can test the LLM’s ability to generate accurate and efficient SQL queries based on different natural language prompts. Here’s an example of what the data looks like:

nconst,primaryname,birthyear,deathyear,primaryprofession,knownfortitles
nm0325022,Käthe Gold,1907,1997,"actress,archive_footage","tt0026069,tt0032498,tt0436641,tt0026066"
nm0325025,Lee Gold,1919,1985,writer,"tt0034433,tt0040392,tt0048226,tt0099219"
nm0325028,Louise Gold,1956,,"actress,miscellaneous,soundtrack","tt0074028,tt0104940,tt0083791,tt2281587"
...

Now, we will create a PostgreSQL database using Docker. Docker allows us to quickly set up and manage containerized applications, making it an ideal tool for this purpose. Below is the Dockerfile we will use to set up our PostgreSQL database:

FROM postgres:16.3-alpine
COPY actors.csv /docker-entrypoint-initdb.d/actors.csv
COPY init.sql /docker-entrypoint-initdb.d/

Next, we will set up the database and import the CSV data into an ‘actors’ table using the Docker entrypoint. Below is how we configure the Docker entrypoint script to initialize the PostgreSQL database and import the CSV data:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

COPY actors FROM '/docker-entrypoint-initdb.d/actors.csv' CSV HEADER;

That’s the docker-compose file to set up the PostgreSQL database

version: '3.6'

services:
  pg:
    build:
      context: .docker/pg
      dockerfile: Dockerfile
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

Now we can start with the python script. We’re going to use cick library to build cli scrpt. The python application interacts with a database to execute SQL queries generated from user input. The process begins with obtaining a MovieChain object through the get_chain function, which takes an argument llm. This MovieChain object is then used to generate an SQL query based on the user’s input q through its get_sql method. After that we just execute the SQL query into the PostgreSQL and print the results.

import click
from dbutils import get_conn, Db, get_cursor
from lib.chains.movie import get_chain
from lib.llm.groq import llm
from settings import DSN


@click.command()
@click.option('--q', required=True, help='question to ask')
def run(q):
    chain = get_chain(llm)
    sql = chain.get_sql(q)
    click.echo(f"q: {q}")
    click.echo(sql)
    click.echo('')
    if sql:
        conn = get_conn(DSN, named=True, autocommit=True)
        db = Db(get_cursor(conn=conn))
        data = db.fetch_all(sql)
        for row in data:
            print(row)

The MovieChain class interacts with an LLM (in this example, we’re using Groq).

import logging
from langchain_core.messages import SystemMessage, HumanMessage

from .prompts import PROMPT

logger = logging.getLogger(__name__)


class MovieChain:

    def __init__(self, llm):
        self.llm = llm

        self.prompt = SystemMessage(content=PROMPT)

    def get_sql(self, q: str):
        user_message = HumanMessage(content=q)
        try:
            ai_msg = self.llm.invoke([self.prompt, user_message])
            output_message = ai_msg.content if not isinstance(ai_msg, str) else ai_msg

            return output_message
        except Exception as e:
            logger.error(f"Error during question processing: {e}")

The Chain uses two prompts: the system prompt that creates the proper context to assist the LLM in generating the SQL query. We’re providing the create table script.

PROMPT = """
You are an expert in generating SQL queries based on user questions.
You have access to a database with the following table schema:

CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);

Please generate an SQL query to answer the following user question.
Ensure the query is valid, secure, and tailored to the provided schema.
Return only the SQL query without additional explanations.
Don't use quotes around the query in any case.
"""

And that’s all. With it we can ask quetions about this dataset and llm genetes the SQL for us.

python cli.py movie --q="List the living actors under 10 years old."

q: List the living actors under 10 years old.
SELECT * FROM actors WHERE deathyear IS NULL AND birthyear > (EXTRACT(YEAR FROM CURRENT_DATE) - 10);
...
python cli.py movie --q="List the living actors who were born in the same year as Mel Gibson."

q: List the living actors who were born in the same year as Mel Gibson
SELECT * FROM actors WHERE birthyear = (SELECT birthyear FROM actors WHERE primaryname = 'Mel Gibson') AND deathyear IS NULL;
...
cli.py movie --q="List the deceased actors who were born in the same year as Mel Gibson."

q: List the deceased actors who were born in the same year as Mel Gibson.
SELECT * 
FROM actors 
WHERE deathyear IS NOT NULL 
AND birthyear = (SELECT birthyear 
                 FROM actors 
                 WHERE primaryname = 'Mel Gibson');
...
python cli.py movie --q="What is the name, date of birth, and age of the oldest living actor born in the 70s?"

q: What is the name, date of birth, and age of the oldest living actor born in the 70s?
SELECT primaryname, birthyear, (2023 - birthyear) AS age 
FROM actors 
WHERE birthyear >= 1970 AND birthyear < 1980 AND deathyear IS NULL 
ORDER BY birthyear ASC 
LIMIT 1;

{'primaryname': 'Missy Gold', 'birthyear': 1970, 'age': 53}

With projects like these, where we execute “random” SQL generated by an LLM, it’s crucial to manage user access to the database carefully. Restricting access helps mitigate potential SQL injection risks, especially depending on the prompts provided by the user when interacting with the LLM.

Full source code in my github account.