AI Eurobeat Producer: Generating Music in Real-Time with AI Agents, Python, and MIDI

What if you could describe the music you want to hear and have an AI produce it in real-time, sending MIDI notes directly to your DAW? That’s exactly what I built: a Python application that uses AI agents to generate Eurobeat and 90s techno patterns, outputting them as live MIDI to Akai’s MPC Beats.

I’m not a musician. I enjoy playing guitar from time to time, but I have zero experience with music production software. However, I gifted myself an Akai MPK Mini Plus MIDI controller, which has 8 knobs and 8 pads, and I experimented with using it to control a music generation agent. I have no idea what I’m doing, but it’s fun.

Since the Akai controller connects to a laptop, and a laptop runs Python, this Saturday morning I decided to build a simple prototype that connects an AI agent to MIDI output. The idea is simple: you write a prompt like “Energetic eurobeat in Am, Daft Punk style”, and an AI agent powered by Claude on AWS Bedrock generates patterns for 8 tracks: two drum kits, bass, Rhodes, pluck, pad, and a lead melody. The patterns are sent as MIDI messages to MPC Beats, where each track is routed to a different virtual instrument. You can then modify the music live by writing new instructions, and use the physical knobs and pads on the MPK Mini Plus to mute/unmute tracks, regenerate patterns, or reset the session.

I’m using MPC Beats because it’s free and has a simple MIDI setup, but in theory this could work with any DAW that accepts MIDI input. The whole system is built in Python using Strands Agents for the AI orchestration, mido + python-rtmidi for MIDI I/O, and Rich for the terminal UI.

The Architecture

The flow is straightforward: the user writes a prompt, the agent writes patterns into a shared store, and threaded MIDI players read that store and stream notes to MPC Beats.

Project Structure

src/
  settings.py           # Configuration: BPM, tracks, MIDI devices
  cli.py                # Click CLI entry point
  commands/play.py      # Main play command
  agent/
    prompts.py          # System prompts for the AI producer
    tools.py            # PatternStore + @tool functions
    factory.py          # Agent creation
  midi/
    device.py           # MIDI device detection
    melody_player.py    # Threaded melody loop player
    drum_player.py      # Threaded drum loop player
  session/
    state.py            # State machine (IDLE/GENERATING/PLAYING)
    session.py          # Session orchestrator
  ui/
    menu.py             # Interactive terminal menu

Configuration

Everything starts with settings.py. The MIDI devices and AWS region are loaded from environment variables, while the musical parameters are defined as constants:

BPM = 122
BAR_DURATION = round((60 / BPM) * 4, 3)
LOOP_BARS = 4
LOOP_DURATION = round(BAR_DURATION * LOOP_BARS, 3)
TRACKS = {
    1: {"name": "Drums", "channel": 0, "type": "drums"},
    2: {"name": "Drums Detroit", "channel": 1, "type": "drums"},
    3: {"name": "Rhodes", "channel": 2, "type": "melody"},
    4: {"name": "Pluck", "channel": 3, "type": "melody"},
    5: {"name": "Bass", "channel": 4, "type": "melody"},
    6: {"name": "Org Bass", "channel": 5, "type": "melody"},
    7: {"name": "Pad", "channel": 6, "type": "melody"},
    8: {"name": "Lead", "channel": 7, "type": "melody"},
}

Each track maps to a MIDI channel. Tracks 1-2 are drum kits (offset-based timing), tracks 3-8 are melodic instruments (duration-based timing). The MPC Beats “House Template” provides the virtual instruments: a Classic drum kit, a Detroit percussion kit, Electric Rhodes, Tube Pluck, Bassline, Organ Bass, Tube Pad, and an Instant Go lead synth.

The Bridge Between AI and MIDI: PatternStore and Tools

The core of the system is the PatternStore, a simple shared store where the AI writes patterns and the MIDI players read them:

class PatternStore:
    def __init__(self):
        self._patterns: dict[int, list] = {}

    def set(self, track_id: int, pattern: list) -> None:
        self._patterns[track_id] = pattern

    def get(self, track_id: int) -> list | None:
        return self._patterns.get(track_id)

    def clear(self) -> None:
        self._patterns.clear()

The Strands @tool functions are created via a factory that closes over the store:

def create_tools(store: PatternStore) -> list:
    @tool
    def set_melody_pattern(track_id: int, pattern: str) -> str:
        """Define a melodic line for a specific track."""
        data = json.loads(pattern)
        store.set(track_id, data)
        total = sum(n["duration"] for n in data)
        name = TRACKS[track_id]["name"]
        return f"OK - {name}: {len(data)} notes, total duration {total:.3f}s"

    @tool
    def set_drum_pattern(track_id: int, pattern: str) -> str:
        """Define a drum pattern for a specific drum track."""
        data = json.loads(pattern)
        store.set(track_id, data)
        name = TRACKS[track_id]["name"]
        return f"OK - {name}: {len(data)} hits"

    return [set_drum_pattern, set_melody_pattern]

A melody pattern is a JSON array of {note, duration, velocity} objects where the sum of durations must equal LOOP_DURATION (4 bars). A drum pattern uses {note, velocity, offset} where offset is the time in seconds from the loop start. The note value -1 represents silence, which is crucial for creating space in the arrangement.
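To make the format concrete, here is a hypothetical pair of patterns for the BPM = 122 settings above (the notes, velocities, and 1/8-note grid are illustrative, not actual agent output):

```python
import json

# Assumed constants from settings.py (BPM = 122).
BPM = 122
BAR_DURATION = round((60 / BPM) * 4, 3)      # ~1.967 s per bar
LOOP_DURATION = round(BAR_DURATION * 4, 3)   # ~7.868 s per 4-bar loop

# Hypothetical bass figure on a straight 1/8-note grid; note -1 is a rest.
eighth = LOOP_DURATION / 32
melody_pattern = [
    {"note": 45, "duration": eighth * 7, "velocity": 100},  # A2, held
    {"note": -1, "duration": eighth, "velocity": 0},        # breathing room
] * 4  # repeat the figure across all four bars

# Durations must sum (within rounding) to the loop length.
assert abs(sum(ev["duration"] for ev in melody_pattern) - LOOP_DURATION) < 0.01

# Hypothetical four-on-the-floor kick: 16 quarter-note hits, offsets from loop start.
drum_pattern = [
    {"note": 36, "velocity": 110, "offset": round(i * BAR_DURATION / 4, 3)}
    for i in range(16)
]
pattern_json = json.dumps(drum_pattern)  # what the agent would pass to set_drum_pattern
```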

The Agent

The agent is a Strands Agent using Claude Sonnet on AWS Bedrock. The system prompt is heavily detailed with music production instructions: frequency ranges for each track, velocity guidelines, and structural rules. The key instruction is “less is more” – not all tracks should play notes all the time:

def create_agent(store: PatternStore) -> Agent:
    return Agent(
        model=BedrockModel(
            model_id=Models.CLAUDE_SONNET,
            region_name=AWS_REGION,
        ),
        tools=create_tools(store),
        system_prompt=SYSTEM_PROMPT,
        callback_handler=None,
    )

There are two agents: one for initial generation (calls all 8 tools) and one for live modifications (only modifies the tracks that need to change). A third, lighter agent using Haiku generates the menu suggestions to keep latency and cost low.

MIDI Players

Two player classes handle the actual MIDI output. The MelodyLoopPlayer iterates through note events with durations:

def _loop(self, melody: list):
    while not self.stop_event.is_set():
        current = self.store.get(self.track_id) or melody
        for ev in current:
            if self.stop_event.is_set():
                break
            note = ev["note"]
            vel = ev.get("velocity", 80)
            if note >= 0:
                self._send("note_on", note=note, velocity=vel, channel=self.channel)
            deadline = time.time() + ev["duration"]
            while not self.stop_event.is_set() and time.time() < deadline:
                time.sleep(0.02)
            if note >= 0:
                self._send("note_off", note=note, velocity=0, channel=self.channel)

The DrumLoopPlayer uses offset-based timing instead, scheduling hits at specific points within the loop. Both players read from the PatternStore on each loop iteration, which enables hot-swapping patterns during live modifications.
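A minimal sketch of that offset-based player, assuming the same `PatternStore` and `stop_event` conventions as the melody player (illustrative, not the project's exact code):

```python
import threading
import time

LOOP_DURATION = 7.868  # assumed from settings.py (4 bars at 122 BPM)

class DrumLoopPlayer:
    """Sketch: plays {note, velocity, offset} hits at fixed offsets in the loop."""

    def __init__(self, track_id, channel, store, send):
        self.track_id = track_id
        self.channel = channel
        self.store = store        # PatternStore shared with the agent
        self._send = send         # MIDI send callable (wraps mido in the real code)
        self.stop_event = threading.Event()

    def _loop(self, pattern: list):
        while not self.stop_event.is_set():
            # Re-read the store each pass so live edits hot-swap in.
            current = self.store.get(self.track_id) or pattern
            loop_start = time.time()
            for hit in sorted(current, key=lambda h: h["offset"]):
                deadline = loop_start + hit["offset"]
                while not self.stop_event.is_set() and time.time() < deadline:
                    time.sleep(0.005)
                if self.stop_event.is_set():
                    return
                self._send("note_on", note=hit["note"],
                           velocity=hit["velocity"], channel=self.channel)
            # Wait out the remainder of the loop before restarting.
            rest = loop_start + LOOP_DURATION - time.time()
            if rest > 0:
                self.stop_event.wait(rest)
```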

The Session

The Session class orchestrates everything. It manages the state machine (IDLE -> GENERATING -> PLAYING), owns the PatternStore, creates the agents, and handles MIDI input from the controller:

class Session:
    def __init__(self):
        self.state = State.IDLE
        self.store = PatternStore()
        self.agent = create_agent(self.store)
        self.live_agent = create_live_agent(self.store)
        self._agent_busy = threading.Lock()

When generation completes, playback starts with a progressive intro – tracks are unmuted one by one with a 2-bar delay between each, creating a build-up effect:

def _start_playback(self):
    self.state = State.PLAYING
    for tid in TRACKS:
        self.players[tid].muted = True
        self.players[tid].start(patterns[tid])
    intro_delay = BAR_DURATION * 2
    for i, tid in enumerate(INTRO_ORDER):
        timer = threading.Timer(intro_delay * i, self._unmute_track, args=(tid,))
        timer.start()

How It Works

  1. Run python cli.py play
  2. The app detects your MPK Mini Plus and shows a menu with AI-generated suggestions
  3. Select a suggestion or write your own prompt
  4. The AI generates 8 track patterns (takes a few seconds)
  5. Playback begins with a progressive build-up
  6. Write new instructions to modify the music live
  7. Use knobs K1-K8 to mute/unmute individual tracks
  8. PAD 1 regenerates with the same prompt, PAD 2 resets everything
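Steps 7 and 8 boil down to reacting to incoming controller messages. A sketch of that routing, assuming mido-style message objects and a hypothetical mapping of knobs K1-K8 to CCs 70-77 and pads to notes 36/37 (the real numbers depend on the MPK Mini Plus program):

```python
# Assumed mapping: knobs K1-K8 send CCs 70-77, PAD 1/2 send notes 36/37.
KNOB_CCS = {70 + i: tid for i, tid in enumerate(range(1, 9))}
PAD_ACTIONS = {36: "regenerate", 37: "reset"}

def handle_message(msg, session) -> None:
    """Route one incoming MIDI message (as parsed by mido) to a session action."""
    if msg.type == "control_change" and msg.control in KNOB_CCS:
        tid = KNOB_CCS[msg.control]
        session.players[tid].muted = msg.value < 64  # below midpoint mutes
    elif msg.type == "note_on" and msg.note in PAD_ACTIONS:
        if PAD_ACTIONS[msg.note] == "regenerate":
            session.regenerate()
        else:
            session.reset()
```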

Tech Stack

  • Python 3.13 with Poetry
  • Strands Agents for AI agent orchestration
  • AWS Bedrock (Claude Sonnet + Haiku) for pattern generation
  • mido + python-rtmidi for MIDI I/O
  • Akai MPK Mini Plus as MIDI controller
  • MPC Beats as the DAW/sound engine
  • Rich for terminal UI
  • Click for CLI

And that’s all. Full source code available on GitHub.

Predicting the future: time series forecasting with AI Agents and Amazon Chronos-Bolt

Predicting the future is something we all try to do. Whether it’s energy consumption, sensor readings, or production metrics, having a reliable forecast helps us make better decisions. The problem is that building a good forecasting model traditionally requires deep statistical knowledge and a lot of tuning. What if we could just hand our data to an AI agent and ask “what’s going to happen next”?

That’s exactly what this project does. It combines Strands Agents with Amazon Chronos-Bolt, a foundation model for time series forecasting available on AWS Bedrock Marketplace, to create an AI agent that can forecast any numerical time series through natural language.

The architecture

The idea is simple. We have a Strands Agent powered by Claude (via AWS Bedrock) that understands natural language. When the user asks for a forecast, the agent calls a custom tool that invokes Chronos-Bolt to generate predictions. The agent then interprets the results and explains them in plain language.

The key here is that the agent doesn’t just return raw numbers. It understands the context, explains trends, and presents the confidence intervals in a way that makes sense.

The forecast tool

The tool is defined using the @tool decorator from Strands. This decorator turns a regular Python function into something the agent can discover and invoke on its own:

@tool
def forecast_time_series(
    values: Annotated[
        list[float],
        "Historical time series values in chronological order. "
        "Values should be evenly spaced (e.g., hourly, daily). Minimum 10 values.",
    ],
    prediction_length: Annotated[
        int,
        "Number of future steps to predict. "
        "Uses the same time unit as the input data.",
    ],
    quantile_levels: Annotated[
        Optional[list[float]],
        "Quantile levels for confidence intervals. Default: [0.1, 0.5, 0.9]. "
        "0.5 is the median forecast, 0.1 and 0.9 define the 80% confidence band.",
    ] = None,
) -> dict:

The Annotated type hints serve a dual purpose: they validate types at runtime and provide descriptions that the LLM reads to understand how to use the tool. This means the agent knows it needs a list of floats, a prediction length, and optionally custom quantile levels, all from the type annotations alone.

The tool validates the input (minimum 10 values, maximum 50,000, prediction length between 1 and 1,000), filters out NaN values, and then calls the Chronos-Bolt client:
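A sketch of those validation checks (the helper name and error messages are illustrative):

```python
import math

# Limits from the tool description above.
MIN_VALUES, MAX_VALUES = 10, 50_000
MIN_HORIZON, MAX_HORIZON = 1, 1_000

def validate_input(values: list[float], prediction_length: int) -> list[float]:
    """Filter NaNs and enforce the size limits before calling Chronos-Bolt."""
    clean = [v for v in values if not math.isnan(v)]
    if not MIN_VALUES <= len(clean) <= MAX_VALUES:
        raise ValueError(
            f"Need {MIN_VALUES}-{MAX_VALUES} values after NaN filtering, got {len(clean)}"
        )
    if not MIN_HORIZON <= prediction_length <= MAX_HORIZON:
        raise ValueError(
            f"prediction_length must be {MIN_HORIZON}-{MAX_HORIZON}, got {prediction_length}"
        )
    return clean
```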

result = invoke_chronos(
    values=clean_values,
    prediction_length=prediction_length,
    quantile_levels=quantile_levels,
)
return {
    "status": "success",
    "content": [{"text": "\n".join(summary_lines)}],
    "metadata": {
        "quantiles": result.quantiles,
        "prediction_length": result.prediction_length,
        "history_length": result.history_length,
    },
}

The response includes both a human-readable summary (in content) and the raw quantile data (in metadata), so the agent can reference exact numbers when explaining the forecast.

The Chronos-Bolt client

Chronos-Bolt is accessed through the Bedrock runtime API. The client sends the historical values and receives predictions at different quantile levels:

def invoke_chronos(
    values: list[float],
    prediction_length: int,
    quantile_levels: list[float] | None = None,
) -> ForecastResult:
    client = _get_bedrock_runtime_client()
    quantiles = quantile_levels or [0.1, 0.5, 0.9]
    payload = {
        "inputs": [{"target": values}],
        "parameters": {
            "prediction_length": prediction_length,
            "quantile_levels": quantiles,
        },
    }
    response = client.invoke_model(
        modelId=CHRONOS_ENDPOINT_ARN,
        body=json.dumps(payload),
        contentType="application/json",
        accept="application/json",
    )

The invoke_model call uses the SageMaker endpoint ARN deployed through Bedrock Marketplace. Chronos-Bolt returns predictions organized by quantile levels; by default, the 10th, 50th (median), and 90th percentiles. This gives us not just a single forecast line, but a confidence band: the 80% interval between the 10th and 90th percentiles tells us how uncertain the model is about its predictions.
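Parsing the response body could look like the sketch below. The schema here is an assumption: one prediction object whose quantile arrays are keyed by level, roughly `{"predictions": [{"0.1": [...], "0.5": [...], "0.9": [...]}]}`:

```python
import json
from dataclasses import dataclass

@dataclass
class ForecastResult:
    quantiles: dict          # level -> list of predicted values
    prediction_length: int
    history_length: int

def parse_response(body: bytes, history_length: int) -> ForecastResult:
    """Extract the quantile series from the endpoint payload (assumed schema)."""
    payload = json.loads(body)
    pred = payload["predictions"][0]
    # Keep only keys that look like quantile levels ("0.1", "0.5", ...).
    quantiles = {k: v for k, v in pred.items() if k.replace(".", "").isdigit()}
    any_series = next(iter(quantiles.values()))
    return ForecastResult(
        quantiles=quantiles,
        prediction_length=len(any_series),
        history_length=history_length,
    )
```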

The Bedrock runtime client is configured with generous timeouts (120s read, 30s connect) and automatic retries, since inference on time series data can take a moment depending on the history length:

def _get_bedrock_runtime_client():
    return boto3.client(
        "bedrock-runtime",
        region_name=AWS_REGION,
        config=Config(
            read_timeout=120,
            connect_timeout=30,
            retries={"max_attempts": 3},
        ),
    )

The agent

Wiring everything together is straightforward. We create a BedrockModel pointing to Claude and pass our forecast tool to the Agent:

from strands import Agent
from strands.models.bedrock import BedrockModel
from settings import AWS_REGION, Models
from forecast import forecast_time_series

SYSTEM_PROMPT = """You are a time series forecasting assistant powered by Amazon Chronos-Bolt.
You help users predict future values from historical numerical data. When a user provides
time series data or describes a scenario, use the forecast_time_series tool to generate
predictions.
When presenting results:
- Show the median forecast (quantile 0.5) as the main prediction
- Explain the confidence band (quantiles 0.1 and 0.9) as the uncertainty range
- Summarize trends in plain language
"""

def create_agent() -> Agent:
    bedrock_model = BedrockModel(
        model_id=Models.CLAUDE_SONNET,
        region_name=AWS_REGION,
    )
    return Agent(
        model=bedrock_model,
        system_prompt=SYSTEM_PROMPT,
        tools=[forecast_time_series],
    )

The system prompt is important here. It tells Claude that it has forecasting capabilities and how to present the results. Without it, the agent would still call the tool correctly (thanks to the Annotated descriptions), but it might not explain the confidence bands or summarize trends as clearly.

Running it

The CLI entry point (cli.py) registers commands and wires everything together. The forecast command generates synthetic hourly data (a sine wave with noise) by default and asks the agent to forecast it. You can also pass a custom prompt.

The entry point is minimal:

import click
from commands.forecast import run as forecast

@click.group()
def cli():
    pass

cli.add_command(cmd=forecast, name="forecast")

if __name__ == "__main__":
    cli()

The actual command lives in commands/forecast.py:

@click.command()
@click.option("--prompt", "-p", default=None, help="Custom prompt for the agent.")
def run(prompt: str | None):
    agent = create_agent()
    if prompt is None:
        values = generate_sample_data(num_points=100)
        values_str = ", ".join(f"{v:.2f}" for v in values)
        prompt = (
            f"I have the following hourly sensor readings from the last 100 hours:\n"
            f"[{values_str}]\n\n"
            f"Please forecast the next 24 hours and explain the predicted trend."
        )
    response = agent(prompt)
    click.echo(response)

The sine wave is a good choice for a demo because it has a clear periodic pattern that Chronos-Bolt should capture well. With 100 hours of history (about 4 full cycles of a 24-hour pattern), the model has enough data to identify the periodicity and project it forward.
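A plausible implementation of `generate_sample_data` (the exact amplitude, noise level, and seed are assumptions):

```python
import math
import random

def generate_sample_data(num_points: int = 100, period: int = 24,
                         noise: float = 0.1, seed: int = 42) -> list[float]:
    """Sine wave with a 24-point (hourly) period plus Gaussian noise."""
    rng = random.Random(seed)
    return [
        math.sin(2 * math.pi * i / period) + rng.gauss(0, noise)
        for i in range(num_points)
    ]
```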

Example

(venv) ➜ src python cli.py forecast
2026-02-27 14:11:16,471 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2026-02-27 14:11:16,506 - INFO - Creating Strands MetricsClient
Sure! Let me run the forecast on your 100-hour sensor readings right away.
Tool #1: forecast_time_series
2026-02-27 14:11:22,981 - INFO - Starting forecast: history=100, prediction_length=24
2026-02-27 14:11:22,981 - INFO - Invoking Chronos-Bolt: history_length=100, prediction_length=24, quantiles=[0.1, 0.5, 0.9]
2026-02-27 14:11:22,994 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2026-02-27 14:11:23,697 - INFO - Forecast complete: 24 steps, 4 quantiles
Here are the results of your 24-hour forecast:

Script response

📊 24-Hour Sensor Forecast

Hour Low (10th %) Median (50th %) High (90th %)
+1 ~-0.29 ~0.85 ~0.99
+2 ~0.29 ~0.95 ~0.99
+3 ~0.49 ~0.99 ~0.99
+4 ~0.47 ~0.83 ~0.99
+5 ~0.24 ~0.72 ~0.96
+6 ~0.09 ~0.53 ~0.83
+7 ~-0.25 ~0.19 ~0.59
+8 ~-0.60 ~-0.22 ~0.25
+9 ~-0.91 ~-0.51 ~-0.04
+10 ~-0.99 ~-0.76 ~-0.30
+11 ~-0.99 ~-0.87 ~-0.46
+12 ~-0.99 ~-0.93 ~-0.56
+13 ~-0.99 ~-0.96 ~-0.57
+14 ~-0.99 ~-0.93 ~-0.52
+15 ~-0.99 ~-0.87 ~-0.44
+16 ~-0.99 ~-0.79 ~-0.35
+17 ~-0.96 ~-0.63 ~-0.16
+18 ~-0.86 ~-0.46 ~0.02
+19 ~-0.66 ~-0.24 ~0.21
+20 ~-0.44 ~-0.01 ~0.42
+21 ~-0.21 ~0.22 ~0.60
+22 ~-0.04 ~0.45 ~0.78
+23 ~0.16 ~0.66 ~0.90
+24 ~0.31 ~0.80 ~0.96

🔍 Trend Explanation

Your sensor data clearly exhibits a sinusoidal/oscillating pattern — cycling between peaks near +1.0 and troughs near -1.0 roughly every ~24 hours. The model has picked this up beautifully! Here’s what to expect:

  1. Hours +1 to +4 — Continued Peak 🔼: The signal is currently rising (ending at ~0.67–0.70), and the model predicts it will crest near ~+0.85 to +0.99 in the first few hours — completing the current positive phase.

  2. Hours +5 to +13 — Descent & Trough 🔽: The signal then descends sharply, hitting a trough around hours +12 to +14 with a median near -0.93 to -0.96 — matching the negative peaks seen in the historical data.

  3. Hours +14 to +24 — Recovery 🔼: After bottoming out, the signal climbs back up, reaching approximately +0.80 by hour +24, setting up the next positive cycle.

📐 Confidence Band

The 80% confidence interval (low–high columns) is relatively tight, reflecting the model’s high confidence in the periodic nature of this signal. The widest uncertainty occurs around the transition zones (hours +7–+9 and +17–+19), which is typical for oscillating signals near the zero-crossing points.

In short: your sensor is behaving like a clean oscillating signal with an ~24-hour period, and the next full cycle looks very consistent with historical behavior.

And that’s all! Full code in my GitHub account.

Transforming Raw Spreadsheets into Professional Excel Reports with AI Agents and Python

We all deal with spreadsheets. They’re everywhere: financial reports, sales data, operational metrics. But raw data in a flat table is just that: raw data. To extract insights, you need dashboards, charts, KPIs, conditional formatting, and executive summaries. Doing this manually is tedious. What if an AI agent could take any raw .xlsx file and transform it into a professional, multi-sheet workbook with formulas, charts, and insights, automatically?

That’s exactly what this project does. The idea is simple: you give it a spreadsheet, and an AI agent running Python inside an AWS sandbox analyzes the data, builds a Dashboard with KPI formulas, formats the source data, generates an executive summary with real insights, and creates analysis sheets with charts, all using Excel formulas, never hardcoded values.

The two-agent pattern

The core of the system is a two-agent architecture. An outer orchestrator agent (Claude Sonnet) manages the workflow, while an inner agent (Claude Opus) does the actual Excel work inside an AWS Bedrock Code Interpreter sandbox. This separation keeps the orchestration clean and lets the inner agent focus entirely on writing Python code with openpyxl.

The CLI entry point uses Click. When you run the command, it creates the orchestrator agent with the xlsx_enhancer tool:

@click.command()
@click.argument("input_file", type=click.Path(exists=True))
@click.argument("output_file", type=click.Path(), required=False)
def run(input_file: str, output_file: str | None):
    if not output_file:
        p = Path(input_file)
        output_file = str(p.parent / f"enhanced_{p.name}")
    agent = create_agent(
        system_prompt=ORCHESTRATOR_PROMPT,
        tools=[xlsx_enhancer],
        hooks=[ToolProgressHook()],
    )
    response = agent(
        f"Process the Excel file at {input_file} and save the enhanced version to {output_file}"
    )
    click.echo(f"Done: {str(response)}")

The agent factory wraps the Strands SDK configuration: model selection, retry logic, and sliding-window conversation management:

def create_agent(
    system_prompt: str,
    model: str = Models.CLAUDE_45,
    tools: Optional[List[Any]] = None,
    hooks: Optional[List[HookProvider]] = None,
    temperature: float = 0.3,
    read_timeout: int = 300,
    connect_timeout: int = 60,
    max_attempts: int = 10,
    maximum_messages_to_keep: int = 30,
    should_truncate_results: bool = True,
    callback_handler: Any = None,
) -> Agent:
    bedrock_model = create_bedrock_model(
        model=model,
        temperature=temperature,
        read_timeout=read_timeout,
        connect_timeout=connect_timeout,
        max_attempts=max_attempts,
    )
    return Agent(
        system_prompt=system_prompt,
        model=bedrock_model,
        conversation_manager=SlidingWindowConversationManager(
            window_size=maximum_messages_to_keep,
            should_truncate_results=should_truncate_results,
        ),
        tools=tools,
        hooks=hooks,
        callback_handler=callback_handler,
    )

The xlsx_enhancer tool

This is the centerpiece. It’s a Strands @tool that orchestrates a 4-step pipeline: upload the file to the sandbox, run the inner agent, verify the output, and download the result from the sandbox.

@tool
def xlsx_enhancer(input_file: str, output_file: str, instructions: str = "") -> dict:
    """Enhance an Excel file with professional formatting, dashboards, charts, and analysis sheets."""
    input_path = Path(input_file)
    output_path = Path(output_file)
    if not input_path.exists():
        return XlsxResult(success=False, error=f"Input file not found: {input_file}").model_dump()
    if input_path.suffix.lower() != ".xlsx":
        return XlsxResult(success=False, error=f"Input file must be .xlsx, got: {input_path.suffix}").model_dump()
    user_prompt = USER_PROMPT
    if instructions.strip():
        user_prompt = f"{USER_PROMPT}\n\n## Additional Instructions\n{instructions}"
    try:
        code_tool = AgentCoreCodeInterpreter(region=AWS_REGION)
        sandbox = SandboxIO(code_tool)
        # 1. Upload
        sandbox.upload(input_path, SANDBOX_INPUT)
        # 2. Run the inner XLSX agent
        agent = create_agent(
            system_prompt=SYSTEM_PROMPT,
            model=Models.CLAUDE_46_OPUS,
            tools=[code_tool.code_interpreter],
        )
        response = agent(user_prompt)
        # 3. Verify output exists in sandbox
        if not sandbox.verify_exists(SANDBOX_OUTPUT):
            return XlsxResult(
                success=False,
                error=f"The XLSX agent did not produce '{SANDBOX_OUTPUT}'",
            ).model_dump()
        # 4. Download
        output_path.parent.mkdir(parents=True, exist_ok=True)
        sandbox.download(SANDBOX_OUTPUT, output_path)
        return XlsxResult(success=True, output_path=str(output_path)).model_dump()
    except SandboxIOError as e:
        return XlsxResult(success=False, error=f"Sandbox I/O failed: {e}").model_dump()

The inner agent receives two carefully crafted prompts. The system prompt enforces hard rules: Excel integrity, formulas instead of hardcoded values, sheet-name constraints, and error handling. The user prompt defines the exact structure: Dashboard with KPI formulas, formatted Data sheet, executive Summary with LLM-generated insights, and Analysis sheets with charts.

The formula-first philosophy

One of the most important design decisions is that the agent never hardcodes computed values in cells. Every number in the output workbook comes from an Excel formula:

# FORBIDDEN - Computing in Python
total = df['Sales'].sum()
sheet['B10'] = total # Hardcodes a value
# REQUIRED - Excel formulas
sheet['B10'] = '=SUM(Data!D:D)'
sheet['C10'] = '=SUMIF(Data!A:A,"Category",Data!B:B)'
sheet['D10'] = '=IFERROR(AVERAGEIF(Data!A:A,A10,Data!D:D),0)'

This means the resulting Excel file is alive: change a value in the Data sheet and every KPI, every analysis table, every chart updates automatically. The IFERROR wrapping prevents #DIV/0! errors that would otherwise break AVERAGEIF formulas when a category has no data.

Handling binary files in the sandbox

The AWS Bedrock Code Interpreter sandbox runs Python in an isolated environment. Uploading the source file is straightforward: the bedrock client handles binary blobs natively. But downloading the result is trickier: the download_file method decodes everything as UTF-8, which corrupts binary xlsx files.

The solution is to base64-encode the file inside the sandbox and extract the text from the stream:

class SandboxIO:
    def __init__(self, code_tool: AgentCoreCodeInterpreter):
        self._code_tool = code_tool

    def _get_client(self):
        session_name, error = self._code_tool._ensure_session(None)
        if error:
            raise SandboxIOError(f"Failed to ensure session: {error}")
        session_info = self._code_tool._sessions.get(session_name)
        return session_info.client

    def upload(self, local_path: Path, sandbox_name: str = "input.xlsx") -> None:
        file_bytes = local_path.read_bytes()
        client = self._get_client()
        client.upload_file(path=sandbox_name, content=file_bytes)

    def download(self, sandbox_name: str, local_path: Path) -> None:
        client = self._get_client()
        result = client.execute_code(
            "import base64, os\n"
            f"p = '{sandbox_name}'\n"
            "data = open(p, 'rb').read()\n"
            "print(base64.b64encode(data).decode())\n"
        )
        b64_text = _extract_stream_text(result)
        file_bytes = base64.b64decode(b64_text.strip())
        if not file_bytes.startswith(b"PK\x03\x04"):
            raise SandboxIOError("Downloaded file is not a valid xlsx")
        local_path.write_bytes(file_bytes)

The PK\x03\x04 check validates the ZIP magic bytes — every xlsx file is a ZIP archive internally.

The original xlsx file

This is the original file we feed into the agent. It’s a flat table with rows and columns. No formatting, no formulas, just boring raw data.

What the agent produces

Given a raw financial spreadsheet, the agent generates a multi-sheet workbook:

  • Dashboard: KPI cards with formulas (=SUM(Data!D:D), =COUNT(Data!A:A)), color-coded metrics, and a hyperlinked index to all sheets
  • Data: The original data with dark blue headers, alternating row colors, auto-filters, data bars on numeric columns, and frozen panes
  • Summary: An executive summary written by the LLM, key findings, concentration risks, trends, anomalies, and actionable recommendations
  • Analysis sheets: One per categorical column, each with a SUMIF/COUNTIF/AVERAGEIF table and a bar chart

The agent also detects the language of the input data and uses the same language for all generated content, sheet names, titles, labels, and the executive summary.

Monitoring tool execution

A simple hook tracks how long each tool execution takes. It can be extended to integrate with our application and provide real-time feedback to users about the agent’s progress:

class ToolProgressHook(HookProvider):
    def __init__(self) -> None:
        self._start_time: float = 0

    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeToolCallEvent, self.on_tool_start)
        registry.add_callback(AfterToolCallEvent, self.on_tool_end)

    def on_tool_start(self, event: BeforeToolCallEvent) -> None:
        self._start_time = time.time()
        tool_name = event.tool_use.get("name", "unknown")
        logger.info("Tool started: %s", tool_name)

    def on_tool_end(self, event: AfterToolCallEvent) -> None:
        elapsed = time.time() - self._start_time
        tool_name = event.tool_use.get("name", "unknown")
        logger.info("Tool finished: %s (%.1fs)", tool_name, elapsed)

And that’s all. With tools like Strands Agents and AWS Bedrock’s Code Interpreter, we can build AI agents that go beyond text generation: they produce real, functional artifacts. A raw spreadsheet goes in, a professional report comes out. No templates, no manual formatting, just an agent that understands data and knows how to present it.

Full code in my GitHub account.

What if the bug fixed itself? Letting AI agents detect bugs, fix the code, and create PRs proactively.

What if an AI could not only identify errors in your logs but actually fix them and create a pull request? I built an experiment that does exactly that.

We can put our application logs in CloudWatch and use AI agents with a worker-coordinator pattern (I’ll share a post explaining this). Today the idea is to go one step further: we will detect errors in our logs, and for certain types of fixable errors, let an AI agent fix the code and create a pull request automatically.

The core of the system is a tool decorated with @tool from Strands Agents. This makes it available to any AI agent that needs to trigger a fix:

from strands import tool

@tool
async def register_error_for_fix(error: LogEntry) -> bool:
    """
    Register an error for automatic fixing.
    Clones repo, creates fix branch, uses Claude to fix, creates PR.
    """
    repo = _setup_repo()

    branch_name = _create_fix_branch(repo, error)
    if branch_name is None:
        return True  # Branch already exists, skip

    claude_response = await _invoke_claude_fix(error.message)
    if claude_response is None:
        return False

    pr_info = pr_title_generator(claude_response)
    _commit_and_push(repo, branch_name, pr_info)
    _create_pull_request(branch_name, pr_info)

    return True

Step by Step Implementation

1. Repository Setup with GitPython

The tool first clones the repo or pulls the latest changes:

from git import Repo

def _setup_repo() -> Repo:
    repo_url = f"https://x-access-token:{GITHUB_TOKEN}@github.com/{GITHUB_REPO}.git"

    if (WORK_DIR / ".git").exists():
        repo = Repo(WORK_DIR)
        repo.git.pull(repo_url)
    else:
        repo = Repo.clone_from(repo_url, WORK_DIR)

    return repo

2. Branch Creation with Deduplication

Each fix gets its own branch with a timestamp. If the branch already exists remotely, we skip it to avoid duplicate PRs:

def _create_fix_branch(repo: Repo, error: LogEntry) -> str | None:
    branch_name = f"autofix/{error.fix_short_name}_{error.timestamp.strftime('%Y%m%d-%H%M%S')}"

    remote_refs = [ref.name for ref in repo.remote().refs]
    if f"origin/{branch_name}" in remote_refs:
        logger.info(f"Branch {branch_name} already exists, skipping")
        return None

    new_branch = repo.create_head(branch_name)
    new_branch.checkout()
    return branch_name
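
For example, with an illustrative fix_short_name of zero_division, the naming scheme produces deterministic, timestamped branch names:

```python
from datetime import datetime

# "zero_division" is an illustrative fix_short_name, not from the post
timestamp = datetime(2026, 1, 16, 9, 30, 0)
branch_name = f"autofix/zero_division_{timestamp.strftime('%Y%m%d-%H%M%S')}"
print(branch_name)  # autofix/zero_division_20260116-093000
```

Because the dedup check compares against the existing origin/* refs, the same error entry never produces two PRs.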

3. The Magic: Claude Code SDK

This is where the actual fix happens. Claude Code SDK allows Claude to read and edit files in the codebase:

from claude_code_sdk import ClaudeCodeOptions, query

async def _invoke_claude_fix(error_message: str) -> str | None:
    prompt = f"Fix this error in the codebase: {error_message}"

    options = ClaudeCodeOptions(
        cwd=str(WORK_DIR),
        allowed_tools=["Read", "Edit"]  # Safe: no Write, no Bash
    )

    response = None
    async for response in query(prompt=prompt, options=options):
        logger.info(f"Claude response: {response}")

    return response.result if response else None

Note that we only allow Read and Edit tools – no Write (creating new files) or Bash (running commands). This keeps the fixes focused and safe.

4. PR Title Generation with Claude Haiku

For fast and cheap PR title generation, I use Claude Haiku with structured output:

from pydantic import BaseModel, Field

class PrTitleModel(BaseModel):
    pr_title: str = Field(..., description="Concise PR title")
    pr_description: str = Field(..., description="Detailed PR description")

def pr_title_generator(response: str) -> PrTitleModel:
    agent = create_agent(
        system_prompt=PR_PROMPT,
        model=Models.CLAUDE_45_HAIKU,
        tools=[]
    )

    result = agent(
        prompt=f"This is response from claude code: {response}\n\n"
               f"Generate a concise title for a GitHub pull request.",
        structured_output_model=PrTitleModel
    )

    return result.structured_output

The prompt enforces Conventional Commits style:

PR_PROMPT = """
You are an assistant expert in generating pull request titles for GitHub.
OBJECTIVE:
- Generate concise and descriptive titles for pull requests.
- IMPORTANT: Use Conventional Commits as a style reference.
CRITERIA:
- The title must summarize the main changes or fixes.
- Keep the title under 10 words.

5. Commit, Push, and Create PR

Finally, we commit everything, push to the remote, and create the PR via GitHub API:

def _commit_and_push(repo: Repo, branch_name: str, pr_info: PrTitleModel) -> None:
    repo.git.add(A=True)
    repo.index.commit(pr_info.pr_title)
    repo.git.push(get_authenticated_repo_url(), branch_name)

def _create_pull_request(branch_name: str, pr_info: PrTitleModel) -> None:
    gh = Github(GITHUB_TOKEN)
    gh_repo = gh.get_repo(GITHUB_REPO)
    gh_repo.create_pull(
        title=pr_info.pr_title,
        body=pr_info.pr_description,
        head=branch_name,
        base="main"
    )

The Triage Agent: Deciding What to Fix

The tool is exposed to a triage agent that analyzes logs and decides when to use it. The agent follows the ReAct pattern (Reasoning + Acting), where it explicitly reasons about each error before deciding to act:

TRIAGE_PROMPT = """
You are a senior DevOps engineer performing triage of production errors.

REGISTRATION CRITERIA:
- The error may be occurring frequently. Register ONLY ONCE.
- The error has a clear stacktrace that indicates the root cause.
- The error can be corrected with a quick fix.

DISCARD CRITERIA:
✗ Single/isolated errors (may be malicious input)
✗ Errors from external services (network, timeouts)
✗ Errors without a clear stacktrace
✗ Errors that require business decisions

Use the ReAct pattern:
Thought: [your analysis of the error]
Action: [register_error_for_fix if criteria met]
Observation: [tool result]
... (repeat for each error type)
Final Answer: [summary of registered errors]
"""

This pattern forces the agent to reason explicitly before taking action, making decisions more transparent and debuggable.

The agent is given tools and makes the decision autonomously:

agent = create_agent(
    system_prompt=TRIAGE_PROMPT,
    model=Models.CLAUDE_45,
    tools=[register_error_for_fix]
)

result = agent(prompt=[
    {"text": f"Question: {question}"},
    {"text": f"Log context: {logs_json}"},
])

To test the system, I created a sample repository with intentional bugs and generated CloudWatch-like logs. The triage agent analyzes the logs, identifies fixable errors, and invokes the register_error_for_fix tool to create PRs automatically.

That’s the code (with the bug):

import logging
import traceback

from flask import Flask, jsonify

from lib.logger import setup_logging
from settings import APP, PROCESS, LOG_PATH, ENVIRONMENT

logger = logging.getLogger(__name__)

app = Flask(__name__)

setup_logging(
    env=ENVIRONMENT,
    app=APP,
    process=PROCESS,
    log_path=LOG_PATH)

for logger_name in ["werkzeug"]:
    logging.getLogger(logger_name).setLevel(logging.CRITICAL)


@app.errorhandler(Exception)
def handle_exception(e):
    logger.error(
        "Unhandled exception: %s",
        e,
        extra={"traceback": traceback.format_exc()},
    )
    return jsonify(error=str(e)), 500


@app.get("/div/<int:a>/<int:b>")
def divide(a: int, b: int):
    return dict(result=a / b)

As you can see, the /div/&lt;int:a&gt;/&lt;int:b&gt; endpoint has a bug: it does not handle division by zero. We triggered the error and generated logs accordingly. Since the logs are in the CloudWatch log group /projects/autofix, we can run a command to analyze them:

python cli.py log --group /projects/autofix --question "Analyze those logs" --start 2026-01-16

The AI agent will identify the division-by-zero error, decide it is fixable, and create a PR that modifies the code (using Claude Code in headless mode) to handle this case properly.
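
Conceptually, the PR just adds a guard clause. Sketched as a plain function (rather than the actual Flask view) the essence of the fix is:

```python
def divide(a: int, b: int) -> tuple[dict, int]:
    """Return a (payload, status) pair, like the patched Flask view would."""
    if b == 0:
        # Previously this raised ZeroDivisionError and surfaced as a 500
        return {"error": "division by zero is not allowed"}, 400
    return {"result": a / b}, 200

print(divide(10, 2))  # ({'result': 5.0}, 200)
print(divide(10, 0))  # ({'error': 'division by zero is not allowed'}, 400)
```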

And that’s all! The AI agent has autonomously created a PR that fixes the bug. Now we can easily accept or reject the PR after human review. The bug has been fixed!

This experiment shows that AI agents can go beyond analysis to take action. By giving Claude Code SDK access to a sandboxed environment with limited tools (Read, Edit only), we get a system that can autonomously fix bugs while remaining controllable.

The key is setting clear boundaries: the triage agent decides what to fix based on strict criteria, and the fix agent is constrained to how it can modify code. This separation keeps the system predictable and safe.

Full code in my github

Using Map-Reduce to process large documents with AI Agents and Python

We live in the era of Large Language Models (LLMs) with massive context windows. Claude 3.5 Sonnet offers 200k tokens, and Gemini 1.5 Pro goes up to 2 million. So why do we still need to worry about document processing strategies? Because real-world limits remain: AWS Bedrock, for example, enforces a strict 4.5MB limit per document, regardless of token count. That means we can’t just stuff a file larger than 4.5MB into a prompt. Today I’ll show how I built a production-ready document processing agent that handles large files by implementing a Map-Reduce pattern using Python, AWS Bedrock, and Strands Agents.

The core idea is simple: instead of asking the LLM to “read this book and answer” we break the book into chapters, analyze each chapter in parallel, and then synthesize the results.

Here is the high-level flow:

The heart of the implementation is the DocumentProcessor class. It decides whether to process a file as a whole or split it based on a size threshold. We define a threshold (e.g., 4.3MB) to stay safely within Bedrock’s limits. If the file is larger, we trigger the _process_big method.

# src/lib/processor/processor.py

BYTES_THRESHOLD = 4_300_000

async def _process_file(self, file: DocumentFile, question: str, with_callback=True):
    file_bytes = Path(file.path).read_bytes()
    # Strategy pattern: Choose the right processor based on file size
    processor = self._process_big if len(file_bytes) > BYTES_THRESHOLD else self._process
    async for chunk in processor(file_bytes, file, question, with_callback):
        yield chunk

To improve performance, we process the chunks in parallel with asyncio, using a semaphore to limit the number of concurrent workers.

async def _process_big(self, file_bytes: bytes, file: DocumentFile, question: str, with_callback=True) -> AsyncIterator[str]:
    # ... splitting logic ...
    semaphore = asyncio.Semaphore(self.max_workers)

    # Create async tasks for each chunk
    tasks = [
        self._process_chunk(chunk, i, file_name, question, handler.format, semaphore)
        for i, chunk in enumerate(chunks, 1)
    ]

    # Run in parallel
    results = await asyncio.gather(*tasks)
    
    # Sort results to maintain document order
    results.sort(key=lambda x: x[0])
    responses_from_chunks = [response for _, response in results]
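
The gather-and-sort pattern above can be demonstrated in isolation (chunk contents and worker count here are illustrative):

```python
import asyncio

async def process_chunk(index: int, chunk: str, semaphore: asyncio.Semaphore):
    # Each worker waits for a slot, does its work, and returns (index, result)
    async with semaphore:
        await asyncio.sleep(0)  # stand-in for the Bedrock call
        return index, chunk.upper()

async def run(chunks: list[str], max_workers: int = 2) -> list[str]:
    semaphore = asyncio.Semaphore(max_workers)
    tasks = [process_chunk(i, c, semaphore) for i, c in enumerate(chunks, 1)]
    results = await asyncio.gather(*tasks)
    results.sort(key=lambda x: x[0])  # restore document order
    return [response for _, response in results]

ordered = asyncio.run(run(["alpha", "beta", "gamma"]))
print(ordered)  # ['ALPHA', 'BETA', 'GAMMA']
```

The semaphore caps concurrency while asyncio.gather returns one result per task; sorting by index keeps the partial analyses in document order regardless of completion order.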

Each chunk is processed by an isolated agent instance that only sees that specific fragment and the user’s question. Once we have the partial analyses, we consolidate them. This acts as a compression step: we’ve turned raw pages into relevant insights.

def _consolidate_and_truncate(self, responses: list[str], num_chunks: int) -> str:
    consolidated = "\n\n".join(responses)
    
    if len(consolidated) > MAX_CONTEXT_CHARS:
        # Safety mechanism to ensure we don't overflow the final context
        return consolidated[:MAX_CONTEXT_CHARS] + "\n... [TRUNCATED]"
    return consolidated

Finally, we feed this consolidated context to the agent for the final answer. In a long-running async process, feedback is critical. I implemented an Observer pattern to decouple the processing logic from the UI/Logging.

# src/main.py

class DocumentProcessorEventListener(ProcessingEventListener):
    async def on_chunk_start(self, chunk_number: int, file_name: str):
        logger.info(f"[Worker {chunk_number}] Processing chunk for file {file_name}")

    async def on_chunk_end(self, chunk_number: int, file_name: str, response: str):
        logger.info(f"[Worker {chunk_number}] Completed chunk for file {file_name}")

By breaking down large tasks, we not only bypass technical limits but often get better results. The model focuses on smaller sections, reducing hallucinations, and the final answer is grounded in a pre-processed summary of facts.

We don’t just send text; we send the raw document bytes. This allows the model (Claude 4.5 Sonnet via Bedrock) to use its native document processing capabilities. Here is how we construct the message payload:

# src/lib/processor/processor.py

def _create_document_message(self, file_format: str, file_name: str, file_bytes: bytes, text: str) -> list:
    return [
        {
            "role": "user",
            "content": [
                {
                    "document": {
                        "format": file_format,
                        "name": file_name,
                        "source": {"bytes": file_bytes},
                    },
                },
                {"text": text},
            ],
        },
    ]

When processing chunks, we don’t want the model to be chatty. We need raw information extraction. We use a “Spartan” system prompt that enforces brevity and objectivity, ensuring the consolidation phase receives high-signal input.

# src/lib/processor/prompts.py

SYSTEM_CHUNK_PROMPT = f"""
You are an artificial intelligence assistant specialized in reading and analyzing files.
You have received a chunk of a large file.
...
If the user's question cannot be answered with the information in the current chunk, do not answer it directly.

{SYSTEM_PROMPT_SPARTAN}
"""

The SYSTEM_PROMPT_SPARTAN (injected above) explicitly forbids conversational filler, ensuring we maximize the token budget for actual data.

The project handles PDF and XLSX files; other file types are not split and are passed to the LLM as-is.

With this architecture, we can process large files in a production environment. This allows us to easily plug in different interfaces, whether it’s a CLI logger (as shown) or a WebSocket update for a UI frontend like Chainlit.

Full code in my github

Chat with your Data: Building a File-Aware AI Agent with AWS Bedrock and Chainlit

We all know LLMs are powerful, but their true potential is unlocked when they can see your data. While RAG (Retrieval-Augmented Generation) is great for massive knowledge bases, sometimes you just want to drag and drop a file and ask questions about it.

Today we’ll build a “File-Aware” AI agent that can natively understand a wide range of document formats—from PDFs and Excel sheets to Word docs and Markdown files. We’ll use AWS Bedrock with Claude 4.5 Sonnet for the reasoning engine and Chainlit for the conversational UI.

The idea is straightforward: Upload a file, inject it into the model’s context, and let the LLM do the rest. No vector databases, no complex indexing pipelines—just direct context injection for immediate analysis.

The architecture is simple yet effective. We intercept file uploads in the UI, process them into a format the LLM understands, and pass them along with the user’s query.

┌──────────────┐      ┌──────────────┐      ┌────────────────────┐
│   Chainlit   │      │  Orchestrator│      │   AWS Bedrock      │
│      UI      │─────►│    Agent     │─────►│(Claude 4.5 Sonnet) │
└──────┬───────┘      └──────────────┘      └────────────────────┘
       │                      ▲
       │    ┌────────────┐    │
       └───►│ File Proc. │────┘
            │   Logic    │
            └────────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for high-quality reasoning and large context windows.
  • Chainlit for a chat-like interface with native file upload support.
  • Python for the backend logic.

The core challenge is handling different file types and presenting them to the LLM. We support a variety of formats by mapping them to Bedrock’s expected input types.

To enable file uploads in Chainlit, you need to configure the [features.spontaneous_file_upload] section in your .chainlit/config.toml. This is where you define which MIME types are accepted.

[features.spontaneous_file_upload]
    enabled = true
    accept = [
        "application/pdf",
        "text/csv",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "text/html",
        "text/plain",
        "text/markdown",
        "text/x-markdown"
    ]
    max_files = 20
    max_size_mb = 500

The main agent loop handles the conversation. It checks for uploaded files, processes them, and constructs the message payload for the LLM. We also include robust error handling to manage context window limits gracefully.

def get_question_from_message(message: cl.Message):
    content_blocks = None
    if message.elements:
        content_blocks = get_content_blocks_from_message(message)

    if content_blocks:
        content_blocks.append({"text": message.content or "Write a summary of the document"})
        question = content_blocks
    else:
        question = message.content

    return question


def get_content_blocks_from_message(message: cl.Message):
    docs = [f for f in message.elements if f.type == "file" and f.mime in MIME_MAP]
    content_blocks = []

    for doc in docs:
        file = Path(doc.path)
        file_bytes = file.read_bytes()
        shutil.rmtree(file.parent)

        content_blocks.append({
            "document": {
                "name": sanitize_filename(doc.name),
                "format": MIME_MAP[doc.mime],
                "source": {"bytes": file_bytes}
            }
        })

    return content_blocks
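
The MIME_MAP referenced above isn’t shown in the snippet. Assuming it mirrors the accepted upload types from the Chainlit config, it would map MIME types to the document formats Bedrock’s Converse API accepts (pdf, csv, doc, docx, xls, xlsx, html, txt, md) — a sketch:

```python
# Assumed mapping from upload MIME types to Bedrock Converse document formats
MIME_MAP = {
    "application/pdf": "pdf",
    "text/csv": "csv",
    "application/msword": "doc",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/vnd.ms-excel": "xls",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
    "text/html": "html",
    "text/plain": "txt",
    "text/markdown": "md",
    "text/x-markdown": "md",
}
```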

@cl.on_message
async def handle_message(message: cl.Message):
    task = asyncio.create_task(process_user_task(
        question=get_question_from_message(message),
        debug=DEBUG))
    cl.user_session.set("task", task)
    try:
        await task
    except asyncio.CancelledError:
        logger.info("User task was cancelled.")

This pattern allows for ad-hoc analysis. You don’t need to pre-ingest data. You can:

  1. Analyze Financials: Upload an Excel sheet and ask for trends.
  2. Review Contracts: Upload a PDF and ask for clause summaries.
  3. Debug Code: Upload a source file and ask for a bug fix.
By leveraging the large context window of modern models like Claude 4.5 Sonnet, we can feed entire documents directly into the prompt, providing the model with full visibility without the information loss often associated with RAG chunking.

And that's all. With tools like Chainlit and powerful APIs like AWS Bedrock, we can create robust, multi-modal assistants that integrate seamlessly into our daily workflows.

Full code in my github account.

Building scalable multi-purpose AI agents: Orchestrating Multi-Agent Systems with Strands Agents and Chainlit

We can build simple AI agents that handle specific tasks quite easily today. But what about building AI systems that can handle multiple domains effectively? One approach is to create a single monolithic agent that tries to do everything, but this quickly runs into problems of context pollution, maintenance complexity, and scaling limitations. In this article, we’ll show a production-ready pattern for building multi-purpose AI systems using an orchestrator architecture that coordinates domain-specific agents.

The idea is simple: don’t build one agent to rule them all; instead, create specialized agents that excel in their domains and coordinate them through an intelligent orchestrator. The solution is an orchestrator agent that routes requests to specialized sub-agents, each with focused expertise and dedicated tools. Think of it as a smart router that understands intent and delegates accordingly.

That’s the core of the Orchestrator Pattern for multi-agent systems:

User Query → Orchestrator Agent → Specialized Agent(s) → Orchestrator → Response

For our example we have three specialized agents:

  1. Weather Agent: Expert in meteorological data and weather patterns. It uses external weather APIs to fetch historical and current weather data.
  2. Logistics Agent: Specialist in supply chain and shipping operations. Fake logistics data is generated to simulate shipment tracking, route optimization, and delivery performance analysis.
  3. Production Agent: Focused on manufacturing operations and production metrics. Also, fake production data is generated to analyze production KPIs.

That’s the architecture in a nutshell:

┌─────────────────────────────────────────────┐
│          Orchestrator Agent                 │
│  (Routes & Synthesizes)                     │
└────────┬─────────┬─────────┬────────────────┘
         │         │         │
    ┌────▼────┐ ┌──▼─────┐ ┌─▼─────────┐
    │ Weather │ │Logistic│ │Production │
    │  Agent  │ │ Agent  │ │  Agent    │
    └────┬────┘ └──┬─────┘ └┬──────────┘
         │         │        │
    ┌────▼────┐ ┌──▼─────┐ ┌▼──────────┐
    │External │ │Database│ │ Database  │
    │   API   │ │ Tools  │ │  Tools    │
    └─────────┘ └────────┘ └───────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for agent reasoning
  • Strands Agents framework for agent orchestration
  • Chainlit for the conversational UI
  • FastAPI for the async backend
  • PostgreSQL for storing conversation history and domain data

The orchestrator’s job is simple but critical: understand the user’s intent and route to the right specialist(s).

MAIN_SYSTEM_PROMPT = """You are an intelligent orchestrator agent 
responsible for routing user requests to specialized sub-agents 
based on their domain expertise.

## Available Specialized Agents

### 1. Production Agent
**Domain**: Manufacturing operations, production metrics, quality control
**Handles**: Production KPIs, machine performance, downtime analysis

### 2. Logistics Agent
**Domain**: Supply chain, shipping, transportation operations
**Handles**: Shipment tracking, route optimization, delivery performance

### 3. Weather Agent
**Domain**: Meteorological data and weather patterns
**Handles**: Historical weather, atmospheric conditions, climate trends

## Your Decision Process
1. Analyze the request for key terms and domains
2. Determine scope (single vs multi-domain)
3. Route to appropriate agent(s)
4. Synthesize results when multiple agents are involved
"""

The orchestrator receives specialized agents as tools:

def get_orchestrator_tools() -> List[Any]:
    from tools.logistics.agent import logistics_assistant
    from tools.production.agent import production_assistant
    from tools.weather.agent import weather_assistant

    tools = [
        calculator,
        think,
        current_time,
        AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter,
        logistics_assistant,  # Specialized agent as tool
        production_assistant,  # Specialized agent as tool
        weather_assistant     # Specialized agent as tool
    ]
    return tools

Each specialized agent follows a consistent pattern. Here’s the weather agent:

@tool
@stream_to_step("weather_assistant")
async def weather_assistant(query: str):
    """
    A research assistant specialized in weather topics with streaming support.
    """
    try:
        tools = [
            calculator,
            think,
            current_time,
            AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter
        ]
        # Domain-specific tools
        tools += WeatherTools(latitude=MY_LATITUDE, longitude=MY_LONGITUDE).get_tools()

        research_agent = get_agent(
            system_prompt=WEATHER_ASSISTANT_PROMPT,
            tools=tools
        )

        async for token in research_agent.stream_async(query):
            yield token

    except Exception as e:
        yield f"Error in research assistant: {str(e)}"

Each agent has access to domain-specific tools. For example, the weather agent uses external APIs:

class WeatherTools:
    def __init__(self, latitude: float, longitude: float):
        self.latitude = latitude
        self.longitude = longitude

    def get_tools(self) -> List[tool]:
        @tool
        def get_hourly_weather_data(from_date: date, to_date: date) -> MeteoData:
            """Get hourly weather data for a specific date range."""
            url = (f"https://api.open-meteo.com/v1/forecast?"
                   f"latitude={self.latitude}&longitude={self.longitude}&"
                   f"hourly=temperature_2m,relative_humidity_2m...")
            response = requests.get(url)
            return parse_weather_response(response.json())
        
        return [get_hourly_weather_data]

The logistics and production agents use synthetic data generators for demonstration:

class LogisticsTools:
    def get_tools(self) -> List[tool]:
        @tool
        def get_logistics_data(
            from_date: date,
            to_date: date,
            origins: Optional[List[str]] = None,
            destinations: Optional[List[str]] = None,
        ) -> LogisticsDataset:
            """Generate synthetic logistics shipment data."""
            # Generate realistic shipment data with delays, costs, routes
            records = generate_synthetic_shipments(...)
            return LogisticsDataset(records=records, aggregates=...)
        
        return [get_logistics_data]
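
The generate_synthetic_shipments helper is elided above; a minimal sketch (field names and value ranges are my own assumptions, not from the post) could look like:

```python
import random
from datetime import date, timedelta

def generate_synthetic_shipments(from_date: date, to_date: date,
                                 origins: list[str], destinations: list[str],
                                 n: int = 20) -> list[dict]:
    """Hypothetical generator of fake shipment records for the demo."""
    rng = random.Random(42)  # fixed seed keeps demo runs reproducible
    days = (to_date - from_date).days or 1
    return [
        {
            "shipped_on": from_date + timedelta(days=rng.randrange(days)),
            "origin": rng.choice(origins),
            "destination": rng.choice(destinations),
            "delay_hours": round(rng.uniform(0, 48), 1),
            "cost_eur": round(rng.uniform(200, 2000), 2),
        }
        for _ in range(n)
    ]

records = generate_synthetic_shipments(date(2026, 1, 1), date(2026, 1, 15),
                                       ["Madrid"], ["Berlin", "Paris"])
print(len(records))  # 20
```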

For UI we’re going to use Chainlit. The Chainlit integration provides real-time visibility into agent execution:

class LoggingHooks(HookProvider):
    async def before_tool(self, event: BeforeToolCallEvent) -> None:
        step = cl.Step(name=f"{event.tool_use['name']}", type="tool")
        await step.send()
        cl.user_session.set(f"step_{event.tool_use['name']}", step)

    async def after_tool(self, event: AfterToolCallEvent) -> None:
        step = cl.user_session.get(f"step_{event.tool_use['name']}")
        if step:
            await step.update()

@cl.on_message
async def handle_message(message: cl.Message):
    agent = cl.user_session.get("agent")
    message_history = cl.user_session.get("message_history")
    message_history.append({"role": "user", "content": message.content})
    
    response = await agent.run_async(message.content)
    await cl.Message(content=response).send()

This creates a transparent experience where users see:

  • Which agent is handling their request
  • What tools are being invoked
  • Real-time streaming of responses

Now we can handle a variety of user queries. For example:

User: “What was the average temperature last week?”

Flow:

  1. Orchestrator identifies weather domain
  2. Routes to weather_assistant
  3. Weather agent calls get_hourly_weather_data
  4. Analyzes and returns formatted response

Or multi-domain queries:

User: “Did weather conditions affect our shipment delays yesterday?”

Flow:

  1. Orchestrator identifies weather + logistics domains
  2. Routes to weather_assistant for climate data
  3. Routes to logistics_assistant for shipment data
  4. Synthesizes correlation analysis
  5. Returns unified insight

And complex analytics:

User: “Analyze production efficiency trends and correlate with weather and logistics performance based on yesterday’s data.”

Flow:

  1. Orchestrator coordinates all three agents
  2. Production agent retrieves manufacturing KPIs
  3. Weather agent provides environmental data
  4. Logistics agent supplies delivery metrics
  5. Orchestrator synthesizes multi-domain analysis

This architecture scales naturally in multiple dimensions. We can easily add new specialized agents without disrupting existing functionality: we only need to create the new agent, register it as a tool, and update the orchestrator prompt with the new domain description. That’s it.

The orchestrator pattern transforms multi-domain AI from a monolithic challenge into a composable architecture. Each agent focuses on what it does best, while the orchestrator provides intelligent coordination.

Full code in my github.

Building ReAct AI agents with sandboxed Python code execution using AWS Bedrock and LangGraph

In industrial environments, data analysis is crucial for optimizing processes, detecting anomalies, and making informed decisions. Manufacturing plants, energy systems, and industrial IoT generate massive amounts of data from sensors, machines, and control systems. Traditionally, analyzing this data requires specialized knowledge in both industrial processes and data science, creating a bottleneck for quick insights.

I’ve been exploring agentic AI frameworks lately, particularly for complex data analysis tasks. While working on industrial data problems, I realized that combining the reasoning capabilities of Large Language Models with specialized tools could create a powerful solution for industrial data analysis. This project demonstrates how to build a ReAct (Reasoning and Acting) AI agent using LangGraph that can analyze manufacturing data, understand industrial processes, and provide actionable insights.

The goal of this project is to create an AI agent that can analyze industrial datasets (manufacturing metrics, sensor readings, process control data) and provide expert-level insights about production optimization, quality control, and process efficiency. Using LangGraph’s ReAct agent framework with AWS Bedrock, the system can execute Python code dynamically in a sandboxed environment, process large datasets, and reason about industrial contexts.

The dataset is a fake sample of industrial data with manufacturing metrics like temperature, speed, humidity, pressure, operator experience, scrap rates, and unplanned stops. In fact, I generated the dataset using ChatGPT.

This project uses several key components:

  • LangGraph ReAct Agent: For building the multi-tool AI agent with ReAct (Reasoning and Acting) patterns that can dynamically choose tools and reason about results
  • AWS Bedrock: Claude Sonnet 4 as the underlying LLM for reasoning and code generation
  • Sandboxed Code Interpreter: Secure execution of Python code for data analysis using AWS Agent Core, via a tool taken from the strands-agents-tools library.
  • Industrial Domain Expertise: Specialized system prompts with knowledge of manufacturing processes, quality control, and industrial IoT

The agent has access to powerful tools:

  • Code Interpreter: Executes Python code safely in a sandboxed AWS environment using pandas, numpy, scipy, and other scientific libraries
  • Data Processing: Handles large industrial datasets with memory-efficient strategies
  • Industrial Context: Understands manufacturing processes, sensor data, and quality metrics

The system uses AWS Agent Core’s sandboxed code interpreter, which means:

  • Python code is executed in an isolated environment
  • No risk to the host system
  • Access to scientific computing libraries (pandas, numpy, scipy)
  • Memory management for large datasets

The core of the system is surprisingly simple. The ReAct agent is built using LangGraph’s create_react_agent with custom tools:

from langgraph.prebuilt import create_react_agent
from typing import List
import pandas as pd
from langchain_core.callbacks import BaseCallbackHandler


def analyze_df(df: pd.DataFrame, system_prompt: str, user_prompt: str,
               callbacks: List[BaseCallbackHandler], streaming: bool = False):
    code_interpreter_tools = CodeInterpreter()
    tools = code_interpreter_tools.get_tools()

    agent = create_react_agent(
        model=get_llm(model=DEFAULT_MODEL, streaming=streaming,
                      budget_tokens=12288, callbacks=callbacks),
        tools=tools,
        prompt=system_prompt
    )

    agent_prompt = f"""
    I have a DataFrame with the following data:
    - Columns: {list(df.columns)}
    - Shape: {df.shape}
    - data: {df}
    
    The output must be an executive summary with the key points.
    The response must be only markdown, not plots.
    """
    messages = [
        ("user", agent_prompt),
        ("user", user_prompt)
    ]
    agent_input = {"messages": messages}
    return agent.invoke(agent_input)

The ReAct pattern (Reasoning and Acting) allows the agent to:

  1. Reason about what analysis is needed
  2. Act by calling the appropriate tools (in this case: code interpreter)
  3. Observe the results of code execution
  4. Re-reason and potentially call more tools if needed

This creates a dynamic loop where the agent can iteratively analyze data, examine results, and refine its approach – much more powerful than a single code execution.

The magic happens in the system prompt, which provides the agent with industrial domain expertise:

SYSTEM_PROMPT = """
# Industrial Data Analysis Agent - System Prompt

You are an expert AI agent specialized in industrial data analysis and programming. 
You excel at solving complex data problems in manufacturing, process control, 
energy systems, and industrial IoT environments.

## Core Capabilities
- Execute Python code using pandas, numpy, scipy
- Handle large datasets with chunking strategies  
- Process time-series data, sensor readings, production metrics
- Perform statistical analysis, anomaly detection, predictive modeling

## Industrial Domain Expertise
- Manufacturing processes and production optimization
- Process control systems (PID controllers, SCADA, DCS)
- Industrial IoT sensor data and telemetry
- Quality control and Six Sigma methodologies
- Energy consumption analysis and optimization
- Predictive maintenance and failure analysis
"""

The code interpreter tool is wrapped with safety validations:

def validate_code_ast(code: str) -> bool:
    """Validate Python code using AST to ensure safety."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False


@tool
def code_interpreter(code: str) -> str:
    """Executes Python code in a sandboxed environment."""
    if not validate_code_ast(code):
        raise UnsafeCodeError("Unsafe code or syntax errors.")

    return code_tool(code_interpreter_input={
        "action": {
            "type": "executeCode",
            "session_name": session_name,
            "code": code,
            "language": "python"
        }
    })
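
Note that ast.parse guarantees only syntactic validity; the actual isolation comes from the sandboxed AWS environment. A quick check of the validator’s behavior:

```python
import ast

def validate_code_ast(code: str) -> bool:
    """Validate Python code syntax via the AST parser (syntax only)."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False

print(validate_code_ast("x = 1 + 1"))     # True
print(validate_code_ast("def broken(:"))  # False
```

In practice this mainly rejects malformed code before spending a sandbox execution call.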

The system uses Claude Sonnet 4 through AWS Bedrock with optimized parameters for industrial analysis:

def get_llm(model: str = DEFAULT_MODEL, max_tokens: int = 4096,
            temperature: float = TemperatureLevel.BALANCED,
            top_k: int = TopKLevel.DIVERSE,
            top_p: float = TopPLevel.CREATIVE) -> BaseChatModel:
    model_kwargs = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p
    }

    return ChatBedrock(
        model=model,
        client=aws_get_service('bedrock-runtime'),
        model_kwargs=model_kwargs
    )

The project includes fake sample industrial data with manufacturing metrics:

- `machine_id`: Equipment identifier
- `shift`: Production shift (A/M/N for morning/afternoon/night)
- `temperature`, `speed`, `humidity`, `pressure`: Process parameters
- `operator_experience`: Years of operator experience
- `scrap_kg`: Quality metric (waste produced)
- `unplanned_stop`: Equipment failure indicator
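To make the schema concrete, a single record might look like this (all values invented for illustration):

```python
# One illustrative row of the fake manufacturing dataset (values invented).
sample_row = {
    "machine_id": "M-042",
    "shift": "A",                # shift code, see column description
    "temperature": 182.5,        # process temperature
    "speed": 1200,               # line speed
    "humidity": 41.0,
    "pressure": 2.3,
    "operator_experience": 7,    # years
    "scrap_kg": 3.4,             # waste produced (quality metric)
    "unplanned_stop": 0,         # 1 if the equipment failed during the shift
}
```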

A typical analysis query might be: "Do temperature and speed setpoints vary across shifts?"
The agent streams its response as it generates it.

The agent will:

1. Load and examine the dataset structure
2. Generate appropriate Python code for analysis
3. Execute the code in a sandboxed environment
4. Provide insights about shift-based variations
5. Suggest process optimization recommendations

Putting it all together, the entry script loads the data and streams the analysis:

import logging

import pandas as pd
from langchain_core.callbacks import StreamingStdOutCallbackHandler

from modules.df_analyzer import analyze_df
from prompts import SYSTEM_PROMPT

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


class StreamingCallbackHandler(StreamingStdOutCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end='', flush=True)


df = pd.read_csv('fake_data.csv')

user_prompt = "Do temperature and speed setpoints vary across shifts?"
for chunk in analyze_df(
        user_prompt=user_prompt,
        df=df,
        system_prompt=SYSTEM_PROMPT,
        callbacks=[StreamingCallbackHandler()],
        streaming=True):
    logger.debug(chunk)

This project demonstrates the power of agentic AI for specialized domains. Instead of building custom analytics dashboards or writing specific analysis scripts, we provide the agent with:

  1. Domain Knowledge: Through specialized system prompts
  2. Tools: Safe code execution capabilities
  3. Context: The actual data to analyze

The agent can then:

  • Generate appropriate analysis code
  • Execute it safely
  • Interpret results with industrial context
  • Provide actionable recommendations

The result is a flexible system that can handle various industrial analysis tasks without pre-programmed solutions. The agent reasons about the problem, writes the necessary code (sandboxed), and provides expert-level insights.

Full code is available on my GitHub.

Building an Agentic AI with Python, LangChain, AWS Bedrock and Claude 4 Sonnet

Today we are going to build an agent with AI. It is just an example of how to build an agent with LangChain, AWS Bedrock, and Claude 4 Sonnet. The agent will be a “mathematical expert” capable of performing complex calculations and providing detailed explanations of its reasoning process. The idea is to give the agent only two primitive operations: addition and subtraction. With those two alone we can derive all the other operations: multiplication, division, exponentiation, square roots, and so on. The agent performs these operations step by step, explaining its reasoning along the way. Of course, we don’t need AI for arithmetic; the point is to show how to build an agent with LangChain, AWS Bedrock, and Claude 4 Sonnet.

The mathematical agent implements the tool-calling pattern, allowing the LLM to dynamically select and execute mathematical operations:

import logging

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate

from core.llm.aws import get_llm, Models
from modules.prompts import AGENT_SYSTEM_PROMPT
from modules.tools import MathTools
from settings import MAX_TOKENS

logger = logging.getLogger(__name__)


def run(question: str, model: Models = Models.CLAUDE_4):
    prompt = ChatPromptTemplate.from_messages([
        ("system", AGENT_SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}")
    ])
    math_tools = MathTools()
    tools = math_tools.get_tools()

    llm = get_llm(model=model, max_tokens=MAX_TOKENS)
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=10
    )

    response = agent_executor.invoke({
        "input": question
    })

    logger.info(f"Agent response: {response['output']}")

Tools are defined using LangChain’s @tool decorator, providing automatic schema generation and type validation. Strictly speaking, we don’t need a class for the tools, but I have used one because I want to add an extra feature to the agent: the ability to keep a history of the operations performed. This lets the agent explain its reasoning in detail, showing the steps taken to arrive at the final result.

import logging
from typing import List

from langchain.tools import tool

logger = logging.getLogger(__name__)


class MathTools:

    def __init__(self):
        self.history = []

    def _diff_values(self, a: int, b: int) -> int:
        result = a - b
        self.history.append(f"{a} - {b} = {result}")
        return result

    def _sum_values(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result

    def _get_history(self) -> str:
        if not self.history:
            return "No previous operations"
        return "\n".join(self.history[-5:])  # Last 5

    def get_tools(self) -> List:
        @tool
        def diff_values(a: int, b: int) -> int:
            """Calculates the difference between two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: difference of a - b
            """
            logger.info(f"Calculating difference: {a} - {b}")
            return self._diff_values(a, b)

        @tool
        def sum_values(a: int, b: int) -> int:
            """Sums two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: sum of a + b
            """
            logger.info(f"Calculating sum: {a} + {b}")
            return self._sum_values(a, b)

        @tool
        def get_history() -> str:
            """Gets the operation history
            Returns:
                str: last operations
            """
            logger.info("Retrieving operation history")
            return self._get_history()

        return [diff_values, sum_values, get_history]

The system prompt is carefully crafted to guide the agent’s behavior and establish clear operational boundaries:

AGENT_SYSTEM_PROMPT = """
You are an expert mathematical agent specialized in calculations.

You have access to the following tools:
- diff_values: Calculates the difference between two numbers
- sum_values: Sums two numbers
- get_history: Gets the operation history

Guidelines:
1. Only answer questions related to mathematical operations.
2. For complex operations, use step-by-step calculations:
   - Multiplication: Repeated addition
   - Division: Repeated subtraction
   - Exponentiation: Repeated multiplication
   - Square root: Use methods like Babylonian method or prime factorization.
"""

Now we can invoke our agent with questions such as ‘What’s the square root of 16 divided by two, squared?’. The agent will iterate, using only the provided tools, to arrive at the result.
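To see why addition and subtraction are enough, here is a plain-Python sketch (no LangChain involved) of multiplication as repeated addition and integer division as repeated subtraction, mirroring the chains of `sum_values` and `diff_values` calls the agent makes:

```python
def multiply(a: int, b: int) -> int:
    """Multiplication as repeated addition, as the agent would do it."""
    result = 0
    for _ in range(abs(b)):
        result = result + a        # one sum_values call per step
    return result if b >= 0 else -result


def divide(a: int, b: int) -> int:
    """Integer division as repeated subtraction (positive inputs only)."""
    count = 0
    while a >= b:
        a = a - b                  # one diff_values call per step
        count += 1
    return count
```

Exponentiation then becomes repeated multiplication, and so on up the chain, exactly as the system prompt instructs.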

And that’s all. This project demonstrates how to build an AI agent using LangChain and AWS Bedrock. It’s just boilerplate, but it can be extended into more complex agents with additional capabilities, and it’s a good way to understand how AI agents work.

Full code is available on my GitHub account.

Sending logs to AWS CloudWatch with a sidecar pattern and Python

In a Docker Swarm environment, the sidecar pattern is a common architectural approach used to extend the functionality of a primary container without directly modifying it.

First there’s a primary container. This is the main application or service you want to run within a Docker Swarm service, and it’s responsible for the core functionality of your application. In our case, this container will be the log generator. It will save logs in JSON format.

The sidecar container is a secondary container that runs alongside the primary one. It’s tightly coupled with the primary container and assists it by providing additional services or functionality. In our example, the sidecar’s responsibility is to push logs to AWS CloudWatch. The idea is to share a Docker volume between both containers. With this technique, our primary container doesn’t pay any network latency while generating logs.

The idea is to generate something like this:

version: '3.6'

services:
  api:
    image: api:latest
    logging:
      options:
        max-size: 10m
    deploy:
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
      - PROCESS_ID=api
    ports:
      - 5000:5000
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000 --timeout 180

  filebeat:
    image: cw:production
    command: bash -c "python cw.py && sleep 1m"
    deploy:
      restart_policy:
        condition: any
    environment:
      - LOG_GROUP=python_logs_example
      - LOG_STREAM_PREFIX=default
    volumes:
      - logs_volume:/src/logs
volumes:
  logs_volume:

Let’s go. First, we need to set up our AWS credentials. We can use a profile or an IAM user:

if AWS_PROFILE_NAME:
    session = boto3.Session(profile_name=AWS_PROFILE_NAME, region_name=AWS_REGION)
else:
    session = boto3.Session(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION)

logs = session.client('logs')

Then we need to set up the log group and log stream in CloudWatch. We could create them by hand, but I prefer to create them programmatically if they don’t exist.

def init_cloudwatch_stream():
    log_stream_name = f"{LOG_STREAM_PREFIX}_{datetime.now().strftime('%Y%m%d')}"

    log_groups = logs.describe_log_groups(logGroupNamePrefix=LOG_GROUP)['logGroups']
    if not any(group['logGroupName'] == LOG_GROUP for group in log_groups):
        logs.create_log_group(logGroupName=LOG_GROUP)

    log_streams = logs.describe_log_streams(
        logGroupName=LOG_GROUP,
        logStreamNamePrefix=log_stream_name
    )['logStreams']

    if not any(stream['logStreamName'] == log_stream_name for stream in log_streams):
        logs.create_log_stream(logGroupName=LOG_GROUP, logStreamName=log_stream_name)

    return log_stream_name

Now we need to upload the logs to CloudWatch using put_log_events. To send multiple batches we need to pass a sequenceToken (except on the first call). To handle that, I use this trick:

function_parameters = dict(
    logGroupName=LOG_GROUP,
    logStreamName=log_stream_name
)

for f in glob(f'{LOG_PATH}/*.{LOG_EXTENSION}'):
    function_parameters['logEvents'] = get_log_events_from_file(f)
    response = logs.put_log_events(**function_parameters)
    function_parameters['sequenceToken'] = response['nextSequenceToken']

We also need to read the log files, possibly adjusting the fields according to your needs:

def get_log_events_from_file(file):
    exclude_fields = ('@timestamp', 'logger')
    with open(file, 'r') as f:
        records = [json.loads(line) for line in f]
    return [
        dict(
            timestamp=int(datetime.fromisoformat(d['@timestamp']).timestamp() * 1000),
            message=json.dumps({k: v for k, v in d.items() if k not in exclude_fields})
        ) for d in records]

I like to keep all the settings of my application in a file called settings.py, a pattern I’ve copied from Django. In this file I also read environment variables from a dotenv file.

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

AWS_PROFILE_NAME = os.getenv('AWS_PROFILE_NAME', False)
AWS_REGION = os.getenv('AWS_REGION')

LOG_GROUP = os.getenv('LOG_GROUP', 'python_logs_example')
LOG_STREAM_PREFIX = os.getenv('LOG_STREAM_PREFIX', 'default')

LOG_EXTENSION = 'log'
LOG_PATH = os.getenv('LOG_PATH', Path(BASE_DIR).resolve())

And that’s all: your logs are uploaded to CloudWatch by a background process, decoupled from the main one.

Source code is available on my GitHub.