Transforming TCP sockets to HTTP with Go

Sometimes we need to work with legacy applications: applications that are hard to rewrite and hard to change. Imagine, for example, that one of these applications communicates with another process over raw TCP sockets. Raw TCP sockets are fast, but they have several problems: for example, all the data is sent in plain text over the network and without authentication (unless we implement a protocol ourselves).

One solution is to use HTTPS connections instead. We can also authenticate those requests with a Bearer token. For example, I've created a simple HTTP server with Python and Flask:

import logging
import os
from functools import wraps

from flask import Flask, request, abort
from flask import jsonify

logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)
app = Flask(__name__)


def authorize_bearer(bearer):
    def authorize(f):
        @wraps(f)
        def decorated_function(*args, **kws):
            if 'Authorization' not in request.headers:
                abort(401)

            data = request.headers['Authorization']

            if str.replace(str(data), 'Bearer ', '') != bearer:
                abort(401)

            return f(*args, **kws)

        return decorated_function

    return authorize


@app.route('/register', methods=['POST'])
@authorize_bearer(bearer=os.getenv('TOKEN'))
def hello_world():
    req_data = request.get_json()
    logger.info(req_data)
    return jsonify({"status": "OK", "request_data": req_data})

Now we only need to change our legacy application to use an HTTP client instead of raw TCP sockets. But sometimes that's not possible. Imagine, for example, that this application runs on an old OS without HTTPS support, or that we cannot find and compile an HTTP client for the legacy application.

One possible solution is to isolate the application and change only the destination of the TCP socket. Instead of the original IP address we can use localhost, and we can create a proxy at localhost that listens for TCP connections and forwards the information to the HTTP server.

We're going to build this proxy in Go. We could do it in any language (Python, C#, JavaScript, …). My Kung Fu in Go is not so good (I'm more comfortable with Python), but it's not difficult and we can build a binary of our proxy for Windows, Linux and Mac without any problem. Then we only need to copy the binary onto the target host and it works (no installation, no SDK, nothing: just copy and run).

package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
)

func main() {
	port, closeConnection, url := parseFlags()
	openSocket(*port, *closeConnection, *url, onMessage)
}

func onMessage(url string, buffer string) {
	bearer := os.Getenv("TOKEN")
	client := &http.Client{}
	req, _ := http.NewRequest("POST", url, strings.NewReader(buffer))
	req.Header.Add("Authorization", "Bearer "+bearer)
	req.Header.Add("content-type", "application/json")
	resp, err := client.Do(req)

	if err != nil {
		log.Println(err)
	} else {
		if resp.Status == "200" {
			var result map[string]interface{}
			json.NewDecoder(resp.Body).Decode(&result)
			log.Println(result["status"])
		} else {
			log.Println("Response status: " + resp.Status)
		}
		defer resp.Body.Close()
	}
}

func parseFlags() (*string, *bool, *string) {
	port := flag.String("port", "7777", "port number")
	closeConnection := flag.Bool("close", true, "Close connection")
	url := flag.String("url", "http://localhost:5000/register", "Destination endpoint")
	flag.Parse()
	return port, closeConnection, url
}

func openSocket(port string, closeConnection bool, url string, onMessage func(url string, buffer string)) {
	PORT := "localhost:" + port
	l, err := net.Listen("tcp4", PORT)
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("Serving %s\n", l.Addr().String())
	defer l.Close()

	for {
		c, err := l.Accept()
		if err != nil {
			log.Fatalln(err)
		}
		go handleConnection(c, closeConnection, url, onMessage)
	}
}

func handleConnection(c net.Conn, closeConnection bool, url string, onMessage func(url string, buffer string)) {
	log.Printf("Accepted connection from %s\n", c.RemoteAddr().String())
	reader := bufio.NewReader(c)
	for {
		netData, err := reader.ReadString('\n')
		if err != nil {
			// the client disconnected (or the read failed): stop handling this connection
			log.Println(err)
			c.Close()
			return
		}

		ip, port, _ := net.SplitHostPort(c.RemoteAddr().String())
		message := map[string]interface{}{
			"body":   strings.TrimSpace(netData),
			"ipFrom": ip,
			"port":   port,
		}

		log.Printf("Making request with %s\n", message)
		bytesRepresentation, err := json.Marshal(message)
		if err != nil {
			log.Println(err)
		} else {
			onMessage(url, string(bytesRepresentation))
		}

		if closeConnection {
			c.Close()
			return
		}
	}
}

And that's all. We can upgrade our legacy application almost without changing its code.
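
To try the proxy end to end we can mimic the legacy application with a few lines of Python: open a raw TCP socket against the proxy (here I assume the default localhost:7777) and send a newline-terminated message, since the proxy reads one line per message:

import socket

# connect to the Go proxy and send one line; the trailing \n is required
# because the proxy reads until a newline
with socket.create_connection(('localhost', 7777)) as s:
    s.sendall(b'hello, world\n')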

Source code available in my github

Database utils for psycopg2

I normally need to perform raw queries when I'm working with Python. Even when I'm using Django and its ORM I need to execute SQL against the database. I almost always use PostgreSQL, and (as we all do) I use psycopg2. Psycopg2 is very complete and easy to use, but sometimes I need a few helpers to make my usual tasks easier. I don't want to create a complex library on top of psycopg2, only a helper.

Something that I miss from PHP and the DBAL library is the way DBAL helps me to create simple CRUD operations. The idea of these helper functions is to do something similar. Let's start.

I've created procedural functions for everything and also a Db class. Normally all functions need the cursor parameter.

def fetch_all(cursor, sql, params=None):
    cursor.execute(
        query=sql,
        vars={} if params is None else params)

    return cursor.fetchall()

The Db class accepts the cursor in the constructor:

db = Db(cursor=cursor)
data = db.fetch_all("SELECT * FROM table")

Connection

Nothing special in the connection. I like to use "connection_factory=NamedTupleConnection" to allow me to access the recordset as an object. I've created a simple factory helper to create connections:

def get_conn(dsn, named_tuple=False, autocommit=False):
    conn = psycopg2.connect(
        dsn=dsn,
        connection_factory=NamedTupleConnection if named_tuple else None,
    )
    conn.autocommit = autocommit

    return conn

Sometimes I use NamedTuple in the cursor instead of the connection, so I've created a simple helper:

def get_cursor(conn, named_tuple=True):
    return conn.cursor(cursor_factory=NamedTupleCursor if named_tuple else None)

Fetch all

db = Db(cursor)
for reg in db.fetch_all(sql="SELECT email, name from users"):
    assert 'user1' == reg.name
    assert 'user1@email.com' == reg.email

Fetch one

data = db.fetch_one(sql=SQL)

Select

data = db.select(
        table='users',
        where={'email': 'user1@email.com'})

This helper only allows me to build simple WHERE statements (conditions joined with AND). If I need a complex WHERE, then I use the fetch helpers.
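
For reference, the select helper can be built with the same psycopg2.sql composition used by the other helpers. This is only a sketch (it reuses the _get_conditions helper that also appears in the upsert section below), not necessarily the library's exact code:

def _get_select_sql(table, where):
    raw_sql = "select * from {tbl} where {t_where}"
    return psycopg2_sql.SQL(raw_sql).format(
        tbl=psycopg2_sql.Identifier(table),
        t_where=psycopg2_sql.SQL(' and ').join(_get_conditions(where)))


def select(cursor, table, where):
    cursor.execute(_get_select_sql(table=table, where=where), where)
    return cursor.fetchall()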

Insert

Insert one row

db.insert(
        table='users',
        values={'email': 'user1@email.com', 'name': 'user1'})

Or insert multiple rows (using psycopg2's executemany):

db.insert(
        table='users',
        values=[
            {'email': 'user2@email.com', 'name': 'user2'},
            {'email': 'user3@email.com', 'name': 'user3'}
        ])

We can also use insert_batch to insert all rows within one command:

db.insert_batch(
        table='users',
        values=[
            {'email': 'user2@email.com', 'name': 'user2'},
            {'email': 'user3@email.com', 'name': 'user3'}
        ])

Update

db.update(
        table='users',
        data={'name': 'xxxx'},
        identifier={'email': 'user1@email.com'},
    )

Delete

db.delete(table='users', where={'email': 'user1@email.com'})

Upsert

Sometimes we need to insert one row, or update the row if the primary key already exists. We can do that with two statements: one SELECT and, if there isn't any result, one INSERT; otherwise an UPDATE. But we can also do it with a single SQL statement. I've created a helper to do that:

def _get_upsert_sql(data, identifier, table):
    raw_sql = """
        WITH
            upsert AS (
                UPDATE {tbl}
                SET {t_set}
                WHERE {t_where}
                RETURNING {tbl}.*),
            inserted AS (
                INSERT INTO {tbl} ({t_fields})
                SELECT {t_select_fields}
                WHERE NOT EXISTS (SELECT 1 FROM upsert)
                RETURNING *)
        SELECT * FROM upsert
        UNION ALL
        SELECT * FROM inserted
    """
    merger_data = {**data, **identifier}
    sql = psycopg2_sql.SQL(raw_sql).format(
        tbl=psycopg2_sql.Identifier(table),
        t_set=psycopg2_sql.SQL(', ').join(_get_conditions(data)),
        t_where=psycopg2_sql.SQL(' and ').join(_get_conditions(identifier)),
        t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, merger_data.keys())),
        t_select_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, merger_data.keys())))

    return merger_data, sql


def upsert(cursor, table, data, identifier):
    merger_data, sql = _get_upsert_sql(data, identifier, table)

    cursor.execute(sql, merger_data)
    return cursor.rowcount
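
The _get_conditions helper isn't shown above. Based on how it's used, a minimal version could look like this (an assumption, not necessarily the library's exact implementation):

def _get_conditions(data):
    # builds a list of "field" = %(field)s fragments, ready to be joined
    return [
        psycopg2_sql.SQL("{field} = {value}").format(
            field=psycopg2_sql.Identifier(key),
            value=psycopg2_sql.Placeholder(key))
        for key in data.keys()
    ]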

Now we can execute an upsert in a simple way:

db.upsert(
        table='users',
        data={'name': 'yyyy'},
        identifier={'email': 'user1@email.com'})

Stored procedures

data = db.sp_fetch_one(
    function='hello',
    params={'name': 'Gonzalo'})
data = db.sp_fetch_all(
    function='hello',
    params={'name': 'Gonzalo'})
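
Those helpers simply wrap the function call in a SELECT. A minimal sketch of sp_fetch_all, assuming the same psycopg2.sql composition style as the rest of the helpers (the real implementation may differ):

def sp_fetch_all(cursor, function, params=None):
    params = {} if params is None else params
    # builds "select * from function(%(param1)s, %(param2)s, ...)" safely
    sql = psycopg2_sql.SQL("select * from {fn}({args})").format(
        fn=psycopg2_sql.Identifier(function),
        args=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, params.keys())))
    cursor.execute(sql, params)
    return cursor.fetchall()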

Transactions

In PHP with DBAL, to create a transaction we can use this syntax:

<?php
$conn->transactional(function($conn) {
    // do stuff
});

In Python we can do the same with a context manager, but I prefer to use this syntax:

with transactional(conn) as db:
    assert 1 == db.insert(
        table='users',
        values={'email': 'user1@email.com', 'name': 'user1'})

The transactional function looks like this (I've created two functions: one that yields a raw cursor and another one that yields my Db class):

def _get_transactional(conn, named_tuple, callback):
    try:
        with conn as connection:
            with get_cursor(conn=connection, named_tuple=named_tuple) as cursor:
                yield callback(cursor)
            conn.commit()
    except Exception as e:
        conn.rollback()
        raise e

@contextmanager
def transactional(conn, named_tuple=False):
    return _get_transactional(conn, named_tuple, lambda cursor: Db(cursor))


@contextmanager
def transactional_cursor(conn, named_tuple=False):
    return _get_transactional(conn, named_tuple, lambda cursor: cursor)

Return values

The select and fetch helpers return the recordset, but insert, update, delete and upsert return the number of affected rows. For example, that's the insert function:

def _get_insert_sql(table, values):
    keys = values[0].keys() if type(values) is list else values.keys()

    raw_sql = "insert into {tbl} ({t_fields}) values ({t_values})"
    return psycopg2_sql.SQL(raw_sql).format(
        tbl=psycopg2_sql.Identifier(table),
        t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, keys)),
        t_values=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, keys)))


def insert(cursor, table, values):
    sql = _get_insert_sql(table=table, values=values)
    cursor.executemany(query=sql, vars=values) if type(values) is list else cursor.execute(sql, values)

    return cursor.rowcount

Unit tests

You can try the library by running the unit tests. I've also provided a docker-compose.yml file to set up a PostgreSQL database to run the tests.

version: '3.6'

services:
  pg:
    build:
      context: .docker/pg
      dockerfile: Dockerfile
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

Install

You can install the library using pip:

pip install dbutils-gonzalo123

Full code in my github

Playing with RabbitMQ, Python and Docker. From Exchanges to Queues

Lately message queues appear in one way or another in all my projects. Normally I work with AWS and I use SQS and SNS, but I also use RabbitMQ and MQTT quite often. In AWS there's something that I use a lot to isolate services: the process that emits messages publishes them to SNS, and I bind SQS queues to that SNS topic. With this technique I can attach n SQS queues to the same SNS topic. I've used it here. In AWS it's pretty straightforward to do that. Today we're going to do the same with RabbitMQ. In fact it's very easy to do in RabbitMQ; we only need to follow the tutorial in the official RabbitMQ documentation.

The script listens to an exchange and resends each message to a queue topic. It's basically the same as what we can see in the RabbitMQ documentation.

import settings
from lib.logger import logger
from lib.rabbit import get_channel

channel = get_channel()
channel.exchange_declare(
    exchange=settings.EXCHANGE,
    exchange_type='fanout')

result = channel.queue_declare(
    queue='',
    exclusive=True)

queue_name = result.method.queue

channel.queue_bind(
    exchange=settings.EXCHANGE,
    queue=queue_name)

logger.info(f' [*] Waiting for {settings.EXCHANGE}. To exit press CTRL+C')

channel.queue_declare(
    durable=settings.QUEUE_DURABLE,
    auto_delete=settings.QUEUE_AUTO_DELETE,
    queue=settings.QUEUE_TOPIC
)


def callback(ch, method, properties, body):
    channel.basic_publish(
        exchange='',
        routing_key=settings.QUEUE_TOPIC,
        body=body)
    logger.info(f'Message sent to topic {settings.QUEUE_TOPIC}. Message: {body}')


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True)

channel.start_consuming()
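
The get_channel helper from lib.rabbit isn't shown in the snippet above. A minimal version with pika, assuming the RABBIT_HOST, RABBIT_USER and RABBIT_PASS environment variables used in the stack file, could be:

import os

import pika


def get_channel():
    # open a blocking connection against the broker and return a channel
    credentials = pika.PlainCredentials(
        username=os.getenv('RABBIT_USER'),
        password=os.getenv('RABBIT_PASS'))

    parameters = pika.ConnectionParameters(
        host=os.getenv('RABBIT_HOST'),
        credentials=credentials)

    connection = pika.BlockingConnection(parameters=parameters)

    return connection.channel()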

We can send a message to the exchange using rabbitmqadmin and see it arrive in the queue:

rabbitmqadmin -u username -p password publish exchange=exchange routing_key= payload="hello, world"

My idea with this project is to deploy it to a Docker swarm cluster and add/remove listeners just by adding new services to the stack:

...
  exchange2topic1:
    image: ${ECR}/exchange2queue:${VERSION}
    build:
      context: .
      dockerfile: Dockerfile
    deploy:
      restart_policy:
        condition: on-failure
    depends_on:
      - rabbit
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0
  ...

With this approach it's very simple for me to add and remove listeners without touching the emitter.

Also, as a plus, I like to add a simple HTTP API that allows me to send messages to the exchange with a POST request instead of using a RabbitMQ client. That's because sometimes I work with legacy systems where using an AMQP client isn't simple. That's a simple Flask API:

from flask import Flask, request
from flask import jsonify

import settings
from lib.auth import authorize_bearer
from lib.logger import logger
from lib.rabbit import get_channel
import json

app = Flask(__name__)


@app.route('/health')
def health():
    return jsonify({"status": "ok"})


@app.route('/publish/<path:exchange>', methods=['POST'])
@authorize_bearer(bearer=settings.API_TOKEN)
def publish(exchange):
    channel = get_channel()
    try:
        message = request.get_json()
        channel.basic_publish(
            exchange=exchange,
            routing_key='',
            body=json.dumps(message)
        )
        logger.info(f"message sent to exchange {exchange}")
        return jsonify({"status": "OK", "exchange": exchange})
    except Exception:
        logger.exception(f"error publishing to exchange {exchange}")
        return jsonify({"status": "NOK", "exchange": exchange})

Now we can emit messages to the exchange using curl, postman or any other http client.

POST http://localhost:5000/publish/exchange
Content-Type: application/json
Authorization: Bearer super_secret_key

{
  "hello": "Gonzalo"
}

Also, in the docker stack I want to use a reverse proxy to serve my Flask application (served with gunicorn) and the RabbitMQ management console. I'm using nginx to do that:

upstream api {
    server api:5000;
}

server {
    listen 8000 default_server;
    listen [::]:8000;

    client_max_body_size 20M;

    location / {
        try_files $uri @proxy_to_api;
    }

    location ~* /rabbitmq/api/(.*?)/(.*) {
        proxy_pass http://rabbit:15672/api/$1/%2F/$2?$query_string;
        proxy_buffering                    off;
        proxy_set_header Host              $http_host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location ~* /rabbitmq/(.*) {
        rewrite ^/rabbitmq/(.*)$ /$1 break;
        proxy_pass http://rabbit:15672;
        proxy_buffering                    off;
        proxy_set_header Host              $http_host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location @proxy_to_api {
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Url-Scheme $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;

        proxy_pass http://api;
    }
}

And that's all. Here's the docker-compose.yml:

version: '3.6'

x-base: &base
  image: ${ECR}/exchange2queue:${VERSION}
  build:
    context: .
    dockerfile: Dockerfile
  deploy:
    restart_policy:
      condition: on-failure
  depends_on:
    - rabbit

services:
  rabbit:
    image: rabbitmq:3-management
    deploy:
      restart_policy:
        condition: on-failure
    ports:
      - 5672:5672
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}

  nginx:
    image: ${ECR}/exchange2queue_nginx:${VERSION}
    deploy:
      restart_policy:
        condition: on-failure
    build:
      context: .docker/nginx
      dockerfile: Dockerfile
    ports:
      - 8080:8000
    depends_on:
      - rabbit
      - api

  api:
    <<: *base
    container_name: front
    command: /bin/sh wait-for-it.sh rabbit:5672 -- gunicorn -w 4 api:app -b 0.0.0.0:5000
    deploy:
      restart_policy:
        condition: on-failure
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      API_TOKEN: ${API_TOKEN}

  exchange2topic1:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0

  exchange2topic2:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic2
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0

Full code in my github

Running Python/Django docker containers with non-root user

Running Linux processes as root is not a good idea. A problem or an exploit in the process can give the attacker a root shell. When we run a Docker container, especially if this container is in production, it shouldn't run as root.

To do that we only need to write a Dockerfile that properly runs our Python/Django application as a non-root user. That's my boilerplate:

FROM python:3.8 AS base

ENV APP_HOME=/src
ENV APP_USER=appuser

RUN groupadd -r $APP_USER && \
    useradd -r -g $APP_USER -d $APP_HOME -s /sbin/nologin -c "Docker image user" $APP_USER

WORKDIR $APP_HOME

ENV TZ 'Europe/Madrid'
RUN echo $TZ > /etc/timezone && apt-get update && \
    apt-get install -y tzdata && \
    rm /etc/localtime && \
    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
    dpkg-reconfigure -f noninteractive tzdata && \
    apt-get clean

RUN pip install --upgrade pip

FROM base

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

COPY requirements.txt .
RUN pip install -r requirements.txt

ADD src .

RUN chown -R $APP_USER:$APP_USER $APP_HOME
USER $APP_USER

Also, in a Django application we normally use nginx as a reverse proxy. Nginx normally runs its master process as root and launches its worker processes as a non-root user, but we can also run nginx itself as non-root.

FROM nginx:1.17.4

RUN rm /etc/nginx/conf.d/default.conf
RUN rm /etc/nginx/nginx.conf
COPY nginx.conf /etc/nginx/conf.d
COPY etc/nginx.conf /etc/nginx/nginx.conf

RUN chown -R nginx:nginx /var/cache/nginx && \
    chown -R nginx:nginx /var/log/nginx && \
    chown -R nginx:nginx /etc/nginx/conf.d && \
    chmod -R 766 /var/log/nginx/

RUN touch /var/run/nginx.pid && \
    chown -R nginx:nginx /var/run/nginx.pid && \
    chown -R nginx:nginx /var/cache/nginx

USER nginx

And that’s all. Source code of a boilerplate application in my github

Playing with Python threads. Part 2

Last year I wrote a post about Python and threads. You can read it here. Today I want to keep on playing with Python and threads. The idea is to create a simple script that prints an asterisk to the console each second. Simple, isn't it?

from time import sleep
while True:
    print("*")
    sleep(1)

I want to keep this script running, but I also want to send a message externally (for example using RabbitMQ) and do something with the running script; in this demo, for example, stop it.

In JavaScript we could do it in a single-threaded process with the setInterval function, but since the RabbitMQ listener with pika is a blocking operation, we need to use threads in Python (please tell me if I'm wrong). The idea is to create a circuit-breaker condition in the main loop to check whether or not I need to stop the main thread.

First I’ve created my Rabbit listener in a thread:

from queue import Queue, Empty
import threading
import pika
import os


class Listener(threading.Thread):
    def __init__(self, queue=Queue()):
        super(Listener, self).__init__()
        self.queue = queue
        self.daemon = True

    def run(self):
        channel = self._get_channel()
        channel.queue_declare(queue='stop')

        channel.basic_consume(
            queue='stop',
            on_message_callback=lambda ch, method, properties, body: self.queue.put(item=True),
            auto_ack=True)

        channel.start_consuming()

    def stop(self):
        try:
            return True if self.queue.get(timeout=0.05) is True else False
        except Empty:
            return False

    def _get_channel(self):
        credentials = pika.PlainCredentials(
            username=os.getenv('RABBITMQ_USER'),
            password=os.getenv('RABBITMQ_PASS'))

        parameters = pika.ConnectionParameters(
            host=os.getenv('RABBITMQ_HOST'),
            credentials=credentials)

        connection = pika.BlockingConnection(parameters=parameters)

        return connection.channel()

Now in the main process I start the Listener and enter an endless loop that prints my asterisk each second, but at the end of each iteration I check whether I need to stop the process.

from Listener import Listener
from dotenv import load_dotenv
import logging
from time import sleep
import os

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

l = Listener()
l.start()


def main():
    while True:
        logging.info("*")
        sleep(1)
        if l.stop():
            break


if __name__ == '__main__':
    main()

As we can see in the stop function, we're using the queue.Queue class to communicate with our listener thread.
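
To stop the process from outside we only need to publish any message to the 'stop' queue. A quick way to do it from another Python process (a sketch, reusing the same RABBITMQ_* environment variables):

import os

import pika

credentials = pika.PlainCredentials(
    username=os.getenv('RABBITMQ_USER'),
    password=os.getenv('RABBITMQ_PASS'))
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=os.getenv('RABBITMQ_HOST'),
    credentials=credentials))

channel = connection.channel()
channel.queue_declare(queue='stop')
# any payload works: the listener only checks that a message arrived
channel.basic_publish(exchange='', routing_key='stop', body=b'stop')
connection.close()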

And that’s all. In the example I also provide a minimal RabbitMQ server in a docker container.

version: '3.4'

services:
  rabbit:
    image: rabbitmq:3-management
    restart: always
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    ports:
      - "15672:15672"
      - "5672:5672"

And that’s all. Full source code available here

Django logs to ELK using Filebeat

I've written a post about how to send Django logs to the ELK stack. You can read it here. In that post I used the Logstash client in a sidecar Docker container. The Logstash client works, but it needs too many resources. Nowadays it's better to use Filebeat as the data shipper instead of the Logstash client. Filebeat is also part of the ELK stack. It's a Golang binary, much more lightweight than the Logstash client.

The idea is almost the same as in the other post. Here we'll also build a sidecar container with our Django application logs mounted.

version: '3'
services:
  # Application
  api:
    image: elk:latest
    command: /bin/bash ./docker-entrypoint-wsgi.sh
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      DEBUG: 'True'
    volumes:
      - logs_volume:/src/logs
      - static_volume:/src/staticfiles
  nginx:
    image: elk-nginx:latest
    build:
      context: .docker/nginx
      dockerfile: Dockerfile
    volumes:
      - static_volume:/src/staticfiles
    ports:
      - 8000:8000
    depends_on:
      - api
  filebeat:
    image: filebeat:latest
    build:
      context: .docker/filebeat
      dockerfile: Dockerfile
    volumes:
      - logs_volume:/app/logs
    command: filebeat -c /etc/filebeat/filebeat.yml -e -d "*" -strict.perms=false
    depends_on:
      - api

  ...

With Filebeat we can perform actions to prepare our logs so that they're ready to be stored in Elasticsearch. But, at least here, it's much easier to prepare the logs in the Django application:

import logging

import json_log_formatter
from django.conf import settings
from django.utils.timezone import now


class CustomisedJSONFormatter(json_log_formatter.JSONFormatter):
    def json_record(self, message: str, extra: dict, record: logging.LogRecord):
        context = extra
        django = {
            'app': settings.APP_ID,
            'name': record.name,
            'filename': record.filename,
            'funcName': record.funcName,
            'msecs': record.msecs,
        }
        if record.exc_info:
            django['exc_info'] = self.formatException(record.exc_info)

        return {
            'message': message,
            'timestamp': now(),
            'level': record.levelname,
            'context': context,
            'django': django
        }

And in settings.py we use our CustomisedJSONFormatter:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'simple': {
            'format': '[%(asctime)s] %(levelname)s|%(name)s|%(message)s',
            'datefmt': '%Y-%m-%d %H:%M:%S',
        },
        "json": {
            '()': CustomisedJSONFormatter,
        },
    },
    'handlers': {
        'applogfile': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': Path(BASE_DIR).resolve().joinpath('logs', 'app.log'),
            'maxBytes': 1024 * 1024 * 15,  # 15MB
            'backupCount': 10,
            'formatter': 'json',
        },
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        }
    },
    'root': {
        'handlers': ['applogfile', 'console'],
        'level': 'DEBUG',
    }
}

And that's all. Our application logs are centralized in ELK and ready to consume with Kibana.

Source code available here

Don’t repeat password validator in Django

Django has a set of default password validators in django.contrib.auth.password_validation:

AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]

Normally those validators aren't enough (at least for me), but it's very easy to create custom validators. There are also several third-party validators that we can use, for example these ones.

I normally need to prevent users from repeating passwords (for example the last ten ones). To do that we need to create a custom validator. With this validator we also need to create a model to store the last passwords (their hashes). The idea is to persist the hash of the password each time the user changes it. As it's always good practice not to use the default User model, we're going to create a CustomUser model:

class CustomUser(AbstractUser):
    def __init__(self, *args, **kwargs):
        super(CustomUser, self).__init__(*args, **kwargs)
        self.original_password = self.password

    def save(self, *args, **kwargs):
        super(CustomUser, self).save(*args, **kwargs)
        if self._password_has_been_changed():
            CustomUserPasswordHistory.remember_password(self)

    def _password_has_been_changed(self):
        return self.original_password != self.password

And now we can create our CustomUserPasswordHistory model, implementing the remember_password method:

class CustomUserPasswordHistory(models.Model):
    username = models.ForeignKey(CustomUser, on_delete=models.CASCADE)
    old_pass = models.CharField(max_length=128)
    pass_date = models.DateTimeField()

    @classmethod
    def remember_password(cls, user):
        cls(username=user, old_pass=user.password, pass_date=localtime()).save()

Now the validator:

class DontRepeatValidator:
    def __init__(self, history=10):
        self.history = history

    def validate(self, password, user=None):
        for last_pass in self._get_last_passwords(user):
            if check_password(password=password, encoded=last_pass):
                self._raise_validation_error()

    def get_help_text(self):
        return _("You cannot repeat passwords")

    def _raise_validation_error(self):
        raise ValidationError(
            _("This password has been used before."),
            code='password_has_been_used',
            params={'history': self.history},
        )

    def _get_last_passwords(self, user):
        all_history_user_passwords = CustomUserPasswordHistory.objects.filter(username_id=user).order_by('id')

        to_index = all_history_user_passwords.count() - self.history
        to_index = to_index if to_index > 0 else None
        if to_index:
            [u.delete() for u in all_history_user_passwords[0:to_index]]

        return [p.old_pass for p in all_history_user_passwords[to_index:]]
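To enable the validator we only need to add it to AUTH_PASSWORD_VALIDATORS in settings.py. The dotted path depends on where the class lives; here I'm assuming a validators.py module inside an app called app:

AUTH_PASSWORD_VALIDATORS = [
    # ... the default validators shown above ...
    {
        'NAME': 'app.validators.DontRepeatValidator',
        'OPTIONS': {'history': 10},
    },
]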

We can see how it works with the unit tests:

class UserCreationTestCase(TestCase):
    def setUp(self):
        self.user = User.objects.create(username='gonzalo')

    def test_persist_password_to_history(self):
        self.user.set_password('pass1')
        self.user.save()

        all_history_user_passwords = CustomUserPasswordHistory.objects.filter(username_id=self.user)
        self.assertEqual(1, all_history_user_passwords.count())


class DontRepeatValidatorTestCase(TestCase):
    def setUp(self):
        self.user = User.objects.create(username='gonzalo')
        self.validator = DontRepeatValidator()

    def test_validator_with_new_pass(self):
        self.validator.validate('pass33', self.user)
        self.assertTrue(True)

    def test_validator_with_repeated_pass(self):
        for i in range(0, 11):
            self.user.set_password(f'pass{i}')
            self.user.save()

        with self.assertRaises(ValidationError):
            self.validator.validate('pass3', self.user)

    def test_keep_only_10_passwords(self):
        for i in range(0, 11):
            self.user.set_password(f'pass{i}')
            self.user.save()

        self.validator.validate('xxxx', self.user)

        all_history_user_passwords = CustomUserPasswordHistory.objects.filter(username_id=self.user)
        self.assertEqual(10, all_history_user_passwords.count())

Full source code in my github

Monitoring Django applications with Grafana and Kibana using Prometheus and Elasticsearch

When we have an application we need to monitor the logs in one way or another; not only the server's logs (500 errors, response times and things like that). Sometimes the user complains about the application, and without logs we cannot do anything. We can save logs in files and let grep and tail do the magic. That's acceptable with a single on-premise server, but nowadays, with clouds and Docker, it's a nightmare. We need a central log collector to gather all the logs of the application, and use this collector to create alerts and complex searches over our application logs.

I normally work with AWS. In AWS we have CloudWatch, and it's pretty straightforward to connect our application logs to CloudWatch when we're using AWS. When we aren't using AWS, we can use the ELK stack. In this example we're going to send our Django application logs to an Elasticsearch database. Let's start:

The idea is not to send the logs directly. The idea is to save the logs to log files. We can use this LOGGING configuration to do that:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        "json": {
            '()': CustomisedJSONFormatter,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
        },
        'app_log_file': {
            'level': LOG_LEVEL,
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': os.path.join(LOG_PATH, 'app.log.json'),
            'maxBytes': 1024 * 1024 * 15,  # 15MB
            'backupCount': 10,
            'formatter': 'json',
        },
    },
    'root': {
        'handlers': ['console', 'app_log_file'],
        'level': LOG_LEVEL,
    },
}

Here I’m using a custom JSON formatter:

import json_log_formatter
import logging
from django.utils.timezone import now


class CustomisedJSONFormatter(json_log_formatter.JSONFormatter):
    def json_record(self, message: str, extra: dict, record: logging.LogRecord):
        extra['name'] = record.name
        extra['filename'] = record.filename
        extra['funcName'] = record.funcName
        extra['msecs'] = record.msecs
        if record.exc_info:
            extra['exc_info'] = self.formatException(record.exc_info)

        return {
            'message': message,
            'timestamp': now(),
            'level': record.levelname,
            'context': extra
        }
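
Anything we pass through the logger's extra dict ends up inside the context key. For example, a log call like this one (just an illustration) produces a record similar to the one shown below:

import logging
from random import randint

logger = logging.getLogger(__name__)
logger.info('Hello from log', extra={'random': randint(1, 100)})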

With this configuration our logs are going to look something like this:

{"message": "Hello from log", "timestamp": "2020-04-26T19:35:59.427098+00:00", "level": "INFO", "app_id": "Logs", "context": {"random": 68, "name": "app.views", "filename": "views.py", "funcName": "index", "msecs": 426.8479347229004}}

Now we're going to use Logstash as a data shipper to send the logs to Elasticsearch. We need to create a pipeline:

input {
    file {
        path => "/logs/*"
        start_position => "beginning"
        codec => "json"
    }
}

output {
  elasticsearch {
        index => "app_logs"
        hosts => ["elasticsearch:9200"]
    }
}

We're going to use Docker to build our stack, so our Logstash and Django containers will share the logs volume.

Now we need to visualize the logs. Kibana is perfect for this task. We can set up a Kibana server connected to Elasticsearch and visualize the logs.

We can also monitor our server performance. Prometheus is the de facto standard for doing that. In fact it's very simple to connect our Django application to Prometheus. We only need to add the django-prometheus dependency, install the application and set up two middlewares:

INSTALLED_APPS = [
   ...
   'django_prometheus',
   ...
]

MIDDLEWARE = [
    'django_prometheus.middleware.PrometheusBeforeMiddleware', # <-- this one
    'app.middleware.RequestLogMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'django_prometheus.middleware.PrometheusAfterMiddleware', # <-- this one
]

We also need to set up some application routes:

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('p/', include('django_prometheus.urls')), # <-- prometheus routes
    path('', include('app.urls'))
]

The easiest way to visualize the data stored in Prometheus is to use Grafana. In Grafana we need to create a Prometheus datasource and build our custom dashboard. We can also import pre-built dashboards, for example this one: https://grafana.com/grafana/dashboards/9528

Here's the docker-compose file with the whole project:

version: '3'
services:
  web:
    image: web:latest
    restart: always
    command: /bin/bash ./docker-entrypoint.sh
    volumes:
      - static_volume:/src/staticfiles
      - logs_volume:/src/logs
    environment:
      DEBUG: 'False'
      LOG_LEVEL: DEBUG

  nginx:
    image: nginx:latest
    restart: always
    volumes:
      - static_volume:/src/staticfiles
    ports:
      - 80:80
    depends_on:
      - web
      - grafana

  prometheus:
    image: prometheus:latest
    restart: always
    build:
      context: .docker/prometheus
      dockerfile: Dockerfile

  grafana:
    image: grafana:latest
    restart: always
    depends_on:
      - prometheus
    environment:
      - GF_SECURITY_ADMIN_USER=${GF_SECURITY_ADMIN_USER}
      - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD}
      - GF_USERS_DEFAULT_THEME=${GF_USERS_DEFAULT_THEME}
      - GF_USERS_ALLOW_SIGN_UP=${GF_USERS_ALLOW_SIGN_UP}
      - GF_USERS_ALLOW_ORG_CREATE=${GF_USERS_ALLOW_ORG_CREATE}
      - GF_AUTH_ANONYMOUS_ENABLED=${GF_AUTH_ANONYMOUS_ENABLED}

  logstash:
    image: logstash:latest
    restart: always
    depends_on:
      - elasticsearch
    volumes:
      - logs_volume:/logs:ro

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
    restart: always
    environment:
      - discovery.type=single-node
      - http.host=0.0.0.0
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms750m -Xmx750m
    volumes:
      - elasticsearch_volume:/usr/share/elasticsearch/data

  kibana:
    image: kibana:latest
    restart: always
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch
volumes:
  elasticsearch_volume:
  static_volume:
  logs_volume:
  grafana_data:

And that's all. Our Django application is up and running, fully monitored.

Source code in my github

Deploying Django application to AWS EC2 instance with Docker

In AWS we have several ways to deploy Django (and non-Django) applications with Docker. We can use ECS or EKS clusters, but if we don't have an ECS or Kubernetes cluster up and running it can be complex. Today I want to show how to deploy a Django application in production mode on an EC2 host. Let's start.

I'm getting too old to provision a host by hand; I prefer to use Docker. The idea is to create one EC2 instance (a simple AWS-supported Amazon Linux AMI image). This host doesn't have Docker installed, so we need to install it. When we launch an instance, while we're configuring it, we can specify user data to configure the instance or run a configuration script during launch.

We only need to provide this shell script as user data to set up Docker:

#! /bin/bash
yum update -y
yum install -y docker
usermod -a -G docker ec2-user
curl -L https://github.com/docker/compose/releases/download/1.25.5/docker-compose-`uname -s`-`uname -m` | sudo tee /usr/local/bin/docker-compose > /dev/null
chmod +x /usr/local/bin/docker-compose
service docker start
chkconfig docker on

rm /etc/localtime
ln -s /usr/share/zoneinfo/Europe/Madrid /etc/localtime

ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

docker swarm init

We also need to attach an IAM role to our instance. This IAM role only needs to allow the following policies:

  • AmazonEC2ContainerRegistryReadOnly (because we’re going to use AWS ECR as container registry)
  • CloudWatchAgentServerPolicy (because we’re going to emit our logs to Cloudwatch)

We also need to set up a security group to allow incoming SSH connections on port 22 and HTTP connections (in our example on port 8000).

When we launch our instance we need to provide a key pair to connect via SSH. I like to put this key pair in my .ssh/config:

Host xxx.eu-central-1.compute.amazonaws.com
    User ec2-user
    Identityfile ~/.ssh/keypair-xxx.pem

To deploy our application we need to follow these steps:

  • Build our docker images
  • Push our images to a container registry (in this case ECR)
  • Deploy the application.

I've created a simple shell script called deploy.sh to perform all the tasks:

#!/usr/bin/env bash

set -a
[ -f deploy.env ] && . deploy.env
set +a

echo "$(tput setaf 1)Building docker images ...$(tput sgr0)"
docker build -t ec2-web -t ec2-web:latest -t $ECR/ec2-web:latest .
docker build -t ec2-nginx -t $ECR/ec2-nginx:latest .docker/nginx

echo "$(tput setaf 1)Pusing to ECR ...$(tput sgr0)"
aws ecr get-login-password --region $REGION --profile $PROFILE |
  docker login --username AWS --password-stdin $ECR
docker push $ECR/ec2-web:latest
docker push $ECR/ec2-nginx:latest

CMD="docker stack deploy -c $DOCKER_COMPOSE_YML ec2 --with-registry-auth"
echo "$(tput setaf 1)Deploying to EC2 ($CMD)...$(tput sgr0)"
echo "$CMD"

DOCKER_HOST="ssh://$HOST" $CMD
echo "$(tput setaf 1)Building finished $(date +'%Y%m%d.%H%M%S')$(tput sgr0)"

This script assumes that there's a deploy.env file with our personal configuration (the AWS profile, the host of the EC2 instance, the ECR registry and things like that):

PROFILE=xxxxxxx

DOCKER_COMPOSE_YML=docker-compose.yml
HOST=ec2-user@xxxx.eu-central-1.compute.amazonaws.com

ECR=9999999999.dkr.ecr.eu-central-1.amazonaws.com
REGION=eu-central-1

In this example I'm using Docker swarm to deploy the application. I also want to play with secrets. This dummy application doesn't have any sensitive information, but I've created one "ec2.supersecret" secret:

echo "super secret text" | docker secret create ec2.supersecret -

That’s the docker-compose.yml file:

version: '3.8'
services:
  web:
    image: 999999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-web:latest
    command: /bin/bash ./docker-entrypoint.sh
    environment:
      DEBUG: 'False'
    secrets:
      - ec2.supersecret
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: app
    volumes:
      - static_volume:/src/staticfiles
  nginx:
    image: 99999999.dkr.ecr.eu-central-1.amazonaws.com/ec2-nginx:latest
    deploy:
      replicas: 1
    logging:
      driver: awslogs
      options:
        awslogs-group: /projects/ec2
        awslogs-region: eu-central-1
        awslogs-stream: nginx
    volumes:
      - static_volume:/src/staticfiles:ro
    ports:
      - 8000:80
    depends_on:
      - web
volumes:
  static_volume:

secrets:
  ec2.supersecret:
    external: true

And that's all. Maybe ECS or EKS are better solutions to deploy Docker applications in AWS, but we can also deploy easily to a single Docker host on an EC2 instance that can be ready within a couple of minutes.

Source code in my github

Django reactive users with Celery and Channels

Today I want to build a prototype. The idea is to create two Django applications: one application will be the master and the other one will be the client. Both applications will have their own User model, but each change in the master's User model will be propagated to the client (or clients). Let me show you what I have in mind:

We're going to create a signal in the User model (in the master) to detect user modifications:

  • If certain fields have been changed (we're going to ignore last_login, password and things like that) we're going to emit an event.
  • I normally work with AWS, so the event will be an SNS event.
  • The idea is to have multiple clients, so each client will be listening to its own SQS queue. Those SQS queues will be subscribed to the SNS topic.
  • To decouple the sending of the SNS message, we're going to send it via a Celery worker.
  • The second application (the client) will have a listener on the SQS queue.
  • Each time the listener gets a message it will persist the user information in the client's User model.
  • It will also emit a message to a Django Channels consumer to be sent to the browser via websockets.

The Master

We're going to emit the event each time the User model changes (and also when we create or delete a user). To detect changes we register a signal in pre_save to mark whether the model has been changed, and later, in post_save, we emit the event via the Celery worker.

@receiver(pre_save, sender=User)
def pre_user_modified(sender, instance, **kwargs):
    instance.is_modified = None

    if instance.is_staff is False and instance.id is not None:
        modified_user_data = UserSerializer(instance).data
        user = User.objects.get(username=modified_user_data['username'])
        user_serializer_data = UserSerializer(user).data

        if user_serializer_data != modified_user_data:
            instance.is_modified = True

@receiver(post_save, sender=User)
def post_user_modified(sender, instance, created, **kwargs):
    if instance.is_staff is False:
        if created or instance.is_modified:
            modified_user_data = UserSerializer(instance).data
            user_changed_event.delay(modified_user_data, action=Actions.INSERT if created else Actions.UPDATE)

@receiver(post_delete, sender=User)
def post_user_deleted(sender, instance, **kwargs):
    deleted_user_data = UserSerializer(instance).data
    user_changed_event.delay(deleted_user_data, action=Actions.DELETE)
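
The UserSerializer used in the signals isn't shown here. A minimal Django REST Framework serializer exposing only the fields we want to propagate could look like this (an assumption; the real project may include more fields):

from django.contrib.auth import get_user_model
from rest_framework import serializers


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = get_user_model()
        fields = ('username', 'first_name', 'last_name', 'email')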

We need to register our signals in apps.py

from django.apps import AppConfig

class MasterConfig(AppConfig):
    name = 'master'

    def ready(self):
        from master.signals import pre_user_modified
        from master.signals import post_user_modified
        from master.signals import post_user_deleted

Our Celery task will send the message to the SNS topic:

@shared_task()
def user_changed_event(body, action):
    sns = boto3.client('sns')
    message = {
        "user": body,
        "action": action
    }
    response = sns.publish(
        TargetArn=settings.SNS_REACTIVE_TABLE_ARN,
        Message=json.dumps({'default': json.dumps(message)}),
        MessageStructure='json'
    )
    logger.info(response)

AWS

In AWS we need to create one SNS topic and one SQS queue subscribed to it.

The Client

First we need a command to run the listener:

class Actions:
    INSERT = 0
    UPDATE = 1
    DELETE = 2

switch_actions = {
    Actions.INSERT: insert_user,
    Actions.UPDATE: update_user,
    Actions.DELETE: delete_user,
}

class Command(BaseCommand):
    help = 'sqs listener'

    def handle(self, *args, **options):
        self.stdout.write(self.style.WARNING("starting listener"))
        sqs = boto3.client('sqs')

        queue_url = settings.SQS_REACTIVE_TABLES

        def process_message(message):
            decoded_body = json.loads(message['Body'])
            data = json.loads(decoded_body['Message'])

            switch_actions.get(data['action'])(
                data=data['user'],
                timestamp=message['Attributes']['SentTimestamp']
            )

            notify_to_user(data['user'])

            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'])

        def loop():
            response = sqs.receive_message(
                QueueUrl=queue_url,
                AttributeNames=[
                    'SentTimestamp'
                ],
                MaxNumberOfMessages=10,
                MessageAttributeNames=[
                    'All'
                ],
                WaitTimeSeconds=20
            )

            if 'Messages' in response:
                messages = [message for message in response['Messages'] if 'Body' in message]
                [process_message(message) for message in messages]

        try:
            while True:
                loop()
        except KeyboardInterrupt:
            sys.exit(0)

Here we persist the model in the client's database:

def insert_user(data, timestamp):
    username = data['username']
    serialized_user = UserSerializer(data=data)
    serialized_user.create(validated_data=data)
    logging.info(f"user: {username} created at {timestamp}")

def update_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=data['username'])
        serialized_user = UserSerializer(user)
        serialized_user.update(user, data)
        logging.info(f"user: {username} updated at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} don't exits. Creating ...")
        insert_user(data, timestamp)

def delete_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=username)
        user.delete()
        logging.info(f"user: {username} deleted at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} don't exits. Don't deleted")

And we also emit a message to the Channels consumer:

def notify_to_user(user):
    username = user['username']
    serialized_user = UserSerializer(user)
    emit_message_to_user(
        message=serialized_user.data,
        username=username, )

Here's the consumer:

class WsConsumer(AsyncWebsocketConsumer):
    @personal_consumer
    async def connect(self):
        await self.channel_layer.group_add(
            self._get_personal_room(),
            self.channel_name
        )

    @private_consumer_event
    async def emit_message(self, event):
        message = event['message']
        await self.send(text_data=json.dumps(message))

    def _get_personal_room(self):
        username = self.scope['user'].username
        return self.get_room_name(username)

    @staticmethod
    def get_room_name(room):
        return f"{'ws_room'}_{room}"

def emit_message_to_user(message, username):
    group = WsConsumer.get_room_name(username)
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {
        'type': WsConsumer.emit_message.__name__,
        'message': message
    })

Our consumer will only allow connections if the user is authenticated. That's why I like Django Channels: this kind of thing is really simple to do (I've done similar things with PHP applications connected to a socket.io server and it was a nightmare). I've created a couple of decorators to ensure authentication in the consumer.

def personal_consumer(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]

        async def accept():
            value = await func(*args, **kwargs)
            await self.accept()
            return value

        if self.scope['user'].is_authenticated:
            username = self.scope['user'].username
            room_name = self.scope['url_route']['kwargs']['username']
            if username == room_name:
                return await accept()

        await self.close()

    return wrapper_decorator

def private_consumer_event(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]
        if self.scope['user'].is_authenticated:
            return await func(*args, **kwargs)

    return wrapper_decorator

That's the websocket route:

from django.urls import re_path

from client import consumers

websocket_urlpatterns = [
    re_path(r'ws/(?P<username>\w+)$', consumers.WsConsumer),
]
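
These websocket_urlpatterns must be wired into the Channels application. With Channels 2 (consumers are passed as classes, as in the route above) the routing module could look like this sketch, assuming the patterns above live in client/routing.py:

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

from client.routing import websocket_urlpatterns

# only authenticated websocket connections reach the consumer
application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(websocket_urlpatterns)
    ),
})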

Finally we only need to connect our HTML page to the websocket

{% block title %}Example{% endblock %}
{% block header_text %}Hello <span id="name">{{ request.user.first_name }}</span>{% endblock %}

{% block extra_body %}
  <script>
    var ws_scheme = window.location.protocol === "https:" ? "wss" : "ws"
    var ws_path = ws_scheme + '://' + window.location.host + "/ws/{{ request.user.username }}"
    var ws = new ReconnectingWebSocket(ws_path)
    var render = function (key, value) {
      document.querySelector(`#${key}`).innerHTML = value
    }
    ws.onmessage = function (e) {
      const data = JSON.parse(e.data);
      render('name', data.first_name)
    }

    ws.onopen = function () {
      console.log('Connected')
    };
  </script>
{% endblock %}

Here's a docker-compose file with the project:

version: '3.4'

services:
  redis:
    image: redis
  master:
    image: reactive_master:latest
    command: python manage.py runserver 0.0.0.0:8001
    build:
      context: ./master
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8001:8001
    environment:
      REDIS_HOST: redis
  celery:
    image: reactive_master:latest
    command: celery -A master worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
      - "master"
    environment:
      REDIS_HOST: redis
      SNS_REACTIVE_TABLE_ARN: ${SNS_REACTIVE_TABLE_ARN}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
  client:
    image: reactive_client:latest
    command: python manage.py runserver 0.0.0.0:8000
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      REDIS_HOST: redis
  listener:
    image: reactive_client:latest
    command: python manage.py listener
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    environment:
      REDIS_HOST: redis
      SQS_REACTIVE_TABLES: ${SQS_REACTIVE_TABLES}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}

And that's all. Here's a working example of the prototype in action:

Source code in my github.