Using an nginx reverse proxy to serve Docker Swarm replicas

Sometimes we need to serve backend servers behind an nginx reverse proxy, for example when we want to serve a Django or a Flask application. In this example I want to show how easy it is to do that with nginx.

We’re going to start with a dummy Flask application.

from flask import Flask
from datetime import datetime

app = Flask(__name__)

@app.get("/")
def home():
    now = datetime.now()
    return f'Hello {now}'

The idea is to use an nginx reverse proxy to serve the application. We can configure nginx to do that like this:

upstream loadbalancer {
    server backend:5000;
}

server {
    server_tokens off;
    client_max_body_size 20M;
    location / {
        proxy_pass http://loadbalancer;
    }
}

And finally we can create our docker-compose.yml file. We only need to set up the replicas and the reverse proxy will do the magic.

version: '3.6'

services:
  nginx:
    image: nginx:production
    ports:
      - "8080:80"
  backend:
    image: flask:production
    deploy:
      replicas: 3
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000
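
We deploy everything as a Swarm stack (Swarm mode must be enabled). The stack name, loadbalancer, matches the service names shown in the output below:

docker stack deploy -c docker-compose.yml loadbalancer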

As we can see we have 3 replicas behind an nginx reverse proxy. Maybe that’s enough for us, but maybe we need to distinguish between the replicas, for example in the logging.

(venv) ➜  docker stack services loadbalancer
ID             NAME                    MODE         REPLICAS   IMAGE              PORTS
u5snhg9tysr0   loadbalancer_backend    replicated   3/3        flask:production
4w0bf8msdiq6   loadbalancer_nginx      replicated   1/1        nginx:production   *:8080->80/tcp 

I’ve changed our Flask application a little bit.

import logging
from datetime import datetime
import socket
import os
from logging.handlers import TimedRotatingFileHandler

from flask import Flask

handlers = [
    logging.StreamHandler()
]
if os.getenv('ENVIRONMENT') == 'production':
    slot = os.getenv('SLOT')
    log_path = f"./logs/log{slot}.log"

    file_handler = TimedRotatingFileHandler(log_path, backupCount=2)
    file_handler.setLevel(logging.INFO)
    handlers.append(file_handler)

logging.basicConfig(
    format=f'%(asctime)s ({socket.gethostname()}) [%(levelname)s] %(message)s',
    level='INFO',
    handlers=handlers,
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

app = Flask(__name__)


@app.get("/")
def home():
    now = datetime.now()
    logger.info(f"home {now}")
    return f'Hello {now} from {socket.gethostname()}. Slot: {os.getenv("SLOT")}'

And of course our docker-compose.yml file.

version: '3.6'

services:
  nginx:
    image: nginx:production
    ports:
      - "8080:80"
  backend:
    image: flask:production
    hostname: "backend.{{.Task.Slot}}"
    environment:
      SLOT: "{{.Task.Slot}}"
      ENVIRONMENT: production
    volumes:
      - log:/src/logs
    deploy:
      replicas: 3
    command: gunicorn -c gunicorn.conf.py -w 1 app:app -b 0.0.0.0:5000
volumes:
  log:
    name: 'log-{{.Task.Slot}}'

Now we’ve changed the hostname of the backend service using the slot number (instead of the default hostname). We also pass a SLOT environment variable to the backend service to distinguish between the replicas, if we need to do that. Maybe you’re asking yourself: why the hell do we need to do that? The answer is simple: working with legacy code is hard and sometimes we need to do very strange things.

Source code of the example in my github

Listen to PostgreSQL events with pg_notify and Python

With PostgreSQL we can easily publish and listen to events from one connection to another. It’s cool because those notifications take part in the transaction. In this example I’m going to create a wrapper to help me listen to the events with Python.

To notify events I only need to use the pg_notify function. For example:

select pg_notify('channel', 'xxx')
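
We can trigger the same notification from Python too; a minimal sketch with psycopg2 (the connection parameters are placeholders):

import psycopg2

conn = psycopg2.connect("dbname='gonzalo123' user='username' host='localhost' password='password'")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

with conn.cursor() as cursor:
    # parameters are passed safely instead of interpolated into the SQL string
    cursor.execute("select pg_notify(%s, %s)", ('channel', 'xxx'))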

To listen to the events:

import psycopg2

from pg_listener import on_db_event

dsn = "dbname='gonzalo123' user='username' host='localhost' password='password'"

conn = psycopg2.connect(dsn)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

for payload in on_db_event(conn, 'channel'):
    print(payload)

The magic resides in on_db_event. We need to pass a psycopg2 connection and the channel name. We can iterate over the function and retrieve the payload whenever someone triggers an event on that channel:

import logging
import select

from psycopg2.extensions import connection

logger = logging.getLogger(__name__)


def on_db_event(conn: connection, channel: str):
    with conn:
        with conn.cursor() as cursor:
            cursor.execute(f"LISTEN {channel};")
            logger.info(f"Waiting for notifications on channel '{channel}'.")

            while True:
                if select.select([conn], [], [], 5) != ([], [], []):
                    conn.poll()
                    while conn.notifies:
                        notify = conn.notifies.pop(0)
                        yield notify.payload

As I often use Django, and Django uses its own connection wrapper, I need to create a native psycopg2 connection. Maybe it’s possible to retrieve it from the Django connection (show me if you know how to do it).

def conn_from_django(django_connection):
    db_settings = django_connection.settings_dict
    dsn = f"dbname='{db_settings['NAME']}' " \
          f"user='{db_settings['USER']}' " \
          f"host='{db_settings['HOST']}' " \
          f"password='{db_settings['PASSWORD']}'"

    conn = psycopg2.connect(dsn)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

    return conn

You can install the library using pip

pip install pglistener-gonzalo123

Source code available in my github

Playing with gRPC and Python

When we work with microservices we normally need, in one way or another, something to communicate between them. Basically we have two choices: synchronous (APIs) and asynchronous communication (message queues). REST APIs are a pretty straightforward way to create a communication channel. We have a lot of frameworks and microframeworks to create REST APIs; for example, in Python, we can use Flask. REST is simple and it fits a lot of cases, but sometimes it’s not enough. A REST API is an HTTP service, and HTTP is a protocol built over TCP. When we create a REST connection we open a TCP connection to the server, we send the request payload, we receive the response, and we close the connection. If we need to perform a lot of requests we may face a bottleneck. There’s also the payload: we need to define how we’re going to encode the information. We normally use JSON (we can also use XML). It’s easy to encode/decode JSON in almost all languages, but JSON is plain text, and big payloads over a TCP connection mean slow response times.

To solve this situation we have another tool in our toolbox. This tool is gRPC. With gRPC we create a persistent connection between client and server (instead of opening and closing a connection like REST does), and we also use a binary payload to reduce the size and improve performance.

First we need to define the protocol we’re going to use. It’s something that we don’t need to do with HTTP APIs (we use JSON and forget about the rest). It’s an extra step. Not complicated, but an extra one. We define the services and message types using a proto file.

// api.proto
syntax = "proto3";
package api;

service Api {
  rpc sayHello (HelloRequest) returns (Hello) {}
  rpc getAll (ApiRequest) returns (api.Items) {}
  rpc getStream (ApiRequest) returns (stream api.Item) {}
}

message ApiRequest {
  int32 length = 1;
}

message Items {
  repeated api.Item items = 1;
}

message Item {
  int32 id = 1;
  string name = 2;
}

message HelloRequest {
  string name = 1;
}

message Hello {
  string message = 1;
}

With our proto file (language agnostic) we can generate the wrapper of our service in our programming language. In my case, Python:

python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/api.proto

Of course we can create clients using one language and servers using another. Both using the same proto file.

It creates two files. We don’t need to open those files; we’ll just import them to create our client and server. We could use those files directly, but I prefer to use an extra wrapper. Not reinventing the wheel, just making the client and the server easier to use.

import grpc

from api_pb2 import Items, Item, Hello, HelloRequest, ApiRequest
from api_pb2_grpc import ApiServicer, ApiStub


class ApiServer(ApiServicer):
    def getAll(self, request, context):
        data = []
        for i in range(1, request.length + 1):
            data.append(Item(id=i, name=f'name {i}'))
        return Items(items=data)

    def getStream(self, request, context):
        for i in range(1, request.length + 1):
            yield Item(id=i, name=f'name {i}')

    def sayHello(self, request, context):
        return Hello(message=f'Hello {request.name}!')


class ApiClient:
    def __init__(self, target):
        channel = grpc.insecure_channel(target)
        self.client = ApiStub(channel)

    def sayHello(self, name):
        response = self.client.sayHello(HelloRequest(name=name))
        return response.message

    def getAll(self, length):
        response = self.client.getAll(ApiRequest(length=length))
        return response.items

    def getStream(self, length):
        response = self.client.getStream(ApiRequest(length=length))
        return response
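
A quick way to smoke-test the wrapper from a Python shell once the server is running (the target below is an assumption; in the docker-compose example the client uses BACKEND_HOST and BACKEND_PORT instead):

from api import ApiClient

client = ApiClient('localhost:5000')  # hypothetical host:port
print(client.sayHello('Gonzalo'))
for item in client.getStream(length=3):
    print(item.id, item.name)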

Now I can create a server.

import logging
from concurrent import futures

import grpc

import settings
from api import ApiServer
from api_pb2_grpc import add_ApiServicer_to_server


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_ApiServicer_to_server(ApiServer(), server)
    server.add_insecure_port(f'[::]:{settings.BACKEND_PORT}')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

And now a client. In my example I’m going to use a Flask frontend that consumes the gRPC server:

from flask import Flask, render_template

import settings
from api import ApiClient

app = Flask(__name__)

app.config["api"] = ApiClient(f"{settings.BACKEND_HOST}:{settings.BACKEND_PORT}")


@app.route("/")
def home():
    api = app.config["api"]
    return render_template(
        "index.html",
        name=api.sayHello("Gonzalo"),
        items=api.getAll(length=10),
        items2=api.getStream(length=5)
    )

We can deploy the example with Docker. Here’s the docker-compose.yml:

version: '3.6'

services:
  frontend:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      BACKEND_HOST: backend
    ports:
      - 5000:5000
    command: gunicorn -w 4 app:app -b 0.0.0.0:5000
  backend:
    build:
      context: .
      dockerfile: Dockerfile
    command: python server.py

Source code available in my github

Deploying Python Application using Docker and Kubernetes

I’ve been learning how to deploy a Python application to Kubernetes. Here you can see my notes:

Let’s start from a dummy Python application. It’s a basic Flask web API. Each time we perform a GET request to “/” we increase a counter and see the number of hits. The persistence layer will be a Redis database. The script is very simple:

from flask import Flask
import os
from redis import Redis

redis = Redis(host=os.getenv('REDIS_HOST', 'localhost'),
              port=os.getenv('REDIS_PORT', 6379))
app = Flask(__name__)

@app.route('/')
def hello():
    redis.incr('hits')
    hits = int(redis.get('hits'))
    return f"Hits: {hits}"


if __name__ == "__main__":
    app.run(host='0.0.0.0')

First of all we create a virtual environment to ensure that we install our dependencies in isolation:

python -m venv venv

We activate the virtualenv:

source venv/bin/activate

And we install our dependencies:

pip install -r requirements.txt
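
For reference, a minimal requirements.txt for this example could be as simple as this (an assumption, not the exact file from the repo; pin versions as needed):

flask
redis
gunicorn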

To be able to run our application we must ensure that we have a Redis database ready. We can run one with Docker:

docker run -p 6379:6379 redis

Now we can start our application:

python app.py

We open our browser at http://localhost:5000 and it works.

Now we’re going to run our application within a Docker container. First of all we need to create a Docker image from a Dockerfile:

FROM python:alpine3.8
ADD . /code
WORKDIR /code
RUN pip install -r requirements.txt

EXPOSE 5000

Now we can build our image:

docker build -t front .

And now we can run our front image:

docker run -p 5000:5000 front python app.py

If we open our browser now at http://localhost:5000 we’ll get a 500 error. That’s because our Docker container is trying to reach Redis on localhost. It worked before, when our application and our Redis were on the same host, but now our API’s localhost isn’t the same as our host’s.

In our script the Redis host is localhost by default, but it can be overridden with an environment variable:

redis = Redis(host=os.getenv('REDIS_HOST', 'localhost'),
              port=os.getenv('REDIS_PORT', 6379))

so we can pass to our Docker container the real host where our Redis resides (supposing my IP address is 192.168.1.100):

docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front python app.py

If we don’t want the development server we can also start our API using gunicorn:

docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front gunicorn -w 1 app:app -b 0.0.0.0:5000

And that works. We can start our app manually using Docker, but it’s a bit complicated: we need to run two containers (API and Redis) and set up the environment variables. Docker helps us with docker-compose. We can create a docker-compose.yaml file configuring our whole application:


version: '2'

services:
  front:
    image: front
    build:
      context: ./front
      dockerfile: Dockerfile
    container_name: front
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000
    depends_on:
      - redis
    ports:
      - "5000:5000"
    restart: always
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
  redis:
    image: redis
    ports:
      - "6379:6379"

I can execute it

docker-compose up

Docker Compose is pretty straightforward. But what happens if our production environment is a cluster? docker-compose works fine on a single host, but on a cluster we’ll face problems (we need to ensure things like high availability manually). Docker tried to answer this question with Docker Swarm. Basically Swarm is docker-compose within a cluster, and it uses almost the same syntax as docker-compose on a single host. Looks good, doesn’t it? OK: almost nobody uses it. Since Google created their Docker container orchestrator (Kubernetes, aka K8s) it has become the de-facto standard. The good thing about K8s is that it’s much better than Swarm (more configurable and more powerful); the bad part is that it isn’t as simple and easy to understand as docker-compose.

Let’s try to execute our project in K8s:

First I start minikube

minikube start

and I point my Docker client at minikube’s Docker daemon, so the images I build are available inside the cluster:

eval $(minikube docker-env)

The API:

First we create one service:

apiVersion: v1
kind: Service
metadata:
  name: front-api
spec:
  # types:
  # - ClusterIP: (default) only accessible from within the Kubernetes cluster
  # - NodePort: accessible on a static port on each Node in the cluster
  # - LoadBalancer: accessible externally through a cloud provider's load balancer
  type: LoadBalancer
  # When the node receives a request on the static port (30164)
  # "select pods with the label 'app' set to 'front-api'"
  # and forward the request to one of them
  selector:
    app: front-api
  ports:
    - protocol: TCP
      port: 5000 # port exposed internally in the cluster
      targetPort: 5000 # the container port to send requests to
      nodePort: 30164 # a static port assigned on each node

And one deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: front-api
spec:
  # How many copies of each pod do we want?
  replicas: 1

  selector:
    matchLabels:
      # This must match the labels we set on the pod!
      app: front-api

  # This template field is a regular pod configuration
  # nested inside the deployment spec
  template:
    metadata:
      # Set labels on the pod.
      # This is used in the deployment selector.
      labels:
        app: front-api
    spec:
      containers:
        - name: front-api
          image: front:v1
          args: ["gunicorn", "-w 1", "app:app", "-b 0.0.0.0:5000"]
          ports:
            - containerPort: 5000
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: api-config
                  key: redis.host

In order to learn a little bit of K8s I’m using a config map called ‘api-config’ where I put some information (such as the Redis host that I’m going to pass as an env variable):

apiVersion: v1
kind: ConfigMap
metadata:
  name: api-config
data:
  redis.host: "back-api"
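
Once applied, we can double-check the config map with kubectl:

kubectl get configmap api-config -o yaml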

The Backend: Our Redis database:

First the service:

apiVersion: v1
kind: Service
metadata:
  name: back-api
spec:
  type: ClusterIP
  ports:
    - port: 6379
      targetPort: 6379
  selector:
    app: back-api

And finally the deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: back-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: back-api
  template:
    metadata:
      labels:
        app: back-api
    spec:
      containers:
        - name: back-api
          image: redis
          ports:
            - containerPort: 6379
              name: redis

Before deploying my application to the cluster I need to build my Docker image and tag it:

docker build -t front .
docker tag front front:v1

Now I can deploy my application to my K8s cluster:

kubectl apply -f .k8s/

If I want to know the external URL of my application in the cluster I can use this command:

minikube service front-api --url

Then I can see it running in the browser or with curl:

curl $(minikube service front-api --url)

And that’s all. I can also delete the whole application:

kubectl delete -f .k8s/ 

Source code available in my github

Playing with threads and Python. Part 2

Today I want to keep on playing with Python and threads (part 1 here). The idea is to create a simple script that prints an asterisk in the console each second. Simple, isn’t it?

from time import sleep
while True:
    print("*")
    sleep(1)

I want to keep this script running, but I also want to send a message externally, for example using RabbitMQ, and do something within the running script. In this demo, for example, stop the script.

In JavaScript we can do this in a single-threaded process using the setInterval function but, since the RabbitMQ listener with pika is a blocking action, we need to use threads in Python (please tell me if I’m wrong). The idea is to create a circuit-breaker condition in the main loop to check whether the main thread needs to stop or not.

First I’ve created my Rabbit listener in a thread:

from queue import Queue, Empty
import threading
import pika
import os


class Listener(threading.Thread):
    def __init__(self, queue=Queue()):
        super(Listener, self).__init__()
        self.queue = queue
        self.daemon = True

    def run(self):
        channel = self._get_channel()
        channel.queue_declare(queue='stop')

        channel.basic_consume(
            queue='stop',
            on_message_callback=lambda ch, method, properties, body: self.queue.put(item=True),
            auto_ack=True)

        channel.start_consuming()

    def stop(self):
        try:
            return True if self.queue.get(timeout=0.05) is True else False
        except Empty:
            return False

    def _get_channel(self):
        credentials = pika.PlainCredentials(
            username=os.getenv('RABBITMQ_USER'),
            password=os.getenv('RABBITMQ_PASS'))

        parameters = pika.ConnectionParameters(
            host=os.getenv('RABBITMQ_HOST'),
            credentials=credentials)

        connection = pika.BlockingConnection(parameters=parameters)

        return connection.channel()

Now in the main process I start the Listener and enter an endless loop that prints my asterisk each second; at the end of each iteration I check whether I need to stop the process or not:

from Listener import Listener
from dotenv import load_dotenv
import logging
from time import sleep
import os

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

l = Listener()
l.start()


def main():
    while True:
        logging.info("*")
        sleep(1)
        if l.stop():
            break


if __name__ == '__main__':
    main()

As we can see in the stop function, we’re using the queue.Queue package to communicate with our listener thread.

And that’s all. In the example I also provide a minimal RabbitMQ server in a docker container.

version: '3.4'

services:
  rabbit:
    image: rabbitmq:3-management
    restart: always
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    ports:
      - "15672:15672"
      - "5672:5672"
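
To actually stop the running script we publish a message to the stop queue. A minimal publisher sketch with pika (using the same RABBITMQ_* environment variables as the listener):

import os

import pika

credentials = pika.PlainCredentials(
    username=os.getenv('RABBITMQ_USER'),
    password=os.getenv('RABBITMQ_PASS'))
parameters = pika.ConnectionParameters(
    host=os.getenv('RABBITMQ_HOST'),
    credentials=credentials)

connection = pika.BlockingConnection(parameters=parameters)
channel = connection.channel()
channel.queue_declare(queue='stop')
# the listener only checks that a message arrived, so any payload works
channel.basic_publish(exchange='', routing_key='stop', body='stop')
connection.close()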

Source code available in my github

Playing with threads and Python

Today I want to play a little bit with Python and threads. This kind of post is a sort of cheat sheet that I like to publish, basically to remember how to do things.

I don’t like to use threads. They are very powerful but, as uncle Ben said: with great power comes great responsibility. I prefer to decouple processes with queues instead of using threads, but sometimes I need to use them. Let’s start.

First I will build a simple script without threads. This script will append three numbers (1, 2 and 3) to a list and it will sum them. The function that appends numbers to the list sleeps a number of seconds equal to the number that we’re appending.

import time

start_time = time.time()

total = []


def sleeper(seconds):
    time.sleep(seconds)
    total.append(seconds)


for s in (1, 2, 3):
    sleeper(s)

end_time = time.time()

total_time = end_time - start_time

print("Total time: {:.3} seconds. Sum: {}".format(total_time, sum(total)))

The outcome of the script is obviously 6 (1 + 2 + 3) and, as we sleep a number of seconds equal to each number that we append, the script takes 6 seconds to finish.

Now we’re going to use a threaded version of the script. We’re going to do the same, but using one thread for each append (with the same sleep function):

import time
from queue import Queue
import threading

start_time = time.time()

queue = Queue()


def sleeper(seconds):
    time.sleep(seconds)
    queue.put(seconds)


threads = []
for s in (1, 2, 3):
    t = threading.Thread(target=sleeper, args=(s,))
    threads.append(t)
    t.start()

for one_thread in threads:
    one_thread.join()

total = 0
while not queue.empty():
    total = total + queue.get()

end_time = time.time()

total_time = end_time - start_time
print("Total time: {:.3} seconds. Sum: {}".format(total_time, total))

The outcome of our script is 6 again, but now it takes 3 seconds (the highest sleep).
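
The same idea can be written more compactly with concurrent.futures from the standard library (not part of the original example, just an alternative sketch):

import time
from concurrent.futures import ThreadPoolExecutor


def sleeper(seconds):
    time.sleep(seconds)
    return seconds


start_time = time.time()
# map runs each call in its own worker thread and preserves the results
with ThreadPoolExecutor(max_workers=3) as executor:
    total = sum(executor.map(sleeper, (1, 2, 3)))

print("Total time: {:.3} seconds. Sum: {}".format(time.time() - start_time, total))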

Source code in my github

Playing with TOTP (2FA) and mobile applications with ionic

Today I want to play with Two Factor Authentication. When we speak about 2FA, TOTP comes to mind. There’re a lot of TOTP clients, for example Google Authenticator.

My idea with this prototype is to build one mobile application (with ionic) and validate one TOTP token on a server (in this case a Python/Flask application). The token will be generated with a standard TOTP client. Let’s start.

The server will be a simple Flask server to handle routes. One route (GET /) will generate one QR code to allow us to configure our TOTP client. I’m using the pyotp library to handle TOTP operations.
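
The TOTP_BASE32_SECRET stored in the .env file is just a base32 string shared between server and client; pyotp can generate one for us (a quick sketch):

import pyotp

# generate a random base32 secret for the .env file
print(pyotp.random_base32())

And this is the server: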

from flask import Flask, jsonify, abort, render_template, request
import os
from dotenv import load_dotenv
from functools import wraps
import pyotp
from flask_qrcode import QRcode

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

totp = pyotp.TOTP(os.getenv('TOTP_BASE32_SECRET'))

app = Flask(__name__)
QRcode(app)


def verify(key):
    return totp.verify(key)


def authorize(f):
    @wraps(f)
    def decorated_function(*args, **kws):
        if not 'Authorization' in request.headers:
            abort(401)

        data = request.headers['Authorization']
        token = str.replace(str(data), 'Bearer ', '')

        if token != os.getenv('BEARER'):
            abort(401)

        return f(*args, **kws)

    return decorated_function


@app.route('/')
def index():
    return render_template('index.html', totp=pyotp.totp.TOTP(os.getenv('TOTP_BASE32_SECRET')).provisioning_uri("gonzalo123.com", issuer_name="TOTP Example"))


@app.route('/check/<key>', methods=['GET'])
@authorize
def alert(key):
    status = verify(key)
    return jsonify({'status': status})


if __name__ == "__main__":
    app.run(host='0.0.0.0')
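
We can exercise the check route with curl (the bearer value and the 6-digit code are placeholders; the bearer must match the BEARER entry of the .env file):

curl -H "Authorization: Bearer my_secret_token" http://localhost:5000/check/123456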

I’ll use a standard TOTP client to generate the tokens, but with pyotp we can easily create a client too:

import pyotp
import time
import os
from dotenv import load_dotenv
import logging

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

totp = pyotp.TOTP(os.getenv('TOTP_BASE32_SECRET'))

mem = None
while True:
    now = totp.now()
    if mem != now:
        logging.info(now)
        mem = now
    # sleep on every iteration, not only when the token changes,
    # so the loop doesn't spin at full speed
    time.sleep(1)

And finally the mobile application. It’s a simple ionic application. That’s the view:

<ion-header>
  <ion-toolbar>
    <ion-title>
      TOTP Validation demo
    </ion-title>
  </ion-toolbar>
</ion-header>

<ion-content>
  <div class="ion-padding">
    <ion-item>
      <ion-label position="stacked">totp</ion-label>
      <ion-input placeholder="Enter value" [(ngModel)]="totp"></ion-input>
    </ion-item>
    <ion-button fill="solid" color="secondary" (click)="validate()" [disabled]="!totp">
      Validate
      <ion-icon slot="end" name="help-circle-outline"></ion-icon>
    </ion-button>
  </div>
</ion-content>

The controller:

import { Component } from '@angular/core'
import { ApiService } from '../sercices/api.service'
import { ToastController } from '@ionic/angular'

@Component({
  selector: 'app-home',
  templateUrl: 'home.page.html',
  styleUrls: ['home.page.scss']
})
export class HomePage {
  public totp

  constructor (private api: ApiService, public toastController: ToastController) {}

  validate () {
    this.api.get('/check/' + this.totp).then(data => this.alert(data.status))
  }

  async alert (status) {
    const toast = await this.toastController.create({
      message: status ? 'OK' : 'Not valid code',
      duration: 2000,
      color: status ? 'primary' : 'danger',
    })
    toast.present()
  }
}

I’ve also put in a simple security system. In a real-life application we’ll need something better, but here I’ve got an Auth Bearer hardcoded and I send it in every HTTP request. To do that I’ve created a simple api service:

import { Injectable } from '@angular/core'
import { isDevMode } from '@angular/core'
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http'
import { CONF } from './conf'

@Injectable({
  providedIn: 'root'
})
export class ApiService {

  private isDev: boolean = isDevMode()
  private apiUrl: string

  constructor (private http: HttpClient) {
    this.apiUrl = this.isDev ? CONF.API_DEV : CONF.API_PROD
  }

  public get (uri: string, params?: Object): Promise<any> {
    return new Promise((resolve, reject) => {
      this.http.get(this.apiUrl + uri, {
        headers: ApiService.getHeaders(),
        params: ApiService.getParams(params)
      }).subscribe(
        res => {this.handleHttpNext(res), resolve(res)},
        err => {this.handleHttpError(err), reject(err)},
        () => this.handleHttpComplete()
      )
    })
  }

  private static getHeaders (): HttpHeaders {

    const headers = {
      'Content-Type': 'application/json'
    }

    headers['Authorization'] = 'Bearer ' + CONF.bearer

    return new HttpHeaders(headers)
  }

  private static getParams (params?: Object): HttpParams {
    let Params = new HttpParams()
    for (const key in params) {
      if (params.hasOwnProperty(key)) {
        Params = Params.set(key, params[key])
      }
    }

    return Params
  }

  private handleHttpError (err) {
    console.log('HTTP Error', err)
  }

  private handleHttpNext (res) {
    console.log('HTTP response', res)
  }

  private handleHttpComplete () {
    console.log('HTTP request completed.')
  }
}

And that’s all. Here’s a video with a working example of the prototype:

Source code here

Playing with lambda, serverless and Python

A couple of weeks ago I attended a serverless course. I’ve played with lambdas from time to time (basically when AWS forced me to use them) but without knowing exactly what I was doing. After this course I know how to work with the serverless framework and I understand the lambda world better. Today I want to hack a little bit and create a simple Python service to obtain random numbers. Let’s start.

We don’t need Flask to create lambdas, but as I’m very comfortable with it we’ll use it here.
Basically I follow the steps that I’ve read here.

from flask import Flask

app = Flask(__name__)


@app.route("/", methods=["GET"])
def hello():
    return "Hello from lambda"


if __name__ == '__main__':
    app.run()

And the serverless yaml to configure the service:

service: random

plugins:
  - serverless-wsgi
  - serverless-python-requirements
  - serverless-pseudo-parameters

custom:
  defaultRegion: eu-central-1
  defaultStage: dev
  wsgi:
    app: app.app
    packRequirements: false
  pythonRequirements:
    dockerizePip: non-linux

provider:
  name: aws
  runtime: python3.7
  region: ${opt:region, self:custom.defaultRegion}
  stage: ${opt:stage, self:custom.defaultStage}

functions:
  home:
    handler: wsgi_handler.handler
    events:
      - http: GET /

We’re going to use serverless plugins. We need to install them:

npx serverless plugin install -n serverless-wsgi
npx serverless plugin install -n serverless-python-requirements
npx serverless plugin install -n serverless-pseudo-parameters
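
And we deploy the service:

npx serverless deploy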

And that’s all. Our “Hello world” lambda service with Python and Flask is up and running.
Now we’re going to create a “more complex” service: we’re going to return a random number with the random.randint function.
randint requires two parameters: start and end. We’re going to pass the end parameter to our service. The start value will be parameterized. I’ll parameterize it only because I want to play with AWS’s Parameter Store (SSM). It’s just an excuse.

Let’s start with the service:

from random import randint
from flask import Flask, jsonify
import boto3
from ssm_parameter_store import SSMParameterStore

import os
from dotenv import load_dotenv

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

app = Flask(__name__)

app.config.update(
    STORE=SSMParameterStore(
        prefix="{}/{}".format(os.environ.get('ssm_prefix'), os.environ.get('stage')),
        ssm_client=boto3.client('ssm', region_name=os.environ.get('region')),
        ttl=int(os.environ.get('ssm_ttl'))
    )
)


@app.route("/", methods=["GET"])
def hello():
    return "Hello from lambda"


@app.route("/random/<int:to_int>", methods=["GET"])
def get_random_quote(to_int):
    from_int = app.config['STORE']['from_int']
    return jsonify(randint(from_int, to_int))


if __name__ == '__main__':
    app.run()
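
The from_int value must exist in the Parameter Store before we call the endpoint. The service reads it from "{ssm_prefix}/{stage}/from_int", so creating it would look something like this (the /random prefix and dev stage are assumptions, matching the IAM resources below):

aws ssm put-parameter --name "/random/dev/from_int" --value "1" --type String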

Now the serverless configuration. I could use a single function, handling all routes and letting Flask do the job:

functions:
  app:
    handler: wsgi_handler.handler
    events:
      - http: ANY /
      - http: 'ANY {proxy+}'

But in this example I want to create two different functions, just for fun (and to use different role statements and different logs in CloudWatch):

service: random

plugins:
  - serverless-wsgi
  - serverless-python-requirements
  - serverless-pseudo-parameters
  - serverless-iam-roles-per-function

custom:
  defaultRegion: eu-central-1
  defaultStage: dev
  wsgi:
    app: app.app
    packRequirements: false
  pythonRequirements:
    dockerizePip: non-linux

provider:
  name: aws
  runtime: python3.7
  region: ${opt:region, self:custom.defaultRegion}
  stage: ${opt:stage, self:custom.defaultStage}
  memorySize: 128
  environment:
    region: ${self:provider.region}
    stage: ${self:provider.stage}

functions:
  app:
    handler: wsgi_handler.handler
    events:
      - http: ANY /
      - http: 'ANY {proxy+}'
    iamRoleStatements:
      - Effect: Allow
        Action: ssm:DescribeParameters
        Resource: arn:aws:ssm:${self:provider.region}:#{AWS::AccountId}:*
      - Effect: Allow
        Action: ssm:GetParameter
        Resource: arn:aws:ssm:${self:provider.region}:#{AWS::AccountId}:parameter/random/*
  home:
    handler: wsgi_handler.handler
    events:
      - http: GET /

And that’s all. “npx serverless deploy” and my random generator is running.

Data Analysis with Python. Pivot tables with Pandas

One of the first posts in my blog was about pivot tables. I’d created a library to pivot tables in my PHP scripts. The library is not very beautiful (it throws a lot of warnings), but it works. These days I’m playing with data analysis in Python and I’m using Pandas. The purpose of this post is something that I like a lot: learn by doing. So I want to do the same operations that I did eight years ago in that post, but now with Pandas. Let’s start.

I’ll start with the same datasource that I used almost ten years ago: one simple recordset with clicks and number of users.

I create a dataframe with this data

import numpy as np
import pandas as pd

data = pd.DataFrame([
    {'host': 1, 'country': 'fr', 'year': 2010, 'month': 1, 'clicks': 123, 'users': 4},
    {'host': 1, 'country': 'fr', 'year': 2010, 'month': 2, 'clicks': 134, 'users': 5},
    {'host': 1, 'country': 'fr', 'year': 2010, 'month': 3, 'clicks': 341, 'users': 2},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 1, 'clicks': 113, 'users': 4},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 2, 'clicks': 234, 'users': 5},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 3, 'clicks': 421, 'users': 2},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 4, 'clicks': 22, 'users': 3},
    {'host': 2, 'country': 'es', 'year': 2010, 'month': 1, 'clicks': 111, 'users': 2},
    {'host': 2, 'country': 'es', 'year': 2010, 'month': 2, 'clicks': 2, 'users': 4},
    {'host': 3, 'country': 'es', 'year': 2010, 'month': 3, 'clicks': 34, 'users': 2},
    {'host': 3, 'country': 'es', 'year': 2010, 'month': 4, 'clicks': 1, 'users': 1}
])

Now we want to do a simple pivot operation. We want to pivot on host

pd.pivot_table(data,
   index=['host'],
   values=['users', 'clicks'],
   columns=['year', 'month'],
   fill_value=''
  )

We can add totals

pd.pivot_table(data,
               index=['host'],
               values=['users', 'clicks'],
               columns=['year', 'month'],
               fill_value='',
               aggfunc=np.sum,
               margins=True,
               margins_name='Total'
              )

We can also pivot on more than one column. For example host and country

pd.pivot_table(data,
               index=['host', 'country'],
               values=['users', 'clicks'],
               columns=['year', 'month'],
               fill_value=''
              )

and also with totals

pd.pivot_table(data,
               index=['host', 'country'],
               values=['users', 'clicks'],
               columns=['year', 'month'],
               aggfunc=np.sum,
               fill_value='',
               margins=True,
               margins_name='Total'
              )

We can also group the dataframe and calculate subtotals:

data.groupby(['host', 'country'])[['clicks', 'users']].sum()

data.groupby(['host', 'country'])[['clicks', 'users']].mean()

And finally we can mix totals and subtotals.

out = data.groupby('host').apply(lambda sub: sub.pivot_table(
    index=['host', 'country'],
    values=['users', 'clicks'],
    columns=['year', 'month'],
    aggfunc=np.sum,
    margins=True,
    margins_name='SubTotal',
))

out.loc[('', 'Max', '')] = out.max()
out.loc[('', 'Min', '')] = out.min()
out.loc[('', 'Total', '')] = out.sum()

out.index = out.index.droplevel(0)

out.fillna('', inplace=True)

And that’s all. A lot to learn yet about data analysis, but Pandas will definitely be a good friend of mine.

You can see the Jupyter notebook in my github account

Monitoring the bandwidth (part 2) now with Python Nameko microservice

These days I’ve been playing with Nameko, a Python framework for building microservices. Today I want to upgrade a small pet project that I’ve got in my house to monitor the bandwidth of my internet connection. I want to use a Nameko microservice with the Timer entrypoint.

That’s the worker:

from nameko.timer import timer
import datetime
import logging
import os
import speedtest
from dotenv import load_dotenv
from influxdb import InfluxDBClient

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))


class SpeedService:
    name = "speed"

    def __init__(self):
        self.influx_client = InfluxDBClient(
            host=os.getenv("INFLUXDB_HOST"),
            port=os.getenv("INFLUXDB_PORT"),
            database=os.getenv("INFLUXDB_DATABASE")
        )

    @timer(interval=3600)
    def speedTest(self):
        logging.info("speedTest")
        current_time = datetime.datetime.utcnow().isoformat()
        speed = self.get_speed()

        self.persists(measurement='download', fields={"value": speed['download']}, time=current_time)
        self.persists(measurement='upload', fields={"value": speed['upload']}, time=current_time)
        self.persists(measurement='ping', fields={"value": speed['ping']}, time=current_time)

    def persists(self, measurement, fields, time):
        logging.info("{} {} {}".format(time, measurement, fields))
        self.influx_client.write_points([{
            "measurement": measurement,
            "time": time,
            "fields": fields
        }])

    @staticmethod
    def get_speed():
        logging.info("Calculating speed ...")
        s = speedtest.Speedtest()
        s.get_best_server()
        s.download()
        s.upload()

        return s.results.dict()
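
The run.sh used in the docker-compose below isn’t shown here; by hand, the service can be started with the nameko CLI (the worker module name and the broker URI are assumptions):

nameko run --broker amqp://user:password@rabbit worker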

I need to adapt my docker-compose file to include the RabbitMQ server (Nameko needs a RabbitMQ message broker):

version: '3'

services:
  speed.worker:
    container_name: speed.worker
    image: speed/worker
    restart: always
    build:
      context: ./src/speed.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  rabbit:
    container_name: speed.rabbit
    image: rabbitmq:3-management
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
  influxdb:
    container_name: speed.influxdb
    image: influxdb:latest
    restart: always
    environment:
    - INFLUXDB_DB=${INFLUXDB_DB}
    - INFLUXDB_ADMIN_USER=${INFLUXDB_ADMIN_USER}
    - INFLUXDB_ADMIN_PASSWORD=${INFLUXDB_ADMIN_PASSWORD}
    - INFLUXDB_HTTP_AUTH_ENABLED=${INFLUXDB_HTTP_AUTH_ENABLED}
    volumes:
    - influxdb-data:/data
  grafana:
    container_name: speed.grafana
    build:
      context: ./src/grafana
      dockerfile: .docker/Dockerfile-grafana
    restart: always
    environment:
    - GF_SECURITY_ADMIN_USER=${GF_SECURITY_ADMIN_USER}
    - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD}
    - GF_USERS_DEFAULT_THEME=${GF_USERS_DEFAULT_THEME}
    - GF_USERS_ALLOW_SIGN_UP=${GF_USERS_ALLOW_SIGN_UP}
    - GF_USERS_ALLOW_ORG_CREATE=${GF_USERS_ALLOW_ORG_CREATE}
    - GF_AUTH_ANONYMOUS_ENABLED=${GF_AUTH_ANONYMOUS_ENABLED}
    ports:
    - "3000:3000"
    depends_on:
    - influxdb
volumes:
  influxdb-data:
    driver: local

And that’s all. Over-engineering to control my Internet connection? Maybe, but that’s the way I learn new stuff 🙂

Source code available in my github.