Creating a standalone WebSocket Server with FastAPI and JWT Authentication in Python

In this post, I will show you how to create a WebSocket server in Python that uses JWT tokens for authentication. The server is designed to be independent of the main process, making it easy to integrate into existing applications. The client-side JavaScript will handle reconnections incrementally.

The WebSocket server will be created using FastAPI, a web framework built on top of Starlette. This is the entrypoint:
import logging

from fastapi import FastAPI

from asgi_ws import setup_app

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)
SECRET_KEY = "your_secret_key"

app = FastAPI()

app = setup_app(
    app=app,
    base_path='/ws',
    jwt_secret_key=SECRET_KEY,
)
The `setup_app` function is defined in the `lib.websockets` module. This function will set up the WebSocket server and the necessary routes.
def setup_app(app, jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = get_ws_router(
        jwt_secret_key=jwt_secret_key,
        jwt_algorithm=jwt_algorithm,
        base_path=base_path
    )
    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"

    app.mount("/js", StaticFiles(directory=static_dir), name="js")
    app.include_router(ws_router)

    return app
The `get_ws_router` function is defined in the same module. This function will create the WebSocket router and the necessary routes.
def get_ws_router(jwt_secret_key: str, base_path='/ws', jwt_algorithm: str = "HS256"):
    ws_router = APIRouter()

    current_dir = Path(__file__).parent
    static_dir = current_dir / "static/js"
    ws_router.mount(f"{base_path}/js", StaticFiles(directory=static_dir), name="js")

    manager = ConnectionManager(jwt_secret_key=jwt_secret_key, jwt_algorithm=jwt_algorithm)

    @ws_router.post(f"{base_path}/emmit")
    async def emmit_endpoint(request: Request):
        payload = await request.json()
        await manager.broadcast(payload["channel"], payload["payload"])
        return True

    @ws_router.websocket(f"{base_path}/")
    async def websocket_endpoint(websocket: WebSocket):
        token = websocket.query_params.get("token")
        if not token:
            # 1008 = policy violation: refuse the connection when no token is sent.
            await websocket.close(code=1008)
            return

        await manager.connect(websocket, token)
        try:
            while True:
                message: Message = await websocket.receive()
                if message["type"] == "websocket.disconnect":
                    manager.disconnect(websocket)
                    break
        except WebSocketDisconnect:
            manager.disconnect(websocket)

    return ws_router
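The `ConnectionManager` is not shown here, so the following is only a minimal sketch of what checking an HS256 JWT involves, written with the standard library. The names `encode_token` and `decode_token` are hypothetical and are not part of `asgi_ws`; in a real project a maintained JWT library is the safer choice.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 without padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that JWT strips before decoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def encode_token(claims: dict, secret: str) -> str:
    """Hypothetical helper: build a signed HS256 token from a claims dict."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def decode_token(token: str, secret: str) -> dict:
    """Hypothetical helper: verify signature and expiry, then return the claims."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if "exp" in claims and claims["exp"] < time.time():
        raise ValueError("token expired")
    return claims
```

A token created with one secret is rejected when decoded with another, which is the property the WebSocket endpoint relies on before accepting a connection.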
WebSockets are bidirectional communication channels that allow real-time data transfer between clients and servers, but I prefer to avoid client-to-server communication over them. When a client wants to send a message, it sends an HTTP POST request to the `/emit` endpoint of the main process, which forwards it to the WebSocket server. The server then broadcasts the message to all connected clients, so the client only ever receives messages over the WebSocket. Because of that, we need a main web process, using FastAPI or another web framework, to handle the HTTP requests.
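As a rough sketch of the forwarding step, the main process could build a POST request for the `/emmit` route defined in the router above. The helper name `build_emit_request` and the base URL are assumptions for illustration; only the route name and the `channel`/`payload` keys come from the code shown earlier.

```python
import json
from urllib.request import Request


def build_emit_request(ws_base_url: str, channel: str, payload) -> Request:
    """Hypothetical helper: prepare the POST that asks the WebSocket server to broadcast."""
    body = json.dumps({"channel": channel, "payload": payload}).encode("utf-8")
    return Request(
        url=f"{ws_base_url.rstrip('/')}/emmit",  # route name as spelled in the router
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

The request can then be sent with `urllib.request.urlopen(request)` or replaced by whatever HTTP client the main process already uses.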

This is an example with FastAPI:
<!DOCTYPE html>
<html>
<head>
    <title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>

<input type="text" id="messageText" autocomplete="off"/>
<button onclick="sendMessage()">Send</button>

<ul id='messages'>
</ul>
<script src="//localhost:8000/js/websockets.js"></script>
<script>
    async function sendMessage() {
        const channel = 'chat';
        const url = `/emit/${channel}`;
        const input = document.getElementById("messageText");
        const message = input.value;
        input.value = '';
        const body = JSON.stringify({channel: 'chat1', payload: message});
        const headers = {'Content-Type': 'application/json'};

        try {
            const response = await fetch(url, {method: 'POST', headers: headers, body: body});
        } catch (error) {
            console.error('Error:', error);
        }
    }

    (async function () {
        const getToken = async () => {
            const response = await fetch('/token');
            const {token} = await response.json();
            return token;
        };

        const messageCallback = (event) => {
            const messages = document.getElementById('messages');
            const message = document.createElement('li');
            message.textContent = event.data;
            messages.appendChild(message);
        };

        const wsManager = new WebSocketManager('ws://localhost:8000/ws/', getToken, messageCallback);
        await wsManager.connect();
    })();

</script>
</body>
</html>

The library is available on PyPI:

poetry add asgi_ws
pip install asgi_ws

The full code is available in my GitHub account.

Creating a Real-Time Flask Application with Flask-SocketIO and Redis

Today, we’re going to create a simple Flask application with real-time communication using websockets and the SocketIO library. We’ll leverage the Flask-SocketIO extension for integration.

Here’s the plan: while websockets support bidirectional communication, we’ll use them exclusively for server-to-client messages. For client-to-server interactions, we’ll stick with traditional HTTP communication.

Our application will include session-based authentication. To simulate login, we’ve created a route called /login that establishes a session. This session-based authentication will also apply to our websocket connections.

A key objective of this tutorial is to enable sending websocket messages from outside the web application. For instance, you might want to send messages from a cron job or an external service. To achieve this, we’ll use a message queue to facilitate communication between the SocketIO server and the client application. We’ll utilize Redis as our message queue.
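To make the role of the message queue more concrete, here is a small in-memory sketch. It does not use Redis or Flask-SocketIO; `ExternalEmitter` and `SocketServer` are hypothetical stand-ins that only illustrate the idea that the external process writes to the queue and the server relays to the connected browsers.

```python
import json
import queue


class ExternalEmitter:
    """Stand-in for a cron job or external service: it only writes to the queue."""

    def __init__(self, message_queue):
        self._queue = message_queue

    def emit(self, event, data):
        self._queue.put(json.dumps({"event": event, "data": data}))


class SocketServer:
    """Stand-in for the SocketIO server: it reads the queue and relays to clients."""

    def __init__(self, message_queue):
        self._queue = message_queue
        self.clients = []  # each client is modelled as a plain list of received messages

    def relay_pending(self):
        relayed = 0
        while True:
            try:
                raw = self._queue.get_nowait()
            except queue.Empty:
                return relayed
            message = json.loads(raw)
            for client in self.clients:
                client.append((message["event"], message["data"]))
            relayed += 1
```

The emitter never holds a reference to the server or its clients; the queue is the only thing they share, which is what lets the emitter live in a different process.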

That’s the main application

from flask import Flask, render_template, session, request

from lib.ws import register_ws, emit_event, EmitWebsocketRequest
from settings import REDIS_HOST, WS_PATH

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

register_ws(app=app, socketio_path=WS_PATH, redis_host=REDIS_HOST)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/login')
def login():
    session['user'] = 'Gonzalo'
    return dict(name=session['user'])


@app.post('/api/')
def api():
    data = EmitWebsocketRequest(**request.json)
    emit_event(data.channel, data.body)

    return dict(status=True)

That’s the html template

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Flask-SocketIO Websocket Example</title>
    <script src="//cdn.socket.io/4.0.0/socket.io.min.js"></script>
</head>
<body>

<h1>Flask-SocketIO Websocket Example</h1>
<label for="message">Message:</label>
<input type="text" id="message" placeholder="type a message...">

<button onclick="sendMessage()">Send</button>
<ul id="messages"></ul>

<script>
    document.addEventListener("DOMContentLoaded", function () {
        let host = location.protocol + '//' + location.hostname + ':' + location.port
        let socket = io.connect(host, {
            path: '/ws/socket.io',
            reconnection: true,
            reconnectionDelayMax: 5000,
            reconnectionDelay: 1000
        });

        socket.on('connect', function () {
            console.log('Connected to ws');
        });

        socket.on('disconnect', function () {
            console.log('Disconnected from ws');
        });

        socket.on('message', function (msg) {
            let messages = document.getElementById('messages');
            let messageItem = document.createElement('li');
            messageItem.textContent = msg;
            messages.appendChild(messageItem);
        });

        window.sendMessage = async function () {
            const url = '/api/';
            const payload = {"channel": "message", "body": this.message.value};

            try {
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify(payload)
                });

                if (!response.ok) {
                    console.error('Error: ' + response.statusText);
                }

                await response.json();
            } catch (error) {
                console.error('Error:', error);
            }
        };
    });
</script>
</body>
</html>

The register_ws function binds SocketIO to our Flask server. To enable sending messages from outside our Flask application, we need to instantiate SocketIO in two different ways. For this purpose, I’ve created a ws.py file. Note: I’m using Pydantic to validate the HTTP requests.

import logging
from typing import Dict, Any, Union

from flask import session
from flask_socketio import SocketIO
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Conf:
    def __init__(self, socketio=None):
        self._socketio = socketio

    @property
    def socketio(self):
        return self._socketio

    @socketio.setter
    def socketio(self, value):
        self._socketio = value


conf = Conf()


def emit_event(channel, body):
    conf.socketio.emit(channel, body)


class EmitWebsocketRequest(BaseModel):
    channel: str
    body: Union[Dict[str, Any], str]


def setup_ws(redis_host, redis_port=6379):
    conf.socketio = SocketIO(message_queue=f'redis://{redis_host}:{redis_port}')


def register_ws(
        app,
        redis_host,
        socketio_path='/ws/socket.io',
        redis_port=6379
):
    redis_url = f'redis://{redis_host}:{redis_port}' if redis_host else None
    conf.socketio = SocketIO(app, path=socketio_path, message_queue=redis_url)

    @conf.socketio.on('connect')
    def handle_connect():
        if not session.get("user"):
            raise ConnectionRefusedError('unauthorized!')
        logger.debug(f'Client connected: {session["user"]}')

    @conf.socketio.on('disconnect')
    def handle_disconnect():
        logger.debug('Client disconnected')

    return conf.socketio

Now, we can emit an event from outside the Flask application.

from lib.ws import emit_event, setup_ws
from settings import REDIS_HOST

setup_ws(redis_host=REDIS_HOST)
emit_event('message', 'Hi')

The application needs a Redis server. I set up the server using docker.

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Source code available in my GitHub.

Django reactive users with Celery and Channels

Today I want to build a prototype. The idea is to create two Django applications. One application will be the master and the other one will be the client. Both applications will have their own User model, but each change to the master’s User model will be propagated to the client (or clients). Let me show you what I have in mind:

We’re going to create one signal in User model (at Master) to detect user modifications:

  • If certain fields have been changed (for example, we’re going to ignore last_login, password and things like that), we’re going to emit an event.
  • I normally work with AWS, so the event will be an SNS event.
  • The idea is to have multiple clients, so each client will be listening to one SQS queue. Those SQS queues will be subscribed to the SNS topic.
  • To decouple the sending of the SNS message, we’re going to send it via a Celery worker.
  • The second application (the Client) will have one listener on the SQS queue.
  • Each time the listener receives a message, it will persist the user information in the client’s User model.
  • It will also emit one message to a Django Channels consumer, to be sent via WebSockets to the browser.
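The first step in the list, deciding whether a change is worth an event, can be sketched with plain dictionaries. The function name `relevant_changes` and the exact set of ignored fields are assumptions for illustration; the real project compares serializer output instead.

```python
# Fields whose changes should not trigger an event (illustrative set).
IGNORED_FIELDS = frozenset({"last_login", "password"})


def relevant_changes(before: dict, after: dict, ignored=IGNORED_FIELDS) -> dict:
    """Return {field: (old, new)} for every non-ignored field that differs."""
    keys = (before.keys() | after.keys()) - ignored
    return {
        key: (before.get(key), after.get(key))
        for key in keys
        if before.get(key) != after.get(key)
    }
```

An empty result means nothing relevant changed, so no event needs to be emitted.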

The Master

We’re going to emit the event each time the User model changes (and also when we create or delete a user). To detect changes we’re going to register one signal on pre_save to mark whether the model has been changed, and later, in post_save, we’re going to emit the event via a Celery worker.

[sourcecode language="python"]
@receiver(pre_save, sender=User)
def pre_user_modified(sender, instance, **kwargs):
    instance.is_modified = None

    if instance.is_staff is False and instance.id is not None:
        modified_user_data = UserSerializer(instance).data
        user = User.objects.get(username=modified_user_data['username'])
        user_serializer_data = UserSerializer(user).data

        if user_serializer_data != modified_user_data:
            instance.is_modified = True

@receiver(post_save, sender=User)
def post_user_modified(sender, instance, created, **kwargs):
    if instance.is_staff is False:
        if created or instance.is_modified:
            modified_user_data = UserSerializer(instance).data
            user_changed_event.delay(modified_user_data, action=Actions.INSERT if created else Actions.UPDATE)

@receiver(post_delete, sender=User)
def post_user_deleted(sender, instance, **kwargs):
    deleted_user_data = UserSerializer(instance).data
    user_changed_event.delay(deleted_user_data, action=Actions.DELETE)
[/sourcecode]

We need to register our signals in apps.py

[sourcecode language="python"]
from django.apps import AppConfig

class MasterConfig(AppConfig):
    name = 'master'

    def ready(self):
        # Importing the module is enough to connect the @receiver handlers.
        from master.signals import pre_user_modified
        from master.signals import post_user_modified
        from master.signals import post_user_deleted
[/sourcecode]

Our Celery task will publish the message to the SNS topic

[sourcecode language="python"]
@shared_task()
def user_changed_event(body, action):
    sns = boto3.client('sns')
    message = {
        "user": body,
        "action": action
    }
    response = sns.publish(
        TargetArn=settings.SNS_REACTIVE_TABLE_ARN,
        Message=json.dumps({'default': json.dumps(message)}),
        MessageStructure='json'
    )
    logger.info(response)
[/sourcecode]

AWS

In AWS we need to create one SNS topic and one SQS queue subscribed to it.
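The message is JSON-encoded twice on its way from the Celery task to the listener, which is easy to get wrong. The sketch below mimics that round trip without calling AWS. `simulate_sqs_body` is a simplified stand-in for what SNS writes into the SQS body when raw message delivery is disabled (the real envelope carries more fields), and all three function names are hypothetical.

```python
import json


def build_sns_message(user: dict, action: int) -> str:
    """Value passed as Message when publishing with MessageStructure='json'."""
    return json.dumps({"default": json.dumps({"user": user, "action": action})})


def simulate_sqs_body(sns_message: str) -> str:
    """Simplified stand-in for the envelope SNS delivers to an SQS subscriber."""
    default_text = json.loads(sns_message)["default"]
    return json.dumps({"Type": "Notification", "Message": default_text})


def decode_sqs_body(body: str) -> dict:
    """What the listener does: parse the envelope, then parse its Message field."""
    return json.loads(json.loads(body)["Message"])
```

This is why the listener calls `json.loads` twice, once on the SQS body and once on its `Message` field.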

The Client

First we need one command to run the listener.

[sourcecode language="python"]
class Actions:
    INSERT = 0
    UPDATE = 1
    DELETE = 2

switch_actions = {
    Actions.INSERT: insert_user,
    Actions.UPDATE: update_user,
    Actions.DELETE: delete_user,
}

class Command(BaseCommand):
    help = 'sqs listener'

    def handle(self, *args, **options):
        self.stdout.write(self.style.WARNING("starting listener"))
        sqs = boto3.client('sqs')

        queue_url = settings.SQS_REACTIVE_TABLES

        def process_message(message):
            # The SQS body wraps the SNS notification; the payload is in 'Message'.
            decoded_body = json.loads(message['Body'])
            data = json.loads(decoded_body['Message'])

            switch_actions.get(data['action'])(
                data=data['user'],
                timestamp=message['Attributes']['SentTimestamp']
            )

            notify_to_user(data['user'])

            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'])

        def loop():
            # Long polling: wait up to 20 seconds for new messages.
            response = sqs.receive_message(
                QueueUrl=queue_url,
                AttributeNames=[
                    'SentTimestamp'
                ],
                MaxNumberOfMessages=10,
                MessageAttributeNames=[
                    'All'
                ],
                WaitTimeSeconds=20
            )

            if 'Messages' in response:
                messages = [message for message in response['Messages'] if 'Body' in message]
                for message in messages:
                    process_message(message)

        try:
            while True:
                loop()
        except KeyboardInterrupt:
            sys.exit(0)
[/sourcecode]

Here we persist the model in the Client’s database

[sourcecode language="python"]
def insert_user(data, timestamp):
    username = data['username']
    serialized_user = UserSerializer(data=data)
    serialized_user.create(validated_data=data)
    logging.info(f"user: {username} created at {timestamp}")

def update_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=data['username'])
        serialized_user = UserSerializer(user)
        serialized_user.update(user, data)
        logging.info(f"user: {username} updated at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} doesn't exist. Creating ...")
        insert_user(data, timestamp)

def delete_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=username)
        user.delete()
        logging.info(f"user: {username} deleted at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} doesn't exist. Not deleted")
[/sourcecode]

And also emit one message to channel’s consumer

[sourcecode language="python"]
def notify_to_user(user):
    username = user['username']
    serialized_user = UserSerializer(user)
    emit_message_to_user(
        message=serialized_user.data,
        username=username, )
[/sourcecode]

Here the Consumer:

[sourcecode language="python"]
class WsConsumer(AsyncWebsocketConsumer):
    @personal_consumer
    async def connect(self):
        await self.channel_layer.group_add(
            self._get_personal_room(),
            self.channel_name
        )

    @private_consumer_event
    async def emit_message(self, event):
        message = event['message']
        await self.send(text_data=json.dumps(message))

    def _get_personal_room(self):
        username = self.scope['user'].username
        return self.get_room_name(username)

    @staticmethod
    def get_room_name(room):
        return f"{'ws_room'}_{room}"

def emit_message_to_user(message, username):
    group = WsConsumer.get_room_name(username)
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {
        'type': WsConsumer.emit_message.__name__,
        'message': message
    })
[/sourcecode]

Our consumer only allows the connection if the user is authenticated. That’s why I like Django Channels: this kind of thing is really simple to do (I’ve done similar things using PHP applications connected to a socket.io server and it was a nightmare). I’ve created a couple of decorators to ensure authentication in the consumer.

[sourcecode language="python"]
def personal_consumer(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]

        async def accept():
            value = await func(*args, **kwargs)
            await self.accept()
            return value

        # Accept only when the authenticated user matches the room in the URL.
        if self.scope['user'].is_authenticated:
            username = self.scope['user'].username
            room_name = self.scope['url_route']['kwargs']['username']
            if username == room_name:
                return await accept()

        await self.close()

    return wrapper_decorator

def private_consumer_event(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]
        if self.scope['user'].is_authenticated:
            return await func(*args, **kwargs)

    return wrapper_decorator
[/sourcecode]
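To see the behaviour of this kind of decorator without a running Channels server, here is a self-contained sketch. `FakeConsumer` and `run_connect` are hypothetical test doubles that only mimic the `scope`, `accept` and `close` parts of a consumer, and the decorator is a condensed version of the one above.

```python
import asyncio
from functools import wraps
from types import SimpleNamespace


def personal_consumer(func):
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        user = self.scope["user"]
        room = self.scope["url_route"]["kwargs"]["username"]
        # Accept only an authenticated user connecting to their own room.
        if user.is_authenticated and user.username == room:
            value = await func(self, *args, **kwargs)
            await self.accept()
            return value
        await self.close()

    return wrapper


class FakeConsumer:
    """Test double: records which of accept/close was called."""

    def __init__(self, username, room, authenticated=True):
        self.scope = {
            "user": SimpleNamespace(username=username, is_authenticated=authenticated),
            "url_route": {"kwargs": {"username": room}},
        }
        self.events = []

    async def accept(self):
        self.events.append("accept")

    async def close(self):
        self.events.append("close")

    @personal_consumer
    async def connect(self):
        self.events.append("joined")


def run_connect(username, room, authenticated=True):
    consumer = FakeConsumer(username, room, authenticated)
    asyncio.run(consumer.connect())
    return consumer.events
```

A user connecting to someone else’s room, or an anonymous user, ends with a single `close` and never reaches the body of `connect`.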

That’s the websocket route

[sourcecode language="python"]
from django.urls import re_path

from client import consumers

websocket_urlpatterns = [
    re_path(r'ws/(?P<username>\w+)$', consumers.WsConsumer),
]
[/sourcecode]

Finally we only need to connect our HTML page to the websocket

[sourcecode language="python"]
{% block title %}Example{% endblock %}
{% block header_text %}Hello <span id="name">{{ request.user.first_name }}</span>{% endblock %}

{% block extra_body %}
  <script>
    var ws_scheme = window.location.protocol === "https:" ? "wss" : "ws"
    var ws_path = ws_scheme + '://' + window.location.host + "/ws/{{ request.user.username }}"
    var ws = new ReconnectingWebSocket(ws_path)
    var render = function (key, value) {
      document.querySelector(`#${key}`).innerHTML = value
    }
    ws.onmessage = function (e) {
      const data = JSON.parse(e.data);
      render('name', data.first_name)
    }

    ws.onopen = function () {
      console.log('Connected')
    };
  </script>
{% endblock %}
[/sourcecode]

Here is a docker-compose file with the project:

[sourcecode language="xml"]
version: '3.4'

services:
  redis:
    image: redis
  master:
    image: reactive_master:latest
    command: python manage.py runserver 0.0.0.0:8001
    build:
      context: ./master
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8001:8001
    environment:
      REDIS_HOST: redis
  celery:
    image: reactive_master:latest
    command: celery -A master worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
      - "master"
    environment:
      REDIS_HOST: redis
      SNS_REACTIVE_TABLE_ARN: ${SNS_REACTIVE_TABLE_ARN}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
  client:
    image: reactive_client:latest
    command: python manage.py runserver 0.0.0.0:8000
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      REDIS_HOST: redis
  listener:
    image: reactive_client:latest
    command: python manage.py listener
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    environment:
      REDIS_HOST: redis
      SQS_REACTIVE_TABLES: ${SQS_REACTIVE_TABLES}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
[/sourcecode]

And that’s all. Here a working example of the prototype in action:

Source code in my github.

Building real time Python applications with Django Channels, Docker and Kubernetes

Three years ago I wrote an article about WebSockets. In fact I’ve written several articles about WebSockets (WebSockets and real-time communication are something that I’m really passionate about), but today I would like to revisit that article. Nowadays I’m involved with several Django projects, so I want to create a similar working prototype with Django. Let’s start:

In the past I normally worked with libraries such as socket.io to ensure browser compatibility with WebSockets. Nowadays, at least in my world, we can assume that our users are using a modern browser with WebSocket support, so we’re going to use plain WebSockets instead of external libraries. Django has great support for WebSockets through Django Channels. It allows us to handle WebSockets (and other async protocols) thanks to Python’s ASGI specification. In fact it’s pretty straightforward to build applications with real-time communication and shared authentication (something that I have done in the past with a lot of effort. I’m getting old and now I like simple things :))

The application that I want to build is the following one: a web application that shows the current time with seconds. OK, it’s very simple to do with a couple of lines of JavaScript, but this time I want to create a worker that emits an event via WebSockets with the current time. My web application will show that real-time update. This kind of architecture always has the same problem: the initial state. When the user opens the browser it must show the current time. We could ignore this situation and wait until the next event to fill in the initially blank information, but if the event arrived every 10 seconds our user would have a blank screen until then. In our example we’re going to take this situation into account: each time our user connects to the WebSocket, it will ask the server for the initial state.
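The initial-state problem can be shown with a tiny in-memory model. `TimeFeed` and `open_page` are hypothetical names: the feed plays the role of the worker plus Redis, and `open_page` plays the role of a browser that subscribes first and then asks for the last known value.

```python
class TimeFeed:
    """Keeps the latest value (like Redis) and pushes new ones (like the WebSocket)."""

    def __init__(self):
        self._latest = None
        self._listeners = []

    def initial_state(self):
        return self._latest

    def connect(self, listener):
        self._listeners.append(listener)

    def tick(self, value):
        self._latest = value  # persist first, so late joiners can read it
        for listener in self._listeners:
            listener(value)


def open_page(feed):
    """Browser stand-in: subscribe, then render the stored value if there is one."""
    display = []
    feed.connect(display.append)
    current = feed.initial_state()
    if current is not None:
        display.append(current)
    return display
```

Without the `initial_state` call the page would stay empty until the next `tick`, which is exactly the blank screen described above.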

Our initial state route will return the current time (using Redis). We can protect the route with Django’s standard login_required decorator

[sourcecode language="python"]
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse
from ws.redis import redis

@login_required
def initial_state(request):
    return JsonResponse({'current': redis.get('time')})
[/sourcecode]

We need to configure our channels and define our event:

[sourcecode language="python"]
from django.urls import re_path

from ws import consumers

websocket_urlpatterns = [
    re_path(r'time/tic/$', consumers.WsConsumer),
]
[/sourcecode]

As we can see here, we can also reuse the authentication middleware in the Channels consumer.
[sourcecode language="python"]
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class WsConsumer(AsyncWebsocketConsumer):
    GROUP = 'time'

    async def connect(self):
        if self.scope["user"].is_anonymous:
            await self.close()
        else:
            await self.channel_layer.group_add(
                self.GROUP,
                self.channel_name
            )
            await self.accept()

    async def tic_message(self, event):
        if not self.scope["user"].is_anonymous:
            message = event['message']

            await self.send(text_data=json.dumps({
                'message': message
            }))
[/sourcecode]

We’re going to need a worker that emits the current time every second (to avoid problems we’re going to trigger our event every 0.5 seconds). To perform this kind of action, Celery is a great tool that integrates well with Django. We can create workers and scheduled tasks with Celery (exactly what we need in our example). To solve the “initial state” situation, our worker will persist the current time in Redis

[sourcecode language="python"]
app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(0.5, ws_beat.s(), name='beat every 0.5 seconds')

@app.task
def ws_beat(group=WsConsumer.GROUP, event='tic_message'):
    current_time = time.strftime('%X')
    redis.set('time', current_time)
    message = {'time': current_time}
    channel_layer = channels.layers.get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {'type': event, 'message': message})
[/sourcecode]

Finally we need a javascript client to consume our Websockets

[sourcecode language="javascript"]
let getWsUri = () => {
  // Parentheses matter: without them the ternary swallows the rest of the expression.
  return (window.location.protocol === "https:" ? "wss" : "ws") +
    '://' + window.location.host +
    "/time/tic/"
}

let render = value => {
  document.querySelector('#display').innerHTML = value
}

let ws = new ReconnectingWebSocket(getWsUri())

ws.onmessage = e => {
  const data = JSON.parse(e.data);
  render(data.message.time)
}

ws.onopen = async () => {
  let response = await axios.get("/api/initial_state")
  render(response.data.current)
}
[/sourcecode]

Basically that’s the source code (plus the usual Django stuff).

Application architecture
The architecture of the application is the following one:

Frontend
The Django application. We can run this application in development with
python manage.py runserver

And in production using an ASGI server (uvicorn in this case)
[sourcecode language="xml"]
uvicorn config.asgi:application --port 8000 --host 0.0.0.0 --workers 1
[/sourcecode]

Celery worker
In development mode:
[sourcecode language="xml"]
celery -A ws worker -l debug
[/sourcecode]

And in production
[sourcecode language="xml"]
celery -A ws worker --uid=nobody --gid=nogroup
[/sourcecode]

Celery scheduler
We need this scheduler to emit our event (each 0.5 seconds)
[sourcecode language="xml"]
celery -A ws beat
[/sourcecode]

Message Server for Celery
In this case we’re going to use Redis

Docker
With this application we can use the same Dockerfile for frontend, worker and scheduler using different entrypoints

Dockerfile:

[sourcecode language="xml"]
FROM python:3.8

ENV TZ 'Europe/Madrid'
RUN echo $TZ > /etc/timezone && \
    apt-get update && apt-get install -y tzdata && \
    rm /etc/localtime && \
    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
    dpkg-reconfigure -f noninteractive tzdata && \
    apt-get clean

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

ADD . /src
WORKDIR /src

RUN pip install -r requirements.txt

RUN mkdir -p /var/run/celery /var/log/celery
RUN chown -R nobody:nogroup /var/run/celery /var/log/celery
[/sourcecode]

And our whole application within a docker-compose file

[sourcecode language="xml"]
version: '3.4'

services:
  redis:
    image: redis
  web:
    image: clock:latest
    command: /bin/bash ./docker-entrypoint.sh
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 1m30s
      timeout: 10s
      retries: 3
      start_period: 40s
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
  celery:
    image: clock:latest
    command: celery -A ws worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
  cron:
    image: clock:latest
    command: celery -A ws beat
    depends_on:
      - "redis"
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
[/sourcecode]

If we want to deploy our application in a K8s cluster we need to migrate our docker-compose file into k8s YAML files. I assume that we’ve pushed our Docker images to a container registry (such as ECR)

Frontend:
[sourcecode language="xml"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-web-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-web-api
      project: clock
  template:
    metadata:
      labels:
        app: clock-web-api
        project: clock
    spec:
      containers:
        - name: web-api
          image: my-ecr-path/clock:latest
          args: ["uvicorn", "config.asgi:application", "--port", "8000", "--host", "0.0.0.0", "--workers", "1"]
          ports:
            - containerPort: 8000
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host
---
apiVersion: v1
kind: Service
metadata:
  name: clock-web-api
spec:
  type: LoadBalancer
  selector:
    app: clock-web-api
    project: clock
  ports:
    - protocol: TCP
      port: 8000 # port exposed internally in the cluster
      targetPort: 8000 # the container port to send requests to
[/sourcecode]

Celery worker
[sourcecode language="xml"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-celery
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-celery
      project: clock
  template:
    metadata:
      labels:
        app: clock-celery
        project: clock
    spec:
      containers:
        - name: clock-celery
          image: my-ecr-path/clock:latest
          args: ["celery", "-A", "ws", "worker", "--uid=nobody", "--gid=nogroup"]
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host
[/sourcecode]

Celery scheduler
[sourcecode language="xml"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-cron
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-cron
      project: clock
  template:
    metadata:
      labels:
        app: clock-cron
        project: clock
    spec:
      containers:
        - name: clock-cron
          image: my-ecr-path/clock:latest
          args: ["celery", "-A", "ws", "beat"]
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host
[/sourcecode]

Redis
[sourcecode language="xml"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-redis
      project: clock
  template:
    metadata:
      labels:
        app: clock-redis
        project: clock
    spec:
      containers:
        - name: clock-redis
          image: redis
          ports:
            - containerPort: 6379
              name: clock-redis
---
apiVersion: v1
kind: Service
metadata:
  name: clock-redis
spec:
  type: ClusterIP
  ports:
    - port: 6379
      targetPort: 6379
  selector:
    app: clock-redis
[/sourcecode]

Shared configuration
[sourcecode language="xml"]
apiVersion: v1
kind: ConfigMap
metadata:
  name: clock-app-config
data:
  redis.host: "clock-redis"
[/sourcecode]

We can deploy our application to our k8s cluster

[sourcecode language="xml"]
kubectl apply -f .k8s/
[/sourcecode]

And see it running inside the cluster locally with a port forward

[sourcecode language="xml"]
kubectl port-forward deployment/clock-web-api 8000:8000
[/sourcecode]

And that’s all. Our Django application with Websockets using Django Channels up and running with docker and also using k8s.

Source code in my github

Working with SAPUI5 locally (part 3). Adding more services in Docker

In the previous post we moved one project to Docker. The idea was to move exactly the same functionality (even without touching anything within the source code). Now we’re going to add more services. Yes, I know, it looks like overengineering (it’s exactly overengineering, indeed), but I want to build something with different services working together. Let’s start.

We’re going to change our original project a little bit. Now our frontend will only have one button. This button will increment the number of clicks, but we’re going to persist this information in a PostgreSQL database. Also, instead of incrementing the counter in the backend, our backend will emit one event to a RabbitMQ message broker. We’ll have one worker service listening to this event, and this worker will persist the information. The communication between the worker and the frontend (to show the incremented value) will be via WebSockets.
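The flow described above can be sketched in a few lines of Python, purely as an in-memory illustration. The real services are PHP/Lumen, Node.js, RabbitMQ and PostgreSQL; here a `deque` stands in for the broker, a dict for the database and plain lists for the connected browsers, and all names are hypothetical.

```python
from collections import deque


def backend_handle_click(broker: deque) -> None:
    """Backend stand-in: it does not touch the counter, it only emits an event."""
    broker.append({"event": "click"})


class Worker:
    """Worker stand-in: consumes events, persists the counter and notifies browsers."""

    def __init__(self, broker: deque, database: dict, sockets: list):
        self.broker = broker
        self.database = database
        self.sockets = sockets

    def process_pending(self) -> int:
        processed = 0
        while self.broker:
            event = self.broker.popleft()
            if event["event"] == "click":
                self.database["clicks"] = self.database.get("clicks", 0) + 1
                for socket in self.sockets:
                    socket.append(self.database["clicks"])
            processed += 1
        return processed
```

The backend returns as soon as the event is queued; the new counter value reaches the browser later, through the worker’s socket push rather than through the HTTP response.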

With those premises we are going to need:

  • Frontend: UI5 application
  • Backend: PHP/lumen application
  • Worker: nodejs application which is listening to a RabbitMQ event and serving the websocket server (using socket.io)
  • Nginx server
  • PostgreSQL database.
  • RabbitMQ message broker.
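
The flow between those services can be sketched in a few lines. This is only an in-process illustration, not the project's code: a `queue.Queue` stands in for RabbitMQ, a plain object stands in for the PostgreSQL table, and the `broadcast` callback stands in for the socket.io emit. All names here (`ClickStore`, `backend_click`, `worker_drain`) are made up for the example.

```python
import queue


class ClickStore:
    """In-memory stand-in for the table that holds the click counter."""

    def __init__(self):
        self.click_count = 0


def backend_click(broker, amount=1):
    """Backend: it does not touch the counter, it only emits an event."""
    broker.put(amount)


def worker_drain(broker, store, broadcast):
    """Worker: consume pending events, persist the count, notify clients."""
    while True:
        try:
            amount = broker.get_nowait()
        except queue.Empty:
            break
        store.click_count += amount
        # Stand-in for the websocket broadcast to the frontend
        broadcast({"count": store.click_count})


if __name__ == "__main__":
    broker = queue.Queue()
    store = ClickStore()
    received = []
    backend_click(broker)
    backend_click(broker)
    worker_drain(broker, store, received.append)
    print(received)
```

The point of the design is that the backend never writes the counter itself: only the worker does, and the frontend learns about the new value through the broadcast.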

As in the previous examples, our PHP backend will be served via Nginx and PHP-FPM.

Here we can see the docker-compose file that sets up all the services:

[sourcecode language=”xml”]
version: '3.4'

services:
  nginx:
    image: gonzalo123.nginx
    restart: always
    ports:
      - "8080:80"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-nginx
    volumes:
      - ./src/backend:/code/src
      - ./src/.docker/web/site.conf:/etc/nginx/conf.d/default.conf
    networks:
      - app-network
  api:
    image: gonzalo123.api
    restart: always
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-lumen-dev
    environment:
      XDEBUG_CONFIG: remote_host=${MY_IP}
    volumes:
      - ./src/backend:/code/src
    networks:
      - app-network
  ui5:
    image: gonzalo123.ui5
    ports:
      - "8000:8000"
    restart: always
    volumes:
      - ./src/frontend:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-ui5
    networks:
      - app-network
  io:
    image: gonzalo123.io
    ports:
      - "9999:9999"
    restart: always
    volumes:
      - ./src/io:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-io
    networks:
      - app-network
  pg:
    image: gonzalo123.pg
    restart: always
    ports:
      - "5432:5432"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-pg
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata
    networks:
      - app-network
  rabbit:
    image: rabbitmq:3-management
    container_name: gonzalo123.rabbit
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
[/sourcecode]

We’re going to use the same Dockerfiles as in the previous post, but we also need new ones for the worker, the database server and the message queue:

Worker:
[sourcecode language=”xml”]
FROM node:alpine

EXPOSE 8000

WORKDIR /code/src
COPY ./io .
RUN npm install
ENTRYPOINT ["npm", "run", "serve"]
[/sourcecode]

The worker is a simple script that runs the socket.io server and emits a WebSocket event for every message received from the RabbitMQ queue.

[sourcecode language=”js”]
var amqp = require('amqp'),
    httpServer = require('http').createServer(),
    io = require('socket.io')(httpServer, {
        origins: '*:*',
    }),
    pg = require('pg');

require('dotenv').config();
var pgClient = new pg.Client(process.env.DB_DSN);

var rabbitMq = amqp.createConnection({
    host: process.env.RABBIT_HOST,
    port: process.env.RABBIT_PORT,
    login: process.env.RABBIT_USER,
    password: process.env.RABBIT_PASS,
});

var sql = 'SELECT clickCount FROM docker.clicks';

// Please don't do this. Use lazy connections
// I'm 'lazy' to do it in this POC 🙂
pgClient.connect(function(err) {
    // Send the current count to the clients when a new one connects
    io.on('connection', function() {
        pgClient.query(sql, function(err, result) {
            // PostgreSQL folds unquoted column names to lower case
            var count = result.rows[0]['clickcount'];
            io.emit('click', {count: count});
        });
    });

    rabbitMq.on('ready', function() {
        var queue = rabbitMq.queue('ui5');
        queue.bind('#');

        // Every message carries the increment to add to the stored counter
        queue.subscribe(function(message) {
            pgClient.query(sql, function(err, result) {
                var count = parseInt(result.rows[0]['clickcount']);
                count = count + parseInt(message.data.toString('utf8'));
                pgClient.query('UPDATE docker.clicks SET clickCount = $1', [count],
                    function(err) {
                        io.emit('click', {count: count});
                    });
            });
        });
    });
});

httpServer.listen(process.env.IO_PORT);
[/sourcecode]
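
The worker above reads the counter and then writes it back in a second query, so two messages handled close together could in principle overwrite each other's increment. One option, shown here only as a sketch, is to let the database do the addition in a single statement. The example uses an in-memory SQLite table as a stand-in for `docker.clicks` (an assumption made so the snippet runs on its own); in PostgreSQL the equivalent statement would be `UPDATE docker.clicks SET clickCount = clickCount + $1 RETURNING clickCount`. The helper names are hypothetical.

```python
import sqlite3


def create_store():
    """Create an in-memory table that mimics docker.clicks (SQLite stand-in)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE clicks (clickCount INTEGER NOT NULL)")
    conn.execute("INSERT INTO clicks (clickCount) VALUES (0)")
    conn.commit()
    return conn


def add_clicks(conn, amount):
    """Increment the counter inside the UPDATE itself and return the new value."""
    conn.execute("UPDATE clicks SET clickCount = clickCount + ?", (amount,))
    conn.commit()
    (count,) = conn.execute("SELECT clickCount FROM clicks").fetchone()
    return count


if __name__ == "__main__":
    conn = create_store()
    add_clicks(conn, 1)
    print(add_clicks(conn, 1))
```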

Database server:
[sourcecode language=”xml”]
FROM postgres:9.6-alpine
COPY pg/init.sql /docker-entrypoint-initdb.d/
[/sourcecode]

As we can see, the database structure is generated when the database is first initialized:
[sourcecode language=”sql”]
CREATE SCHEMA docker;

CREATE TABLE docker.clicks (
clickCount numeric(8) NOT NULL
);

ALTER TABLE docker.clicks
OWNER TO username;

INSERT INTO docker.clicks(clickCount) values (0);
[/sourcecode]

For the RabbitMQ server we’re going to use the official Docker image, so we don’t need to create a Dockerfile.

We have also changed our Nginx configuration a little bit. We want Nginx to serve both the backend and the socket.io server, because we don’t want to expose different ports to the internet.

[sourcecode language=”xml”]
server {
    listen 80;
    index index.php index.html;
    server_name localhost;
    error_log /var/log/nginx/error.log;
    access_log /var/log/nginx/access.log;
    root /code/src/www;

    location /socket.io/ {
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_pass "http://io:9999";
    }

    location / {
        try_files $uri $uri/ /index.php?$query_string;
    }

    location ~ \.php$ {
        try_files $uri =404;
        fastcgi_split_path_info ^(.+\.php)(/.+)$;
        fastcgi_pass api:9000;
        fastcgi_index index.php;
        include fastcgi_params;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        fastcgi_param PATH_INFO $fastcgi_path_info;
    }
}
[/sourcecode]

To avoid CORS issues we can also use an SCP destination (the localneo proxy in this example) to serve socket.io as well. So we need to:

  • change our neo-app.json file

[sourcecode language=”js”]
"routes": [

  {
    "path": "/socket.io",
    "target": {
      "type": "destination",
      "name": "SOCKETIO"
    },
    "description": "SOCKETIO"
  }
],
[/sourcecode]

    And basically that’s all. Here we can also use a “production” docker-compose file that doesn’t expose all the ports or map the filesystem to our local machine (which is useful while we’re developing):

    [sourcecode language=”xml”]
    version: '3.4'

    services:
      nginx:
        image: gonzalo123.nginx
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-nginx
        networks:
          - app-network
      api:
        image: gonzalo123.api
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-lumen
        networks:
          - app-network
      ui5:
        image: gonzalo123.ui5
        ports:
          - "80:8000"
        restart: always
        volumes:
          - ./src/frontend:/code/src
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-ui5
        networks:
          - app-network
      io:
        image: gonzalo123.io
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-io
        networks:
          - app-network
      pg:
        image: gonzalo123.pg
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-pg
        environment:
          POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
          POSTGRES_USER: ${POSTGRES_USER}
          POSTGRES_DB: ${POSTGRES_DB}
          PGDATA: /var/lib/postgresql/data/pgdata
        networks:
          - app-network
      rabbit:
        image: rabbitmq:3-management
        restart: always
        environment:
          RABBITMQ_ERLANG_COOKIE:
          RABBITMQ_DEFAULT_VHOST: /
          RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
          RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
        networks:
          - app-network
    networks:
      app-network:
        driver: bridge
    [/sourcecode]

    And that’s all. The full project is available in my github account

    Real Time IoT in the cloud with SAP’s SCP, Cloud Foundry and WebSockets

    Nowadays I’m involved in a cloud project based on SAP Cloud Platform (SCP). Side projects are the best way to master new technologies (at least for me), so I want to build something with SCP and my Arduino stuff. SCP comes with an IoT module. In fact, every cloud platform has, in one way or another, an IoT module (Amazon, Azure, …). With SCP the IoT module is just a Hana database where we can push our IoT values and from which we can retrieve information via oData (the common way in the SAP world).

    It’s pretty straightforward to configure the IoT module with the SAP Cloud Platform Cockpit (everything can be done with a Hana trial account).

    NodeMcu

    First I’m going to use a simple circuit with my NodeMcu connected to my wifi network. The prototype is a potentiometer connected to the analog input. I normally use this circuit because I can change the value just by turning the potentiometer wheel. I know it’s not very useful, but we can easily swap it for a real sensor (temperature, humidity, light, …).

    It will send the percentage (from 0 to 100) of the position of the potentiometer directly to the cloud.

    [sourcecode language=”c”]
    #include <ESP8266WiFi.h>

    const int potentiometerPin = 0;

    // Wifi configuration
    const char* ssid = "my-wifi-ssid";
    const char* password = "my-wifi-password";

    // SAP SCP specific configuration
    const char* host = "mytenant.hanatrial.ondemand.com";
    String device_id = "my-device-ide";
    String message_type_id = "my-device-type-id";
    String oauth_token = "my-oauth-token";

    String url = "https://[mytenant].hanatrial.ondemand.com/com.sap.iotservices.mms/v1/api/http/data/" + device_id;

    const int httpsPort = 443;

    WiFiClientSecure clientTLS;

    void wifiConnect() {
    Serial.println();
    Serial.print("Connecting to ");
    Serial.println(ssid);

    WiFi.begin(ssid, password);

    while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
    }
    Serial.println("");
    Serial.print("WiFi connected.");
    Serial.print("IP address: ");
    Serial.println(WiFi.localIP());
    }

    void sendMessage(int value) {
    String payload = "{\"mode\":\"async\", \"messageType\":\"" + message_type_id + "\", \"messages\":[{\"value\": " + (String) value + "}]}";
    Serial.print("connecting to ");
    Serial.println(host);
    if (!clientTLS.connect(host, httpsPort)) {
    Serial.println("connection failed");
    return;
    }

    Serial.print("requesting payload: ");
    Serial.println(url);

    clientTLS.print(String("POST ") + url + " HTTP/1.0\r\n" +
    "Host: " + host + "\r\n" +
    "Content-Type: application/json;charset=utf-8\r\n" +
    "Authorization: Bearer " + oauth_token + "\r\n" +
    "Content-Length: " + payload.length() + "\r\n\r\n" +
    payload + "\r\n\r\n");

    Serial.println("request sent");

    Serial.println("reply was:");
    while (clientTLS.connected()) {
    String line = clientTLS.readStringUntil('\n');
    Serial.println(line);
    }
    }

    void setup() {
    Serial.begin(9600);
    wifiConnect();

    delay(10);
    }

    int mem;
    void loop() {

    int value = ((analogRead(potentiometerPin) * 100) / 1010);
    if (value < (mem - 1) or value > (mem + 1)) {
    sendMessage(value);
    Serial.println(value);
    mem = value;
    }

    delay(200);
    }
    [/sourcecode]
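
The interesting part of the sketch above is the small filter in `loop()`: the reading is scaled to a percentage and only sent when it moves by more than one point from the last value sent. A minimal Python rendering of that logic, handy for checking the thresholds on a desktop, could look like this. The function names are invented for the example, and the payload helper only mirrors the JSON string built in `sendMessage`.

```python
import json


def to_percent(raw, full_scale=1010):
    """Integer percentage, mirroring (analogRead * 100) / 1010 in the sketch."""
    return (raw * 100) // full_scale


def should_send(value, last_sent, tolerance=1):
    """True when the value moved by more than `tolerance` from the last one sent."""
    return value < last_sent - tolerance or value > last_sent + tolerance


def build_payload(message_type_id, value):
    """JSON body with the same fields as the payload string in sendMessage."""
    return json.dumps({
        "mode": "async",
        "messageType": message_type_id,
        "messages": [{"value": value}],
    })


if __name__ == "__main__":
    last_sent = 0
    for raw in (0, 10, 505, 510, 1010):
        value = to_percent(raw)
        if should_send(value, last_sent):
            print(build_payload("my-device-type-id", value))
            last_sent = value
```

The tolerance avoids flooding the cloud endpoint with messages caused by noise on the analog input.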

    SCP

    SAP Cloud Platform allows us to create web applications easily using the SAPUI5 framework. It also allows us to create a destination (the way SAP’s cloud connects different modules) to our IoT module. Also, every Hana table can be accessed via oData, so we can retrieve the information easily within SAPUI5.

    [sourcecode language=”js”]
    onAfterRendering: function () {
    var model = this.model;

    this.getView().getModel().read("/my-hana-table-odata-uri", {
    urlParameters: {
    $top: 1,
    $orderby: "G_CREATED desc"
    },
    success: function (oData) {
    model.setProperty("/value", oData.results[0].C_VALUE);
    }
    });
    }
    [/sourcecode]

    and display in a view

    [sourcecode language=”xml”]
    <mvc:View controllerName="gonzalo123.iot.controller.Main" xmlns:html="http://www.w3.org/1999/xhtml" xmlns:mvc="sap.ui.core.mvc"
    displayBlock="true" xmlns="sap.m">
    <App>
    <pages>
    <Page title="{i18n>title}">
    <content>
    <GenericTile class="sapUiTinyMarginBegin sapUiTinyMarginTop tileLayout" header="nodemcu" frameType="OneByOne">
    <tileContent>
    <TileContent unit="%">
    <content>
    <NumericContent value="{view>/value}" icon="sap-icon://line-charts"/>
    </content>
    </TileContent>
    </tileContent>
    </GenericTile>
    </content>
    </Page>
    </pages>
    </App>
    </mvc:View>
    [/sourcecode]

    Cloud Foundry

    The web application (with SCP and SAPUI5) can access the IoT values via oData. We can fetch the data again and again, but that’s not cool. We want real-time updates in the web application, so we need WebSockets. The SCP IoT module allows us to use WebSockets to push information, but not to receive updates (afaik; let me know if I’m wrong). We can also connect our IoT module to an existing MQTT server, but in this prototype I only want to use WebSockets. So we’re going to create a simple WebSocket server with node and socket.io. This server will poll for device updates (again and again, with a setInterval function via oData) and, when it detects a change, it will broadcast the new value over the WebSocket.

    SAP’s SCP also allows us to create services with Cloud Foundry. So we’ll create our nodejs server there.

    [sourcecode language=”js”]
    var http = require('http'),
        io = require('socket.io'),
        request = require('request'),
        // Buffer.from replaces the deprecated new Buffer() constructor
        auth = "Basic " + Buffer.from(process.env.USER + ":" + process.env.PASS).toString("base64"),
        url = process.env.IOT_ODATA,
        INTERVAL = process.env.INTERVAL,
        server,
        socket,
        value;

    server = http.createServer();
    server.listen(process.env.PORT || 3000);

    socket = io.listen(server);

    // Poll the oData endpoint and broadcast only when the value changes
    setInterval(function () {
        request.get({
            url: url,
            headers: {
                "Authorization": auth,
                "Accept": "application/json"
            }
        }, function (error, response, body) {
            if (error) {
                console.log(error);
                return;
            }
            var newValue = JSON.parse(body).d.results[0].C_VALUE;
            if (value !== newValue) {
                value = newValue;
                socket.sockets.emit('value', value);
            }
        });
    }, INTERVAL);
    [/sourcecode]
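
The pattern in the server above (poll, compare with the last value, emit only on change) is independent of oData and socket.io. Here is a minimal sketch of it in Python, with the HTTP request and the broadcast replaced by plain callables so it can run anywhere. `ChangeNotifier`, `fetch` and `emit` are names made up for this illustration.

```python
from typing import Callable, Optional


class ChangeNotifier:
    """Polls a value source and emits only when the value has changed."""

    def __init__(self, fetch: Callable[[], int], emit: Callable[[int], None]):
        self._fetch = fetch      # stand-in for the oData request
        self._emit = emit        # stand-in for the websocket broadcast
        self._last: Optional[int] = None

    def poll_once(self) -> bool:
        """Fetch the current value; emit it and return True if it is new."""
        new_value = self._fetch()
        if new_value == self._last:
            return False
        self._last = new_value
        self._emit(new_value)
        return True


if __name__ == "__main__":
    readings = iter([10, 10, 12])
    notifier = ChangeNotifier(lambda: next(readings), print)
    for _ in range(3):
        notifier.poll_once()
```

In a real service `poll_once` would be called from a timer, which is the role `setInterval` plays in the node version.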

    And that’s all. My NodeMcu device connected to the cloud.

    Full project available in my github

    Playing with Docker, Silex, Python, Node and WebSockets

    I’m learning Docker. In this post I want to share a little experiment that I have done. I know the code looks like over-engineering but it’s just an excuse to build something with docker and containers. Let me explain it a little bit.

    The idea is to build a time clock in the browser. Something like this:

    Clock

    Yes I know. We can do it only with js, css and html but we want to hack a little bit more. The idea is to create:

    • A Silex/PHP frontend
    • A WebSocket server with socket.io/node
    • A Python script to obtain the current time

    The WebSocket server will open two ports: one to serve WebSockets (socket.io) and another one as an HTTP server (express). The Python script will get the current time and send it to the WebSocket server. Finally, the frontend (Silex) will listen to the WebSocket’s event and render the current time.

    That’s the WebSocket server (with socket.io and express)
    [sourcecode language=”js”]
    var express = require('express'),
        expressApp = express(),
        server = require('http').Server(expressApp),
        io = require('socket.io')(server, {origins: 'localhost:*'});

    // Internal HTTP endpoint: the Python script calls it once per second
    expressApp.get('/tic', function (req, res) {
        io.sockets.emit('time', req.query.time);
        res.json('OK');
    });

    // Port 6400: internal HTTP API
    expressApp.listen(6400, '0.0.0.0');

    // Port 8080: socket.io, reachable from the browser
    server.listen(8080);
    [/sourcecode]

    That’s our Python script

    [sourcecode language=”python”]
    from time import gmtime, strftime, sleep
    import httplib2

    h = httplib2.Http()
    while True:
        # Push the current UTC time to the node container once per second
        (resp, content) = h.request("http://node:6400/tic?time=" + strftime("%H:%M:%S", gmtime()))
        sleep(1)
    [/sourcecode]
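
The script above builds the query string by plain concatenation, which works because the time only contains digits and colons. If the value could contain other characters, it is safer to let the standard library encode it. A small sketch with Python 3's `urllib.parse` (the helper name `build_tic_url` is an assumption for this example, and it is not part of the original project):

```python
from time import gmtime, strftime
from urllib.parse import urlencode


def build_tic_url(base="http://node:6400/tic", now=None):
    """Return the /tic URL with the time passed as an encoded query parameter."""
    current = strftime("%H:%M:%S", gmtime() if now is None else now)
    # urlencode percent-encodes the value (":" becomes "%3A")
    return base + "?" + urlencode({"time": current})


if __name__ == "__main__":
    print(build_tic_url())
```

Express decodes the parameter again on the other side, so `req.query.time` still holds the plain `HH:MM:SS` string.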

    And our Silex frontend
    [sourcecode language=”php”]
    use Silex\Application;
    use Silex\Provider\TwigServiceProvider;

    $app = new Application(['debug' => true]);
    $app->register(new TwigServiceProvider(), [
        'twig.path' => __DIR__ . '/../views',
    ]);

    $app->get("/", function (Application $app) {
        return $app['twig']->render('index.twig', []);
    });

    $app->run();
    [/sourcecode]

    using this twig template

    [sourcecode language=”html”]
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Docker example</title>
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
    <link href="css/app.css" rel="stylesheet">
    <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
    <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
    </head>
    <body>
    <div class="site-wrapper">
    <div class="site-wrapper-inner">
    <div class="cover-container">
    <div class="inner cover">
    <h1 class="cover-heading">
    <div id="display">
    display
    </div>
    </h1>
    </div>
    </div>
    </div>
    </div>
    <script src="//localhost:8080/socket.io/socket.io.js"></script>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.12.4/jquery.min.js"></script>
    <script>
        var socket = io.connect('//localhost:8080');

        $(function () {
            socket.on('time', function (data) {
                $('#display').html(data);
            });
        });
    </script>
    </body>
    </html>
    [/sourcecode]

    The idea is to use one Docker container for each process. I like to have all the code in one place so all containers will share the same volume with source code.

    First the node container (WebSocket server)

    [sourcecode language=”text”]
    FROM node:argon

    RUN mkdir -p /mnt/src
    WORKDIR /mnt/src/node

    EXPOSE 8080 6400
    [/sourcecode]

    Now the python container
    [sourcecode language=”text”]
    FROM python:2

    RUN pip install httplib2

    RUN mkdir -p /mnt/src
    WORKDIR /mnt/src/python
    [/sourcecode]

    And finally the frontend container (apache2 with Ubuntu 16.04)

    [sourcecode language=”text”]
    FROM ubuntu:16.04

    RUN locale-gen es_ES.UTF-8
    RUN update-locale LANG=es_ES.UTF-8
    ENV DEBIAN_FRONTEND=noninteractive

    RUN apt-get update -y
    RUN apt-get install --no-install-recommends -y apache2 php libapache2-mod-php
    RUN apt-get clean -y

    COPY ./apache2/sites-available/000-default.conf /etc/apache2/sites-available/000-default.conf

    RUN mkdir -p /mnt/src

    RUN a2enmod rewrite
    RUN a2enmod proxy
    RUN a2enmod mpm_prefork

    RUN chown -R www-data:www-data /mnt/src
    ENV APACHE_RUN_USER www-data
    ENV APACHE_RUN_GROUP www-data
    ENV APACHE_LOG_DIR /var/log/apache2
    ENV APACHE_LOCK_DIR /var/lock/apache2
    ENV APACHE_PID_FILE /var/run/apache2/apache2.pid
    ENV APACHE_SERVERADMIN admin@localhost
    ENV APACHE_SERVERNAME localhost

    EXPOSE 80
    [/sourcecode]

    Now we’ve got the three containers, but we want to use them all together. We’ll use a docker-compose.yml file. The web container will expose port 80 and the node container port 8080. The node container also opens port 6400, but this is an internal port that we don’t need to reach from outside: only the Python container needs access to it. Because of that, 6400 is not mapped to any port in docker-compose.

    [sourcecode language=”text”]
    version: '2'

    services:
      web:
        image: gonzalo123/example_web
        container_name: example_web
        ports:
          - "80:80"
        restart: always
        depends_on:
          - node
        build:
          context: ./images/php
          dockerfile: Dockerfile
        entrypoint:
          - /usr/sbin/apache2
          - -D
          - FOREGROUND
        volumes:
          - ./src:/mnt/src

      node:
        image: gonzalo123/example_node
        container_name: example_node
        ports:
          - "8080:8080"
        restart: always
        build:
          context: ./images/node
          dockerfile: Dockerfile
        entrypoint:
          - npm
          - start
        volumes:
          - ./src:/mnt/src

      python:
        image: gonzalo123/example_python
        container_name: example_python
        restart: always
        depends_on:
          - node
        build:
          context: ./images/python
          dockerfile: Dockerfile
        entrypoint:
          - python
          - tic.py
        volumes:
          - ./src:/mnt/src
    [/sourcecode]

    And that’s all. We only need to start our containers
    [sourcecode language=”bash”]
    docker-compose up --build -d
    [/sourcecode]

    and open our browser at: http://localhost to see our Time clock

    Full source code available within my github account

    Playing with arduino, IoT, crossbar and websockets

    Yes. Finally I’ve got an arduino board. It’s time to hack a little bit. Today I want to try different things. I want to display in a webpage one value from my arduino board. For example one analog data using a potentiometer. Let’s start.

    We are going to use one potentiometer. A potentiometer is a resistor with a rotating contact that forms an adjustable voltage divider. It has three pins. If we connect one pin to the 5V power source of our arduino, another one to ground and the third one to A0 (analog input 0), we can read different values depending on the position of the potentiometer’s rotating contact.

    arduino_analog

    Arduino has 10-bit analog resolution. That means 1024 possible values, from 0 to 1023. So when our potentiometer gives us 5 volts we’ll read 1023, and when it gives us 0V we’ll read 0. Here we can see a simple arduino program to read this analog input and send the data via the serial port:
    [sourcecode language=”c”]
    int mem;

    void setup() {
    Serial.begin(9600);
    }

    void loop() {
    int value = analogRead(A0);
    if (value != mem) {
    Serial.println(value);
    }
    mem = value;

    delay(100);
    }
    [/sourcecode]

    This program is a simple loop with a delay of 100 milliseconds that reads A0 and, if the value is different from the previous reading (to avoid sending the same value when nobody is touching the potentiometer), sends it via the serial port (at 9600 baud).
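
To make the numbers concrete, here is a small sketch of the two ideas involved: mapping a 10-bit reading back to a voltage, and dropping consecutive duplicates. It assumes a simple linear mapping where 1023 corresponds to the 5V reference, as described above. Both function names are invented for the example, and unlike the arduino program (where `mem` starts at 0) the generator always lets the first reading through.

```python
def adc_to_volts(reading, vref=5.0, max_reading=1023):
    """Linear mapping of a 10-bit reading (0..1023) to a voltage (0..vref)."""
    if not 0 <= reading <= max_reading:
        raise ValueError("reading out of range: %r" % (reading,))
    return reading * vref / max_reading


def changed_values(readings):
    """Yield a reading only when it differs from the previous one."""
    last = None
    for reading in readings:
        if reading != last:
            yield reading
        last = reading


if __name__ == "__main__":
    for value in changed_values([0, 0, 512, 512, 1023]):
        print(value, round(adc_to_volts(value), 2))
```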

    We can test our program using the serial monitor of our arduino IDE or using another serial monitor.

    Now we’re going to create one script to read this serial port data. We’re going to use Python. I’ll use my laptop and my serial port is /dev/tty.usbmodem14231

    [sourcecode language=”python”]
    import serial

    arduino = serial.Serial('/dev/tty.usbmodem14231', 9600)

    while True:
        # print() with a single argument works on both Python 2 and Python 3
        print(arduino.readline().strip())
    [/sourcecode]

    Basically we’ve got our backend running. Now we can create a simple frontend.

    [sourcecode language=”html”]

    <div id="display"></div>

    [/sourcecode]

    We’ll need websockets. I normally use socket.io but today I’ll use Crossbar.io. Ever since I heard about it in Ronny’s talk at the deSymfony conference I have wanted to use it.

    I’ll change a little bit our backend to emit one event

    [sourcecode language=”python”]
    import serial
    from os import environ
    from twisted.internet.defer import inlineCallbacks
    from twisted.internet.task import LoopingCall
    from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner

    arduino = serial.Serial('/dev/tty.usbmodem14231', 9600)

    class SeriaReader(ApplicationSession):
        @inlineCallbacks
        def onJoin(self, details):
            def publish():
                # Blocks until the arduino sends a line, then publishes it
                return self.publish(u'iot.serial.reader', arduino.readline().strip())

            yield LoopingCall(publish).start(0.1)

    if __name__ == '__main__':
        runner = ApplicationRunner(environ.get("GONZALO_ROUTER", u"ws://127.0.0.1:8080/ws"), u"iot")
        runner.run(SeriaReader)
    [/sourcecode]

    Now I only need to create a crossbar.io server. I will use node to do it
    [sourcecode language=”js”]
    var autobahn = require('autobahn'),
        connection = new autobahn.Connection({
            url: 'ws://0.0.0.0:8080/ws',
            realm: 'iot'
        });

    connection.open();
    [/sourcecode]

    And now we only need to connect our frontend to the websocket server

    [sourcecode language=”js”]
    $(function () {
    var connection = new autobahn.Connection({
    url: "ws://192.168.1.104:8080/ws",
    realm: "iot"
    });

    connection.onopen = function (session) {
    session.subscribe('iot.serial.reader', function (args) {
    $('#display').html(args[0]);
    });
    };

    connection.open();
    });
    [/sourcecode]

    It works, but there’s a problem. The first time we connect with our browser we won’t see the display value until we change the position of the potentiometer. That’s because the ‘iot.serial.reader’ event is only emitted when the potentiometer changes. No change means no new value. To solve this problem we only need to change our crossbar.io server a little bit. We’ll “memorize” the last value and we’ll expose one method, ‘iot.serial.get’, to ask for this value

    [sourcecode language=”js”]
    var autobahn = require('autobahn'),
        connection = new autobahn.Connection({
            url: 'ws://0.0.0.0:8080/ws',
            realm: 'iot'
        }),
        mem;

    connection.onopen = function (session) {
        // Remote procedure: return the last value seen
        session.register('iot.serial.get', function () {
            return mem;
        });

        // Remember every value published by the serial reader
        session.subscribe('iot.serial.reader', function (args) {
            mem = args[0];
        });
    };

    connection.open();
    [/sourcecode]
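
The "memorize the last value" idea does not depend on crossbar. As a rough in-process sketch (no WAMP involved; `LastValueCache` and its methods are names invented here), the same pair of "publish to subscribers" and "ask for the latest value" looks like this:

```python
from typing import Any, Callable, List, Optional


class LastValueCache:
    """In-process stand-in for the publish/subscribe plus 'get' pair."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[Any], None]] = []
        self._last: Optional[Any] = None

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        """Register a callback for future values."""
        self._subscribers.append(callback)

    def publish(self, value: Any) -> None:
        """Remember the value and forward it to every subscriber."""
        self._last = value
        for callback in self._subscribers:
            callback(value)

    def get(self) -> Optional[Any]:
        """Return the most recent value, or None if nothing was published."""
        return self._last


if __name__ == "__main__":
    cache = LastValueCache()
    cache.publish(512)            # published before anyone is listening
    cache.subscribe(print)        # a late client subscribes...
    print(cache.get())            # ...and asks for the current value
```

A client that connects late first subscribes and then calls `get()`, so it can show a value immediately instead of waiting for the next change.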

    And now in the frontend we ask for ‘iot.serial.get’ when we connect to the socket

    [sourcecode language=”js”]
    $(function () {
        var connection = new autobahn.Connection({
            url: "ws://192.168.1.104:8080/ws",
            realm: "iot"
        });

        connection.onopen = function (session) {
            session.subscribe('iot.serial.reader', function (args) {
                $('#display').html(args[0]);
            }).then(function () {
                // Once subscribed, ask for the last known value
                session.call('iot.serial.get').then(
                    function (result) {
                        $('#display').html(result);
                    }
                );
            });
        };
        connection.open();
    });
    [/sourcecode]

    And that’s all. The source code is available in my github account. You also can see a demo of the working prototype here

    Encrypt Websocket (socket.io) communications

    I’m a big fan of WebSockets and socket.io. I’ve written a lot about them. In my latest posts I’ve written about socket.io and authentication. Today we’re going to talk about communications.

    Imagine we’ve got a WebSocket server and we connect our application to this server (even using https/wss). If we open our browser’s console we can inspect our WebSocket communications. We can also enable debugging. This works in a similar way to starting promiscuous mode on our network interface: we will see every packet, not only the packets that the server is sending to us.

    If we send sensitive information over WebSockets, that means that one logged-in user can see another user’s information. We can separate namespaces in our socket.io server. We can also do another thing: encrypt the communications using crypto-js.

    I’ve created one small wrapper to use it with socket.io.
    We can install our server dependency

    [sourcecode language=”bash”]
    npm install g-crypt
    [/sourcecode]

    And install our client dependency with bower

    [sourcecode language=”bash”]
    bower install g-crypt
    [/sourcecode]

    And use it in our server

    [sourcecode language=”js”]
    var io = require('socket.io')(3000),
        Crypt = require("g-crypt"),
        passphrase = 'super-secret-passphrase',
        crypter = Crypt(passphrase);

    io.on('connection', function (socket) {
        socket.on('counter', function (data) {
            // Messages arrive encrypted: decrypt, increment and send back encrypted
            var decriptedData = crypter.decrypt(data);
            setTimeout(function () {
                console.log("counter status: " + decriptedData.id);
                decriptedData.id++;
                socket.emit('counter', crypter.encrypt(decriptedData));
            }, 1000);
        });
    });
    [/sourcecode]

    And now a simple HTML client

    [sourcecode language=”html”]
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <title>Title</title>
    </head>
    <body>
    Open console to see the messages

    <script src="http://localhost:3000/socket.io/socket.io.js"></script>
    <script src="assets/cryptojslib/rollups/aes.js"></script>
    <script src="assets/g-crypt/src/Crypt.js"></script>
    <script>
    var socket = io('http://localhost:3000/'),
        passphrase = 'super-secret-passphrase',
        crypter = Crypt(passphrase),
        id = 0;

    socket.on('connect', function () {
        console.log("connected! Let's start the counter with: " + id);
        socket.emit('counter', crypter.encrypt({id: id}));
    });

    socket.on('counter', function (data) {
        var decriptedData = crypter.decrypt(data);
        console.log("counter status: " + decriptedData.id);
        socket.emit('counter', crypter.encrypt({id: decriptedData.id}));
    });
    </script>

    </body>
    </html>
    [/sourcecode]

    Now our communications are encrypted and a logged-in user cannot read another user’s data.

    The library is a simple wrapper:

    [sourcecode language=”js”]
    Crypt = function (passphrase) {
    "use strict";
    var pass = passphrase;
    var CryptoJSAesJson = {
    parse: function (jsonStr) {
    var j = JSON.parse(jsonStr);
    var cipherParams = CryptoJS.lib.CipherParams.create({ciphertext: CryptoJS.enc.Base64.parse(j.ct)});
    if (j.iv) cipherParams.iv = CryptoJS.enc.Hex.parse(j.iv);
    if (j.s) cipherParams.salt = CryptoJS.enc.Hex.parse(j.s);
    return cipherParams;
    },
    stringify: function (cipherParams) {
    var j = {ct: cipherParams.ciphertext.toString(CryptoJS.enc.Base64)};
    if (cipherParams.iv) j.iv = cipherParams.iv.toString();
    if (cipherParams.salt) j.s = cipherParams.salt.toString();
    return JSON.stringify(j);
    }
    };

    return {
    decrypt: function (data) {
    return JSON.parse(CryptoJS.AES.decrypt(data, pass, {format: CryptoJSAesJson}).toString(CryptoJS.enc.Utf8));
    },
    encrypt: function (data) {
    return CryptoJS.AES.encrypt(JSON.stringify(data), pass, {format: CryptoJSAesJson}).toString();
    }
    };
    };

    if (typeof module !== 'undefined' && typeof module.exports !== 'undefined') {
    CryptoJS = require("crypto-js");
    module.exports = Crypt;
    } else {
    window.Crypt = Crypt;
    }
    [/sourcecode]

    The library is available in my github, and we can also install it using npm and bower.

    Sharing authentication between socket.io and a PHP frontend (using JSON Web Tokens)

    I’ve written a previous post about sharing authentication between socket.io and a PHP frontend, but after publishing it a colleague (hi @mariotux) told me that I could use JSON Web Tokens (jwt) to do this. I had never used jwt before, so I decided to study them a little bit.

    JWT are pretty straightforward. You only need to create the token and send it to the client. You don’t need to store this token in a database: whoever holds the secret key can decode and validate it on its own. You can also use any programming language to encode and decode tokens (jwt libraries are available for the most common ones).
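
To show what "create" and "validate" mean here, this is a minimal sketch of an HS256 token built with Python's standard library only. It is an illustration of the format, not a replacement for a maintained jwt library, and the function names (`encode_jwt`, `decode_jwt`) are made up for the example. It also checks the reserved "exp" claim when present.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data):
    """Base64url without padding, as used by the JWT format."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text):
    """Decode base64url, restoring the padding that JWT strips."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input, secret):
    return hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()


def encode_jwt(payload, secret):
    """Build header.payload.signature using HMAC-SHA256 (HS256)."""
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        _b64url(json.dumps(header, separators=(",", ":")).encode("utf-8")),
        _b64url(json.dumps(payload, separators=(",", ":")).encode("utf-8")),
    ]
    signature = _sign(".".join(segments).encode("ascii"), secret)
    return ".".join(segments + [_b64url(signature)])


def decode_jwt(token, secret, now=None):
    """Verify signature and optional 'exp' claim; return the payload."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token")
    expected = _sign((header_b64 + "." + payload_b64).encode("ascii"), secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unsupported algorithm")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload


if __name__ == "__main__":
    token = encode_jwt({"user": "gonzalo"}, "my_super_secret_key")
    print(decode_jwt(token, "my_super_secret_key"))
```

No database lookup is involved: the only thing both sides need to share is the secret.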

    We’re going to build the same example as in the previous post. Today, with jwt, we don’t need to pass the PHP session and perform an http request to validate it. We’ll only pass the token, and our nodejs server will validate it on its own.

    [sourcecode language=”js”]
    var io = require('socket.io')(3000),
    jwt = require('jsonwebtoken'),
    secret = "my_super_secret_key";

    // middleware to perform authorization
    io.use(function (socket, next) {
    var token = socket.handshake.query.token,
    decodedToken;
    try {
    decodedToken = jwt.verify(token, secret);
    console.log("token valid for user", decodedToken.user);
    socket.connectedUser = decodedToken.user;
    next();
    } catch (err) {
    console.log(err);
    next(new Error("not valid token"));
    //socket.disconnect();
    }
    });

    io.on('connection', function (socket) {
    console.log('Connected! User: ', socket.connectedUser);
    });
    [/sourcecode]

    That’s the client:

    [sourcecode language=”html”]
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <title>Title</title>
    </head>
    <body>
    Welcome {{ user }}!

    <script src="http://localhost:3000/socket.io/socket.io.js"></script>
    <script src="/assets/jquery/dist/jquery.js"></script>

    <script>
    var socket;
    $(function () {
    $.getJSON("/getIoConnectionToken", function (jwt) {
    socket = io(‘http://localhost:3000&#8217;, {
    query: ‘token=’ + jwt
    });

    socket.on(‘connect’, function () {
    console.log("connected!");
    });

    socket.on(‘error’, function (err) {
    console.log(err);
    });
    });
    });
    </script>

    </body>
    </html>
    [/sourcecode]

    And here is the backend: a simple Silex server, very similar to the one in the previous post. JWT also has several reserved claims, for example “exp”, which sets an expiration timestamp. It’s very useful: we only set one value and the validator rejects tokens whose timestamp has passed. In this example I’m not using an expiration date, which means that my token never expires. And never means never.

    In my first prototype I set a short expiration time (10 seconds), so the token was only valid for 10 seconds. That sounds great, because my backend generates tokens that are going to be used immediately. That’s the normal situation, but what happens if I restart the socket.io server? The client will try to reconnect using the same token, which has already expired, so we’d need to create a new JWT before reconnecting. Because of that I’ve removed the expiration date in this example, but remember: without an expiration date your generated tokens will always be valid (and always is a very long period of time).

    [sourcecode language="php"]
    <?php
    include __DIR__ . "/../vendor/autoload.php";

    use Firebase\JWT\JWT;
    use Silex\Application;
    use Silex\Provider\SessionServiceProvider;
    use Silex\Provider\TwigServiceProvider;
    use Symfony\Component\HttpFoundation\Response;
    use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;

    $app = new Application([
        'secret' => "my_super_secret_key",
        'debug'  => true
    ]);
    $app->register(new SessionServiceProvider());
    $app->register(new TwigServiceProvider(), [
        'twig.path' => __DIR__ . '/../views',
    ]);

    $app->get('/', function (Application $app) {
        return $app['twig']->render('home.twig');
    });
    $app->get('/login', function (Application $app) {
        $username = $app['request']->server->get('PHP_AUTH_USER', false);
        $password = $app['request']->server->get('PHP_AUTH_PW');
        if ('gonzalo' === $username && 'password' === $password) {
            $app['session']->set('user', ['username' => $username]);

            return $app->redirect('/private');
        }
        $response = new Response();
        $response->headers->set('WWW-Authenticate', sprintf('Basic realm="%s"', 'site_login'));
        $response->setStatusCode(401, 'Please sign in.');

        return $response;
    });

    $app->get('/getIoConnectionToken', function (Application $app) {
        $user = $app['session']->get('user');
        if (null === $user) {
            throw new AccessDeniedHttpException('Access Denied');
        }

        $jwt = JWT::encode([
            // I can use "exp" reserved claim. It's cool. My connection token is only available
            // during a period of time. The problem is if I restart the io server. Client will
            // try to re-connect using this token and it's expired.
            //"exp" => (new \DateTimeImmutable())->modify('+10 second')->getTimestamp(),
            "user" => $user
        ], $app['secret']);

        return $app->json($jwt);
    });

    $app->get('/private', function (Application $app) {
        $user = $app['session']->get('user');

        if (null === $user) {
            throw new AccessDeniedHttpException('Access Denied');
        }

        $userName = $user['username'];

        return $app['twig']->render('private.twig', [
            'user' => $userName
        ]);
    });
    $app->run();
    [/sourcecode]
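
The expiration trade-off described in this post can be sketched as follows. This is a standalone Python illustration that uses only the standard library, not code from the project. `issue_token`, `verify_token`, `reconnect` and the `now` parameter (used here to simulate the clock) are hypothetical names. It assumes the client asks the backend for a new token before every connection attempt, which is one possible way to keep short-lived tokens and still survive a restart of the socket server.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Callable, Optional


class TokenExpired(ValueError):
    """Raised when a token's "exp" claim is not in the future."""


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> str:
    digest = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return _b64url(digest)


def issue_token(user: dict, secret: str, ttl_seconds: int = 10,
                now: Optional[float] = None) -> str:
    """Backend side: sign a token that expires ttl_seconds after it is issued."""
    issued_at = time.time() if now is None else now
    header = _b64url(b'{"alg":"HS256","typ":"JWT"}')
    claims = {"user": user, "exp": int(issued_at) + ttl_seconds}
    body = _b64url(json.dumps(claims).encode())
    return f"{header}.{body}.{_sign(f'{header}.{body}', secret)}"


def verify_token(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Socket server side: check the signature first, then the "exp" claim."""
    header, body, signature = token.split(".")
    expected = _sign(f"{header}.{body}", secret)
    if not hmac.compare_digest(signature.encode(), expected.encode()):
        raise ValueError("invalid signature")
    claims = json.loads(_b64url_decode(body))
    current = time.time() if now is None else now
    if "exp" in claims and current >= claims["exp"]:
        raise TokenExpired("token expired")
    return claims


def reconnect(fetch_token: Callable[[], str], connect: Callable[[str], dict],
              max_attempts: int = 3) -> dict:
    """Client side: fetch a fresh token before every attempt, never reuse an old one."""
    last_error: Optional[Exception] = None
    for _ in range(max_attempts):
        try:
            return connect(fetch_token())
        except TokenExpired as err:
            last_error = err
    raise last_error if last_error else RuntimeError("no connection attempt was made")


# Hypothetical usage with a simulated clock (seconds since the epoch).
SECRET = "my_super_secret_key"
stale = issue_token({"username": "gonzalo"}, SECRET, ttl_seconds=10, now=1000)
verify_token(stale, SECRET, now=1005)  # accepted: still inside the 10-second window
# By now=1011 the same token raises TokenExpired, so a client that reconnects
# after a server restart has to ask the backend for a new token first.
```

With this approach the 10-second lifetime from the first prototype can stay, at the cost of one extra HTTP request per reconnection.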

    The full project is available on my GitHub.