I’ve written a post about how to send Django logs to the ELK stack. You can read it here. In that post I used a Logstash client in a sidecar Docker container. The Logstash client works, but it needs too many resources. Nowadays it’s better to use Filebeat as the data shipper instead of the Logstash client. Filebeat is also part of the ELK stack, and it’s a Golang binary, much more lightweight than the Logstash client.
The idea is almost the same as in the other post. Here we’ll also build a sidecar container with our Django application logs mounted.
With Filebeat we can perform actions to prepare our logs to be stored in Elasticsearch. But, at least here, it’s much easier to prepare the logs in the Django application:
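For example, a minimal sketch of a JSON log formatter (an assumption on my part — the original setup may well use a library such as python-json-logger instead) that turns each log line into an Elasticsearch-ready JSON document:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Emit one JSON document per log line, ready for Filebeat to ship."""

    def format(self, record):
        return json.dumps({
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
        })
```

We would plug this formatter into the handlers of Django’s LOGGING setting, so the files the sidecar ships are already structured.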
Normally those validators aren’t enough (at least for me), but it’s very easy to create custom validators. There are also several third-party validators that we can use, for example those ones.
I normally need to prevent users from reusing passwords (for example, the last ten ones). To do that we need to create a custom validator. With this validator we also need a model to store the last passwords (their hashes). The idea is to persist the hash of the password each time the user changes it. Since it’s always good practice not to use the default User model, we’re going to create a CustomUser model.
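A minimal sketch of what those models could look like (only the `old_pass` field and the user foreign key are required by the validator’s queries; the rest is an assumption):

```python
# models.py (sketch; names beyond old_pass/username are assumptions)
from django.contrib.auth.models import AbstractUser
from django.db import models

class CustomUser(AbstractUser):
    pass

class CustomUserPasswordHistory(models.Model):
    username = models.ForeignKey(CustomUser, on_delete=models.CASCADE)
    old_pass = models.CharField(max_length=128)
    date = models.DateTimeField(auto_now_add=True)
```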
[sourcecode language="python"]
from django.contrib.auth.hashers import check_password
from django.core.exceptions import ValidationError
from django.utils.translation import gettext as _

# CustomUserPasswordHistory is the password-history model defined in our app

class DontRepeatValidator:
    def __init__(self, history=10):
        self.history = history

    def validate(self, password, user=None):
        for last_pass in self._get_last_passwords(user):
            if check_password(password=password, encoded=last_pass):
                self._raise_validation_error()

    def get_help_text(self):
        return _("You cannot repeat passwords")

    def _raise_validation_error(self):
        raise ValidationError(
            _("This password has been used before."),
            code='password_has_been_used',
            params={'history': self.history},
        )

    def _get_last_passwords(self, user):
        all_history_user_passwords = CustomUserPasswordHistory.objects.filter(
            username_id=user).order_by('id')
        to_index = all_history_user_passwords.count() - self.history
        to_index = to_index if to_index > 0 else None
        if to_index:
            [u.delete() for u in all_history_user_passwords[0:to_index]]
        return [p.old_pass for p in all_history_user_passwords[to_index:]]
[/sourcecode]
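To activate the validator we register it in the settings (the dotted path `app.validators.DontRepeatValidator` is a hypothetical one; the `OPTIONS` dict is passed to the validator’s `__init__`):

```python
# settings.py (sketch): our validator next to one of Django's built-in ones
AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'app.validators.DontRepeatValidator',  # hypothetical path
        'OPTIONS': {'history': 10},
    },
]
```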
We can see how it works with the unit tests:
[sourcecode language="python"]
from django.core.exceptions import ValidationError
from django.test import TestCase

# User, CustomUserPasswordHistory and DontRepeatValidator come from our app

class UserCreationTestCase(TestCase):
    def setUp(self):
        self.user = User.objects.create(username='gonzalo')

    def test_persist_password_to_history(self):
        self.user.set_password('pass1')
        self.user.save()
        all_history_user_passwords = CustomUserPasswordHistory.objects.filter(username_id=self.user)
        self.assertEqual(1, all_history_user_passwords.count())

class DontRepeatValidatorTestCase(TestCase):
    def setUp(self):
        self.user = User.objects.create(username='gonzalo')
        self.validator = DontRepeatValidator()

    def test_validator_with_new_pass(self):
        self.validator.validate('pass33', self.user)
        self.assertTrue(True)

    def test_validator_with_repeated_pass(self):
        for i in range(0, 11):
            self.user.set_password(f'pass{i}')
            self.user.save()
        with self.assertRaises(ValidationError):
            self.validator.validate('pass3', self.user)

    def test_keep_only_10_passwords(self):
        for i in range(0, 11):
            self.user.set_password(f'pass{i}')
            self.user.save()
        self.validator.validate('xxxx', self.user)
        all_history_user_passwords = CustomUserPasswordHistory.objects.filter(username_id=self.user)
        self.assertEqual(10, all_history_user_passwords.count())
[/sourcecode]
When we have an application we need to monitor the logs in one way or another, and not only the server’s logs (500 errors, response times and things like that). Sometimes the user complains about the application, and without logs we cannot do anything. We can save logs to files and let grep and tail do the magic. That’s workable with a single on-premise server, but nowadays, with clouds and Docker, it’s a nightmare. We need a central log collector for all the application logs, and we can use this collector to create alerts and complex searches over them.
I normally work with AWS. In AWS we have CloudWatch, and it’s pretty straightforward to connect our application logs to CloudWatch when we’re using AWS. When we aren’t, we can use the ELK stack. In this example we’re going to send our Django application logs to an Elasticsearch database. Let’s start:
The idea is not to send the logs directly; the idea is to save the logs to log files. We can use a LOGGING configuration like this to do that:
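A minimal sketch of such a configuration (the handler name, file path and log format are assumptions):

```python
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        # one JSON document per line, easy to parse on the collector side
        'json': {
            'format': '{"time": "%(asctime)s", "level": "%(levelname)s", "message": "%(message)s"}',
        },
    },
    'handlers': {
        'app_file': {
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': '/var/log/app/app.log',  # volume shared with the sidecar
            'when': 'midnight',
            'formatter': 'json',
        },
    },
    'loggers': {
        'app': {'handlers': ['app_file'], 'level': 'DEBUG'},
    },
}
```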
We’re going to use Docker to build our stack, so our Logstash and Django containers will share the log volumes.
Now we need to visualize the logs. Kibana is perfect for this task. We can set up a Kibana server connected to the Elasticsearch and visualize the logs:
We can also monitor our server’s performance. Prometheus is the de facto standard for doing that. In fact, it’s very simple to connect our Django application to Prometheus. We only need to add the django-prometheus dependency, install the application and set up two middlewares:
[sourcecode language="python"]
INSTALLED_APPS = [
    ...
    'django_prometheus',
    ...
]

MIDDLEWARE = [
    'django_prometheus.middleware.PrometheusBeforeMiddleware',  # <-- this one
    'app.middleware.RequestLogMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'django_prometheus.middleware.PrometheusAfterMiddleware',  # <-- this one
]
[/sourcecode]
We also need to set up some application routes:
[sourcecode language="python"]
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('p/', include('django_prometheus.urls')),  # <-- prometheus routes
    path('', include('app.urls'))
]
[/sourcecode]
The easiest way to visualize the data stored in Prometheus is Grafana. In Grafana we need to create a datasource pointing to Prometheus and build our custom dashboard. We can also import pre-built dashboards, for example this one: https://grafana.com/grafana/dashboards/9528
Here’s the docker-compose file with the whole project:
In AWS we have several ways to deploy Django (and non-Django) applications with Docker. We can use ECS or EKS clusters. If we don’t have an ECS or Kubernetes cluster up and running, it can be complex. Today I want to show how to deploy a Django application in production mode within an EC2 host. Let’s start.
I’m getting too old to provision a host by hand; I prefer to use Docker. The idea is to create one EC2 instance (a simple Amazon Linux AMI, an AWS-supported image). This host doesn’t have Docker installed, so we need to install it. When we launch an instance, while configuring it, we can specify user data to configure the instance or run a configuration script during launch.
We only need to put this shell script in the user data to set up Docker:
This script assumes that there’s a deploy.env file with our personal configuration (the AWS profile, the EC2 host, the ECR registry and things like that).
In this example I’m using Docker Swarm to deploy the application. I also want to play with secrets. This dummy application doesn’t have any sensitive information, but I’ve created one "ec2.supersecret" variable.
And that’s all. Maybe ECS or EKS are better solutions to deploy Docker applications in AWS, but we can also deploy easily to a single Docker host in an EC2 instance that can be ready within a couple of minutes.
Today I want to build a prototype. The idea is to create two Django applications. One application will be the master and the other one will be the client. Both applications will have their own User model, but each change in the master’s User model will be propagated to the client (or clients). Let me show you what I’ve got in my mind:
We’re going to create one signal in the User model (at the Master) to detect user modifications:
If certain fields have been changed (we’re going to ignore last_login, password and things like that) we’re going to emit an event.
I normally work with AWS, so the event will be an SNS event.
The idea is to have multiple clients, so each client will be listening to one SQS queue. Those SQS queues will be mapped to the SNS event.
To decouple the sending of the SNS message, we’re going to send it via a Celery worker.
The second application (the Client) will have one listener attached to the SQS queue.
Each time the listener receives a message it will persist the user information in the client’s User model.
It will also emit a message to one Django Channels consumer, to be sent via websockets to the browser.
The Master
We’re going to emit the event each time the User model changes (and also when we create or delete a user). To detect changes we’re going to register a signal on pre_save to mark whether the model has been changed, and later, in post_save, we’re going to emit the event via a Celery worker.
[sourcecode language="python"]
from django.db.models.signals import post_save, pre_save
from django.dispatch import receiver

# UserSerializer, user_changed_event and Actions are defined elsewhere in the app

@receiver(pre_save, sender=User)
def pre_user_modified(sender, instance, **kwargs):
    if instance.is_staff is False and instance.id is not None:
        modified_user_data = UserSerializer(instance).data
        user = User.objects.get(username=modified_user_data['username'])
        user_serializer_data = UserSerializer(user).data
        if user_serializer_data != modified_user_data:
            instance.is_modified = True

@receiver(post_save, sender=User)
def post_user_modified(sender, instance, created, **kwargs):
    if instance.is_staff is False:
        if created or instance.is_modified:
            modified_user_data = UserSerializer(instance).data
            user_changed_event.delay(
                modified_user_data,
                action=Actions.INSERT if created else Actions.UPDATE)
[/sourcecode]
[sourcecode language="python"]
from django.apps import AppConfig

class MasterConfig(AppConfig):
    name = 'master'

    def ready(self):
        from master.signals import pre_user_modified
        from master.signals import post_user_modified
        from master.signals import post_user_deleted
[/sourcecode]
Our Celery task will send the message to the SNS topic.
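A sketch of what that task could look like (the topic ARN is hypothetical, and in the real project the function would be decorated with Celery’s `@shared_task`):

```python
import json

TOPIC_ARN = 'arn:aws:sns:eu-west-1:000000000000:user-events'  # hypothetical ARN

def build_message(user_data, action):
    """Serialize the payload that the clients will read from their SQS queues."""
    return json.dumps({'user': user_data, 'action': action})

def user_changed_event(user_data, action):
    # boto3 imported lazily so the sketch stays importable without AWS deps
    import boto3
    sns = boto3.client('sns')
    sns.publish(TopicArn=TOPIC_ARN, Message=build_message(user_data, action))
```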
[sourcecode language="python"]
import sys

def loop():
    # The elided part of the listener polls the SQS queue (via boto3)
    response = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=20)
    if 'Messages' in response:
        messages = [message for message in response['Messages'] if 'Body' in message]
        [process_message(message) for message in messages]

try:
    while True:
        loop()
except KeyboardInterrupt:
    sys.exit(0)
[/sourcecode]
Here we persist the model in the Client’s database:
[sourcecode language="python"]
import logging

def insert_user(data, timestamp):
    username = data['username']
    serialized_user = UserSerializer(data=data)
    serialized_user.create(validated_data=data)
    logging.info(f"user: {username} created at {timestamp}")
[/sourcecode]
Our consumer will only allow the connection if the user is authenticated. This is why I like Django Channels: this kind of thing is really simple to do (I’ve done similar things with PHP applications connected to a socket.io server and it was a nightmare). I’ve created a couple of decorators to ensure authentication in the consumer.
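One of those decorators could be sketched like this (an assumption about the actual implementation; Channels exposes the authenticated user in `self.scope['user']`):

```python
from functools import wraps

def ensure_authenticated(method):
    """Close the websocket instead of running the handler for anonymous users."""
    @wraps(method)
    async def wrapper(self, *args, **kwargs):
        user = self.scope.get('user')
        if user is None or not getattr(user, 'is_authenticated', False):
            await self.close()
            return
        return await method(self, *args, **kwargs)
    return wrapper
```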
Three years ago I wrote an article about websockets. In fact I’ve written several articles about websockets (websockets and real-time communications are something I’m really passionate about), but today I would like to pick up that article. Nowadays I’m involved with several Django projects, so I want to create a similar working prototype with Django. Let’s start:
In the past I normally worked with libraries such as socket.io to ensure browser compatibility with websockets. Nowadays, at least in my world, we can assume that our users are on a modern browser with websocket support, so we’re going to use plain websockets instead of external libraries. Django has great websocket support via Django Channels. It allows us to handle websockets (and other async protocols) thanks to Python’s ASGI specification. In fact it’s pretty straightforward to build applications with real-time communication and with shared authentication (something that I have done in the past with a lot of effort; I’m getting old and now I like simple things :))
The application that I want to build is the following one: a web application that shows the current time, with seconds. OK, it’s very simple to do with a couple of JavaScript lines, but this time I want a worker that emits an event via websockets with the current time, and the web application will show that real-time update. This kind of architecture always has the same problem: the initial state. When the user opens the browser it must show the current time straight away. We could wait until the next event to fill the initially blank screen, but if the event arrives every 10 seconds our user would stare at a blank screen until then. In our example we’re going to take this situation into account: each time our user connects to the websocket it will ask the server for the initial state.
Our initial-state route will return the current time (using Redis). We can authorize the route using standard Django protected routes:
[sourcecode language="python"]
from django.contrib.auth.decorators import login_required
from django.http import JsonResponse

from ws.redis import redis

# the elided view, sketched (the Redis key name is an assumption)
@login_required
def initial_state(request):
    return JsonResponse({'time': redis.get('time').decode()})
[/sourcecode]
As we can see here, we can also reuse the authentication middleware in Channels consumers.
[sourcecode language="python"]
import json

from channels.generic.websocket import AsyncWebsocketConsumer

class WsConsumer(AsyncWebsocketConsumer):
    GROUP = 'time'
[/sourcecode]
We’re going to need a worker that triggers the current time every second (to avoid problems we’re going to trigger our event every 0.5 seconds). To perform this kind of action Django has a great tool called Celery. We can create workers and scheduled tasks with Celery (exactly what we need in our example). To avoid the "initial state" situation, our worker will persist the initial state into Redis.
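The scheduled task could be sketched like this (names are assumptions): a pure helper builds the payload, and the real Celery task would store it in Redis and broadcast it through the channel layer:

```python
import datetime

def current_time_payload(now=None):
    """Build the event payload with the current time (HH:MM:SS)."""
    now = now or datetime.datetime.now()
    return {'time': now.strftime('%H:%M:%S')}

# In the real project, something like (hypothetical names):
# @app.task
# def tick():
#     payload = current_time_payload()
#     redis.set('time', payload['time'])                 # the "initial state"
#     async_to_sync(channel_layer.group_send)('time', payload)
```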
Basically, that’s the source code (plus the Django stuff).
Application architecture
The architecture of the application is the following one:
Frontend
The Django application. We can run this application in development mode with:
python manage.py runserver
And in production using an ASGI server (uvicorn in this case):
[sourcecode language="xml"]
uvicorn config.asgi:application --port 8000 --host 0.0.0.0 --workers 1
[/sourcecode]
In development mode:
[sourcecode language="xml"]
celery -A ws worker -l debug
[/sourcecode]
And in production:
[sourcecode language="xml"]
celery -A ws worker --uid=nobody --gid=nogroup
[/sourcecode]
We need this scheduler to emit our event (each 0.5 seconds)
[sourcecode language="xml"]
celery -A ws beat
[/sourcecode]
Message Server for Celery
In this case we’re going to use Redis
Docker
With this application we can use the same Dockerfile for the frontend, the worker and the scheduler, using different entrypoints.
If we want to deploy our application in a K8s cluster we need to migrate our docker-compose file into K8s yaml files. I assume that we’ve pushed our Docker images to a container registry (such as ECR).
I’ve been learning how to deploy a Python application to Kubernetes. Here you can see my notes:
Let’s start with a dummy Python application. It’s a basic Flask web API. Each time we perform a GET request to "/" we increase a counter and return the number of hits. The persistence layer will be a Redis database. The script is very simple:
[sourcecode language="python"]
from flask import Flask
import os
from redis import Redis

# the elided part of the script, sketched from the description above
app = Flask(__name__)
redis = Redis(host=os.environ.get('REDIS_HOST', 'localhost'))

@app.route('/')
def hello():
    return f"Hits: {redis.incr('hits')}\n"

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
[/sourcecode]
Now we’re going to run our application within a Docker container. First of all we need to create a Docker image from a Dockerfile:
[sourcecode language="xml"]
FROM python:alpine3.8
ADD . /code
WORKDIR /code
RUN pip install -r requirements.txt
EXPOSE 5000
[/sourcecode]
Now we can build our image:
[sourcecode language="xml"]
docker build -t front .
[/sourcecode]
And now we can run our front image:
[sourcecode language="xml"]
docker run -p 5000:5000 front python app.py
[/sourcecode]
If we now open our browser at http://localhost:5000 we’ll get a 500 error. That’s because our Docker container is trying to use a Redis host on localhost. It worked before, when our application and our Redis were on the same host; now our API’s localhost isn’t the same as our host’s one.
In our script the Redis host is localhost by default, but it can be overridden with an environment variable, so we can pass the real Redis host to our Docker container (supposing my IP address is 192.168.1.100):
[sourcecode language="xml"]
docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front python app.py
[/sourcecode]
If we don’t want the development server, we can also start our API using gunicorn:
[sourcecode language="xml"]
docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front gunicorn -w 1 app:app -b 0.0.0.0:5000
[/sourcecode]
And that works. We can start our app manually using Docker, but it’s a bit complicated: we need to run two containers (API and Redis) and set up the env variables.
Docker helps us here with docker-compose. We can create a docker-compose.yaml file describing our whole application:
[sourcecode language="xml"]
docker-compose up
[/sourcecode]
Docker Compose is pretty straightforward, but what happens if our production environment is a cluster? docker-compose works fine on a single host, but if our production environment is a cluster we’ll face problems (we need to ensure things like high availability manually). Docker’s answer to this question was Docker Swarm. Basically, Swarm is docker-compose within a cluster, and it uses almost the same syntax as docker-compose on a single host. Looks good, doesn’t it? OK: nobody uses it. Since Google released its container orchestrator (Kubernetes, aka K8s) it has become the de facto standard. The good thing about K8s is that it’s much better than Swarm (more configurable and more powerful); the bad part is that it isn’t as simple and easy to understand as docker-compose. For our example we need one Service:
[sourcecode language="xml"]
apiVersion: v1
kind: Service
metadata:
  name: front-api
spec:
  # types:
  # - ClusterIP: (default) only accessible from within the Kubernetes cluster
  # - NodePort: accessible on a static port on each Node in the cluster
  # - LoadBalancer: accessible externally through a cloud provider's load balancer
  type: LoadBalancer
  # When the node receives a request on the static port (30164)
  # "select pods with the label 'app' set to 'front-api'"
  # and forward the request to one of them
  selector:
    app: front-api
  ports:
    - protocol: TCP
      port: 5000       # port exposed internally in the cluster
      targetPort: 5000 # the container port to send requests to
      nodePort: 30164  # a static port assigned on each node
[/sourcecode]
And one deployment:
[sourcecode language="xml"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: front-api
spec:
  # How many copies of each pod do we want?
  replicas: 1
  selector:
    matchLabels:
      # This must match the labels we set on the pod!
      app: front-api
  # This template field is a regular pod configuration
  # nested inside the deployment spec
  template:
    metadata:
      # Set labels on the pod.
      # This is used in the deployment selector.
      labels:
        app: front-api
    spec:
      containers:
        - name: front-api
          image: front:v1
          args: ["gunicorn", "-w 1", "app:app", "-b 0.0.0.0:5000"]
          ports:
            - containerPort: 5000
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: api-config
                  key: redis.host
[/sourcecode]
In order to learn a little bit of K8s I’m using a config map called 'api-config' where I put some information (such as the Redis host that I’m going to pass as an env variable).
We keep on playing with Alexa. This time I want to create a skill that uses a compatible device (for example a Raspberry Pi 3). Here you can see the documentation and examples that the Alexa team provides. Basically, we need to create a new Product/Alexa gadget in the console. It gives us an amazonId and an alexaGadgetSecret.
Then we need to install the SDK on our Raspberry Pi (it installs several Python libraries). Then we can create our gadget script running on the Raspberry Pi, using the amazonId and alexaGadgetSecret. We can listen to several Alexa events: for example, when we say the wake word, with a timer, with an alarm, and we can also create our custom events.
I’m going to create a skill that allows me to say something like "Alexa, use device demo and set color to green" or "Alexa, use device demo and set color to red", and I’ll put this color on the LED matrix that I’ve got (a Raspberry Pi Sense HAT).
This is the python script:
[sourcecode language="python"]
import logging
import sys
import json

from agt import AlexaGadget
from sense_hat import SenseHat

sense = SenseHat()

RED = [255, 0, 0]
GREEN = [0, 255, 0]
BLUE = [0, 0, 255]
YELLOW = [255, 255, 0]
BLACK = [0, 0, 0]

class Gadget(AlexaGadget):
    def on_alexa_gadget_statelistener_stateupdate(self, directive):
        for state in directive.payload.states:
            if state.name == 'wakeword':
                sense.clear()
                if state.value == 'active':
                    sense.show_message("Alexa", text_colour=BLUE)
[/sourcecode]
One important thing here is the GadgetInterceptor. This interceptor finds the endpointId in the request and appends it to the session. This endpoint exists because we’ve created our Product/Alexa gadget and the Python script is running on the device. If the endpoint isn’t present, our skill should say something to the user like "No gadgets found. Please try again after connecting your gadget.".
[sourcecode language="javascript"]
const color = handlerInput.requestEnvelope.request.intent.slots['color'].value
log.info('color', color)
log.info('endpointId', endpointId)

return handlerInput.responseBuilder.
  speak(`Ok. The selected color is ${color}`).
  withSimpleCard(cardTitle, color).
  addDirective(buildLEDDirective(endpointId, color)).
  getResponse()
  }
}
[/sourcecode]
Today I want to keep on playing with Python and threads (part 1 here). The idea is to create a simple script that prints one asterisk in the console each second. Simple, isn’t it?
[sourcecode language="python"]
from time import sleep

while True:
    print("*")
    sleep(1)
[/sourcecode]
I want to keep this script running but I want to send one message externally, for example using RabbitMQ, and do something within the running script. In this demo, for example, stop the script.
In JavaScript we can do this in a single-threaded process using the setInterval function but, since the RabbitMQ listener with pika is a blocking call, we need to use threads in Python (please tell me if I’m wrong). The idea is to add a circuit-breaker condition to the main loop to check whether the main thread needs to stop.
First I’ve created my Rabbit listener in a thread:
[sourcecode language="python"]
from queue import Queue, Empty
import threading
import pika
import os

class Listener(threading.Thread):
    # The elided listener, sketched (queue name is an assumption): it blocks
    # consuming RabbitMQ and forwards each message body to a shared Queue.
    def __init__(self, queue):
        super().__init__(daemon=True)
        self.queue = queue

    def run(self):
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=os.getenv('RABBITMQ_HOST', 'localhost')))
        channel = connection.channel()
        channel.queue_declare(queue='control')
        channel.basic_consume(
            queue='control',
            on_message_callback=lambda ch, method, props, body: self.queue.put(body),
            auto_ack=True)
        channel.start_consuming()
[/sourcecode]
Now, in the main process, I start the listener and enter an endless loop to print my asterisk each second; at the end of each iteration I check whether I need to stop the process or not:
[sourcecode language="python"]
from Listener import Listener
from dotenv import load_dotenv
import logging
from time import sleep
import os
from queue import Queue, Empty

# The elided main loop, sketched (assumes the Listener pushes each incoming
# RabbitMQ message into the shared Queue):
load_dotenv()
queue = Queue()
Listener(queue).start()

while True:
    print("*")
    sleep(1)
    try:
        queue.get_nowait()  # any message stops the loop
        logging.info("stop message received")
        break
    except Empty:
        pass
[/sourcecode]
Today I want to play a little bit with Python and threads. This kind of post is a sort of cheat sheet that I like to publish, basically to remember how to do things.
I don’t like to use threads. They are very powerful but, as Uncle Ben said: with great power comes great responsibility. I prefer to decouple processes with queues instead of using threads, but sometimes I need them. Let’s start.
First I will build a simple script without threads. This script will append three numbers (1, 2 and 3) to a list and then sum them. The function that appends numbers to the list sleeps a number of seconds equal to the number that we’re appending.
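The script described above can be sketched like this (the `delay` factor is my addition so the example can run scaled down; the original version simply sleeps `n` seconds):

```python
import time

def append_number(numbers, n, delay=1.0):
    """Append n to the list after sleeping n * delay seconds."""
    time.sleep(n * delay)
    numbers.append(n)

numbers = []
start = time.time()
for n in (1, 2, 3):
    append_number(numbers, n, delay=0.1)  # scaled down for the demo
print(sum(numbers), f"{time.time() - start:.1f}s")
```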
The outcome of the script is obviously 6 (1 + 2 + 3) and, as we sleep a number of seconds equal to each number we append, the script takes 6 seconds to finish.
Now we’re going to write a threaded version of the script. We’re going to do the same, but using one thread for each append (with the same sleep function):
[sourcecode language="python"]
import time
from queue import Queue
import threading

# The elided threaded version, sketched: one thread per append, with the
# results collected through a thread-safe Queue.
def append_number(queue, n):
    time.sleep(n)
    queue.put(n)

queue = Queue()
threads = [threading.Thread(target=append_number, args=(queue, n)) for n in (1, 2, 3)]
[t.start() for t in threads]
[t.join() for t in threads]
print(sum(queue.queue))  # still 6, but now it takes ~3 seconds (the longest sleep)
[/sourcecode]