Monitoring the bandwidth (part 2), now with a Python Nameko microservice

These days I’ve been playing with Nameko, a Python framework for building microservices. Today I want to upgrade a small pet project that I’ve got at home to monitor the bandwidth of my internet connection. I want to use a Nameko microservice with the Timer entrypoint.

That’s the worker:

from nameko.timer import timer
import datetime
import logging
import os
import speedtest
from dotenv import load_dotenv
from influxdb import InfluxDBClient

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))


class SpeedService:
    name = "speed"

    def __init__(self):
        self.influx_client = InfluxDBClient(
            host=os.getenv("INFLUXDB_HOST"),
            port=os.getenv("INFLUXDB_PORT"),
            database=os.getenv("INFLUXDB_DATABASE")
        )

    @timer(interval=3600)  # fire once an hour
    def speed_test(self):
        logging.info("speed_test")
        current_time = datetime.datetime.utcnow().isoformat()
        speed = self.get_speed()

        self.persists(measurement='download', fields={"value": speed['download']}, time=current_time)
        self.persists(measurement='upload', fields={"value": speed['upload']}, time=current_time)
        self.persists(measurement='ping', fields={"value": speed['ping']}, time=current_time)

    def persists(self, measurement, fields, time):
        logging.info("{} {} {}".format(time, measurement, fields))
        self.influx_client.write_points([{
            "measurement": measurement,
            "time": time,
            "fields": fields
        }])

    @staticmethod
    def get_speed():
        logging.info("Calculating speed ...")
        s = speedtest.Speedtest()
        s.get_best_server()
        s.download()
        s.upload()

        # download/upload come back in bits per second, ping in milliseconds
        return s.results.dict()
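
The service reads its InfluxDB settings from a .env file next to the script. The variable names come straight from the code above; the values here are only an assumption matching the docker-compose file below:

# .env (sketch; the hostname matches the influxdb service defined below)
INFLUXDB_HOST=influxdb
INFLUXDB_PORT=8086
INFLUXDB_DATABASE=speed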

I need to adapt my docker-compose file to include the RabbitMQ server (Nameko needs a RabbitMQ message broker):

version: '3'

services:
  speed.worker:
    container_name: speed.worker
    image: speed/worker
    restart: always
    build:
      context: ./src/speed.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  rabbit:
    container_name: speed.rabbit
    image: rabbitmq:3-management
    restart: always
    ports:
    - "15672:15672"
    - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
  influxdb:
    container_name: speed.influxdb
    image: influxdb:latest
    restart: always
    environment:
    - INFLUXDB_DB=${INFLUXDB_DB}
    - INFLUXDB_ADMIN_USER=${INFLUXDB_ADMIN_USER}
    - INFLUXDB_ADMIN_PASSWORD=${INFLUXDB_ADMIN_PASSWORD}
    - INFLUXDB_HTTP_AUTH_ENABLED=${INFLUXDB_HTTP_AUTH_ENABLED}
    volumes:
    # the official influxdb image keeps its data under /var/lib/influxdb
    - influxdb-data:/var/lib/influxdb
  grafana:
    container_name: speed.grafana
    build:
      context: ./src/grafana
      dockerfile: .docker/Dockerfile-grafana
    restart: always
    environment:
    - GF_SECURITY_ADMIN_USER=${GF_SECURITY_ADMIN_USER}
    - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD}
    - GF_USERS_DEFAULT_THEME=${GF_USERS_DEFAULT_THEME}
    - GF_USERS_ALLOW_SIGN_UP=${GF_USERS_ALLOW_SIGN_UP}
    - GF_USERS_ALLOW_ORG_CREATE=${GF_USERS_ALLOW_ORG_CREATE}
    - GF_AUTH_ANONYMOUS_ENABLED=${GF_AUTH_ANONYMOUS_ENABLED}
    ports:
    - "3000:3000"
    depends_on:
    - influxdb
volumes:
  influxdb-data:
    driver: local
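
The worker container boots through run.sh, which isn’t shown in the post. A minimal sketch would simply launch the Nameko runner against a YAML config holding the broker URI (the file names, module name and credentials here are assumptions):

# config.yml (sketch)
AMQP_URI: amqp://user:password@rabbit

# run.sh (sketch)
nameko run service --config config.yml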

And that’s all. Over-engineering just to monitor my Internet connection? Maybe, but that’s the way I learn new stuff 🙂

Source code available on my GitHub.

Playing with microservices, Docker, Python and Nameko

In the last projects I’ve been involved with I’ve been playing, in one way or another, with microservices, queues and things like that. I’m always facing the same tasks: building RPCs, workers, API gateways, … Because of that I’ve been searching for a framework to help me with that kind of stuff, and finally I discovered Nameko. Basically, Nameko is the Python tool that I’ve been looking for. In this post I will create a simple proof of concept to learn how to integrate Nameko within my projects. Let’s start.

The POC is a simple API gateway that gives me the local time in ISO format. I can create a simple Python script to do it:

import datetime
from time import time

print(datetime.datetime.fromtimestamp(time()).isoformat())

We can also create a simple Flask API server to consume this information. The idea is to create an RPC worker to generate this information, and also another worker that returns the local time taken from a PostgreSQL database (yes, I know it’s not very useful, but it’s just an excuse to use a PG database in the microservice).

We’re going to create two RPC workers. One gives the local time:

from nameko.rpc import rpc
from time import time
import datetime


class TimeService:
    name = "local_time_service"

    @rpc
    def local(self):
        return datetime.datetime.fromtimestamp(time()).isoformat()
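
With the broker up we can poke this worker interactively from the nameko shell (real Nameko tooling; the config.yml name is an assumption):

$ nameko shell --config config.yml
>>> n.rpc.local_time_service.local()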

And another one with the date from PostgreSQL:

from nameko.rpc import rpc
from dotenv import load_dotenv
import os
from ext.pg import PgService

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))


class TimeService:
    name = "db_time_service"
    conn = PgService(os.getenv('DSN'))

    @rpc
    def db(self):
        with self.conn:
            with self.conn.cursor() as cur:
                cur.execute("select localtimestamp")
                timestamp = cur.fetchone()
        # return a string so the value serializes cleanly over AMQP and into the Flask response
        return timestamp[0].isoformat()

I’ve created a service called PgService just to learn how to create dependency providers in Nameko:

from nameko.extensions import DependencyProvider
import psycopg2


class PgService(DependencyProvider):

    def __init__(self, dsn):
        self.dsn = dsn

    def get_dependency(self, worker_ctx):
        return psycopg2.connect(self.dsn)
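
One caveat: get_dependency opens a new connection for every worker and nothing ever closes it. A leak-free variant could use the worker_teardown hook; this is only a sketch, and the connections bookkeeping is my own addition:

from nameko.extensions import DependencyProvider
import psycopg2


class PgService(DependencyProvider):

    def __init__(self, dsn):
        self.dsn = dsn
        self.connections = {}

    def get_dependency(self, worker_ctx):
        # one connection per worker, remembered so we can close it later
        connection = psycopg2.connect(self.dsn)
        self.connections[worker_ctx] = connection
        return connection

    def worker_teardown(self, worker_ctx):
        # called by Nameko once the worker finishes, whatever the outcome
        self.connections.pop(worker_ctx).close()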

Now we only need to set up the API gateway. With Nameko we can also create HTTP entrypoints (in the same way we create RPC ones), but I want to use Flask.
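
For reference, a native Nameko HTTP entrypoint would look roughly like this (a minimal sketch; the gateway service and its RpcProxy wiring are my own illustration, not part of the project):

from nameko.rpc import RpcProxy
from nameko.web.handlers import http


class GatewayService:
    name = "gateway"

    local_time = RpcProxy("local_time_service")

    @http('GET', '/local')
    def local(self, request):
        # hand the HTTP request over to the RPC worker
        return self.local_time.local()

And here’s the Flask gateway: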

from flask import Flask
from nameko.standalone.rpc import ServiceRpcProxy
from dotenv import load_dotenv
import os

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

app = Flask(__name__)


def rpc_proxy(service):
    config = {'AMQP_URI': os.getenv('AMQP_URI')}
    return ServiceRpcProxy(service, config)


@app.route('/')
def hello():
    return "Hello"


@app.route('/local')
def local_time():
    with rpc_proxy('local_time_service') as rpc:
        time = rpc.local()

    return time


@app.route('/db')
def db_time():
    with rpc_proxy('db_time_service') as rpc:
        time = rpc.db()

    return time


if __name__ == '__main__':
    app.run()

Since I want to run my POC with Docker, here’s the docker-compose file to set up the project:

version: '3.4'

services:
  api:
    image: nameko/api
    container_name: nameko.api
    hostname: api
    ports:
    - "8080:8080"
    restart: always
    links:
    - rabbit
    - db.worker
    - local.worker
    environment:
    - ENV=1
    - FLASK_APP=app.py
    - FLASK_DEBUG=1
    build:
      context: ./api
      dockerfile: .docker/Dockerfile-api
    #volumes:
    #- ./api:/usr/src/app:ro
    command: flask run --host=0.0.0.0 --port 8080
  db.worker:
    container_name: nameko.db.worker
    image: nameko/db.worker
    restart: always
    build:
      context: ./workers/db.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  local.worker:
    container_name: nameko.local.worker
    image: nameko/local.worker
    restart: always
    build:
      context: ./workers/local.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  rabbit:
    container_name: nameko.rabbit
    image: rabbitmq:3-management
    restart: always
    ports:
    - "15672:15672"
    - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
  pg:
    container_name: nameko.pg
    image: nameko/pg
    restart: always
    build:
      context: ./pg
      dockerfile: .docker/Dockerfile-pg
    #ports:
    #- "5432:5432"
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata
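
Once everything is built we can bring the whole stack up and hit the gateway (the port and routes come from the files above):

$ docker-compose up -d
$ curl http://localhost:8080/local
$ curl http://localhost:8080/db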

And that’s all. Two Nameko RPC services working together behind an API gateway.

Code available on my GitHub.

Microservice container with Guzzle

These days I’m reading about microservices. The idea is great: instead of building a monolithic application using one language/framework, we create isolated services and build our application using those services (speaking HTTP between the services and the application).

That means we’ll have several microservices, we need to consume them, and maybe sometimes we’ll want to swap one service for another. In this post I want to build a small container to handle those microservices, a similar idea to Dependency Injection Containers.

As we’re going to speak HTTP, we need an HTTP client. We could build one using cURL, but in the PHP world we have Guzzle, a great HTTP client library. In fact Guzzle has something similar to the idea of this post, Guzzle Services, but I want something simpler.

Imagine we have different services:
One Silex service (PHP + Silex):

use Silex\Application;

$app = new Application();

$app->get('/hello/{username}', function($username) {
    return "Hello {$username} from silex service";
});

$app->run();

Another PHP service, this one using the Slim framework:

use Slim\Slim;

$app = new Slim();

$app->get('/hello/:username', function ($username) {
    echo "Hello {$username} from slim service";
});

$app->run();

And finally one Python service using the Flask framework:

from flask import Flask
app = Flask(__name__)

@app.route('/hello/<username>')
def show_user_profile(username):
    return "Hello %s from flask service" % username

if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=5000)
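
With the Flask service running we can check it quickly (the port comes from app.run above):

$ curl http://localhost:5000/hello/Gonzalo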

Now, with our simple container, we can use one service or another:

use Symfony\Component\Config\FileLocator;
use MSIC\Loader\YamlFileLoader;
use MSIC\Container;

$container = new Container();

$ymlLoader = new YamlFileLoader($container, new FileLocator(__DIR__));
$ymlLoader->load('container.yml');

echo $container->getService('flaskServer')->get('/hello/Gonzalo')->getBody() . "\n";
echo $container->getService('silexServer')->get('/hello/Gonzalo')->getBody() . "\n";
echo $container->getService('slimServer')->get('/hello/Gonzalo')->getBody() . "\n";

And that’s all. You can see the project in my GitHub account.