Flask API skeleton to handle PostgreSQL operations

This is a boilerplate for an API server using Flask. The idea is to have one API server working as a backend to handle all database operations. The server only handles POST requests, and the input parameters travel in the body of the payload as JSON. I know it isn't a pure REST server, but that's what I need.

To organize the API better, we'll set up a group of modules using Flask's blueprints. The entry point of the application is the app.py file:

import logging

from flask import Flask
from flask_compress import Compress

from lib.logger import setup_logging
from lib.utils import CustomJSONEncoder
from modules.example import blueprint as example
from settings import LOG_LEVEL, ELK_APP, ELK_INDEX, ELK_PROCESS, LOG_PATH

logging.basicConfig(level=LOG_LEVEL)

setup_logging(app=ELK_APP,
              index=ELK_INDEX,
              process=ELK_PROCESS,
              log_path=LOG_PATH)

app = Flask(__name__)
app.json_encoder = CustomJSONEncoder
compress = Compress()
compress.init_app(app)

app.register_blueprint(example)

All application configuration lives in the settings.py file, a pattern I borrowed from Django applications. The particularities of each environment are loaded from dotenv files inside settings.py:

import os
from logging import INFO
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent

APP_ID = 'dbapi'
APP_PATH = 'dbapi'
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

PROCESS_ID = os.getenv('PROCESS_ID', APP_ID)
LOG_LEVEL = os.getenv('LOG_LEVEL', INFO)
ELK_APP = f'{APP_ID}.{PROCESS_ID}'
ELK_INDEX = f'{APP_ID}_{ENVIRONMENT}'
ELK_PROCESS = APP_ID
LOG_PATH = f'./logs/{APP_ID}.log'

BEARER = os.getenv('BEARER')

# Database configuration
DEFAULT = 'default'

DATABASES = {
    DEFAULT: f"dbname='{os.getenv('DEFAULT_DB_NAME')}' user='{os.getenv('DEFAULT_DB_USER')}' host='{os.getenv('DEFAULT_DB_HOST')}' password='{os.getenv('DEFAULT_DB_PASS')}' port='{os.getenv('DEFAULT_DB_PORT')}'"
}
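
Each environment has its own dotenv file under env/<ENVIRONMENT>/.env. A minimal example for the local environment might look like this (all values here are placeholders, not from the original project):

# env/local/.env (hypothetical example values)
LOG_LEVEL=INFO
BEARER=my-secret-bearer
DEFAULT_DB_NAME=mydb
DEFAULT_DB_USER=username
DEFAULT_DB_HOST=localhost
DEFAULT_DB_PASS=password
DEFAULT_DB_PORT=5432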

In this example we're using one blueprint called example. I register blueprints manually. The blueprint has a set of routes, defined in the routes.py file:

from .actions import foo, bar

routes = [
    dict(route='', action=lambda: True),
    dict(route='foo', action=foo),
    dict(route='bar', action=bar),
]

Here we map URL paths to actions. For example, the foo action looks like this:

from datetime import datetime

from lib.decorators import use_schema
from .schemas import FooSchema


@use_schema(FooSchema)
def foo(name, email=False):
    now = datetime.now()
    return dict(name=name, email=email, time=now)
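
Notice that the action returns a raw datetime. That works because app.py registers a CustomJSONEncoder; the real one in lib/utils.py isn't shown in the post, but a minimal sketch could be:

# lib/utils.py (hypothetical sketch of the encoder registered in app.py)
from datetime import datetime

from flask.json import JSONEncoder


class CustomJSONEncoder(JSONEncoder):
    def default(self, obj):
        # Serialize datetimes as ISO 8601 strings; defer everything else
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)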

To validate user input we use schemas (with the marshmallow library). In this example our validation schema is:

from marshmallow import fields, Schema


class FooSchema(Schema):
    name = fields.String(required=True)
    email = fields.Email(required=False)
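
The use_schema decorator from lib/decorators.py isn't shown in the post. A minimal sketch of how such a decorator could validate the JSON body and feed the action might look like this (hypothetical; the real implementation may differ):

# lib/decorators.py (hypothetical sketch)
from functools import wraps

from flask import abort, request
from marshmallow import ValidationError


def use_schema(schema_class):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                # Validate the POSTed JSON body against the schema
                payload = schema_class().load(request.get_json(silent=True) or {})
            except ValidationError:
                abort(400)
            # Pass the validated fields to the action as keyword arguments
            return f(*args, **kwargs, **payload)
        return wrapper
    return decorator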

We hide the Flask infrastructure in the module's __init__.py file:

import os

from flask import Blueprint

from lib.auth import authorize_bearer
from lib.utils import call_action, get_response
from settings import BEARER
from .routes import routes

NAME = os.path.basename(os.path.dirname(__file__))
blueprint = Blueprint(NAME, __name__, url_prefix=f'/{NAME}')


@blueprint.post('/')
@blueprint.post('/<path:name>')
# authorize_bearer goes innermost so the route decorators register the
# authorized wrapper instead of the raw, unauthorized function
@authorize_bearer(bearer=BEARER)
def action(name=''):
    return get_response(NAME, name, routes, call_action)
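
With this in place, calling an action is just a POST with a JSON body and the bearer token. For example, with the requests library (host, port and token here are placeholders):

import requests

response = requests.post(
    'http://localhost:5000/example/foo',
    json=dict(name='Gonzalo', email='gonzalo@example.com'),
    headers={'Authorization': 'Bearer my-secret-bearer'})
print(response.json())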

Another route with a database connection is the following one:

from dbutils import transactional

from lib.db import get_db_from_conn, get_conn_from_dbname
from lib.decorators import use_schema, inject_conn
from settings import DEFAULT
from .schemas import FooSchema
from .sql import SQL_USERS


@use_schema(FooSchema)
@inject_conn(DEFAULT, named=True, autocommit=False)
def bar(conn, name, email=False):
    # Create new transaction from connection injected with a decorator
    with transactional(conn) as db:
        db.upsert('users', dict(email=email), dict(name=name))

    # Example of how to obtain new connection from database name.
    conn2 = get_conn_from_dbname(DEFAULT)
    db2 = get_db_from_conn(conn2)

    return db2.fetch_all(SQL_USERS, dict(name=name))

We can obtain our database connection in diverse ways. For example, we can use a function decorator to inject the connection (in this case the connection named DEFAULT) into the function signature. We can also create the connection using a constructor. This connection is a raw psycopg2 connection. I also like to use a helper library to work with psycopg2: dbutils (https://github.com/gonzalo123/dbutils), a library I created a long time ago.

And that's all. I normally deploy it to production using nginx as a reverse proxy and n replicas of my API. Logs are also ready to ship to ELK using Filebeat.
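
The start.sh used as the container command below isn't included in the post; presumably it boots the app with a WSGI server, something along these lines (an assumption, mirroring how the other examples here serve Flask):

#!/bin/bash
# start.sh (hypothetical): serve the Flask app with gunicorn
gunicorn -w 4 app:app -b 0.0.0.0:8000

And here is the docker-compose file for the stack: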

version: '3.6'

x-logging: &logging
  logging:
    options:
      max-size: 10m


services:
  api:
    image: dbapi:production
    <<: *logging
    deploy:
      replicas: 10
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
    command: /bin/bash ./start.sh

  nginx:
    image: nginx-dbapi:${VERSION}
    deploy:
      restart_policy:
        condition: any
    environment:
      ENVIRON: ${VERSION}
    ports:
      - ${EXPOSED_PORT}:8000
    depends_on:
      - api

volumes:
  logs_volume:

Source code in my github account

Playing with gRPC and Python

When we work with microservices we normally need, in one way or another, something to communicate between them. Basically we have two choices: synchronous communication (APIs) and asynchronous communication (message queues). REST APIs are a pretty straightforward way to create a communication channel, and we have a lot of frameworks and microframeworks to create them; in Python, for example, we can use Flask. REST is simple and fits a lot of cases, but sometimes it isn't enough. A REST API is an HTTP service, and HTTP is a protocol built over TCP. When we make a REST request we open a TCP connection to the server, send the request payload, receive the response, and close the connection. If we need to perform a lot of requests we may face a bottleneck. There's also the payload: we need to define how to encode the information, normally JSON (we can also use XML). JSON is easy to encode and decode in almost every language, but it's plain text, and big plain-text payloads over a TCP connection mean slow response times.

To solve this situation we have another tool in our toolbox: gRPC. With gRPC we create a persistent connection between client and server (instead of opening and closing a connection as with REST), and we use a binary payload to reduce the size and improve performance.

First we need to define the protocol we're going to use. This is something we don't need to do with HTTP APIs (we use JSON and forget about the rest). It's an extra step, not complicated, but an extra one. We define the types of our service and messages using a proto file.

// api.proto
syntax = "proto3";
package api;

service Api {
  rpc sayHello (HelloRequest) returns (Hello) {}
  rpc getAll (ApiRequest) returns (api.Items) {}
  rpc getStream (ApiRequest) returns (stream api.Item) {}
}

message ApiRequest {
  int32 length = 1;
}

message Items {
  repeated api.Item items = 1;
}

message Item {
  int32 id = 1;
  string name = 2;
}

message HelloRequest {
  string name = 1;
}

message Hello {
  string message = 1;
}

With our proto file (which is language agnostic) we can generate the wrapper of our service in our programming language, in my case Python:

python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/api.proto

Of course we can create clients in one language and servers in another, both using the same proto file.

The command creates two files. We don't need to open them; we'll just import them to create our client and server. We could use them directly, but I prefer to add an extra wrapper, not reinventing the wheel, just making the client and server easier to use.

import grpc

from api_pb2 import Items, Item, Hello, HelloRequest, ApiRequest
from api_pb2_grpc import ApiServicer, ApiStub


class ApiServer(ApiServicer):
    def getAll(self, request, context):
        data = []
        for i in range(1, request.length + 1):
            data.append(Item(id=i, name=f'name {i}'))
        return Items(items=data)

    def getStream(self, request, context):
        for i in range(1, request.length + 1):
            yield Item(id=i, name=f'name {i}')

    def sayHello(self, request, context):
        return Hello(message=f'Hello {request.name}!')


class ApiClient:
    def __init__(self, target):
        channel = grpc.insecure_channel(target)
        self.client = ApiStub(channel)

    def sayHello(self, name):
        response = self.client.sayHello(HelloRequest(name=name))
        return response.message

    def getAll(self, length):
        response = self.client.getAll(ApiRequest(length=length))
        return response.items

    def getStream(self, length):
        response = self.client.getStream(ApiRequest(length=length))
        return response
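
Once a server is listening (we'll create it in the next step), consuming the service through this wrapper is straightforward. A quick usage sketch (assuming the same settings module used below):

import settings
from api import ApiClient

client = ApiClient(f'{settings.BACKEND_HOST}:{settings.BACKEND_PORT}')
print(client.sayHello('Gonzalo'))  # -> Hello Gonzalo!
for item in client.getStream(length=3):
    # Server streaming: items arrive one by one over the persistent channel
    print(item.id, item.name)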

Now I can create a server.

import logging
from concurrent import futures

import grpc

import settings
from api import ApiServer
from api_pb2_grpc import add_ApiServicer_to_server


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_ApiServicer_to_server(ApiServer(), server)
    server.add_insecure_port(f'[::]:{settings.BACKEND_PORT}')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

And also a client. In my example I'm going to use a Flask frontend that consumes the gRPC server:

from flask import Flask, render_template

import settings
from api import ApiClient

app = Flask(__name__)

app.config["api"] = ApiClient(f"{settings.BACKEND_HOST}:{settings.BACKEND_PORT}")


@app.route("/")
def home():
    api = app.config["api"]
    return render_template(
        "index.html",
        name=api.sayHello("Gonzalo"),
        items=api.getAll(length=10),
        items2=api.getStream(length=5)
    )

We can deploy the example with Docker. Here is the docker-compose.yml:

version: '3.6'

services:
  frontend:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      BACKEND_HOST: backend
    ports:
      - 5000:5000
    command: gunicorn -w 4 app:app -b 0.0.0.0:5000
  backend:
    build:
      context: .
      dockerfile: Dockerfile
    command: python server.py

Source code available in my github

Deploying a Python Application using Docker and Kubernetes

I've been learning how to deploy a Python application to Kubernetes. Here you can see my notes.

Let's start with a dummy Python application. It's a basic Flask web API: each time we perform a GET request to "/" we increase a counter and return the number of hits. The persistence layer is a Redis database. The script is very simple:

from flask import Flask
import os
from redis import Redis

redis = Redis(host=os.getenv('REDIS_HOST', 'localhost'),
              port=os.getenv('REDIS_PORT', 6379))
app = Flask(__name__)

@app.route('/')
def hello():
    redis.incr('hits')
    hits = int(redis.get('hits'))
    return f"Hits: {hits}"


if __name__ == "__main__":
    app.run(host='0.0.0.0')

First of all we create a virtual environment to ensure that our dependencies are installed in isolation:

python -m venv venv

We activate the virtualenv:

source venv/bin/activate

And we install our dependencies:

pip install -r requirements.txt
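
The requirements.txt isn't shown in the post; given the imports and the gunicorn command used later, it would contain something like:

flask
redis
gunicorn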

To be able to run our application we must ensure that we have a Redis database ready. We can run one with Docker:

docker run -p 6379:6379 redis

Now we can start our application:

python app.py

We open our browser with the url: http://localhost:5000 and it works.

Now we're going to run our application within a Docker container. First of all we need to create a Docker image from a Dockerfile:

FROM python:alpine3.8
ADD . /code
WORKDIR /code
RUN pip install -r requirements.txt

EXPOSE 5000

Now we can build our image:

docker build -t front .

And now we can run our front image:

docker run -p 5000:5000 front python app.py

If we open our browser now at http://localhost:5000 we'll get a 500 error. That's because our Docker container is trying to reach a Redis host on its own localhost. It worked before, when our application and our Redis were on the same host; now our API's localhost isn't the same as our host's.

In our script the Redis host is localhost by default, but it can be overridden with an environment variable,

redis = Redis(host=os.getenv('REDIS_HOST', 'localhost'),
              port=os.getenv('REDIS_PORT', 6379))

so we can pass our Docker container the real host where our Redis resides (supposing my IP address is 192.168.1.100):

docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front python app.py

If we don't want the development server, we can also start our API using gunicorn:

docker run -p 5000:5000 --env REDIS_HOST=192.168.1.100 front gunicorn -w 1 app:app -b 0.0.0.0:5000

And that works. We can start our app manually using Docker, but it's a bit complicated: we need to run two containers (API and Redis) and set up the environment variables. Docker helps us here with docker-compose. We can create a docker-compose.yaml file configuring our whole application:


version: '2'

services:
  front:
    image: front
    build:
      context: ./front
      dockerfile: Dockerfile
    container_name: front
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000
    depends_on:
      - redis
    ports:
      - "5000:5000"
    restart: always
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
  redis:
    image: redis
    ports:
      - "6379:6379"

We can execute it:

docker-compose up

Docker Compose is pretty straightforward, but what happens if our production environment is a cluster? docker-compose works fine on a single host, but in a cluster we'll face problems (we need to ensure things like high availability manually). Docker's answer to this question was Docker Swarm: basically, docker-compose within a cluster, using almost the same syntax as docker-compose on a single host. Looks good, doesn't it? OK, nobody uses it. Since Google released its container orchestrator (Kubernetes, aka K8s), it has become the de facto standard. The good thing about K8s is that it's much better than Swarm (more configurable and more powerful); the bad part is that it isn't as simple and easy to understand as docker-compose.

Let's try to run our project in K8s.

First I start minikube

minikube start

and I point my shell's Docker client at minikube's Docker daemon, so the images I build locally are available inside the cluster:

eval $(minikube docker-env)

The API:

First we create one service:

apiVersion: v1
kind: Service
metadata:
  name: front-api
spec:
  # types:
  # - ClusterIP: (default) only accessible from within the Kubernetes cluster
  # - NodePort: accessible on a static port on each Node in the cluster
  # - LoadBalancer: accessible externally through a cloud provider's load balancer
  type: LoadBalancer
  # When the node receives a request on the static port (30164),
  # "select pods with the label 'app' set to 'front-api'"
  # and forward the request to one of them
  selector:
    app: front-api
  ports:
    - protocol: TCP
      port: 5000 # port exposed internally in the cluster
      targetPort: 5000 # the container port to send requests to
      nodePort: 30164 # a static port assigned on each node

And one deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: front-api
spec:
  # How many copies of each pod do we want?
  replicas: 1

  selector:
    matchLabels:
      # This must match the labels we set on the pod!
      app: front-api

  # This template field is a regular pod configuration
  # nested inside the deployment spec
  template:
    metadata:
      # Set labels on the pod.
      # This is used in the deployment selector.
      labels:
        app: front-api
    spec:
      containers:
        - name: front-api
          image: front:v1
          args: ["gunicorn", "-w 1", "app:app", "-b 0.0.0.0:5000"]
          ports:
            - containerPort: 5000
          env:
            - name: REDIS_HOST
              valueFrom:
                configMapKeyRef:
                  name: api-config
                  key: redis.host

In order to learn a little bit of K8s I'm using a config map called 'api-config' where I put some information (such as the Redis host that I pass as an environment variable):

apiVersion: v1
kind: ConfigMap
metadata:
  name: api-config
data:
  redis.host: "back-api"
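
Once applied, we can verify the config map contents from the command line:

kubectl get configmap api-config -o yaml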

The Backend: Our Redis database:

First the service:

apiVersion: v1
kind: Service
metadata:
  name: back-api
spec:
  type: ClusterIP
  ports:
    - port: 6379
      targetPort: 6379
  selector:
    app: back-api

And finally the deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: back-api
spec:
  replicas: 1
  selector:
    matchLabels:
      app: back-api
  template:
    metadata:
      labels:
        app: back-api
    spec:
      containers:
        - name: back-api
          image: redis
          ports:
            - containerPort: 6379
              name: redis

Before deploying my application to the cluster I need to build my Docker image and tag it:

docker build -t front .
docker tag front front:v1

Now I can deploy my application to my K8s cluster:

kubectl apply -f .k8s/

If I want to know the external URL of my application in the cluster, I can use this command:

minikube service front-api --url

Then I can see it running using the browser or with curl

curl $(minikube service front-api --url)

And that's all. I can also delete the whole application:

kubectl delete -f .k8s/ 

Source code available in my github

Playing with TOTP (2FA) and mobile applications with Ionic

Today I want to play with Two Factor Authentication. When we speak about 2FA, TOTP comes to mind. There are a lot of TOTP clients, for example Google Authenticator.

My idea with this prototype is to build a mobile application (with Ionic) and validate a TOTP token on a server (in this case a Python/Flask application). The token will be generated with a standard TOTP client. Let's start.

The server will be a simple Flask server handling routes. One route (GET /) will generate a QR code that allows us to configure our TOTP client. I'm using the pyotp library to handle TOTP operations.
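
The TOTP_BASE32_SECRET loaded from the .env file is the base32 secret shared between the server and the TOTP client. With pyotp we can generate one with a one-off snippet (store the output in your .env):

import pyotp

# Generate a random base32 secret and print it;
# copy the value into .env as TOTP_BASE32_SECRET
print(pyotp.random_base32())

And here is the server: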

from flask import Flask, jsonify, abort, render_template, request
import os
from dotenv import load_dotenv
from functools import wraps
import pyotp
from flask_qrcode import QRcode

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

totp = pyotp.TOTP(os.getenv('TOTP_BASE32_SECRET'))

app = Flask(__name__)
QRcode(app)


def verify(key):
    return totp.verify(key)


def authorize(f):
    @wraps(f)
    def decorated_function(*args, **kws):
        if not 'Authorization' in request.headers:
            abort(401)

        data = request.headers['Authorization']
        token = str.replace(str(data), 'Bearer ', '')

        if token != os.getenv('BEARER'):
            abort(401)

        return f(*args, **kws)

    return decorated_function


@app.route('/')
def index():
    return render_template('index.html', totp=pyotp.totp.TOTP(os.getenv('TOTP_BASE32_SECRET')).provisioning_uri("gonzalo123.com", issuer_name="TOTP Example"))


@app.route('/check/<key>', methods=['GET'])
@authorize
def alert(key):
    status = verify(key)
    return jsonify({'status': status})


if __name__ == "__main__":
    app.run(host='0.0.0.0')
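
We can exercise the check endpoint with a simple request (a sketch; host, port and bearer are placeholders matching the .env values):

import requests

# Hypothetical check of the 6-digit code '123456' against the local server
response = requests.get(
    'http://localhost:5000/check/123456',
    headers={'Authorization': 'Bearer my-secret-bearer'})
print(response.json())  # {'status': True} if the code is currently valid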

I'll use a standard TOTP client to generate the tokens, but with pyotp we can easily create a client too:

import pyotp
import time
import os
from dotenv import load_dotenv
import logging

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

totp = pyotp.TOTP(os.getenv('TOTP_BASE32_SECRET'))

mem = None
while True:
    now = totp.now()
    if mem != now:
        logging.info(now)
        mem = now
    # Sleep on every loop iteration (not only when the token changes)
    # to avoid busy-waiting between token rotations
    time.sleep(1)

And finally the mobile application. It's a simple Ionic application. This is the view:

<ion-header>
  <ion-toolbar>
    <ion-title>
      TOTP Validation demo
    </ion-title>
  </ion-toolbar>
</ion-header>

<ion-content>
  <div class="ion-padding">
    <ion-item>
      <ion-label position="stacked">totp</ion-label>
      <ion-input placeholder="Enter value" [(ngModel)]="totp"></ion-input>
    </ion-item>
    <ion-button fill="solid" color="secondary" (click)="validate()" [disabled]="!totp">
      Validate
      <ion-icon slot="end" name="help-circle-outline"></ion-icon>
    </ion-button>
  </div>
</ion-content>

The controller:

import { Component } from '@angular/core'
import { ApiService } from '../sercices/api.service'
import { ToastController } from '@ionic/angular'

@Component({
  selector: 'app-home',
  templateUrl: 'home.page.html',
  styleUrls: ['home.page.scss']
})
export class HomePage {
  public totp

  constructor (private api: ApiService, public toastController: ToastController) {}

  validate () {
    this.api.get('/check/' + this.totp).then(data => this.alert(data.status))
  }

  async alert (status) {
    const toast = await this.toastController.create({
      message: status ? 'OK' : 'Not valid code',
      duration: 2000,
      color: status ? 'primary' : 'danger',
    })
    toast.present()
  }
}

I've also put in a simple security system. In a real-life application we'd need something better, but here I've got an Auth Bearer hardcoded and I send it in every HTTP request. To do it I've created a simple API service:

import { Injectable } from '@angular/core'
import { isDevMode } from '@angular/core'
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http'
import { CONF } from './conf'

@Injectable({
  providedIn: 'root'
})
export class ApiService {

  private isDev: boolean = isDevMode()
  private apiUrl: string

  constructor (private http: HttpClient) {
    this.apiUrl = this.isDev ? CONF.API_DEV : CONF.API_PROD
  }

  public get (uri: string, params?: Object): Promise<any> {
    return new Promise((resolve, reject) => {
      this.http.get(this.apiUrl + uri, {
        headers: ApiService.getHeaders(),
        params: ApiService.getParams(params)
      }).subscribe(
        res => {this.handleHttpNext(res), resolve(res)},
        err => {this.handleHttpError(err), reject(err)},
        () => this.handleHttpComplete()
      )
    })
  }

  private static getHeaders (): HttpHeaders {

    const headers = {
      'Content-Type': 'application/json'
    }

    headers['Authorization'] = 'Bearer ' + CONF.bearer

    return new HttpHeaders(headers)
  }

  private static getParams (params?: Object): HttpParams {
    let Params = new HttpParams()
    for (const key in params) {
      if (params.hasOwnProperty(key)) {
        Params = Params.set(key, params[key])
      }
    }

    return Params
  }

  private handleHttpError (err) {
    console.log('HTTP Error', err)
  }

  private handleHttpNext (res) {
    console.log('HTTP response', res)
  }

  private handleHttpComplete () {
    console.log('HTTP request completed.')
  }
}

And that's all. Here's a video with a working example of the prototype:

Source code here

Microservice container with Guzzle

These days I'm reading about microservices. The idea is great: instead of building a monolithic script using one language/framework, we create isolated services and build our application using those services (speaking HTTP between the services and the application).

That means we'll have several microservices, and we need to use them and maybe sometimes swap one service for another. In this post I want to build a small container to handle those microservices, a similar idea to dependency injection containers.

As we're going to speak HTTP, we need an HTTP client. We could build one using curl, but in the PHP world we have Guzzle, a great HTTP client library. In fact Guzzle has something similar to the idea of this post (Guzzle services), but I want something simpler.

Imagine we have different services.
One Silex service (PHP + Silex):

use Silex\Application;

$app = new Application();

$app->get('/hello/{username}', function($username) {
    return "Hello {$username} from silex service";
});

$app->run();

Another PHP service, this one using the Slim framework:

use Slim\Slim;

$app = new Slim();

$app->get('/hello/:username', function ($username) {
    echo "Hello {$username} from slim service";
});

$app->run();

And finally one Python service using the Flask framework:

from flask import Flask, jsonify
app = Flask(__name__)

@app.route('/hello/<username>')
def show_user_profile(username):
    return "Hello %s from flask service" % username

if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=5000)

Now, with our simple container, we can use one service or another:

use Symfony\Component\Config\FileLocator;
use MSIC\Loader\YamlFileLoader;
use MSIC\Container;

$container = new Container();

$ymlLoader = new YamlFileLoader($container, new FileLocator(__DIR__));
$ymlLoader->load('container.yml');

echo $container->getService('flaskServer')->get('/hello/Gonzalo')->getBody() . "\n";
echo $container->getService('silexServer')->get('/hello/Gonzalo')->getBody() . "\n";
echo $container->getService('slimServer')->get('/hello/Gonzalo')->getBody() . "\n";

And that’s all. You can see the project in my github account.