Chat with your Data: Building a File-Aware AI Agent with AWS Bedrock and Chainlit

We all know LLMs are powerful, but their true potential is unlocked when they can see your data. While RAG (Retrieval-Augmented Generation) is great for massive knowledge bases, sometimes you just want to drag and drop a file and ask questions about it.

Today we’ll build a “File-Aware” AI agent that can natively understand a wide range of document formats—from PDFs and Excel sheets to Word docs and Markdown files. We’ll use AWS Bedrock with Claude 4.5 Sonnet for the reasoning engine and Chainlit for the conversational UI.

The idea is straightforward: Upload a file, inject it into the model’s context, and let the LLM do the rest. No vector databases, no complex indexing pipelines—just direct context injection for immediate analysis.

The architecture is simple yet effective. We intercept file uploads in the UI, process them into a format the LLM understands, and pass them along with the user’s query.

┌──────────────┐      ┌──────────────┐      ┌────────────────────┐
│   Chainlit   │      │  Orchestrator│      │   AWS Bedrock      │
│      UI      │─────►│    Agent     │─────►│(Claude 4.5 Sonnet) │
└──────┬───────┘      └──────────────┘      └────────────────────┘
       │                      ▲
       │    ┌────────────┐    │
       └───►│ File Proc. │────┘
            │   Logic    │
            └────────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for high-quality reasoning and large context windows.
  • Chainlit for a chat-like interface with native file upload support.
  • Python for the backend logic.

The core challenge is handling different file types and presenting them to the LLM. We support a variety of formats by mapping them to Bedrock’s expected input types.

To enable file uploads in Chainlit, you need to configure the [features.spontaneous_file_upload] section in your .chainlit/config.toml. This is where you define which MIME types are accepted.

[features.spontaneous_file_upload]
    enabled = true
    accept = [
        "application/pdf",
        "text/csv",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "text/html",
        "text/plain",
        "text/markdown",
        "text/x-markdown"
    ]
    max_files = 20
    max_size_mb = 500
The main agent loop handles the conversation. It checks for uploaded files, processes them, and constructs the message payload for the LLM. We also include robust error handling to manage context window limits gracefully.
def get_question_from_message(message: cl.Message):
    content_blocks = None
    if message.elements:
        content_blocks = get_content_blocks_from_message(message)

    if content_blocks:
        content_blocks.append({"text": message.content or "Write a summary of the document"})
        question = content_blocks
    else:
        question = message.content

    return question


def get_content_blocks_from_message(message: cl.Message):
    docs = [f for f in message.elements if f.type == "file" and f.mime in MIME_MAP]
    content_blocks = []

    for doc in docs:
        file = Path(doc.path)
        file_bytes = file.read_bytes()
        shutil.rmtree(file.parent)

        content_blocks.append({
            "document": {
                "name": sanitize_filename(doc.name),
                "format": MIME_MAP[doc.mime],
                "source": {"bytes": file_bytes}
            }
        })

    return content_blocks

@cl.on_message
async def handle_message(message: cl.Message):
    task = asyncio.create_task(process_user_task(
        question=get_question_from_message(message),
        debug=DEBUG))
    cl.user_session.set("task", task)
    try:
        await task
    except asyncio.CancelledError:
        logger.info("User task was cancelled.")

This pattern allows for ad-hoc analysis. You don’t need to pre-ingest data. You can:

  1. Analyze Financials: Upload an Excel sheet and ask for trends.
  2. Review Contracts: Upload a PDF and ask for clause summaries.
  3. Debug Code: Upload a source file and ask for a bug fix.
By leveraging the large context window of modern models like Claude 4.5 Sonnet, we can feed entire documents directly into the prompt, providing the model with full visibility without the information loss often associated with RAG chunking.

And that's all. With tools like Chainlit and powerful APIs like AWS Bedrock, we can create robust, multi-modal assistants that integrate seamlessly into our daily workflows.

Full code in my github account.

Building scalable multi-purpose AI agents: Orchestrating Multi-Agent Systems with Strands Agents and Chainlit

We can build simple AI agents that handle specific tasks quite easily today. But what about building AI systems that can handle multiple domains effectively? One approach is to create a single monolithic agent that tries to do everything, but this quickly runs into problems of context pollution, maintenance complexity, and scaling limitations. In this article, we’ll show a production-ready pattern for building multi-purpose AI systems using an orchestrator architecture that coordinates domain-specific agents.

The idea is simple: Don’t build one agent to rule them all instead, create specialized agents that excel in their domains and coordinate them through an intelligent orchestrator. The solution is an orchestrator agent that routes requests to specialized sub-agents, each with focused expertise and dedicated tools. Think of it as a smart router that understands intent and delegates accordingly.

That’s the core of the Orchestrator Pattern for multi-agent systems:

User Query → Orchestrator Agent → Specialized Agent(s) → Orchestrator → Response

For our example we have three specialized agents:

  1. Weather Agent: Expert in meteorological data and weather patterns. It uses external weather APIs to fetch historical and current weather data.
  2. Logistics Agent: Specialist in supply chain and shipping operations. Fake logistics data is generated to simulate shipment tracking, route optimization, and delivery performance analysis.
  3. Production Agent: Focused on manufacturing operations and production metrics. Also, fake production data is generated to analyze production KPIs.

That’s the architecture in a nutshell:

┌─────────────────────────────────────────────┐
│          Orchestrator Agent                 │
│  (Routes & Synthesizes)                 │
└────────┬─────────┬─────────┬────────────────┘
         │         │         │
    ┌────▼────┐ ┌──▼─────┐ ┌─▼─────────┐
    │ Weather │ │Logistic│ │Production │
    │  Agent  │ │ Agent  │ │  Agent    │
    └────┬────┘ └──┬─────┘ └┬──────────┘
         │         │        │
    ┌────▼────┐ ┌──▼─────┐ ┌▼──────────┐
    │External │ │Database│ │ Database  │
    │   API   │ │ Tools  │ │  Tools    │
    └─────────┘ └────────┘ └───────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for agent reasoning
  • Strands Agents framework for agent orchestration
  • Chainlit for the conversational UI
  • FastAPI for the async backend
  • PostgreSQL for storing conversation history and domain data

The orchestrator’s job is simple but critical: understand the user’s intent and route to the right specialist(s).

MAIN_SYSTEM_PROMPT = """You are an intelligent orchestrator agent 
responsible for routing user requests to specialized sub-agents 
based on their domain expertise.

## Available Specialized Agents

### 1. Production Agent
**Domain**: Manufacturing operations, production metrics, quality control
**Handles**: Production KPIs, machine performance, downtime analysis

### 2. Logistics Agent
**Domain**: Supply chain, shipping, transportation operations
**Handles**: Shipment tracking, route optimization, delivery performance

### 3. Weather Agent
**Domain**: Meteorological data and weather patterns
**Handles**: Historical weather, atmospheric conditions, climate trends

## Your Decision Process
1. Analyze the request for key terms and domains
2. Determine scope (single vs multi-domain)
3. Route to appropriate agent(s)
4. Synthesize results when multiple agents are involved
"""

The orchestrator receives specialized agents as tools:

def get_orchestrator_tools() -> List[Any]:
    from tools.logistics.agent import logistics_assistant
    from tools.production.agent import production_assistant
    from tools.weather.agent import weather_assistant

    tools = [
        calculator,
        think,
        current_time,
        AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter,
        logistics_assistant,  # Specialized agent as tool
        production_assistant,  # Specialized agent as tool
        weather_assistant     # Specialized agent as tool
    ]
    return tools

Each specialized agent follows a consistent pattern. Here’s the weather agent:

@tool
@stream_to_step("weather_assistant")
async def weather_assistant(query: str):
    """
    A research assistant specialized in weather topics with streaming support.
    """
    try:
        tools = [
            calculator,
            think,
            current_time,
            AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter
        ]
        # Domain-specific tools
        tools += WeatherTools(latitude=MY_LATITUDE, longitude=MY_LONGITUDE).get_tools()

        research_agent = get_agent(
            system_prompt=WEATHER_ASSISTANT_PROMPT,
            tools=tools
        )

        async for token in research_agent.stream_async(query):
            yield token

    except Exception as e:
        yield f"Error in research assistant: {str(e)}"

Each agent has access to domain-specific tools. For example, the weather agent uses external APIs:

class WeatherTools:
    def __init__(self, latitude: float, longitude: float):
        self.latitude = latitude
        self.longitude = longitude

    def get_tools(self) -> List[tool]:
        @tool
        def get_hourly_weather_data(from_date: date, to_date: date) -> MeteoData:
            """Get hourly weather data for a specific date range."""
            url = (f"https://api.open-meteo.com/v1/forecast?"
                   f"latitude={self.latitude}&longitude={self.longitude}&"
                   f"hourly=temperature_2m,relative_humidity_2m...")
            response = requests.get(url)
            return parse_weather_response(response.json())
        
        return [get_hourly_weather_data]

The logistics and production agents use synthetic data generators for demonstration:

class LogisticsTools:
    def get_tools(self) -> List[tool]:
        @tool
        def get_logistics_data(
            from_date: date,
            to_date: date,
            origins: Optional[List[str]] = None,
            destinations: Optional[List[str]] = None,
        ) -> LogisticsDataset:
            """Generate synthetic logistics shipment data."""
            # Generate realistic shipment data with delays, costs, routes
            records = generate_synthetic_shipments(...)
            return LogisticsDataset(records=records, aggregates=...)
        
        return [get_logistics_data]

For UI we’re going to use Chainlit. The Chainlit integration provides real-time visibility into agent execution:

class LoggingHooks(HookProvider):
    async def before_tool(self, event: BeforeToolCallEvent) -> None:
        step = cl.Step(name=f"{event.tool_use['name']}", type="tool")
        await step.send()
        cl.user_session.set(f"step_{event.tool_use['name']}", step)

    async def after_tool(self, event: AfterToolCallEvent) -> None:
        step = cl.user_session.get(f"step_{event.tool_use['name']}")
        if step:
            await step.update()

@cl.on_message
async def handle_message(message: cl.Message):
    agent = cl.user_session.get("agent")
    message_history = cl.user_session.get("message_history")
    message_history.append({"role": "user", "content": message.content})
    
    response = await agent.run_async(message.content)
    await cl.Message(content=response).send()

This creates a transparent experience where users see:

  • Which agent is handling their request
  • What tools are being invoked
  • Real-time streaming of responses

Now we can handle a variety of user queries: For example:

User: “What was the average temperature last week?”

Flow:

  1. Orchestrator identifies weather domain
  2. Routes to weather_assistant
  3. Weather agent calls get_hourly_weather_data
  4. Analyzes and returns formatted response

Or multi-domain queries:

User: “Did weather conditions affect our shipment delays yesterday?”

Flow:

  1. Orchestrator identifies weather + logistics domains
  2. Routes to weather_assistant for climate data
  3. Routes to logistics_assistant for shipment data
  4. Synthesizes correlation analysis
  5. Returns unified insight

And complex analytics:

User: “Analyze production efficiency trends and correlate with weather and logistics performance based in yesterday’s data.”

Flow:

  1. Orchestrator coordinates all three agents
  2. Production agent retrieves manufacturing KPIs
  3. Weather agent provides environmental data
  4. Logistics agent supplies delivery metrics
  5. Orchestrator synthesizes multi-domain analysis

This architecture scales naturally in multiple dimensions. We can easily add new specialized agents without disrupting existing functionality. WE only need to create the new agent and register it as a tool with the orchestratortrator prompt with new domain description. That’s it.

The orchestrator pattern transforms multi-domain AI from a monolithic challenge into a composable architecture. Each agent focuses on what it does best, while the orchestrator provides intelligent coordination.

Full code in my github.

Building ReAct AI agents with sandboxed Python code execution using AWS Bedrock and LangGraph

In industrial environments, data analysis is crucial for optimizing processes, detecting anomalies, and making informed decisions. Manufacturing plants, energy systems, and industrial IoT generate massive amounts of data from sensors, machines, and control systems. Traditionally, analyzing this data requires specialized knowledge in both industrial processes and data science, creating a bottleneck for quick insights.

I’ve been exploring agentic AI frameworks lately, particularly for complex data analysis tasks. While working on industrial data problems, I realized that combining the reasoning capabilities of Large Language Models with specialized tools could create a powerful solution for industrial data analysis. This project demonstrates how to build a ReAct ( Reasoning and Acting) AI agent using LangGraph that can analyze manufacturing data, understand industrial processes, and provide actionable insights.

The goal of this project is to create an AI agent that can analyze industrial datasets (manufacturing metrics, sensor readings, process control data) and provide expert-level insights about production optimization, quality control, and process efficiency. Using LangGraph’s ReAct agent framework with AWS Bedrock, the system can execute Python code dynamically in a sandboxed environment, process large datasets, and reason about industrial contexts.

The dataset is a fake sample of industrial data with manufacturing metrics like temperature, speed, humidity, pressure, operator experience, scrap rates, and unplanned stops. In fact, I’ve generated the dataset using chatgpt

This project uses several key components:

  • LangGraph ReAct Agent: For building the multi-tool AI agent with ReAct (Reasoning and Acting) patterns that can dynamically choose tools and reason about results
  • AWS Bedrock: Claude Sonnet 4 as the underlying LLM for reasoning and code generation
  • Sandboxed Code Interpreter: Secure execution of Python code for data analysis using AWS Agent Core. One tool taken from strands-agents-tools library.
  • Industrial Domain Expertise: Specialized system prompts with knowledge of manufacturing processes, quality control, and industrial IoT

The agent has access to powerful tools:

  • Code Interpreter: Executes Python code safely in a sandboxed AWS environment using pandas, numpy, scipy, and other scientific libraries
  • Data Processing: Handles large industrial datasets with memory-efficient strategies
  • Industrial Context: Understands manufacturing processes, sensor data, and quality metrics

The system uses AWS Agent Core’s sandboxed code interpreter, which means:

  • Python code is executed in an isolated environment
  • No risk to the host system
  • Access to scientific computing libraries (pandas, numpy, scipy)
  • Memory management for large datasets

The core of the system is surprisingly simple. The ReAct agent is built using LangGraph’s create_react_agent with custom tools:

from langgraph.prebuilt import create_react_agent
from typing import List
import pandas as pd
from langchain_core.callbacks import BaseCallbackHandler


def analyze_df(df: pd.DataFrame, system_prompt: str, user_prompt: str,
               callbacks: List[BaseCallbackHandler], streaming: bool = False):
    code_interpreter_tools = CodeInterpreter()
    tools = code_interpreter_tools.get_tools()

    agent = create_react_agent(
        model=get_llm(model=DEFAULT_MODEL, streaming=streaming,
                      budget_tokens=12288, callbacks=callbacks),
        tools=tools,
        prompt=system_prompt
    )

    agent_prompt = f"""
    I have a DataFrame with the following data:
    - Columns: {list(df.columns)}
    - Shape: {df.shape}
    - data: {df}
    
    The output must be an executive summary with the key points.
    The response must be only markdown, not plots.
    """
    messages = [
        ("user", agent_prompt),
        ("user", user_prompt)
    ]
    agent_input = {"messages": messages}
    return agent. Invoke(agent_input)

The ReAct pattern (Reasoning and Acting) allows the agent to:

  1. Reason about what analysis is needed
  2. Act by calling the appropriate tools (in this case: code interpreter)
  3. Observe the results of code execution
  4. Re-reason and potentially call more tools if needed

This creates a dynamic loop where the agent can iteratively analyze data, examine results, and refine its approach – much more powerful than a single code execution.

The magic happens in the system prompt, which provides the agent with industrial domain expertise:

SYSTEM_PROMPT = """
# Industrial Data Analysis Agent - System Prompt

You are an expert AI agent specialized in industrial data analysis and programming. 
You excel at solving complex data problems in manufacturing, process control, 
energy systems, and industrial IoT environments.

## Core Capabilities
- Execute Python code using pandas, numpy, scipy
- Handle large datasets with chunking strategies  
- Process time-series data, sensor readings, production metrics
- Perform statistical analysis, anomaly detection, predictive modeling

## Industrial Domain Expertise
- Manufacturing processes and production optimization
- Process control systems (PID controllers, SCADA, DCS)
- Industrial IoT sensor data and telemetry
- Quality control and Six Sigma methodologies
- Energy consumption analysis and optimization
- Predictive maintenance and failure analysis
"""

The code interpreter tool is wrapped with safety validations:

def validate_code_ast(code: str) -> bool:
    """Validate Python code using AST to ensure safety."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False


@tool
def code_interpreter(code: str) -> str:
    """Executes Python code in a sandboxed environment."""
    if not validate_code_ast(code):
        raise UnsafeCodeError("Unsafe code or syntax errors.")

    return code_tool(code_interpreter_input={
        "action": {
            "type": "executeCode",
            "session_name": session_name,
            "code": code,
            "language": "python"
        }
    })
The system uses Claude Sonnet 4 through AWS Bedrock with optimized parameters for industrial analysis:
def get_llm(model: str = DEFAULT_MODEL, max_tokens: int = 4096,
            temperature: float = TemperatureLevel.BALANCED,
            top_k: int = TopKLevel.DIVERSE,
            top_p: float = TopPLevel.CREATIVE) -> BaseChatModel:
    model_kwargs = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p
    }

    return ChatBedrock(
        model=model,
        client=aws_get_service('bedrock-runtime'),
        model_kwargs=model_kwargs
    )
The project includes fake sample industrial data with manufacturing metrics:

- `machine_id`: Equipment identifier
- `shift`: Production shift (A/M/N for morning/afternoon/night)
- `temperature`, `speed`, `humidity`, `pressure`: Process parameters
- `operator_experience`: Years of operator experience
- `scrap_kg`: Quality metric (waste produced)
- `unplanned_stop`: Equipment failure indicator

A typical analysis query might be: "Do temperature and speed setpoints vary across shifts?"
The agent will stream the response as it generates it.

The agent will:

1. Load and examine the dataset structure
2. Generate appropriate Python code for analysis
3. Execute the code in a sandboxed environment
4. Provide insights about shift-based variations
5. Suggest process optimization recommendations
import logging

import pandas as pd
from langchain_core.callbacks import StreamingStdOutCallbackHandler

from modules.df_analyzer import analyze_df
from prompts import SYSTEM_PROMPT

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


class StreamingCallbackHandler(StreamingStdOutCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end='', flush=True)


df = pd.read_csv('fake_data.csv')

user_prompt = "Do temperature and speed setpoints vary across shifts?"
for chunk in analyze_df(
        user_prompt=user_prompt,
        df=df,
        system_prompt=SYSTEM_PROMPT,
        callbacks=[StreamingCallbackHandler()],
        streaming=True):
    logger.debug(chunk)

This project demonstrates the power of agentic AI for specialized domains. Instead of building custom analytics dashboards or writing specific analysis scripts, we provide the agent with:

  1. Domain Knowledge: Through specialized system prompts
  2. Tools: Safe code execution capabilities
  3. Context: The actual data to analyze

The agent can then:

  • Generate appropriate analysis code
  • Execute it safely
  • Interpret results with industrial context
  • Provide actionable recommendations

The result is a flexible system that can handle various industrial analysis tasks without pre-programmed solutions. The agent reasons about the problem, writes the necessary code (sandboxed), and provides expert-level insights.

Full code in my github.

Building an Agentic AI with Python, LangChain, AWS Bedrock and Claude 4 Sonnet

Today we are going to build an agent with IA. It is just an example of how to build a agent with LangChain and AWS Bedrock and Claude 4 Sonnet. The agent will be a “mathematical expert” capable of performing complex calculations and providing detailed explanations of its reasoning process. The idea is to provide the agent with the ability to perform mathematical operations like addition, subtraction. In fact, with additions and subtractions, we can perform all the mathematical operations, like multiplication, division, exponentiation, square root, etc. The agent will be able to perform these operations step by step, providing a detailed explanation of its reasoning process. I know that we don’t need to use AI to perform these operations, but the idea is to show how to build an agent with LangChain and AWS Bedrock and Claude 4 Sonnet.

The mathematical agent implements the tool-calling pattern, allowing the LLM to dynamically select and execute mathematical operations:

import logging

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate

from core.llm.aws import get_llm, Models
from modules.prompts import AGENT_SYSTEM_PROMPT
from modules.tools import MathTools
from settings import MAX_TOKENS

logger = logging.getLogger(__name__)


def run(question: str, model: Models = Models.CLAUDE_4):
    prompt = ChatPromptTemplate.from_messages([
        ("system", AGENT_SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}")
    ])
    math_tools = MathTools()
    tools = math_tools.get_tools()

    llm = get_llm(model=model, max_tokens=MAX_TOKENS)
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=10
    )

    response = agent_executor.invoke({
        "input": question
    })

    logger.info(f"Agent response: {response['output']}")

Tools are defined using LangChain’s @tool decorator, providing automatic schema generation and type validation. Really we don’t need to create a class for the tools, but I have done it because I want to add an extra feature to the agent: the ability to keep a history of the operations performed. This will allow the agent to provide a detailed explanation of its reasoning process, showing the steps taken to arrive at the final result.

import logging
from typing import List

from langchain.tools import tool

logger = logging.getLogger(__name__)


class MathTools:

    def __init__(self):
        self.history = []

    def _diff_values(self, a: int, b: int) -> int:
        result = a - b
        self.history.append(f"{a} - {b} = {result}")
        return result

    def _sum_values(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result

    def _get_history(self) -> str:
        if not self.history:
            return "No previous operations"
        return "\n".join(self.history[-5:])  # Last 5

    def get_tools(self) -> List:
        @tool
        def diff_values(a: int, b: int) -> int:
            """Calculates the difference between two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: difference of a - b
            """
            logger.info(f"Calculating difference: {a} - {b}")
            return self._diff_values(a, b)

        @tool
        def sum_values(a: int, b: int) -> int:
            """Sums two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: sum of a + b
            """
            logger.info(f"Calculating sum: {a} + {b}")
            return self._sum_values(a, b)

        @tool
        def get_history() -> str:
            """Gets the operation history
            Returns:
                str: last operations
            """
            logger.info("Retrieving operation history")
            return self._get_history()

        return [diff_values, sum_values, get_history]

The system prompt is carefully crafted to guide the agent’s behavior and establish clear operational boundaries:

AGENT_SYSTEM_PROMPT = """
You are an expert mathematical agent specialized in calculations.

You have access to the following tools:
- diff_values: Calculates the difference between two numbers
- sum_values: Sums two numbers
- get_history: Gets the operation history

Guidelines:
1. Only answer questions related to mathematical operations.
2. For complex operations, use step-by-step calculations:
   - Multiplication: Repeated addition
   - Division: Repeated subtraction
   - Exponentiation: Repeated multiplication
   - Square root: Use methods like Babylonian method or prime factorization.
"""

Now we can invoke our agent by asking questions such as ‘What’s the square root of 16 divided by two, squared?’. The agent will iterate using only the provided tools to obtain the result.

And that’s all. This project demonstrates how to build a production-ready AI agent using LangChain and AWS Bedrock. It’s just a boilerplate, but it can be extended to create more complex agents with additional capabilities and understand how AI agents work.

Full code in my GitHub account.

Python Flask and OAuth2: Building a Secure Authentication System

I typically use Flask for APIs and Django for web applications that utilize sessions and OAuth authentication. However, do I truly need Django for these functionalities? The answer is no. While Django provides pre-built components, similar capabilities are also accessible in Flask, and implementing them is quite straightforward. Additionally, I am a strong advocate of microframeworks. Today, we’ll demonstrate how to employ OAuth2 authentication using Flask. Let’s begin.

OAuth2 encompasses various flows, but today, we’ll focus on the most common one for web applications. The concept involves checking for a valid session. If one exists, great, but if not, the application will generate a session with a state (a randomly generated string) and then redirect to the OAuth2 server login page. Subsequently, the user will perform the login on the login server. Following that, the OAuth2 server will redirect to a validated callback URL with an authorization code (while also returning the provided state). The callback URL will then verify whether the state provided by the OAuth2 server matches the one in the session. Next, the callback route on your server, utilizing the authorization code, will obtain an access token (via a POST request to the OAuth2 server). With this access token, you can retrieve user information from the OAuth2 server and establish a valid session.

First we create a Flask application with sessions

from flask import Flask
from flask_session import Session

from settings import SECRET, SESSION


app = Flask(__name__)
app.secret_key = SECRET
app.config.update(SESSION)
Session(app)

Session configuration:

SESSION = dict(
    SESSION_PERMANENT=False,
    SESSION_TYPE="filesystem",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    SESSION_COOKIE_DOMAIN=False,
)

I like to use blueprints to manage the Flask, so let’s add our application:

from modules.home.app import blueprint as home

...
app.register_blueprint(home, url_prefix=f'/')

I set up the blueprint in a init.py file

from pathlib import Path

from flask import Blueprint

from lib.oauth import check_session

base = Path(__file__).resolve().parent
blueprint = Blueprint(
    'front_home', __name__,
    template_folder=base.joinpath('templates')
)


@blueprint.before_request
def auth():
    return check_session()

You can see that we’re using a before_request middleware to check the session in every route of the blueprint.

def check_session():
    if not session.get("user"):
        state = secrets.token_urlsafe(32)
        session['state'] = state
        authorize = OAUTH['AUTHORIZE_URL']
        query_string = urlencode({
            'scope': OAUTH.get('SCOPE', 'read write'),
            'prompt': OAUTH.get('PROMPT', 'login'),
            'approval_prompt': OAUTH.get('APPROVAL_PROMPT', 'auto'),
            'state': state,
            'response_type': OAUTH.get('RESPONSE_TYPE', 'code'),
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID']
        })
        return redirect(f"{authorize}?{query_string}")

And the routes of the blueprint:

from flask import render_template, session

from modules.home import blueprint


@blueprint.get(f"/")
def home():
    username = session['user']['username']
    return render_template('index.html',
                           username=username)

To do the login we need also to code our callback route. We will add a blueprint for that.

from lib.oauth import blueprint as oauth

...
app.register_blueprint(oauth)

That’s the OAuth2 callback:

import logging

import requests
from flask import Blueprint, abort
from flask import request, session, redirect

from settings import OAUTH

logger = logging.getLogger(__name__)

blueprint = Blueprint('oauth', __name__, url_prefix=f'/oauth')


@blueprint.get('/callback')
def callback():
    # Obtain the state from the request
    state = request.args.get('state')
    if 'state' not in session:
        return redirect(f"/")
    # Check if provided state match wht the session saved one
    if state == session['state']:
        # Obtain the authorization code from the request
        authorization_code = request.args.get('code')
        token_data = {
            'grant_type': OAUTH.get('GRANT_TYPE', 'authorization_code'),
            'code': authorization_code,
            'redirect_uri': OAUTH['REDIRECT_URL'],
            'client_id': OAUTH['CLIENT_ID'],
            'client_secret': OAUTH['CLIENT_SECRET']
        }
        # POST to OAuth2 server to obtain the access_token
        response = requests.post(OAUTH['TOKEN_URL'],
                                 data=token_data,
                                 headers={'Accept': 'application/json'})
        response_data = response.json()
        headers = {
            "Authorization": f"Bearer {response_data.get('access_token')}",
            'Accept': 'application/json'
        }
        # With the access_token you can obtain the user information
        user_response = requests.get(OAUTH['USER_URL'],
                                     data=token_data,
                                     headers=headers)
        if user_response.ok:
            # Now you are able to create the session 
            user_data = user_response.json()
            session['user'] = dict(
                username=user_data['login'],
                name=user_data['name'],
                email=user_data['email']
            )
            session.pop('state', default=None)
        else:
            abort(401)
        return redirect(f"/")
    else:
        abort(401)

Mainly that’s all. In this example we’re using Github’s OAuth2 server. You can use different ones, and also with your own OAuth2 server. Maybe, depending on the server, they way to obtain the user_data, can be different, and you should adapt it to your needs.

In my example I’m saving my OAuth2 credentials in a .env file. With this technique I can use different configurations depending on my environment (production, staging, …)

CLIENT_ID=my_client_id
CLIENT_SECRET=my_client_secret
TOKEN_URL=https://github.com/login/oauth/access_token
AUTHORIZE_URL=https://github.com/login/oauth/authorize
USER_URL=https://api.github.com/user
REDIRECT_URL=http://localhost:5000/oauth/callback

And I load this conf in my settings.py

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

OAUTH = dict(
    CLIENT_ID=os.getenv('CLIENT_ID'),
    CLIENT_SECRET=os.getenv('CLIENT_SECRET'),
    TOKEN_URL=os.getenv('TOKEN_URL'),
    AUTHORIZE_URL=os.getenv('AUTHORIZE_URL'),
    USER_URL=os.getenv('USER_URL'),
    REDIRECT_URL=os.getenv('REDIRECT_URL'),
)

And that’s all. Full code in my github account.

Bundles in Silex using Stack

In the last Desymfony conference I was speaking with Luis Cordova and he introduced me “Stack” (I must admit Stack was in my to-study-list but only marked as favorite). The idea behind Stack is really cool. (In fact every project where Igor Wiedler appears is brilliant, even the chicken one :)).

Nowadays almost every modern framework/applications implements HttpKernelInterface (Symfony, Laravel, Drupal, Silex, Yolo and even the framework that I’m working in ;)) and we can build complex applications mixing different components and decorate our applications with an elegant syntax.

The first thing than come to my mind after studying Stack is to join different Silex applications in a similar way than Symfony (the full stack framework) uses bundles. And the best part of this idea is that it’s pretty straightforward. Let me show you one example:

Imagine that we’re working with one application with a blog and one API. In this case our blog and our API are Silex applications (but they can be one Symfony application and one Silex application for example).

That’s our API application:

use Silex\Application;

$app = new Application();
$app->get('/', function () {
        return "Hello from API";
    });

$app->run();

And here our blog application:

use Silex\Application;

$app = new Application();
$app->get('/', function () {
        return "Hello from Blog";
    });

$app->run();

We can organize our application using mounted controllers or even using RouteCollections but today we’re going to use Stack and it’s cool url-map.

First we are going to create our base application. To do this we’re going to implement the simplest Kernel in the world, that’s answers with “Hello” to every request:

use Symfony\Component\HttpKernel\HttpKernelInterface;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

class MyKernel implements HttpKernelInterface
{
    public function handle(Request $request, $type = HttpKernelInterface::MASTER_REQUEST, $catch = true)
    {
        return new Response("Hello");
    }
}

Stack needs HttpKernelInterface and Silex\Application implements this interface, so we can change our Silex applications to return the instance instead to run the application:

// app/api.php
use Silex\Application;

$app = new Application();
$app->get('/', function () {
        return "Hello from API";
    });

return $app;
// app/blog.php
use Silex\Application;

$app = new Application();
$app->get('/', function () {
        return "Hello from API";
    });

return $app;

And now we will attach those two Silex applications to our Kernel:

use Symfony\Component\HttpFoundation\Request;

$app = (new Stack\Builder())
    ->push('Stack\UrlMap', [
            "/blog" => include __DIR__ . '/app/blog.php',
            "/api" => include __DIR__ . '/app/api.php'
        ])->resolve(new MyKernel());

$request = Request::createFromGlobals();

$response = $app->handle($request);
$response->send();

$app->terminate($request, $response);

And that’s all. I don’t know what you think but with Stack one big window just opened in my mind. Cool, isn’t it?

You can see this working example in my github

Working with jQuery and Silex as RestFull Resource provider

The previous post was about how to use AngularJS resources with Silex. AngularJS is great and when I need to switch back to jQuery it looks like I go back 10 years in web development, but business is business and I need to live with jQuery too. Because of that this post is about how to use the Silex RestFull resources from the previous post, now with jQuery. Let’s start:

We’re going to write a simple javascript object to handle the RestFull resource using jQuery:

var Resource = (function (jQuery) {
    function Resource(url) {
        this.url = url;
    }

    Resource.prototype.query = function (parameters) {
        return jQuery.getJSON(this.url, parameters || {});
    };

    Resource.prototype.get = function (id, parameters) {
        return jQuery.getJSON(this.url + '/' + id, parameters || {});
    };

    Resource.prototype.remove = function (id, parameters) {
        return jQuery.ajax({
            url:this.url + '/' + id,
            xhrFields:JSON.stringify(parameters || {}),
            type:'DELETE',
            dataType:'json'
        });
    };

    Resource.prototype.update = function (id, parameters) {
        return jQuery.post(this.url + '/' + id, JSON.stringify(parameters || {}), 'json');
    };

    Resource.prototype.create = function (parameters) {
        return jQuery.post(this.url, JSON.stringify(parameters || {}), 'json');
    };

    return Resource;
})(jQuery);

As you can see the library returns jQuery ajax promises, so we can use done() and error() callbacks to work with the server’s data.

Here our application example:

var host = document.location.protocol + '//' + document.location.host;
var resource = new Resource(host + '/api/message/resource');

resource.create({ id: 10, author: 'myname', message: 'hola'}).done(function (data) {
    console.log("create element", data);
});

resource.query().done(function (data) {
   console.log("query all", data);
});

resource.update(10, {message: 'hi'}).done(function (data) {
    console.log("update element 1", data);
});

resource.get(10).done(function (data) {
    console.log("get element 1", data);
});

resource.remove(10).done(function (data) {
    console.log("remove element 1", data);
});

resource.query().done(function (data) {
    console.log("query all", data);
});

And that’s all. You can get the full code of the example from my github account

How to configure Symfony’s Service Container to use Twitter API

Keeping on with the series about Symfony’s Services container (another posts here and here), now we will use the service container to use Twitter API from a service.

To use Twitter API we need to handle http requests. I’ve written several post about http request with PHP (example1, example2), but today we will use one amazing library to build clients: Guzzle. Guzzle is amazing. We can easily build a Twitter client with it. There’s one example is its landing page:

<?php
$client = new Client('https://api.twitter.com/{version}', array('version' => '1.1'));
$oauth  = new OauthPlugin(array(
    'consumer_key'    => '***',
    'consumer_secret' => '***',
    'token'           => '***',
    'token_secret'    => '***'
));
$client->addSubscriber($oauth);

echo $client->get('/statuses/user_timeline.json')->send()->getBody();

If we are working within a Symfony2 application or a PHP application that uses the Symfony’s Dependency injection container component you can easily integrate this simple script in the service container. I will show you the way that I use to do it. Let’s start:

The idea is simple. First we include guzzle within our composer.json and execute composer update:

    "require": {
        "guzzle/guzzle":"dev-master"
    }

Then we will create two files, one to store our Twitter credentials and another one to configure the service container:

# twitter.conf.yml
parameters:
  twitter.baseurl: https://api.twitter.com/1.1

  twitter.config:
    consumer_key: ***
    consumer_secret: ***
    token: ***
    token_secret: ***
# twitter.yml
parameters:
  class.guzzle.response: Guzzle\Http\Message\Response
  class.guzzle.client: Guzzle\Http\Client
  class.guzzle.oauthplugin: Guzzle\Plugin\Oauth\OauthPlugin

services:
  guzzle.twitter.client:
    class: %class.guzzle.client%
    arguments: [%twitter.baseurl%]
    calls:
      - [addSubscriber, [@guzzle.twitter.oauthplugin]]

  guzzle.twitter.oauthplugin:
    class: %class.guzzle.oauthplugin%
    arguments: [%twitter.config%]

And finally we include those files in our services.yml:

# services.yml
imports:
- { resource: twitter.conf.yml }
- { resource: twitter.yml }

And that’s all. Now we can use the service without problems:

<?php

namespace Gonzalo123\AppBundle\Controller;

use Symfony\Bundle\FrameworkBundle\Controller\Controller;

class DefaultController extends Controller
{
    public function indexAction($name)
    {
        $twitterClient = $this->container->get('guzzle.twitter.client');
        $status = $twitterClient->get('statuses/user_timeline.json')
             ->send()->getBody();

        return $this->render('AppBundle:Default:index.html.twig', array(
            'status' => $status
        ));
    }
}

Building a Silex application from one Behat/Gherkin feature file

Last days I’ve playing with Behat. Behat is a behavior driven development (BDD) framework based on Ruby’s Cucumber. Basically with Behat we defenie features within one feature file. I’m not going to crate a Behat tutorial (you can read more about Behat here). Behat use Gherkin to write the features files. When I was playing with Behat I had one idea. The idea is simple: Can we use Gherking to build a Silex application?. It was a good excuse to study Gherking, indeed ;).

Here comes the feature file:

Feature: application API
  Scenario: List users
    Given url "/api/users/list.json"
    And request method is "GET"
    Then instance "\Api\Users"
    And execute function "listUsers"
    And format output into json

  Scenario: Get user info
    Given url "/api/user/{userName}.json"
    And request method is "GET"
    Then instance "\Api\User"
    And execute function "info"
    And format output into json

  Scenario: Update user information
    Given url "/api/user/{userName}.json"
    And request method is "POST"
    Then instance "\Api\User"
    And execute function "update"
    And format output into json

Our API use this simple library:

<?php

namespace Api;
use Symfony\Component\HttpFoundation\Request;

class User
{
    private $request;

    public function __construct(Request $request)
    {
        $this->request = $request;
    }

    public function info()
    {
        switch ($this->request->get('userName')) {
            case 'gonzalo':
                return array('name' => 'Gonzalo', 'surname' => 'Ayuso');
            case 'peter':
                return array('name' => 'Peter', 'surname' => 'Parker');
        }
    }

    public function update()
    {
        return array('infoUpdated');
    }
}
<?php

namespace Api;
use Symfony\Component\HttpFoundation\Request;

class Users
{
    public function listUsers()
    {
        return array('gonzalo', 'peter');
    }
}

The idea is simple. Parse the feature file with behat/gherkin component and create a silex application. And here comes the “magic”. This is a simple working prototype, just an experiment for a rainy sunday.

<?php
include __DIR__ . '/../vendor/autoload.php';
define(FEATURE_PATH, __DIR__ . '/api.feature');

use Behat\Gherkin\Lexer,
        Behat\Gherkin\Parser,
        Behat\Gherkin\Keywords\ArrayKeywords,
        Behat\Gherkin\Node\FeatureNode,
        Behat\Gherkin\Node\ScenarioNode,
        Symfony\Component\HttpFoundation\Request,
        Silex\Application;

$keywords = new ArrayKeywords([
    'en' => [
        'feature' => 'Feature',
        'background' => 'Background',
        'scenario' => 'Scenario',
        'scenario_outline' => 'Scenario Outline',
        'examples' => 'Examples',
        'given' => 'Given',
        'when' => 'When',
        'then' => 'Then',
        'and' => 'And',
        'but' => 'But'
    ],
]);

function getMatch($subject, $pattern) {
    preg_match($pattern, $subject, $matches);
    return isset($matches[1]) ? $matches[1] : NULL;
}

$app = new Application();

function getScenarioConf($scenario) {
    $silexConfItem = [];

    /** @var $scenario  ScenarioNode */
    foreach ($scenario->getSteps() as $step) {
        $route = getMatch($step->getText(), '/^url "([^"]*)"$/');

        if (!is_null($route)) {
            $silexConfItem['route'] = $route;
        }

        $requestMethod = getMatch($step->getText(), '/^request method is "([^"]*)"$/');
        if (!is_null($requestMethod)) {
            $silexConfItem['requestMethod'] = strtoupper($requestMethod);
        }

        $instance = getMatch($step->getText(), '/^instance "([^"]*)"$/');
        if (!is_null($instance)) {
            $silexConfItem['className'] = $instance;
        }

        $method = getMatch($step->getText(), '/^execute function "([^"]*)"$/');
        if (!is_null($method)) {
            $silexConfItem['method'] = $method;
        }

        if ($step->getText() == 'format output into json') {
            $silexConfItem['jsonEncode'] = TRUE;
        }
    }
    return $silexConfItem;
}

/** @var $features FeatureNode */
$features = (new Parser(new Lexer($keywords)))->parse(file_get_contents(FEATURE_PATH), FEATURE_PATH);

foreach ($features->getScenarios() as $scenario) {
    $silexConfItem = getScenarioConf($scenario);
    $app->match($silexConfItem['route'], function (Request $request) use ($app, $silexConfItem) {
            function getConstructorParams($rClass, $request) {
                $parameters =[];
                foreach ($rClass->getMethod('__construct')->getParameters() as $parameter) {
                    if ('Symfony\Component\HttpFoundation\Request' == $parameter->getClass()->name) {
                        $parameters[$parameter->getName()] = $request;
                    }
                }
                return $parameters;
            }

            $rClass = new ReflectionClass($silexConfItem['className']);
            $obj = ($rClass->hasMethod('__construct')) ?
                    $rClass->newInstanceArgs(getConstructorParams($rClass, $request)) :
                    new $silexConfItem['className'];

            $output = $obj->{$silexConfItem['method']}();

            return ($silexConfItem['jsonEncode'] === TRUE) ? $app->json($output, 200) : $output;
        }
    )->method($silexConfItem['requestMethod']);
}

$app->run();

You can see the source code in github. What do you think?

How to rewrite urls with PHP 5.4’s built-in web server

PHP 5.4 comes with a flaming built-in web server. This server is (obviously) not suitable to use in production environments, but it’s great if we want to check one project quickly:

  • git clone from github
  • composer install to install dependencies
  • run the built-in web server and test the application.
php -S localhost:8888 -t www/

But is very usual to use mod_rewrite or similar to send all requests to the front controller. With apache is pretty straight forward to do it:

<IfModule mod_rewrite.c>
    Options -MultiViews
    RewriteEngine On
    RewriteCond %{REQUEST_FILENAME} !-f
    RewriteRule ^(.*)$ index.php [QSA,L]
</IfModule>

But, does it work with the built-in web server? The answer is yes, but with another syntax. We only need to create one router file and start our server with this router:

<?php
// www/routing.php
if (preg_match('/\.(?:png|jpg|jpeg|gif)$/', $_SERVER["REQUEST_URI"])) {
    return false;
} else {
    include __DIR__ . '/index.php';
}

And now we start the server with:

php -S localhost:8888 www/routing.php

Easy, isn’t it?