What if the bug fixed itself? Letting AI agents detect bugs, fix the code, and create PRs proactively.

What if an AI could not only identify errors in your logs but actually fix them and create a pull request? I ran an experiment to do exactly that.

We can put our application logs in CloudWatch and use AI agents with a worker-coordinator pattern (I’ll share a post explaining this). Today the idea is to go one step further: we will detect errors in our logs and, for certain types of fixable errors, let an AI agent fix the code and create a pull request automatically.

The core of the system is a tool decorated with @tool from Strands Agents. This makes it available to any AI agent that needs to trigger a fix:

from strands import tool

@tool
async def register_error_for_fix(error: LogEntry) -> bool:
    """
    Register an error for automatic fixing.
    Clones repo, creates fix branch, uses Claude to fix, creates PR.
    """
    repo = _setup_repo()

    branch_name = _create_fix_branch(repo, error)
    if branch_name is None:
        return True  # Branch already exists, skip

    claude_response = await _invoke_claude_fix(error.message)
    if claude_response is None:
        return False

    pr_info = pr_title_generator(claude_response)
    _commit_and_push(repo, branch_name, pr_info)
    _create_pull_request(branch_name, pr_info)

    return True
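
The tool receives a LogEntry with just the fields it needs to build the branch name and the fix prompt. The exact model isn’t shown here; a minimal sketch, assuming Pydantic and only the fields used above, could be:

from datetime import datetime

from pydantic import BaseModel


class LogEntry(BaseModel):
    message: str             # error message (and stacktrace) extracted from the logs
    fix_short_name: str      # short slug used to name the autofix branch
    timestamp: datetime      # when the error occurred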

Step by Step Implementation

1. Repository Setup with GitPython

The tool first clones the repo or pulls the latest changes:

from git import Repo

def _setup_repo() -> Repo:
    repo_url = f"https://x-access-token:{GITHUB_TOKEN}@github.com/{GITHUB_REPO}.git"

    if (WORK_DIR / ".git").exists():
        repo = Repo(WORK_DIR)
        repo.git.pull(repo_url)
    else:
        repo = Repo.clone_from(repo_url, WORK_DIR)

    return repo

2. Branch Creation with Deduplication

Each fix gets its own branch with a timestamp. If the branch already exists remotely, we skip it to avoid duplicate PRs:

def _create_fix_branch(repo: Repo, error: LogEntry) -> str | None:
    branch_name = f"autofix/{error.fix_short_name}_{error.timestamp.strftime('%Y%m%d-%H%M%S')}"

    remote_refs = [ref.name for ref in repo.remote().refs]
    if f"origin/{branch_name}" in remote_refs:
        logger.info(f"Branch {branch_name} already exists, skipping")
        return None

    new_branch = repo.create_head(branch_name)
    new_branch.checkout()
    return branch_name

3. The Magic: Claude Code SDK

This is where the actual fix happens. Claude Code SDK allows Claude to read and edit files in the codebase:

from claude_code_sdk import ClaudeCodeOptions, query

async def _invoke_claude_fix(error_message: str) -> str | None:
    prompt = f"Fix this error in the codebase: {error_message}"

    options = ClaudeCodeOptions(
        cwd=str(WORK_DIR),
        allowed_tools=["Read", "Edit"]  # Safe: no Write, no Bash
    )

    response = None
    async for response in query(prompt=prompt, options=options):
        logger.info(f"Claude response: {response}")

    return response.result if response else None

Note that we only allow Read and Edit tools – no Write (creating new files) or Bash (running commands). This keeps the fixes focused and safe.

4. PR Title Generation with Claude Haiku

For fast and cheap PR title generation, I use Claude Haiku with structured output:

from pydantic import BaseModel, Field

class PrTitleModel(BaseModel):
    pr_title: str = Field(..., description="Concise PR title")
    pr_description: str = Field(..., description="Detailed PR description")

def pr_title_generator(response: str) -> PrTitleModel:
    agent = create_agent(
        system_prompt=PR_PROMPT,
        model=Models.CLAUDE_45_HAIKU,
        tools=[]
    )

    result = agent(
        prompt=f"This is response from claude code: {response}\n\n"
               f"Generate a concise title for a GitHub pull request.",
        structured_output_model=PrTitleModel
    )

    return result.structured_output

The prompt enforces Conventional Commits style:

PR_PROMPT = """
You are an assistant expert in generating pull request titles for GitHub.
OBJECTIVE:
- Generate concise and descriptive titles for pull requests.
- IMPORTANT: Use Conventional Commits as a style reference.
CRITERIA:
- The title must summarize the main changes or fixes.
- Keep the title under 10 words.
"""

5. Commit, Push, and Create PR

Finally, we commit everything, push to the remote, and create the PR via GitHub API:

def _commit_and_push(repo: Repo, branch_name: str, pr_info: PrTitleModel) -> None:
    repo.git.add(A=True)
    repo.index.commit(pr_info.pr_title)
    repo.git.push(get_authenticated_repo_url(), branch_name)

def _create_pull_request(branch_name: str, pr_info: PrTitleModel) -> None:
    gh = Github(GITHUB_TOKEN)
    gh_repo = gh.get_repo(GITHUB_REPO)
    gh_repo.create_pull(
        title=pr_info.pr_title,
        body=pr_info.pr_description,
        head=branch_name,
        base="main"
    )

The Triage Agent: Deciding What to Fix

The tool is exposed to a triage agent that analyzes logs and decides when to use it. The agent follows the ReAct pattern (Reasoning + Acting), where it explicitly reasons about each error before deciding to act:

TRIAGE_PROMPT = """
You are a senior DevOps engineer performing triage of production errors.

REGISTRATION CRITERIA:
- The error may be occurring frequently. Register ONLY ONCE.
- The error has a clear stacktrace that indicates the root cause.
- The error can be corrected with a quick fix.

DISCARD CRITERIA:
✗ Single/isolated errors (may be malicious input)
✗ Errors from external services (network, timeouts)
✗ Errors without a clear stacktrace
✗ Errors that require business decisions

Use the ReAct pattern:
Thought: [your analysis of the error]
Action: [register_error_for_fix if criteria met]
Observation: [tool result]
... (repeat for each error type)
Final Answer: [summary of registered errors]
"""

This pattern forces the agent to reason explicitly before taking action, making decisions more transparent and debuggable.

The agent is given tools and makes the decision autonomously:

agent = create_agent(
    system_prompt=TRIAGE_PROMPT,
    model=Models.CLAUDE_45,
    tools=[register_error_for_fix]
)

result = agent(prompt=[
    {"text": f"Question: {question}"},
    {"text": f"Log context: {logs_json}"},
])

To test the system, I created a sample repository with intentional bugs and generated CloudWatch-like logs. The triage agent analyzes the logs, identifies fixable errors, and invokes the register_error_for_fix tool to create PRs automatically.

That’s the code (with the bug):

import logging
import traceback

from flask import Flask, jsonify

from lib.logger import setup_logging
from settings import APP, PROCESS, LOG_PATH, ENVIRONMENT

logger = logging.getLogger(__name__)

app = Flask(__name__)

setup_logging(
    env=ENVIRONMENT,
    app=APP,
    process=PROCESS,
    log_path=LOG_PATH)

for logger_name in ["werkzeug"]:
    logging.getLogger(logger_name).setLevel(logging.CRITICAL)


@app.errorhandler(Exception)
def handle_exception(e):
    logger.error(
        "Unhandled exception: %s",
        e,
        extra={"traceback": traceback.format_exc()},
    )
    return jsonify(error=str(e)), 500


@app.get("/div/<int:a>/<int:b>")
def divide(a: int, b: int):
    return dict(result=a / b)

As you can see, the /div/<int:a>/<int:b> endpoint has a bug: it does not handle division by zero. We hit the endpoint to trigger the error and generate the logs. Since the logs live in CloudWatch’s log group /projects/autofix, we can run a command to analyze them:

python cli.py log --group /projects/autofix --question "Analyze those logs" --start 2026-01-16

The AI agent will identify the division by zero error, decide it is fixable, and create a PR that modifies the code (using Claude Code in headless mode) to handle this case properly.
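
For reference, the kind of change we expect the agent to produce is something like this (my own sketch of the fix, not the literal PR diff):

@app.get("/div/<int:a>/<int:b>")
def divide(a: int, b: int):
    # Handle division by zero explicitly instead of raising an unhandled exception
    if b == 0:
        return jsonify(error="Division by zero is not allowed"), 400
    return dict(result=a / b)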

And that’s all! The AI agent has autonomously created a PR that fixes the bug. Now we can easily accept or reject the PR after human review. The bug has been fixed!

This experiment shows that AI agents can go beyond analysis to take action. By giving Claude Code SDK access to a sandboxed environment with limited tools (Read, Edit only), we get a system that can autonomously fix bugs while remaining controllable.

The key is setting clear boundaries: the triage agent decides what to fix based on strict criteria, and the fix agent is constrained to how it can modify code. This separation keeps the system predictable and safe.

Full code in my github

Using Map-Reduce to process large documents with AI Agents and Python

We live in the era of Large Language Models (LLMs) with massive context windows. Claude 3.5 Sonnet offers 200k tokens, and Gemini 1.5 Pro goes up to 2 million. So, why do we still need to worry about document processing strategies? Because other limits remain. For example, AWS Bedrock has a strict limit of 4.5MB for documents, regardless of token count. That means we can’t just stuff a file larger than 4.5MB into a prompt. Today I’ll show how I built a production-ready document processing agent that handles large files by implementing a Map-Reduce pattern using Python, AWS Bedrock, and Strands Agents.

The core idea is simple: instead of asking the LLM to “read this book and answer” we break the book into chapters, analyze each chapter in parallel, and then synthesize the results.

The high-level flow is: split the document into chunks, analyze each chunk in parallel, consolidate the partial results, and synthesize the final answer.

The heart of the implementation is the DocumentProcessor class. It decides whether to process a file as a whole or split it based on a size threshold. We define a threshold (e.g., 4.3MB) to stay safely within Bedrock’s limits. If the file is larger, we trigger the _process_big method.

# src/lib/processor/processor.py

BYTES_THRESHOLD = 4_300_000

async def _process_file(self, file: DocumentFile, question: str, with_callback=True):
    file_bytes = Path(file.path).read_bytes()
    # Strategy pattern: Choose the right processor based on file size
    processor = self._process_big if len(file_bytes) > BYTES_THRESHOLD else self._process
    async for chunk in processor(file_bytes, file, question, with_callback):
        yield chunk

To increase performance, we use asyncio to process the chunks in parallel, with a semaphore to control the number of concurrent workers.

async def _process_big(self, file_bytes: bytes, file: DocumentFile, question: str, with_callback=True) -> AsyncIterator[str]:
    # ... splitting logic ...
    semaphore = asyncio.Semaphore(self.max_workers)

    # Create async tasks for each chunk
    tasks = [
        self._process_chunk(chunk, i, file_name, question, handler.format, semaphore)
        for i, chunk in enumerate(chunks, 1)
    ]

    # Run in parallel
    results = await asyncio.gather(*tasks)
    
    # Sort results to maintain document order
    results.sort(key=lambda x: x[0])
    responses_from_chunks = [response for _, response in results]

Each chunk is processed by an isolated agent instance that only sees that specific fragment and the user’s question. Once we have the partial analyses, we consolidate them. This acts as a compression step: we’ve turned raw pages into relevant insights.

def _consolidate_and_truncate(self, responses: list[str], num_chunks: int) -> str:
    consolidated = "\n\n".join(responses)
    
    if len(consolidated) > MAX_CONTEXT_CHARS:
        # Safety mechanism to ensure we don't overflow the final context
        return consolidated[:MAX_CONTEXT_CHARS] + "\n... [TRUNCATED]"
    return consolidated

Finally, we feed this consolidated context to the agent for the final answer. In a long-running async process, feedback is critical. I implemented an Observer pattern to decouple the processing logic from the UI/Logging.

# src/main.py

class DocumentProcessorEventListener(ProcessingEventListener):
    async def on_chunk_start(self, chunk_number: int, file_name: str):
        logger.info(f"[Worker {chunk_number}] Processing chunk for file {file_name}")

    async def on_chunk_end(self, chunk_number: int, file_name: str, response: str):
        logger.info(f"[Worker {chunk_number}] Completed chunk for file {file_name}")

By breaking down large tasks, we not only bypass technical limits but often get better results. The model focuses on smaller sections, reducing hallucinations, and the final answer is grounded in a pre-processed summary of facts.

We don’t just send text; we send the raw document bytes. This allows the model (Claude 4.5 Sonnet via Bedrock) to use its native document processing capabilities. Here is how we construct the message payload:

# src/lib/processor/processor.py

def _create_document_message(self, file_format: str, file_name: str, file_bytes: bytes, text: str) -> list:
    return [
        {
            "role": "user",
            "content": [
                {
                    "document": {
                        "format": file_format,
                        "name": file_name,
                        "source": {"bytes": file_bytes},
                    },
                },
                {"text": text},
            ],
        },
    ]

When processing chunks, we don’t want the model to be chatty. We need raw information extraction. We use a “Spartan” system prompt that enforces brevity and objectivity, ensuring the consolidation phase receives high-signal input.

# src/lib/processor/prompts.py

SYSTEM_CHUNK_PROMPT = f"""
You are an artificial intelligence assistant specialized in reading and analyzing files.
You have received a chunk of a large file.
...
If the user's question cannot be answered with the information in the current chunk, do not answer it directly.

{SYSTEM_PROMPT_SPARTAN}
"""

The SYSTEM_PROMPT_SPARTAN (injected above) explicitly forbids conversational filler, ensuring we maximize the token budget for actual data.
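
The spartan prompt itself is not listed in this post; paraphrased, it says something like this (my wording, not the project’s exact text):

SYSTEM_PROMPT_SPARTAN = """
RESPONSE STYLE:
- Be concise and objective; no greetings and no conversational filler.
- Report only facts extracted from the provided content.
- If the requested information is not present, say so in one short sentence.
"""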

The chunking logic handles PDF and XLSX files; other file types are not split and are passed to the LLM as-is.

With this architecture, we can process large files in a production environment, and the event listener makes it easy to plug in different interfaces, whether it’s a CLI logger (as shown) or a WebSocket update for a UI frontend like Chainlit.

Full code in my github

Chat with your Data: Building a File-Aware AI Agent with AWS Bedrock and Chainlit

We all know LLMs are powerful, but their true potential is unlocked when they can see your data. While RAG (Retrieval-Augmented Generation) is great for massive knowledge bases, sometimes you just want to drag and drop a file and ask questions about it.

Today we’ll build a “File-Aware” AI agent that can natively understand a wide range of document formats—from PDFs and Excel sheets to Word docs and Markdown files. We’ll use AWS Bedrock with Claude 4.5 Sonnet for the reasoning engine and Chainlit for the conversational UI.

The idea is straightforward: Upload a file, inject it into the model’s context, and let the LLM do the rest. No vector databases, no complex indexing pipelines—just direct context injection for immediate analysis.

The architecture is simple yet effective. We intercept file uploads in the UI, process them into a format the LLM understands, and pass them along with the user’s query.

┌──────────────┐      ┌──────────────┐      ┌────────────────────┐
│   Chainlit   │      │  Orchestrator│      │   AWS Bedrock      │
│      UI      │─────►│    Agent     │─────►│(Claude 4.5 Sonnet) │
└──────┬───────┘      └──────────────┘      └────────────────────┘
       │                      ▲
       │    ┌────────────┐    │
       └───►│ File Proc. │────┘
            │   Logic    │
            └────────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for high-quality reasoning and large context windows.
  • Chainlit for a chat-like interface with native file upload support.
  • Python for the backend logic.

The core challenge is handling different file types and presenting them to the LLM. We support a variety of formats by mapping them to Bedrock’s expected input types.
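
The mapping from MIME types to Bedrock document formats lives in a MIME_MAP dictionary; a plausible version, assuming the accepted types configured below, looks like this:

MIME_MAP = {
    "application/pdf": "pdf",
    "text/csv": "csv",
    "application/msword": "doc",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/vnd.ms-excel": "xls",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
    "text/html": "html",
    "text/plain": "txt",
    "text/markdown": "md",
    "text/x-markdown": "md",
}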

To enable file uploads in Chainlit, you need to configure the [features.spontaneous_file_upload] section in your .chainlit/config.toml. This is where you define which MIME types are accepted.

[features.spontaneous_file_upload]
    enabled = true
    accept = [
        "application/pdf",
        "text/csv",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "text/html",
        "text/plain",
        "text/markdown",
        "text/x-markdown"
    ]
    max_files = 20
    max_size_mb = 500

The main agent loop handles the conversation. It checks for uploaded files, processes them, and constructs the message payload for the LLM. We also include robust error handling to manage context window limits gracefully.

def get_question_from_message(message: cl.Message):
    content_blocks = None
    if message.elements:
        content_blocks = get_content_blocks_from_message(message)

    if content_blocks:
        content_blocks.append({"text": message.content or "Write a summary of the document"})
        question = content_blocks
    else:
        question = message.content

    return question


def get_content_blocks_from_message(message: cl.Message):
    docs = [f for f in message.elements if f.type == "file" and f.mime in MIME_MAP]
    content_blocks = []

    for doc in docs:
        file = Path(doc.path)
        file_bytes = file.read_bytes()
        shutil.rmtree(file.parent)

        content_blocks.append({
            "document": {
                "name": sanitize_filename(doc.name),
                "format": MIME_MAP[doc.mime],
                "source": {"bytes": file_bytes}
            }
        })

    return content_blocks

@cl.on_message
async def handle_message(message: cl.Message):
    task = asyncio.create_task(process_user_task(
        question=get_question_from_message(message),
        debug=DEBUG))
    cl.user_session.set("task", task)
    try:
        await task
    except asyncio.CancelledError:
        logger.info("User task was cancelled.")

This pattern allows for ad-hoc analysis. You don’t need to pre-ingest data. You can:

  1. Analyze Financials: Upload an Excel sheet and ask for trends.
  2. Review Contracts: Upload a PDF and ask for clause summaries.
  3. Debug Code: Upload a source file and ask for a bug fix.

By leveraging the large context window of modern models like Claude 4.5 Sonnet, we can feed entire documents directly into the prompt, providing the model with full visibility without the information loss often associated with RAG chunking.

And that's all. With tools like Chainlit and powerful APIs like AWS Bedrock, we can create robust, multi-modal assistants that integrate seamlessly into our daily workflows.

Full code in my github account.

Building scalable multi-purpose AI agents: Orchestrating Multi-Agent Systems with Strands Agents and Chainlit

We can build simple AI agents that handle specific tasks quite easily today. But what about building AI systems that can handle multiple domains effectively? One approach is to create a single monolithic agent that tries to do everything, but this quickly runs into problems of context pollution, maintenance complexity, and scaling limitations. In this article, we’ll show a production-ready pattern for building multi-purpose AI systems using an orchestrator architecture that coordinates domain-specific agents.

The idea is simple: don’t build one agent to rule them all; instead, create specialized agents that excel in their domains and coordinate them through an intelligent orchestrator. The solution is an orchestrator agent that routes requests to specialized sub-agents, each with focused expertise and dedicated tools. Think of it as a smart router that understands intent and delegates accordingly.

That’s the core of the Orchestrator Pattern for multi-agent systems:

User Query → Orchestrator Agent → Specialized Agent(s) → Orchestrator → Response

For our example we have three specialized agents:

  1. Weather Agent: Expert in meteorological data and weather patterns. It uses external weather APIs to fetch historical and current weather data.
  2. Logistics Agent: Specialist in supply chain and shipping operations. Fake logistics data is generated to simulate shipment tracking, route optimization, and delivery performance analysis.
  3. Production Agent: Focused on manufacturing operations and production metrics. Also, fake production data is generated to analyze production KPIs.

That’s the architecture in a nutshell:

┌─────────────────────────────────────────────┐
│          Orchestrator Agent                 │
│  (Routes & Synthesizes)                     │
└────────┬─────────┬─────────┬────────────────┘
         │         │         │
    ┌────▼────┐ ┌──▼─────┐ ┌─▼─────────┐
    │ Weather │ │Logistic│ │Production │
    │  Agent  │ │ Agent  │ │  Agent    │
    └────┬────┘ └──┬─────┘ └┬──────────┘
         │         │        │
    ┌────▼────┐ ┌──▼─────┐ ┌▼──────────┐
    │External │ │Database│ │ Database  │
    │   API   │ │ Tools  │ │  Tools    │
    └─────────┘ └────────┘ └───────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for agent reasoning
  • Strands Agents framework for agent orchestration
  • Chainlit for the conversational UI
  • FastAPI for the async backend
  • PostgreSQL for storing conversation history and domain data

The orchestrator’s job is simple but critical: understand the user’s intent and route to the right specialist(s).

MAIN_SYSTEM_PROMPT = """You are an intelligent orchestrator agent 
responsible for routing user requests to specialized sub-agents 
based on their domain expertise.

## Available Specialized Agents

### 1. Production Agent
**Domain**: Manufacturing operations, production metrics, quality control
**Handles**: Production KPIs, machine performance, downtime analysis

### 2. Logistics Agent
**Domain**: Supply chain, shipping, transportation operations
**Handles**: Shipment tracking, route optimization, delivery performance

### 3. Weather Agent
**Domain**: Meteorological data and weather patterns
**Handles**: Historical weather, atmospheric conditions, climate trends

## Your Decision Process
1. Analyze the request for key terms and domains
2. Determine scope (single vs multi-domain)
3. Route to appropriate agent(s)
4. Synthesize results when multiple agents are involved
"""

The orchestrator receives specialized agents as tools:

def get_orchestrator_tools() -> List[Any]:
    from tools.logistics.agent import logistics_assistant
    from tools.production.agent import production_assistant
    from tools.weather.agent import weather_assistant

    tools = [
        calculator,
        think,
        current_time,
        AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter,
        logistics_assistant,  # Specialized agent as tool
        production_assistant,  # Specialized agent as tool
        weather_assistant     # Specialized agent as tool
    ]
    return tools
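
With the tools in place, the orchestrator itself is just an agent wired to the routing prompt and that tool list. A minimal sketch, using the same get_agent helper that the specialized agents use below:

orchestrator = get_agent(
    system_prompt=MAIN_SYSTEM_PROMPT,
    tools=get_orchestrator_tools()
)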

Each specialized agent follows a consistent pattern. Here’s the weather agent:

@tool
@stream_to_step("weather_assistant")
async def weather_assistant(query: str):
    """
    A research assistant specialized in weather topics with streaming support.
    """
    try:
        tools = [
            calculator,
            think,
            current_time,
            AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter
        ]
        # Domain-specific tools
        tools += WeatherTools(latitude=MY_LATITUDE, longitude=MY_LONGITUDE).get_tools()

        research_agent = get_agent(
            system_prompt=WEATHER_ASSISTANT_PROMPT,
            tools=tools
        )

        async for token in research_agent.stream_async(query):
            yield token

    except Exception as e:
        yield f"Error in research assistant: {str(e)}"
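
The stream_to_step decorator is a small helper that mirrors the sub-agent’s streamed output into a Chainlit step while still yielding tokens to the orchestrator. A hypothetical implementation (my sketch, not the project’s code) could look like this:

import functools

import chainlit as cl


def stream_to_step(name: str):
    """Wrap an async generator tool so its tokens are also streamed into a Chainlit step."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            async with cl.Step(name=name, type="tool") as step:
                async for token in func(*args, **kwargs):
                    await step.stream_token(str(token))  # mirror the token into the UI step
                    yield token  # and still hand it back to the caller
        return wrapper
    return decorator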

Each agent has access to domain-specific tools. For example, the weather agent uses external APIs:

class WeatherTools:
    def __init__(self, latitude: float, longitude: float):
        self.latitude = latitude
        self.longitude = longitude

    def get_tools(self) -> List[tool]:
        @tool
        def get_hourly_weather_data(from_date: date, to_date: date) -> MeteoData:
            """Get hourly weather data for a specific date range."""
            url = (f"https://api.open-meteo.com/v1/forecast?"
                   f"latitude={self.latitude}&longitude={self.longitude}&"
                   f"hourly=temperature_2m,relative_humidity_2m...")
            response = requests.get(url)
            return parse_weather_response(response.json())
        
        return [get_hourly_weather_data]

The logistics and production agents use synthetic data generators for demonstration:

class LogisticsTools:
    def get_tools(self) -> List[tool]:
        @tool
        def get_logistics_data(
            from_date: date,
            to_date: date,
            origins: Optional[List[str]] = None,
            destinations: Optional[List[str]] = None,
        ) -> LogisticsDataset:
            """Generate synthetic logistics shipment data."""
            # Generate realistic shipment data with delays, costs, routes
            records = generate_synthetic_shipments(...)
            return LogisticsDataset(records=records, aggregates=...)
        
        return [get_logistics_data]

For the UI we’re going to use Chainlit. The Chainlit integration provides real-time visibility into agent execution:

class LoggingHooks(HookProvider):
    async def before_tool(self, event: BeforeToolCallEvent) -> None:
        step = cl.Step(name=f"{event.tool_use['name']}", type="tool")
        await step.send()
        cl.user_session.set(f"step_{event.tool_use['name']}", step)

    async def after_tool(self, event: AfterToolCallEvent) -> None:
        step = cl.user_session.get(f"step_{event.tool_use['name']}")
        if step:
            await step.update()

@cl.on_message
async def handle_message(message: cl.Message):
    agent = cl.user_session.get("agent")
    message_history = cl.user_session.get("message_history")
    message_history.append({"role": "user", "content": message.content})
    
    response = await agent.run_async(message.content)
    await cl.Message(content=response).send()

This creates a transparent experience where users see:

  • Which agent is handling their request
  • What tools are being invoked
  • Real-time streaming of responses

Now we can handle a variety of user queries. For example:

User: “What was the average temperature last week?”

Flow:

  1. Orchestrator identifies weather domain
  2. Routes to weather_assistant
  3. Weather agent calls get_hourly_weather_data
  4. Analyzes and returns formatted response

Or multi-domain queries:

User: “Did weather conditions affect our shipment delays yesterday?”

Flow:

  1. Orchestrator identifies weather + logistics domains
  2. Routes to weather_assistant for climate data
  3. Routes to logistics_assistant for shipment data
  4. Synthesizes correlation analysis
  5. Returns unified insight

And complex analytics:

User: “Analyze production efficiency trends and correlate with weather and logistics performance based on yesterday’s data.”

Flow:

  1. Orchestrator coordinates all three agents
  2. Production agent retrieves manufacturing KPIs
  3. Weather agent provides environmental data
  4. Logistics agent supplies delivery metrics
  5. Orchestrator synthesizes multi-domain analysis

This architecture scales naturally in multiple dimensions. We can easily add new specialized agents without disrupting existing functionality: we only need to create the new agent, register it as a tool, and update the orchestrator prompt with the new domain description. That’s it.

The orchestrator pattern transforms multi-domain AI from a monolithic challenge into a composable architecture. Each agent focuses on what it does best, while the orchestrator provides intelligent coordination.

Full code in my github.

Building ReAct AI agents with sandboxed Python code execution using AWS Bedrock and LangGraph

In industrial environments, data analysis is crucial for optimizing processes, detecting anomalies, and making informed decisions. Manufacturing plants, energy systems, and industrial IoT generate massive amounts of data from sensors, machines, and control systems. Traditionally, analyzing this data requires specialized knowledge in both industrial processes and data science, creating a bottleneck for quick insights.

I’ve been exploring agentic AI frameworks lately, particularly for complex data analysis tasks. While working on industrial data problems, I realized that combining the reasoning capabilities of Large Language Models with specialized tools could create a powerful solution for industrial data analysis. This project demonstrates how to build a ReAct (Reasoning and Acting) AI agent using LangGraph that can analyze manufacturing data, understand industrial processes, and provide actionable insights.

The goal of this project is to create an AI agent that can analyze industrial datasets (manufacturing metrics, sensor readings, process control data) and provide expert-level insights about production optimization, quality control, and process efficiency. Using LangGraph’s ReAct agent framework with AWS Bedrock, the system can execute Python code dynamically in a sandboxed environment, process large datasets, and reason about industrial contexts.

The dataset is a fake sample of industrial data with manufacturing metrics like temperature, speed, humidity, pressure, operator experience, scrap rates, and unplanned stops. In fact, I generated the dataset using ChatGPT.

This project uses several key components:

  • LangGraph ReAct Agent: For building the multi-tool AI agent with ReAct (Reasoning and Acting) patterns that can dynamically choose tools and reason about results
  • AWS Bedrock: Claude Sonnet 4 as the underlying LLM for reasoning and code generation
  • Sandboxed Code Interpreter: Secure execution of Python code for data analysis using AWS Agent Core, via a tool from the strands-agents-tools library
  • Industrial Domain Expertise: Specialized system prompts with knowledge of manufacturing processes, quality control, and industrial IoT

The agent has access to powerful tools:

  • Code Interpreter: Executes Python code safely in a sandboxed AWS environment using pandas, numpy, scipy, and other scientific libraries
  • Data Processing: Handles large industrial datasets with memory-efficient strategies
  • Industrial Context: Understands manufacturing processes, sensor data, and quality metrics

The system uses AWS Agent Core’s sandboxed code interpreter, which means:

  • Python code is executed in an isolated environment
  • No risk to the host system
  • Access to scientific computing libraries (pandas, numpy, scipy)
  • Memory management for large datasets

The core of the system is surprisingly simple. The ReAct agent is built using LangGraph’s create_react_agent with custom tools:

from langgraph.prebuilt import create_react_agent
from typing import List
import pandas as pd
from langchain_core.callbacks import BaseCallbackHandler


def analyze_df(df: pd.DataFrame, system_prompt: str, user_prompt: str,
               callbacks: List[BaseCallbackHandler], streaming: bool = False):
    code_interpreter_tools = CodeInterpreter()
    tools = code_interpreter_tools.get_tools()

    agent = create_react_agent(
        model=get_llm(model=DEFAULT_MODEL, streaming=streaming,
                      budget_tokens=12288, callbacks=callbacks),
        tools=tools,
        prompt=system_prompt
    )

    agent_prompt = f"""
    I have a DataFrame with the following data:
    - Columns: {list(df.columns)}
    - Shape: {df.shape}
    - data: {df}
    
    The output must be an executive summary with the key points.
    The response must be only markdown, not plots.
    """
    messages = [
        ("user", agent_prompt),
        ("user", user_prompt)
    ]
    agent_input = {"messages": messages}
    return agent.invoke(agent_input)

The ReAct pattern (Reasoning and Acting) allows the agent to:

  1. Reason about what analysis is needed
  2. Act by calling the appropriate tools (in this case: code interpreter)
  3. Observe the results of code execution
  4. Re-reason and potentially call more tools if needed

This creates a dynamic loop where the agent can iteratively analyze data, examine results, and refine its approach – much more powerful than a single code execution.

The magic happens in the system prompt, which provides the agent with industrial domain expertise:

SYSTEM_PROMPT = """
# Industrial Data Analysis Agent - System Prompt

You are an expert AI agent specialized in industrial data analysis and programming. 
You excel at solving complex data problems in manufacturing, process control, 
energy systems, and industrial IoT environments.

## Core Capabilities
- Execute Python code using pandas, numpy, scipy
- Handle large datasets with chunking strategies  
- Process time-series data, sensor readings, production metrics
- Perform statistical analysis, anomaly detection, predictive modeling

## Industrial Domain Expertise
- Manufacturing processes and production optimization
- Process control systems (PID controllers, SCADA, DCS)
- Industrial IoT sensor data and telemetry
- Quality control and Six Sigma methodologies
- Energy consumption analysis and optimization
- Predictive maintenance and failure analysis
"""

The code interpreter tool is wrapped with safety validations:

def validate_code_ast(code: str) -> bool:
    """Validate Python code using AST to ensure safety."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False


@tool
def code_interpreter(code: str) -> str:
    """Executes Python code in a sandboxed environment."""
    if not validate_code_ast(code):
        raise UnsafeCodeError("Unsafe code or syntax errors.")

    return code_tool(code_interpreter_input={
        "action": {
            "type": "executeCode",
            "session_name": session_name,
            "code": code,
            "language": "python"
        }
    })

The system uses Claude Sonnet 4 through AWS Bedrock with optimized parameters for industrial analysis:

def get_llm(model: str = DEFAULT_MODEL, max_tokens: int = 4096,
            temperature: float = TemperatureLevel.BALANCED,
            top_k: int = TopKLevel.DIVERSE,
            top_p: float = TopPLevel.CREATIVE) -> BaseChatModel:
    model_kwargs = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p
    }

    return ChatBedrock(
        model=model,
        client=aws_get_service('bedrock-runtime'),
        model_kwargs=model_kwargs
    )

The project includes fake sample industrial data with manufacturing metrics:

- `machine_id`: Equipment identifier
- `shift`: Production shift (A/M/N for morning/afternoon/night)
- `temperature`, `speed`, `humidity`, `pressure`: Process parameters
- `operator_experience`: Years of operator experience
- `scrap_kg`: Quality metric (waste produced)
- `unplanned_stop`: Equipment failure indicator

A typical analysis query might be: "Do temperature and speed setpoints vary across shifts?"
The agent will stream the response as it generates it.

The agent will:

1. Load and examine the dataset structure
2. Generate appropriate Python code for analysis
3. Execute the code in a sandboxed environment
4. Provide insights about shift-based variations
5. Suggest process optimization recommendations

The driver script ties everything together:

import logging

import pandas as pd
from langchain_core.callbacks import StreamingStdOutCallbackHandler

from modules.df_analyzer import analyze_df
from prompts import SYSTEM_PROMPT

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


class StreamingCallbackHandler(StreamingStdOutCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end='', flush=True)


df = pd.read_csv('fake_data.csv')

user_prompt = "Do temperature and speed setpoints vary across shifts?"
for chunk in analyze_df(
        user_prompt=user_prompt,
        df=df,
        system_prompt=SYSTEM_PROMPT,
        callbacks=[StreamingCallbackHandler()],
        streaming=True):
    logger.debug(chunk)

This project demonstrates the power of agentic AI for specialized domains. Instead of building custom analytics dashboards or writing specific analysis scripts, we provide the agent with:

  1. Domain Knowledge: Through specialized system prompts
  2. Tools: Safe code execution capabilities
  3. Context: The actual data to analyze

The agent can then:

  • Generate appropriate analysis code
  • Execute it safely
  • Interpret results with industrial context
  • Provide actionable recommendations

The result is a flexible system that can handle various industrial analysis tasks without pre-programmed solutions. The agent reasons about the problem, writes the necessary code (sandboxed), and provides expert-level insights.

Full code in my github.

Building an Agentic AI with Python, LangChain, AWS Bedrock and Claude 4 Sonnet

Today we are going to build an agent with AI. It is just an example of how to build an agent with LangChain, AWS Bedrock and Claude 4 Sonnet. The agent will be a “mathematical expert” capable of performing complex calculations and providing detailed explanations of its reasoning process. The idea is to give the agent the ability to perform basic mathematical operations: addition and subtraction. In fact, with additions and subtractions we can perform all the other mathematical operations, like multiplication, division, exponentiation, square root, etc. The agent will perform these operations step by step, providing a detailed explanation of its reasoning process. I know we don’t need AI to perform these operations, but the idea is to show how to build an agent with LangChain, AWS Bedrock and Claude 4 Sonnet.

The mathematical agent implements the tool-calling pattern, allowing the LLM to dynamically select and execute mathematical operations:

import logging

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate

from core.llm.aws import get_llm, Models
from modules.prompts import AGENT_SYSTEM_PROMPT
from modules.tools import MathTools
from settings import MAX_TOKENS

logger = logging.getLogger(__name__)


def run(question: str, model: Models = Models.CLAUDE_4):
    prompt = ChatPromptTemplate.from_messages([
        ("system", AGENT_SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}")
    ])
    math_tools = MathTools()
    tools = math_tools.get_tools()

    llm = get_llm(model=model, max_tokens=MAX_TOKENS)
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=10
    )

    response = agent_executor.invoke({
        "input": question
    })

    logger.info(f"Agent response: {response['output']}")

Tools are defined using LangChain’s @tool decorator, providing automatic schema generation and type validation. Strictly speaking we don’t need to create a class for the tools, but I have done it because I want to add an extra feature to the agent: the ability to keep a history of the operations performed. This allows the agent to provide a detailed explanation of its reasoning process, showing the steps taken to arrive at the final result.

import logging
from typing import List

from langchain.tools import tool

logger = logging.getLogger(__name__)


class MathTools:

    def __init__(self):
        self.history = []

    def _diff_values(self, a: int, b: int) -> int:
        result = a - b
        self.history.append(f"{a} - {b} = {result}")
        return result

    def _sum_values(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result

    def _get_history(self) -> str:
        if not self.history:
            return "No previous operations"
        return "\n".join(self.history[-5:])  # Last 5

    def get_tools(self) -> List:
        @tool
        def diff_values(a: int, b: int) -> int:
            """Calculates the difference between two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: difference of a - b
            """
            logger.info(f"Calculating difference: {a} - {b}")
            return self._diff_values(a, b)

        @tool
        def sum_values(a: int, b: int) -> int:
            """Sums two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: sum of a + b
            """
            logger.info(f"Calculating sum: {a} + {b}")
            return self._sum_values(a, b)

        @tool
        def get_history() -> str:
            """Gets the operation history
            Returns:
                str: last operations
            """
            logger.info("Retrieving operation history")
            return self._get_history()

        return [diff_values, sum_values, get_history]

The system prompt is carefully crafted to guide the agent’s behavior and establish clear operational boundaries:

AGENT_SYSTEM_PROMPT = """
You are an expert mathematical agent specialized in calculations.

You have access to the following tools:
- diff_values: Calculates the difference between two numbers
- sum_values: Sums two numbers
- get_history: Gets the operation history

Guidelines:
1. Only answer questions related to mathematical operations.
2. For complex operations, use step-by-step calculations:
   - Multiplication: Repeated addition
   - Division: Repeated subtraction
   - Exponentiation: Repeated multiplication
   - Square root: Use methods like Babylonian method or prime factorization.
"""

Now we can invoke our agent by asking questions such as ‘What’s the square root of 16 divided by two, squared?’. The agent will iterate using only the provided tools to obtain the result.
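
For example, a small entry point that runs the agent (a sketch using the run() function defined above):

if __name__ == "__main__":
    run("What's the square root of 16 divided by two, squared?")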

And that’s all. This project demonstrates how to build a production-ready AI agent using LangChain and AWS Bedrock. It’s just a boilerplate, but it can be extended to create more complex agents with additional capabilities, and it helps to understand how AI agents work.

Full code in my GitHub account.

Sending logs to AWS CloudWatch with a sidecar pattern and Python

In a Docker Swarm environment, the sidecar pattern is a common architectural approach used to extend the functionality of a primary container without directly modifying it.

First there’s a primary container. This is the main application or service you want to run within a Docker Swarm service. It’s responsible for the core functionality of your application. In our case, this container will be the log generator. It will write its logs in JSON format.

The sidecar container is a secondary container that runs alongside the primary container. It’s tightly coupled with the primary container and assists it by providing additional services or functionality. In our example, the sidecar’s responsibility will be to push logs to AWS CloudWatch. The idea is to share a Docker volume between both containers. With this technique, our primary container doesn’t pay any network latency while generating logs.

The idea is to generate something like this:

version: '3.6'

services:
  api:
    image: api:latest
    logging:
      options:
        max-size: 10m
    deploy:
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
      - PROCESS_ID=api
    ports:
      - 5000:5000
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000 --timeout 180

  filebeat:
    image: cw:production
    command: bash -c "python cw.py && sleep 1m"
    deploy:
      restart_policy:
        condition: any
    environment:
      - LOG_GROUP=python_logs_example
      - LOG_STREAM_PREFIX=default
    volumes:
      - logs_volume:/src/logs
volumes:
  logs_volume:

Let’s go. First, we need to set up our AWS credentials. We can use a profile or an IAM user:

if AWS_PROFILE_NAME:
    session = boto3.Session(profile_name=AWS_PROFILE_NAME, region_name=AWS_REGION)
else:
    session = boto3.Session(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION)

logs = session.client('logs')

Then we need to set up the logGroup and logStream in CloudWatch. We can create them by hand, but I prefer to create them programmatically if they don’t exist:

def init_cloudwatch_stream():
    log_stream_name = f"{LOG_STREAM_PREFIX}_{datetime.now().strftime('%Y%m%d')}"

    log_groups = logs.describe_log_groups(logGroupNamePrefix=LOG_GROUP)['logGroups']
    if not any(group['logGroupName'] == LOG_GROUP for group in log_groups):
        logs.create_log_group(logGroupName=LOG_GROUP)

    log_streams = logs.describe_log_streams(
        logGroupName=LOG_GROUP,
        logStreamNamePrefix=log_stream_name
    )['logStreams']

    if not any(stream['logStreamName'] == log_stream_name for stream in log_streams):
        logs.create_log_stream(logGroupName=LOG_GROUP, logStreamName=log_stream_name)

    return log_stream_name

Now we need to upload the logs to CloudWatch using put_log_events. To send multiple batches we need to pass the sequenceToken returned by the previous call (it is not needed on the first call). To do that I use this trick:

function_parameters = dict(
    logGroupName=LOG_GROUP,
    logStreamName=log_stream_name
)

for f in glob(f'{LOG_PATH}/*.{LOG_EXTENSION}'):
    function_parameters['logEvents'] = get_log_events_from_file(f)
    response = logs.put_log_events(**function_parameters)
    function_parameters['sequenceToken'] = response['nextSequenceToken']

We also need to read the log files and perhaps adjust the fields according to our needs:

def get_log_events_from_file(file):
    exclude_fields = ('@timestamp', 'logger')
    return [
        dict(
            timestamp=int(datetime.fromisoformat(d['@timestamp']).timestamp() * 1000),
            message=json.dumps({k: v for k, v in d.items() if k not in exclude_fields})
        ) for d in [json.loads(linea) for linea in open(file, 'r')]]
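
For reference, each line in those log files is expected to be a JSON object along these lines (the field names other than @timestamp and logger are just an illustration):

# one log line, as written by the primary container (illustrative example)
{"@timestamp": "2024-05-01T10:15:30.123456+00:00", "logger": "app", "level": "INFO", "message": "user created"}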

I like to have all the settings of my application in a file called settings.py; it’s a pattern I’ve copied from Django. In this file I also read environment variables from a dotenv file.

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

AWS_PROFILE_NAME = os.getenv('AWS_PROFILE_NAME', False)
AWS_REGION = os.getenv('AWS_REGION')

LOG_GROUP = os.getenv('LOG_GROUP', 'python_logs_example')
LOG_STREAM_PREFIX = os.getenv('LOG_STREAM_PREFIX', 'default')

LOG_EXTENSION = 'log'
LOG_PATH = os.getenv('LOG_PATH', Path(BASE_DIR).resolve())

And that’s all. Your logs are uploaded to CloudWatch by a background process, decoupled from the main application.

Source code available in my github.

Real-time Object Detection and Streaming with Python, OpenCV, AWS Kinesis, and YOLOv8

I’ve got the following idea in mind: I want to detect objects with YOLOv8 and OpenCV. Here is a simple example:

import cv2 as cv
from ultralytics import YOLO
import logging

logging.getLogger("ultralytics").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

X_RESIZE = 800
Y_RESIZE = 450

YOLO_MODEL="yolov8m.pt"
YOLO_CLASSES_TO_DETECT = 'bottle',
THRESHOLD = 0.80

cap = cv.VideoCapture(0)
model = YOLO(YOLO_MODEL)
while True:
    _, original_frame = cap.read()
    frame = cv.resize(original_frame, (X_RESIZE, Y_RESIZE), interpolation=cv.INTER_LINEAR)

    results = model.predict(frame)

    color = (0, 255, 0)
    for result in results:
        for box in result.boxes:
            class_id = result.names[box.cls[0].item()]
            cords = box.xyxy[0].tolist()
            probability = round(box.conf[0].item(), 2)

            if probability > THRESHOLD and class_id in YOLO_CLASSES_TO_DETECT:
                logger.info(f"{class_id} {round(probability * 100)}%")
                x1, y1, x2, y2 = [round(x) for x in cords]
                cv.rectangle(frame, (x1, y1), (x2, y2), color, 2)

    cv.imshow(f"Cam", frame)

    if cv.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv.destroyAllWindows()

However, my idea is slightly different. I intend to capture the camera stream, transmit it to AWS Kinesis, and subsequently, within a separate process, consume this Kinesis stream for object detection or any other purpose using the camera frames. To achieve this, our initial step is to publish our OpenCV frames to Kinesis. Here is producer.py:

import logging

from lib.video import kinesis_producer
from settings import KINESIS_STREAM_NAME, CAM_URI, WIDTH

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == '__main__':
    kinesis_producer(
        kinesis_stream_name=KINESIS_STREAM_NAME,
        cam_uri=CAM_URI,
        width=WIDTH
    )

The kinesis_producer function reads frames from the camera, resizes them, encodes them as JPEG and puts them into the Kinesis stream:

def kinesis_producer(kinesis_stream_name, cam_uri, width):
    logger.info(f"start emitting stream from cam={cam_uri} to kinesis")
    kinesis = aws_get_service('kinesis')
    cap = cv.VideoCapture(cam_uri)
    try:
        while True:
            ret, frame = cap.read()
            if ret:
                scale = width / frame.shape[1]
                height = int(frame.shape[0] * scale)

                scaled_frame = cv.resize(frame, (width, height))
                _, img_encoded = cv.imencode('.jpg', scaled_frame)
                kinesis.put_record(
                    StreamName=kinesis_stream_name,
                    Data=img_encoded.tobytes(),
                    PartitionKey='1'
                )

    except KeyboardInterrupt:
        cap.release()
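
The aws_get_service helper isn’t shown in this snippet; a minimal version, following the same boto3 session pattern I use elsewhere (the settings names are assumptions), could be:

import boto3

from settings import AWS_PROFILE_NAME, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION


def aws_get_service(service_name: str):
    """Return a boto3 client, using a named profile if one is configured."""
    if AWS_PROFILE_NAME:
        session = boto3.Session(profile_name=AWS_PROFILE_NAME, region_name=AWS_REGION)
    else:
        session = boto3.Session(
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name=AWS_REGION)
    return session.client(service_name)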

Now our consumer.py:

import logging

from lib.video import kinesis_consumer
from settings import KINESIS_STREAM_NAME

logging.getLogger("ultralytics").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == '__main__':
    kinesis_consumer(
        kinesis_stream_name=KINESIS_STREAM_NAME
    )

To get the frames from the Kinesis stream we need to deal with the shards. Depending on your needs you may have to adapt this a bit:

def kinesis_consumer(kinesis_stream_name):
    logger.info(f"start reading kinesis stream from={kinesis_stream_name}")
    kinesis = aws_get_service('kinesis')
    model = YOLO(YOLO_MODEL)
    response = kinesis.describe_stream(StreamName=kinesis_stream_name)
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    while True:
        shard_iterator_response = kinesis.get_shard_iterator(
            StreamName=kinesis_stream_name,
            ShardId=shard_id,
            ShardIteratorType='LATEST'
        )
        shard_iterator = shard_iterator_response['ShardIterator']
        response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=10
        )

        for record in response['Records']:
            image_data = record['Data']
            frame = cv.imdecode(np.frombuffer(image_data, np.uint8), cv.IMREAD_COLOR)
            results = model.predict(frame)
            if detect_id_in_results(results):
                logger.info('Detected')

def detect_id_in_results(results):
    status = False
    for result in results:
        for box in result.boxes:
            class_id = result.names[box.cls[0].item()]
            probability = round(box.conf[0].item(), 2)
            if probability > THRESHOLD and class_id in YOLO_CLASSES_TO_DETECT:
                status = True
    return status

And that’s all. Real-time Object Detection with Kinesis stream. However, computer vision examples are often simple and quite straightforward. On the other hand, real-world computer vision projects tend to be more intricate, involving considerations such as cameras, lighting conditions, performance optimization, and accuracy. But remember, this is just an example.

Full code in my github.

AWS SQS 2 HTTP server with Python

A simple service that listens to an SQS queue and forwards the payload to an HTTP server, sending a POST request with the SQS payload and a Bearer token.

The main script reads a .env file with the AWS credentials and the HTTP endpoint:

# AWS CREDENTIALS
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_REGION=eu-west-1
#AWS_PROFILE=xxx
# SQS
SQS_QUEUE_URL=https://sqs.eu-west-1.amazonaws.com/xxx/name
# REST ENDPOINT
HTTP_ENDPOINT=http://localhost:5555
AUTH_BEARER_TOKEN=xxx

The main script looks like this:

import logging

from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())

from lib.sqs2http import loop_step, get_sqs

for library in ['botocore', 'boto3']:
    logging.getLogger(library).setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X'
)

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    logger.info("sqs2http start")
    sqs = get_sqs()
    while True:
        loop_step(sqs)

The main loop is like this:

def loop_step(sqs):
    response = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=SQS_MAX_NUMBER_OF_MESSAGES)

    if 'Messages' in response:
        for message in response.get('Messages', []):
            try:
                body = message.get('Body')
                logger.info(body)
                do_post(body)
                remove_from_sqs(sqs, message.get('ReceiptHandle'))

            except Exception as e:
                logger.exception(e)
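
The get_sqs and remove_from_sqs helpers aren’t shown above; a minimal sketch (the settings names are assumptions, and boto3 picks the credentials up from the environment variables loaded from the .env file):

import boto3

from settings import AWS_REGION, SQS_QUEUE_URL


def get_sqs():
    return boto3.client('sqs', region_name=AWS_REGION)


def remove_from_sqs(sqs, receipt_handle):
    # delete the message only after it has been forwarded successfully
    sqs.delete_message(QueueUrl=SQS_QUEUE_URL, ReceiptHandle=receipt_handle)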

To post to the http server we can use requests.

import requests
import json

from settings import HTTP_ENDPOINT, AUTH_BEARER_TOKEN

def do_post(body):
    requests.post(
        url=HTTP_ENDPOINT,
        data=json.dumps(body),
        headers={
            'Authorization': AUTH_BEARER_TOKEN,
            'Content-Type': 'application/json'
        })

With this simple script you can forward every incoming SQS message to a web server. Source code in my github.

Deploying Django application to AWS EC2 instance with Docker

In AWS we have several ways to deploy Django (and non-Django) applications with Docker. We can use ECS or EKS clusters, but if we don’t have an ECS or Kubernetes cluster up and running, that can be complex. Today I want to show how to deploy a Django application in production mode on an EC2 host. Let’s start.

I’m getting too old to provision a host by hand, so I prefer to use Docker. The idea is to create one EC2 instance (a simple Amazon Linux, AWS-supported AMI). This host doesn’t have Docker installed, so we need to install it. When we launch an instance, during configuration we can specify user data to configure the instance or run a configuration script at launch.

We only need to provide this shell script to set up Docker:

#! /bin/bash
yum update -y
yum install -y docker
usermod -a -G docker ec2-user
curl -L https://github.com/docker/compose/releases/download/1.25.5/docker-compose-`uname -s`-`uname -m` | sudo tee /usr/local/bin/docker-compose > /dev/null
chmod +x /usr/local/bin/docker-compose
service docker start
chkconfig docker on

rm /etc/localtime
ln -s /usr/share/zoneinfo/Europe/Madrid /etc/localtime

ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

docker swarm init

We also need to attach one IAM role to our instance. This IAM role only need to allow us the following policies:

  • AmazonEC2ContainerRegistryReadOnly (because we’re going to use AWS ECR as container registry)
  • CloudWatchAgentServerPolicy (because we’re going to emit our logs to Cloudwatch)

Also, we need to set up a security group to allow incoming SSH connections on port 22 and HTTP connections (in our example, on port 8000).

When we launch our instance we need to provide a keypair to connect via SSH. I like to put this keypair in my .ssh/config:

Host xxx.eu-central-1.compute.amazonaws.com
    User ec2-user
    Identityfile ~/.ssh/keypair-xxx.pem

To deploy our application we need to follow those steps:

  • Build our docker images
  • Push our images to a container registry (in this case ECR)
  • Deploy the application.

I’ve created a simple shell script called deploy.sh to perform all tasks:

#!/usr/bin/env bash

set -a
[ -f deploy.env ] && . deploy.env
set +a

echo "$(tput setaf 1)Building docker images ...$(tput sgr0)"
docker build -t ec2-web -t ec2-web:latest -t $ECR/ec2-web:latest .
docker build -t ec2-nginx -t $ECR/ec2-nginx:latest .docker/nginx

echo "$(tput setaf 1)Pushing to ECR ...$(tput sgr0)"
aws ecr get-login-password --region $REGION --profile $PROFILE |
  docker login --username AWS --password-stdin $ECR
docker push $ECR/ec2-web:latest
docker push $ECR/ec2-nginx:latest

CMD="docker stack deploy -c $DOCKER_COMPOSE_YML ec2 --with-registry-auth"
echo "$(tput setaf 1)Deploying to EC2 ($CMD)...$(tput sgr0)"
echo "$CMD"

DOCKER_HOST="ssh://$HOST" $CMD
echo "$(tput setaf 1)Building finished $(date +'%Y%m%d.%H%M%S')$(tput sgr0)"

This script assumes that there’s a deploy.env file with our personal configuration (the AWS profile, the EC2 host, the ECR registry, and so on):

PROFILE=xxxxxxx

DOCKER_COMPOSE_YML=docker-compose.yml
HOST=ec2-user@xxxx.eu-central-1.compute.amazonaws.com

ECR=9999999999.dkr.ecr.eu-central-1.amazonaws.com
REGION=eu-central-1

In this example I’m using Docker Swarm to deploy the application. I also want to play with secrets. This dummy application doesn’t have any sensitive information, but I’ve created an "ec2.supersecret" secret:

echo "super secret text" | docker secret create ec2.supersecret -

That’s the docker-compose.yml file:

version: '3.8'
services:
  web:
    image: 999999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-web:latest
    command: /bin/bash ./docker-entrypoint.sh
    environment:
      DEBUG: 'False'
    secrets:
      - ec2.supersecret
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: app
    volumes:
      - static_volume:/src/staticfiles
  nginx:
    image: 99999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-nginx:latest
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: nginx
    volumes:
      - static_volume:/src/staticfiles:ro
    ports:
      - 8000:80
    depends_on:
      - web
volumes:
  static_volume:

secrets:
  ec2.supersecret:
    external: true
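
Inside the web service, Docker Swarm mounts the secret as a file under /run/secrets. A minimal sketch of reading it from Django settings (the variable name is mine):

from pathlib import Path

SECRET_FILE = Path('/run/secrets/ec2.supersecret')
SUPERSECRET = SECRET_FILE.read_text().strip() if SECRET_FILE.exists() else None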

And that’s all. Maybe ECS or EKS are better solutions for deploying Docker applications in AWS, but we can also deploy easily to a single Docker host on an EC2 instance that can be ready within a couple of minutes.

Source code in my github