Domain Events in Legacy Applications using Python and PostgreSQL

Sometimes we need to generate domain events in our application. It can be simple when you start an application from scratch, but it can be a nightmare when you have a legacy application. Today we're going to explain how to generate domain events from PostgreSQL. One option is to set up triggers on our database tables and use pg_notify inside them to emit events, with a listener consuming those events. This approach works, but we need to set up those triggers on every table that we want to generate events from. Instead, we're going to use logical replication. With this approach, we can generate events from all tables in our database without setting up triggers on each one.

First of all, we need to create a publication. A publication is a set of tables whose changes we want to replicate. We also need a logical replication slot to track our position in the WAL. We can create both with the following SQL commands:
CREATE PUBLICATION pub1 FOR ALL TABLES;
SELECT pg_create_logical_replication_slot('slot1', 'pgoutput');
When we create our replication slot, we can choose between different output plugins. In this case, we're going to use pgoutput, the plugin included in PostgreSQL by default, which sends the change information in binary (bytea) format. With the slot in place, we can create a subscription to consume those changes. We're going to create the subscription in Python, with this simple script:
import psycopg2.extras

from settings import DSN


conn = psycopg2.connect(DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()

# start streaming changes from the slot using the pgoutput protocol
# and the publication created before
cur.start_replication(
    slot_name='slot1',
    decode=False,  # pgoutput payloads are binary, so we don't decode them as text
    options={'proto_version': '1', 'publication_names': 'pub1'})


def consume(msg):
    payload = msg.payload
    print(payload)
    # acknowledge the message so PostgreSQL can advance and recycle the WAL
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cur.consume_stream(consume)
With this simple script, we're listening to all changes in our database, and we could use it to generate domain events in our application. But first we need to decode the payload to get the changes in our tables. There is a library to decode the payload called pypgoutput. I have had problems with this library, so I have used only one part of it (decoders.py) to decode the payload.

The main script is like this:
import logging

from lib.consumer import Consumer
from lib.models import Types, Event
from settings import DSN, PUBLICATION_NAME, SLOT_NAME

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level='INFO',
    datefmt='%d/%m/%Y %X')

logger = logging.getLogger(__name__)


def callback(event: Event):
    logger.info(
        f"{event.ts} id:{event.tx_id} [{event.type}] "
        f"{event.schema_name}.{event.table_name} with values {event.values}")


consumer = Consumer(DSN)
consumer.on(Types.UPDATE, 'public.*', callback)

consumer.start(
    slot_name=SLOT_NAME,
    publication_name=PUBLICATION_NAME)
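
The Consumer class comes from the project's lib package. As a rough sketch of what it does (the registration by event type and 'schema.table' glob pattern, and the method names, are inferred from how it is used above; the pgoutput decoding itself is left out):

import fnmatch

import psycopg2.extras


class Consumer:
    def __init__(self, dsn):
        self.dsn = dsn
        self.handlers = []

    def on(self, event_type, table_pattern, callback):
        # register a callback for an event type and a 'schema.table' glob pattern
        self.handlers.append((event_type, table_pattern, callback))

    def start(self, slot_name, publication_name):
        conn = psycopg2.connect(
            self.dsn,
            connection_factory=psycopg2.extras.LogicalReplicationConnection)
        cur = conn.cursor()
        cur.start_replication(
            slot_name=slot_name,
            decode=False,
            options={'proto_version': '1', 'publication_names': publication_name})
        cur.consume_stream(self._consume)

    def _consume(self, msg):
        event = self._decode(msg.payload)
        if event:
            for event_type, pattern, callback in self.handlers:
                name = f'{event.schema_name}.{event.table_name}'
                if event.type == event_type and fnmatch.fnmatch(name, pattern):
                    callback(event)
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

    def _decode(self, payload):
        # turning the binary pgoutput payload into an Event is done with
        # decoders.py in the real project; omitted in this sketch
        raise NotImplementedError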
For my example, I am using a database with the following schema:
CREATE TABLE actors (
    nconst TEXT PRIMARY KEY,
    primaryname TEXT,
    birthyear INTEGER,
    deathyear INTEGER,
    primaryprofession TEXT,
    knownfortitles TEXT
);
It is important to activate logical replication in the database. We can do it with the following commands (note that changing wal_level requires a server restart):
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;
ALTER SYSTEM SET max_worker_processes = 10;
And I am registering an event on every update of the actors table with the callback function. The callback is called with an event that contains the type of the event, the schema name, the table name, and the values of the row that has been updated. In the callback function we can do whatever we want with the event. In this case, I am just logging it, but you could emit the event to a message broker such as Kafka, RabbitMQ or an MQTT broker.
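
For example, if we wanted to push the events to RabbitMQ instead of just logging them, the callback could publish each event to a queue. A minimal sketch using the pika library (the queue name and connection parameters are assumptions, and it assumes Types is an Enum, as the main script suggests):

import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='domain_events', durable=True)


def callback(event: Event):
    # serialize the event and publish it to the 'domain_events' queue
    body = json.dumps({
        'type': event.type.name,
        'schema': event.schema_name,
        'table': event.table_name,
        'values': [dict(name=f.name, old=f.old, new=f.new) for f in event.values],
    }, default=str)
    channel.basic_publish(
        exchange='',
        routing_key='domain_events',
        body=body,
        properties=pika.BasicProperties(delivery_mode=2))  # persistent message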

That is the main notification part of the script. The other part is the conversion of the values of the row. The raw payload is binary, but the decoded column values arrive as text, so we still need to convert them to Python types using each column's type OID. The conversion is done with the following function:
def convert_value(oid, value):
    if value is None:
        return None
    python_type = OID_MAP.get(oid, str)
    try:
        if python_type == bool:
            return value == 't'
        elif python_type == datetime:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
        elif python_type == date:
            return datetime.strptime(value, '%Y-%m-%d').date()
        elif python_type == dict:
            import json
            return json.loads(value)
        elif python_type == uuid.UUID:
            return uuid.UUID(value)
        else:
            return python_type(value)
    except Exception as e:
        logger.error(f"Error converting {value} with OID {oid}: {e}")
        return value


def get_event(message_type, rel, tx, payload) -> Event | None:
    current_type = Types(message_type)
    decoder_map = {
        Types.INSERT: decoders.Insert,
        Types.UPDATE: decoders.Update,
        Types.DELETE: decoders.Delete,
        Types.TRUNCATE: decoders.Truncate
    }
    data = decoder_map.get(current_type, lambda x: None)(payload)

    if data:
        if current_type == Types.TRUNCATE:
            fields = []
        else:
            fields = get_fields(rel, getattr(data, 'old_tuple', None), getattr(data, 'new_tuple', None))

        event = Event(
            type=current_type,
            tx_id=tx.tx_id,
            ts=tx.commit_ts,
            schema_name=rel.namespace,
            table_name=rel.relation_name,
            values=fields
        )
        return event
    return None


def get_fields(rel, old, new):
    fields = [
        Field(
            name=c.name,
            old=convert_value(c.type_id, old.column_data[i].col_data) if old else None,
            new=convert_value(c.type_id, new.column_data[i].col_data) if new else None,
            pkey=c.part_of_pkey == 1
        )
        for i, c in enumerate(rel.columns)
    ]
    return fields
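
OID_MAP, used by convert_value, maps PostgreSQL type OIDs to Python types. It isn't shown above; a minimal version covering common types could look like this (the OIDs are the standard ones from the pg_type catalog; extend the map as needed):

import uuid
from datetime import date, datetime

# PostgreSQL type OID -> Python type (standard OIDs from the pg_type catalog)
OID_MAP = {
    16: bool,         # bool
    20: int,          # int8
    21: int,          # int2
    23: int,          # int4
    25: str,          # text
    114: dict,        # json
    700: float,       # float4
    701: float,       # float8
    1082: date,       # date
    1114: datetime,   # timestamp
    1700: float,      # numeric (Decimal would be more precise)
    2950: uuid.UUID,  # uuid
    3802: dict,       # jsonb
}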
When a client is connected, we can see it using a simple query:
SELECT * FROM pg_stat_replication;
We can also see the replication slots with the following query:
SELECT
    pg_current_wal_lsn() AS current_lsn,
    slot_name,
    restart_lsn,
    confirmed_flush_lsn
FROM
    pg_replication_slots
WHERE
    slot_type = 'logical';
The script is just an experiment. Maybe it can be adapted to a real application, but it probably needs more work, especially in the data type conversion.

In conclusion, using logical replication to generate domain events from PostgreSQL offers a powerful and flexible approach, especially for legacy systems or applications where modifying the existing database structure is challenging. This method allows us to capture changes across all tables without the need for individual triggers, potentially simplifying event sourcing and change data capture processes. However, it's important to note that this approach comes with its own set of considerations, such as performance impact on the database, handling of large volumes of events, and ensuring data consistency. As with any experimental technique, thorough testing and careful consideration of your specific use case are crucial before implementing this in a production environment. Despite these challenges, the potential for creating a robust, database-driven event system makes this an exciting area for further exploration and development.

Full source code in my github account

Flask api skeleton to handle PostgreSQL operations

That's a boilerplate for an API server using Flask. The idea is to have one API server working as a backend to handle all database operations. The API server will handle only POST requests and the input parameters will travel in the body of the payload as JSON. I know that it isn't a pure REST server, but that's what I need.

To organize the API better we'll set up a group of modules using Flask's blueprints. The entry point of the application will be the app.py file:

import logging

from flask import Flask
from flask_compress import Compress

from lib.logger import setup_logging
from lib.utils import CustomJSONEncoder
from modules.example import blueprint as example
from settings import LOG_LEVEL, ELK_APP, ELK_INDEX, ELK_PROCESS, LOG_PATH

logging.basicConfig(level=LOG_LEVEL)

setup_logging(app=ELK_APP,
              index=ELK_INDEX,
              process=ELK_PROCESS,
              log_path=LOG_PATH)

app = Flask(__name__)
app.json_encoder = CustomJSONEncoder
compress = Compress()
compress.init_app(app)

app.register_blueprint(example)

All application configuration is in the settings.py file. I borrowed this pattern from Django applications. All my configuration lives in this file and the particularities of each environment are loaded from dotenv files in settings.py:

import os
from logging import INFO
from pathlib import Path

from dotenv import load_dotenv

BASE_DIR = Path(__file__).resolve().parent

APP_ID = 'dbapi'
APP_PATH = 'dbapi'
ENVIRONMENT = os.getenv('ENVIRONMENT', 'local')

load_dotenv(dotenv_path=Path(BASE_DIR).resolve().joinpath('env', ENVIRONMENT, '.env'))

PROCESS_ID = os.getenv('PROCESS_ID', APP_ID)
LOG_LEVEL = os.getenv('LOG_LEVEL', INFO)
ELK_APP = f'{APP_ID}.{PROCESS_ID}'
ELK_INDEX = f'{APP_ID}_{ENVIRONMENT}'
ELK_PROCESS = APP_ID
LOG_PATH = f'./logs/{APP_ID}.log'

BEARER = os.getenv('BEARER')

# Database configuration
DEFAULT = 'default'

DATABASES = {
    DEFAULT: f"dbname='{os.getenv('DEFAULT_DB_NAME')}' user='{os.getenv('DEFAULT_DB_USER')}' host='{os.getenv('DEFAULT_DB_HOST')}' password='{os.getenv('DEFAULT_DB_PASS')}' port='{os.getenv('DEFAULT_DB_PORT')}'"
}

In this example we're using one blueprint called example. I register blueprints manually. The blueprint has a set of routes. Those routes are within the routes.py file:

from .actions import foo, bar

routes = [
    dict(route='', action=lambda: True),
    dict(route='foo', action=foo),
    dict(route='bar', action=bar),
]

Here we map URL paths to actions. For example, the foo action looks like this:

from datetime import datetime

from lib.decorators import use_schema
from .schemas import FooSchema


@use_schema(FooSchema)
def foo(name, email=False):
    now = datetime.now()
    return dict(name=name, email=email, time=now)

To validate user input, we're using schemas (with the marshmallow library). In this example our validation schema is:

from marshmallow import fields, Schema


class FooSchema(Schema):
    name = fields.String(required=True)
    email = fields.Email(required=False)
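
The use_schema decorator (from lib/decorators.py) isn't shown here. A minimal sketch of how it could work, validating the JSON body against the schema and passing the validated fields to the action as keyword arguments (this implementation is an assumption, not the real one):

from functools import wraps

from flask import abort, request


def use_schema(schema_class):
    def decorator(action):
        @wraps(action)
        def wrapper(*args, **kwargs):
            payload = request.get_json(silent=True) or {}
            errors = schema_class().validate(payload)
            if errors:
                abort(400, errors)  # reject invalid payloads with the validation errors
            # pass the validated JSON fields to the action as keyword arguments
            return action(*args, **payload, **kwargs)
        return wrapper
    return decorator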

We're hiding the Flask infrastructure in the module's __init__.py file:

import os

from flask import Blueprint

from lib.auth import authorize_bearer
from lib.utils import call_action, get_response
from settings import BEARER
from .routes import routes

NAME = os.path.basename(os.path.dirname(__file__))
blueprint = Blueprint(NAME, __name__, url_prefix=f'/{NAME}')


# the auth decorator must be the innermost one; if it were applied above the
# route decorators, Flask would register the unwrapped function and the
# bearer check would never run
@blueprint.post('/')
@blueprint.post('/<path:name>')
@authorize_bearer(bearer=BEARER)
def action(name=''):
    return get_response(NAME, name, routes, call_action)
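
call_action and get_response live in lib/utils.py and aren't shown either. Conceptually they resolve the requested path against the routes list and execute the matching action; a rough sketch under those assumptions:

from flask import jsonify


def call_action(action):
    # run the action; the schema decorator reads its parameters from the request body
    return action()


def get_response(module_name, path, routes, executor):
    for route in routes:
        if route['route'] == path:
            return jsonify(executor(route['action']))
    return jsonify(error=f'{module_name}/{path} not found'), 404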

Another route with a database connection is the following one:

from dbutils import transactional

from lib.db import get_db_from_conn, get_conn_from_dbname
from lib.decorators import use_schema, inject_conn
from settings import DEFAULT
from .schemas import FooSchema
from .sql import SQL_USERS


@use_schema(FooSchema)
@inject_conn(DEFAULT, named=True, autocommit=False)
def bar(conn, name, email=False):
    # Create new transaction from connection injected with a decorator
    with transactional(conn) as db:
        db.upsert('users', dict(email=email), dict(name=name))

    # Example of how to obtain new connection from database name.
    conn2 = get_conn_from_dbname(DEFAULT)
    db2 = get_db_from_conn(conn2)

    return db2.fetch_all(SQL_USERS, dict(name=name))

We can obtain our database connection in diverse ways. For example, we can use a function decorator to inject the connection (in this case the connection named DEFAULT) into the function signature, as shown in the sketch below. We can also create the connection using a constructor. This connection is a raw psycopg2 connection. I also like to use a library that helps me work with psycopg2: a library (https://github.com/gonzalo123/dbutils) created by me a long time ago.
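
The inject_conn decorator is also part of lib/decorators.py. A minimal sketch, assuming it builds a raw psycopg2 connection from the DSN configured in DATABASES and hands it to the action (the named and autocommit semantics are guesses based on the call above):

from functools import wraps

import psycopg2

from settings import DATABASES


def inject_conn(dbname, named=True, autocommit=False):
    def decorator(action):
        @wraps(action)
        def wrapper(*args, **kwargs):
            conn = psycopg2.connect(DATABASES[dbname])
            conn.autocommit = autocommit
            try:
                if named:
                    # pass the connection as the 'conn' keyword argument
                    return action(*args, conn=conn, **kwargs)
                return action(conn, *args, **kwargs)
            finally:
                conn.close()
        return wrapper
    return decorator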

And that's all. I normally deploy it in production using nginx as a reverse proxy and N replicas of my API. Logs are also ready to be sent to ELK using Filebeat.

version: '3.6'

x-logging: &logging
  logging:
    options:
      max-size: 10m


services:
  api:
    image: dbapi:production
    <<: *logging
    deploy:
      replicas: 10
      restart_policy:
        condition: any
    volumes:
      - logs_volume:/src/logs
    environment:
      - ENVIRONMENT=production
    command: /bin/bash ./start.sh

  nginx:
    image: nginx-dbapi:${VERSION}
    deploy:
      restart_policy:
        condition: any
    environment:
      ENVIRON: ${VERSION}
    ports:
      - ${EXPOSED_PORT}:8000
    depends_on:
      - api

volumes:
  logs_volume:

Source code in my github account

Working with SAPUI5 locally (part 3). Adding more services in Docker

In the previous post we moved our project to Docker. The idea was to keep exactly the same functionality (without touching anything within the source code). Now we're going to add more services. Yes, I know, it looks like overengineering (it's exactly overengineering, indeed), but I want to build something with different services working together. Let's start.

We're going to change our original project a little bit. Now our frontend will only have one button. This button will increment the number of clicks, but we're going to persist this information in a PostgreSQL database. Also, instead of incrementing the counter in the backend, our backend will emit one event to a RabbitMQ message broker. We'll have one worker service listening to this event and this worker will persist the information. The communication between the worker and the frontend (to show the incremented value) will be via websockets.

With those premises we are going to need:

  • Frontend: UI5 application
  • Backend: PHP/Lumen application
  • Worker: nodejs application that listens to RabbitMQ events and serves the websocket server (using socket.io)
  • Nginx server
  • PostgreSQL database
  • RabbitMQ message broker

As in the previous examples, our PHP backend will be served via Nginx and PHP-FPM.

Here we can see the docker-compose file that sets up all the services:

version: '3.4'

services:
  nginx:
    image: gonzalo123.nginx
    restart: always
    ports:
    - "8080:80"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-nginx
    volumes:
    - ./src/backend:/code/src
    - ./src/.docker/web/site.conf:/etc/nginx/conf.d/default.conf
    networks:
    - app-network
  api:
    image: gonzalo123.api
    restart: always
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-lumen-dev
    environment:
      XDEBUG_CONFIG: remote_host=${MY_IP}
    volumes:
    - ./src/backend:/code/src
    networks:
    - app-network
  ui5:
    image: gonzalo123.ui5
    ports:
    - "8000:8000"
    restart: always
    volumes:
    - ./src/frontend:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-ui5
    networks:
    - app-network
  io:
    image: gonzalo123.io
    ports:
    - "9999:9999"
    restart: always
    volumes:
    - ./src/io:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-io
    networks:
    - app-network
  pg:
    image: gonzalo123.pg
    restart: always
    ports:
    - "5432:5432"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-pg
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata
    networks:
    - app-network
  rabbit:
    image: rabbitmq:3-management
    container_name: gonzalo123.rabbit
    restart: always
    ports:
    - "15672:15672"
    - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
    networks:
    - app-network
networks:
  app-network:
    driver: bridge

We're going to use the same docker files as in the previous post, but we also need new ones for the worker, the database server and the message queue:

Worker:

FROM node:alpine

EXPOSE 8000

WORKDIR /code/src
COPY ./io .
RUN npm install
ENTRYPOINT ["npm", "run", "serve"]

The worker is a simple script that serves the socket.io server and emits a websocket event for every message that arrives from the RabbitMQ queue.

var amqp = require('amqp'),
  httpServer = require('http').createServer(),
  io = require('socket.io')(httpServer, {
    origins: '*:*',
  }),
  pg = require('pg')
;

require('dotenv').config();
var pgClient = new pg.Client(process.env.DB_DSN);

rabbitMq = amqp.createConnection({
  host: process.env.RABBIT_HOST,
  port: process.env.RABBIT_PORT,
  login: process.env.RABBIT_USER,
  password: process.env.RABBIT_PASS,
});

var sql = 'SELECT clickCount FROM docker.clicks';

// Please don't do this. Use lazy connections
// I'm 'lazy' to do it in this POC 🙂
pgClient.connect(function(err) {
  io.on('connection', function() {
    pgClient.query(sql, function(err, result) {
      var count = result.rows[0]['clickcount'];
      io.emit('click', {count: count});
    });

  });

  rabbitMq.on('ready', function() {
    var queue = rabbitMq.queue('ui5');
    queue.bind('#');

    queue.subscribe(function(message) {
      pgClient.query(sql, function(err, result) {
        var count = parseInt(result.rows[0]['clickcount']);
        count = count + parseInt(message.data.toString('utf8'));
        pgClient.query('UPDATE docker.clicks SET clickCount = $1', [count],
          function(err) {
            io.emit('click', {count: count});
          });
      });
    });
  });
});

httpServer.listen(process.env.IO_PORT);

Database server:

FROM postgres:9.6-alpine
COPY pg/init.sql /docker-entrypoint-initdb.d/

As we can see, we're going to generate the database structure in the first build:

CREATE SCHEMA docker;

CREATE TABLE docker.clicks (
clickCount numeric(8) NOT NULL
);

ALTER TABLE docker.clicks
OWNER TO username;

INSERT INTO docker.clicks(clickCount) values (0);

With the RabbitMQ server we're going to use the official docker image, so we don't need to create a Dockerfile.

We've also changed our Nginx configuration a little bit. We want to use Nginx to serve the backend and also the socket.io server, because we don't want to expose different ports to the internet.

server {
    listen 80;
    index index.php index.html;
    server_name localhost;
    error_log  /var/log/nginx/error.log;
    access_log /var/log/nginx/access.log;
    root /code/src/www;

    location /socket.io/ {
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_pass "http://io:9999";
    }

    location / {
        try_files $uri $uri/ /index.php?$query_string;
    }

    location ~ \.php$ {
        try_files $uri =404;
        fastcgi_split_path_info ^(.+\.php)(/.+)$;
        fastcgi_pass api:9000;
        fastcgi_index index.php;
        include fastcgi_params;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        fastcgi_param PATH_INFO $fastcgi_path_info;
    }
}

To avoid CORS issues we can also use an SCP destination (the localneo proxy in this example) to serve socket.io, so we need to change our neo-app.json file:

"routes": [
    ...
    {
      "path": "/socket.io",
      "target": {
        "type": "destination",
        "name": "SOCKETIO"
      },
      "description": "SOCKETIO"
    }
  ],

    And basically that's all. Here we can also use a "production" docker-compose file, without exposing all the ports and without mapping the filesystem to our local machine (useful when we're developing):

    version: '3.4'
    
    services:
      nginx:
        image: gonzalo123.nginx
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-nginx
        networks:
        - app-network
      api:
        image: gonzalo123.api
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-lumen
        networks:
        - app-network
      ui5:
        image: gonzalo123.ui5
        ports:
        - "80:8000"
        restart: always
        volumes:
        - ./src/frontend:/code/src
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-ui5
        networks:
        - app-network
      io:
        image: gonzalo123.io
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-io
        networks:
        - app-network
      pg:
        image: gonzalo123.pg
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-pg
        environment:
          POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
          POSTGRES_USER: ${POSTGRES_USER}
          POSTGRES_DB: ${POSTGRES_DB}
          PGDATA: /var/lib/postgresql/data/pgdata
        networks:
        - app-network
      rabbit:
        image: rabbitmq:3-management
        restart: always
        environment:
          RABBITMQ_ERLANG_COOKIE:
          RABBITMQ_DEFAULT_VHOST: /
          RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
          RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
        networks:
        - app-network
    networks:
      app-network:
        driver: bridge
    

    And that’s all. The full project is available in my github account

    PHP application in SAP Cloud Platform. With PostgreSQL, Redis and Cloud Foundry

    Keeping on with my study of SAP's cloud platform (SCP) and Cloud Foundry, today I'm going to build a simple PHP application. This application serves a simple Bootstrap landing page. The application uses HTTP basic authentication and the credentials are validated against a PostgreSQL database. It also has an API to retrieve the localtimestamp from the database server (just to play with a database server). I also want to play with Redis in the cloud, so the API response will be cached with a time to live (TTL) of 5 seconds. I will use a Redis service to do it.

    First we create our services in Cloud Foundry. I'm using the free layer of SAP Cloud Foundry for this example. I'm not going to explain here how to do that; it's pretty straightforward within SAP's Cockpit. Some time ago I played with IBM's Cloud Foundry too, and I remember that it was also very simple.

    Then we create our application (.bp-config/options.json)

    {
    "WEBDIR": "www",
    "LIBDIR": "lib",
    "PHP_VERSION": "{PHP_70_LATEST}",
    "PHP_MODULES": ["cli"],
    "WEB_SERVER": "nginx"
    }

    If we want to use our PostgreSQL and Redis services with our PHP application we need to bind those services to our application. This operation can also be done with SAP's Cockpit.

    Now it's the turn of the PHP application. I normally use the Silex framework for my backends, but now there's a problem: Silex is dead. I feel a little bit sad, but I'm not going to cry. It's just a tool and there are other ones. I've got my example with Silex but, as an exercise, I will also do it with Lumen.

    Let's start with Silex. If you're familiar with the Silex microframework (or any other microframework, indeed) you can see that there isn't anything special.

    use Symfony\Component\HttpKernel\Exception\HttpException;
    use Symfony\Component\HttpFoundation\Request;
    use Silex\Provider\TwigServiceProvider;
    use Silex\Application;
    use Predis\Client;
    
    if (php_sapi_name() == "cli-server") {
        // when I start the server my local machine vendors are in a different path
        require __DIR__ . '/../vendor/autoload.php';
        // and also I mock VCAP_SERVICES env
        $env   = file_get_contents(__DIR__ . "/../conf/vcap_services.json");
        $debug = true;
    } else {
        require 'vendor/autoload.php';
        $env   = $_ENV["VCAP_SERVICES"];
        $debug = false;
    }
    
    $vcapServices = json_decode($env, true);
    
    $app = new Application(['debug' => $debug, 'ttl' => 5]);
    
    $app->register(new TwigServiceProvider(), [
        'twig.path' => __DIR__ . '/../views',
    ]);
    
    $app['db'] = function () use ($vcapServices) {
        $dbConf = $vcapServices['postgresql'][0]['credentials'];
        $dsn    = "pgsql:dbname={$dbConf['dbname']};host={$dbConf['hostname']};port={$dbConf['port']}";
        $dbh    = new PDO($dsn, $dbConf['username'], $dbConf['password']);
        $dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $dbh->setAttribute(PDO::ATTR_CASE, PDO::CASE_UPPER);
        $dbh->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
    
        return $dbh;
    };
    
    $app['redis'] = function () use ($vcapServices) {
        $redisConf = $vcapServices['redis'][0]['credentials'];
    
        return new Client([
            'scheme'   => 'tcp',
            'host'     => $redisConf['hostname'],
            'port'     => $redisConf['port'],
            'password' => $redisConf['password'],
        ]);
    };
    
    $app->get("/", function (Application $app) {
        return $app['twig']->render('index.html.twig', [
            'user' => $app['user'],
            'ttl'  => $app['ttl'],
        ]);
    });
    
    $app->get("/timestamp", function (Application $app) {
        if (!$app['redis']->exists('timestamp')) {
            $stmt = $app['db']->prepare('SELECT localtimestamp');
            $stmt->execute();
            $app['redis']->set('timestamp', $stmt->fetch()['TIMESTAMP'], 'EX', $app['ttl']);
        }
    
        return $app->json($app['redis']->get('timestamp'));
    });
    
    $app->before(function (Request $request) use ($app) {
        $username = $request->server->get('PHP_AUTH_USER', false);
        $password = $request->server->get('PHP_AUTH_PW');
    
        $stmt = $app['db']->prepare('SELECT name, surname FROM public.user WHERE username=:USER AND pass=:PASS');
        $stmt->execute(['USER' => $username, 'PASS' => md5($password)]);
        $row = $stmt->fetch();
        if ($row !== false) {
            $app['user'] = $row;
        } else {
            header("WWW-Authenticate: Basic realm='RIS'");
            throw new HttpException(401, 'Please sign in.');
        }
    }, 0);
    
    $app->run();
    

    Maybe the only special thing is the way the autoloader is set up. We are initializing the autoloader in two different ways: one way when the application runs in the cloud and another one when it runs locally with PHP's built-in server. That's because vendors are located in different paths depending on which environment the application lives in. When Cloud Foundry connects services to applications it injects environment variables with the service configuration (credentials, host, ...). It uses the VCAP_SERVICES env var.

    I use the built-in server to run the application locally. When I'm doing that I don't have the VCAP_SERVICES variable, and my service information is also different from when the application runs in the cloud. Maybe it's better to use an environment variable, but I'm using this trick:

    if (php_sapi_name() == "cli-server") {
        // I'm runing the application locally
    } else {
        // I'm in the cloud
    }
    

    So when I'm running locally I mock VCAP_SERVICES with my local values and also, for example, configure the Silex application in debug mode.

    Sometimes I want to run my application locally but use the cloud services. I cannot connect directly to those services, but we can do it over ssh through our connected application. For example, if our PostgreSQL service is running on 10.11.241.0:48825, we can map this remote port (in a private network) to our local port with this command:

    cf ssh -N -T -L 48825:10.11.241.0:48825 silex
    

    You can see more information about this command here.

    Now we can use pgAdmin, for example, on our local machine to connect to the cloud server.

    We can do the same with Redis

    cf ssh -N -T -L 54266:10.11.241.9:54266 silex
    

    And basically that's all. Now we'll do the same with Lumen. The idea is to create the same application with Lumen instead of Silex. It's a dummy application, but it covers the tasks that I normally use. I'll also reuse the Redis and PostgreSQL services from the previous project.

    use App\Http\Middleware;
    use Laravel\Lumen\Application;
    use Laravel\Lumen\Routing\Router;
    use Predis\Client;
    
    if (php_sapi_name() == "cli-server") {
        require __DIR__ . '/../vendor/autoload.php';
        $env = 'dev';
    } else {
        require 'vendor/autoload.php';
        $env = 'prod';
    }
    
    (new Dotenv\Dotenv(__DIR__ . "/../env/{$env}"))->load();
    
    $app = new Application();
    
    $app->routeMiddleware([
        'auth' => Middleware\AuthMiddleware::class,
    ]);
    
    $app->register(App\Providers\VcapServiceProvider::class);
    $app->register(App\Providers\StdoutLogServiceProvider::class);
    $app->register(App\Providers\DbServiceProvider::class);
    $app->register(App\Providers\RedisServiceProvider::class);
    
    $app->router->group(['middleware' => 'auth'], function (Router $router) {
        $router->get("/", function () {
            return view("index", [
                'user' => config("user"),
                'ttl'  => getenv('TTL'),
            ]);
        });
    
        $router->get("/timestamp", function (Client $redis, PDO $conn) {
            if (!$redis->exists('timestamp')) {
                $stmt = $conn->prepare('SELECT localtimestamp');
                $stmt->execute();
                $redis->set('timestamp', $stmt->fetch()['TIMESTAMP'], 'EX', getenv('TTL'));
            }
    
            return response()->json($redis->get('timestamp'));
        });
    });
    
    $app->run();
    

    I've created four service providers. One to handle database connections (I don't like ORMs):

    namespace App\Providers;
    
    use Illuminate\Support\ServiceProvider;
    use PDO;
    
    class DbServiceProvider extends ServiceProvider
    {
        public function register()
        {
        }
    
        public function boot()
        {
            $vcapServices = app('vcap_services');
    
            $dbConf = $vcapServices['postgresql'][0]['credentials'];
            $dsn    = "pgsql:dbname={$dbConf['dbname']};host={$dbConf['hostname']};port={$dbConf['port']}";
            $dbh    = new PDO($dsn, $dbConf['username'], $dbConf['password']);
            $dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            $dbh->setAttribute(PDO::ATTR_CASE, PDO::CASE_UPPER);
            $dbh->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
    
            $this->app->bind(PDO::class, function ($app) use ($dbh) {
                return $dbh;
            });
        }
    }
    

    Another one for Redis. I need to study Lumen a little bit more; I know that Lumen has a built-in tool to work with Redis.

    namespace App\Providers;
    
    use Illuminate\Support\ServiceProvider;
    use Predis\Client;
    
    class RedisServiceProvider extends ServiceProvider
    {
        public function register()
        {
        }
    
        public function boot()
        {
            $vcapServices = app('vcap_services');
            $redisConf    = $vcapServices['redis'][0]['credentials'];
    
            $redis = new Client([
                'scheme'   => 'tcp',
                'host'     => $redisConf['hostname'],
                'port'     => $redisConf['port'],
                'password' => $redisConf['password'],
            ]);
    
            $this->app->bind(Client::class, function ($app) use ($redis) {
                return $redis;
            });
        }
    }
    

    Another one to tell Monolog to send logs to stdout:

    namespace App\Providers;
    
    use Illuminate\Support\ServiceProvider;
    use Monolog;
    
    class StdoutLogServiceProvider extends ServiceProvider
    {
        public function register()
        {
            app()->configureMonologUsing(function (Monolog\Logger $monolog) {
                return $monolog->pushHandler(new \Monolog\Handler\ErrorLogHandler());
            });
        }
    }
    

    And the last one to work with the VCAP environment variables. Probably I need to integrate it with the dotenv files:

    namespace App\Providers;
    
    use Illuminate\Support\ServiceProvider;
    
    class VcapServiceProvider extends ServiceProvider
    {
        public function register()
        {
            if (php_sapi_name() == "cli-server") {
                $env = file_get_contents(__DIR__ . "/../../conf/vcap_services.json");
            } else {
                $env = $_ENV["VCAP_SERVICES"];
            }
    
            $vcapServices = json_decode($env, true);
    
            $this->app->bind('vcap_services', function ($app) use ($vcapServices) {
                return $vcapServices;
            });
        }
    }
    

    We also need to handle authentication (HTTP basic auth in this case), so we'll create a simple middleware:

    namespace App\Http\Middleware;
    
    use Closure;
    use Illuminate\Http\Request;
    use PDO;
    
    class AuthMiddleware
    {
        public function handle(Request $request, Closure $next)
        {
            $user = $request->getUser();
            $pass = $request->getPassword();
    
            $db = app(PDO::class);
            $stmt = $db->prepare('SELECT name, surname FROM public.user WHERE username=:USER AND pass=:PASS');
            $stmt->execute(['USER' => $user, 'PASS' => md5($pass)]);
            $row = $stmt->fetch();
            if ($row !== false) {
                config(['user' => $row]);
            } else {
                $headers = ['WWW-Authenticate' => 'Basic'];
                return response('Admin Login', 401, $headers);
            }
    
            return $next($request);
        }
    }
    

    In summary: Lumen is cool. The interface is very similar to Silex. I can swap my mind from thinking in Silex to thinking in Lumen easily. Blade instead of Twig: no problem. Service providers are very similar. Routing is almost the same and middlewares are much better. Nowadays the backend is a commodity for me, so I don't want to spend too much time working on it. I want something that just works. Lumen looks like that.

    Both projects: Silex and Lumen are available in my github

    Performing UPSERT (Update or Insert) with PostgreSQL and PHP

    That’s a typical situation. Imagine you’ve got one table

    CREATE TABLE PUBLIC.TBUPSERTEXAMPLE
    (
      KEY1 CHARACTER VARYING(10) NOT NULL,
      KEY2 CHARACTER VARYING(14) NOT NULL,
      KEY3 CHARACTER VARYING(14) NOT NULL,
      KEY4 CHARACTER VARYING(14) NOT NULL,
    
      VALUE1 CHARACTER VARYING(20),
      VALUE2 CHARACTER VARYING(20) NOT NULL,
      VALUE3 CHARACTER VARYING(100),
      VALUE4 CHARACTER VARYING(400),
      VALUE5 CHARACTER VARYING(20),
    
      CONSTRAINT TBUPSERTEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2, KEY3, KEY4)
    )
    

    And you need to update one record. You can perform a simple UPDATE statement but what happens the first time?

    You cannot update the record, basically because the record doesn't exist yet. You need to create an INSERT statement instead. We can do it in different ways. We can first run a SELECT statement and, if the record exists, perform an UPDATE; if it doesn't exist, perform an INSERT. We can also perform an UPDATE and see how many records are affected; if no records are affected, we perform an INSERT. Finally, we can perform one INSERT and, if it throws an error, perform an UPDATE.

    All of these techniques work in one way or another, but PostgreSQL gives us a cool way of doing this operation with a single SQL statement. We can use a CTE (Common Table Expression) and execute something like this:

    WITH upsert AS (
        UPDATE PUBLIC.TBUPSERTEXAMPLE
        SET
            VALUE1 = :VALUE1,
            VALUE2 = :VALUE2,
            VALUE3 = :VALUE3,
            VALUE4 = :VALUE4,
            VALUE5 = :VALUE5
        WHERE
            KEY1 = :KEY1 AND
            KEY2 = :KEY2 AND
            KEY3 = :KEY3 AND
            KEY4 = :KEY4
        RETURNING *
    )
    INSERT INTO PUBLIC.TBUPSERTEXAMPLE(KEY1, KEY2, KEY3, KEY4, VALUE1, VALUE2, VALUE3, VALUE4, VALUE5)
    SELECT
        :KEY1, :KEY2, :KEY3, :KEY4, :VALUE1, :VALUE2, :VALUE3, :VALUE4, :VALUE5
    WHERE
        NOT EXISTS (SELECT 1 FROM upsert);
    

    Since PostgreSQL 9.5 we can also use INSERT ... ON CONFLICT to do these UPSERT operations. We can do something like this:

    INSERT INTO PUBLIC.TBUPSERTEXAMPLE (key1, key2, key3, key4, value1, value2, value3, value4, value5)
      VALUES ('key2', 'key2', 'key3', 'key4', 'value1',  'value2',  'value3',  'value4',  'value5')
    ON CONFLICT (key1, key2, key3, key4)
    DO UPDATE SET 
      value1 = 'value1', 
      value2 = 'value2', 
      value3 = 'value3', 
      value4 = 'value4', 
      value5 = 'value5'
    WHERE 
      TBUPSERTEXAMPLE.key1 = 'key2' AND 
      TBUPSERTEXAMPLE.key2 = 'key2' AND 
      TBUPSERTEXAMPLE.key3 = 'key3' AND 
      TBUPSERTEXAMPLE.key4 = 'key4';
    

    To help me write this statement I've created a simple PHP wrapper.

    Here is one example using PDO:

    use G\SqlUtils\Upsert;
    
    $conn = new PDO('pgsql:dbname=gonzalo;host=localhost', 'username', 'password');
    $conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    
    $conn->beginTransaction();
    try {
        Upsert::createFromPDO($conn)->exec('PUBLIC.TBUPSERTEXAMPLE', [
            'KEY1' => 'key1',
            'KEY2' => 'key2',
            'KEY3' => 'key3',
            'KEY4' => 'key4',
        ], [
            'VALUE1' => 'value1',
            'VALUE2' => 'value2',
            'VALUE3' => 'value3',
            'VALUE4' => 'value4',
            'VALUE5' => 'value5',
        ]);
        $conn->commit();
    } catch (Exception $e) {
        $conn->rollback();
        throw $e;
    }
    

    And another one using DBAL

    use Doctrine\DBAL\DriverManager;
    use G\SqlUtils\Upsert;
    
    $connectionParams = [
        'dbname'   => 'gonzalo',
        'user'     => 'username',
        'password' => 'password',
        'host'     => 'localhost',
        'driver'   => 'pdo_pgsql',
    ];
    
    $dbh = DriverManager::getConnection($connectionParams);
    $dbh->transactional(function ($conn) {
        Upsert::createFromDBAL($conn)->exec('PUBLIC.TBUPSERTEXAMPLE', [
            'KEY1' => 'key1',
            'KEY2' => 'key2',
            'KEY3' => 'key3',
            'KEY4' => 'key4',
        ], [
            'VALUE1' => 'value1',
            'VALUE2' => 'value2',
            'VALUE3' => 'value3',
            'VALUE4' => null,
            'VALUE5' => 'value5',
        ]);
    });
    

    And that's all. The library is available in my github and it's also at packagist.

    Notify events from PostgreSQL to external listeners

    Sometimes we need to call external programs from our PostgreSQL database. We can send sockets from SQL statements (I've written about it). The problem with this approach is the following one: if the user rolls back the transaction, the socket has already been emitted. That's a problem (or not, depending on our application). Nobody guarantees either that the process behind the socket server has access to the data of the transaction; if we're very fast, maybe the transaction isn't committed yet. We could use a sleep function, but sleep functions are always a bad idea. PostgreSQL gives us another tool to decouple processes: LISTEN and NOTIFY.

    Let me show you an example. First we create a table:

    CREATE TABLE PUBLIC.TBLEXAMPLE
    (
      KEY1 CHARACTER VARYING(10) NOT NULL,
      KEY2 CHARACTER VARYING(14) NOT NULL,
    
      VALUE1 CHARACTER VARYING(20),
      VALUE2 CHARACTER VARYING(20) NOT NULL,
    
      CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
    )
    

    Now we add an "after insert" trigger to our table:

    CREATE TRIGGER TBLEXAMPLE_AFTER
    AFTER INSERT
    ON PUBLIC.TBLEXAMPLE
    FOR EACH ROW
    EXECUTE PROCEDURE PUBLIC.NOTIFY();
    

    And now, within the trigger function, we send a notify event ('myEvent' in this case) with the row information. We need to send plain text in the notify event, so we'll use JSON to encode our row data:

    CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
    $BODY$
    BEGIN
      PERFORM pg_notify('myEvent', row_to_json(NEW)::text);
      RETURN new;
    END;
    $BODY$
    LANGUAGE 'plpgsql' VOLATILE COST 100;
    

    Now we're going to build a server-side example that connects to our PostgreSQL database and listens to the event. In this case we're going to use nodejs to build the prototype. This example will also enqueue the events into a gearman server:

    var pg = require('pg'),
        gearmanode = require('gearmanode'),
        gearmanClient,
        conString = 'tcp://username:password@localhost:5432/gonzalo',
        pgClient;
    
    gearmanode.Client.logger.transports.console.level = 'error';
    
    gearmanClient = gearmanode.client();
    
    console.log('LISTEN myEvent');
    pgClient = new pg.Client(conString);
    pgClient.connect();
    pgClient.query('LISTEN myEvent');
    pgClient.on('notification', function (data) {
        console.log("\033[34m" + new Date + '-\033[0m payload', data.payload);
        gearmanClient.submitJob('sms.sender.one', data.payload);
    });
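
    If we prefer Python for the listener, we can do the same with psycopg2 (a minimal sketch, assuming the same database and channel; note the quoted channel name, because LISTEN folds unquoted identifiers to lower case):

    import select

    import psycopg2
    import psycopg2.extensions

    conn = psycopg2.connect('dbname=gonzalo user=username password=password host=localhost')
    # notifications are delivered on commit, so we listen outside of any transaction
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

    cur = conn.cursor()
    cur.execute('LISTEN "myEvent";')

    while True:
        # wait up to 5 seconds for the connection to become readable
        if select.select([conn], [], [], 5) == ([], [], []):
            continue
        conn.poll()
        while conn.notifies:
            notify = conn.notifies.pop(0)
            print(notify.channel, notify.payload)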
    

    And that's all. Now we only need to perform an INSERT statement into our table. This will fire our trigger, emit the notification, and our nodejs process will enqueue a job into a gearman queue:

    INSERT INTO PUBLIC.TBLEXAMPLE(KEY1, KEY2, VALUE1, VALUE2) VALUES ('k1', 'k2', 'v1', 'v2');
    

    It's worth remarking that if our INSERT statement is inside a transaction and we roll it back, NOTIFY won't send any message.

    Foreign Data Wrappers with PostgreSQL and PHP

    PostgreSQL is more than a relational database. It has many cool features. Today we're going to play with Foreign Data Wrappers (FDW). The idea is to create a virtual table from an external datasource and use it like a traditional table.

    Let me show you an example. Imagine that we’ve got a REST datasource on port 8888. We’re going to use this Silex application, for example

    use Silex\Application;
    
    $app = new Application();
    
    $app->get('/', function(Application $app) {
    
        return $app->json([
            ['name' => 'Peter', 'surname' => 'Parker'],
            ['name' => 'Clark', 'surname' => 'Kent'],
            ['name' => 'Bruce', 'surname' => 'Wayne'],
        ]);
    });
    
    $app->run();
    

    We want to use this datasource in PostgreSQL, so we need to use a “www foreign data wrapper”.

    First we create the extension (maybe we need to compile it; we can follow the installation instructions here):

    CREATE EXTENSION www_fdw;
    

    Now, with the extension in place, we need to create a "server". This server is just a proxy that connects to the real REST service:

    CREATE SERVER myRestServer FOREIGN DATA WRAPPER www_fdw OPTIONS (uri 'http://localhost:8888');
    

    Now we need to map our user to the server

    CREATE USER MAPPING FOR gonzalo SERVER myRestServer;
    

    And finally we only need our "foreign table":

    CREATE FOREIGN TABLE myRest (
        name text,
        surname text
    ) SERVER myRestServer;
    

    Now we can perform SQL queries using our foreign table:

    SELECT * FROM myRest
    

    We must take care with one thing. We can use WHERE clauses, but if we run:

    SELECT * FROM myRest WHERE name='Peter'
    

    We'll see that the output is the same as "SELECT * FROM myRest". That's because if we want to filter something with a WHERE clause on a foreign table like this one, we need to do it in the remote service. WHERE name='Peter' means that our database will execute the following request:

    http://localhost:8888?name=Peter
    

    And we need to handle this parameter, for example doing something like this:

    use Silex\Application;
    use Symfony\Component\HttpFoundation\Request;
    
    $app = new Application();
    
    $app->get('/', function(Application $app, Request $request) {
        $name = $request->get('name');
    
        $data = [
            ['name' => 'Peter', 'surname' => 'Parker'],
            ['name' => 'Clark', 'surname' => 'Kent'],
            ['name' => 'Bruce', 'surname' => 'Wayne'],
        ];
        return $app->json(array_filter($data, function($reg) use($name){
            return $name ? $reg['name'] == $name : true;
        }));
    });
    
    $app->run();
    

    Building one HTTP client in PostgreSQL with PL/Python

    Don't ask me why, but I need to call an HTTP server (one Silex application) from a PostgreSQL database.

    I want to do something like this:

    select get('http://localhost:8080?name=Gonzalo')->'hello';
    

    PostgreSQL has a datatype for JSON. It's really cool and it allows us to connect our HTTP server and our SQL database using the same datatype.

    PostgreSQL also allows us to create stored procedures using different languages. The default language is PL/pgSQL, a simple language where we can embed SQL. But we can also use Python. With Python we can easily create HTTP clients, for example with urllib2. That means that developing an HTTP client for a PostgreSQL database is pretty straightforward.

    CREATE OR REPLACE FUNCTION get(uri character varying)
      RETURNS json AS
    $BODY$
    import urllib2
    
    data = urllib2.urlopen(uri)
    
    return data.read()
    
    $BODY$
      LANGUAGE plpython2u VOLATILE
      COST 100;
    ALTER FUNCTION get(character varying)
      OWNER TO gonzalo;
    

    OK, that's a GET client, but we also want a POST client to do something like this:

    select post('http://localhost:8080', '{"name": "Gonzalo"}'::json)->'hello';
    

    As you can see, I want to use application/json instead of application/x-www-form-urlencoded to send request parameters. I wrote about it here some time ago. So I will create one endpoint within my Silex server to handle my POST requests:

    <?php
    include __DIR__ . '/../vendor/autoload.php';
    
    use Silex\Application;
    use Symfony\Component\HttpFoundation\Request;
    use G\AngularPostRequestServiceProvider;
    
    $app = new Application(['debug' => true]);
    
    $app->register(new AngularPostRequestServiceProvider());
    
    $app->post('/', function (Application $app, Request $request) {
        return $app->json(['hello' => $request->get('name')]);
    });
    
    $app->get('/', function (Application $app, Request $request) {
        return $app->json(['hello' => $request->get('name')]);
    });
    
    $app->run();
    

    And now we only need to create one stored procedure to send POST requests

    CREATE OR REPLACE FUNCTION post(
        uri character varying,
        parameters json)
      RETURNS json AS
    $BODY$
    import urllib2
    
    clen = len(parameters)
    req = urllib2.Request(uri, parameters, {'Content-Type': 'application/json', 'Content-Length': clen})
    f = urllib2.urlopen(req)
    return f.read()
    
    $BODY$
      LANGUAGE plpython2u VOLATILE
      COST 100;
    ALTER FUNCTION post(character varying, json)
      OWNER TO gonzalo;
    

    And that’s all. At least this simple script is exactly what I need.

    Sending sockets from PostgreSQL triggers with Python

    Picture this: we want to notify an external service each time a record is inserted in the database. We can find the place where the INSERT statement is done and create a TCP client there, but what happens if the application that inserts the data into the database is a legacy application, or one that is too hard to modify? If your database is PostgreSQL, it's pretty straightforward. With the default procedural language of PostgreSQL (plpgsql) we cannot do it, but PostgreSQL allows us to use more procedural languages than plpgsql, for example Python. With plpython we can use sockets in the same way we use them within Python scripts. It's very simple. Let me show you how to do it.

    First we need to create one plpython function with our TCP client:

    CREATE OR REPLACE FUNCTION dummy.sendsocket(msg character varying, host character varying, port integer)
      RETURNS integer AS
    $BODY$
      import _socket
      try:
        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
        s.connect((host, port))
        s.sendall(msg)
        s.close()
        return 1
      except:
        return 0
    $BODY$
      LANGUAGE plpython VOLATILE
      COST 100;
    ALTER FUNCTION dummy.sendsocket(character varying, character varying, integer)
      OWNER TO username;
    

    Now we create the trigger function that uses our socket client:

    CREATE OR REPLACE FUNCTION dummy.myTriggerToSendSockets()
    RETURNS trigger AS
    $BODY$
       import json
       stmt = plpy.prepare("select dummy.sendSocket($1, $2, $3)", ["text", "text", "int"])
       rv = plpy.execute(stmt, [json.dumps(TD), "host", 26200])
    $BODY$
    LANGUAGE plpython VOLATILE
    COST 100;
    

    As you can see, in my example we are sending the whole record as a JSON string in the socket body (TD is the dictionary with the trigger data that plpython provides).

    And finally we attach the trigger to one table (or maybe we need to do it to more than one table):

    CREATE TRIGGER myTrigger
      AFTER INSERT OR UPDATE OR DELETE
      ON dummy.myTable
      FOR EACH ROW
      EXECUTE PROCEDURE dummy.myTriggerToSendSockets();
    

    And that's all. Now we can use a simple TCP socket server to handle those requests. Let me show you examples of TCP servers in different languages. As we can see, all of them are different implementations of the Reactor pattern. We can use, for example:

    node.js:

    var net = require('net');
    
    var host = 'localhost';
    var port = 26200;
    
    var server = net.createServer(function (socket) {
        socket.on('data', function(buffer) {
            // do whatever that we want with buffer
        });
    });
    
    server.listen(port, host);
    

    python (with Twisted):

    from twisted.internet import reactor, protocol
    
    HOST = 'localhost'
    PORT = 26200
    
    class MyServer(protocol.Protocol):
        def dataReceived(self, data):
            # do whatever that we want with data
            pass
    
    class MyServerFactory(protocol.Factory):
        def buildProtocol(self, addr):
            return MyServer()
    
    reactor.listenTCP(PORT, MyServerFactory(), interface=HOST)
    reactor.run()
    

    (I know that we can create Python's TCP server without Twisted, but if I don't use it maybe someone will get angry with me. Probably he is angry right now because I put the node.js example first :))

    php (with react):

    <?php
    include __DIR__ . '/vendor/autoload.php';
    
    $host = 'localhost';
    $port = 26200;
    
    $loop   = React\EventLoop\Factory::create();
    $socket = new React\Socket\Server($loop);
    
    $socket->on('connection', function ($conn) {
        $conn->on('data', function ($data) {
            // do whatever we want with data
            }
        );
    });
    
    $socket->listen($port, $host);
    $loop->run();
    

    You can also use xinetd to handle the TCP inbound connections.

    Building a simple SQL wrapper with PHP. Part 2.

    In one of our last posts we built a simple SQL wrapper with PHP. Now we are going to improve it a little bit. We are going to use a Table class instead of the table name. Why? Simple: we want to create triggers. OK, we can create triggers directly in the database, but sometimes our triggers need to perform operations outside the database, such as calling a REST webservice, writing logs to the filesystem or things like that.

    <?php
    class Storage
    {
        static $count = 0;
    
        static function init()
        {
            self::$count = 0;
        }
    
        static function increment()
        {
            self::$count++;
        }
    
        static function decrement()
        {
            self::$count--;
        }
    
        static function get()
        {
            return self::$count;
        }
    }
    
    class SqlTest extends PHPUnit_Framework_TestCase
    {
        public function setUp()
        {
            $this->dbh = new Conn('pgsql:dbname=db;host=localhost', 'gonzalo', 'password');
            $this->dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            $this->dbh->forceRollback();
        }
    
        public function testInsertWithPostInsertShowingInsertedValues()
        {
            Storage::init();
            $that = $this;
            $this->dbh->transactional(function($dbh) use ($that) {
                $sql = new Sql($that->dbh);
                $users = new Table('users');
                $users->postInsert(function($values) {Storage::increment();});
    
                $that->assertEquals(0, Storage::get());
                $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
                $that->assertTrue($actual);
    
                $that->assertEquals(1, Storage::get());
            });
        }
    
        public function testInsertWithPostInsert()
        {
            Storage::init();
            $that = $this;
            $this->dbh->transactional(function($dbh) use ($that) {
                $sql = new Sql($that->dbh);
                $users = new Table('users');
                $users->postInsert(function() {Storage::increment();});
    
                $that->assertEquals(0, Storage::get());
                $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
                $that->assertTrue($actual);
    
                $that->assertEquals(1, Storage::get());
            });
        }
    
        public function testInsertWithPrePostInsert()
        {
            Storage::init();
            $that = $this;
            $this->dbh->transactional(function($dbh) use ($that) {
                $sql = new Sql($that->dbh);
                $users = new Table('users');
                $users->preInsert(function() {Storage::increment();});
                $users->postInsert(function() {Storage::decrement();});
    
                $that->assertEquals(0, Storage::get());
                $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
                $that->assertTrue($actual);
    
                $that->assertEquals(0, Storage::get());
            });
        }
    
        public function testUpdateWithPrePostInsert()
        {
            Storage::init();
            $that = $this;
            $this->dbh->transactional(function($dbh) use ($that) {
                $sql = new Sql($that->dbh);
                $users = new Table('users');
                $users->preUpdate(function() {Storage::increment();});
                $users->postUpdate(function() {Storage::increment();});
    
                $that->assertEquals(0, Storage::get());
                $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
                $that->assertTrue($actual);
                $that->assertEquals(0, Storage::get());
    
                $data = $sql->select('users', array('uid' => 7));
                $that->assertEquals('Gonzalo', $data[0]['name']);
    
                $actual = $sql->update($users, array('name' => 'gonzalo',), array('uid' => 7));
                $that->assertTrue($actual);
                $that->assertEquals(2, Storage::get());
    
                $data = $sql->select('users', array('uid' => 7));
                $that->assertEquals('gonzalo', $data[0]['name']);
            });
        }
    
        public function testDeleteWithPrePostInsert()
        {
            Storage::init();
            $that = $this;
            $this->dbh->transactional(function($dbh) use ($that) {
                $sql = new Sql($that->dbh);
                $users = new Table('users');
                $users->preDelete(function() {Storage::increment();});
                $users->postDelete(function() {Storage::increment();});
    
                $that->assertEquals(0, Storage::get());
                $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
                $that->assertTrue($actual);
                $that->assertEquals(0, Storage::get());
    
                $actual = $sql->delete($users, array('uid' => 7));
                $that->assertTrue($actual);
                $that->assertEquals(2, Storage::get());
            });
        }
    }
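
    The tests use a small Storage helper that just keeps a static counter (that is what the assertions suggest). A minimal sketch of it could be:

    class Storage
    {
        private static $counter = 0;

        public static function init()
        {
            self::$counter = 0;
        }

        public static function get()
        {
            return self::$counter;
        }

        public static function increment()
        {
            self::$counter++;
        }

        public static function decrement()
        {
            self::$counter--;
        }
    }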
    

    And here is the whole library:

    class Conn extends PDO
    {
        private $forcedRollback = false;
        public function transactional(Closure $func)
        {
            $this->beginTransaction();
            try {
                $func($this);
                $this->forcedRollback ? $this->rollback() : $this->commit();
                // Reset the flag so a forced rollback does not leak into the
                // next transaction on the same connection.
                $this->forcedRollback = false;
            } catch (Exception $e) {
                $this->rollback();
                throw $e;
            }
        }
    
        public function forceRollback()
        {
            $this->forcedRollback = true;
        }
    }
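
    This transactional() helper is what keeps the tests above side-effect free. A minimal usage sketch (assuming the pdo_sqlite extension and an in-memory database, just for illustration) could be:

    $dbh = new Conn('sqlite::memory:');
    $dbh->exec('CREATE TABLE users (uid INTEGER PRIMARY KEY, name TEXT, surname TEXT)');

    $dbh->transactional(function (Conn $dbh) {
        $dbh->exec("INSERT INTO users (uid, name, surname) VALUES (1, 'Gonzalo', 'Ayuso')");
        // forceRollback() marks the transaction to be rolled back instead of
        // committed, so the INSERT above never reaches the table.
        $dbh->forceRollback();
    });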
    
    class Table
    {
        private $tableName;
    
        function __construct($tableName)
        {
            $this->tableName = $tableName;
        }
    
        private $cbkPostInsert;
        private $cbkPostUpdate;
        private $cbkPostDelete;
        private $cbkPreInsert;
        private $cbkPreUpdate;
        private $cbkPreDelete;
    
        public function getTableName()
        {
            return $this->tableName;
        }
    
        public function postInsert(Closure $func)
        {
            $this->cbkPostInsert = $func;
        }
    
        public function postUpdate(Closure $func)
        {
            $this->cbkPostUpdate = $func;
        }
    
        public function postDelete(Closure $func)
        {
            $this->cbkPostDelete = $func;
        }
    
        public function preInsert(Closure $func)
        {
            $this->cbkPreInsert = $func;
        }
    
        public function preUpdate(Closure $func)
        {
            $this->cbkPreUpdate = $func;
        }
    
        public function preDelete(Closure $func)
        {
            $this->cbkPreDelete = $func;
        }
    
        public function execPostInsert($values)
        {
            $func = $this->cbkPostInsert;
            if ($func instanceof Closure) $func($values);
        }
    
        public function execPostUpdate($values, $where)
        {
            $func = $this->cbkPostUpdate;
            if ($func instanceof Closure) $func($values, $where);
        }
    
        public function execPostDelete($where)
        {
            $func = $this->cbkPostDelete;
            if ($func instanceof Closure) $func($where);
        }
    
        public function execPreInsert($values)
        {
            $func = $this->cbkPreInsert;
            if ($func instanceof Closure) $func($values);
        }
    
        public function execPreUpdate($values, $where)
        {
            $func = $this->cbkPreUpdate;
            if ($func instanceof Closure) $func($values, $where);
        }
    
        public function execPreDelete($where)
        {
            $func = $this->cbkPreDelete;
            if ($func instanceof Closure) $func($where);
        }
    }
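
    Table is only a value object: it stores the table name plus the registered callbacks, and Sql fires them around each statement. Registering hooks could look like this (the error_log() calls are just placeholders for real domain events):

    $users = new Table('users');

    // Fired by Sql::insert() after the INSERT statement runs.
    $users->postInsert(function ($values) {
        error_log('UserCreated: ' . json_encode($values));
    });

    // Fired by Sql::update() after the UPDATE statement runs, receiving
    // both the new values and the WHERE conditions.
    $users->postUpdate(function ($values, $where) {
        error_log('UserUpdated: ' . json_encode($values) . ' matching ' . json_encode($where));
    });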
    
    class Sql
    {
        /** @var Conn */
        private $dbh;
        function __construct(Conn $dbh)
        {
            $this->dbh = $dbh;
        }
    
        public function select($table, $where)
        {
            $tableName   = ($table instanceof Table) ? $table->getTableName() : $table;
            $sql         = $this->createSelect($tableName, $where);
            $whereParams = $this->getWhereParameters($where);
            $stmp = $this->dbh->prepare($sql);
            $stmp->execute($whereParams);
            return $stmp->fetchAll();
        }
    
        public function insert($table, $values)
        {
            $tableName = ($table instanceof Table) ? $table->getTableName() : $table;
            $sql       = $this->createInsert($tableName, $values);
    
            if ($table instanceof Table) $table->execPreInsert($values);
            $stmp = $this->dbh->prepare($sql);
            $out = $stmp->execute($values);
            if ($table instanceof Table) $table->execPostInsert($values);
            return $out;
        }
    
        public function update($table, $values, $where)
        {
            $tableName   = ($table instanceof Table) ? $table->getTableName() : $table;
            $sql         = $this->createUpdate($tableName, $values, $where);
            $whereParams = $this->getWhereParameters($where);
    
            if ($table instanceof Table) $table->execPreUpdate($values, $where);
            $stmp = $this->dbh->prepare($sql);
            $out = $stmp->execute(array_merge($values, $whereParams));
            if ($table instanceof Table) $table->execPostUpdate($values, $where);
            return $out;
        }
    
        public function delete($table, $where)
        {
            $tableName   = ($table instanceof Table) ? $table->getTableName() : $table;
            $sql         = $this->createDelete($tableName, $where);
            $whereParams = $this->getWhereParameters($where);
    
            if ($table instanceof Table) $table->execPreDelete($where);
            $stmp = $this->dbh->prepare($sql);
            $out = $stmp->execute($whereParams);
            if ($table instanceof Table) $table->execPostDelete($where);
            return $out;
        }
    
        protected function getWhereParameters($where)
        {
            $whereParams = array();
            foreach ($where as $key => $value) {
                $whereParams[":W_{$key}"] = $value;
            }
            return $whereParams;
        }
    
        protected function createSelect($table, $where)
        {
            return "SELECT * FROM " . $table . $this->createSqlWhere($where);
        }
    
        protected function createUpdate($table, $values, $where)
        {
            $sqlValues = array();
            foreach (array_keys($values) as $key) {
                $sqlValues[] = "{$key} = :{$key}";
            }
            return "UPDATE {$table} SET " . implode(', ', $sqlValues) . $this->createSqlWhere($where);
        }
    
        protected function createInsert($table, $values)
        {
            $sqlValues = array();
            foreach (array_keys($values) as $key) {
                $sqlValues[] = ":{$key}";
            }
            return "INSERT INTO {$table} (" . implode(', ', array_keys($values)) . ") VALUES (" . implode(', ', $sqlValues) . ")";
        }
    
        protected function createDelete($table, $where)
        {
            return "DELETE FROM {$table}" . $this->createSqlWhere($where);
        }
    
        protected function createSqlWhere($where)
        {
            if (count((array) $where) == 0) return '';
    
            $whereSql = array();
            foreach ($where as $key => $value) {
                $whereSql[] = "{$key} = :W_{$key}";
            }
            return ' WHERE ' . implode(' AND ', $whereSql);
        }
    }
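
    Putting it all together, a minimal end-to-end sketch (again assuming pdo_sqlite; the table and data are made up for the example) could be:

    $dbh = new Conn('sqlite::memory:');
    $dbh->exec('CREATE TABLE users (uid INTEGER PRIMARY KEY, name TEXT, surname TEXT)');

    $users = new Table('users');
    $users->postInsert(function ($values) {
        error_log('UserCreated: ' . json_encode($values));
    });
    $users->postDelete(function ($where) {
        error_log('UserDeleted: ' . json_encode($where));
    });

    $dbh->transactional(function (Conn $dbh) use ($users) {
        $sql = new Sql($dbh);
        $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
        $sql->delete($users, array('uid' => 7));
    });

    Note the W_ prefix that getWhereParameters() adds to the WHERE placeholders: it avoids a collision when the same column appears both in the SET part and in the WHERE part of an UPDATE. Keep in mind that while the SQL statements run inside one transaction, whatever the callbacks do outside the database (logging, queuing events) is not rolled back with it.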