Chat with your Data: Building a File-Aware AI Agent with AWS Bedrock and Chainlit

We all know LLMs are powerful, but their true potential is unlocked when they can see your data. While RAG (Retrieval-Augmented Generation) is great for massive knowledge bases, sometimes you just want to drag and drop a file and ask questions about it.

Today we’ll build a “File-Aware” AI agent that can natively understand a wide range of document formats—from PDFs and Excel sheets to Word docs and Markdown files. We’ll use AWS Bedrock with Claude 4.5 Sonnet for the reasoning engine and Chainlit for the conversational UI.

The idea is straightforward: Upload a file, inject it into the model’s context, and let the LLM do the rest. No vector databases, no complex indexing pipelines—just direct context injection for immediate analysis.

The architecture is simple yet effective. We intercept file uploads in the UI, process them into a format the LLM understands, and pass them along with the user’s query.

┌──────────────┐      ┌──────────────┐      ┌────────────────────┐
│   Chainlit   │      │  Orchestrator│      │   AWS Bedrock      │
│      UI      │─────►│    Agent     │─────►│(Claude 4.5 Sonnet) │
└──────┬───────┘      └──────────────┘      └────────────────────┘
       │                      ▲
       │    ┌────────────┐    │
       └───►│ File Proc. │────┘
            │   Logic    │
            └────────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for high-quality reasoning and large context windows.
  • Chainlit for a chat-like interface with native file upload support.
  • Python for the backend logic.

The core challenge is handling different file types and presenting them to the LLM. We support a variety of formats by mapping them to Bedrock’s expected input types.
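
MIME_MAP and sanitize_filename are used by the code below but not shown. Here is a minimal sketch of both. It assumes the Bedrock Converse document block, whose format field accepts values such as pdf, csv, doc, docx, xls, xlsx, html, txt and md, and whose name field only allows alphanumeric characters, single whitespace characters, hyphens, parentheses and square brackets. The mapping and the helper body are hypothetical reconstructions:

```python
import re
from pathlib import Path

# Hypothetical mapping from the MIME types accepted in config.toml
# to the "format" values of a Bedrock Converse document block
MIME_MAP = {
    "application/pdf": "pdf",
    "text/csv": "csv",
    "application/msword": "doc",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/vnd.ms-excel": "xls",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
    "text/html": "html",
    "text/plain": "txt",
    "text/markdown": "md",
    "text/x-markdown": "md",
}


def sanitize_filename(filename: str) -> str:
    """Hypothetical helper: keep only characters allowed in a document name."""
    stem = Path(filename).stem  # drop the extension; the format field carries it
    # Replace anything that isn't alphanumeric, whitespace, hyphen, parentheses or brackets
    cleaned = re.sub(r"[^A-Za-z0-9\s\-\(\)\[\]]", " ", stem)
    # Collapse whitespace runs into a single space
    cleaned = re.sub(r"\s+", " ", cleaned).strip()
    return cleaned or "document"
```

For example, an upload called Q3_report.v2.pdf would be sent to the model with the name "Q3 report v2" and the format "pdf".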

To enable file uploads in Chainlit, you need to configure the [features.spontaneous_file_upload] section in your .chainlit/config.toml. This is where you define which MIME types are accepted.

[features.spontaneous_file_upload]
    enabled = true
    accept = [
        "application/pdf",
        "text/csv",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "text/html",
        "text/plain",
        "text/markdown",
        "text/x-markdown"
    ]
    max_files = 20
    max_size_mb = 500
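
Keep in mind that the Bedrock Converse API accepts at most five documents per request, each up to 4.5 MB, so these Chainlit limits are more generous than what a single model call will accept.

The main agent loop handles the conversation: it checks for uploaded files, converts the supported ones into Bedrock document blocks, and builds the message payload for the LLM. We also include error handling for context window limits, which is omitted from the excerpt below.
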
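import asyncio
import logging
from pathlib import Path

import chainlit as cl

logger = logging.getLogger(__name__)

# MIME_MAP, sanitize_filename, process_user_task and DEBUG are defined elsewhere in the project


def get_question_from_message(message: cl.Message):
    # Plain text if there are no usable files, otherwise a list of content blocks
    content_blocks = None
    if message.elements:
        content_blocks = get_content_blocks_from_message(message)

    if content_blocks:
        # The user's text (or a default request) goes after the documents
        content_blocks.append({"text": message.content or "Write a summary of the document"})
        question = content_blocks
    else:
        question = message.content

    return question


def get_content_blocks_from_message(message: cl.Message):
    # Keep only uploaded files whose MIME type maps to a Bedrock document format
    docs = [f for f in message.elements if f.type == "file" and f.mime in MIME_MAP]
    content_blocks = []

    for doc in docs:
        file = Path(doc.path)
        file_bytes = file.read_bytes()
        # Remove only this upload; deleting file.parent could also remove
        # other files stored in the same directory before they are read
        file.unlink(missing_ok=True)

        content_blocks.append({
            "document": {
                "name": sanitize_filename(doc.name),
                "format": MIME_MAP[doc.mime],
                "source": {"bytes": file_bytes}
            }
        })

    return content_blocks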

@cl.on_message
async def handle_message(message: cl.Message):
    task = asyncio.create_task(process_user_task(
        question=get_question_from_message(message),
        debug=DEBUG))
    cl.user_session.set("task", task)
    try:
        await task
    except asyncio.CancelledError:
        logger.info("User task was cancelled.")
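
process_user_task isn't shown here. As a hedged sketch of how the question could reach the model, assuming the Bedrock Converse API through boto3: a plain string becomes a single text block, while the list built by get_content_blocks_from_message is already a list of content blocks. The helper names are hypothetical, and model_id must be a model or inference-profile ID enabled in your account:

```python
from typing import Any, Dict, List, Union

# The question is either plain text or the content blocks built from uploads
Question = Union[str, List[Dict[str, Any]]]


def to_converse_messages(question: Question) -> List[Dict[str, Any]]:
    """Wrap the question in a single user message for the Converse API."""
    content = [{"text": question}] if isinstance(question, str) else question
    return [{"role": "user", "content": content}]


def ask_bedrock(question: Question, model_id: str, region: str) -> str:
    """Hypothetical helper: send one user turn and return the text reply."""
    import boto3  # imported here so to_converse_messages has no AWS dependency

    client = boto3.client("bedrock-runtime", region_name=region)
    response = client.converse(modelId=model_id,
                               messages=to_converse_messages(question))
    # The reply is an assistant message whose first content block holds the text
    return response["output"]["message"]["content"][0]["text"]
```

A real conversation would also prepend the previous user and assistant turns to the messages list.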

This pattern allows for ad-hoc analysis. You don’t need to pre-ingest data. You can:

  1. Analyze Financials: Upload an Excel sheet and ask for trends.
  2. Review Contracts: Upload a PDF and ask for clause summaries.
  3. Debug Code: Upload a source file and ask for a bug fix.

By leveraging the large context window of modern models like Claude 4.5 Sonnet, we can feed entire documents directly into the prompt, providing the model with full visibility without the information loss often associated with RAG chunking.

And that's all. With tools like Chainlit and powerful APIs like AWS Bedrock, we can create robust, file-aware assistants that integrate seamlessly into our daily workflows.

Full code is available on my GitHub account.

Building scalable multi-purpose AI agents: Orchestrating Multi-Agent Systems with Strands Agents and Chainlit

We can build simple AI agents that handle specific tasks quite easily today. But what about building AI systems that can handle multiple domains effectively? One approach is to create a single monolithic agent that tries to do everything, but this quickly runs into problems of context pollution, maintenance complexity, and scaling limitations. In this article, we’ll show a production-ready pattern for building multi-purpose AI systems using an orchestrator architecture that coordinates domain-specific agents.

The idea is simple: don’t build one agent to rule them all. Instead, create specialized agents that excel in their domains and coordinate them through an orchestrator agent that routes each request to the right sub-agent(s), each with focused expertise and dedicated tools. Think of it as a smart router that understands intent and delegates accordingly.

That’s the core of the Orchestrator Pattern for multi-agent systems:

User Query → Orchestrator Agent → Specialized Agent(s) → Orchestrator → Response

For our example we have three specialized agents:

  1. Weather Agent: Expert in meteorological data and weather patterns. It uses external weather APIs to fetch historical and current weather data.
  2. Logistics Agent: Specialist in supply chain and shipping operations. Fake logistics data is generated to simulate shipment tracking, route optimization, and delivery performance analysis.
  3. Production Agent: Focused on manufacturing operations and production metrics. It also uses generated fake production data to analyze production KPIs.

That’s the architecture in a nutshell:

┌─────────────────────────────────────────────┐
│          Orchestrator Agent                 │
│  (Routes & Synthesizes)                     │
└────────┬─────────┬─────────┬────────────────┘
         │         │         │
    ┌────▼────┐ ┌──▼─────┐ ┌─▼─────────┐
    │ Weather │ │Logistic│ │Production │
    │  Agent  │ │ Agent  │ │  Agent    │
    └────┬────┘ └──┬─────┘ └┬──────────┘
         │         │        │
    ┌────▼────┐ ┌──▼─────┐ ┌▼──────────┐
    │External │ │Database│ │ Database  │
    │   API   │ │ Tools  │ │  Tools    │
    └─────────┘ └────────┘ └───────────┘

The tech stack includes:

  • AWS Bedrock with Claude 4.5 Sonnet for agent reasoning
  • Strands Agents framework for agent orchestration
  • Chainlit for the conversational UI
  • FastAPI for the async backend
  • PostgreSQL for storing conversation history and domain data

The orchestrator’s job is simple but critical: understand the user’s intent and route to the right specialist(s).

MAIN_SYSTEM_PROMPT = """You are an intelligent orchestrator agent 
responsible for routing user requests to specialized sub-agents 
based on their domain expertise.

## Available Specialized Agents

### 1. Production Agent
**Domain**: Manufacturing operations, production metrics, quality control
**Handles**: Production KPIs, machine performance, downtime analysis

### 2. Logistics Agent
**Domain**: Supply chain, shipping, transportation operations
**Handles**: Shipment tracking, route optimization, delivery performance

### 3. Weather Agent
**Domain**: Meteorological data and weather patterns
**Handles**: Historical weather, atmospheric conditions, climate trends

## Your Decision Process
1. Analyze the request for key terms and domains
2. Determine scope (single vs multi-domain)
3. Route to appropriate agent(s)
4. Synthesize results when multiple agents are involved
"""

The orchestrator receives specialized agents as tools:

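from typing import Any, List

from strands_tools import calculator, current_time, think
from strands_tools.code_interpreter import AgentCoreCodeInterpreter

# AWS_REGION comes from the project settings


def get_orchestrator_tools() -> List[Any]:
    # The specialized agents are imported lazily, inside the function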
    from tools.logistics.agent import logistics_assistant
    from tools.production.agent import production_assistant
    from tools.weather.agent import weather_assistant

    tools = [
        calculator,
        think,
        current_time,
        AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter,
        logistics_assistant,  # Specialized agent as tool
        production_assistant,  # Specialized agent as tool
        weather_assistant     # Specialized agent as tool
    ]
    return tools

Each specialized agent follows a consistent pattern. Here’s the weather agent:

@tool
@stream_to_step("weather_assistant")
async def weather_assistant(query: str):
    """
    A research assistant specialized in weather topics with streaming support.
    """
    try:
        tools = [
            calculator,
            think,
            current_time,
            AgentCoreCodeInterpreter(region=AWS_REGION).code_interpreter
        ]
        # Domain-specific tools
        tools += WeatherTools(latitude=MY_LATITUDE, longitude=MY_LONGITUDE).get_tools()

        research_agent = get_agent(
            system_prompt=WEATHER_ASSISTANT_PROMPT,
            tools=tools
        )

        async for token in research_agent.stream_async(query):
            yield token

    except Exception as e:
        yield f"Error in research assistant: {str(e)}"

Each agent has access to domain-specific tools. For example, the weather agent uses external APIs:

from datetime import date
from typing import List

import requests
from strands import tool

# MeteoData and parse_weather_response are project helpers (not shown)


class WeatherTools:
    def __init__(self, latitude: float, longitude: float):
        self.latitude = latitude
        self.longitude = longitude

    def get_tools(self) -> List[tool]:
        @tool
        def get_hourly_weather_data(from_date: date, to_date: date) -> MeteoData:
            """Get hourly weather data for a specific date range."""
            url = (f"https://api.open-meteo.com/v1/forecast?"
                   f"latitude={self.latitude}&longitude={self.longitude}&"
                   f"hourly=temperature_2m,relative_humidity_2m...")
            # Fail fast on network problems and HTTP errors instead of hanging
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return parse_weather_response(response.json())

        return [get_hourly_weather_data]
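
The excerpt elides most of the query string. Open-Meteo documents start_date and end_date parameters (ISO yyyy-mm-dd) on the forecast endpoint. Building the URL from a parameter dict is one way to make sure the tool's date range actually reaches the API. Here is a sketch under that assumption; build_hourly_weather_url is a hypothetical helper and the variable list is shortened:

```python
from datetime import date
from typing import Sequence
from urllib.parse import urlencode

OPEN_METEO_FORECAST = "https://api.open-meteo.com/v1/forecast"


def build_hourly_weather_url(latitude: float, longitude: float,
                             from_date: date, to_date: date,
                             variables: Sequence[str] = ("temperature_2m",
                                                         "relative_humidity_2m")) -> str:
    """Build a forecast URL that carries the requested date range."""
    if from_date > to_date:
        raise ValueError("from_date must not be after to_date")
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": ",".join(variables),
        "start_date": from_date.isoformat(),
        "end_date": to_date.isoformat(),
    }
    # safe="," keeps the comma-separated variable list readable
    return f"{OPEN_METEO_FORECAST}?{urlencode(params, safe=',')}"
```

Inside get_hourly_weather_data you could then call requests.get(build_hourly_weather_url(self.latitude, self.longitude, from_date, to_date), timeout=30).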

The logistics and production agents use synthetic data generators for demonstration:

class LogisticsTools:
    def get_tools(self) -> List[tool]:
        @tool
        def get_logistics_data(
            from_date: date,
            to_date: date,
            origins: Optional[List[str]] = None,
            destinations: Optional[List[str]] = None,
        ) -> LogisticsDataset:
            """Generate synthetic logistics shipment data."""
            # Generate realistic shipment data with delays, costs, routes
            records = generate_synthetic_shipments(...)
            return LogisticsDataset(records=records, aggregates=...)
        
        return [get_logistics_data]
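
generate_synthetic_shipments is elided in the excerpt above. A minimal, hypothetical version could generate a fixed number of random shipments per day in the requested range, seeded so that repeated tool calls return the same data. The Shipment fields, location names and value ranges are illustrative assumptions, not the project's schema:

```python
import random
from dataclasses import dataclass
from datetime import date, timedelta
from typing import List, Optional


@dataclass
class Shipment:
    shipment_id: str
    ship_date: date
    origin: str
    destination: str
    delay_hours: float
    cost_eur: float


def generate_synthetic_shipments(from_date: date, to_date: date,
                                 origins: Optional[List[str]] = None,
                                 destinations: Optional[List[str]] = None,
                                 per_day: int = 5, seed: int = 42) -> List[Shipment]:
    """Hypothetical generator: per_day random shipments for each day in the range."""
    rng = random.Random(seed)  # seeded so repeated tool calls return the same data
    origins = origins or ["Warehouse-A", "Warehouse-B"]
    destinations = destinations or ["Customer-X", "Customer-Y", "Customer-Z"]
    records = []
    day = from_date
    while day <= to_date:
        for i in range(per_day):
            records.append(Shipment(
                shipment_id=f"SH-{day:%Y%m%d}-{i:03d}",
                ship_date=day,
                origin=rng.choice(origins),
                destination=rng.choice(destinations),
                # Most shipments are on time; the rest are delayed by up to two days
                delay_hours=0.0 if rng.random() < 0.7 else round(rng.uniform(1, 48), 1),
                cost_eur=round(rng.uniform(200, 1500), 2),
            ))
        day += timedelta(days=1)
    return records
```

Seeding matters here: if the orchestrator calls the logistics agent twice for the same question, both calls see the same synthetic world.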

For the UI we use Chainlit. The Chainlit integration provides real-time visibility into agent execution:

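import chainlit as cl
from strands.hooks import (AfterToolCallEvent, BeforeToolCallEvent,
                           HookProvider, HookRegistry)


class LoggingHooks(HookProvider):
    def register_hooks(self, registry: HookRegistry, **kwargs) -> None:
        # Without this registration the callbacks below are never invoked
        registry.add_callback(BeforeToolCallEvent, self.before_tool)
        registry.add_callback(AfterToolCallEvent, self.after_tool)

    async def before_tool(self, event: BeforeToolCallEvent) -> None:
        # Open a Chainlit step for each tool call so the user can see it
        step = cl.Step(name=f"{event.tool_use['name']}", type="tool")
        await step.send()
        cl.user_session.set(f"step_{event.tool_use['name']}", step)

    async def after_tool(self, event: AfterToolCallEvent) -> None:
        step = cl.user_session.get(f"step_{event.tool_use['name']}")
        if step:
            await step.update()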

@cl.on_message
async def handle_message(message: cl.Message):
    agent = cl.user_session.get("agent")
    message_history = cl.user_session.get("message_history")
    message_history.append({"role": "user", "content": message.content})
    
    response = await agent.run_async(message.content)
    await cl.Message(content=response).send()

This creates a transparent experience where users see:

  • Which agent is handling their request
  • What tools are being invoked
  • Real-time streaming of responses

Now we can handle a variety of user queries. For example:

User: “What was the average temperature last week?”

Flow:

  1. Orchestrator identifies weather domain
  2. Routes to weather_assistant
  3. Weather agent calls get_hourly_weather_data
  4. Analyzes and returns formatted response

Or multi-domain queries:

User: “Did weather conditions affect our shipment delays yesterday?”

Flow:

  1. Orchestrator identifies weather + logistics domains
  2. Routes to weather_assistant for climate data
  3. Routes to logistics_assistant for shipment data
  4. Synthesizes correlation analysis
  5. Returns unified insight

And complex analytics:

User: “Analyze production efficiency trends and correlate with weather and logistics performance based on yesterday’s data.”

Flow:

  1. Orchestrator coordinates all three agents
  2. Production agent retrieves manufacturing KPIs
  3. Weather agent provides environmental data
  4. Logistics agent supplies delivery metrics
  5. Orchestrator synthesizes multi-domain analysis

This architecture scales naturally: we can add new specialized agents without disrupting existing functionality. We only need to create the new agent, register it as a tool with the orchestrator, and add its domain description to the orchestrator prompt. That’s it.
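
One way to keep those steps in sync is to describe each agent once and derive both the prompt section and the tool list from that description. The following is a minimal sketch. AgentSpec, render_agents_section and build_tools are hypothetical names, and the tool functions in the usage are placeholders for the real agent-as-tool functions such as weather_assistant:

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass(frozen=True)
class AgentSpec:
    """Hypothetical description of one specialized agent."""
    name: str
    domain: str
    handles: str
    tool: Callable[..., Any]  # the agent-as-tool function, e.g. weather_assistant


def render_agents_section(specs: List[AgentSpec]) -> str:
    """Build the 'Available Specialized Agents' part of the orchestrator prompt."""
    lines = ["## Available Specialized Agents", ""]
    for number, spec in enumerate(specs, start=1):
        lines += [f"### {number}. {spec.name}",
                  f"**Domain**: {spec.domain}",
                  f"**Handles**: {spec.handles}",
                  ""]
    return "\n".join(lines)


def build_tools(specs: List[AgentSpec], base_tools: List[Any]) -> List[Any]:
    """Generic tools first, then one tool per specialized agent."""
    return list(base_tools) + [spec.tool for spec in specs]
```

With this layout, adding a fourth agent means appending one AgentSpec; the rendered section would then be combined with the rest of MAIN_SYSTEM_PROMPT.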

The orchestrator pattern transforms multi-domain AI from a monolithic challenge into a composable architecture. Each agent focuses on what it does best, while the orchestrator provides intelligent coordination.

Full code is available on my GitHub.

Building ReAct AI agents with sandboxed Python code execution using AWS Bedrock and LangGraph

In industrial environments, data analysis is crucial for optimizing processes, detecting anomalies, and making informed decisions. Manufacturing plants, energy systems, and industrial IoT generate massive amounts of data from sensors, machines, and control systems. Traditionally, analyzing this data requires specialized knowledge in both industrial processes and data science, creating a bottleneck for quick insights.

I’ve been exploring agentic AI frameworks lately, particularly for complex data analysis tasks. While working on industrial data problems, I realized that combining the reasoning capabilities of Large Language Models with specialized tools could create a powerful solution for industrial data analysis. This project demonstrates how to build a ReAct (Reasoning and Acting) AI agent using LangGraph that can analyze manufacturing data, understand industrial processes, and provide actionable insights.

The goal of this project is to create an AI agent that can analyze industrial datasets (manufacturing metrics, sensor readings, process control data) and provide expert-level insights about production optimization, quality control, and process efficiency. Using LangGraph’s ReAct agent framework with AWS Bedrock, the system can execute Python code dynamically in a sandboxed environment, process large datasets, and reason about industrial contexts.

The dataset is a fake sample of industrial data with manufacturing metrics like temperature, speed, humidity, pressure, operator experience, scrap rates, and unplanned stops. In fact, I generated the dataset using ChatGPT.

This project uses several key components:

  • LangGraph ReAct Agent: For building the multi-tool AI agent with ReAct (Reasoning and Acting) patterns that can dynamically choose tools and reason about results
  • AWS Bedrock: Claude Sonnet 4 as the underlying LLM for reasoning and code generation
  • Sandboxed Code Interpreter: Secure execution of Python code for data analysis using AWS Agent Core, through a tool taken from the strands-agents-tools library.
  • Industrial Domain Expertise: Specialized system prompts with knowledge of manufacturing processes, quality control, and industrial IoT

The agent has access to powerful tools:

  • Code Interpreter: Executes Python code safely in a sandboxed AWS environment using pandas, numpy, scipy, and other scientific libraries
  • Data Processing: Handles large industrial datasets with memory-efficient strategies
  • Industrial Context: Understands manufacturing processes, sensor data, and quality metrics

The system uses AWS Agent Core’s sandboxed code interpreter, which means:

  • Python code is executed in an isolated environment
  • No risk to the host system
  • Access to scientific computing libraries (pandas, numpy, scipy)
  • Memory management for large datasets

The core of the system is surprisingly simple. The ReAct agent is built using LangGraph’s create_react_agent with custom tools:

from langgraph.prebuilt import create_react_agent
from typing import List
import pandas as pd
from langchain_core.callbacks import BaseCallbackHandler


def analyze_df(df: pd.DataFrame, system_prompt: str, user_prompt: str,
               callbacks: List[BaseCallbackHandler], streaming: bool = False):
    code_interpreter_tools = CodeInterpreter()
    tools = code_interpreter_tools.get_tools()

    agent = create_react_agent(
        model=get_llm(model=DEFAULT_MODEL, streaming=streaming,
                      budget_tokens=12288, callbacks=callbacks),
        tools=tools,
        prompt=system_prompt
    )

    agent_prompt = f"""
    I have a DataFrame with the following data:
    - Columns: {list(df.columns)}
    - Shape: {df.shape}
    - data: {df.to_csv(index=False)}
    
    The output must be an executive summary with the key points.
    The response must be only markdown, not plots.
    """
    messages = [
        ("user", agent_prompt),
        ("user", user_prompt)
    ]
    agent_input = {"messages": messages}
    return agent.invoke(agent_input)

The ReAct pattern (Reasoning and Acting) allows the agent to:

  1. Reason about what analysis is needed
  2. Act by calling the appropriate tools (in this case: code interpreter)
  3. Observe the results of code execution
  4. Re-reason and potentially call more tools if needed

This creates a dynamic loop where the agent can iteratively analyze data, examine results, and refine its approach – much more powerful than a single code execution.
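
To make the loop concrete, here is a toy, LLM-free sketch of the Reason, Act, Observe cycle. scripted_reasoner is a hypothetical stand-in for the model, and the two small tools stand in for the code interpreter. In the real agent, LangGraph's create_react_agent runs this loop with Claude making the decisions:

```python
from typing import Callable, Dict, List, Tuple

# A decision is ("act", tool_name, tool_input) or ("finish", answer)
Decision = Tuple[str, ...]


def react_loop(reason: Callable[[List[str]], Decision],
               tools: Dict[str, Callable[[str], str]],
               max_steps: int = 5) -> str:
    """Toy ReAct loop: reason, act, observe, repeat until the reasoner finishes."""
    observations: List[str] = []
    for _ in range(max_steps):
        decision = reason(observations)                # Reason over what was seen so far
        if decision[0] == "finish":
            return decision[1]
        _, tool_name, tool_input = decision
        result = tools[tool_name](tool_input)          # Act
        observations.append(f"{tool_name}({tool_input}) -> {result}")  # Observe
    return "Stopped: step limit reached"


def scripted_reasoner(observations: List[str]) -> Decision:
    """Stand-in for the LLM: picks the next step from the observations."""
    if len(observations) == 0:
        return ("act", "mean", "3,4,5")
    if len(observations) == 1:
        return ("act", "max", "3,4,5")
    mean_value = observations[0].split(" -> ")[1]
    max_value = observations[1].split(" -> ")[1]
    return ("finish", f"mean={mean_value}, max={max_value}")


def _numbers(text: str) -> List[float]:
    return [float(x) for x in text.split(",")]


TOOLS: Dict[str, Callable[[str], str]] = {
    "mean": lambda text: str(sum(_numbers(text)) / len(_numbers(text))),
    "max": lambda text: str(max(_numbers(text))),
}
```

react_loop(scripted_reasoner, TOOLS) takes two tool steps before finishing. The max_steps cap plays the same role as a recursion limit in a real agent: it stops a model that never decides to finish.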

The magic happens in the system prompt, which provides the agent with industrial domain expertise:

SYSTEM_PROMPT = """
# Industrial Data Analysis Agent - System Prompt

You are an expert AI agent specialized in industrial data analysis and programming. 
You excel at solving complex data problems in manufacturing, process control, 
energy systems, and industrial IoT environments.

## Core Capabilities
- Execute Python code using pandas, numpy, scipy
- Handle large datasets with chunking strategies  
- Process time-series data, sensor readings, production metrics
- Perform statistical analysis, anomaly detection, predictive modeling

## Industrial Domain Expertise
- Manufacturing processes and production optimization
- Process control systems (PID controllers, SCADA, DCS)
- Industrial IoT sensor data and telemetry
- Quality control and Six Sigma methodologies
- Energy consumption analysis and optimization
- Predictive maintenance and failure analysis
"""

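The code interpreter tool is wrapped with a basic validation step. Note that ast.parse only checks that the code is valid Python; the actual protection comes from running it in the sandbox:

import ast


def validate_code_ast(code: str) -> bool:
    """Check that the code parses as valid Python (a syntax check, not a security check)."""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False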


@tool
def code_interpreter(code: str) -> str:
    """Executes Python code in a sandboxed environment."""
    if not validate_code_ast(code):
        raise UnsafeCodeError("Unsafe code or syntax errors.")

    return code_tool(code_interpreter_input={
        "action": {
            "type": "executeCode",
            "session_name": session_name,
            "code": code,
            "language": "python"
        }
    })
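
validate_code_ast only confirms that the code parses. If you want an extra, cheap policy check before code reaches the sandbox, one option is to walk the AST and reject imports or calls that analysis code shouldn't need. The sketch below is hypothetical: find_violations and both denylists are illustrative, not part of the project. It is also easy to bypass, for example through getattr or importlib, so the sandbox remains the real isolation boundary:

```python
import ast
from typing import Iterable, List

# Illustrative denylists, not a complete security policy
BLOCKED_MODULES = frozenset({"os", "subprocess", "shutil", "socket", "sys"})
BLOCKED_CALLS = frozenset({"eval", "exec", "compile", "__import__"})


def find_violations(code: str,
                    blocked_modules: Iterable[str] = BLOCKED_MODULES,
                    blocked_calls: Iterable[str] = BLOCKED_CALLS) -> List[str]:
    """Return the policy violations found in the code's AST (raises SyntaxError if invalid)."""
    modules = set(blocked_modules)
    calls = set(blocked_calls)
    violations = []
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            for alias in node.names:
                # Compare the top-level package, so "os.path" counts as "os"
                if alias.name.split(".")[0] in modules:
                    violations.append(f"import {alias.name}")
        elif isinstance(node, ast.ImportFrom):
            # node.module is None for relative imports such as "from . import x"
            if node.module and node.module.split(".")[0] in modules:
                violations.append(f"from {node.module} import ...")
        elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in calls:
                violations.append(f"call {node.func.id}()")
    return violations
```

In code_interpreter, a non-empty result could be raised as UnsafeCodeError alongside the existing syntax check.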
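
The system uses Claude Sonnet 4 through AWS Bedrock. The LLM factory sets the sampling parameters; this simplified version omits the streaming, callbacks and budget_tokens arguments that analyze_df passes:
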
def get_llm(model: str = DEFAULT_MODEL, max_tokens: int = 4096,
            temperature: float = TemperatureLevel.BALANCED,
            top_k: int = TopKLevel.DIVERSE,
            top_p: float = TopPLevel.CREATIVE) -> BaseChatModel:
    model_kwargs = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_k": top_k,
        "top_p": top_p
    }

    return ChatBedrock(
        model=model,
        client=aws_get_service('bedrock-runtime'),
        model_kwargs=model_kwargs
    )

The project includes fake sample industrial data with manufacturing metrics:

- `machine_id`: Equipment identifier
- `shift`: Production shift (M/A/N for morning/afternoon/night)
- `temperature`, `speed`, `humidity`, `pressure`: Process parameters
- `operator_experience`: Years of operator experience
- `scrap_kg`: Quality metric (waste produced)
- `unplanned_stop`: Equipment failure indicator
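
If you don't have fake_data.csv at hand, a small generator with the same columns is enough to try the agent. This is a hypothetical sketch: the value ranges and distributions are made up for illustration and are not those of the original dataset, which was generated with ChatGPT:

```python
import csv
import random
from typing import Dict, List

# Hypothetical values for illustration; not taken from fake_data.csv
SHIFTS = ("A", "M", "N")
COLUMNS = ("machine_id", "shift", "temperature", "speed", "humidity", "pressure",
           "operator_experience", "scrap_kg", "unplanned_stop")


def generate_fake_rows(n: int, seed: int = 0) -> List[Dict]:
    """Generate n rows with the same columns as the sample dataset."""
    rng = random.Random(seed)
    rows = []
    for _ in range(n):
        rows.append({
            "machine_id": f"MC-{rng.randint(1, 5):02d}",
            "shift": rng.choice(SHIFTS),
            "temperature": round(rng.gauss(180.0, 5.0), 1),
            "speed": round(rng.uniform(80.0, 120.0), 1),
            "humidity": round(rng.uniform(30.0, 70.0), 1),
            "pressure": round(rng.gauss(5.0, 0.3), 2),
            "operator_experience": rng.randint(0, 25),
            "scrap_kg": round(max(0.0, rng.gauss(2.0, 1.0)), 2),
            # Roughly 5% of rows flag an unplanned stop
            "unplanned_stop": 1 if rng.random() < 0.05 else 0,
        })
    return rows


def write_csv(rows: List[Dict], path: str) -> None:
    """Write the rows with a header line, ready for pd.read_csv."""
    with open(path, "w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=COLUMNS)
        writer.writeheader()
        writer.writerows(rows)
```

Calling write_csv(generate_fake_rows(500), "fake_data.csv") produces a file that the example script below can load.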

A typical analysis query might be: "Do temperature and speed setpoints vary across shifts?"
The agent will stream the response as it generates it.

The agent will:

1. Load and examine the dataset structure
2. Generate appropriate Python code for analysis
3. Execute the code in a sandboxed environment
4. Provide insights about shift-based variations
5. Suggest process optimization recommendations

import logging

import pandas as pd
from langchain_core.callbacks import StreamingStdOutCallbackHandler

from modules.df_analyzer import analyze_df
from prompts import SYSTEM_PROMPT

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


class StreamingCallbackHandler(StreamingStdOutCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs):
        print(token, end='', flush=True)


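df = pd.read_csv('fake_data.csv')

user_prompt = "Do temperature and speed setpoints vary across shifts?"
# Tokens are printed by the callback as they arrive; analyze_df returns
# the final LangGraph state once the agent has finished
result = analyze_df(
    user_prompt=user_prompt,
    df=df,
    system_prompt=SYSTEM_PROMPT,
    callbacks=[StreamingCallbackHandler()],
    streaming=True)
logger.debug(result["messages"][-1].content)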

This project demonstrates the power of agentic AI for specialized domains. Instead of building custom analytics dashboards or writing specific analysis scripts, we provide the agent with:

  1. Domain Knowledge: Through specialized system prompts
  2. Tools: Safe code execution capabilities
  3. Context: The actual data to analyze

The agent can then:

  • Generate appropriate analysis code
  • Execute it safely
  • Interpret results with industrial context
  • Provide actionable recommendations

The result is a flexible system that can handle various industrial analysis tasks without pre-programmed solutions. The agent reasons about the problem, writes the necessary code (sandboxed), and provides expert-level insights.

Full code in my github.

Building an Agentic AI with Python, LangChain, AWS Bedrock and Claude 4 Sonnet

Today we are going to build an AI agent. It is just an example of how to build an agent with LangChain, AWS Bedrock and Claude 4 Sonnet. The agent will be a “mathematical expert” capable of performing calculations and explaining its reasoning process. We give it only basic operations: addition and subtraction. By combining them, we can build up other operations such as multiplication, division, exponentiation and square roots, and the agent will carry them out step by step, explaining its reasoning along the way. I know we don’t need AI to perform these operations; the idea is just to show how to build an agent with these tools.

The mathematical agent implements the tool-calling pattern, allowing the LLM to dynamically select and execute mathematical operations:

import logging

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate

from core.llm.aws import get_llm, Models
from modules.prompts import AGENT_SYSTEM_PROMPT
from modules.tools import MathTools
from settings import MAX_TOKENS

logger = logging.getLogger(__name__)


def run(question: str, model: Models = Models.CLAUDE_4):
    prompt = ChatPromptTemplate.from_messages([
        ("system", AGENT_SYSTEM_PROMPT),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}")
    ])
    math_tools = MathTools()
    tools = math_tools.get_tools()

    llm = get_llm(model=model, max_tokens=MAX_TOKENS)
    agent = create_tool_calling_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,
        max_iterations=10
    )

    response = agent_executor.invoke({
        "input": question
    })

    logger.info(f"Agent response: {response['output']}")

Tools are defined using LangChain’s @tool decorator, which provides automatic schema generation and type validation. Strictly speaking, we don’t need a class for the tools, but I used one because I want to add an extra feature to the agent: a history of the operations performed. This lets the agent explain its reasoning process, showing the steps taken to arrive at the final result.

import logging
from typing import List

from langchain.tools import tool

logger = logging.getLogger(__name__)


class MathTools:

    def __init__(self):
        self.history = []

    def _diff_values(self, a: int, b: int) -> int:
        result = a - b
        self.history.append(f"{a} - {b} = {result}")
        return result

    def _sum_values(self, a: int, b: int) -> int:
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result

    def _get_history(self) -> str:
        if not self.history:
            return "No previous operations"
        return "\n".join(self.history[-5:])  # Last 5

    def get_tools(self) -> List:
        @tool
        def diff_values(a: int, b: int) -> int:
            """Calculates the difference between two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: difference of a - b
            """
            logger.info(f"Calculating difference: {a} - {b}")
            return self._diff_values(a, b)

        @tool
        def sum_values(a: int, b: int) -> int:
            """Sums two numbers
            Args:
                a (int): first number
                b (int): second number
            Returns:
                int: sum of a + b
            """
            logger.info(f"Calculating sum: {a} + {b}")
            return self._sum_values(a, b)

        @tool
        def get_history() -> str:
            """Gets the operation history
            Returns:
                str: last operations
            """
            logger.info("Retrieving operation history")
            return self._get_history()

        return [diff_values, sum_values, get_history]

The system prompt is carefully crafted to guide the agent’s behavior and establish clear operational boundaries:

AGENT_SYSTEM_PROMPT = """
You are an expert mathematical agent specialized in calculations.

You have access to the following tools:
- diff_values: Calculates the difference between two numbers
- sum_values: Sums two numbers
- get_history: Gets the operation history

Guidelines:
1. Only answer questions related to mathematical operations.
2. For complex operations, use step-by-step calculations:
   - Multiplication: Repeated addition
   - Division: Repeated subtraction
   - Exponentiation: Repeated multiplication
   - Square root: Use methods like Babylonian method or prime factorization.
"""

Now we can invoke our agent by asking questions such as ‘What’s the square root of 16 divided by two, squared?’. The agent will iterate using only the provided tools to obtain the result.
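
To make the decomposition rules concrete, here is a plain-Python sketch of the same idea with no LLM involved. The local stand-ins for sum_values and diff_values record each step, like MathTools does. For the square root, it uses a different technique from the ones named in the prompt: subtracting consecutive odd numbers, since 1 + 3 + 5 + ... + (2k - 1) = k², which needs nothing but additions and subtractions. All function names here are illustrative:

```python
from typing import List, Tuple

history: List[str] = []


def sum_values(a: int, b: int) -> int:
    # Stand-in for the sum_values tool, recording each step like MathTools
    history.append(f"{a} + {b} = {a + b}")
    return a + b


def diff_values(a: int, b: int) -> int:
    # Stand-in for the diff_values tool
    history.append(f"{a} - {b} = {a - b}")
    return a - b


def multiply(a: int, b: int) -> int:
    """a * b for b >= 0, by repeated addition."""
    result = 0
    for _ in range(b):
        result = sum_values(result, a)
    return result


def divide(a: int, b: int) -> Tuple[int, int]:
    """(quotient, remainder) for a >= 0 and b > 0, by repeated subtraction."""
    if b <= 0:
        raise ValueError("b must be positive")
    quotient = 0
    while a >= b:
        a = diff_values(a, b)
        quotient = sum_values(quotient, 1)
    return quotient, a


def int_sqrt(n: int) -> int:
    """Floor of the square root of n >= 0: subtract 1, 3, 5, ... while possible."""
    count, odd = 0, 1
    while n >= odd:
        n = diff_values(n, odd)
        odd = sum_values(odd, 2)
        count = sum_values(count, 1)
    return count
```

For the example question: int_sqrt(16) gives 4, divide(4, 2) gives (2, 0), and multiply(2, 2) gives 4. The history list then holds every intermediate step, much like the agent's get_history tool.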

And that’s all. This project demonstrates how to build a tool-calling AI agent using LangChain and AWS Bedrock. It’s just a boilerplate, but it can be extended into more complex agents with additional capabilities, and it’s a good way to understand how AI agents work.

Full code in my GitHub account.

Sending logs to AWS CloudWatch with a sidecar pattern and Python

In a Docker Swarm environment, the sidecar pattern is a common architectural approach used to extend the functionality of a primary container without directly modifying it.

First, there’s the primary container. This is the main application or service you want to run within a Docker Swarm service, and it’s responsible for the core functionality of your application. In our case, this container is the log generator: it writes its logs to files as JSON, one object per line.

The sidecar container is a secondary container that runs alongside the primary container. It’s tightly coupled with the primary container and assists it by providing additional services or functionality. In our example, the sidecar’s responsibility is to push logs to AWS CloudWatch. Both containers share a Docker volume. With this technique, the primary container only writes to local files, so generating logs isn’t affected by network latency.

The idea is to deploy something like this:

version: '3.6'

services:
  api:
    image: api:latest
    logging:
      options:
        max-size: 10m
    deploy:
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
      - PROCESS_ID=api
    ports:
      - 5000:5000
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000 --timeout 180

  filebeat:
    image: cw:production
    command: bash -c "python cw.py && sleep 1m"
    deploy:
      restart_policy:
        condition: any
    environment:
      - LOG_GROUP=python_logs_example
      - LOG_STREAM_PREFIX=default
    volumes:
      - logs_volume:/src/logs
volumes:
  logs_volume:

Let’s go. First, we need to set up our AWS credentials. We can use either a named profile or an IAM user’s access keys:

import boto3

from settings import (AWS_ACCESS_KEY_ID, AWS_PROFILE_NAME, AWS_REGION,
                      AWS_SECRET_ACCESS_KEY)

if AWS_PROFILE_NAME:
    session = boto3.Session(profile_name=AWS_PROFILE_NAME, region_name=AWS_REGION)
else:
    session = boto3.Session(
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION)

logs = session.client('logs')

Then we need to set up the log group and log stream in CloudWatch. We could create them by hand, but I prefer to create them programmatically if they don’t exist.

def init_cloudwatch_stream():
    log_stream_name = f"{LOG_STREAM_PREFIX}_{datetime.now().strftime('%Y%m%d')}"

    log_groups = logs.describe_log_groups(logGroupNamePrefix=LOG_GROUP)['logGroups']
    if not any(group['logGroupName'] == LOG_GROUP for group in log_groups):
        logs.create_log_group(logGroupName=LOG_GROUP)

    log_streams = logs.describe_log_streams(
        logGroupName=LOG_GROUP,
        logStreamNamePrefix=log_stream_name
    )['logStreams']

    if not any(stream['logStreamName'] == log_stream_name for stream in log_streams):
        logs.create_log_stream(logGroupName=LOG_GROUP, logStreamName=log_stream_name)

    return log_stream_name

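Now we need to upload the logs to CloudWatch with put_log_events. Originally, every call after the first had to pass the sequenceToken returned by the previous call, and this trick handles that. AWS has since dropped the requirement (sequence tokens are now accepted but ignored), so the token handling is harmless but no longer necessary.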

function_parameters = dict(
    logGroupName=LOG_GROUP,
    logStreamName=log_stream_name
)

for f in glob(f'{LOG_PATH}/*.{LOG_EXTENSION}'):
    function_parameters['logEvents'] = get_log_events_from_file(f)
    response = logs.put_log_events(**function_parameters)
    function_parameters['sequenceToken'] = response['nextSequenceToken']
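
One put_log_events call per file works for small files, but PutLogEvents has documented limits. A batch can hold at most 10,000 events and 1,048,576 bytes, where each event counts as its UTF-8 message size plus 26 bytes. Events must also be in chronological order, and a batch can’t span more than 24 hours. Below is a minimal sketch of a batching helper that respects those limits. It assumes events shaped like the dicts returned by get_log_events_from_file; batch_log_events is a hypothetical name, and the limits are worth checking against the current API reference:

```python
from typing import Dict, Iterator, List

# PutLogEvents limits as documented by AWS (check the current API reference)
MAX_EVENTS = 10_000
MAX_BATCH_BYTES = 1_048_576
EVENT_OVERHEAD_BYTES = 26
MAX_SPAN_MS = 24 * 60 * 60 * 1000


def batch_log_events(events: List[Dict]) -> Iterator[List[Dict]]:
    """Yield chronologically ordered batches that stay within PutLogEvents limits."""
    batch: List[Dict] = []
    batch_bytes = 0
    for event in sorted(events, key=lambda e: e["timestamp"]):
        size = len(event["message"].encode("utf-8")) + EVENT_OVERHEAD_BYTES
        # Close the current batch if adding this event would break any limit
        if batch and (
            len(batch) >= MAX_EVENTS
            or batch_bytes + size > MAX_BATCH_BYTES
            or event["timestamp"] - batch[0]["timestamp"] > MAX_SPAN_MS
        ):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(event)
        batch_bytes += size
    if batch:  # never yield an empty batch
        yield batch
```

In the upload loop, you would call logs.put_log_events once per batch for each file, which also avoids calling it with an empty list when a log file is empty.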

We also need to read the log files. Each line is a JSON document; change the excluded fields according to your needs:

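import json
from datetime import datetime

def get_log_events_from_file(file):
    exclude_fields = ('@timestamp', 'logger')
    with open(file, 'r') as fp:
        records = [json.loads(line) for line in fp if line.strip()]
    events = [
        dict(
            timestamp=int(datetime.fromisoformat(d['@timestamp']).timestamp() * 1000),
            message=json.dumps({k: v for k, v in d.items() if k not in exclude_fields})
        ) for d in records]
    # PutLogEvents requires the events of a batch in chronological order
    return sorted(events, key=lambda e: e['timestamp'])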
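The reader above expects one JSON object per line with an ISO-8601 "@timestamp" field. The post doesn’t show how the application writes those lines, so here is a stdlib-only sketch of a logging formatter that produces that shape. JsonLineFormatter and the exact set of fields are assumptions for illustration.

```python
import json
import logging
from datetime import datetime, timezone


class JsonLineFormatter(logging.Formatter):
    """Hypothetical formatter: one JSON object per line, matching what
    get_log_events_from_file() reads (@timestamp plus arbitrary fields)."""

    def format(self, record):
        payload = {
            # Timezone-aware ISO-8601, parseable by datetime.fromisoformat()
            '@timestamp': datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            'logger': record.name,
            'level': record.levelname,
            'message': record.getMessage(),
        }
        if record.exc_info:
            payload['exception'] = self.formatException(record.exc_info)
        return json.dumps(payload)
```

Attach it to a file handler that writes into LOG_PATH, e.g. `handler = logging.FileHandler('app.log')` followed by `handler.setFormatter(JsonLineFormatter())`. Using an explicit UTC offset keeps the timestamps unambiguous when the uploader converts them to epoch milliseconds.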

I like to keep all the settings of my application in a file called settings.py, a pattern I’ve copied from Django. In this file I also read environment variables from a dotenv file:

import os
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')

AWS_PROFILE_NAME = os.getenv('AWS_PROFILE_NAME', False)
AWS_REGION = os.getenv('AWS_REGION')

LOG_GROUP = os.getenv('LOG_GROUP', 'python_logs_example')
LOG_STREAM_PREFIX = os.getenv('LOG_STREAM_PREFIX', 'default')

LOG_EXTENSION = 'log'
LOG_PATH = os.getenv('LOG_PATH', Path(BASE_DIR).resolve())

And that’s all. Your logs are uploaded to CloudWatch by a background process, decoupled from the main process.

Source code available in my GitHub.

Real-time Object Detection and Streaming with Python, OpenCV, AWS Kinesis, and YOLOv8

I have the following idea in mind: I want to detect objects with YOLOv8 and OpenCV. Here is a simple example:

import cv2 as cv
from ultralytics import YOLO
import logging

logging.getLogger("ultralytics").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

X_RESIZE = 800
Y_RESIZE = 450

YOLO_MODEL="yolov8m.pt"
YOLO_CLASSES_TO_DETECT = ('bottle',)  # tuple of class names to report
THRESHOLD = 0.80

cap = cv.VideoCapture(0)
model = YOLO(YOLO_MODEL)
while True:
    ret, original_frame = cap.read()
    if not ret:
        break  # camera disconnected or stream ended
    frame = cv.resize(original_frame, (X_RESIZE, Y_RESIZE), interpolation=cv.INTER_LINEAR)

    results = model.predict(frame)

    color = (0, 255, 0)
    for result in results:
        for box in result.boxes:
            class_id = result.names[int(box.cls[0].item())]
            cords = box.xyxy[0].tolist()
            probability = round(box.conf[0].item(), 2)

            if probability > THRESHOLD and class_id in YOLO_CLASSES_TO_DETECT:
                logger.info(f"{class_id} {round(probability * 100)}%")
                x1, y1, x2, y2 = [round(x) for x in cords]
                cv.rectangle(frame, (x1, y1), (x2, y2), color, 2)

    cv.imshow("Cam", frame)

    if cv.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv.destroyAllWindows()

However, my idea is slightly different. I intend to capture the camera stream, send it to AWS Kinesis and then, in a separate process, consume that Kinesis stream for object detection (or any other purpose that needs the camera frames). The first step is to publish our OpenCV frames into Kinesis. This is producer.py:

import logging

from lib.video import kinesis_producer
from settings import KINESIS_STREAM_NAME, CAM_URI, WIDTH

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == '__main__':
    kinesis_producer(
        kinesis_stream_name=KINESIS_STREAM_NAME,
        cam_uri=CAM_URI,
        width=WIDTH
    )
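
The kinesis_producer function lives in lib/video.py:

import logging

import cv2 as cv

logger = logging.getLogger(__name__)


def kinesis_producer(kinesis_stream_name, cam_uri, width):
    logger.info(f"start emitting stream from cam={cam_uri} to kinesis")
    # aws_get_service is a project helper (not shown); it presumably returns a boto3 client
    kinesis = aws_get_service('kinesis')
    cap = cv.VideoCapture(cam_uri)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                continue  # dropped frame, try again
            # Keep the aspect ratio while scaling to the requested width
            scale = width / frame.shape[1]
            height = int(frame.shape[0] * scale)

            scaled_frame = cv.resize(frame, (width, height))
            _, img_encoded = cv.imencode('.jpg', scaled_frame)
            # A Kinesis record holds at most 1 MiB, so keep the encoded frame small
            kinesis.put_record(
                StreamName=kinesis_stream_name,
                Data=img_encoded.tobytes(),
                PartitionKey='1'
            )
    except KeyboardInterrupt:
        pass
    finally:
        cap.release()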

Now our consumer.py:

import logging

from lib.video import kinesis_consumer
from settings import KINESIS_STREAM_NAME

logging.getLogger("ultralytics").setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

if __name__ == '__main__':
    kinesis_consumer(
        kinesis_stream_name=KINESIS_STREAM_NAME
    )

To read frames from the Kinesis stream we need to take shards into account. This example only reads the first shard, so adapt it to your needs.

import logging
import time

import cv2 as cv
import numpy as np
from ultralytics import YOLO

logger = logging.getLogger(__name__)

# YOLO_MODEL, THRESHOLD and YOLO_CLASSES_TO_DETECT are the same constants as in the first example


def kinesis_consumer(kinesis_stream_name):
    logger.info(f"start reading kinesis stream from={kinesis_stream_name}")
    kinesis = aws_get_service('kinesis')
    model = YOLO(YOLO_MODEL)
    response = kinesis.describe_stream(StreamName=kinesis_stream_name)
    # Only the first shard is read; a multi-shard stream needs one reader per shard
    shard_id = response['StreamDescription']['Shards'][0]['ShardId']
    # Ask for an iterator once, then follow NextShardIterator so no records are skipped
    shard_iterator = kinesis.get_shard_iterator(
        StreamName=kinesis_stream_name,
        ShardId=shard_id,
        ShardIteratorType='LATEST'
    )['ShardIterator']
    while shard_iterator:
        response = kinesis.get_records(
            ShardIterator=shard_iterator,
            Limit=10
        )
        shard_iterator = response.get('NextShardIterator')

        for record in response['Records']:
            image_data = record['Data']
            frame = cv.imdecode(np.frombuffer(image_data, np.uint8), cv.IMREAD_COLOR)
            results = model.predict(frame)
            if detect_id_in_results(results):
                logger.info('Detected')

        if not response['Records']:
            time.sleep(0.2)  # GetRecords allows 5 calls per second per shard


def detect_id_in_results(results):
    for result in results:
        for box in result.boxes:
            class_id = result.names[int(box.cls[0].item())]
            probability = round(box.conf[0].item(), 2)
            if probability > THRESHOLD and class_id in YOLO_CLASSES_TO_DETECT:
                return True
    return False
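kinesis_consumer only reads the first shard. For a stream with several shards, one option is a reader thread per shard. A sketch under these assumptions: kinesis is a boto3 Kinesis client, list_shards pagination (NextToken) is ignored, and read_shard / read_all_shards are hypothetical names. Each reader follows NextShardIterator instead of asking for a new iterator on every loop.

```python
import threading
import time


def read_shard(kinesis, stream_name, shard_id, handle_record, poll_interval=0.2):
    """Follow one shard from its tip, calling handle_record(data) for every record."""
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name, ShardId=shard_id, ShardIteratorType='LATEST'
    )['ShardIterator']
    while iterator:  # becomes None once a closed shard has been fully read
        response = kinesis.get_records(ShardIterator=iterator, Limit=10)
        for record in response['Records']:
            handle_record(record['Data'])
        iterator = response.get('NextShardIterator')
        if not response['Records']:
            time.sleep(poll_interval)  # stay under 5 GetRecords calls/s per shard


def read_all_shards(kinesis, stream_name, handle_record):
    """Start one daemon reader thread per shard and return the threads."""
    shards = kinesis.list_shards(StreamName=stream_name)['Shards']
    threads = [
        threading.Thread(
            target=read_shard,
            args=(kinesis, stream_name, shard['ShardId'], handle_record),
            daemon=True,
        )
        for shard in shards
    ]
    for thread in threads:
        thread.start()
    return threads
```

The handle_record callback would decode the JPEG and run detection. A YOLO model may not be safe to share across threads, so giving each thread its own model, or pushing frames into a queue consumed by a single model, is the cautious choice.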

And that’s all: real-time object detection over a Kinesis stream. Keep in mind that computer vision examples like this one are simple and straightforward, while real-world computer vision projects tend to be more intricate, involving considerations such as cameras, lighting conditions, performance optimization, and accuracy. Remember, this is just an example.

Full code in my GitHub.

AWS SQS 2 HTTP server with Python

A simple service that listens to an SQS queue and forwards each payload to an HTTP server, sending a POST request with the SQS message payload and a Bearer token.

The main script reads a .env file with the AWS credentials and the HTTP endpoint:

# AWS CREDENTIALS
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_REGION=eu-west-1
#AWS_PROFILE=xxx
# SQS
SQS_QUEUE_URL=https://sqs.eu-west-1.amazonaws.com/xxx/name
# REST ENDPOINT
HTTP_ENDPOINT=http://localhost:5555
AUTH_BEARER_TOKEN=xxx

This is the main script (note that it loads the .env file before importing lib.sqs2http):

import logging

from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv())

from lib.sqs2http import loop_step, get_sqs

for library in ['botocore', 'boto3']:
    logging.getLogger(library).setLevel(logging.WARNING)

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X'
)

logger = logging.getLogger(__name__)

if __name__ == "__main__":
    logger.info("sqs2http start")
    sqs = get_sqs()
    while True:
        loop_step(sqs)

The main loop looks like this:

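def loop_step(sqs):
    response = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=SQS_MAX_NUMBER_OF_MESSAGES,
        WaitTimeSeconds=20)  # long polling instead of a busy loop of empty receives

    for message in response.get('Messages', []):
        try:
            body = message.get('Body')
            logger.info(body)
            do_post(body)
            # Only delete after a successful POST; on failure the message becomes
            # visible again after the visibility timeout and is retried
            remove_from_sqs(sqs, message.get('ReceiptHandle'))
        except Exception as e:
            logger.exception(e)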
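The post doesn’t show get_sqs and remove_from_sqs from lib/sqs2http. A minimal sketch of what they might look like, assuming the configuration comes from the environment populated by load_dotenv(); these bodies are guesses, not the author’s code.

```python
import os

SQS_QUEUE_URL = os.getenv('SQS_QUEUE_URL')


def get_sqs():
    """Build an SQS client from the environment.

    boto3 reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (or AWS_PROFILE)
    from the environment on its own; only the region is passed explicitly.
    """
    import boto3  # lazy import keeps this module importable without boto3

    return boto3.client('sqs', region_name=os.getenv('AWS_REGION'))


def remove_from_sqs(sqs, receipt_handle, queue_url=None):
    """Delete a processed message so it is not redelivered after its visibility timeout."""
    sqs.delete_message(QueueUrl=queue_url or SQS_QUEUE_URL, ReceiptHandle=receipt_handle)
```

Deleting by receipt handle (not message ID) is how SQS acknowledges a message, which is why loop_step passes message.get('ReceiptHandle').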

To POST to the HTTP server we can use requests:

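import requests

from settings import HTTP_ENDPOINT, AUTH_BEARER_TOKEN


def do_post(body):
    # The SQS body is already a (JSON) string: send it as-is, since json.dumps(body)
    # would wrap it in a second layer of quotes
    response = requests.post(
        url=HTTP_ENDPOINT,
        data=body.encode('utf-8'),
        headers={
            'Authorization': f'Bearer {AUTH_BEARER_TOKEN}',
            'Content-Type': 'application/json'
        },
        timeout=10)
    # Raise on 4xx/5xx so the caller keeps the message in SQS for a retry
    response.raise_for_status()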

With this simple script, a web server can process all incoming SQS messages. Source code in my GitHub.

Deploying Django application to AWS EC2 instance with Docker

In AWS we have several ways to deploy Django (and non-Django) applications with Docker. We can use ECS or EKS clusters, but if we don’t already have an ECS or Kubernetes cluster up and running, setting one up can be complex. Today I want to show how to deploy a Django application in production mode on a single EC2 host. Let’s start.

I’m getting too old to provision hosts by hand, so I prefer to use Docker. The idea is to create one EC2 instance from a simple, AWS-supported Amazon Linux AMI. This image doesn’t come with Docker, so we need to install it. When configuring the instance at launch, we can specify user data: a configuration script that runs during launch.

We only need to provide this shell script to set up Docker:

#! /bin/bash
yum update -y
yum install -y docker
usermod -a -G docker ec2-user
curl -L https://github.com/docker/compose/releases/download/1.25.5/docker-compose-`uname -s`-`uname -m` | sudo tee /usr/local/bin/docker-compose > /dev/null
chmod +x /usr/local/bin/docker-compose
service docker start
chkconfig docker on

rm /etc/localtime
ln -s /usr/share/zoneinfo/Europe/Madrid /etc/localtime

ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

docker swarm init

We also need to attach an IAM role to our instance. This role only needs the following managed policies:

  • AmazonEC2ContainerRegistryReadOnly (because we’re going to use AWS ECR as our container registry)
  • CloudWatchAgentServerPolicy (because we’re going to send our logs to CloudWatch)

We also need a security group that allows incoming SSH connections on port 22 and HTTP connections (port 8000 in our example).

When we launch the instance we need to provide a key pair to connect via SSH. I like to reference this key pair in my ~/.ssh/config:

Host xxx.eu-central-1.compute.amazonaws.com
    User ec2-user
    Identityfile ~/.ssh/keypair-xxx.pem

To deploy our application we need to follow these steps:

  • Build our docker images
  • Push our images to a container registry (in this case ECR)
  • Deploy the application.

I’ve created a simple shell script called deploy.sh to perform all tasks:

#!/usr/bin/env bash

set -a
[ -f deploy.env ] && . deploy.env
set +a

echo "$(tput setaf 1)Building docker images ...$(tput sgr0)"
docker build -t ec2-web -t ec2-web:latest -t $ECR/ec2-web:latest .
docker build -t ec2-nginx -t $ECR/ec2-nginx:latest .docker/nginx

echo "$(tput setaf 1)Pushing to ECR ...$(tput sgr0)"
aws ecr get-login-password --region $REGION --profile $PROFILE |
  docker login --username AWS --password-stdin $ECR
docker push $ECR/ec2-web:latest
docker push $ECR/ec2-nginx:latest

CMD="docker stack deploy -c $DOCKER_COMPOSE_YML ec2 --with-registry-auth"
echo "$(tput setaf 1)Deploying to EC2 ($CMD)...$(tput sgr0)"
echo "$CMD"

DOCKER_HOST="ssh://$HOST" $CMD
echo "$(tput setaf 1)Building finished $(date +'%Y%m%d.%H%M%S')$(tput sgr0)"

This script assumes there’s a deploy.env file with our personal configuration (AWS profile, EC2 host, ECR registry and so on):

PROFILE=xxxxxxx

DOCKER_COMPOSE_YML=docker-compose.yml
HOST=ec2-user@xxxx.eu-central-1.compute.amazonaws.com

ECR=9999999999.dkr.ecr.eu-central-1.amazonaws.com
REGION=eu-central-1

In this example I’m using Docker Swarm to deploy the application. I also want to play with secrets: this dummy application doesn’t have any sensitive information, but I’ve created an "ec2.supersecret" secret. Secrets are stored in the swarm, so this command must run against the swarm manager (the EC2 host):

echo "super secret text" | docker secret create ec2.supersecret -

That’s the docker-compose.yml file:

version: '3.8'
services:
  web:
    image: 999999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-web:latest
    command: /bin/bash ./docker-entrypoint.sh
    environment:
      DEBUG: 'False'
    secrets:
      - ec2.supersecret
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: app
    volumes:
      - static_volume:/src/staticfiles
  nginx:
    image: 99999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-nginx:latest
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: nginx
    volumes:
      - static_volume:/src/staticfiles:ro
    ports:
      - 8000:80
    depends_on:
      - web
volumes:
  static_volume:

secrets:
  ec2.supersecret:
    external: true
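The compose file grants the web service the ec2.supersecret secret, but the post doesn’t show how Django reads it. Swarm mounts each granted secret as a file inside the container, here /run/secrets/ec2.supersecret. A minimal sketch, with read_docker_secret as a hypothetical helper you could call from settings.py:

```python
from pathlib import Path


def read_docker_secret(name, default=None, secrets_dir='/run/secrets'):
    """Return the contents of a Docker secret file, or `default` if it isn't mounted."""
    path = Path(secrets_dir) / name
    try:
        # strip() drops the trailing newline that `echo` adds when creating the secret
        return path.read_text().strip()
    except FileNotFoundError:
        return default
```

For example, `SUPER_SECRET = read_docker_secret('ec2.supersecret')`. Falling back to a default keeps local runs without Swarm working.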

And that’s all. ECS or EKS may be better solutions for deploying Docker applications on AWS, but we can also deploy easily to a single Docker host on an EC2 instance, which can be ready within a couple of minutes.

Source code in my GitHub.

Django reactive users with Celery and Channels

Today I want to build a prototype. The idea is to create two Django applications: one will be the master and the other one the client. Both applications will have their own User model, but every change to the master’s User model will be propagated to the client (or clients). Let me show you what I have in mind:

We’re going to register signals on the User model (in the master) to detect user modifications:

  • If certain fields have changed (we’re going to ignore last_login, password and the like) we’re going to emit an event.
  • I normally work with AWS, so the event will be an SNS message.
  • The idea is to support multiple clients, so each client will listen to its own SQS queue. Those SQS queues will be subscribed to the SNS topic.
  • To decouple sending the SNS message from the request, we’re going to send it from a Celery worker.
  • The second application (the client) will have a listener on its SQS queue.
  • Each time the listener receives a message, it will persist the user information in the client’s User model.
  • It will also emit a message to a Django Channels consumer, to be sent via WebSockets to the browser.

The Master

We’re going to emit the event each time the User model changes (and also when we create or delete a user). To detect changes we register a pre_save signal handler that flags whether the model has changed, and then, in post_save, we emit the event via a Celery worker.

from django.contrib.auth.models import User
from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver

# UserSerializer, Actions and the user_changed_event task come from the master project


@receiver(pre_save, sender=User)
def pre_user_modified(sender, instance, **kwargs):
    instance.is_modified = None

    if instance.is_staff is False and instance.id is not None:
        modified_user_data = UserSerializer(instance).data
        # Look up the stored row by primary key: the username itself may be what changed
        user = User.objects.get(pk=instance.pk)
        user_serializer_data = UserSerializer(user).data

        if user_serializer_data != modified_user_data:
            instance.is_modified = True


@receiver(post_save, sender=User)
def post_user_modified(sender, instance, created, **kwargs):
    if instance.is_staff is False:
        if created or instance.is_modified:
            modified_user_data = UserSerializer(instance).data
            user_changed_event.delay(modified_user_data, action=Actions.INSERT if created else Actions.UPDATE)


@receiver(post_delete, sender=User)
def post_user_deleted(sender, instance, **kwargs):
    deleted_user_data = UserSerializer(instance).data
    user_changed_event.delay(deleted_user_data, action=Actions.DELETE)
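The pre_save handler compares the whole serialized payload, so which fields "count" is decided implicitly by UserSerializer. If you’d rather make the ignored fields explicit, as the bullet list above describes, a small helper like this could replace the equality check. changed_fields and IGNORED_USER_FIELDS are hypothetical names.

```python
IGNORED_USER_FIELDS = {'last_login', 'password'}


def changed_fields(old, new, ignored=IGNORED_USER_FIELDS):
    """Return the keys whose values differ between two serialized users,
    skipping fields whose changes should not trigger an event."""
    keys = (set(old) | set(new)) - set(ignored)
    return {key for key in keys if old.get(key) != new.get(key)}
```

In pre_user_modified you would then set `instance.is_modified = bool(changed_fields(user_serializer_data, modified_user_data))`. Returning the field names (rather than a boolean) also lets you log or publish what changed.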

We need to register our signals in apps.py:

from django.apps import AppConfig

class MasterConfig(AppConfig):
    name = 'master'

    def ready(self):
        from master.signals import pre_user_modified
        from master.signals import post_user_modified
        from master.signals import post_user_deleted

Our Celery task publishes the message to the SNS topic:

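import json
import logging

import boto3
from celery import shared_task
from django.conf import settings

logger = logging.getLogger(__name__)


@shared_task()
def user_changed_event(body, action):
    sns = boto3.client('sns')
    message = {
        "user": body,
        "action": action
    }
    # With MessageStructure='json', 'default' is the payload sent to every protocol
    response = sns.publish(
        TargetArn=settings.SNS_REACTIVE_TABLE_ARN,
        Message=json.dumps({'default': json.dumps(message)}),
        MessageStructure='json'
    )
    logger.info(response)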

AWS

In AWS we need to create an SNS topic and an SQS queue subscribed to that topic.
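This can be done in the console, but here is a boto3 sketch of the same wiring, assuming boto3 SNS and SQS clients for the same region; wire_topic_to_queue, queue_policy_for_topic and the topic/queue names you pass in are hypothetical. The queue needs an access policy that lets the topic send to it, otherwise SNS deliveries are silently dropped.

```python
import json


def queue_policy_for_topic(queue_arn, topic_arn):
    """SQS access policy allowing one SNS topic to deliver messages to the queue."""
    return {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'Service': 'sns.amazonaws.com'},
            'Action': 'sqs:SendMessage',
            'Resource': queue_arn,
            'Condition': {'ArnEquals': {'aws:SourceArn': topic_arn}},
        }],
    }


def wire_topic_to_queue(sns, sqs, topic_name, queue_name):
    """Create (or reuse) the topic and queue, then subscribe the queue to the topic."""
    topic_arn = sns.create_topic(Name=topic_name)['TopicArn']
    queue_url = sqs.create_queue(QueueName=queue_name)['QueueUrl']
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=['QueueArn'])['Attributes']['QueueArn']
    sqs.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={'Policy': json.dumps(queue_policy_for_topic(queue_arn, topic_arn))})
    # Raw message delivery stays off: the listener expects the SNS envelope
    # and reads the payload from its 'Message' field
    sns.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)
    return topic_arn, queue_url
```

The returned ARN and URL are what SNS_REACTIVE_TABLE_ARN and SQS_REACTIVE_TABLES expect in the docker-compose environment. Each additional client would get its own queue subscribed to the same topic.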

The Client

First we need a management command to run the listener:

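import json
import sys

import boto3
from django.conf import settings
from django.core.management.base import BaseCommand


class Actions:
    INSERT = 0
    UPDATE = 1
    DELETE = 2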

switch_actions = {
    Actions.INSERT: insert_user,
    Actions.UPDATE: update_user,
    Actions.DELETE: delete_user,
}

class Command(BaseCommand):
    help = 'sqs listener'

    def handle(self, *args, **options):
        self.stdout.write(self.style.WARNING("starting listener"))
        sqs = boto3.client('sqs')

        queue_url = settings.SQS_REACTIVE_TABLES

        def process_message(message):
            decoded_body = json.loads(message['Body'])
            data = json.loads(decoded_body['Message'])

            switch_actions.get(data['action'])(
                data=data['user'],
                timestamp=message['Attributes']['SentTimestamp']
            )

            notify_to_user(data['user'])

            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'])

        def loop():
            response = sqs.receive_message(
                QueueUrl=queue_url,
                AttributeNames=[
                    'SentTimestamp'
                ],
                MaxNumberOfMessages=10,
                MessageAttributeNames=[
                    'All'
                ],
                WaitTimeSeconds=20
            )

            if 'Messages' in response:
                messages = [message for message in response['Messages'] if 'Body' in message]
                for message in messages:
                    process_message(message)

        try:
            while True:
                loop()
        except KeyboardInterrupt:
            sys.exit(0)

Here we persist the model in the client’s database:

def insert_user(data, timestamp):
    username = data['username']
    serialized_user = UserSerializer(data=data)
    serialized_user.create(validated_data=data)
    logging.info(f"user: {username} created at {timestamp}")

def update_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=data['username'])
        serialized_user = UserSerializer(user)
        serialized_user.update(user, data)
        logging.info(f"user: {username} updated at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} doesn't exist. Creating ...")
        insert_user(data, timestamp)

def delete_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=username)
        user.delete()
        logging.info(f"user: {username} deleted at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} doesn't exist. Nothing to delete")

We also emit a message to the Channels consumer:

def notify_to_user(user):
    username = user['username']
    serialized_user = UserSerializer(user)
    emit_message_to_user(
        message=serialized_user.data,
        username=username, )

Here is the consumer:

class WsConsumer(AsyncWebsocketConsumer):
    @personal_consumer
    async def connect(self):
        await self.channel_layer.group_add(
            self._get_personal_room(),
            self.channel_name
        )

    @private_consumer_event
    async def emit_message(self, event):
        message = event['message']
        await self.send(text_data=json.dumps(message))

    def _get_personal_room(self):
        username = self.scope['user'].username
        return self.get_room_name(username)

    @staticmethod
    def get_room_name(room):
        return f"ws_room_{room}"

def emit_message_to_user(message, username):
    group = WsConsumer.get_room_name(username)
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {
        'type': WsConsumer.emit_message.__name__,
        'message': message
    })

Our consumer only accepts a connection if the user is authenticated and the room in the URL is the user’s own. That’s one reason I like Django Channels: this kind of thing is really simple to do (I’ve done similar things with PHP applications connected to a socket.io server and it was a nightmare). I’ve created a couple of decorators to enforce authentication in the consumer:

from functools import wraps


def personal_consumer(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]

        async def accept():
            value = await func(*args, **kwargs)
            await self.accept()
            return value

        if self.scope['user'].is_authenticated:
            username = self.scope['user'].username
            room_name = self.scope['url_route']['kwargs']['username']
            if username == room_name:
                return await accept()

        await self.close()

    return wrapper_decorator

def private_consumer_event(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]
        if self.scope['user'].is_authenticated:
            return await func(*args, **kwargs)

    return wrapper_decorator

This is the WebSocket route:

from django.urls import re_path

from client import consumers

websocket_urlpatterns = [
    # Channels 3+ needs .as_asgi(); Channels 2 accepted the consumer class directly
    re_path(r'ws/(?P<username>\w+)$', consumers.WsConsumer.as_asgi()),
]

Finally, we only need to connect our HTML page to the WebSocket:

{% block title %}Example{% endblock %}
{% block header_text %}Hello <span id="name">{{ request.user.first_name }}</span>{% endblock %}

{% block extra_body %}
  <script>
    var ws_scheme = window.location.protocol === "https:" ? "wss" : "ws"
    var ws_path = ws_scheme + '://' + window.location.host + "/ws/{{ request.user.username }}"
    var ws = new ReconnectingWebSocket(ws_path)
    var render = function (key, value) {
      document.querySelector(`#${key}`).innerHTML = value
    }
    ws.onmessage = function (e) {
      const data = JSON.parse(e.data);
      render('name', data.first_name)
    }

    ws.onopen = function () {
      console.log('Connected')
    };
  </script>
{% endblock %}

Here is a docker-compose file for the project:

version: '3.4'

services:
  redis:
    image: redis
  master:
    image: reactive_master:latest
    command: python manage.py runserver 0.0.0.0:8001
    build:
      context: ./master
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8001:8001
    environment:
      REDIS_HOST: redis
  celery:
    image: reactive_master:latest
    command: celery -A master worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
      - "master"
    environment:
      REDIS_HOST: redis
      SNS_REACTIVE_TABLE_ARN: ${SNS_REACTIVE_TABLE_ARN}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
  client:
    image: reactive_client:latest
    command: python manage.py runserver 0.0.0.0:8000
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      REDIS_HOST: redis
  listener:
    image: reactive_client:latest
    command: python manage.py listener
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    environment:
      REDIS_HOST: redis
      SQS_REACTIVE_TABLES: ${SQS_REACTIVE_TABLES}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}

And that’s all. Here is a working example of the prototype in action:

Source code in my GitHub.

Alexa skill and account linking with serverless and Cognito

Sometimes, when we’re building an Alexa skill, we need to identify the user. Alexa provides account linking for that: basically, we need an OAuth 2.0 server to link the user’s account to our Alexa skill. AWS provides a managed OAuth 2.0 service called Cognito, so we can use a Cognito user pool to handle authentication for our Alexa skills.

For this example I followed this blog post. Cognito is a bit awkward to set up, but after following all the steps we can use account linking in our Alexa skill.

There’s also a good sample skill here. I studied this example a little and created a working prototype of my own, mainly to understand the process.

This is my skill:

const Alexa = require('ask-sdk')

const RequestInterceptor = require('./interceptors/RequestInterceptor')
const ResponseInterceptor = require('./interceptors/ResponseInterceptor')
const LocalizationInterceptor = require('./interceptors/LocalizationInterceptor')
const GetLinkedInfoInterceptor = require('./interceptors/GetLinkedInfoInterceptor')

const LaunchRequestHandler = require('./handlers/LaunchRequestHandler')
const CheckAccountLinkedHandler = require('./handlers/CheckAccountLinkedHandler')
const HelloWorldIntentHandler = require('./handlers/HelloWorldIntentHandler')
const HelpIntentHandler = require('./handlers/HelpIntentHandler')
const CancelAndStopIntentHandler = require('./handlers/CancelAndStopIntentHandler')
const SessionEndedRequestHandler = require('./handlers/SessionEndedRequestHandler')
const FallbackHandler = require('./handlers/FallbackHandler')
const ErrorHandler = require('./handlers/ErrorHandler')
const RequestInfoHandler = require('./handlers/RequestInfoHandler')

let skill

module.exports.handler = async (event, context) => {
  if (!skill) {
    skill = Alexa.SkillBuilders.custom().
      addRequestInterceptors(
        RequestInterceptor,
        ResponseInterceptor,
        LocalizationInterceptor,
        GetLinkedInfoInterceptor
      ).
      addRequestHandlers(
        LaunchRequestHandler,
        CheckAccountLinkedHandler,
        HelloWorldIntentHandler,
        RequestInfoHandler,
        HelpIntentHandler,
        CancelAndStopIntentHandler,
        SessionEndedRequestHandler,
        FallbackHandler).
      addErrorHandlers(
        ErrorHandler).
      create()
  }

  return await skill.invoke(event, context)
}

The most important piece here is probably GetLinkedInfoInterceptor:

const log = require('../lib/log')
const cognito = require('../lib/cognito')
const utils = require('../lib/utils')

const GetLinkedInfoInterceptor = {
  async process (handlerInput) {
    if (utils.isAccountLinked(handlerInput)) {
      const userData = await cognito.getUserData(handlerInput.requestEnvelope.session.user.accessToken)
      log.info('GetLinkedInfoInterceptor: getUserData: ', userData)
      const sessionAttributes = handlerInput.attributesManager.getSessionAttributes()
      if (userData.Username !== undefined) {
        sessionAttributes.auth = true
        sessionAttributes.emailAddress = cognito.getAttribute(userData.UserAttributes, 'email')
        sessionAttributes.userName = userData.Username
        handlerInput.attributesManager.setSessionAttributes(sessionAttributes)
      } else {
        sessionAttributes.auth = false
        log.error('GetLinkedInfoInterceptor: No user data was found.')
      }
    }
  }
}

module.exports = GetLinkedInfoInterceptor

This interceptor retrieves the user info from Cognito using the accessToken, which we can read from the session once the skill is account linked. Then we inject the user information (in my example, the email and the username from the Cognito user pool) into the session attributes.
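The post doesn’t show lib/cognito. The fields the interceptor reads, Username and UserAttributes, match the response of the Cognito user pools GetUser API, which only needs the user’s access token. Here is a Python sketch of the same lookup with boto3; get_user_data and get_attribute are hypothetical counterparts of cognito.getUserData and cognito.getAttribute, not the author’s code. An invalid or expired token makes GetUser fail (botocore raises a ClientError such as NotAuthorizedException), which this sketch leaves to the caller.

```python
def get_attribute(user_attributes, name):
    """Pick one value from Cognito's [{'Name': ..., 'Value': ...}] attribute list."""
    return next((attr['Value'] for attr in user_attributes if attr['Name'] == name), None)


def get_user_data(cognito_idp, access_token):
    """Resolve an account-linking access token to the Cognito user-pool user.

    `cognito_idp` is assumed to be a boto3 client: boto3.client('cognito-idp').
    """
    response = cognito_idp.get_user(AccessToken=access_token)
    return {
        'userName': response['Username'],
        'emailAddress': get_attribute(response['UserAttributes'], 'email'),
    }
```

Because GetUser is authorized by the access token itself, no AWS credentials beyond a region are needed to make this call.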

Then we add a handler called CheckAccountLinkedHandler to our request handler chain. It checks whether the skill is account linked; if not, it responds with withLinkAccountCard(), which shows a card in the Alexa app asking the user to log in with Cognito and link their account.

const utils = require('../lib/utils')

const CheckAccountLinkedHandler = {
  canHandle (handlerInput) {
    return !utils.isAccountLinked(handlerInput)
  },
  handle (handlerInput) {
    const requestAttributes = handlerInput.attributesManager.getRequestAttributes()
    const speakOutput = requestAttributes.t('NEED_TO_LINK_MESSAGE', 'SKILL_NAME')
    return handlerInput.responseBuilder.
      speak(speakOutput).
      withLinkAccountCard().
      getResponse()
  }
}

module.exports = CheckAccountLinkedHandler

Later we can create an intent handler that gives this information to the user or, in another case, performs an authorization workflow:

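// Minimal helper, assumed here because the original doesn't show it: returns the
// entity-resolution ID of a slot when Alexa matched it to a slot value, else undefined.
function getResolvedSlotIDValue (request, slotName) {
  const slot = request.intent.slots && request.intent.slots[slotName]
  const authorities = slot && slot.resolutions && slot.resolutions.resolutionsPerAuthority
  if (!authorities) {
    return undefined
  }
  const match = authorities.find(authority => authority.status.code === 'ER_SUCCESS_MATCH')
  return match ? match.values[0].value.id : undefined
}

const RequestInfoHandler = {
  canHandle (handlerInput) {
    const request = handlerInput.requestEnvelope.request
    return (request.type === 'IntentRequest'
      && request.intent.name === 'RequestInfoIntent')
  },
  handle (handlerInput) {
    const request = handlerInput.requestEnvelope.request
    const requestAttributes = handlerInput.attributesManager.getRequestAttributes()
    const sessionAttributes = handlerInput.attributesManager.getSessionAttributes()
    const repromptOutput = requestAttributes.t('FOLLOW_UP_MESSAGE')
    const cardTitle = requestAttributes.t('SKILL_NAME')

    let speakOutput = ''

    let inquiryTypeId = getResolvedSlotIDValue(request, 'infoTypeRequested')
    if (!inquiryTypeId) {
      // We couldn't tell what was asked: say so and fall back to the full profile
      inquiryTypeId = 'fullProfile'
      speakOutput += requestAttributes.t('NOT_SURE_OF_TYPE_MESSAGE')
    }

    if (inquiryTypeId === 'emailAddress' || inquiryTypeId === 'fullProfile') {
      speakOutput += requestAttributes.t('REPORT_EMAIL_ADDRESS', sessionAttributes.emailAddress)
    }

    if (inquiryTypeId === 'userName' || inquiryTypeId === 'fullProfile') {
      speakOutput += requestAttributes.t('REPORT_USERNAME', sessionAttributes.userName)
    }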

    speakOutput += repromptOutput

    return handlerInput.responseBuilder.
      speak(speakOutput).
      reprompt(repromptOutput).
      withSimpleCard(cardTitle, speakOutput).
      getResponse()
  }
}

module.exports = RequestInfoHandler

And basically that’s all. In fact, it isn’t very different from traditional web authentication. The most complicated part, especially if you’re not used to OAuth 2.0, is probably configuring Cognito properly.

Here you can see the source code in my GitHub.