Flask API skeleton to handle PostgreSQL operations

This is a boilerplate for an API server using Flask. The idea is to have one API server working as a backend that handles all database operations. The API server only handles POST requests, and the input parameters travel in the body of the payload as JSON. I know it isn't a pure REST server, but that's what I need.

To organize the API better we'll set up a group of modules using Flask's blueprints. The entry point of the application is the app.py file:

import logging

from flask import Flask
from flask_compress import Compress

from lib.logger import setup_logging
from lib.utils import CustomJSONEncoder
from modules.example import blueprint as example
from settings import LOG_LEVEL, ELK_APP, ELK_INDEX, ELK_PROCESS, LOG_PATH

logging.basicConfig(level=LOG_LEVEL)

setup_logging(app=ELK_APP,
              index=ELK_INDEX,
              process=ELK_PROCESS,
              log_path=LOG_PATH)

app = Flask(__name__)
app.json_encoder = CustomJSONEncoder
compress = Compress()
compress.init_app(app)

app.register_blueprint(example)

All application configuration lives in the settings.py file, a pattern I borrowed from Django. The particularities of each environment are loaded from dotenv files inside settings.py:

import os
from logging import INFO
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent

APP_ID = 'dbapi'
APP_PATH = 'dbapi'
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

PROCESS_ID = os.getenv('PROCESS_ID', APP_ID)
LOG_LEVEL = os.getenv('LOG_LEVEL', INFO)
ELK_APP = f'{APP_ID}.{PROCESS_ID}'
ELK_INDEX = f'{APP_ID}_{ENVIRONMENT}'
ELK_PROCESS = APP_ID
LOG_PATH = f'./logs/{APP_ID}.log'

BEARER = os.getenv('BEARER')

# Database configuration
DEFAULT = 'default'

DATABASES = {
    DEFAULT: f"dbname='{os.getenv('DEFAULT_DB_NAME')}' user='{os.getenv('DEFAULT_DB_USER')}' host='{os.getenv('DEFAULT_DB_HOST')}' password='{os.getenv('DEFAULT_DB_PASS')}' port='{os.getenv('DEFAULT_DB_PORT')}'"
}
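
For reference, a hypothetical env/local/.env for this setup could look like this (the variable names match the os.getenv calls above; the values are made up):

LOG_LEVEL=INFO
BEARER=super_secret_key
DEFAULT_DB_NAME=gonzalo123
DEFAULT_DB_USER=username
DEFAULT_DB_HOST=localhost
DEFAULT_DB_PASS=password
DEFAULT_DB_PORT=5432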

In this example we're using one blueprint called example. I register blueprints manually. The blueprint has a set of routes, defined in its routes.py file:

from .actions import foo, bar

routes = [
    dict(route='', action=lambda: True),
    dict(route='foo', action=foo),
    dict(route='bar', action=bar),
]

Here we map URL paths to actions. For example, the foo action looks like this:

from datetime import datetime

from lib.decorators import use_schema
from .schemas import FooSchema


@use_schema(FooSchema)
def foo(name, email=False):
    now = datetime.now()
    return dict(name=name, email=email, time=now)

To validate user input we use schemas (with the marshmallow library). In this example our validation schema is:

from marshmallow import fields, Schema


class FooSchema(Schema):
    name = fields.String(required=True)
    email = fields.Email(required=False)
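
With that in place, a client call could look like the following. This is a hypothetical example: the requests library, host, port and token are not part of the original post, and the /example/foo path comes from the blueprint registration shown below.

import requests

# Hypothetical call: the server host/port and the bearer token are assumptions
response = requests.post(
    'http://localhost:5000/example/foo',
    json={'name': 'Gonzalo', 'email': 'gonzalo123@gmail.com'},
    headers={'Authorization': 'Bearer super_secret_key'})

print(response.json())  # {'name': ..., 'email': ..., 'time': ...}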

We hide the Flask plumbing in the module's __init__.py file:

import os

from flask import Blueprint

from lib.auth import authorize_bearer
from lib.utils import call_action, get_response
from settings import BEARER
from .routes import routes

NAME = os.path.basename(os.path.dirname(__file__))
blueprint = Blueprint(NAME, __name__, url_prefix=f'/{NAME}')


@blueprint.post('/')
@blueprint.post('/<path:name>')
@authorize_bearer(bearer=BEARER)  # innermost, so the view registered by blueprint.post is the protected one
def action(name=''):
    return get_response(NAME, name, routes, call_action)
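
For context, here is a hypothetical, simplified version of the two helpers used above (the real ones live in lib/utils.py and are not shown in this post); it only illustrates how the routes list is resolved:

from flask import jsonify


def call_action(action):
    # Each action is decorated with use_schema, which is expected to read and
    # validate the JSON body and pass it to the action as keyword arguments
    return action()


def get_response(module_name, path, routes, call):
    for route in routes:
        if route['route'] == path:
            return jsonify(call(route['action']))
    return jsonify({'error': f"unknown route '{path}' in module '{module_name}'"}), 404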

Another route with a database connection is the following one:

from dbutils import transactional

from lib.db import get_db_from_conn, get_conn_from_dbname
from lib.decorators import use_schema, inject_conn
from settings import DEFAULT
from .schemas import FooSchema
from .sql import SQL_USERS


@use_schema(FooSchema)
@inject_conn(DEFAULT, named=True, autocommit=False)
def bar(conn, name, email=False):
    # Create new transaction from connection injected with a decorator
    with transactional(conn) as db:
        db.upsert('users', dict(email=email), dict(name=name))

    # Example of how to obtain new connection from database name.
    conn2 = get_conn_from_dbname(DEFAULT)
    db2 = get_db_from_conn(conn2)

    return db2.fetch_all(SQL_USERS, dict(name=name))

We can obtain our database connection in different ways. For example, we can use a function decorator to inject the connection (in this case the connection named DEFAULT) into the function signature, or we can create the connection with a constructor. This connection is a raw psycopg2 connection. I also like to use a helper library for psycopg2 (https://github.com/gonzalo123/dbutils) that I created a long time ago.
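
As a rough idea of what the decorator does, here is a minimal sketch of an inject_conn-style decorator. The real one lives in lib/decorators.py; how it honours the named and autocommit flags is an assumption.

from functools import wraps

from lib.db import get_conn_from_dbname


def inject_conn(dbname, named=True, autocommit=False):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # get_conn_from_dbname is the same factory used in bar() above;
            # named is accepted only to mirror the real signature, and the
            # autocommit handling here is an assumption for the sketch
            conn = get_conn_from_dbname(dbname)
            conn.autocommit = autocommit
            return f(conn, *args, **kwargs)
        return wrapper
    return decorator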

And that's all. I normally deploy it in production with nginx as a reverse proxy and n replicas of my API. Logs are also ready to be shipped to ELK using Filebeat.

version: '3.6'

x-logging: &logging
  logging:
    options:
      max-size: 10m


services:
  api:
    image: dbapi:production
    <<: *logging
    deploy:
      replicas: 10
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
    command: /bin/bash ./start.sh

  nginx:
    image: nginx-dbapi:${VERSION}
    deploy:
      restart_policy:
        condition: any
    environment:
      ENVIRON: ${VERSION}
    ports:
      - ${EXPOSED_PORT}:8000
    depends_on:
      - api

volumes:
  logs_volume:

Source code in my github account

Using a nginx reverse proxy to serve docker swarm replicas

Sometimes we need to serve backend servers behind an nginx reverse proxy, for example when we want to serve a Django or a Flask application. In this example I want to show how easy it is to do that with nginx.

We’re going to start with a dummy Flask application.

from flask import Flask
from datetime import datetime

app = Flask(__name__)

@app.get("/")
def home():
    now = datetime.now()
    return f'Hello {now}'

The idea is to use an nginx reverse proxy to serve the application. We can configure nginx like this:

upstream loadbalancer {
    server backend:5000;
}

server {
    server_tokens off;
    client_max_body_size 20M;
    location / {
        proxy_pass http://loadbalancer;
    }
}

And finally we can create our docker-compose.yml file. We only need to set up the replicas and the reverse proxy will do the magic.

version: '3.6'

services:
  nginx:
    image: nginx:production
    ports:
      - "8080:80"
  backend:
    image: flask:production
    deploy:
      replicas: 3
    command: gunicorn -w 1 app:app -b 0.0.0.0:5000

As we can see, we have 3 replicas behind an nginx reverse proxy. Maybe that's enough, but sometimes we need to distinguish between the replicas, for example in the logging.

(venv) ➜  docker stack services loadbalancer
ID             NAME                    MODE         REPLICAS   IMAGE              PORTS
u5snhg9tysr0   loadbalancer_backend    replicated   3/3        flask:production
4w0bf8msdiq6   loadbalancer_nginx      replicated   1/1        nginx:production   *:8080->80/tcp 

I've changed our Flask application a little bit.

import logging
from datetime import datetime
import socket
import os
from logging.handlers import TimedRotatingFileHandler

from flask import Flask

handlers = [
    logging.StreamHandler()
]
if os.getenv('ENVIRONMENT') == 'production':
    slot = os.getenv('SLOT')
    log_path = f"./logs/log{slot}.log"

    file_handler = TimedRotatingFileHandler(log_path, backupCount=2)
    file_handler.setLevel(logging.INFO)
    handlers.append(file_handler)

logging.basicConfig(
    format=f'%(asctime)s ({socket.gethostname()}) [%(levelname)s] %(message)s',
    level='INFO',
    handlers=handlers,
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)

app = Flask(__name__)


@app.get("/")
def home():
    now = datetime.now()
    logger.info(f"home {now}")
    return f'Hello {now} from {socket.gethostname()}. Slot: {os.getenv("SLOT")}'

And of course our docker-compose.yml file.

version: '3.6'

services:
  nginx:
    image: nginx:production
    ports:
      - "8080:80"
  backend:
    image: flask:production
    hostname: "backend.{{.Task.Slot}}"
    environment:
      SLOT: "{{.Task.Slot}}"
      ENVIRONMENT: production
    volumes:
      - log:/src/logs
    deploy:
      replicas: 3
    command: gunicorn -c gunicorn.conf.py -w 1 app:app -b 0.0.0.0:5000
volumes:
  log:
    name: 'log-{{.Task.Slot}}'

Now we've changed the hostname of the backend service using the slot number (instead of the default hostname). We also pass a SLOT environment variable to the backend service to distinguish between the replicas, if we need to do so. Maybe you're asking yourself: why the hell do we need to do that? The answer is simple: working with legacy code is hard and sometimes we need to do very strange things.

Source code of the example in my github

Listen to PostgreSQL events with pg_notify and Python

With PostgreSQL we can easily publish and listen to events from one connection to another. It's cool because those notifications take part in the transaction. In this example I'm going to create a wrapper to help me listen to those events with Python.

To notify events I only need to use the pg_notify function. For example:

select pg_notify('channel', 'xxx')
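
We can trigger the same notification from Python with a plain psycopg2 connection (this snippet is not part of the original example, just an illustration using the same made-up dsn as below):

import psycopg2

conn = psycopg2.connect("dbname='gonzalo123' user='username' host='localhost' password='password'")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

with conn.cursor() as cursor:
    cursor.execute("select pg_notify(%s, %s)", ('channel', 'xxx'))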

To listen to the events:

import psycopg2

from pg_listener import on_db_event

dsn = f"dbname='gonzalo123' user='username' host='localhost' password='password'"

conn = psycopg2.connect(dsn)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

for payload in on_db_event(conn, 'channel'):
    print(payload)

The magic resides in on_db_event. We pass it a psycopg2 connection and the channel name, and we can iterate over the generator it returns to retrieve the payload whenever someone triggers an event on that channel.

import logging
import select

from psycopg2.extensions import connection

logger = logging.getLogger(__name__)


def on_db_event(conn: connection, channel: str):
    with conn:
        with conn.cursor() as cursor:
            cursor.execute(f"LISTEN {channel};")
            logger.info(f"Waiting for notifications on channel '{channel}'.")

            while True:
                if select.select([conn], [], [], 5) != ([], [], []):
                    conn.poll()
                    while conn.notifies:
                        notify = conn.notifies.pop(0)
                        yield notify.payload

As I often use Django, and Django uses its own connection wrapper, I need to create a native psycopg2 connection. Maybe it's possible to retrieve it from the Django connection (show me if you know how to do it).

import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT


def conn_from_django(django_connection):
    db_settings = django_connection.settings_dict
    dsn = f"dbname='{db_settings['NAME']}' " \
          f"user='{db_settings['USER']}' " \
          f"host='{db_settings['HOST']}' " \
          f"password='{db_settings['PASSWORD']}'"

    conn = psycopg2.connect(dsn)
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

    return conn

You can install the library using pip

pip install pglistener-gonzalo123

Source code available in my github

Playing with gRPC and Python

When we work with microservices we normally need, in one way or another, something to communicate between them. Basically we have two choices: synchronous communication (APIs) and asynchronous communication (message queues). REST APIs are a pretty straightforward way to create a communication channel. We have a lot of frameworks and microframeworks to create REST APIs; for example, in Python, we can use Flask. REST is simple and it fits a lot of cases, but sometimes it's not enough. A REST API is an HTTP service, and HTTP is a protocol built on top of TCP. When we make a REST request we open a TCP connection to the server, send the request payload, receive the response and close the connection. If we need to perform a lot of requests we may face a bottleneck. Then there's the payload: we need to define how we're going to encode the information. We normally use JSON (we can also use XML). It's easy to encode/decode JSON in almost all languages, but JSON is plain text, and big payloads over a TCP connection mean slow response times.

To solve this situation we have another tool in our toolbox: gRPC. With gRPC we create a persistent connection between client and server (instead of opening and closing a connection like REST) and we use a binary payload, reducing the size and improving performance.

First we need to define the protocol we're going to use. That's something we don't need to do with HTTP APIs (we use JSON and forget the rest). It's an extra step, not complicated, but extra. We define our service and message types in a proto file.

// api.proto
syntax = "proto3";
package api;

service Api {
  rpc sayHello (HelloRequest) returns (Hello) {}
  rpc getAll (ApiRequest) returns (api.Items) {}
  rpc getStream (ApiRequest) returns (stream api.Item) {}
}

message ApiRequest {
  int32 length = 1;
}

message Items {
  repeated api.Item items = 1;
}

message Item {
  int32 id = 1;
  string name = 2;
}

message HelloRequest {
  string name = 1;
}

message Hello {
  string message = 1;
}

With our proto file (language agnostic) we can generate the wrapper of our service for our programming language. In my case, Python:

python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/api.proto

Of course we can create clients using one language and servers using another. Both using the same proto file.

It creates two files. We don't need to open them; we'll import them to create our client and server. We could use those files directly, but I prefer to add an extra wrapper. I'm not reinventing the wheel, just making the client and server easier to use.

import grpc

from api_pb2 import Items, Item, Hello, HelloRequest, ApiRequest
from api_pb2_grpc import ApiServicer, ApiStub


class ApiServer(ApiServicer):
    def getAll(self, request, context):
        data = []
        for i in range(1, request.length + 1):
            data.append(Item(id=i, name=f'name {i}'))
        return Items(items=data)

    def getStream(self, request, context):
        for i in range(1, request.length + 1):
            yield Item(id=i, name=f'name {i}')

    def sayHello(self, request, context):
        return Hello(message=f'Hello {request.name}!')


class ApiClient:
    def __init__(self, target):
        channel = grpc.insecure_channel(target)
        self.client = ApiStub(channel)

    def sayHello(self, name):
        response = self.client.sayHello(HelloRequest(name=name))
        return response.message

    def getAll(self, length):
        response = self.client.getAll(ApiRequest(length=length))
        return response.items

    def getStream(self, length):
        response = self.client.getStream(ApiRequest(length=length))
        return response
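
A quick way to exercise the wrapper from a Python shell could be the following (assuming the server shown below is already running and listening on port 50051; the real port comes from settings.BACKEND_PORT):

client = ApiClient('localhost:50051')

print(client.sayHello('Gonzalo'))        # Hello Gonzalo!
for item in client.getStream(length=3):  # the stream yields Item messages
    print(item.id, item.name)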

Now I can create a server.

import logging
from concurrent import futures

import grpc

import settings
from api import ApiServer
from api_pb2_grpc import add_ApiServicer_to_server


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    add_ApiServicer_to_server(ApiServer(), server)
    server.add_insecure_port(f'[::]:{settings.BACKEND_PORT}')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

And also a client. In my example I'm going to use a Flask frontend that consumes the gRPC server:

from flask import Flask, render_template

import settings
from api import ApiClient

app = Flask(__name__)

app.config["api"] = ApiClient(f"{settings.BACKEND_HOST}:{settings.BACKEND_PORT}")


@app.route("/")
def home():
    api = app.config["api"]
    return render_template(
        "index.html",
        name=api.sayHello("Gonzalo"),
        items=api.getAll(length=10),
        items2=api.getStream(length=5)
    )
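
The settings module isn't shown in the post; a hypothetical version that matches the docker-compose.yml below could be:

# settings.py (hypothetical sketch): host and port come from the environment
import os

BACKEND_HOST = os.getenv('BACKEND_HOST', 'localhost')
BACKEND_PORT = int(os.getenv('BACKEND_PORT', 50051))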

We can deploy the example on a Docker server. Here's the docker-compose.yml:

version: '3.6'

services:
  frontend:
    build:
      context: .
      dockerfile: Dockerfile
    environment:
      BACKEND_HOST: backend
    ports:
      - 5000:5000
    command: gunicorn -w 4 app:app -b 0.0.0.0:5000
  backend:
    build:
      context: .
      dockerfile: Dockerfile
    command: python server.py

Source code available in my github

Handling M1 problems in Python local development using Docker

I've got a new laptop: a MacBook Pro with the M1 processor. The battery life is impressive and the performance is very good, but not everything is good news. The M1 processor has a different architecture: now we're using arm64 instead of x86_64.

The problem comes when we need to compile; we have to take this into account. With Python I normally use pyenv to manage different Python versions on my laptop, and I create one virtualenv per project to isolate my environment. It worked like a charm, but now I'm facing problems due to the M1 architecture. For example, to install a specific Python version with pyenv I need to compile it. Also, when I install a pip package that ships a binary, an M1 build must be available.

This kind of problem is a bit frustrating. Apple provides Rosetta to run x86 binaries, but a simple pip install xxx turns into a nightmare. For me, that's not acceptable. I want to deploy projects to production, not become an expert in low-level architectures. So, Docker is my friend.

My solution to avoid this kind of problem is Docker. Now I'm not using pyenv: if I need a Python interpreter I build a Docker image, and instead of a virtualenv I create a container.

PyCharm also allows me to use the docker interpreter without any problem.

That’s my python Dockerfile:

FROM python:3.9.6 AS base

ENV APP_HOME=/src
ENV APP_USER=appuser

RUN groupadd -r $APP_USER && \
    useradd -r -g $APP_USER -d $APP_HOME -s /sbin/nologin -c "Docker image user" $APP_USER

WORKDIR $APP_HOME

ENV TZ 'Europe/Madrid'
RUN echo $TZ > /etc/timezone && \
apt-get update && apt-get install --no-install-recommends \
    -y tzdata && \
    rm /etc/localtime && \
    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \
    dpkg-reconfigure -f noninteractive tzdata && \
    apt-get clean

RUN pip install --upgrade pip

FROM base

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR $APP_HOME

COPY requirements.txt .
RUN pip install -r requirements.txt

ADD src .

RUN chown -R $APP_USER:$APP_USER $APP_HOME

USER $APP_USER

I can build my container:

docker build -t demo .

Now I can add the interpreter in PyCharm using my demo:latest image.

If I need to add a pip dependency, I can't just pip install it locally. I've got two options: add the dependency to requirements.txt and rebuild the image, or run pip inside the Docker container (with docker run -it --rm ...). To organize those scripts we can easily create a package.json file:

{
  "name": "flaskdemo",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "docker_local_build": "docker build -t $npm_package_name .",
    "freeze": "docker run -it --rm -v \"$PWD\"/src:/src $npm_package_name python -m pip freeze > requirements.txt",
    "local": "npm run docker_local_build && npm run freeze",
    "python": "docker run -it --rm -v $PWD/src:/src $npm_package_name:latest",
    "bash": "docker run -it --rm -v $PWD/src:/src $npm_package_name:latest bash"
  },
  "author": {
    "name": "Gonzalo Ayuso",
    "email": "gonzalo123@gmail.com",
    "url": "https://gonzalo123.com"
  },
  "license": "MIT"
}

Extra

There's another problem with the M1. Maybe you won't need to face it, but if you build a Docker image on an M1 laptop and try to deploy that container on a Linux server (not an arm64 server), your containers won't work. To solve it you need to build your images for the correct architecture. Docker allows us to do that. For example:

docker buildx build --platform linux/amd64 -t gonzalo123/demo:production .

https://github.com/gonzalo123/docker.py

Transforming TCP sockets to MQTT with Go

In the last post we created a proxy to upgrade a legacy application that sends raw TCP sockets to an HTTP server, without changing the original application.

Now we're going to do the same, but instead of sending HTTP requests we're going to connect to an MQTT broker. Trying to change the legacy application itself to speak MQTT could be a nightmare, but with this approach it's pretty straightforward.

The idea is the same: we point our TCP sockets to localhost, then we build a Go client that reads from the TCP socket and sends the information to the MQTT broker.

We're going to use Mosquitto as the MQTT broker. We can set it up easily with Docker:

version: '2'

services:
  mosquitto:
    image: eclipse-mosquitto
    hostname: mosquitto
    container_name: mosquitto
    build:
      context: .docker/mosquitto
      dockerfile: Dockerfile
    expose:
      - "1883"
      - "9001"
    ports:
      - "1883:1883"
      - "9001:9001"

We can also set up our Mosquitto server with a user and password via mosquitto.conf and users.txt. For this example we're going to use the credentials username:password

username:$6$6jOr4vVqaKxisTls$4KVYh8NBZdP+z4S/YbuoSHKlJ+5F1DxiE7XtWWXVHQ+7PlCI+b6LhqSbj8lL45HnGlo4D5t0AVFYrYGjb5lTxg==

Our Go program is very similar to the HTTP version:

package main

import (
	"bufio"
	"encoding/json"
	"flag"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"log"
	"net"
	"os"
	"strings"
	"time"
)

func main() {
	port, closeConnection, topic, broker := parseFlags()
	openSocket(*port, *closeConnection, *topic, *broker, onMessage)
}

func openSocket(port string, closeConnection bool, topic string, broker string, onMessage func(url string, topic string, buffer string)) {
	PORT := "localhost:" + port
	l, err := net.Listen("tcp4", PORT)
	log.Printf("Serving %s\n", l.Addr().String())
	if err != nil {
		log.Fatalln(err)
	}
	defer l.Close()

	for {
		c, err := l.Accept()
		if err != nil {
			log.Fatalln(err)
		}
		go handleConnection(c, closeConnection, topic, broker, onMessage)
	}
}

func createClientOptions(url string) *mqtt.ClientOptions {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(url)
	opts.SetUsername(os.Getenv("MQTT_USERNAME"))
	opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
	return opts
}

func connect(url string) mqtt.Client {
	opts := createClientOptions(url)
	client := mqtt.NewClient(opts)
	token := client.Connect()
	for !token.WaitTimeout(3 * time.Second) {
	}
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}

func onMessage(url string, topic string, buffer string) {
	client := connect(url)
	client.Publish(topic, 0, false, buffer)
}

func parseFlags() (*string, *bool, *string, *string) {
	port := flag.String("port", "7777", "port number")
	closeConnection := flag.Bool("close", true, "Close connection")
	topic := flag.String("topic", "topic", "mqtt topic")
	broker := flag.String("broker", "tcp://localhost:1883", "mqtt topic")
	flag.Parse()

	return port, closeConnection, topic, broker
}

func handleConnection(c net.Conn, closeConnection bool, topic string, broker string, onMessage func(url string, topic string, buffer string)) {
	log.Printf("Accepted connection from %s\n", c.RemoteAddr().String())
	for {
		ip, port, err := net.SplitHostPort(c.RemoteAddr().String())
		netData, err := bufio.NewReader(c).ReadString('\n')
		if err != nil {
			log.Println(err)
		}

		message := map[string]interface{}{
			"body":   strings.TrimSpace(netData),
			"ipFrom": ip,
			"port":   port,
		}

		log.Printf("sending to topic %s message:%s\n", topic, message)
		bytesRepresentation, err := json.Marshal(message)
		if err != nil {
			log.Println(err)
		} else {
			onMessage(broker, topic, string(bytesRepresentation))
		}

		if closeConnection {
			c.Close()
			return
		}
	}
	c.Close()
}

And that’s all. Our legacy application can now speak MQTT without problems.
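
To check the bridge end to end we can subscribe to the topic from Python. This isn't part of the original example; it assumes paho-mqtt 1.x and the credentials shown above.

import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    client.subscribe('topic')  # default topic name used by the Go proxy


def on_message(client, userdata, msg):
    print(msg.topic, msg.payload.decode())


client = mqtt.Client()
client.username_pw_set('username', 'password')
client.on_connect = on_connect
client.on_message = on_message
client.connect('localhost', 1883)
client.loop_forever()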

Source code available in my github

Transforming TCP sockets to HTTP with Go

Sometimes we need to work with legacy applications; applications that are hard to rewrite and hard to change. Imagine, for example, that such an application sends raw TCP sockets to communicate with another process. Raw TCP sockets are fast, but they have several problems: for example, all data travels over the network in plain text and without authentication (unless we implement our own protocol).

One solution is to use HTTPS connections instead. We can also authenticate those requests with an Authorization Bearer token. For example, I've created a simple HTTP server with Python and Flask:

import logging
import os
from functools import wraps

from flask import Flask, request, abort
from flask import jsonify

logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)
app = Flask(__name__)


def authorize_bearer(bearer):
    def authorize(f):
        @wraps(f)
        def decorated_function(*args, **kws):
            if 'Authorization' not in request.headers:
                abort(401)

            data = request.headers['Authorization']

            if str.replace(str(data), 'Bearer ', '') != bearer:
                abort(401)

            return f(*args, **kws)

        return decorated_function

    return authorize


@app.route('/register', methods=['POST'])
@authorize_bearer(bearer=os.getenv('TOKEN'))
def hello_world():
    req_data = request.get_json()
    logger.info(req_data)
    return jsonify({"status": "OK", "request_data": req_data})

Now we only need to change our legacy application to use an HTTP client instead of raw TCP sockets. But sometimes that's not possible: imagine, for example, that the application runs on an old OS without HTTPS support, or that we cannot find and compile an HTTP client for it.

One possible solution is to isolate the application and change only the destination of the TCP socket. Instead of the original IP address we can use localhost, and create a proxy on localhost that listens to the TCP sockets and sends the information to the HTTP server.

We're going to build this proxy in Go. We could do it with any language (Python, C#, JavaScript, ...), and my Go kung fu is not that good (I'm more comfortable with Python), but it's not difficult and we can build the proxy binary for Windows, Linux and Mac without any problem. Then we only need to copy the binary onto the target host and it works (no installation, no SDK, nothing; just copy and run).

package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
)

func main() {
	port, closeConnection, url := parseFlags()
	openSocket(*port, *closeConnection, *url, onMessage)
}

func onMessage(url string, buffer string) {
	bearer := os.Getenv("TOKEN")
	client := &http.Client{}
	req, _ := http.NewRequest("POST", url, strings.NewReader(buffer))
	req.Header.Add("Authorization", "Bearer "+bearer)
	req.Header.Add("content-type", "application/json")
	resp, err := client.Do(req)

	if err != nil {
		log.Println(err)
	} else {
		if resp.StatusCode == 200 {
			var result map[string]interface{}
			json.NewDecoder(resp.Body).Decode(&result)
			log.Println(result["status"])
		} else {
			log.Println("Response status: " + resp.Status)
		}
		defer resp.Body.Close()
	}
}

func parseFlags() (*string, *bool, *string) {
	port := flag.String("port", "7777", "port number")
	closeConnection := flag.Bool("close", true, "Close connection")
	url := flag.String("url", "http://localhost:5000/register", "Destination endpoint")
	flag.Parse()
	return port, closeConnection, url
}

func openSocket(port string, closeConnection bool, url string, onMessage func(url string, buffer string)) {
	PORT := "localhost:" + port
	l, err := net.Listen("tcp4", PORT)
	log.Printf("Serving %s\n", l.Addr().String())
	if err != nil {
		log.Fatalln(err)
	}
	defer l.Close()

	for {
		c, err := l.Accept()
		if err != nil {
			log.Fatalln(err)
		}
		go handleConnection(c, closeConnection, url, onMessage)
	}
}

func handleConnection(c net.Conn, closeConnection bool, url string, onMessage func(url string, buffer string)) {
	log.Printf("Accepted connection from %s\n", c.RemoteAddr().String())
	for {
		ip, port, err := net.SplitHostPort(c.RemoteAddr().String())
		netData, err := bufio.NewReader(c).ReadString('\n')
		if err != nil {
			log.Println(err)
		}

		message := map[string]interface{}{
			"body":   strings.TrimSpace(netData),
			"ipFrom": ip,
			"port":   port,
		}

		log.Printf("Making request with %s\n", message)
		bytesRepresentation, err := json.Marshal(message)
		if err != nil {
			log.Println(err)
		} else {
			//buffer := bytes.NewBuffer(bytesRepresentation)
			onMessage(url, string(bytesRepresentation))
		}

		if closeConnection {
			c.Close()
			return
		}
	}
	c.Close()
}

And that's all. We can upgrade our legacy application almost without changing its code.
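
For a quick local test, a few lines of Python can play the role of the legacy application (this is not part of the original code; it uses the default port 7777 from parseFlags):

import socket

# Send one line to the local proxy, which forwards it to the HTTP server
with socket.create_connection(('localhost', 7777)) as s:
    s.sendall(b'hello from the legacy app\n')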

Source code available in my github

Playing with Go and file system watchers

Let me explain the idea. I want to emit one RabbitMQ message each time a new file is generated in a folder. The problem is that I cannot modify the code of the software that generates the files. The idea is to create a filesystem watcher that emits the message. Let's start.

I'm not a Go expert, but Go is cool for this kind of task. You can create an executable file, just copy it onto the desired server, and that's all. It just works.

Also, there's a fsnotify package to listen to filesystem events. I've used similar filesystem watchers in the past with PHP and Python.

func main() {
	config := GetConf()
	watcher, _ = fsnotify.NewWatcher()
	defer watcher.Close()

	if err := filepath.Walk(config.Path, watchDir); err != nil {
		fmt.Println("ERROR", err)
	}
	done := make(chan bool)

	go func() {
		for {
			select {
			case event := <-watcher.Events:
				processEvent(event, *config)
			case err := <-watcher.Errors:
				fmt.Println("ERROR", err)
			}
		}
	}()
	<-done
}

func processEvent(event fsnotify.Event, conf Conf) {
	filePath := event.Name
	fileName := filepath.Base(filePath)
	switch op := event.Op.String(); op {
	case "CREATE":
		if strings.HasSuffix(filePath, conf.Extension) {
			bytes, _ := copy(event.Name, conf.CopyTo+fileName)
			if bytes > 0 {
				emitEvent(fileName, conf)
			}
		}
	default:
		fmt.Println("Unhandled event: ", op, " on file: ", fileName)
	}
}

And basically that's all. We only need to take care with filesystem events. Fsnotify is very low level, and when we use it we realize how programs actually write files. Some of them create a temp file, write to it and finally rename it. Others create an empty file and then write the contents. Basically it's the same thing, but the events are different. In my example I only listen to "CREATE", which is enough for my test.

Emitting the event to RabbitMQ is also simple, and well documented in the official documentation.

func emitEvent(fileName string, conf Conf) {
	fmt.Println("Event on file", fileName)
	message := map[string]interface{}{
		"fileName": fileName,
	}
	bytesRepresentation, err := json.Marshal(message)
	if err != nil {
		log.Println(err)
	} else {
		emmitToRabbit(conf, bytesRepresentation)
	}
}

In this example I also want to use a YAML file to store the configuration, just to learn how to read YAML files in Go.

type Conf struct {
	Path        string `yaml:"path"`
	CopyTo      string `yaml:"copy_to"`
	Extension   string `yaml:"extension"`
	BrokerTopic string `yaml:"brokerTopic"`
	Broker      string `yaml:"broker"`
}

func readConf(filename string) (*Conf, error) {
	buf, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, err
	}

	c := &Conf{}
	err = yaml.Unmarshal(buf, c)
	if err != nil {
		return nil, fmt.Errorf("in file %q: %v", filename, err)
	}

	return c, nil
}

And that’s all. My binary executable is ready.

Full code in my github

Database utils for psycopg2

I normally need to perform raw queries when I'm working with Python. Even when I'm using Django and its ORM I need to execute SQL against the database. I (almost always) use PostgreSQL, so I use (as we all do) psycopg2. Psycopg2 is very complete and easy to use, but sometimes I need a few helpers to make my usual tasks easier. I don't want to create a complex library on top of psycopg2, only a helper.

Something I miss from PHP and the DBAL library is the way DBAL helps me build simple CRUD operations. The idea of these helper functions is to do something similar. Let's start.

I've created procedural functions for everything and also a Db class. Normally all functions need the cursor parameter.

def fetch_all(cursor, sql, params=None):
    cursor.execute(
        query=sql,
        vars={} if params is None else params)

    return cursor.fetchall()

The Db class accepts the cursor in its constructor:

db = Db(cursor=cursor)
data = db.fetch_all("SELECT * FROM table")

Connection

Nothing special in the connection. I like to use connection_factory=NamedTupleConnection to access the recordset as an object. I've created a simple factory helper to create connections:

import psycopg2
from psycopg2.extras import NamedTupleConnection


def get_conn(dsn, named_tuple=False, autocommit=False):
    conn = psycopg2.connect(
        dsn=dsn,
        connection_factory=NamedTupleConnection if named_tuple else None,
    )
    conn.autocommit = autocommit

    return conn

Sometimes I set the NamedTuple factory on the cursor instead of the connection, so I've created a simple helper:

from psycopg2.extras import NamedTupleCursor


def get_cursor(conn, named_tuple=True):
    return conn.cursor(cursor_factory=NamedTupleCursor if named_tuple else None)
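
Putting both helpers together looks roughly like this (the dsn values are made up):

conn = get_conn("dbname='gonzalo123' user='username' host='localhost' password='password'")
with get_cursor(conn) as cursor:
    db = Db(cursor=cursor)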

Fetch all

db = Db(cursor)
for reg in db.fetch_all(sql="SELECT email, name from users"):
    assert 'user1' == reg.name
    assert 'user1@email.com' == reg.email

Fetch one

data = db.fetch_one(sql=SQL)

Select

data = db.select(
        table='users',
        where={'email': 'user1@email.com'})

This helper only allows me to build simple WHERE statements (conditions joined with AND). If I need a more complex WHERE, I use fetch_all with raw SQL instead.
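
For example, something along these lines (a hypothetical query, assuming the Db method mirrors the procedural fetch_all signature shown earlier):

data = db.fetch_all(
        sql="SELECT * FROM users WHERE email = %(email)s OR name LIKE %(pattern)s",
        params={'email': 'user1@email.com', 'pattern': 'user%'})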

Insert

Insert one row

db.insert(
        table='users',
        values={'email': 'user1@email.com', 'name': 'user1'})

Or insert multiple rows (using psycopg2's executemany):

db.insert(
        table='users',
        values=[
            {'email': 'user2@email.com', 'name': 'user2'},
            {'email': 'user3@email.com', 'name': 'user3'}
        ])

We can also use insert_batch to insert all rows with one command:

db.insert_batch(
        table='users',
        values=[
            {'email': 'user2@email.com', 'name': 'user2'},
            {'email': 'user3@email.com', 'name': 'user3'}
        ])

Update

db.update(
        table='users',
        data={'name': 'xxxx'},
        identifier={'email': 'user1@email.com'},
    )

Delete

db.delete(table='users', where={'email': 'user1@email.com'})

Upsert

Sometimes we need to insert a row, or update it if the primary key already exists. We could do that with two statements: one SELECT, then an INSERT if there's no result or an UPDATE otherwise. But we can also do it with a single SQL statement. I've created a helper to do that:

from psycopg2 import sql as psycopg2_sql


# _get_conditions (defined elsewhere in the library) builds "field" = %(field)s
# fragments from a dict's keys
def _get_upsert_sql(data, identifier, table):
    raw_sql = """
        WITH
            upsert AS (
                UPDATE {tbl}
                SET {t_set}
                WHERE {t_where}
                RETURNING {tbl}.*),
            inserted AS (
                INSERT INTO {tbl} ({t_fields})
                SELECT {t_select_fields}
                WHERE NOT EXISTS (SELECT 1 FROM upsert)
                RETURNING *)
        SELECT * FROM upsert
        UNION ALL
        SELECT * FROM inserted
    """
    merger_data = {**data, **identifier}
    sql = psycopg2_sql.SQL(raw_sql).format(
        tbl=psycopg2_sql.Identifier(table),
        t_set=psycopg2_sql.SQL(', ').join(_get_conditions(data)),
        t_where=psycopg2_sql.SQL(' and ').join(_get_conditions(identifier)),
        t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, merger_data.keys())),
        t_select_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, merger_data.keys())))

    return merger_data, sql


def upsert(cursor, table, data, identifier):
    merger_data, sql = _get_upsert_sql(data, identifier, table)

    cursor.execute(sql, merger_data)
    return cursor.rowcount

Now we can execute an upsert in a simple way:

db.upsert(
        table='users',
        data={'name': 'yyyy'},
        identifier={'email': 'user1@email.com'})

Stored procedures

data = db.sp_fetch_one(
    function='hello',
    params={'name': 'Gonzalo'})
data = db.sp_fetch_all(
    function='hello',
    params={'name': 'Gonzalo'})

Transactions

In PHP with DBAL, to create a transaction we can use this syntax:

<?php
$conn->transactional(function($conn) {
    // do stuff
});

In Python we can do the same with context management, but I prefer to use this syntax:

with transactional(conn) as db:
    assert 1 == db.insert(
        table='users',
        values={'email': 'user1@email.com', 'name': 'user1'})

The transactional function looks like this (I've created two versions: one yielding the raw cursor and another one yielding my Db class):

from contextlib import contextmanager


def _get_transactional(conn, named_tuple, callback):
    try:
        with conn as connection:
            with get_cursor(conn=connection, named_tuple=named_tuple) as cursor:
                yield callback(cursor)
            conn.commit()
    except Exception as e:
        conn.rollback()
        raise e

@contextmanager
def transactional(conn, named_tuple=False):
    return _get_transactional(conn, named_tuple, lambda cursor: Db(cursor))


@contextmanager
def transactional_cursor(conn, named_tuple=False):
    return _get_transactional(conn, named_tuple, lambda cursor: cursor)

Return values

The select and fetch helpers return the recordset, but insert, update, delete and upsert return the number of affected rows. For example, here's the insert function:

def _get_insert_sql(table, values):
    keys = values[0].keys() if type(values) is list else values.keys()

    raw_sql = "insert into {tbl} ({t_fields}) values ({t_values})"
    return psycopg2_sql.SQL(raw_sql).format(
        tbl=psycopg2_sql.Identifier(table),
        t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, keys)),
        t_values=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, keys)))


def insert(cursor, table, values):
    sql = _get_insert_sql(table=table, values=values)
    cursor.executemany(query=sql, vars=values) if type(values) is list else cursor.execute(sql, values)

    return cursor.rowcount

Unit tests

You can try the library by running the unit tests. I've also provided a docker-compose.yml file to set up a PostgreSQL database for the tests.

version: '3.6'

services:
  pg:
    build:
      context: .docker/pg
      dockerfile: Dockerfile
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

Install

You can install the library using pip

pip install dbutils-gonzalo123

Full code in my github

Playing with RabbitMQ, Python and Docker. From Exchanges to Queues

Lately, message queues appear in one way or another in all my projects. Normally I work with AWS and use SQS and SNS, but I also use RabbitMQ and MQTT quite often. In AWS there's a pattern I use a lot to isolate services: the process that emits messages publishes to SNS, and I bind SNS to SQS. With this technique I can attach n SQS queues to the same SNS topic. I've used it here. In AWS it's pretty straightforward to do that. Today we're going to do the same with RabbitMQ. In fact it's very easy: we only need to follow the tutorial in the official RabbitMQ documentation.

The script listens to an exchange and resends each message to a queue. It's basically the same thing we can see in the RabbitMQ documentation.

import settings
from lib.logger import logger
from lib.rabbit import get_channel

channel = get_channel()
channel.exchange_declare(
    exchange=settings.EXCHANGE,
    exchange_type='fanout')

result = channel.queue_declare(
    queue='',
    exclusive=True)

queue_name = result.method.queue

channel.queue_bind(
    exchange=settings.EXCHANGE,
    queue=queue_name)

logger.info(f' [*] Waiting for {settings.EXCHANGE}. To exit press CTRL+C')

channel.queue_declare(
    durable=settings.QUEUE_DURABLE,
    auto_delete=settings.QUEUE_AUTO_DELETE,
    queue=settings.QUEUE_TOPIC
)


def callback(ch, method, properties, body):
    channel.basic_publish(
        exchange='',
        routing_key=settings.QUEUE_TOPIC,
        body=body)
    logger.info(f'Message sent to topic {settings.QUEUE_TOPIC}. Message: {body}')


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True)

channel.start_consuming()
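
The get_channel helper isn't shown in the post; a hypothetical sketch using pika (and the same environment variables as the docker-compose.yml below) could be:

# lib/rabbit.py (hypothetical sketch)
import pika

import settings


def get_channel():
    credentials = pika.PlainCredentials(settings.RABBIT_USER, settings.RABBIT_PASS)
    parameters = pika.ConnectionParameters(host=settings.RABBIT_HOST, credentials=credentials)
    return pika.BlockingConnection(parameters).channel()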

We send one message to the exchange using rabbitmqadmin and see the message in the queue:

rabbitmqadmin -u username -p password publish exchange=exchange routing_key= payload="hello, world"

My idea with this project is to deploy it into a Docker Swarm cluster and add/remove listeners just by adding new services to the stack:

...
  exchange2topic1:
    image: ${ECR}/exchange2queue:${VERSION}
    build:
      context: .
      dockerfile: Dockerfile
    deploy:
      restart_policy:
        condition: on-failure
    depends_on:
      - rabbit
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0
  ...

With this approach it's very simple for me to add and remove listeners without touching the emitter.

Also, as a plus, I like to add a simple HTTP API that lets me send messages to the exchange with a POST request instead of using a RabbitMQ client. That's because I sometimes work with legacy systems where using an AMQP client isn't simple. Here's a simple Flask API:

from flask import Flask, request
from flask import jsonify

import settings
from lib.auth import authorize_bearer
from lib.logger import logger
from lib.rabbit import get_channel
import json

app = Flask(__name__)


@app.route('/health')
def health():
    return jsonify({"status": "ok"})


@app.route('/publish/<path:exchange>', methods=['POST'])
@authorize_bearer(bearer=settings.API_TOKEN)
def publish(exchange):
    channel = get_channel()
    try:
        message = request.get_json()
        channel.basic_publish(
            exchange=exchange,
            routing_key='',
            body=json.dumps(message)
        )
        logger.info(f"message sent to exchange {exchange}")
        return jsonify({"status": "OK", "exchange": exchange})
    except:
        return jsonify({"status": "NOK", "exchange": exchange})

Now we can emit messages to the exchange using curl, postman or any other http client.

POST http://localhost:5000/publish/exchange
Content-Type: application/json
Authorization: Bearer super_secret_key

{
  "hello": "Gonzalo"
}

Also, in the Docker stack I want to use a reverse proxy to serve my Flask application (run with gunicorn) and the RabbitMQ management console. I'm using nginx to do that:

upstream api {
    server api:5000;
}

server {
    listen 8000 default_server;
    listen [::]:8000;

    client_max_body_size 20M;

    location / {
        try_files $uri @proxy_to_api;
    }

    location ~* /rabbitmq/api/(.*?)/(.*) {
        proxy_pass http://rabbit:15672/api/$1/%2F/$2?$query_string;
        proxy_buffering                    off;
        proxy_set_header Host              $http_host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location ~* /rabbitmq/(.*) {
        rewrite ^/rabbitmq/(.*)$ /$1 break;
        proxy_pass http://rabbit:15672;
        proxy_buffering                    off;
        proxy_set_header Host              $http_host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location @proxy_to_api {
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Url-Scheme $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;

        proxy_pass http://api;
    }
}

And that's all. Here's the docker-compose.yml:

version: '3.6'

x-base: &base
  image: ${ECR}/exchange2queue:${VERSION}
  build:
    context: .
    dockerfile: Dockerfile
  deploy:
    restart_policy:
      condition: on-failure
  depends_on:
    - rabbit

services:
  rabbit:
    image: rabbitmq:3-management
    deploy:
      restart_policy:
        condition: on-failure
    ports:
      - 5672:5672
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}

  nginx:
    image: ${ECR}/exchange2queue_nginx:${VERSION}
    deploy:
      restart_policy:
        condition: on-failure
    build:
      context: .docker/nginx
      dockerfile: Dockerfile
    ports:
      - 8080:8000
    depends_on:
      - rabbit
      - api

  api:
    <<: *base
    container_name: front
    command: /bin/sh wait-for-it.sh rabbit:5672 -- gunicorn -w 4 api:app -b 0.0.0.0:5000
    deploy:
      restart_policy:
        condition: on-failure
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      API_TOKEN: ${API_TOKEN}

  exchange2topic1:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0

  exchange2topic2:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic2
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0

Full code in my github