In the world of data analysis and graphs, we have three important tools: Grafana, PostgreSQL, and Python. They work together to help us look at data and track how it changes over time. In this article, we’ll learn step by step how to use Grafana with a PostgreSQL database. We’ll also discover how to use Python to record data that changes over time. By the end of this article, you’ll know how to set up these tools, and you’ll see how they can be useful for your work with data.
First, we create our table. We also create a sequence for the primary key.
CREATE TABLE MEASUREMENTLOG (
    id numeric(10) NOT NULL,
    key character varying(100) NOT NULL,
    datetime TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    status numeric(2) NOT NULL,
    CONSTRAINT MEASUREMENTLOG_pkey PRIMARY KEY (id)
);

CREATE SEQUENCE SEQ_MEASUREMENTLOG
    MINVALUE 0
    MAXVALUE 999999999999999999
    START WITH 1
    INCREMENT BY 1
    CACHE 1;
And here is a simple Python script that persists a time series:
from random import randint
from time import sleep
from datetime import datetime
import os
import logging
import pytz

from dbutils import transactional, get_conn
from settings import DSN

tz = pytz.timezone('Europe/Madrid')

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def persists(key, dt, status):
    with transactional(conn=get_conn(DSN)) as db:
        seq_log = db.fetch_all("SELECT nextval('seq_measurementlog')")[0][0]
        db.insert('measurementlog', dict(
            id=seq_log,
            key=key,
            datetime=dt,
            status=status
        ))


KEY = os.getenv('KEY')
status = 0

while True:
    now = datetime.now(tz)
    persists(
        key=KEY,
        dt=now,
        status=status
    )
    logger.info(f"[{now}] status: {status}")
    status = 1 if status == 0 else 0
    sleep(randint(5, 15))
Now we set up the PostgreSQL database and Grafana with a docker-compose.yml file.
When we’ve one application we need to monitor the logs in one way or another. Not only the server’s logs (500 errors, response times and things like that). Sometimes the user complains about the application. Without logs we cannot do anything. We can save logs within files and let grep and tail do the magic. This’s assumable with a single on-premise server, but nowadays with clouds and docker this’s a nightmare. We need a central log collector to collect all the logs of the application and use this collector to create alerts, and complex searches of our application logs.
I normally work with AWS. In AWS we have CloudWatch, and it's pretty straightforward to connect our application logs to CloudWatch when we're using AWS. When we aren't, we can use the ELK stack. In this example we're going to send our Django application logs to an Elasticsearch database. Let's start.
The idea is not to send the logs directly; instead we save the logs to log files, which Logstash will then pick up. Django's LOGGING setting can be configured to do that.
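A minimal sketch of such a configuration, assuming a logs/ directory under the usual BASE_DIR settings constant and a timed rotating file handler (handler names, paths and levels here are illustrative, not necessarily the original project's values):

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '%(asctime)s [%(levelname)s] %(name)s %(message)s'
        },
    },
    'handlers': {
        # write the application logs to a file that Logstash can tail
        'file': {
            'level': 'INFO',
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': os.path.join(BASE_DIR, 'logs', 'app.log'),
            'when': 'midnight',
            'formatter': 'verbose',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'INFO',
    },
}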
We’re going to use Docker to build our stack, so our logstash and our django containers will share the logs volumes.
Now we need to visualize the logs. Kibana is perfect for this task: we can set up a Kibana server connected to Elasticsearch and browse the logs there.
We can also monitor our server's performance. Prometheus is the de facto standard for doing that, and it's very simple to connect a Django application to Prometheus. We only need to add the django-prometheus dependency, install the application and set up two middlewares:
INSTALLED_APPS = [
    ...
    'django_prometheus',
    ...
]

MIDDLEWARE = [
    'django_prometheus.middleware.PrometheusBeforeMiddleware',  # <-- this one
    'app.middleware.RequestLogMiddleware',
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'django_prometheus.middleware.PrometheusAfterMiddleware',  # <-- this one
]
We also need to set up some application routes:
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('p/', include('django_prometheus.urls')),  # <-- prometheus routes
    path('', include('app.urls')),
]
The easiest way to visualize the data stored in Prometheus is Grafana. In Grafana we need to create a Prometheus datasource and build our custom dashboard. We can also import pre-built dashboards, for example this one: https://grafana.com/grafana/dashboards/9528
A docker-compose file ties the whole project together: the Django application, Logstash, Elasticsearch, Kibana, Prometheus and Grafana.
Today I need to integrate a third party service into Grafana. I cannot access the service's database directly, so I will integrate it via a JSON datasource. Grafana allows us to build custom datasources, but in this case I don't need to create a new one: I can use the Simple JSON datasource.
I'll also use basic authentication, so we'll add a simple HTTP Basic Authentication middleware.
<?php

namespace App\Http\Middleware;

use Closure;
use Illuminate\Http\Request;

class AuthMiddleware
{
    const NAME = 'auth.web';

    public function handle(Request $request, Closure $next)
    {
        if ($request->getUser() != env('HTTP_USER') || $request->getPassword() != env('HTTP_PASS')) {
            $headers = ['WWW-Authenticate' => 'Basic'];

            return response('Unauthorized', 401, $headers);
        }

        return $next($request);
    }
}
HelloHandler is a dummy route that the datasource uses to check the connection. We only need to answer with a 200 OK:
<?php

namespace App\Http\Handlers;

class HelloHandler
{
    public function __invoke()
    {
        return "Ok";
    }
}
SearchHandler returns the list of available metrics that we'll use within our Grafana panels. They aren't strictly necessary: we could return an empty array and later use a metric that isn't defined here (the list only fills the combo box that Grafana shows us).
<?php

namespace App\Http\Handlers;

class SearchHandler
{
    public function __invoke()
    {
        return [25, 50, 100];
    }
}
QueryHandler is the important one: here we return the datapoints that we'll show in Grafana. For testing purposes I've created a handler that reads the metric and the from/to dates that Grafana sends to the backend, and returns random values for some metrics and fixed values for the rest. It's basically to see something in Grafana. Later, in the real-life project, I'll query the database and return real data.
<?php

namespace App\Http\Handlers;

use Illuminate\Http\Request;

class QueryHandler
{
    public function __invoke(Request $request)
    {
        $json = $request->json();
        $range = $json->get('range');
        $target = $json->get('targets')[0]['target'];

        $tz = new \DateTimeZone('Europe/Madrid');
        $from = \DateTimeImmutable::createFromFormat("Y-m-d\TH:i:s.uP", $range['from'], $tz);
        $to = \DateTimeImmutable::createFromFormat("Y-m-d\TH:i:s.uP", $range['to'], $tz);

        return ['target' => $target, 'datapoints' => $this->getDataPoints($from, $to, $target)];
    }

    private function getDataPoints($from, $to, $target)
    {
        $interval = new \DateInterval('PT1H');
        $period = new \DatePeriod($from, $interval, $to->add($interval));

        $dataPoints = [];
        foreach ($period as $date) {
            $value = $target > 50 ? rand(0, 100) : $target;
            $dataPoints[] = [$value, strtotime($date->format('Y-m-d H:i:sP')) * 1000];
        }

        return $dataPoints;
    }
}
I'd also like to use annotations. It's handled in a similar way: an AnnotationHandler takes care of the request. For this test I've created two types of annotations: one every hour and another one every six hours.
These days I've been playing with Nameko, a Python framework for building microservices. Today I want to upgrade a small pet project that I've got at home to monitor the bandwidth of my internet connection. I want to use a Nameko microservice with the Timer entrypoint.
Some time ago, when I was an ADSL user at home, I had a lot of problems with my internet connection. I was a bit lazy about switching to a fiber connection. I finally changed, but while my Internet company was solving an incident I started to hack together a simple and dirty script that monitored my connection speed (just for fun and to practise with InfluxDB and Grafana).
Today I've lost that quick and dirty script (please, Gonzalo, always keep a working backup of the SD card of your Raspberry Pi server! Sometimes it crashes. It's simple: "dd if=/dev/disk3 of=pi3.img" 🙂) and I want to rebuild it. This time I want to use Docker (just for fun). Let's start.
To monitor the bandwidth we only need the speedtest-cli API. We can use this API from the command line and, as it's a Python library, we can create a Python script that uses it.
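Putting both pieces together, here is a minimal sketch of such a service, assuming the speedtest module, the influxdb client and Nameko's Timer entrypoint (the service name, the one-hour interval and the database details are my own placeholders):

import speedtest
from influxdb import InfluxDBClient
from nameko.timer import timer


class BandwidthService:
    name = "bandwidth_service"

    @timer(interval=3600)  # run one speed test every hour
    def measure(self):
        # run the speed test against the best available server
        s = speedtest.Speedtest()
        s.get_best_server()
        s.download()
        s.upload()
        results = s.results.dict()

        # persist the results as a point of the "speedtest" series
        client = InfluxDBClient(host='influxdb', port=8086, database='speedtest')
        client.write_points([{
            "measurement": "speedtest",
            "fields": {
                "download": results['download'],
                "upload": results['upload'],
                "ping": results['ping'],
            }
        }])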
Now we need to create the docker-compose file to orchestrate the infrastructure. The most complicated thing here is, maybe, configuring Grafana within the Docker files instead of opening the browser and creating the datasource and the dashboard by hand. After a couple of hours navigating GitHub repositories I finally created exactly what I needed for this post: basically, a custom entrypoint for my Grafana host that creates the datasource and the dashboard via Grafana's HTTP API.
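For reference, the datasource part of such an entrypoint boils down to a single call to Grafana's HTTP API once the server is up. Here is a sketch using the requests library (the credentials, host names and database are placeholders, not the real project's values):

import requests

GRAFANA = 'http://admin:admin@grafana:3000'  # placeholder credentials and host

# create the InfluxDB datasource through Grafana's HTTP API
requests.post(f"{GRAFANA}/api/datasources", json={
    "name": "speedtest",
    "type": "influxdb",
    "access": "proxy",
    "url": "http://influxdb:8086",
    "database": "speedtest",
    "isDefault": True,
})

# dashboards can be provisioned the same way with POST /api/dashboards/db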
Today I want to play with Grafana. Let me show you my idea:
I've got a Beewi temperature sensor that I've played with before. Today I want to show its temperature within a Grafana dashboard.
I also want to play with the openweathermap API.
First I want to retrieve the temperature from the Beewi device. I've got a node script that connects to the device via Bluetooth using the noble library.
I only need to pass the sensor's mac address and I obtain a JSON with the current temperature:
#!/usr/bin/env node
const noble = require('noble');

var status = false;
var address = process.argv[2];

if (!address) {
    console.log('Usage "./reader.js <sensor mac address>"');
    process.exit();
}

// convert a hex string to a signed integer
function hexToInt(hex) {
    var num, maxVal;
    if (hex.length % 2 !== 0) {
        hex = "0" + hex;
    }
    num = parseInt(hex, 16);
    maxVal = Math.pow(2, hex.length / 2 * 8);
    if (num > maxVal / 2 - 1) {
        num = num - maxVal;
    }

    return num;
}

noble.on('stateChange', function(state) {
    status = (state === 'poweredOn');
});

noble.on('discover', function(peripheral) {
    if (peripheral.address == address) {
        var data = peripheral.advertisement.manufacturerData.toString('hex');
        // the temperature is a little-endian value in tenths of a degree
        var out = {
            temperature: parseFloat(hexToInt(data.substr(10, 2) + data.substr(8, 2)) / 10).toFixed(1)
        };
        console.log(JSON.stringify(out));
        noble.stopScanning();
        process.exit();
    }
});

noble.on('scanStop', function() {
    noble.stopScanning();
});

setTimeout(function() {
    noble.stopScanning();
    noble.startScanning();
}, 2000);

setTimeout(function() {
    noble.stopScanning();
    process.exit();
}, 20000);
And finally another script (this time a Python script) collects data from the openweathermap API and from the node script, and stores the information in an InfluxDB database.
I'm running this Python script on a Raspberry Pi 3 with a Sense HAT. The Sense HAT has an atmospheric pressure sensor, so I will also retrieve the pressure from it.
From openweathermap I will obtain:
Current temperature/humidity and atmospheric pressure in the street
UV Index (the measure of the level of UV radiation)
Weather conditions (if it’s raining or not)
Weather forecast
I run this script from the Raspberry Pi's crontab every 5 minutes. That means I've got a fancy time series ready to be shown with Grafana.
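Here is a trimmed sketch of that collection script. The API key, city id and mac address are placeholders, the node script is the noble reader shown above, and the UV index and forecast come from other openweathermap endpoints, left out here for brevity:

import json
import subprocess

import requests
from influxdb import InfluxDBClient
from sense_hat import SenseHat

OWM_KEY = 'my_api_key'     # placeholder openweathermap API key
CITY_ID = 1234567          # placeholder city id
MAC = 'xx:xx:xx:xx:xx:xx'  # Beewi sensor mac address

# outdoor conditions from the openweathermap API
weather = requests.get(
    'http://api.openweathermap.org/data/2.5/weather',
    params={'id': CITY_ID, 'appid': OWM_KEY, 'units': 'metric'}).json()

# indoor temperature from the node/noble reader shown above
beewi = json.loads(subprocess.check_output(['./reader.js', MAC]))

# atmospheric pressure from the Sense HAT sensor
pressure = SenseHat().get_pressure()

client = InfluxDBClient(host='localhost', port=8086, database='weather')
client.write_points([{
    "measurement": "weather",
    "fields": {
        "indoor_temperature": float(beewi['temperature']),
        "outdoor_temperature": weather['main']['temp'],
        "outdoor_humidity": weather['main']['humidity'],
        "pressure": pressure,
    }
}])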
I must admit this post is just an excuse to play with Grafana and InfluxDB. InfluxDB is a cool database especially designed to work with time series. Grafana is an open source tool for time series analytics. I want to build a simple prototype. The idea is:
One Arduino device (an esp32) emits a MQTT event to a Mosquitto server. I'll use a potentiometer to emulate a sensor (imagine here, for example, a temperature sensor instead of the potentiometer). I've used this circuit before in other projects.
One Python script will be listening for the MQTT event on my Raspberry Pi and will persist the value to an InfluxDB database.
I will monitor the time series given by the potentiometer with Grafana.
I will create one alert in Grafana (for example, when the average value within 10 seconds is above a threshold) and trigger a webhook when the alert changes its state.
One microservice (a Python Flask server) will be listening for the webhook and will emit a MQTT event depending on the state.
Another Arduino device (a NodeMcu in this case) will be listening for this MQTT event and will light a LED: the red one if the alert is ON and the green one if the alert is OFF.
The servers
As I said before we’ll need three servers:
MQTT server (mosquitto)
InfluxDB server
Grafana server
We'll use Docker. I've got a Docker host running on a Raspberry Pi 3. The Raspberry Pi is an ARM device, so we need Docker images built for this architecture.
ESP32
The esp32 part is very simple. We only need to connect our potentiometer to the esp32. The potentiometer has three pins: Gnd, Signal and Vcc; for the signal we'll use pin 32.
We only need to configure our WiFi network, connect to our MQTT server and emit the potentiometer value within each loop.
The esp32 emits an event ("/pot") with the value of the potentiometer, so we're going to create a MQTT listener that listens for that event and persists the value to InfluxDB.
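A minimal sketch of that listener, assuming paho-mqtt and the influxdb client (the broker host and the database name are my own placeholders; the "pot" measurement and "value" field match the Grafana query shown below):

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

db = InfluxDBClient(host='docker', port=8086, database='sensors')


def on_connect(client, userdata, flags, rc):
    # subscribe on every (re)connection
    client.subscribe("/pot")


def on_message(client, userdata, msg):
    # persist each potentiometer reading as a point of the "pot" series
    db.write_points([{
        "measurement": "pot",
        "fields": {"value": float(msg.payload)}
    }])


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("docker", 1883, 60)
client.loop_forever()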
Grafana
In Grafana we need to do two things. First, create a datasource pointing to our InfluxDB server; it's pretty straightforward to do.
Finally we'll create a dashboard. We only have one time series with the value of the potentiometer. I must admit that my dashboard has a lot of things that I've created only for fun.
That's the query that I'm using to plot the main graph:
SELECT last("value")
FROM "pot"
WHERE time >= now() - 5m
GROUP BY time($interval) fill(previous)
Here we can see the dashboard
And here is my alert configuration:
I've also created a notification channel with a webhook. Grafana will use this webhook to notify us when the state of the alert changes.
Webhook listener
Grafana will emit a webhook, so we'll need a REST endpoint to collect the webhook calls. I normally use PHP/Lumen to create REST servers, but in this project I'll use Python and Flask.
We need to handle HTTP Basic Auth and emit a MQTT event. MQTT is a very simple protocol, but it has one very nice feature that fits like a glove here. Let me explain:
Imagine that we've got our system up and running and the state is "ok". Now we connect a device (for example a big red/green light). Since the "ok" event was fired before we connected the light, our green light will not be switched on: we would have to wait until the next "alert" event to see any light at all. That's not cool.
MQTT allows us to "retain" messages. That means we can publish messages with the "retain" flag to a topic, and when we later connect a device to that topic, it will immediately receive the retained message. That's exactly what we need here.
from flask import Flask
from flask import request
from flask_httpauth import HTTPBasicAuth
import paho.mqtt.client as mqtt
import json

client = mqtt.Client()

app = Flask(__name__)
auth = HTTPBasicAuth()

# http basic auth credentials
users = {
    "user": "password"
}


@auth.get_password
def get_pw(username):
    if username in users:
        return users.get(username)
    return None


@app.route('/alert', methods=['POST'])
@auth.login_required
def alert():
    client.connect("docker", 1883, 60)
    data = json.loads(request.data.decode('utf-8'))
    if data['state'] == 'alerting':
        client.publish(topic="/alert", payload="1", retain=True)
    elif data['state'] == 'ok':
        client.publish(topic="/alert", payload="0", retain=True)
    client.disconnect()

    return "ok"


if __name__ == "__main__":
    app.run(host='0.0.0.0')
Nodemcu
Finally, the NodeMcu. This part is similar to the esp32 one. Our LEDs are on pins 4 and 5. We also need to configure the WiFi and connect to the MQTT server. The NodeMcu and the esp32 are similar devices, but not the same: for example, we need different libraries to connect to the WiFi.
This device will be listening for the MQTT event and will turn on one LED or the other depending on the state:
#include <PubSubClient.h>
#include <ESP8266WiFi.h>

const int ledRed = 4;
const int ledGreen = 5;

// Wifi configuration
const char* ssid = "my_wifi_ssid";
const char* password = "my_wifi_password";

// mqtt configuration
const char* server = "192.168.1.111";
const char* topic = "/alert";
const char* clientName = "com.gonzalo123.nodemcu";

int value;
int percent;
String payload;

WiFiClient wifiClient;
PubSubClient client(wifiClient);

void wifiConnect() {
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.print("WiFi connected.");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

void mqttReConnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    if (client.connect(clientName)) {
      Serial.println("connected");
      client.subscribe(topic);
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);

  String data;
  for (int i = 0; i < length; i++) {
    data += (char)payload[i];
  }

  cleanLeds();
  int value = data.toInt();
  switch (value) {
    case 1:
      digitalWrite(ledRed, HIGH);
      break;
    case 0:
      digitalWrite(ledGreen, HIGH);
      break;
  }
  Serial.print("] value:");
  Serial.println((int) value);
}

void cleanLeds() {
  digitalWrite(ledRed, LOW);
  digitalWrite(ledGreen, LOW);
}

void setup() {
  Serial.begin(9600);
  pinMode(ledRed, OUTPUT);
  pinMode(ledGreen, OUTPUT);
  cleanLeds();
  Serial.println("start");
  wifiConnect();
  client.setServer(server, 1883);
  client.setCallback(callback);
  delay(1500);
}

void loop() {
  Serial.print(".");
  if (!client.connected()) {
    mqttReConnect();
  }

  client.loop();
  delay(500);
}