What if the bug fixed itself? Letting AI agents detect bugs, fix the code, and create PRs proactively.

What if an AI could not only identify errors in your logs but actually fix them and create a pull request? I have done this experiment to do exactly that.

We can put or application logs in CloudWatch and use AI agents with a worker-coordinator pattern (I’ll share a post explaining this). Today the idea is going one step further. We will detecte errors in our logs, and for certain types of fixable errors, we will let an AI agent fix the code and create a pull request automatically.

The core of the system is a tool decorated with @tool from Strands Agents. This makes it available to any AI agent that needs to trigger a fix:

from strands import tool

@tool
async def register_error_for_fix(error: LogEntry) -> bool:
    """
    Register an error for automatic fixing.
    Clones repo, creates fix branch, uses Claude to fix, creates PR.
    """
    repo = _setup_repo()

    branch_name = _create_fix_branch(repo, error)
    if branch_name is None:
        return True  # Branch already exists, skip

    claude_response = await _invoke_claude_fix(error.message)
    if claude_response is None:
        return False

    pr_info = pr_title_generator(claude_response)
    _commit_and_push(repo, branch_name, pr_info)
    _create_pull_request(branch_name, pr_info)

    return True

Step by Step Implementation

1. Repository Setup with GitPython

The tool first clones the repo or pulls the latest changes:

from git import Repo

def _setup_repo() -> Repo:
    repo_url = f"https://x-access-token:{GITHUB_TOKEN}@github.com/{GITHUB_REPO}.git"

    if (WORK_DIR / ".git").exists():
        repo = Repo(WORK_DIR)
        repo.git.pull(repo_url)
    else:
        repo = Repo.clone_from(repo_url, WORK_DIR)

    return repo

2. Branch Creation with Deduplication

Each fix gets its own branch with a timestamp. If the branch already exists remotely, we skip it to avoid duplicate PRs:

def _create_fix_branch(repo: Repo, error: LogEntry) -> str | None:
    branch_name = f"autofix/{error.fix_short_name}_{error.timestamp.strftime('%Y%m%d-%H%M%S')}"

    remote_refs = [ref.name for ref in repo.remote().refs]
    if f"origin/{branch_name}" in remote_refs:
        logger.info(f"Branch {branch_name} already exists, skipping")
        return None

    new_branch = repo.create_head(branch_name)
    new_branch.checkout()
    return branch_name

3. The Magic: Claude Code SDK

This is where the actual fix happens. Claude Code SDK allows Claude to read and edit files in the codebase:

from claude_code_sdk import ClaudeCodeOptions, query

async def _invoke_claude_fix(error_message: str) -> str | None:
    prompt = f"Fix this error in the codebase: {error_message}"

    options = ClaudeCodeOptions(
        cwd=str(WORK_DIR),
        allowed_tools=["Read", "Edit"]  # Safe: no Write, no Bash
    )

    response = None
    async for response in query(prompt=prompt, options=options):
        logger.info(f"Claude response: {response}")

    return response.result if response else None

Note that we only allow Read and Edit tools – no Write (creating new files) or Bash (running commands). This keeps the fixes focused and safe.

4. PR Title Generation with Claude Haiku

For fast and cheap PR title generation, I use Claude Haiku with structured output:

from pydantic import BaseModel, Field

class PrTitleModel(BaseModel):
    pr_title: str = Field(..., description="Concise PR title")
    pr_description: str = Field(..., description="Detailed PR description")

def pr_title_generator(response: str) -> PrTitleModel:
    agent = create_agent(
        system_prompt=PR_PROMPT,
        model=Models.CLAUDE_45_HAIKU,
        tools=[]
    )

    result = agent(
        prompt=f"This is response from claude code: {response}\n\n"
               f"Generate a concise title for a GitHub pull request.",
        structured_output_model=PrTitleModel
    )

    return result.structured_output

The prompt enforces Conventional Commits style:

PR_PROMPT = """
You are an assistant expert in generating pull request titles for GitHub.
OBJECTIVE:
- Generate concise and descriptive titles for pull requests.
- IMPORTANT: Use Conventional Commits as a style reference.
CRITERIA:
- The title must summarize the main changes or fixes.
- Keep the title under 10 words.

5. Commit, Push, and Create PR

Finally, we commit everything, push to the remote, and create the PR via GitHub API:

def _commit_and_push(repo: Repo, branch_name: str, pr_info: PrTitleModel) -> None:
    repo.git.add(A=True)
    repo.index.commit(pr_info.pr_title)
    repo.git.push(get_authenticated_repo_url(), branch_name)

def _create_pull_request(branch_name: str, pr_info: PrTitleModel) -> None:
    gh = Github(GITHUB_TOKEN)
    gh_repo = gh.get_repo(GITHUB_REPO)
    gh_repo.create_pull(
        title=pr_info.pr_title,
        body=pr_info.pr_description,
        head=branch_name,
        base="main"
    )

The Triage Agent: Deciding What to Fix

The tool is exposed to a triage agent that analyzes logs and decides when to use it. The agent follows the ReAct pattern (Reasoning + Acting), where it explicitly reasons about each error before deciding to act:

TRIAGE_PROMPT = """
You are a senior DevOps engineer performing triage of production errors.

REGISTRATION CRITERIA:
- The error may be occurring frequently. Register ONLY ONCE.
- The error has a clear stacktrace that indicates the root cause.
- The error can be corrected with a quick fix.

DISCARD CRITERIA:
✗ Single/isolated errors (may be malicious input)
✗ Errors from external services (network, timeouts)
✗ Errors without a clear stacktrace
✗ Errors that require business decisions

Use the ReAct pattern:
Thought: [your analysis of the error]
Action: [register_error_for_fix if criteria met]
Observation: [tool result]
... (repeat for each error type)
Final Answer: [summary of registered errors]

This pattern forces the agent to reason explicitly before taking action, making decisions more transparent and debuggable.

The agent is given tools and makes the decision autonomously:

agent = create_agent(
    system_prompt=TRIAGE_PROMPT,
    model=Models.CLAUDE_45,
    tools=[register_error_for_fix]
)

result = agent(prompt=[
    {"text": f"Question: {question}"},
    {"text": f"Log context: {logs_json}"},
])

To test the system, I created a sample repository with intentional bugs and generated CloudWatch-like logs. The triage agent analyzes the logs, identifies fixable errors, and invokes the register_error_for_fix tool to create PRs automatically.

That’s the code (with the bug):

import logging
import traceback

from flask import Flask, jsonify

from lib.logger import setup_logging
from settings import APP, PROCESS, LOG_PATH, ENVIRONMENT

logger = logging.getLogger(__name__)

app = Flask(__name__)

setup_logging(
    env=ENVIRONMENT,
    app=APP,
    process=PROCESS,
    log_path=LOG_PATH)

for logger_name in ["werkzeug"]:
    logging.getLogger(logger_name).setLevel(logging.CRITICAL)


@app.errorhandler(Exception)
def handle_exception(e):
    logger.error(
        "Unhandled exception: %s",
        e,
        extra={"traceback": traceback.format_exc()},
    )
    return jsonify(error=str(e)), 500


@app.get("/div/<int:a>/<int:b>")
def divide(a: int, b: int):
    return dict(result=a / b)

As you can see, the /div// endpoint has a bug: it does not handle division by zero properly. We have executed the error and generated logs accordingly. As we have the logs in CloudWatch’s log group /projects/autofix we can execute a command to analyze them:

pyhon cli.py log --group /projects/autofix --question "Analyze those logs" --start 2026-01-16

The AI agent will identify the division by zero error, decide it is fixable, and create a PR that modifies the code (using claude code in headless mode) to handle this case properly.

And that’s all! The AI agent has autonomously created a PR that fixes the bug. Now we can easily accept or reject the PR after human review. The bug has been fixed!

This experiment shows that AI agents can go beyond analysis to take action. By giving Claude Code SDK access to a sandboxed environment with limited tools (Read, Edit only), we get a system that can autonomously fix bugs while remaining controllable.

The key is setting clear boundaries: the triage agent decides what to fix based on strict criteria, and the fix agent is constrained to how it can modify code. This separation keeps the system predictable and safe.

Full code in my github

Sending logs to AWS CloudWatch with a sidecar pattern and Python

In a Docker Swarm environment, the sidecar pattern is a common architectural approach used to extend the functionality of a primary container without directly modifying it.

First there’s a primary container. This is the main application or service you want to run within a Docker Swarm service. It’s responsible for the core functionality of your application. In our case, this container will be the log generator. It will save logs within a json format.

The sidecar container is a secondary container that runs alongside the primary container. It’s tightly coupled with the primary container and assists it by providing additional services or functionality. In our example, the sidecar responsibility will be push logs to AWS CloudWatch. The idea is sharing a docker volume between both containers. Whit this technique, our primary container will not be affected in network latency generating logs.

The idea is to generate something like this:

version: '3.6'

services:
  api:
    image: api:latest
    logging:
      options:
        max-size: 10m
    deploy:
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
      - PROCESS_ID=api
    ports:
      - 5000:5000
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000 --timeout 180

  filebeat:
    image: cw:production
    command: bash -c "python cw.py && sleep 1m"
    deploy:
      restart_policy:
        condition: any
    environment:
      - LOG_GROUP=python_logs_example
      - LOG_STREAM_PREFIX=default
    volumes:
      - logs_volume:/src/logs
volumes:
  logs_volume:

Let’s go. First, we need to setup our aws credentials. We can use a profile or a IAM user

if AWS_PROFILE_NAME:
    session = boto3.Session(profile_name=AWS_PROFILE_NAME, region_name=AWS_REGION)
else:
    session = boto3.Session(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION)

logs = session. Client('logs')

then we need to setup the logGroup and logStream in CloudWatch. We can generate them by hand, but I preffer to generate them programatically, if they don’t exist.

def init_cloudwatch_stream():
    log_stream_name = f"{LOG_STREAM_PREFIX}_{datetime.now().strftime('%Y%m%d')}"

    log_groups = logs.describe_log_groups(logGroupNamePrefix=LOG_GROUP)['logGroups']
    if not any(group['logGroupName'] == LOG_GROUP for group in log_groups):
        logs.create_log_group(logGroupName=LOG_GROUP)

    log_streams = logs.describe_log_streams(
        logGroupName=LOG_GROUP,
        logStreamNamePrefix=log_stream_name
    )['logStreams']

    if not any(stream['logStreamName'] == log_stream_name for stream in log_streams):
        logs.create_log_stream(logGroupName=LOG_GROUP, logStreamName=log_stream_name)

    return log_stream_name

Now we need to upload logs to CloudWatch. We need to use put_log_events. To send multiple logs we need to use a sequenceToken (not the first time). To do that I use this trick.

function_parameters = dict(
    logGroupName=LOG_GROUP,
    logStreamName=log_stream_name
)

for f in glob(f'{LOG_PATH}/*.{LOG_EXTENSION}'):
    function_parameters['logEvents'] = get_log_events_from_file(f)
    response = logs.put_log_events(**function_parameters)
    function_parameters['sequenceToken'] = response['nextSequenceToken']

We also need to read log files and maybe change the fields according to your needs

def get_log_events_from_file(file):
    exclude_fields = ('@timestamp', 'logger')
    return [
        dict(
            timestamp=int(datetime.fromisoformat(d['@timestamp']).timestamp() * 1000),
            message=json.dumps({k: v for k, v in d.items() if k not in exclude_fields})
        ) for d in [json.loads(linea) for linea in open(file, 'r')]]

I like to have all the settings of my application in a file called settings.py. It’s a pattern that I’ve copied from Django. In this file also I read environment variables from a dotenv file.

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

AWS_PROFILE_NAME = os.getenv('AWS_PROFILE_NAME', False)
AWS_REGION = os.getenv('AWS_REGION')

LOG_GROUP = os.getenv('LOG_GROUP', 'python_logs_example')
LOG_STREAM_PREFIX = os.getenv('LOG_STREAM_PREFIX', 'default')

LOG_EXTENSION = 'log'
LOG_PATH = os.getenv('LOG_PATH', Path(BASE_DIR).resolve())

And that’s all. Your logs in CloudWatch uploaded in a background process decoupled from the main process.

Source code available in my github.

Handling Amazon SNS messages with PHP, Lumen and CloudWatch

This days I’m involve with Amazon’s AWS and since I am migrating my backends to Lumen I’m going to play a little bit with AWS and Lumen. Today I want to create a simple Lumen server to handle SNS notifications. One end-point to listen to SNS and another one to emit notifications. I also want to register logs within CloudWatch. Let’s start.

First the Lumen server.

use Laravel\Lumen\Application;

require __DIR__ . '/../vendor/autoload.php';

(new Dotenv\Dotenv(__DIR__ . "/../env"))->load();

$app = new Application();

$app->register(App\Providers\LogServiceProvider::class);
$app->register(App\Providers\AwsServiceProvider::class);

$app->group(['namespace' => 'App\Http\Controllers'], function (Application $app) {
    $app->get("/push", "SnsController@push");
    $app->post("/read", "SnsController@read");
});

$app->run();

As we can see there’s a route to push notifications and another one to read messages.

To work with SNS I will create a simple service provider

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Aws\Sns\SnsClient;

class AwsServiceProvider extends ServiceProvider
{
    public function register()
    {
        $awsCredentials = [
            'region'      => getenv('AWS_REGION'),
            'version'     => getenv('AWS_VERSION'),
            'credentials' => [
                'key'    => getenv('AWS_CREDENTIALS_KEY'),
                'secret' => getenv('AWS_CREDENTIALS_SECRET'),
            ],
        ];

        $this->app->instance(SnsClient::class, new SnsClient($awsCredentials));
    }
}

Now We can create the routes in SnsController. Sns has a confirmation mechanism to validate endpoints. It’s well explained here

namespace App\Http\Controllers;

use Aws\Sns\SnsClient;
use Illuminate\Http\Request;
use Laravel\Lumen\Routing\Controller;
use Monolog\Logger;

class SnsController extends Controller
{
    private $request;
    private $logger;

    public function __construct(Request $request, Logger $logger)
    {
        $this->request = $request;
        $this->logger  = $logger;
    }

    public function push(SnsClient $snsClient)
    {
        $snsClient->publish([
            'TopicArn' => getenv('AWS_SNS_TOPIC1'),
            'Message'  => 'hi',
            'Subject'  => 'Subject',
        ]);

        return ['push'];
    }

    public function read(SnsClient $snsClient)
    {
        $data = $this->request->json()->all();

        if ($this->request->headers->get('X-Amz-Sns-Message-Type') == 'SubscriptionConfirmation') {
            $this->logger->notice("sns:confirmSubscription");
            $snsClient->confirmSubscription([
                'TopicArn' => getenv('AWS_SNS_TOPIC1'),
                'Token'    => $data['Token'],
            ]);
        } else {
            $this->logger->warn("read", [
                'Subject'   => $data['Subject'],
                'Message'   => $data['Message'],
                'Timestamp' => $data['Timestamp'],
            ]);
        }

        return "OK";
    }
}

Finally I want to use CloudWatch so I will configure Monolog with another service provider. It’s also well explained here:

namespace App\Providers;

use Aws\CloudWatchLogs\CloudWatchLogsClient;
use Illuminate\Support\ServiceProvider;
use Maxbanton\Cwh\Handler\CloudWatch;
use Monolog\Formatter\LineFormatter;
use Monolog\Logger;

class LogServiceProvider extends ServiceProvider
{
    public function register()
    {
        $awsCredentials = [
            'region'      => getenv('AWS_REGION'),
            'version'     => getenv('AWS_VERSION'),
            'credentials' => [
                'key'    => getenv('AWS_CREDENTIALS_KEY'),
                'secret' => getenv('AWS_CREDENTIALS_SECRET'),
            ],
        ];

        $cwClient = new CloudWatchLogsClient($awsCredentials);

        $cwRetentionDays      = getenv('CW_RETENTIONDAYS');
        $cwGroupName          = getenv('CW_GROUPNAME');
        $cwStreamNameInstance = getenv('CW_STREAMNAMEINSTANCE');
        $loggerName           = getenv('CF_LOGGERNAME');

        $logger  = new Logger($loggerName);
        $handler = new CloudWatch($cwClient, $cwGroupName, $cwStreamNameInstance, $cwRetentionDays);
        $handler->setFormatter(new LineFormatter(null, null, false, true));

        $logger->pushHandler($handler);

        $this->app->instance(Logger::class, $logger);
    }
}

Debugging those kind of webhooks with a EC2 instance sometimes is a bit hard. But we can easily expose our local webserver to internet with ngrok.
We only need to start our local server

php -S 0.0.0.0:8080 -t www

And create a tunnel wiht ngrok

ngrok http 8080

And that’s up. Lumen and SNS up and running.

Code available in my github