Django reactive users with Celery and Channels

Today I want to build a prototype. The idea is to create two Django applications. One application will be the master and the other one will be the client. Both applications will have their own User model, but each change to the master’s User model will be propagated to the client (or clients). Let me show you what I’ve got in mind:

We’re going to create one signal in User model (at Master) to detect user modifications:

  • If certain fields have changed (we’re going to ignore last_login, password and things like that), we’re going to emit an event.
  • I normally work with AWS, so the event will be an SNS event.
  • The idea is to have multiple clients, so each client will listen to its own SQS queue. Those SQS queues will be subscribed to the SNS topic.
  • To decouple the sending of the SNS message from the request, we’re going to send it via a Celery worker.
  • The second application (the Client) will have one listener on the SQS queue.
  • Each time the listener gets a message, it will persist the user information in the client’s User model.
  • It will also emit one message to a Django Channels consumer, to be sent via websockets to the browser.
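
The first bullet (emit only when a relevant field changes) can be sketched without Django at all. This is a minimal illustration, not the project's code: the `IGNORED_FIELDS` set and the helper names are assumptions made for the example, and the snapshots are plain dicts standing in for serialized users.

```python
# Fields whose changes should NOT trigger an event (assumed list for this sketch).
IGNORED_FIELDS = {"last_login", "password"}


def changed_fields(old, new, ignored=IGNORED_FIELDS):
    """Return the sorted names of non-ignored fields that differ between two snapshots."""
    keys = (set(old) | set(new)) - set(ignored)
    return sorted(key for key in keys if old.get(key) != new.get(key))


def should_emit(old, new):
    """An event is worth sending only when at least one relevant field changed."""
    return bool(changed_fields(old, new))


before = {"username": "alice", "first_name": "Alice", "last_login": "2020-01-01"}
after_login = dict(before, last_login="2020-01-02")   # only an ignored field differs
after_rename = dict(before, first_name="Alicia")      # a relevant field differs

print(should_emit(before, after_login))
print(changed_fields(before, after_rename))
```

In the prototype the same effect comes from comparing two serializer outputs: any field the serializer leaves out is ignored automatically.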

The Master

We’re going to emit the event each time the User model changes (and also when we create or delete a user). To detect changes we’re going to register one signal on pre_save to mark whether the model has changed, and later, in post_save, we’re going to emit the event via a Celery worker.

[sourcecode language=”python”]
from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver

# User, UserSerializer, Actions and user_changed_event come from the project's own modules.


@receiver(pre_save, sender=User)
def pre_user_modified(sender, instance, **kwargs):
    instance.is_modified = None

    if instance.is_staff is False and instance.id is not None:
        modified_user_data = UserSerializer(instance).data
        # Load the stored row by primary key, so a changed username is detected too.
        user = User.objects.get(pk=instance.id)
        user_serializer_data = UserSerializer(user).data

        # Only fields exposed by the serializer count as a relevant change.
        if user_serializer_data != modified_user_data:
            instance.is_modified = True


@receiver(post_save, sender=User)
def post_user_modified(sender, instance, created, **kwargs):
    if instance.is_staff is False:
        if created or instance.is_modified:
            modified_user_data = UserSerializer(instance).data
            user_changed_event.delay(modified_user_data, action=Actions.INSERT if created else Actions.UPDATE)


@receiver(post_delete, sender=User)
def post_user_deleted(sender, instance, **kwargs):
    deleted_user_data = UserSerializer(instance).data
    user_changed_event.delay(deleted_user_data, action=Actions.DELETE)
[/sourcecode]

We need to register our signals in apps.py

[sourcecode language=”python”]
from django.apps import AppConfig


class MasterConfig(AppConfig):
    name = 'master'

    def ready(self):
        # Importing the module is enough to connect the @receiver handlers.
        from master import signals  # noqa: F401
[/sourcecode]

Our Celery task will publish the message to the SNS topic:

[sourcecode language=”python”]
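import json
import logging

import boto3
from celery import shared_task
from django.conf import settings

logger = logging.getLogger(__name__)


@shared_task()
def user_changed_event(body, action):
    sns = boto3.client('sns')
    message = {
        "user": body,
        "action": action
    }
    # MessageStructure='json' requires a JSON object with a 'default' key.
    response = sns.publish(
        TargetArn=settings.SNS_REACTIVE_TABLE_ARN,
        Message=json.dumps({'default': json.dumps(message)}),
        MessageStructure='json'
    )
    logger.info(response)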
[/sourcecode]

AWS

In AWS we need to create one SNS topic and one SQS queue subscribed to that topic.

The Client

First we need one command to run the listener.

[sourcecode language=”python”]
import json
import sys

import boto3
from django.conf import settings
from django.core.management.base import BaseCommand

# insert_user, update_user, delete_user and notify_to_user are shown later in this post.


class Actions:
    INSERT = 0
    UPDATE = 1
    DELETE = 2


switch_actions = {
    Actions.INSERT: insert_user,
    Actions.UPDATE: update_user,
    Actions.DELETE: delete_user,
}


class Command(BaseCommand):
    help = 'sqs listener'

    def handle(self, *args, **options):
        self.stdout.write(self.style.WARNING("starting listener"))
        sqs = boto3.client('sqs')

        queue_url = settings.SQS_REACTIVE_TABLES

        def process_message(message):
            # The SQS body is the SNS envelope; our payload is the JSON string in 'Message'.
            decoded_body = json.loads(message['Body'])
            data = json.loads(decoded_body['Message'])

            switch_actions.get(data['action'])(
                data=data['user'],
                timestamp=message['Attributes']['SentTimestamp']
            )

            notify_to_user(data['user'])

            # Delete only after processing; otherwise SQS delivers the message again.
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'])

        def loop():
            # Long polling: wait up to 20 seconds for up to 10 messages.
            response = sqs.receive_message(
                QueueUrl=queue_url,
                AttributeNames=[
                    'SentTimestamp'
                ],
                MaxNumberOfMessages=10,
                MessageAttributeNames=[
                    'All'
                ],
                WaitTimeSeconds=20
            )

            if 'Messages' in response:
                for message in response['Messages']:
                    if 'Body' in message:
                        process_message(message)

        try:
            while True:
                loop()
        except KeyboardInterrupt:
            sys.exit(0)
[/sourcecode]
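
The two `json.loads` calls in `process_message` mirror the two `json.dumps` calls in the Celery task. Here is a small stdlib-only sketch of that round trip. It assumes raw message delivery is disabled on the subscription (the default), and `simulate_sqs_body` is a hand-written stand-in for what SNS does; the real envelope carries more fields (message id, topic ARN, timestamp and so on).

```python
import json


def build_sns_message(user, action):
    """What the task passes as `Message` when MessageStructure='json'."""
    payload = {"user": user, "action": action}
    return json.dumps({"default": json.dumps(payload)})


def simulate_sqs_body(sns_message):
    """Stand-in for SNS: pick the 'default' variant and wrap it in an envelope."""
    delivered = json.loads(sns_message)["default"]
    return json.dumps({"Type": "Notification", "Message": delivered})


def decode_sqs_body(body):
    """What the listener does: unwrap the envelope, then parse the payload."""
    envelope = json.loads(body)
    return json.loads(envelope["Message"])


body = simulate_sqs_body(build_sns_message({"username": "alice"}, 1))
data = decode_sqs_body(body)
print(data["action"], data["user"]["username"])
```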

Here we persist the model in the Client’s database:

[sourcecode language=”python”]
import logging


def insert_user(data, timestamp):
    username = data['username']
    serialized_user = UserSerializer(data=data)
    serialized_user.create(validated_data=data)
    logging.info(f"user: {username} created at {timestamp}")


def update_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=data['username'])
        serialized_user = UserSerializer(user)
        serialized_user.update(user, data)
        logging.info(f"user: {username} updated at {timestamp}")
    except User.DoesNotExist:
        # An update for an unknown user is treated as an insert.
        logging.info(f"user: {username} doesn't exist. Creating ...")
        insert_user(data, timestamp)


def delete_user(data, timestamp):
    username = data['username']
    try:
        user = User.objects.get(username=username)
        user.delete()
        logging.info(f"user: {username} deleted at {timestamp}")
    except User.DoesNotExist:
        logging.info(f"user: {username} doesn't exist. Nothing to delete")
[/sourcecode]
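
Falling back to an insert on update, and ignoring a delete for a missing user, makes the handlers safe to run more than once, which matters because a standard SQS queue can deliver the same message again. The sketch below shows the same idea with a plain dict as the "database". All names here are hypothetical, and unlike the listener above it also guards against an unknown action code instead of failing when the lookup returns `None`.

```python
INSERT, UPDATE, DELETE = 0, 1, 2


def upsert_user(store, data):
    """Create or overwrite the user, so replays leave the same final state."""
    store[data["username"]] = dict(data)


def delete_user(store, data):
    """Remove the user if present; a missing user is not an error."""
    store.pop(data["username"], None)


HANDLERS = {INSERT: upsert_user, UPDATE: upsert_user, DELETE: delete_user}


def apply_event(store, event):
    """Dispatch one event; return False for an action we do not know."""
    handler = HANDLERS.get(event["action"])
    if handler is None:
        return False
    handler(store, event["user"])
    return True


users = {}
update = {"action": UPDATE, "user": {"username": "alice", "first_name": "Alice"}}
apply_event(users, update)
apply_event(users, update)  # a redelivered message changes nothing
apply_event(users, {"action": DELETE, "user": {"username": "bob"}})
handled = apply_event(users, {"action": 99, "user": {"username": "alice"}})
print(users, handled)
```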

We also emit one message to the Channels consumer:

[sourcecode language=”python”]
def notify_to_user(user):
    username = user['username']
    serialized_user = UserSerializer(user)
    emit_message_to_user(
        message=serialized_user.data,
        username=username,
    )
[/sourcecode]

Here is the consumer:

[sourcecode language=”python”]
import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer


class WsConsumer(AsyncWebsocketConsumer):
    @personal_consumer
    async def connect(self):
        await self.channel_layer.group_add(
            self._get_personal_room(),
            self.channel_name
        )

    @private_consumer_event
    async def emit_message(self, event):
        message = event['message']
        await self.send(text_data=json.dumps(message))

    def _get_personal_room(self):
        username = self.scope['user'].username
        return self.get_room_name(username)

    @staticmethod
    def get_room_name(room):
        return f"ws_room_{room}"


def emit_message_to_user(message, username):
    # Called from synchronous code (the SQS listener), hence async_to_sync.
    group = WsConsumer.get_room_name(username)
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {
        'type': WsConsumer.emit_message.__name__,
        'message': message
    })
[/sourcecode]

Our consumer will only accept the connection if the user is authenticated. That’s why I like Django Channels: this kind of thing is really simple to do (I’ve done similar things using PHP applications connected to a socket.io server and it was a nightmare). I’ve created a couple of decorators to ensure authentication in the consumer.

[sourcecode language=”python”]
from functools import wraps


def personal_consumer(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]

        async def accept():
            value = await func(*args, **kwargs)
            await self.accept()
            return value

        # Accept only an authenticated user connecting to their own room.
        if self.scope['user'].is_authenticated:
            username = self.scope['user'].username
            room_name = self.scope['url_route']['kwargs']['username']
            if username == room_name:
                return await accept()

        await self.close()

    return wrapper_decorator


def private_consumer_event(func):
    @wraps(func)
    async def wrapper_decorator(*args, **kwargs):
        self = args[0]
        # Events for an unauthenticated scope are silently dropped.
        if self.scope['user'].is_authenticated:
            return await func(*args, **kwargs)

    return wrapper_decorator
[/sourcecode]
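
To see what `personal_consumer` decides in each case, the decorator can be exercised with plain asyncio and no Channels installed. In this sketch `FakeConsumer` is a hypothetical stand-in for `AsyncWebsocketConsumer` that only records whether the connection was accepted or closed, and the decorator is a condensed version of the one above.

```python
import asyncio
from functools import wraps
from types import SimpleNamespace


def personal_consumer(func):
    """Accept only an authenticated user who connects to their own room."""
    @wraps(func)
    async def wrapper(self, *args, **kwargs):
        user = self.scope["user"]
        room = self.scope["url_route"]["kwargs"]["username"]
        if user.is_authenticated and user.username == room:
            value = await func(self, *args, **kwargs)
            await self.accept()
            return value
        await self.close()
    return wrapper


class FakeConsumer:
    """Hypothetical stand-in for the real consumer; it only records the outcome."""

    def __init__(self, username, authenticated, room):
        self.scope = {
            "user": SimpleNamespace(username=username, is_authenticated=authenticated),
            "url_route": {"kwargs": {"username": room}},
        }
        self.state = "pending"

    async def accept(self):
        self.state = "accepted"

    async def close(self):
        self.state = "closed"

    @personal_consumer
    async def connect(self):
        return "joined"


def try_connect(username, authenticated, room):
    consumer = FakeConsumer(username, authenticated, room)
    asyncio.run(consumer.connect())
    return consumer.state


print(try_connect("alice", True, "alice"))   # own room
print(try_connect("alice", True, "bob"))     # someone else's room
print(try_connect("alice", False, "alice"))  # not logged in
```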

That’s the websocket route

[sourcecode language=”python”]
from django.urls import re_path

from client import consumers

# With Channels 3 or later, pass consumers.WsConsumer.as_asgi() instead of the class.
websocket_urlpatterns = [
    re_path(r'ws/(?P<username>\w+)$', consumers.WsConsumer),
]
[/sourcecode]
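
One detail of this route is worth checking: `\w+` only matches letters, digits and underscores, while Django's default username validator also allows `.`, `@`, `+` and `-`. The sketch below uses the standard `re` module to approximate the route matching (it is not the Channels router itself), and the widened pattern is only a suggestion for projects whose usernames contain those characters.

```python
import re

# The route from the post, and a widened variant (an assumption, not the author's code).
ROUTE = re.compile(r"ws/(?P<username>\w+)$")
WIDE_ROUTE = re.compile(r"ws/(?P<username>[\w.@+-]+)$")


def room_from_path(path, route=ROUTE):
    """Return the username captured from the path, or None when the route does not match."""
    match = route.search(path)
    return match.group("username") if match else None


print(room_from_path("ws/alice"))
print(room_from_path("ws/alice.smith"))
print(room_from_path("ws/alice.smith", WIDE_ROUTE))
```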

Finally we only need to connect our HTML page to the websocket

[sourcecode language=”python”]
{% block title %}Example{% endblock %}
{% block header_text %}Hello <span id="name">{{ request.user.first_name }}</span>{% endblock %}

{% block extra_body %}
  <script>
    var ws_scheme = window.location.protocol === "https:" ? "wss" : "ws"
    var ws_path = ws_scheme + '://' + window.location.host + "/ws/{{ request.user.username }}"
    var ws = new ReconnectingWebSocket(ws_path)
    var render = function (key, value) {
      document.querySelector(`#${key}`).innerHTML = value
    }
    ws.onmessage = function (e) {
      const data = JSON.parse(e.data);
      render('name', data.first_name)
    }

    ws.onopen = function () {
      console.log('Connected')
    };
  </script>
{% endblock %}
[/sourcecode]

Here is a docker-compose file with the project:

[sourcecode language=”xml”]
version: '3.4'

services:
  redis:
    image: redis
  master:
    image: reactive_master:latest
    command: python manage.py runserver 0.0.0.0:8001
    build:
      context: ./master
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8001:8001
    environment:
      REDIS_HOST: redis
  celery:
    image: reactive_master:latest
    command: celery -A master worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
      - "master"
    environment:
      REDIS_HOST: redis
      SNS_REACTIVE_TABLE_ARN: ${SNS_REACTIVE_TABLE_ARN}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
  client:
    image: reactive_client:latest
    command: python manage.py runserver 0.0.0.0:8000
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      REDIS_HOST: redis
  listener:
    image: reactive_client:latest
    command: python manage.py listener
    build:
      context: ./client
      dockerfile: Dockerfile
    depends_on:
      - "redis"
    environment:
      REDIS_HOST: redis
      SQS_REACTIVE_TABLES: ${SQS_REACTIVE_TABLES}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
[/sourcecode]

And that’s all. Here is a working example of the prototype in action:

Source code in my github.

Building real time Python applications with Django Channels, Docker and Kubernetes

Three years ago I wrote an article about Websockets. In fact I’ve written several articles about Websockets (Websockets and real-time communication are something I’m really passionate about), but today I would like to pick up that article. Nowadays I’m involved with several Django projects, so I want to create a similar working prototype with Django. Let’s start:

In the past I normally worked with libraries such as socket.io to ensure browser compatibility with Websockets. Nowadays, at least in my world, we can assume that our users are using a modern browser with Websocket support, so we’re going to use plain Websockets instead of external libraries. Django has great support for Websockets through Django Channels. It allows us to handle Websockets (and other async protocols) thanks to Python’s ASGI specification. In fact it’s pretty straightforward to build applications with real-time communication and shared authentication (something that I have done in the past with a lot of effort. I’m getting old and now I like simple things :))

The application that I want to build is the following one: a web application that shows the current time with seconds. OK, it’s very simple to do with a couple of lines of JavaScript, but this time I want to create a worker that emits an event via Websockets with the current time. My web application will show that real-time update. This kind of architecture always has the same problem: the initial state. When the user opens the browser, it must show the current time. We could ignore this and wait for the next event to fill in the initially blank information, but if events arrive every 10 seconds our user will see a blank screen until the next one arrives. In our example we’re going to take this situation into account: each time our user connects to the Websocket, the browser will ask the server for the initial state.
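
The initial state problem is easy to reproduce in a few lines. In this toy sketch (the `Clock` class and its method names are invented for the illustration) the publisher remembers the last value it sent, so a subscriber that joins between two events can fetch it instead of staying blank.

```python
class Clock:
    """Toy publisher that keeps the latest value so late joiners can fetch it."""

    def __init__(self):
        self.latest = None
        self.subscribers = []

    def publish(self, value):
        self.latest = value  # persisted state, the role Redis plays in the post
        for callback in self.subscribers:
            callback(value)

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def initial_state(self):
        return self.latest


clock = Clock()
clock.publish("10:00:00")             # sent before anybody is listening

screen = []
clock.subscribe(screen.append)        # the browser connects now
without_initial = list(screen)        # nothing to show until the next event

screen.append(clock.initial_state())  # ask for the initial state on connect
clock.publish("10:00:10")
print(without_initial, screen)
```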

Our initial state route will return the current time (stored in Redis). We can protect our route using Django’s standard login_required decorator:

[sourcecode language=”python”]
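from django.contrib.auth.decorators import login_required
from django.http import JsonResponse
from ws.redis import redis


# Assumes the client in ws.redis decodes responses to str; by default
# redis.get returns bytes, which JsonResponse cannot serialize.
@login_required
def initial_state(request):
    return JsonResponse({'current': redis.get('time')})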
[/sourcecode]

We need to configure our Channels routing and define our event:

[sourcecode language=”python”]
from django.urls import re_path

from ws import consumers

websocket_urlpatterns = [
    re_path(r'time/tic/$', consumers.WsConsumer),
]
[/sourcecode]

As we can see here, we can also reuse the authentication middleware in Channels consumers.
[sourcecode language=”python”]
import json

from channels.generic.websocket import AsyncWebsocketConsumer


class WsConsumer(AsyncWebsocketConsumer):
    GROUP = 'time'

    async def connect(self):
        # Reject anonymous users; everybody else joins the shared group.
        if self.scope["user"].is_anonymous:
            await self.close()
        else:
            await self.channel_layer.group_add(
                self.GROUP,
                self.channel_name
            )
            await self.accept()

    async def tic_message(self, event):
        if not self.scope["user"].is_anonymous:
            message = event['message']

            await self.send(text_data=json.dumps({
                'message': message
            }))
[/sourcecode]

We’re going to need a worker that emits the current time every second (to avoid problems we’re going to trigger our event every 0.5 seconds). To perform this kind of action we can use a great tool called Celery. We can create workers and scheduled tasks with Celery (exactly what we need in our example). To solve the “initial state” situation, our worker will persist the current time in Redis.

[sourcecode language=”python”]
import time

import channels.layers
from asgiref.sync import async_to_sync
from celery import Celery

from ws.consumers import WsConsumer
from ws.redis import redis

app = Celery('config')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(0.5, ws_beat.s(), name='beat every 0.5 seconds')


@app.task
def ws_beat(group=WsConsumer.GROUP, event='tic_message'):
    current_time = time.strftime('%X')
    # Persist the value so the initial_state view can return it.
    redis.set('time', current_time)
    message = {'time': current_time}
    channel_layer = channels.layers.get_channel_layer()
    async_to_sync(channel_layer.group_send)(group, {'type': event, 'message': message})
[/sourcecode]
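
The `type` key in the group message is what connects the task to the consumer: Channels looks for a consumer method named after the event type, with dots replaced by underscores. The following stdlib-only sketch imitates that lookup with a hypothetical `FakeConsumer` (it is not the Channels implementation) and shows the frame the browser ends up receiving.

```python
import asyncio
import json


class FakeConsumer:
    """Hypothetical stand-in for a consumer; collects frames instead of sending them."""

    def __init__(self):
        self.sent = []

    async def send(self, text_data):
        self.sent.append(text_data)

    async def tic_message(self, event):
        await self.send(text_data=json.dumps({"message": event["message"]}))

    async def dispatch(self, event):
        # Same naming rule Channels applies: "tic.message" -> tic_message.
        handler = getattr(self, event["type"].replace(".", "_"))
        await handler(event)


consumer = FakeConsumer()
asyncio.run(consumer.dispatch({"type": "tic_message", "message": {"time": "10:00:00"}}))
asyncio.run(consumer.dispatch({"type": "tic.message", "message": {"time": "10:00:01"}}))

frame = json.loads(consumer.sent[0])
print(frame["message"]["time"])
```

This is why the JavaScript client reads `data.message.time`: the task wraps the time in `message`, and the consumer wraps that once more before sending.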

Finally we need a javascript client to consume our Websockets

[sourcecode language=”javascript”]
let getWsUri = () => {
  // The parentheses matter: without them the ternary swallows the rest of the expression.
  return (window.location.protocol === "https:" ? "wss" : "ws") +
    '://' + window.location.host +
    "/time/tic/"
}

let render = value => {
  document.querySelector('#display').innerHTML = value
}

let ws = new ReconnectingWebSocket(getWsUri())

ws.onmessage = e => {
  const data = JSON.parse(e.data);
  render(data.message.time)
}

// Fetch the initial state as soon as the socket opens.
ws.onopen = async () => {
  let response = await axios.get("/api/initial_state")
  render(response.data.current)
}
[/sourcecode]

Basically that’s the source code (plus the usual Django stuff).

Application architecture
The architecture of the application is the following one:

Frontend
The Django application. We can run this application in development with
python manage.py runserver

And in production using an ASGI server (uvicorn in this case):
[sourcecode language=”xml”]
uvicorn config.asgi:application --port 8000 --host 0.0.0.0 --workers 1
[/sourcecode]

Celery worker
In development mode:
[sourcecode language=”xml”]
celery -A ws worker -l debug
[/sourcecode]

And in production
[sourcecode language=”xml”]
celery -A ws worker --uid=nobody --gid=nogroup
[/sourcecode]

Celery scheduler
We need this scheduler to emit our event (every 0.5 seconds)
[sourcecode language=”xml”]
celery -A ws beat
[/sourcecode]

Message Server for Celery
In this case we’re going to use Redis

Docker
With this application we can use the same dockerfile for frontend, worker and scheduler using different entrypoints

Dockerfile:

[sourcecode language=”xml”]
FROM python:3.8

ENV TZ 'Europe/Madrid'
RUN echo $TZ > /etc/timezone && \
    apt-get update && apt-get install -y tzdata && \
    rm /etc/localtime && \
    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
    dpkg-reconfigure -f noninteractive tzdata && \
    apt-get clean

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

ADD . /src
WORKDIR /src

RUN pip install -r requirements.txt

RUN mkdir -p /var/run/celery /var/log/celery
RUN chown -R nobody:nogroup /var/run/celery /var/log/celery
[/sourcecode]

And our whole application within a docker-compose file

[sourcecode language=”xml”]
version: '3.4'

services:
  redis:
    image: redis
  web:
    image: clock:latest
    command: /bin/bash ./docker-entrypoint.sh
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 1m30s
      timeout: 10s
      retries: 3
      start_period: 40s
    depends_on:
      - "redis"
    ports:
      - 8000:8000
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
  celery:
    image: clock:latest
    command: celery -A ws worker --uid=nobody --gid=nogroup
    depends_on:
      - "redis"
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
  cron:
    image: clock:latest
    command: celery -A ws beat
    depends_on:
      - "redis"
    environment:
      ENVIRONMENT: prod
      REDIS_HOST: redis
[/sourcecode]

If we want to deploy our application in a K8s cluster we need to migrate our docker-compose file into k8s YAML files. I assume that we’ve pushed our Docker images to a container registry (such as ECR).

Frontend:
[sourcecode language=”xml”]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-web-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-web-api
      project: clock
  template:
    metadata:
      labels:
        app: clock-web-api
        project: clock
    spec:
      containers:
        - name: web-api
          image: my-ecr-path/clock:latest
          args: ["uvicorn", "config.asgi:application", "--port", "8000", "--host", "0.0.0.0", "--workers", "1"]
          ports:
            - containerPort: 8000
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host
---
apiVersion: v1
kind: Service
metadata:
  name: clock-web-api
spec:
  type: LoadBalancer
  selector:
    app: clock-web-api
    project: clock
  ports:
    - protocol: TCP
      port: 8000 # port exposed internally in the cluster
      targetPort: 8000 # the container port to send requests to
[/sourcecode]

Celery worker
[sourcecode language=”xml”]
# The resource names below are assumed, mirroring the "celery" docker-compose service.
# The args come from that service's command; a worker needs no port or Service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-celery
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-celery
      project: clock
  template:
    metadata:
      labels:
        app: clock-celery
        project: clock
    spec:
      containers:
        - name: clock-celery
          image: my-ecr-path/clock:latest
          args: ["celery", "-A", "ws", "worker", "--uid=nobody", "--gid=nogroup"]
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host
[/sourcecode]

Celery scheduler
[sourcecode language=”xml”]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-cron
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-cron
      project: clock
  template:
    metadata:
      labels:
        app: clock-cron
        project: clock
    spec:
      containers:
        - name: clock-cron
          image: my-ecr-path/clock:latest
          args: ["celery", "-A", "ws", "beat"]
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: clock-app-config
                  key: redis.host
[/sourcecode]

Redis
[sourcecode language=”xml”]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clock-redis
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clock-redis
      project: clock
  template:
    metadata:
      labels:
        app: clock-redis
        project: clock
    spec:
      containers:
        - name: clock-redis
          image: redis
          ports:
            - containerPort: 6379
              name: clock-redis
---
apiVersion: v1
kind: Service
metadata:
  name: clock-redis
spec:
  type: ClusterIP
  ports:
    - port: 6379
      targetPort: 6379
  selector:
    app: clock-redis
[/sourcecode]

Shared configuration
[sourcecode language=”xml”]
apiVersion: v1
kind: ConfigMap
metadata:
  name: clock-app-config
data:
  redis.host: "clock-redis"
[/sourcecode]
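
On the application side, the `REDIS_HOST` variable injected from this ConfigMap (or from docker-compose) only has to be read from the environment. A minimal sketch of how the settings could build the Redis URL follows; the `redis_url` helper, the `localhost` fallback and the port and database defaults are assumptions, since the project's settings are not shown here.

```python
import os


def redis_url(env=os.environ, default_host="localhost", port=6379, db=0):
    """Build a Redis URL from REDIS_HOST, falling back to a local instance."""
    host = env.get("REDIS_HOST", default_host)
    return f"redis://{host}:{port}/{db}"


# Inside the cluster the ConfigMap value points at the clock-redis Service.
print(redis_url({"REDIS_HOST": "clock-redis"}))
# On a developer machine without the variable, the fallback is used.
print(redis_url({}))
```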

We can deploy our application to our k8s cluster

[sourcecode language=”xml”]
kubectl apply -f .k8s/
[/sourcecode]

And see it running inside the cluster locally with a port forward

[sourcecode language=”xml”]
kubectl port-forward deployment/clock-web-api 8000:8000
[/sourcecode]

And that’s all. Our Django application with Websockets using Django Channels is up and running with Docker and also on k8s.

Source code in my github