
Deploying Django application to AWS EC2 instance with Docker

In AWS we have several ways to deploy Django (and non-Django) applications with Docker. We can use ECS or EKS clusters, but if we don’t have an ECS or Kubernetes cluster up and running, that can be complex. Today I want to show how to deploy a Django application in production mode on a single EC2 host. Let’s start.

I’m getting too old to provision hosts by hand, so I prefer to use Docker. The idea is to create one EC2 instance (a simple Amazon Linux AMI, an AWS-supported image). This host doesn’t have Docker installed, so we need to install it. When we’re configuring the instance at launch, we can specify user data: a configuration script that runs during launch.

We only need to provide this shell script as user data to set up Docker:

#!/bin/bash
# install docker and docker-compose
yum update -y
yum install -y docker
usermod -a -G docker ec2-user
curl -L https://github.com/docker/compose/releases/download/1.25.5/docker-compose-`uname -s`-`uname -m` | tee /usr/local/bin/docker-compose > /dev/null
chmod +x /usr/local/bin/docker-compose

# start docker now and on every boot
service docker start
chkconfig docker on

# set the host timezone
rm /etc/localtime
ln -s /usr/share/zoneinfo/Europe/Madrid /etc/localtime

ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

# we deploy with docker stack, so the host must be a (single node) swarm
docker swarm init

We also need to attach an IAM role to our instance. This role only needs the following policies:

  • AmazonEC2ContainerRegistryReadOnly (because we’re going to use AWS ECR as container registry)
  • CloudWatchAgentServerPolicy (because we’re going to emit our logs to CloudWatch)

We also need to set up a security group to allow incoming SSH connections on port 22 and HTTP connections (on port 8000 in our example).

When we launch our instance we need to provide a keypair to connect via SSH. I like to register this keypair in my ~/.ssh/config:

Host xxx.eu-central-1.compute.amazonaws.com
    User ec2-user
    IdentityFile ~/.ssh/keypair-xxx.pem

To deploy our application we need to follow these steps:

  • Build our docker images
  • Push our images to a container registry (in this case ECR)
  • Deploy the application.

I’ve created a simple shell script called deploy.sh to perform all those tasks:

#!/usr/bin/env bash

set -a
[ -f deploy.env ] && . deploy.env
set +a

echo "$(tput setaf 1)Building docker images ...$(tput sgr0)"
docker build -t ec2-web -t ec2-web:latest -t $ECR/ec2-web:latest .
docker build -t ec2-nginx -t $ECR/ec2-nginx:latest .docker/nginx

echo "$(tput setaf 1)Pusing to ECR ...$(tput sgr0)"
aws ecr get-login-password --region $REGION --profile $PROFILE |
  docker login --username AWS --password-stdin $ECR
docker push $ECR/ec2-web:latest
docker push $ECR/ec2-nginx:latest

CMD="docker stack deploy -c $DOCKER_COMPOSE_YML ec2 --with-registry-auth"
echo "$(tput setaf 1)Deploying to EC2 ($CMD)...$(tput sgr0)"
echo "$CMD"

DOCKER_HOST="ssh://$HOST" $CMD
echo "$(tput setaf 1)Building finished $(date +'%Y%m%d.%H%M%S')$(tput sgr0)"

This script assumes that there’s a deploy.env file with our personal configuration (the AWS profile, the EC2 host, the ECR registry and things like that):

PROFILE=xxxxxxx

DOCKER_COMPOSE_YML=docker-compose.yml
HOST=ec2-user@xxxx.eu-central-1.compute.amazonaws.com

ECR=9999999999.dkr.ecr.eu-central-1.amazonaws.com
REGION=eu-central-1

In this example I’m using Docker Swarm to deploy the application. I also want to play with secrets. This dummy application doesn’t have any sensitive information, but I’ve created an "ec2.supersecret" secret anyway:

echo "super secret text" | docker secret create ec2.supersecret -

That’s the docker-compose.yml file:

version: '3.8'
services:
  web:
    image: 999999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-web:latest
    command: /bin/bash ./docker-entrypoint.sh
    environment:
      DEBUG: 'False'
    secrets:
      - ec2.supersecret
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: app
    volumes:
      - static_volume:/src/staticfiles
  nginx:
    image: 99999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-nginx:latest
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: nginx
    volumes:
      - static_volume:/src/staticfiles:ro
    ports:
      - 8000:80
    depends_on:
      - web
volumes:
  static_volume:

secrets:
  ec2.supersecret:
    external: true
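
Swarm mounts each secret as a file under /run/secrets inside the container. The Django side isn’t shown in this post; a minimal sketch of how the settings could read the secret (this snippet is an assumption, not part of the project above):

# settings.py (sketch): read the Swarm secret mounted under /run/secrets.
# The file name matches the secret created with `docker secret create`.
from pathlib import Path

SECRET_FILE = Path('/run/secrets/ec2.supersecret')

# hypothetical setting; fall back to a harmless default when running
# outside Swarm (e.g. local development)
SUPER_SECRET = (
    SECRET_FILE.read_text().strip()
    if SECRET_FILE.exists()
    else 'not-so-secret-default'
)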

And that’s all. Maybe ECS or EKS are better solutions to deploy Docker applications in AWS, but we can also deploy easily to a single Docker host on an EC2 instance that can be ready within a couple of minutes.

Source code in my github

Django reactive users with Celery and Channels

Today I want to build a prototype. The idea is to create two Django applications. One application will be the master and the other one will be the client. Both applications will have their own User model, but each change within the master’s User model will be propagated to the client (or clients). Let me show you what I have in mind:

We’re going to create one signal in User model (at Master) to detect user modifications:

  • If certain fields have been changed (for example we’re going to ignore last_login, password and things like that) we’re going to emit an event.
  • I normally work with AWS, so the event will be an SNS event.
  • The idea is to have multiple clients, so each client will be listening to its own SQS queue. Those SQS queues will be subscribed to the SNS topic.
  • To decouple the sending of the SNS message, we’re going to send it via a Celery worker.
  • The second application (the client) will have one listener attached to the SQS queue.
  • Each time the listener gets a message it will persist the user information within the client’s User model.
  • It will also emit a message to a Django Channels consumer, to be sent via websockets to the browser.

The Master

We’re going to emit the event each time the User model changes (and also when we create or delete a user). To detect changes we’re going to register a signal in pre_save to mark whether the model has changed, and later in post_save we’re going to emit the event via the Celery worker.

from django.contrib.auth.models import User
from django.db.models.signals import pre_save, post_save, post_delete
from django.dispatch import receiver

# UserSerializer, user_changed_event and Actions come from our project;
# a serializer sketch follows and the Celery task is shown below


@receiver(pre_save, sender=User)
def pre_user_modified(sender, instance, **kwargs):
    instance.is_modified = None

    if instance.is_staff is False and instance.id is not None:
        modified_user_data = UserSerializer(instance).data
        user = User.objects.get(username=modified_user_data['username'])
        user_serializer_data = UserSerializer(user).data

        if user_serializer_data != modified_user_data:
            instance.is_modified = True

@receiver(post_save, sender=User)
def post_user_modified(sender, instance, created, **kwargs):
    if instance.is_staff is False:
        if created or instance.is_modified:
            modified_user_data = UserSerializer(instance).data
            user_changed_event.delay(modified_user_data, action=Actions.INSERT if created else Actions.UPDATE)

@receiver(post_delete, sender=User)
def post_user_deleted(sender, instance, **kwargs):
    deleted_user_data = UserSerializer(instance).data
    user_changed_event.delay(deleted_user_data, action=Actions.DELETE)
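
The signals above compare serialized snapshots of the user, so the serializer decides which fields matter. The UserSerializer isn’t shown in this post; a minimal DRF sketch (the field list is an assumption) could be:

# serializers.py (sketch): expose only the fields we want to compare and
# propagate, so changes to last_login, password and friends are ignored
from django.contrib.auth.models import User
from rest_framework import serializers


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ('username', 'first_name', 'last_name', 'email')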

We need to register our signals in apps.py

from django.apps import AppConfig

class MasterConfig(AppConfig):
    name = 'master'

    def ready(self):
        from master.signals import pre_user_modified
        from master.signals import post_user_modified
        from master.signals import post_user_deleted

Our Celery task will send the message to the SNS topic:

import json
import logging

import boto3
from celery import shared_task
from django.conf import settings

logger = logging.getLogger(__name__)


@shared_task()
def user_changed_event(body, action):
    sns = boto3.client('sns')
    message = {
        "user": body,
        "action": action
    }
    response = sns.publish(
        TargetArn=settings.SNS_REACTIVE_TABLE_ARN,
        Message=json.dumps({'default': json.dumps(message)}),
        MessageStructure='json'
    )
    logger.info(response)

AWS

In AWS we need to create one SNS topic and one SQS queue subscribed to it.
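
We can do that from the AWS console; a minimal boto3 sketch of the same wiring (the resource names are examples) could be:

# sketch: create the SNS topic and one SQS queue, and subscribe the queue
# to the topic
import boto3

sns = boto3.client('sns')
sqs = boto3.client('sqs')

topic_arn = sns.create_topic(Name='reactive-users')['TopicArn']
queue_url = sqs.create_queue(QueueName='reactive-users-client1')['QueueUrl']
queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=['QueueArn'])['Attributes']['QueueArn']

# the queue also needs an access policy allowing sqs:SendMessage from the
# topic (omitted here for brevity)
sns.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)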

The Client

First we need a Django management command to run the listener:

import json
import sys

import boto3
from django.conf import settings
from django.core.management.base import BaseCommand

# insert_user, update_user, delete_user and notify_to_user are shown below


class Actions:
    INSERT = 0
    UPDATE = 1
    DELETE = 2

switch_actions = {
    Actions.INSERT: insert_user,
    Actions.UPDATE: update_user,
    Actions.DELETE: delete_user,
}

class Command(BaseCommand):
    help = 'sqs listener'

    def handle(self, *args, **options):
        self.stdout.write(self.style.WARNING("starting listener"))
        sqs = boto3.client('sqs')

        queue_url = settings.SQS_REACTIVE_TABLES

        def process_message(message):
            decoded_body = json.loads(message['Body'])
            data = json.loads(decoded_body['Message'])

            switch_actions.get(data['action'])(
                data=data['user'],
                timestamp=message['Attributes']['SentTimestamp']
            )

            notify_to_user(data['user'])

            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'])

        def loop():
            response = sqs.receive_message(
                QueueUrl=queue_url,
                AttributeNames=[
                    'SentTimestamp'
                ],
                MaxNumberOfMessages=10,
                MessageAttributeNames=[
                    'All'
                ],
                WaitTimeSeconds=20
            )

            if 'Messages' in response:
                messages = [message for message in response['Messages'] if 'Body' in message]
                [process_message(message) for message in messages]

        try:
            while True:
                loop()
        except KeyboardInterrupt:
            sys.exit(0)
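
The double json.loads in process_message is because SNS wraps our payload: SQS delivers the SNS notification, and the notification’s Message field is itself a JSON string. A sketch of what a received message looks like (all the values are made up):

# hypothetical message as received from SQS (fields trimmed): the outer
# 'Body' is the SNS notification; its 'Message' holds our actual payload
import json

message = {
    'Body': json.dumps({
        'Type': 'Notification',
        'Message': json.dumps({
            'user': {'username': 'user1', 'first_name': 'Gonzalo'},
            'action': 1  # Actions.UPDATE
        }),
    }),
    'Attributes': {'SentTimestamp': '1588316542000'},
    'ReceiptHandle': 'opaque-receipt-handle',
}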

Here we persist the model in the client’s database:

import logging

from django.contrib.auth.models import User

# UserSerializer is the client's copy of the serializer used in the master


def insert_user(data, timestamp):
    username = data['username']
    serialized_user = UserSerializer(data=data)
    serialized_user.create(validated_data=data)
    logging.info(f"user: {username} created at {timestamp}")

def update_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=data['username'])
        serialized_user = UserSerializer(user)
        serialized_user.update(user, data)
        logging.info(f"user: {username} updated at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} don't exits. Creating ...")
        insert_user(data, timestamp)

def delete_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=username)
        user.delete()
        logging.info(f"user: {username} deleted at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} don't exits. Don't deleted")

And also emit one message to channel’s consumer

def notify_to_user(user):
    username = user['username']
    serialized_user = UserSerializer(user)
    emit_message_to_user(
        message=serialized_user.data,
        username=username, )

Here’s the consumer:

import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer

# personal_consumer and private_consumer_event are the decorators shown below


class WsConsumer(AsyncWebsocketConsumer):
    @personal_consumer
    async def connect(self):
        await self.channel_layer.group_add(
            self._get_personal_room(),
            self.channel_name
        )

    @private_consumer_event
    async def emit_message(self, event):
        message = event['message']
        await self.send(text_data=json.dumps(message))

    def _get_personal_room(self):
        username = self.scope['user'].username
        return self.get_room_name(username)

    @staticmethod
    def get_room_name(room):
        return f"{'ws_room'}_{room}"

def emit_message_to_user(message, username):
    group = WsConsumer.get_room_name(username)
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {
        'type': WsConsumer.emit_message.__name__,
        'message': message
    })

Our consumer will only allow a connection if the user is authenticated. That’s why I like Django Channels: this kind of thing is really simple to do (I’ve done similar things with PHP applications connected to a socket.io server and it was a nightmare). I’ve created a couple of decorators to enforce authentication in the consumer.

from functools import wraps


def personal_consumer(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]

        async def accept():
            value = await func(*args, **kwargs)
            await self.accept()
            return value

        if self.scope['user'].is_authenticated:
            username = self.scope['user'].username
            room_name = self.scope['url_route']['kwargs']['username']
            if username == room_name:
                return await accept()

        await self.close()

    return wrapper_decorator

def private_consumer_event(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]
        if self.scope['user'].is_authenticated:
            return await func(*args, **kwargs)

    return wrapper_decorator

That’s the websocket route:

from django.urls import re_path

from client import consumers

websocket_urlpatterns = [
    re_path(r'ws/(?P<username>\w+)$', consumers.WsConsumer),
]

Finally we only need to connect our HTML page to the websocket:

{% block title %}Example{% endblock %}
{% block header_text %}Hello <span id="name">{{ request.user.first_name }}</span>{% endblock %}

{% block extra_body %}
  <script>
    var ws_scheme = window.location.protocol === "https:" ? "wss" : "ws"
    var ws_path = ws_scheme + '://' + window.location.host + "/ws/{{ request.user.username }}"
    var ws = new ReconnectingWebSocket(ws_path)
    var render = function (key, value) {
      document.querySelector(`#${key}`).innerHTML = value
    }
    ws.onmessage = function (e) {
      const data = JSON.parse(e.data);
      render('name', data.first_name)
    }

    ws.onopen = function () {
      console.log('Connected')
    };
  </script>
{% endblock %}

Here’s a docker-compose file with the whole project:

version: '3.4'

services:
  redis:
    image: redis
  master:
    image: reactive_master:latest
    command: python manage.py runserver 0.0.0.0:8001
    build:
      context: ./master
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8001:8001
    environment:
      REDIS_HOST: redis
  celery:
    image: reactive_master:latest
    command: celery -A master worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
      - "master"
    environment:
      REDIS_HOST: redis
      SNS_REACTIVE_TABLE_ARN: ${SNS_REACTIVE_TABLE_ARN}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
  client:
    image: reactive_client:latest
    command: python manage.py runserver 0.0.0.0:8000
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      REDIS_HOST: redis
  listener:
    image: reactive_client:latest
    command: python manage.py listener
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    environment:
      REDIS_HOST: redis
      SQS_REACTIVE_TABLES: ${SQS_REACTIVE_TABLES}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}

And that’s all. Here’s a working example of the prototype in action:

Source code in my github.

Building real time Python applications with Django Channels, Docker and Kubernetes

Three years ago I wrote an article about websockets. In fact I’ve written several articles about Websockets (Websockets and real time communication are something I’m really passionate about), but today I would like to pick up that article. Nowadays I’m involved with several Django projects, so I want to create a similar working prototype with Django. Let’s start:

In the past I normally worked with libraries such as socket.io to ensure browser compatibility with Websockets. Nowadays, at least in my world, we can assume that our users are using a modern browser with websocket support, so we’re going to use plain Websockets instead of external libraries. Django has great support for Websockets called Django Channels. It allows us to handle Websockets (and other async protocols) thanks to Python’s ASGI specification. In fact it’s pretty straightforward to build applications with real time communication and with shared authentication (something that I have done in the past with a lot of effort; I’m getting old and now I like simple things :))

The application that I want to build is the following one: a web application that shows the current time, with seconds. OK, it’s very simple to do with a couple of JavaScript lines, but this time I want a worker that emits an event via Websockets with the current time, and a web application that shows that real time update. This kind of architecture always has the same problem: the initial state. When the user opens the browser, the page must show the current time immediately. We could simply wait for the next event to fill the initially blank screen, but if events arrived every 10 seconds our user would be looking at a blank screen until the next one. So in this example we’re going to handle that situation: each time a user connects to the Websocket, the page will ask the server for the initial state.

Our initial state route will return the current time (read from Redis). We can authorize the route using standard Django protected routes:

from django.contrib.auth.decorators import login_required
from django.http import JsonResponse
from ws.redis import redis

@login_required
def initial_state(request):
    return JsonResponse({'current': redis.get('time')})
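
The ws.redis module just exposes a configured client; a minimal sketch (the env variable names are an assumption) could be:

# ws/redis.py (sketch): a shared Redis client for the project
import os

from redis import Redis

redis = Redis(
    host=os.getenv('REDIS_HOST', 'localhost'),
    port=int(os.getenv('REDIS_PORT', 6379)),
    decode_responses=True,  # return str instead of bytes, so JsonResponse can serialize it
)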

We need to configure our Channels routing and define our websocket route:

from django.urls import re_path

from ws import consumers

websocket_urlpatterns = [
    re_path(r'time/tic/$', consumers.WsConsumer),
]
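
These websocket_urlpatterns are wired into the ASGI application through Channels’ authentication middleware. That file isn’t shown in the post; a sketch (the module names are assumptions) could look like this:

# config/asgi.py (sketch): route websocket connections through Channels'
# AuthMiddlewareStack so consumers can read self.scope["user"]
import os

import django
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
django.setup()

from ws.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
})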

As we can see below, we can reuse the authentication middleware in Channels consumers too:

import json
from channels.generic.websocket import AsyncWebsocketConsumer


class WsConsumer(AsyncWebsocketConsumer):
    GROUP = 'time'

    async def connect(self):
        if self.scope["user"].is_anonymous:
            await self.close()
        else:
            await self.channel_layer.group_add(
                self.GROUP,
                self.channel_name
            )
            await self.accept()

    async def tic_message(self, event):
        if not self.scope["user"].is_anonymous:
            message = event['message']

            await self.send(text_data=json.dumps({
                'message': message
            }))

We’re going to need a worker that emits the current time every second (to avoid problems we’re going to trigger our event every 0.5 seconds). To perform this kind of action we have a great tool: Celery. We can create workers and scheduled tasks with Celery (exactly what we need in our example). To avoid the “initial state” problem, our worker will also persist the current time in Redis.

import time

import channels.layers
from asgiref.sync import async_to_sync
from celery import Celery

# import paths follow the project layout used in this post
from ws.consumers import WsConsumer
from ws.redis import redis

app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(0.5, ws_beat.s(), name='beat every 0.5 seconds')


@app.task
def ws_beat(group=WsConsumer.GROUP, event='tic_message'):
    current_time = time.strftime('%X')
    redis.set('time', current_time)
    message = {'time': current_time}
    channel_layer = channels.layers.get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {'type': event, 'message': message})

Finally we need a JavaScript client to consume our Websockets:

let getWsUri = () => {
  // the parentheses matter: without them only the "ws" branch would get
  // the host and path appended
  return (window.location.protocol === "https:" ? "wss" : "ws") +
    '://' + window.location.host +
    "/time/tic/"
}

let render = value => {
  document.querySelector('#display').innerHTML = value
}

let ws = new ReconnectingWebSocket(getWsUri())

ws.onmessage = e => {
  const data = JSON.parse(e.data);
  render(data.message.time)
}

ws.onopen = async () => {
  let response = await axios.get("/api/initial_state")
  render(response.data.current)
}

Basically that’s the source code (plus the Django stuff).

Application architecture
The architecture of the application is the following one:

Frontend
The Django application. We can run this application in development with
python manage.py runserver

And in production using an ASGI server (uvicorn in this case):

uvicorn config.asgi:application --port 8000 --host 0.0.0.0 --workers 1

Worker
The Celery worker. In development mode:

celery -A ws worker -l debug

And in production:

celery -A ws worker --uid=nobody --gid=nogroup

Scheduler
We need a scheduler (Celery beat) to emit our event every 0.5 seconds:

celery -A ws beat

Message Server for Celery
In this case we’re going to use Redis

Docker
With this application we can use the same Dockerfile for the frontend, the worker and the scheduler, just using different entrypoints.

Dockerfile:

FROM python:3.8

ENV TZ 'Europe/Madrid'
RUN echo $TZ > /etc/timezone && \
apt-get update && apt-get install -y tzdata && \
rm /etc/localtime && \
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
dpkg-reconfigure -f noninteractive tzdata && \
apt-get clean

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

ADD . /src
WORKDIR /src

RUN pip install -r requirements.txt

RUN mkdir -p /var/run/celery /var/log/celery
RUN chown -R nobody:nogroup /var/run/celery /var/log/celery

And here’s our whole application within a docker-compose file:

version: '3.4'

services:
  redis:
    image: redis
  web:
    image: clock:latest
    command: /bin/bash ./docker-entrypoint.sh
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 1m30s
      timeout: 10s
      retries: 3
      start_period: 40s
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
  celery:
    image: clock:latest
    command: celery -A ws worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
  cron:
    image: clock:latest
    command: celery -A ws beat
    depends_on:
      - "redis"
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
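
The healthcheck above hits a /health endpoint. That view isn’t shown in the post; a trivial sketch could be:

# hypothetical health view used by the Docker healthcheck
from django.http import JsonResponse


def health(request):
    return JsonResponse({'status': 'ok'})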

If we want to deploy our application to a K8s cluster, we need to migrate our docker-compose file into K8s yaml files. I assume that we’ve pushed our Docker images to a container registry (such as ECR).

Frontend:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-web-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-web-api
      project: clock
  template:
    metadata:
      labels:
        app: clock-web-api
        project: clock
    spec:
      containers:
        - name: web-api
          image: my-ecr-path/clock:latest
          args: ["uvicorn", "config.asgi:application", "--port", "8000", "--host", "0.0.0.0", "--workers", "1"]
          ports:
            - containerPort: 8000
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host
---
apiVersion: v1
kind: Service
metadata:
  name: clock-web-api
spec:
  type: LoadBalancer
  selector:
    app: clock-web-api
    project: clock
  ports:
    - protocol: TCP
      port: 8000 # port exposed internally in the cluster
      targetPort: 8000 # the container port to send requests to

Celery worker

apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-celery
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-celery
      project: clock
  template:
    metadata:
      labels:
        app: clock-celery
        project: clock
    spec:
      containers:
        - name: clock-celery
          image: my-ecr-path/clock:latest
          args: ["celery", "-A", "ws", "worker", "--uid=nobody", "--gid=nogroup"]
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host

(The worker doesn’t need a Service: nobody connects to it; it only talks to Redis.)

Celery scheduler

apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-cron
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-cron
      project: clock
  template:
    metadata:
      labels:
        app: clock-cron
        project: clock
    spec:
      containers:
        - name: clock-cron
          image: my-ecr-path/clock:latest
          args: ["celery", "-A", "ws", "beat"]
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host

Redis

apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-redis
      project: clock
  template:
    metadata:
      labels:
        app: clock-redis
        project: clock
    spec:
      containers:
        - name: clock-redis
          image: redis
          ports:
            - containerPort: 6379
              name: clock-redis
---
apiVersion: v1
kind: Service
metadata:
  name: clock-redis
spec:
  type: ClusterIP
  ports:
    - port: 6379
      targetPort: 6379
  selector:
    app: clock-redis

Shared configuration

apiVersion: v1
kind: ConfigMap
metadata:
  name: clock-app-config
data:
  redis.host: "clock-redis"

We can deploy our application to our k8s cluster:

kubectl apply -f .k8s/

And see it running inside the cluster locally with a port forward

kubectl port-forward deployment/clock-web-api 8000:8000

And that’s all. Our Django application with Websockets using Django Channels, up and running with Docker and also on K8s.

Source code in my github

Deploying Python Application using Docker and Kubernetes

I’ve been learning how to deploy a Python application to Kubernetes. Here you can see my notes.

Let’s start with a dummy Python application. It’s a basic Flask web API. Each time we perform a GET request to “/” we increase a counter and return the number of hits. The persistence layer will be a Redis database. The script is very simple:

from flask import Flask
import os
from redis import Redis

redis = Redis(host=os.getenv('REDIS_HOST', 'localhost'),
              port=os.getenv('REDIS_PORT', 6379))
app = Flask(__name__)

@app.route('/')
def hello():
    redis.incr('hits')
    hits = int(redis.get('hits'))
    return f"Hits: {hits}"


if __name__ == "__main__":
    app.run(host='0.0.0.0')

First of all we create a virtual environment to ensure that we install our dependencies in isolation:

python -m venv venv

We activate the virtualenv:

source venv/bin/activate

And we install our dependencies:

pip install -r requirements.txt

To be able to run our application we must ensure that we have a Redis database ready. We can run one with Docker:

docker run -p 6379:6379 redis

Now we can start our application:

python app.py

We open our browser with the url: http://localhost:5000 and it works.

Now we’re going to run our application within a Docker container. First of all we need to create a Docker image from a Dockerfile:

FROM python:alpine3.8
ADD . /code
WORKDIR /code
RUN pip install -r requirements.txt

EXPOSE 5000

Now we can build our image:

docker build -t front .

And now we can run our front image:

docker run -p 5000:5000 front python app.py

If we now open our browser at http://localhost:5000 we’ll get a 500 error. That’s because our Docker container is trying to reach Redis on localhost. It worked before, when our application and our Redis were on the same host; now our API’s localhost isn’t the same as our host’s.

In our script the Redis host is localhost by default, but it can be overridden with an environment variable,

redis = Redis(host=os.getenv('REDIS_HOST', 'localhost'),
              port=os.getenv('REDIS_PORT', 6379))

so we can pass the real host where our Redis resides to our Docker container (supposing my IP address is 192.168.1.100):

docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front python app.py

If we don’t want the development server, we can also start our API using gunicorn:

docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front gunicorn -w 1 app:app -b 0.0.0.0:5000

And that works. We can start our app manually using Docker, but it’s a bit complicated: we need to run two containers (API and Redis) and set up the env variables.
Docker helps us here with docker-compose. We can create a docker-compose.yaml file configuring our whole application:


version: '2'

services:
  front:
    image: front
    build:
      context: ./front
      dockerfile: Dockerfile
    container_name: front
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000
    depends_on:
      - redis
    ports:
      - "5000:5000"
    restart: always
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
  redis:
    image: redis
    ports:
      - "6379:6379"

I can execute it

docker-compose up

Docker Compose is pretty straightforward. But what happens if our production environment is a cluster? docker-compose works fine on a single host, but if our production environment is a cluster we’ll face problems (we’d need to ensure things like high availability manually). Docker tried to answer this question with Docker Swarm. Basically Swarm is docker-compose within a cluster, and it uses almost the same syntax as docker-compose on a single host. Looks good, doesn’t it? OK. Almost nobody uses it. Since Google released its container orchestrator (Kubernetes, aka K8s), it has become the de facto standard. The good thing about K8s is that it’s much better than Swarm (more configurable and more powerful); the bad part is that it isn’t as simple and easy to understand as docker-compose.

Let’s try to run our project in K8s:

First I start minikube

minikube start

and I point my Docker client at minikube’s Docker daemon, so the images I build are available inside the cluster:

eval $(minikube docker-env)

The API:

First we create one service:

apiVersion: v1
kind: Service
metadata:
  name: front-api
spec:
  # types:
  # - ClusterIP: (default) only accessible from within the Kubernetes cluster
  # - NodePort: accessible on a static port on each Node in the cluster
  # - LoadBalancer: accessible externally through a cloud provider's load balancer
  type: LoadBalancer
  # When the node receives a request on the static port (30164)
  # "select pods with the label 'app' set to 'front-api'"
  # and forward the request to one of them
  selector:
    app: front-api
  ports:
    - protocol: TCP
      port: 5000 # port exposed internally in the cluster
      targetPort: 5000 # the container port to send requests to
      nodePort: 30164 # a static port assigned on each the node

And one deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: front-api
spec:
  # How many copies of each pod do we want?
  replicas: 1

  selector:
    matchLabels:
      # This must match the labels we set on the pod!
      app: front-api

  # This template field is a regular pod configuration
  # nested inside the deployment spec
  template:
    metadata:
      # Set labels on the pod.
      # This is used in the deployment selector.
      labels:
        app: front-api
    spec:
      containers:
        - name: front-api
          image: front:v1
          args: ["gunicorn", "-w 1", "app:app", "-b 0.0.0.0:5000"]
          ports:
            - containerPort: 5000
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: api-config
                  key: redis.host

In order to learn a little bit of K8s I’m using a config map called ‘api-config’ where I put some information (such as the Redis host that I’m going to pass as an env variable):

apiVersion: v1
kind: ConfigMap
metadata:
  name: api-config
data:
  redis.host: "back-api"

The Backend: Our Redis database:

First the service:

apiVersion: v1
kind: Service
metadata:
  name: back-api
spec:
  type: ClusterIP
  ports:
    - port: 6379
      targetPort: 6379
  selector:
    app: back-api

And finally the deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: back-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: back-api
  template:
    metadata:
      labels:
        app: back-api
    spec:
      containers:
        - name: back-api
          image: redis
          ports:
            - containerPort: 6379
              name: redis

Before deploying my application to the cluster I need to build my docker image and tag it

docker build -t front .
docker tag front front:v1

Now I can deploy my application to my K8s cluster:

kubectl apply -f .k8s/

If I want to know the external URL of my application in the cluster I can use this command:

minikube service front-api --url

Then I can see it running using the browser or with curl

curl $(minikube service front-api --url)

And that’s all. I can also delete the whole application:

kubectl delete -f .k8s/ 

Source code available in my github

Building Bluetooth iot devices compatible with Alexa

Alexa can speak with IoT devices (bulbs, switches, …) directly, without creating any skill. Recently I’ve discovered the fauxmoESP library, which lets us use an ESP32 as a virtual device and control it with Alexa.

I want to create a simple example with my M5Stack (it’s basically an ESP32 with a screen). It’s pretty straightforward to do.
Here’s the firmware that I deploy to my device:

#include <M5Stack.h>
#include <WiFi.h>
#include "fauxmoESP.h"

#define SERIAL_BAUDRATE 115200

#define WIFI_SSID "my_SSOD"
#define WIFI_PASS "my_password"

#define DEVICE_1 "my device"

fauxmoESP fauxmo;
bool current_state;
bool change_flag;
int current_value;

void wifiSetup() {
  WiFi.mode(WIFI_STA);
  WiFi.begin(WIFI_SSID, WIFI_PASS);

  while (WiFi.status() != WL_CONNECTED) {
    Serial.print(".");
    delay(100);
  }
  Serial.println();
  Serial.printf("WIFI coneccted! IP: %s\n", WiFi.localIP().toString().c_str());
}

void setup() {
  Serial.begin(SERIAL_BAUDRATE);
  Serial.println();
  M5.begin();
  M5.Power.begin();

  wifiSetup();

  current_state = false;
  current_value = 0;
  change_flag = false;
  
  fauxmo.createServer(true);
  fauxmo.setPort(80); 
  fauxmo.enable(true);
  fauxmo.addDevice(DEVICE_1);

  fauxmo.onSetState([](unsigned char device_id, const char * device_name, bool state, unsigned char value) { 
    Serial.printf("Device #%d (%s) state: %s value: %d\n", device_id, device_name, state ? "ON" : "OFF", value);
    current_value = value * 100 / 254;
    current_state = state;
    change_flag = true;
  });
}

void loop() {
  fauxmo.handle();

  static unsigned long last = millis();
  if (millis() - last > 5000) {
    last = millis();
    Serial.printf("[MAIN] Free heap: %d bytes\n", ESP.getFreeHeap());
  }
  if (change_flag) {
    M5.Lcd.fillScreen(current_state ? GREEN : RED);
    M5.Lcd.setCursor(100, 100);
    M5.Lcd.setTextColor(WHITE);
    M5.Lcd.setTextSize(6);
    M5.Lcd.print(current_state ? current_value : 0);
    change_flag = false;
  }
}

Then I only need to pair my virtual device (in this case “my device”) with a compatible Alexa device (in my case an Echo Spot). We can do it from the Alexa device’s menu or simply by saying “Alexa, discover devices”. Then I’ve got my IoT device paired to Alexa and I can say things like “Alexa, switch on my device”, “Alexa, switch off my device” or “Alexa, set my device to 50%”.

Here’s a small video showing the working example:

Alexa and Raspberry Pi demo (Part 2). Listening to external events

Today I want to keep on with the previous example. This time I want to create one Alexa skill that listens to external events. The example that I’ve built is the following one:

I’ve got one ESP32 with a proximity sensor (an HC-SR04) that sends the captured distance every 500 ms to an MQTT broker (a Mosquitto server).

I say “Alexa, use event demo”; then Alexa tells me to put my hand close to the sensor and it will tell me the distance.

That’s the ESP32 code

#include <WiFi.h>
#include <PubSubClient.h>

const char* ssid = "MY_SSID";
const char* password = "MY_PASSWORD";
const char* server = "mqtt.server.ip";
const char* topic = "/alert";
const char* clientName = "com.gonzalo123.esp32";

WiFiClient wifiClient;
PubSubClient client(wifiClient);

int trigPin = 13;
int echoPin = 12;
int alert = 0;
int alertThreshold = 100;
long duration, distance_cm;

void wifiConnect() {
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print("*");
  }

  Serial.print("WiFi connected: ");
  Serial.println(WiFi.localIP());
}

void mqttReConnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    if (client.connect(clientName)) {
      Serial.println("connected");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void mqttEmit(String topic, String value)
{
  client.publish((char*) topic.c_str(), (char*) value.c_str());
}

void setup() {
  Serial.begin(9600);
  pinMode(trigPin, OUTPUT);
  pinMode(echoPin, INPUT);

  wifiConnect();
  client.setServer(server, 1883);

  delay(1500);
}

void loop() {
  if (!client.connected()) {
    mqttReConnect();
  }

  client.loop();

  digitalWrite(trigPin, HIGH);
  delayMicroseconds(10);
  digitalWrite(trigPin, LOW);

  duration = pulseIn(echoPin, HIGH) / 2;
  distance_cm = duration / 29;

  if (distance_cm <= alertThreshold && alert == 0) {
      alert = 1;
      Serial.println("Alert!");
      mqttEmit("/alert", (String) distance_cm);
  } else if(distance_cm > alertThreshold && alert == 1) {
    alert = 0;
    Serial.println("No alert");
  }

  Serial.print("Distance: ");
  Serial.print(distance_cm);
  Serial.println(" cm ");

  delay(500);
}

And this is my alexa skill:

'use strict'

const Alexa = require('ask-sdk-core')

const RequestInterceptor = require('./interceptors/RequestInterceptor')
const ResponseInterceptor = require('./interceptors/ResponseInterceptor')
const LocalizationInterceptor = require('./interceptors/LocalizationInterceptor')
const GadgetInterceptor = require('./interceptors/GadgetInterceptor')

const YesIntentHandler = require('./handlers/YesIntentHandler')
const NoIntentHandler = require('./handlers/NoIntentHandler')
const LaunchRequestHandler = require('./handlers/LaunchRequestHandler')
const CancelAndStopIntentHandler = require('./handlers/CancelAndStopIntentHandler')
const SessionEndedRequestHandler = require('./handlers/SessionEndedRequestHandler')
const CustomInterfaceEventHandler = require('./handlers/CustomInterfaceEventHandler')
const CustomInterfaceExpirationHandler = require('./handlers/CustomInterfaceExpirationHandler')

const FallbackHandler = require('./handlers/FallbackHandler')
const ErrorHandler = require('./handlers/ErrorHandler')

let skill
exports.handler = function (event, context) {
  if (!skill) {
    skill = Alexa.SkillBuilders.custom().
      addRequestHandlers(
        LaunchRequestHandler,
        YesIntentHandler,
        NoIntentHandler,
        CustomInterfaceEventHandler,
        CustomInterfaceExpirationHandler,
        CancelAndStopIntentHandler,
        SessionEndedRequestHandler,
        FallbackHandler).
      addRequestInterceptors(
        RequestInterceptor,
        ResponseInterceptor,
        LocalizationInterceptor,
        GadgetInterceptor).
      addErrorHandlers(ErrorHandler).create()
  }
  return skill.invoke(event, context)
}

The process is similar to the previous example. There’s a GadgetInterceptor to find the endpointId of my Raspberry Pi. But now the Raspberry Pi must emit the event to the skill, not the other way around. Now my rpi’s Python script is a little bit more complicated. We need to run the main loop of the Alexa Gadget SDK but we also need another loop listening for MQTT events. We need to use threads, as we saw in a previous post. That’s the script that I’m using:

from queue import Queue
import threading
import logging
import sys
import paho.mqtt.client as mqtt
from agt import AlexaGadget

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger(__name__)


class Listener(threading.Thread):
    def __init__(self, queue=Queue()):
        super(Listener, self).__init__()
        self.queue = queue
        self.daemon = True

    def on_connect(self, client, userdata, flags, rc):
        print("Connected!")
        client.subscribe("/alert")

    def on_message(self, client, userdata, msg):
        self.on_event(int(msg.payload))

    def on_event(self, cm):
        pass

    def run(self):
        client = mqtt.Client()
        client.on_connect = self.on_connect
        client.on_message = self.on_message

        client.connect('192.168.1.87', 1883, 60)
        client.loop_forever()


class Gadget(AlexaGadget):
    def __init__(self):
        super().__init__()
        Listener.on_event = self._emit_event

    def _emit_event(self, cm):
        logger.info("_emit_event {}".format(cm))
        payload = {'cm': cm}
        self.send_custom_event('Custom.gonzalo123', 'sensor', payload)


l = Listener()
l.start()


def main():
    gadget = Gadget()
    gadget.main()


if __name__ == '__main__':
    main()

And that’s all. It’d be good if I could wake up my skill directly from my IoT device, instead of waking it up manually, but AFAIK that’s not possible.

Source code in my github

Alexa and Raspberry Pi demo

We keep on playing with Alexa. This time I want to create a skill that uses a compatible device (for example a Raspberry Pi 3). Here you can see the documentation and examples that the Alexa people provide us. Basically we need to create a new product/Alexa gadget in the console. It gives us an amazonId and an alexaGadgetSecret.

Then we need to install the SDK on our Raspberry Pi (it installs several Python libraries). Then we can create our gadget script, running on the Raspberry Pi, using the amazonId and alexaGadgetSecret. We can listen to several Alexa events: for example when the wakeword is said, with a timer, with an alarm, and we can also create our custom events.

I’m going to create a skill that allows me to say something like “Alexa, use device demo and set color to green” or “Alexa, use device demo and set color to red”, and I’ll show this color on the LED matrix that I’ve got (a Raspberry Pi Sense HAT).

This is the python script:

import logging
import sys
import json
from agt import AlexaGadget
from sense_hat import SenseHat

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger(__name__)

sense = SenseHat()
sense.clear()

RED = [255, 0, 0]
GREEN = [0, 255, 0]
BLUE = [0, 0, 255]
YELLOW = [255, 255, 0]
BLACK = [0, 0, 0]

class Gadget(AlexaGadget):
    def on_alexa_gadget_statelistener_stateupdate(self, directive):
        for state in directive.payload.states:
            if state.name == 'wakeword':
                sense.clear()
                if state.value == 'active':
                    sense.show_message("Alexa", text_colour=BLUE)

    def set_color_to(self, color):
        sense.set_pixels(([color] * 64))

    def on_custom_gonzalo123_setcolor(self, directive):
        payload = json.loads(directive.payload.decode("utf-8"))
        color = payload['color']

        if color == 'red':
            logger.info('turn on RED display')
            self.set_color_to(RED)

        if color == 'green':
            logger.info('turn on GREEN display')
            self.set_color_to(GREEN)

        if color == 'yellow':
            logger.info('turn on YELLOW display')
            self.set_color_to(YELLOW)

        if color == 'black':
            logger.info('turn on BLACK display')
            self.set_color_to(BLACK)


if __name__ == '__main__':
    Gadget().main()

And here’s the ini file of our gadget:

[GadgetSettings]
amazonId = my_amazonId
alexaGadgetSecret = my_alexaGadgetSecret

[GadgetCapabilities]
Alexa.Gadget.StateListener = 1.0 - wakeword
Custom.gonzalo123 = 1.0

With this ini file I’m saying that my gadget will trigger a function when the wakeword is said and when my custom event “Custom.gonzalo123” arrives.

That’s the skill:

const Alexa = require('ask-sdk')

const RequestInterceptor = require('./interceptors/RequestInterceptor')
const ResponseInterceptor = require('./interceptors/ResponseInterceptor')
const LocalizationInterceptor = require('./interceptors/LocalizationInterceptor')
const GadgetInterceptor = require('./interceptors/GadgetInterceptor')

const LaunchRequestHandler = require('./handlers/LaunchRequestHandler')
const ColorHandler = require('./handlers/ColorHandler')
const HelpIntentHandler = require('./handlers/HelpIntentHandler')
const CancelAndStopIntentHandler = require('./handlers/CancelAndStopIntentHandler')
const SessionEndedRequestHandler = require('./handlers/SessionEndedRequestHandler')
const FallbackHandler = require('./handlers/FallbackHandler')
const ErrorHandler = require('./handlers/ErrorHandler')

let skill

module.exports.handler = async (event, context) => {
  if (!skill) {
    skill = Alexa.SkillBuilders.custom().
      addRequestInterceptors(
        RequestInterceptor,
        ResponseInterceptor,
        LocalizationInterceptor,
        GadgetInterceptor
      ).
      addRequestHandlers(
        LaunchRequestHandler,
        ColorHandler,
        HelpIntentHandler,
        CancelAndStopIntentHandler,
        SessionEndedRequestHandler,
        FallbackHandler).
      addErrorHandlers(
        ErrorHandler).
      create()
  }

  return await skill.invoke(event, context)
}

One important thing here is the GadgetInterceptor. This interceptor finds the endpointId from the request and appends it to the session. This endpoint exists because we’ve created our product/Alexa gadget and the Python script is running on the device. If the endpoint isn’t present, our skill should probably tell the user something like “No gadgets found. Please try again after connecting your gadget.”.

const utils = require('../lib/utils')

const GadgetInterceptor = {
  async process (handlerInput) {
    const endpointId = await utils.getEndpointIdFromConnectedEndpoints(handlerInput)
    if (endpointId) {
      utils.appendToSession(handlerInput, 'endpointId', endpointId)
    }
  }
}

module.exports = GadgetInterceptor

And finally the handler that triggers the custom event:

const log = require('../lib/log')
const utils = require('../lib/utils')

const buildLEDDirective = (endpointId, color) => {
  return {
    type: 'CustomInterfaceController.SendDirective',
    header: {
      name: 'setColor',
      namespace: 'Custom.gonzalo123'
    },
    endpoint: {
      endpointId: endpointId
    },
    payload: {
      color: color
    }
  }
}

const ColorHandler = {
  canHandle (handlerInput) {
    return handlerInput.requestEnvelope.request.type === 'IntentRequest'
      && handlerInput.requestEnvelope.request.intent.name === 'ColorIntent'
  },
  handle (handlerInput) {
    const requestAttributes = handlerInput.attributesManager.getRequestAttributes()
    const cardTitle = requestAttributes.t('SKILL_NAME')

    const endpointId = utils.getEndpointIdFromSession(handlerInput)
    if (!endpointId) {
      const error = 'No gadgets found. Please try again after connecting your gadget.'
      log.error('endpoint', error)
      return handlerInput.responseBuilder.
        speak(error).
        getResponse()
    }

    const color = handlerInput.requestEnvelope.request.intent.slots['color'].value
    log.info('color', color)
    log.info('endpointId', endpointId)

    return handlerInput.responseBuilder.
      speak(`Ok. The selected color is ${color}`).
      withSimpleCard(cardTitle, color).
      addDirective(buildLEDDirective(endpointId, color)).
      getResponse()
  }
}

module.exports = ColorHandler

Here’s one video with the working example:

Source code in my github

Alexa skill and account linking with serverless and Cognito

Sometimes when we’re building an Alexa skill we need to identify the user. To do that, Alexa provides account linking. Basically we need an OAuth2 server to link our account within our Alexa skill. AWS provides a managed OAuth2 service called Cognito, so we can use a Cognito user pool to handle the authentication for our Alexa skills.

In this example I’ve followed the following blog post. Cognito is a bit weird to set up, but after following all the steps we can use account linking in our Alexa skill.

There’s also a good sample skill here. I’ve studied this example a little bit and created a working prototype on my own, basically to understand the process.

That’s my skill:

const Alexa = require('ask-sdk')

const RequestInterceptor = require('./interceptors/RequestInterceptor')
const ResponseInterceptor = require('./interceptors/ResponseInterceptor')
const LocalizationInterceptor = require('./interceptors/LocalizationInterceptor')
const GetLinkedInfoInterceptor = require('./interceptors/GetLinkedInfoInterceptor')

const LaunchRequestHandler = require('./handlers/LaunchRequestHandler')
const CheckAccountLinkedHandler = require('./handlers/CheckAccountLinkedHandler')
const HelloWorldIntentHandler = require('./handlers/HelloWorldIntentHandler')
const HelpIntentHandler = require('./handlers/HelpIntentHandler')
const CancelAndStopIntentHandler = require('./handlers/CancelAndStopIntentHandler')
const SessionEndedRequestHandler = require('./handlers/SessionEndedRequestHandler')
const FallbackHandler = require('./handlers/FallbackHandler')
const ErrorHandler = require('./handlers/ErrorHandler')
const RequestInfoHandler = require('./handlers/RequestInfoHandler')

let skill

module.exports.handler = async (event, context) => {
  if (!skill) {
    skill = Alexa.SkillBuilders.custom().
      addRequestInterceptors(
        RequestInterceptor,
        ResponseInterceptor,
        LocalizationInterceptor,
        GetLinkedInfoInterceptor
      ).
      addRequestHandlers(
        LaunchRequestHandler,
        CheckAccountLinkedHandler,
        HelloWorldIntentHandler,
        RequestInfoHandler,
        HelpIntentHandler,
        CancelAndStopIntentHandler,
        SessionEndedRequestHandler,
        FallbackHandler).
      addErrorHandlers(
        ErrorHandler).
      create()
  }

  return await skill.invoke(event, context)
}

Maybe the most important thing here is the GetLinkedInfoInterceptor.

const log = require('../lib/log')
const cognito = require('../lib/cognito')
const utils = require('../lib/utils')

const GetLinkedInfoInterceptor = {
  async process (handlerInput) {
    if (utils.isAccountLinked(handlerInput)) {
      const userData = await cognito.getUserData(handlerInput.requestEnvelope.session.user.accessToken)
      log.info('GetLinkedInfoInterceptor: getUserData: ', userData)
      const sessionAttributes = handlerInput.attributesManager.getSessionAttributes()
      if (userData.Username !== undefined) {
        sessionAttributes.auth = true
        sessionAttributes.emailAddress = cognito.getAttribute(userData.UserAttributes, 'email')
        sessionAttributes.userName = userData.Username
        handlerInput.attributesManager.setSessionAttributes(sessionAttributes)
      } else {
        sessionAttributes.auth = false
        log.error('GetLinkedInfoInterceptor: No user data was found.')
      }
    }
  }
}

module.exports = GetLinkedInfoInterceptor

This interceptor retrieves the user info from Cognito when we provide the accessToken. We can obtain the accessToken from the session (if our skill’s account is linked). Then we inject the user information (in my example the email and the username from the Cognito user pool) into the session.

Then we can add a handler to our request handler chain called CheckAccountLinkedHandler. With this handler we check whether the skill’s account is linked. If it isn’t, we answer ‘withLinkAccountCard’ to force the user to log in with Cognito and link the account.

const utils = require('../lib/utils')

const CheckAccountLinkedHandler = {
  canHandle (handlerInput) {
    return !utils.isAccountLinked(handlerInput)
  },
  handle (handlerInput) {
    const requestAttributes = handlerInput.attributesManager.getRequestAttributes()
    const speakOutput = requestAttributes.t('NEED_TO_LINK_MESSAGE', 'SKILL_NAME')
    return handlerInput.responseBuilder.
      speak(speakOutput).
      withLinkAccountCard().
      getResponse()
  }
}

module.exports = CheckAccountLinkedHandler

Later we can create one intent to give the information to the user or, in another use case, perform an authorization workflow:

// getResolvedSlotIDValue is a small project helper (not shown here) that
// returns the resolved slot id, if any
const RequestInfoHandler = {
  canHandle (handlerInput) {
    const request = handlerInput.requestEnvelope.request
    return (request.type === 'IntentRequest'
      && request.intent.name === 'RequestInfoIntent')
  },
  handle (handlerInput) {
    const request = handlerInput.requestEnvelope.request
    const requestAttributes = handlerInput.attributesManager.getRequestAttributes()
    const sessionAttributes = handlerInput.attributesManager.getSessionAttributes()
    const repromptOutput = requestAttributes.t('FOLLOW_UP_MESSAGE')
    const cardTitle = requestAttributes.t('SKILL_NAME')

    let speakOutput = ''

    let inquiryTypeId = getResolvedSlotIDValue(request, 'infoTypeRequested')
    if (!inquiryTypeId) {
      inquiryTypeId = 'fullProfile'
      speakOutput += requestAttributes.t('NOT_SURE_OF_TYPE_MESSAGE')
    } else {
      if (inquiryTypeId === 'emailAddress' || inquiryTypeId === 'fullProfile') {
        speakOutput += requestAttributes.t('REPORT_EMAIL_ADDRESS', sessionAttributes.emailAddress)
      }

      if (inquiryTypeId === 'userName' || inquiryTypeId === 'fullProfile') {
        speakOutput += requestAttributes.t('REPORT_USERNAME', sessionAttributes.userName)
      }
    }

    speakOutput += repromptOutput

    return handlerInput.responseBuilder.
      speak(speakOutput).
      reprompt(repromptOutput).
      withSimpleCard(cardTitle, speakOutput).
      getResponse()
  }
}

module.exports = RequestInfoHandler

And basically that’s all. In fact it isn’t very different from traditional web authentication. Maybe the most complicated part, especially if you’re not used to OAuth2, is configuring Cognito properly.

Here you can see the source code in my github.

Playing with threads and Python. Part 2

Today I want to keep on playing with Python and threads (part 1 here). The idea is to create one simple script that prints an asterisk in the console each second. Simple, isn’t it?

from time import sleep
while True:
    print("*")
    sleep(1)

I want to keep this script running, but I also want to send a message externally (for example using RabbitMQ) and make the running script do something; in this demo, stop itself.

In JavaScript we can do this in a single-threaded process using the setInterval function, but since the Rabbit listener with pika is a blocking action, we need to use threads in Python (please tell me if I’m wrong). The idea is to create a circuit breaker condition in the main loop to check whether I need to stop the main thread or not.

First I’ve created my Rabbit listener in a thread:

from queue import Queue, Empty
import threading
import pika
import os


class Listener(threading.Thread):
    def __init__(self, queue=Queue()):
        super(Listener, self).__init__()
        self.queue = queue
        self.daemon = True

    def run(self):
        channel = self._get_channel()
        channel.queue_declare(queue='stop')

        channel.basic_consume(
            queue='stop',
            on_message_callback=lambda ch, method, properties, body: self.queue.put(item=True),
            auto_ack=True)

        channel.start_consuming()

    def stop(self):
        try:
            return True if self.queue.get(timeout=0.05) is True else False
        except Empty:
            return False

    def _get_channel(self):
        credentials = pika.PlainCredentials(
            username=os.getenv('RABBITMQ_USER'),
            password=os.getenv('RABBITMQ_PASS'))

        parameters = pika.ConnectionParameters(
            host=os.getenv('RABBITMQ_HOST'),
            credentials=credentials)

        connection = pika.BlockingConnection(parameters=parameters)

        return connection.channel()

Now in the main process I start the Listener and enter an endless loop that prints my asterisk each second; at the end of each iteration I check whether I need to stop the process or not:

from Listener import Listener
from dotenv import load_dotenv
import logging
from time import sleep
import os

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

l = Listener()
l.start()


def main():
    while True:
        logging.info("*")
        sleep(1)
        if l.stop():
            break


if __name__ == '__main__':
    main()

As we can see in the stop function, we’re using queue.Queue to communicate with the listener thread.
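
To trigger the stop we just need to publish any message to the ‘stop’ queue. A minimal publisher sketch (assuming the same RABBITMQ_* environment variables):

# publisher sketch: send a message to the 'stop' queue so the main loop breaks
import os

import pika

credentials = pika.PlainCredentials(
    username=os.getenv('RABBITMQ_USER'),
    password=os.getenv('RABBITMQ_PASS'))
parameters = pika.ConnectionParameters(
    host=os.getenv('RABBITMQ_HOST'),
    credentials=credentials)

connection = pika.BlockingConnection(parameters=parameters)
channel = connection.channel()
channel.queue_declare(queue='stop')
channel.basic_publish(exchange='', routing_key='stop', body='stop')
connection.close()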

And that’s all. In the example I also provide a minimal RabbitMQ server in a Docker container:

version: '3.4'

services:
  rabbit:
    image: rabbitmq:3-management
    restart: always
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    ports:
      - "15672:15672"
      - "5672:5672"

Source code available in my github

Alexa skill example with Serverless framework

Today I want to play with Alexa and the Serverless framework. I’ve created a sample hello world skill. In fact this example is more or less the official hello-world sample.

const Alexa = require('ask-sdk-core')
const RequestInterceptor = require('./interceptors/RequestInterceptor')
const ResponseInterceptor = require('./interceptors/ResponseInterceptor')
const LaunchRequestHandler = require('./handlers/LaunchRequestHandler')
const HelloWorldIntentHandler = require('./handlers/HelloWorldIntentHandler')
const HelpIntentHandler = require('./handlers/HelpIntentHandler')
const CancelAndStopIntentHandler = require('./handlers/CancelAndStopIntentHandler')
const SessionEndedRequestHandler = require('./handlers/SessionEndedRequestHandler')
const FallbackHandler = require('./handlers/FallbackHandler')
const ErrorHandler = require('./handlers/ErrorHandler')

let skill

module.exports.handler = async (event, context) => {
  if (!skill) {
    skill = Alexa.SkillBuilders.custom().
      addRequestInterceptors(RequestInterceptor).
      addResponseInterceptors(ResponseInterceptor).
      addRequestHandlers(
        LaunchRequestHandler,
        HelloWorldIntentHandler,
        HelpIntentHandler,
        CancelAndStopIntentHandler,
        SessionEndedRequestHandler,
        FallbackHandler).
      addErrorHandlers(
        ErrorHandler).
      create()
  }

  return await skill.invoke(event, context)
}

It has one intent that answers to the hello command:

const HelloWorldIntentHandler = {
  canHandle (handlerInput) {
    return handlerInput.requestEnvelope.request.type === 'IntentRequest'
      && handlerInput.requestEnvelope.request.intent.name === 'HelloWorldIntent'
  },
  handle (handlerInput) {
    const speechOutput = 'Hello World'
    const cardTitle = 'Hello world'

    return handlerInput.responseBuilder.
      speak(speechOutput).
      reprompt(speechOutput).
      withSimpleCard(cardTitle, speechOutput).
      getResponse()
  }
}

module.exports = HelloWorldIntentHandler

I’m using the Serverless framework to deploy the skill. I know that the Serverless framework has plugins for Alexa skills that help to create the whole skill, but in this example I want to do it a little more manually (it’s the way I learn new things).

First I create the skill in the Alexa developer console (or via the ask cli). There are a lot of tutorials about it. Then I take my alexaSkillId and use this id within my serverless config file as the trigger event of my lambda function.

service: hello-world

provider:
  name: aws
  runtime: nodejs8.10
  region: ${opt:region, self:custom.defaultRegion}
  stage: ${opt:stage, self:custom.defaultStage}

custom:
  defaultRegion: eu-west-1
  defaultStage: prod

functions:
  info:
    handler: src/index.handler
    events:
      - alexaSkill: amzn1.ask.skill.my_skill

Then I deploy the lambda function:

npx serverless deploy --aws-s3-accelerate

And I take the ARN of the lambda function and I use this lambda as my endpoint in the Alexa developer console.

We can also test our skill (at least the lambda function) using our favorite testing framework. I will use jest in this example.
Testing is very important, at least for me, when I’m working with lambdas and serverless: I want to test my script locally instead of deploying to AWS again and again (it’s slow).

const when = require('./steps/when')
const { init } = require('./steps/init')

describe('When we invoke the skill', () => {
  beforeAll(() => {
    init()
  })

  test('launch intent', async () => {
    const res = await when.we_invoke_intent(require('./events/use_skill'))
    const card = res.response.card
    expect(card.title).toBe('Hello world')
    expect(card.content).toBe('Welcome to Hello world, you can say Hello or Help. Which would you like to try?')
  })

  test('help handler', async () => {
    const res = await when.we_invoke_intent(require('./events/help_handler'))
    console.log(res.response.outputSpeech.ssml)
    expect(res.response.outputSpeech.ssml).toBe('<speak>You can say hello to me! How can I help?</speak>')
  })

  test('Hello world handler', async () => {
    const res = await when.we_invoke_intent(require('./events/hello_world_handler'))
    const card = res.response.card
    expect(card.title).toBe('Hello world')
    expect(card.content).toBe('Hello World')
  })
})

Full code in my github account.