Category Archives: PostgreSQL

Notify events from PostgreSQL to external listeners

Sometimes we need to call external programs from our PostgreSQL database. We can send sockets from SQL statements, as I’ve written about before. The problem with that approach is the following: if the user rolls back the transaction, the socket message has already been sent. That may or may not be a problem, depending on our application. Nothing guarantees either that the process behind the socket server can see the transaction’s data: if it is very fast, the transaction may not be committed yet. We could add a sleep, but sleep functions are always a bad idea. PostgreSQL gives us another tool to decouple processes: LISTEN and NOTIFY.

Let me show you an example. First we create a table:

CREATE TABLE PUBLIC.TBLEXAMPLE
(
  KEY1 CHARACTER VARYING(10) NOT NULL,
  KEY2 CHARACTER VARYING(14) NOT NULL,

  VALUE1 CHARACTER VARYING(20),
  VALUE2 CHARACTER VARYING(20) NOT NULL,

  CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
)

Now we add an “after insert” trigger to our table:

CREATE TRIGGER TBLEXAMPLE_AFTER
AFTER INSERT
ON PUBLIC.TBLEXAMPLE
FOR EACH ROW
EXECUTE PROCEDURE PUBLIC.NOTIFY();

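And now, within the trigger function, we send a notify event (‘myEvent’ in this case) with the row information. The payload of a notify event must be plain text, so we’ll use JSON to encode our row data. PostgreSQL requires the function to exist when the trigger is created, so this statement has to run before the CREATE TRIGGER above.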

CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
  PERFORM pg_notify('myEvent', row_to_json(NEW)::text);
  RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;

Now we’re going to build a server-side example that connects to our PostgreSQL database and listens for the event. In this case we’re going to use Node.js to build the prototype. The example also enqueues the events into a Gearman server.

var pg = require('pg'),
    gearmanode = require('gearmanode'),
    gearmanClient,
    conString = 'tcp://username:password@localhost:5432/gonzalo',
    pgClient;

gearmanode.Client.logger.transports.console.level = 'error';

gearmanClient = gearmanode.client();

console.log('LISTEN myEvent');
pgClient = new pg.Client(conString);
pgClient.connect();
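// Quote the channel name: an unquoted identifier is folded to lower case
// (myevent) and would not match pg_notify('myEvent', ...).
pgClient.query('LISTEN "myEvent"');
pgClient.on('notification', function (data) {
    console.log("\x1b[34m" + new Date() + '-\x1b[0m payload', data.payload);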
    gearmanClient.submitJob('sms.sender.one', data.payload);
});

And that’s all. Now we only need to perform an INSERT statement on our table. It fires the trigger, and our Node.js script enqueues the job into a Gearman queue.

INSERT INTO PUBLIC.TBLEXAMPLE(KEY1, KEY2, VALUE1, VALUE2) VALUES ('k1', 'k2', 'v1', 'v2');

It’s worth noting that notifications are only delivered when the transaction commits: if our INSERT statement is inside a transaction and we roll it back, NOTIFY won’t send any message.
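
To make the commit/rollback behaviour concrete, here is a small toy model in Python. It is not how PostgreSQL is implemented and it doesn’t talk to a database; the `Channel` and `Transaction` classes are hypothetical names that only mimic the rule “queue on NOTIFY, deliver on COMMIT, discard on ROLLBACK”.

```python
class Channel:
    """Collects the payloads that listeners actually receive."""

    def __init__(self):
        self.delivered = []


class Transaction:
    """Toy model: notifications are queued and only sent on commit."""

    def __init__(self, channel):
        self.channel = channel
        self.pending = []

    def notify(self, payload):
        # Nothing is visible to listeners yet.
        self.pending.append(payload)

    def commit(self):
        # Listeners see the queued payloads only now.
        self.channel.delivered.extend(self.pending)
        self.pending = []

    def rollback(self):
        # Queued notifications are discarded with the transaction.
        self.pending = []
```

A listener attached to the channel would therefore never see a row that was not committed, which is exactly what the socket approach cannot guarantee.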

Foreign Data Wrappers with PostgreSQL and PHP

PostgreSQL is more than a relational database. It has many cool features. Today we’re going to play with Foreign Data Wrappers (FDW). The idea is to create a virtual table from an external data source and use it like a traditional table.

Let me show you an example. Imagine that we’ve got a REST datasource on port 8888. We’re going to use this Silex application, for example

use Silex\Application;

$app = new Application();

$app->get('/', function(Application $app) {

    return $app->json([
        ['name' => 'Peter', 'surname' => 'Parker'],
        ['name' => 'Clark', 'surname' => 'Kent'],
        ['name' => 'Bruce', 'surname' => 'Wayne'],
    ]);
});

$app->run();

We want to use this datasource in PostgreSQL, so we need to use a “www foreign data wrapper”.

First we create the extension (maybe we need to compile the extension. We can follow the installation instructions here)

CREATE EXTENSION www_fdw;

Now, with the extension in place, we need to create a “server”. This server is just a proxy that connects to the real REST service

CREATE SERVER myRestServer FOREIGN DATA WRAPPER www_fdw OPTIONS (uri 'http://localhost:8888');

Now we need to map our user to the server

CREATE USER MAPPING FOR gonzalo SERVER myRestServer;

And finally we only need our “Foreign table”

CREATE FOREIGN TABLE myRest (
    name text,
    surname text
) SERVER myRestServer;

Now we can perform SQL queries using our Foreign table

SELECT * FROM myRest

We must take care with one thing. We can use WHERE clauses but if we run

SELECT * FROM myRest WHERE name='Peter'

We’ll see that the output is the same as for “SELECT * FROM myRest”. That’s because if we want to filter something with a WHERE clause on a foreign table, the filtering has to be done in the remote service. WHERE name=‘Peter’ means that our database will execute the following request:

http://localhost:8888?name=Peter

And we need to handle this parameter, for example with something like this:

use Silex\Application;
use Symfony\Component\HttpFoundation\Request;

$app = new Application();

$app->get('/', function(Application $app, Request $request) {
    $name = $request->get('name');

    $data = [
        ['name' => 'Peter', 'surname' => 'Parker'],
        ['name' => 'Clark', 'surname' => 'Kent'],
        ['name' => 'Bruce', 'surname' => 'Wayne'],
    ];
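    // array_filter() keeps the original keys; array_values() reindexes them
    // so the response is always encoded as a JSON array.
    return $app->json(array_values(array_filter($data, function($reg) use($name){
        return $name ? $reg['name'] === $name : true;
    })));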
});

$app->run();
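
The same filtering logic, written as a minimal Python sketch for comparison. The function name `rows_for` and the in-memory list are assumptions made for illustration; the only thing taken from the text is that the wrapper turns the WHERE clause into a `name` query parameter.

```python
from urllib.parse import parse_qs, urlparse

HEROES = [
    {"name": "Peter", "surname": "Parker"},
    {"name": "Clark", "surname": "Kent"},
    {"name": "Bruce", "surname": "Wayne"},
]


def rows_for(url, rows=HEROES):
    """Return the rows the REST service should send back for this request URL."""
    query = parse_qs(urlparse(url).query)
    name = query.get("name", [None])[0]
    if name is None:
        # No filter in the request: behave like SELECT * FROM myRest.
        return list(rows)
    return [row for row in rows if row["name"] == name]
```

Called with `http://localhost:8888?name=Peter` it returns only the Peter Parker row; without the parameter it returns every row.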

Building one HTTP client in PostgreSQL with PL/Python

Don’t ask me why, but I need to call an HTTP server (a Silex application) from a PostgreSQL database.

I want to do something like this:

select get('http://localhost:8080?name=Gonzalo')->'hello';

PostgreSQL has a datatype for json. It’s really cool and it allows us to connect our HTTP server and our SQL database using same datatype.

PostgreSQL also allows us to create stored procedures using different languages. The default language is PL/pgSQL, a simple language in which we can embed SQL. But we can also use Python. With Python we can easily create HTTP clients, for example with urllib2. That means that developing an HTTP client for a PostgreSQL database is pretty straightforward.

CREATE OR REPLACE FUNCTION get(uri character varying)
  RETURNS json AS
$BODY$
import urllib2

data = urllib2.urlopen(uri)

return data.read()

$BODY$
  LANGUAGE plpython2u VOLATILE
  COST 100;
ALTER FUNCTION get(character varying)
  OWNER TO gonzalo;

Ok that’s a GET client, but we also want a POST client to do something like this:

select post('http://localhost:8080', '{"name": "Gonzalo"}'::json)->'hello';

As you can see, I want to use application/json instead of application/x-www-form-urlencoded to send the request parameters. I wrote about it here some time ago. So I will create one endpoint within my Silex server to handle my POST requests too:

<?php
include __DIR__ . '/../vendor/autoload.php';

use Silex\Application;
use Symfony\Component\HttpFoundation\Request;
use G\AngularPostRequestServiceProvider;

$app = new Application(['debug' => true]);

$app->register(new AngularPostRequestServiceProvider());

$app->post('/', function (Application $app, Request $request) {
    return $app->json(['hello' => $request->get('name')]);
});

$app->get('/', function (Application $app, Request $request) {
    return $app->json(['hello' => $request->get('name')]);
});

$app->run();

And now we only need to create one stored procedure to send POST requests

CREATE OR REPLACE FUNCTION post(
    uri character varying,
    parameters json)
  RETURNS json AS
$BODY$
import urllib2

# urllib2 derives Content-Length from the body, so it is not set by hand
req = urllib2.Request(uri, parameters, {'Content-Type': 'application/json'})
f = urllib2.urlopen(req)
return f.read()

$BODY$
  LANGUAGE plpython2u VOLATILE
  COST 100;
ALTER FUNCTION post(character varying, json)
  OWNER TO gonzalo;

And that’s all. At least this simple script is exactly what I need.
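
For readers on Python 3, where urllib2 became urllib.request, here is a minimal sketch of the same two clients outside the database. The names `get_json` and `post_json` are hypothetical, and `HelloHandler` is only an assumed stand-in for the Silex endpoint so that the example can run on its own.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse


def get_json(uri):
    """GET uri and decode the JSON response body."""
    with urllib.request.urlopen(uri) as response:
        return json.loads(response.read().decode("utf-8"))


def post_json(uri, parameters):
    """POST parameters as an application/json body and decode the JSON reply."""
    body = json.dumps(parameters).encode("utf-8")
    request = urllib.request.Request(
        uri, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


class HelloHandler(BaseHTTPRequestHandler):
    """Stand-in for the Silex endpoint: always replies {"hello": name}."""

    def _reply(self, name):
        payload = json.dumps({"hello": name}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        query = parse_qs(urlparse(self.path).query)
        self._reply(query.get("name", [None])[0])

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        self._reply(json.loads(self.rfile.read(length)).get("name"))

    def log_message(self, *args):
        pass  # keep the demo quiet


def start_demo_server():
    """Start the stand-in server on a free local port in a background thread."""
    server = HTTPServer(("127.0.0.1", 0), HelloHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

With the demo server running, `get_json(base + "/?name=Gonzalo")` and `post_json(base + "/", {"name": "Gonzalo"})` both return a dictionary whose `hello` key is `Gonzalo`, where `base` is the server address.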

Sending sockets from PostgreSQL triggers with Python

Picture this: we want to notify an external service each time a record is inserted into the database. We could find the place where the INSERT statement is issued and create a TCP client there, but what happens if the application that inserts the data is a legacy application, or the change is simply too hard to make? If our database is PostgreSQL, it’s pretty straightforward. We cannot do it with PostgreSQL’s “default” procedural language (plpgsql), but PostgreSQL lets us use other procedural languages as well, for example Python. With plpython we can use sockets in the same way as in any Python script. It’s very simple. Let me show you how to do it.

First we need to create a plpython function with our TCP client

CREATE OR REPLACE FUNCTION dummy.sendsocket(msg character varying, host character varying, port integer)
  RETURNS integer AS
$BODY$
  import socket
  try:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      s.connect((host, port))
      s.sendall(msg)
    finally:
      s.close()  # release the socket even when connect or send fails
    return 1
  except socket.error:
    return 0
$BODY$
  LANGUAGE plpython VOLATILE
  COST 100;
ALTER FUNCTION dummy.sendsocket(character varying, character varying, integer)
  OWNER TO username;

Now we create the trigger function that uses our socket client.

CREATE OR REPLACE FUNCTION dummy.myTriggerToSendSockets()
RETURNS trigger AS
$BODY$
   import json
   stmt = plpy.prepare("select dummy.sendSocket($1, $2, $3)", ["text", "text", "int"])
   rv = plpy.execute(stmt, [json.dumps(TD), "host", 26200])
$BODY$
LANGUAGE plpython VOLATILE
COST 100;

As you can see, in my example we are sending the whole record as a JSON string in the socket body.

And finally we attach the trigger to one table (or maybe we need to do it to more than one table)

CREATE TRIGGER myTrigger
  AFTER INSERT OR UPDATE OR DELETE
  ON dummy.myTable
  FOR EACH ROW
  EXECUTE PROCEDURE dummy.myTriggerToSendSockets();

And that’s all. Now we can use a simple TCP socket server to handle those requests. Let me show you examples of TCP servers in different languages. As we can see, all of them are implementations of the Reactor pattern. We can use, for example:

node.js:

var net = require('net');

var host = 'localhost';
var port = 26200;

var server = net.createServer(function (socket) {
    socket.on('data', function(buffer) {
        // do whatever that we want with buffer
    });
});

server.listen(port, host);

python (with Twisted):

from twisted.internet import reactor, protocol

HOST = 'localhost'
PORT = 26200

class MyServer(protocol.Protocol):
    def dataReceived(self, data):
        # do whatever that we want with data
        pass

class MyServerFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return MyServer()

reactor.listenTCP(PORT, MyServerFactory(), interface=HOST)
reactor.run()

(I know that we can create the Python TCP server without Twisted, but if I don’t use it maybe someone will be angry with me. They are probably angry right now because I put the node.js example first :))

php (with react):

<?php
include __DIR__ . '/vendor/autoload.php';

$host = 'localhost';
$port = 26200;

$loop   = React\EventLoop\Factory::create();
$socket = new React\Socket\Server($loop);

$socket->on('connection', function ($conn) {
    $conn->on('data', function ($data) {
        // do whatever we want with data
    });
});

$socket->listen($port, $host);
$loop->run();

You can also use xinetd to handle the inbound TCP connections.
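
As mentioned above, the Python server can also be written without Twisted. The following is a minimal sketch that uses only the standard library’s socketserver module. It assumes each connection carries exactly one JSON document and that the client closes the socket after sending it, as the sendsocket function does. The names `TriggerMessageHandler`, `start_server` and `send_socket` are made up for the example.

```python
import json
import socket
import socketserver
import threading

received = []                        # decoded messages, in arrival order
message_arrived = threading.Event()  # set once a message has been stored


class TriggerMessageHandler(socketserver.StreamRequestHandler):
    """Reads one JSON document per connection."""

    def handle(self):
        # read() returns at EOF, i.e. when the client closes its socket.
        raw = self.rfile.read()
        received.append(json.loads(raw.decode("utf-8")))
        message_arrived.set()


def start_server(host="localhost", port=26200):
    """Serve in a background thread and return the server object."""
    server = socketserver.ThreadingTCPServer((host, port), TriggerMessageHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def send_socket(msg, host, port):
    """Client side, mirroring dummy.sendsocket: 1 on success, 0 on failure."""
    try:
        with socket.create_connection((host, port), timeout=5) as s:
            s.sendall(msg.encode("utf-8"))
        return 1
    except OSError:
        return 0
```

Unlike the reactor-based servers, this one handles each connection in its own thread, which is fine for a prototype but a different design from the event loop used in the other examples.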

Building a simple SQL wrapper with PHP. Part 2.

In one of our last posts we built a simple SQL wrapper with PHP. Now we are going to improve it a little bit. We are going to use a Table class instead of the table name. Why? Simple. We want to create triggers. OK, we can create triggers directly in the database, but sometimes our triggers need to perform operations outside the database, such as calling a REST web service, writing to filesystem logs or things like that.

<?php
class Storage
{
    static $count = 0;

    static function init()
    {
        self::$count = 0;
    }

    static function increment()
    {
        self::$count++;
    }

    static function decrement()
    {
        self::$count--;
    }

    static function get()
    {
        return self::$count;
    }
}

class SqlTest extends PHPUnit_Framework_TestCase
{
    public function setUp()
    {
        $this->dbh = new Conn('pgsql:dbname=db;host=localhost', 'gonzalo', 'password');
        $this->dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $this->dbh->forceRollback();
    }

    public function testInsertWithPostInsertShowingInsertedValues()
    {
        Storage::init();
        $that = $this;
        $this->dbh->transactional(function($dbh) use ($that) {
            $sql = new Sql($that->dbh);
            $users = new Table('users');
            $users->postInsert(function($values) {Storage::increment();});

            $that->assertEquals(0, Storage::get());
            $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
            $that->assertTrue($actual);

            $that->assertEquals(1, Storage::get());
        });
    }

    public function testInsertWithPostInsert()
    {
        Storage::init();
        $that = $this;
        $this->dbh->transactional(function($dbh) use ($that) {
            $sql = new Sql($that->dbh);
            $users = new Table('users');
            $users->postInsert(function() {Storage::increment();});

            $that->assertEquals(0, Storage::get());
            $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
            $that->assertTrue($actual);

            $that->assertEquals(1, Storage::get());
        });
    }

    public function testInsertWithPrePostInsert()
    {
        Storage::init();
        $that = $this;
        $this->dbh->transactional(function($dbh) use ($that) {
            $sql = new Sql($that->dbh);
            $users = new Table('users');
            $users->preInsert(function() {Storage::increment();});
            $users->postInsert(function() {Storage::decrement();});

            $that->assertEquals(0, Storage::get());
            $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
            $that->assertTrue($actual);

            $that->assertEquals(0, Storage::get());
        });
    }

    public function testUpdateWithPrePostInsert()
    {
        Storage::init();
        $that = $this;
        $this->dbh->transactional(function($dbh) use ($that) {
            $sql = new Sql($that->dbh);
            $users = new Table('users');
            $users->preUpdate(function() {Storage::increment();});
            $users->postUpdate(function() {Storage::increment();});

            $that->assertEquals(0, Storage::get());
            $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
            $that->assertTrue($actual);
            $that->assertEquals(0, Storage::get());

            $data = $sql->select('users', array('uid' => 7));
            $that->assertEquals('Gonzalo', $data[0]['name']);

            $actual = $sql->update($users, array('name' => 'gonzalo',), array('uid' => 7));
            $that->assertTrue($actual);
            $that->assertEquals(2, Storage::get());

            $data = $sql->select('users', array('uid' => 7));
            $that->assertEquals('gonzalo', $data[0]['name']);
        });
    }

    public function testDeleteWithPrePostInsert()
    {
        Storage::init();
        $that = $this;
        $this->dbh->transactional(function($dbh) use ($that) {
            $sql = new Sql($that->dbh);
            $users = new Table('users');
            $users->preDelete(function() {Storage::increment();});
            $users->postDelete(function() {Storage::increment();});

            $that->assertEquals(0, Storage::get());
            $actual = $sql->insert($users, array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
            $that->assertTrue($actual);
            $that->assertEquals(0, Storage::get());

            $actual = $sql->delete($users, array('uid' => 7));
            $that->assertTrue($actual);
            $that->assertEquals(2, Storage::get());
        });
    }
}

And here is the whole library:

class Conn extends PDO
{
    private $forcedRollback = false;
    public function transactional(Closure $func)
    {
        $this->beginTransaction();
        try {
            $func($this);
            $this->forcedRollback ? $this->rollback() : $this->commit();
        } catch (Exception $e) {
            $this->rollback();
            throw $e;
        }
    }

    public function forceRollback()
    {
        $this->forcedRollback = true;
    }
}

class Table
{
    private $tableName;

    function __construct($tableName)
    {
        $this->tableName = $tableName;
    }

    private $cbkPostInsert;
    private $cbkPostUpdate;
    private $cbkPostDelete;
    private $cbkPreInsert;
    private $cbkPreUpdate;
    private $cbkPreDelete;

    public function getTableName()
    {
        return $this->tableName;
    }

    public function postInsert(Closure $func)
    {
        $this->cbkPostInsert = $func;
    }

    public function postUpdate(Closure $func)
    {
        $this->cbkPostUpdate = $func;
    }

    public function postDelete(Closure $func)
    {
        $this->cbkPostDelete = $func;
    }

    public function preInsert(Closure $func)
    {
        $this->cbkPreInsert = $func;
    }

    public function preUpdate(Closure $func)
    {
        $this->cbkPreUpdate = $func;
    }

    public function preDelete(Closure $func)
    {
        $this->cbkPreDelete = $func;
    }

    public function execPostInsert($values)
    {
        $func = $this->cbkPostInsert;
        if ($this->cbkPostInsert instanceof Closure) $func($values);
    }

    public function execPostUpdate($values, $where)
    {
        $func = $this->cbkPostUpdate;
        if ($func instanceof Closure) $func($values, $where);
    }

    public function execPostDelete($where)
    {
        $func = $this->cbkPostDelete;
        if ($func instanceof Closure) $func($where);
    }

    public function execPreInsert($values)
    {
        $func = $this->cbkPreInsert;
        if ($func instanceof Closure) $func($values);
    }

    // Sql::update() passes both arguments, like execPostUpdate()
    public function execPreUpdate($values, $where)
    {
        $func = $this->cbkPreUpdate;
        if ($func instanceof Closure) $func($values, $where);
    }

    public function execPreDelete($where)
    {
        $func = $this->cbkPreDelete;
        if ($func instanceof Closure) $func($where);
    }
}

class Sql
{
    /** @var Conn */
    private $dbh;
    function __construct(Conn $dbh)
    {
        $this->dbh = $dbh;
    }

    public function select($table, $where)
    {
        $tableName   = ($table instanceof Table) ? $table->getTableName() : $table;
        $sql         = $this->createSelect($tableName, $where);
        $whereParams = $this->getWhereParameters($where);
        $stmp = $this->dbh->prepare($sql);
        $stmp->execute($whereParams);
        return $stmp->fetchAll();
    }

    public function insert($table, $values)
    {
        $tableName = ($table instanceof Table) ? $table->getTableName() : $table;
        $sql       = $this->createInsert($tableName, $values);

        if ($table instanceof Table) $table->execPreInsert($values);
        $stmp = $this->dbh->prepare($sql);
        $out = $stmp->execute($values);
        if ($table instanceof Table) $table->execPostInsert($values);
        return $out;
    }

    public function update($table, $values, $where)
    {
        $tableName   = ($table instanceof Table) ? $table->getTableName() : $table;
        $sql         = $this->createUpdate($tableName, $values, $where);
        $whereParams = $this->getWhereParameters($where);

        if ($table instanceof Table) $table->execPreUpdate($values, $where);
        $stmp = $this->dbh->prepare($sql);
        $out = $stmp->execute(array_merge($values, $whereParams));
        if ($table instanceof Table) $table->execPostUpdate($values, $where);
        return $out;
    }

    public function delete($table, $where)
    {
        $tableName   = ($table instanceof Table) ? $table->getTableName() : $table;
        $sql         = $this->createDelete($tableName, $where);
        $whereParams = $this->getWhereParameters($where);

        if ($table instanceof Table) $table->execPreDelete($where);
        $stmp = $this->dbh->prepare($sql);
        $out = $stmp->execute($whereParams);
        if ($table instanceof Table) $table->execPostDelete($where);
        return $out;
    }

    protected function getWhereParameters($where)
    {
        $whereParams = array();
        foreach ($where as $key => $value) {
            $whereParams[":W_{$key}"] = $value;
        }
        return $whereParams;
    }

    protected function createSelect($table, $where)
    {
        return "SELECT * FROM " . $table . $this->createSqlWhere($where);
    }

    protected function createUpdate($table, $values, $where)
    {
        $sqlValues = array();
        foreach (array_keys($values) as $key) {
            $sqlValues[] = "{$key} = :{$key}";
        }
        return "UPDATE {$table} SET " . implode(', ', $sqlValues) . $this->createSqlWhere($where);
    }

    protected function createInsert($table, $values)
    {
        $sqlValues = array();
        foreach (array_keys($values) as $key) {
            $sqlValues[] = ":{$key}";
        }
        return "INSERT INTO {$table} (" . implode(', ', array_keys($values)) . ") VALUES (" . implode(', ', $sqlValues) . ")";
    }

    protected function createDelete($table, $where)
    {
        return "DELETE FROM {$table}" . $this->createSqlWhere($where);
    }

    protected function createSqlWhere($where)
    {
        if (count((array) $where) == 0) return null;

        $whereSql = array();
        foreach ($where as $key => $value) {
            $whereSql[] = "{$key} = :W_{$key}";
        }
        return ' WHERE ' . implode(' AND ', $whereSql);
    }
}
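
To show the pre/post callback idea in isolation, here is a minimal Python sketch that runs against an in-memory SQLite database instead of PostgreSQL. It is an illustration under those assumptions, not a port of the library: only the insert path is covered, and the attribute names `pre_insert` and `post_insert` are invented for the example.

```python
import sqlite3


class Table:
    """Table name plus optional callbacks fired around an insert."""

    def __init__(self, name):
        self.name = name
        self.pre_insert = None
        self.post_insert = None


def insert(conn, table, values):
    """Insert one row; accepts a Table or a plain table name."""
    name = table.name if isinstance(table, Table) else table
    columns = ", ".join(values)
    placeholders = ", ".join(":" + key for key in values)
    # Table and column names are interpolated, so they must come from
    # trusted code; only the values travel as bound parameters.
    sql = "INSERT INTO {} ({}) VALUES ({})".format(name, columns, placeholders)

    if isinstance(table, Table) and table.pre_insert:
        table.pre_insert(values)
    cursor = conn.execute(sql, values)
    if isinstance(table, Table) and table.post_insert:
        table.post_insert(values)
    return cursor.rowcount == 1
```

Because the callbacks run in application code rather than in the database, they can call web services or write logs, which is the reason given above for not using database triggers.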

Database connection pooling with PHP and React (node.php)

Last Saturday I came across a new hype: “React”, also known as “node.php”. Basically it’s the same idea as node.js but built with PHP instead of JavaScript. Twitter was on fire with this new library (at least my timeline was). Sunday was a rainy day, so I spent the afternoon hacking a little bit with it. Basically I want to create a database connection pool. It’s one of the things that I miss in PHP. Some time ago I wrote a post here about the same idea, an exotic experiment that built a connection pool using Gearman. Today the idea is the same, but now with React. Let’s start.

First of all we install React. It’s a simple process using composer.

% echo '{ "require": { "react/react": "dev-master" } }' > composer.json
% composer install

Now we can start with our experiment. Imagine a simple table in PostgreSQL and a query against it using PDO:

CREATE TABLE users
(
  uid integer NOT NULL,
  name character varying(50),
  surname character varying(50),
  CONSTRAINT pk_users PRIMARY KEY (uid)
)
WITH (
  OIDS=FALSE
);
ALTER TABLE users OWNER TO gonzalo;

INSERT INTO users(uid, name, surname) VALUES (0, 'Gonzalo', 'Ayuso');
INSERT INTO users(uid, name, surname) VALUES (1, 'Hans', 'Solo');
INSERT INTO users(uid, name, surname) VALUES (2, 'Luke', 'Skywalker');

$dbh = new PDO('pgsql:dbname=demo;host=vmserver', 'gonzalo', 'password');
$sql = "SELECT * FROM USERS";
$stmt = $dbh->prepare($sql);
$stmt->execute();
$data = $stmt->fetchAll();
print_r($data);

Now we are going to use the same interface but instead of using PDO we will use one server with React:

include "CPool.php";
define('NODEPHP', '127.0.0.1:1337');

$dbh = new CPool();
$sql = "SELECT * FROM USERS";
$stmt = $dbh->prepare($sql);
$stmt->execute();
$data = $stmt->fetchAll();
$stmt->closeCursor();
print_r($data);

Our CPool library:

class CPoolStatement
{
    private $stmt;
    function __construct($sql=null)
    {
        if (!is_null($sql)) {
            $url = "http://" . NODEPHP . "?" . http_build_query(array(
                    'action' => 'prepare',
                    'sql'    => $sql
                 ));
            $this->stmt = file_get_contents($url);
        }
    }

    public function getId()
    {
        return $this->stmt;
    }

    public function setId($id)
    {
        $this->stmt = $id;
    }

    public function execute($values=array())
    {
        $url = "http://" . NODEPHP . "?" . http_build_query(array(
                'action' => 'execute',
                'smtId'  => $this->stmt,
                'values' => $values
             ));
        $this->stmt = file_get_contents($url);
    }

    public function fetchAll()
    {
        $url = "http://" . NODEPHP . "?" . http_build_query(array(
                'action' => 'fetchAll',
                'smtId'  => $this->stmt
             ));
        return (file_get_contents($url));
    }

    public function closeCursor()
    {
        $url = "http://" . NODEPHP . "?" . http_build_query(array(
                'action' => 'closeCursor',
                'smtId'  => $this->stmt
             ));
        return (file_get_contents($url));
    }
}

class CPool
{
    function prepare($sql)
    {
        return new CPoolStatement($sql);
    }
}

We also can create one script that creates one statement

include "CPool.php";
define('NODEPHP', '127.0.0.1:1337');

$dbh = new CPool();
$sql = "SELECT * FROM USERS";
$stmt = $dbh->prepare($sql);

echo $stmt->getId();

And another script (another HTTP request, for example) to fetch the result set. Notice that we can execute this script as many times as we want, because the prepared statement persists in the node.php server (we don’t need to create it again and again within each request).

include "CPool.php";
define('NODEPHP', '127.0.0.1:1337');

$stmt = new CPoolStatement();
$stmt->setId(1);

$stmt->execute();
$data = $stmt->fetchAll();
print_r($data);

And basically that was my Sunday afternoon experiment. As you can imagine, the library is totally unstable. It’s only an experiment. We could add transactions, commits, rollbacks, savepoints, … but I needed a walk and I stopped :). What do you think?

The code is available on GitHub
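
The server side is not shown in this post, so here is a hedged Python sketch of the idea it relies on: a long-lived process keeps one connection open and hands out ids for registered statements, so that later requests can execute them by id. The class `StatementPool` and its methods are hypothetical, SQLite stands in for PostgreSQL, and the HTTP layer is left out.

```python
import itertools
import sqlite3


class StatementPool:
    """Keeps one connection open and hands out ids for registered statements."""

    def __init__(self, conn):
        self._conn = conn
        self._ids = itertools.count(1)
        self._sql = {}      # statement id -> SQL text
        self._cursors = {}  # statement id -> cursor of the last execute

    def prepare(self, sql):
        """Register a statement and return its id."""
        stmt_id = next(self._ids)
        self._sql[stmt_id] = sql
        return stmt_id

    def execute(self, stmt_id, values=()):
        """Run a registered statement; it can be executed again later."""
        self._cursors[stmt_id] = self._conn.execute(self._sql[stmt_id], values)

    def fetch_all(self, stmt_id):
        return self._cursors[stmt_id].fetchall()

    def close_cursor(self, stmt_id):
        self._cursors.pop(stmt_id).close()
```

The first statement registered gets id 1, which matches the `setId(1)` call in the script above, and the statement stays available for as long as the process holding the pool is alive.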

Building a simple SQL wrapper with PHP

If we don’t use an ORM within our projects we need to write SQL statements by hand. I don’t mind writing SQL. It’s simple and descriptive, but sometimes we like to use helpers to avoid writing the same code again and again. Today we are going to create a simple library to help us write simple SQL queries. Let’s start:

The idea is that instead of writing:

SELECT * from users where uid=7;

write:

$sql->select('users', array('uid' => 7));

As we all should know, the best documentation is the unit tests, so here are the tests of the library:

class SqlTest extends PHPUnit_Framework_TestCase
{
    public function setUp()
    {
        $this->dbh = new Conn('pgsql:dbname=db;host=localhost', 'gonzalo', 'password');
        $this->dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $this->dbh->forceRollback();
    }

    public function testTransactions()
    {

        $sql = new Sql($this->dbh);
        $that = $this;

        $this->dbh->transactional(function($dbh) use ($sql, $that) {
            $actual = $sql->insert('users', array('uid' => 7, 'name' => 'Gonzalo', 'surname' => 'Ayuso'));
            $that->assertTrue($actual);

            $actual = $sql->insert('users', array('uid' => 8, 'name' => 'Peter', 'surname' => 'Parker'));
            $that->assertTrue($actual);

            $data = $sql->select('users', array('uid' => 8));
            $that->assertEquals('Peter', $data[0]['name']);
            $that->assertEquals('Parker', $data[0]['surname']);

            $sql->update('users', array('name' => 'gonzalo'), array('uid' => 7));

            $data = $sql->select('users', array('uid' => 7));
            $that->assertEquals('gonzalo', $data[0]['name']);

            $data = $sql->delete('users', array('uid' => 7));

            $data = $sql->select('users', array('uid' => 7));
            $that->assertTrue(count($data) == 0);
        });
    }
}

As you can see, we use DI to inject the database connection into our library. Simple, isn’t it?

Here is the whole library:

class Conn extends PDO
{
    private $forcedRollback = false;
    public function transactional(Closure $func)
    {
        $this->beginTransaction();
        try {
            $func($this);
            $this->forcedRollback ? $this->rollback() : $this->commit();
        } catch (Exception $e) {
            $this->rollback();
            throw $e;
        }
    }

    public function forceRollback()
    {
        $this->forcedRollback = true;
    }
}

class Sql
{
    /** @var Conn */
    private $dbh;
    function __construct(Conn $dbh)
    {
        $this->dbh = $dbh;
    }

    public function select($table, $where)
    {
        $sql         = $this->createSelect($table, $where);
        $whereParams = $this->getWhereParameters($where);
        $stmp = $this->dbh->prepare($sql);
        $stmp->execute($whereParams);
        return $stmp->fetchAll();
    }

    public function insert($table, $values)
    {
        $sql = $this->createInsert($table, $values);
        $stmp = $this->dbh->prepare($sql);
        return $stmp->execute($values);
    }

    public function update($table, $values, $where)
    {
        $sql = $this->createUpdate($table, $values, $where);
        $whereParams = $this->getWhereParameters($where);

        $stmp = $this->dbh->prepare($sql);
        return $stmp->execute(array_merge($values, $whereParams));
    }

    public function delete($table, $where)
    {
        $sql         = $this->createDelete($table, $where);
        $whereParams = $this->getWhereParameters($where);
        $stmp = $this->dbh->prepare($sql);
        return $stmp->execute($whereParams);
    }

    protected function getWhereParameters($where)
    {
        $whereParams = array();
        foreach ($where as $key => $value) {
            $whereParams[":W_{$key}"] = $value;
        }
        return $whereParams;
    }

    protected function createSelect($table, $where)
    {
        return "SELECT * FROM " . $table . $this->createSqlWhere($where);
    }

    protected function createUpdate($table, $values, $where)
    {
        $sqlValues = array();
        foreach (array_keys($values) as $key) {
            $sqlValues[] = "{$key} = :{$key}";
        }
        return "UPDATE {$table} SET " . implode(', ', $sqlValues) . $this->createSqlWhere($where);
    }

    protected function createInsert($table, $values)
    {
        $sqlValues = array();
        foreach (array_keys($values) as $key) {
            $sqlValues[] = ":{$key}";
        }
        return "INSERT INTO {$table} (" . implode(', ', array_keys($values)) . ") VALUES (" . implode(', ', $sqlValues) . ")";
    }

    protected function createDelete($table, $where)
    {
        return "DELETE FROM {$table}" . $this->createSqlWhere($where);
    }

    protected function createSqlWhere($where)
    {
        if (count((array) $where) == 0) return null;

        $whereSql = array();
        foreach ($where as $key => $value) {
            $whereSql[] = "{$key} = :W_{$key}";
        }
        return ' WHERE ' . implode(' AND ', $whereSql);
    }
}

You can see the full code on GitHub.
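
The core trick of the library is building the SQL text from array keys while binding the values as named parameters, with a W_ prefix so that a column can appear in both the SET and the WHERE part. Here is a minimal Python sketch of that technique against SQLite. The function names are assumptions, and only select and update are covered.

```python
import sqlite3


def where_clause(where):
    """Return (sql, params) for an AND-ed equality filter."""
    if not where:
        return "", {}
    sql = " WHERE " + " AND ".join("{0} = :W_{0}".format(key) for key in where)
    return sql, {"W_" + key: value for key, value in where.items()}


def select(conn, table, where=None):
    clause, params = where_clause(where)
    return conn.execute("SELECT * FROM " + table + clause, params).fetchall()


def update(conn, table, values, where=None):
    """Update matching rows and return how many were changed."""
    clause, params = where_clause(where)
    assignments = ", ".join("{0} = :{0}".format(key) for key in values)
    # The W_ prefix keeps WHERE parameters apart from SET parameters.
    params.update(values)
    sql = "UPDATE {} SET {}{}".format(table, assignments, clause)
    return conn.execute(sql, params).rowcount
```

As in the PHP version, table and column names are interpolated into the statement, so they must never come from user input; only the values are bound.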

Asynchronous queries to PostgreSql database from the browser with node.js and socket.io

Normally we handle our database connections on the server side, with PHP and PDO for example. In this post I will show a simple technique to send queries to our database (PostgreSQL in this example) asynchronously using node.js and socket.io.

The idea is pretty straightforward. We will send the SQL string and the values with a WebSocket and we will execute a callback in the client when the server (node.js script) fetches the recordset.

Our server:

var pg = require('pg');

var conString = "tcp://user:password@localhost:5432/db";
var client = new pg.Client(conString);
client.connect();

var io = require('socket.io').listen(8888);

io.sockets.on('connection', function (socket) {
    socket.on('sql', function (data) {
        var query = client.query(data.sql, data.values);
        query.on('row', function(row) {
            socket.emit('sql', row);
        });
    });
});

And our client:

<script src="http://localhost:8888/socket.io/socket.io.js"></script>
<script>
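var socket = io.connect('http://vmserver:8888');

// Callback of the most recent request. The listener is registered only once,
// so repeated clicks do not stack up duplicate 'sql' handlers.
var currentCbk = null;
socket.on('sql', function (data) {
    if (currentCbk) {
        currentCbk(data);
    }
});

function sendSql(sql, values, cbk) {
    currentCbk = cbk;
    socket.emit('sql', { sql: sql, values: values });
}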
</script>    
<p>
<a href="#" onclick="sendSql('select * from users', [], function(data) {console.log(data);})">select * from users</a>
</p>
<p>
<a href="#" onclick="sendSql('select * from users where uid=$1', [4], function(data) {console.log(data);})">select * from users where uid=$1</a>
</p>
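
Because the server emits one 'sql' event per row and every request shares the same event name, the client cannot tell which request a row belongs to. Here is a minimal sketch of one way around this, assuming the server is changed to echo a request id back with every row. The `id` field, the `row` field and the `createSqlSender` helper are hypothetical and not part of the scripts above:

```javascript
// Sketch: route each row to the callback of the request that asked for it.
// Assumes the server echoes the request id back with every row it emits,
// e.g. socket.emit('sql', { id: data.id, row: row }) (hypothetical change).
function createSqlSender(socket) {
    var callbacks = {};   // pending callbacks, keyed by request id
    var nextId = 1;

    // One listener for the whole page, registered once.
    socket.on('sql', function (message) {
        var cbk = callbacks[message.id];
        if (cbk) {
            cbk(message.row);
        }
    });

    return function sendSql(sql, values, cbk) {
        var id = nextId++;
        callbacks[id] = cbk;
        socket.emit('sql', { id: id, sql: sql, values: values });
        return id;
    };
}
```

In the browser it could be wired up with `var sendSql = createSqlSender(io.connect('http://vmserver:8888'));`. The callbacks are never removed in this sketch, because the server in this post does not signal the end of a result set. A real version would need such a signal to clean up.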

Simple, isn’t it?
Take care if you use this script in production: the database is exposed to raw SQL sent from the client. It’s only a proof of concept. It would probably be better not to send the SQL at all: store the queries in a key-value table on the server and send only an ID from the browser.
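
The "send only an ID" idea could look roughly like this on the server. It is a sketch under assumptions: the queries live in a plain in-memory object instead of a database table, and the ids (`allUsers`, `userById`) and the `resolveQuery` helper are made up for illustration.

```javascript
// Sketch: the browser sends only a query id; the SQL text stays on the server.
// The ids and queries below are illustrative placeholders.
var queries = {
    allUsers: 'select * from users',
    userById: 'select * from users where uid=$1'
};

// Look up the SQL for an id; unknown ids are rejected instead of executed.
function resolveQuery(id, values) {
    if (!Object.prototype.hasOwnProperty.call(queries, id)) {
        throw new Error('Unknown query id: ' + id);
    }
    return { sql: queries[id], values: values || [] };
}
```

Inside the server's 'sql' handler, `var q = resolveQuery(data.id, data.values); client.query(q.sql, q.values);` would then replace the direct use of `data.sql`.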

What do you think?

Database abstraction layers in PHP. PDO versus DBAL

I normally use PDO in my PHP projects. I like it because it’s an easy-to-use PHP extension and it shares the same interface across all databases. Normally I use PostgreSQL, but if I change to MySQL or Oracle I don’t need to use different functions to handle the database connections.

PHP has a great project called Doctrine2. Doctrine2 is an ORM and it uses its own database abstraction layer called DBAL. In fact DBAL isn’t a pure database abstraction layer: it’s built on top of PDO. It’s a set of PHP classes that gives us features not available with ‘pure’ PDO. If we use Doctrine2 we’re using DBAL behind the scenes, but we don’t need Doctrine2 to use DBAL. We can use DBAL as a database abstraction layer without any ORM. Obviously this extra PHP layer over the PDO extension comes at a cost, and in this post I will have a look at that cost. I will take one of my old posts about PDO and do the same with DBAL to see the performance differences. Let’s start:

The PDO version:

error_reporting(-1);
$time = microtime(TRUE);
$mem = memory_get_usage();

$dbh = new PDO('pgsql:dbname=mydb;host=localhost', 'gonzalo', 'password');
$dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$dbh->beginTransaction();

$smtp = $dbh->prepare('INSERT INTO test.tbl1 (id, field1) values (:ID, :FIELD1)');

for ($i=0; $i<1000; $i++) {
    $smtp->execute(array('ID' => $i, 'FIELD1' => "field {$i}"));
}

$dbh->commit();

$stmt = $dbh->prepare('SELECT * FROM test.tbl1 limit 10000');
$stmt->execute();

$i=0;
while ($row = $stmt->fetch()) {
	$i++;
}
echo '<h1>PDO</h1>';
echo "<strong>{$i} </strong>";

print_r(array('memory' => (memory_get_usage() - $mem) / (1024 * 1024), 'seconds' => microtime(TRUE) - $time));

$dbh->beginTransaction();
$smtp = $dbh->prepare('delete from test.tbl1');
$smtp->execute();
$dbh->commit();

The DBAL version:

error_reporting(-1);
$time = microtime(TRUE);
$mem = memory_get_usage();

use Doctrine\DBAL\DriverManager;

$connectionParams = array(
    'dbname'   => 'mydb',
    'user'     => 'gonzalo',
    'password' => 'password',
    'host'     => 'localhost',
    'driver'   => 'pdo_pgsql',
    );

$dbh = DriverManager::getConnection($connectionParams);

$dbh->beginTransaction();

$smtp = $dbh->prepare('INSERT INTO test.tbl1 (id, field1) values (:ID, :FIELD1)');

for ($i=0; $i<1000; $i++) {
    $smtp->execute(array('ID' => $i, 'FIELD1' => "field {$i}"));
}

$dbh->commit();

$stmt = $dbh->prepare('SELECT * FROM test.tbl1 limit 10000');
$stmt->execute();

$i=0;
while ($row = $stmt->fetch()) {
	$i++;
}
echo '<h1>DBAL</h1>';
echo "<strong>{$i} </strong>";

print_r(array('memory' => (memory_get_usage() - $mem) / (1024 * 1024), 'seconds' => microtime(TRUE) - $time));

As we can see, DBAL is slower than pure PDO (obviously). Most of DBAL’s overhead comes from including the PHP classes (remember that PDO is a PHP extension, so we don’t need to include any file). If we measure without the include step, the memory usage is almost the same and the execution time is a little slower.

Autoload for DBAL version:

spl_autoload_register(function ($class) {
        $class = str_replace('\\', '/', $class) . '.php';
        require_once($class);
    }
);

Or hardcoded includes for this example:

require_once('Doctrine/DBAL/Driver.php');
require_once('Doctrine/DBAL/Driver/Connection.php');
require_once('Doctrine/DBAL/Platforms/AbstractPlatform.php');
require_once('Doctrine/DBAL/Driver/Statement.php');

require_once('Doctrine/DBAL/DriverManager.php');
require_once('Doctrine/DBAL/Configuration.php');
require_once('Doctrine/Common/EventManager.php');
require_once('Doctrine/DBAL/Driver/PDOPgSql/Driver.php');
require_once('Doctrine/DBAL/Driver.php');
require_once('Doctrine/DBAL/Connection.php');
require_once('Doctrine/DBAL/Driver/Connection.php');
require_once('Doctrine/DBAL/Query/Expression/ExpressionBuilder.php');
require_once('Doctrine/DBAL/Platforms/PostgreSqlPlatform.php');
require_once('Doctrine/DBAL/Platforms/AbstractPlatform.php');
require_once('Doctrine/DBAL/Driver/PDOConnection.php');
require_once('Doctrine/DBAL/Driver/PDOStatement.php');
require_once('Doctrine/DBAL/Driver/Statement.php');
require_once('Doctrine/DBAL/Events.php');
require_once('Doctrine/DBAL/Statement.php');

Outcomes of the tests (memory in MB):

With pure PDO:

  • memory: 0.0044288635253906
  • seconds: 0.24748301506042

With DBAL and autoload:

  • memory: 0.97610473632812
  • seconds: 0.29042816162109

With DBAL and hardcoded requires:

  • memory: 0.97521591186523
  • seconds: 0.31192088127136

With DBAL bypassing the include part:

  • memory: 0.0099525451660156
  • seconds: 0.30333304405212

The price we pay for using DBAL buys us some extra features. OK, we don’t need DBAL to get those features. If we code a bit we can get them ourselves (remember that DBAL is nothing but an extra PHP layer). But DBAL has a great interface and it’s well documented. Here are a few extra DBAL features that I find very interesting:

Transactional mode

I really like it. It allows us to write scripts like this:

$dbh->transactional(function($conn) {
    $smtp = $conn->prepare('INSERT INTO wf.tbl1 (id, field1) values (:ID, :FIELD1)');

    for ($i=0; $i<1000; $i++) {
        $smtp->execute(array('ID' => $i, 'FIELD1' => "field {$i}"));
    }
});

A simple closure makes the code more concise, and it commits or rolls back the transaction for us. In fact I borrowed this function for my PDO projects so I can use the same interface. I love open source.

Snippet from DBAL library:

    /**
     * Executes a function in a transaction.
     *
     * The function gets passed this Connection instance as an (optional) parameter.
     *
     * If an exception occurs during execution of the function or transaction commit,
     * the transaction is rolled back and the exception re-thrown.
     *
     * @param Closure $func The function to execute transactionally.
     */
    public function transactional(Closure $func)
    {
        $this->beginTransaction();
        try {
            $func($this);
            $this->commit();
        } catch (Exception $e) {
            $this->rollback();
            throw $e;
        }
    }

Type conversion

Really useful, at least when I work with dates:

$date = new \DateTime("2011-03-05 14:00:21");
$stmt = $conn->prepare("SELECT * FROM articles WHERE publish_date > ?");
$stmt->bindValue(1, $date, "datetime");
$stmt->execute();

List of Parameters Conversion

Another cool feature, available in DBAL since Doctrine 2.1:

$dbh->executeQuery('SELECT * FROM wf.tbl1 WHERE id IN (?)',
    array(array(1, 2, 3, 4, 5, 6)),
    array(\Doctrine\DBAL\Connection::PARAM_INT_ARRAY));

Binding parameters to an IN clause with PDO is a bit ugly. We need to create one bind parameter per list item and map them into the SQL. It’s possible, but the DBAL interface is smarter.
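
To show what that manual work looks like, here is a small sketch of the placeholder expansion, written in JavaScript with the `$1, $2, ...` style used in the node.js examples of this blog. With PDO the same loop would produce `?` or named placeholders instead. `buildInQuery` is a hypothetical helper, not part of any library:

```javascript
// Sketch: expand a list into numbered placeholders for an IN clause.
// buildInQuery is a hypothetical helper; it only builds the SQL text and the
// flat parameter array, it does not talk to the database.
function buildInQuery(sqlPrefix, list) {
    if (list.length === 0) {
        throw new Error('IN () needs at least one value');
    }
    var placeholders = list.map(function (value, index) {
        return '$' + (index + 1);
    });
    return {
        sql: sqlPrefix + ' IN (' + placeholders.join(', ') + ')',
        values: list.slice()
    };
}

var query = buildInQuery('SELECT * FROM wf.tbl1 WHERE id', [1, 2, 3]);
console.log(query.sql); // SELECT * FROM wf.tbl1 WHERE id IN ($1, $2, $3)
```

The resulting `query.sql` and `query.values` can be passed to a parameterized query call as they are. The values themselves never get concatenated into the SQL string.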

Transaction Nesting

Another cool feature:

$dbh->beginTransaction(); // real transaction started
try {
    // nested transaction block
    $dbh->beginTransaction();
    try {
        $smtp = $dbh->prepare('INSERT INTO wf.tbl1 (id, field1) values (:ID, :FIELD1)');

        for ($i=0; $i<1000; $i++) {
            $smtp->execute(array('ID' => $i, 'FIELD1' => "field {$i}"));
        }

        $dbh->commit(); // leaves the nested level; nothing is committed yet
    } catch (Exception $e) {
        $dbh->rollback(); //transaction marked for rollback only
        throw $e;
    }

    $smtp = $dbh->prepare('INSERT INTO wf.tbl1 (id, field1) values (:ID, :FIELD1)');

    for ($i=0; $i<1000; $i++) {
        $smtp->execute(array('ID' => $i, 'FIELD1' => "field {$i}"));
    }

    $dbh->commit(); // real transaction committed
} catch (Exception $e) {
    $dbh->rollback(); // transaction rollback
    throw $e;
}

With PDO this piece of code throws the following error:
There is already an active transaction
but it works with DBAL. To do this kind of thing with PDO we need to use savepoints and the like. DBAL does the ugly part for us.
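
For the curious, the savepoint approach mentioned above can be sketched as a depth counter that decides which statement to send. This is only an illustration of the general technique, not a description of what DBAL does internally. The `NestedTransaction` name and the injected `exec` function are assumptions made so the logic can be tried without a database:

```javascript
// Sketch: emulate nested transactions with a depth counter and savepoints.
// exec is any function that sends one SQL statement to the database
// (hypothetical; injected so the logic can be tried without a server).
function NestedTransaction(exec) {
    this.exec = exec;
    this.depth = 0;
}

NestedTransaction.prototype.begin = function () {
    if (this.depth === 0) {
        this.exec('BEGIN');                        // outermost level: real transaction
    } else {
        this.exec('SAVEPOINT level_' + this.depth); // nested level: savepoint
    }
    this.depth++;
};

NestedTransaction.prototype.commit = function () {
    if (this.depth === 0) throw new Error('No active transaction');
    this.depth--;
    if (this.depth === 0) {
        this.exec('COMMIT');
    } else {
        this.exec('RELEASE SAVEPOINT level_' + this.depth);
    }
};

NestedTransaction.prototype.rollback = function () {
    if (this.depth === 0) throw new Error('No active transaction');
    this.depth--;
    if (this.depth === 0) {
        this.exec('ROLLBACK');
    } else {
        this.exec('ROLLBACK TO SAVEPOINT level_' + this.depth);
    }
};
```

Calling `begin()` twice and then `commit()` twice sends BEGIN, SAVEPOINT level_1, RELEASE SAVEPOINT level_1 and COMMIT, in that order. With this variant a nested `rollback()` only undoes the work done since the matching `begin()`.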

Performance analysis of Stored Procedures with PDO and PHP

Last week I had an interesting conversation on Twitter about the usage of stored procedures in databases. Someone told me that stored procedures are evil. I don’t agree. Stored procedures are a great place to store business logic. In this post I’m going to compare the performance of a small piece of code written with a stored procedure and written with PHP code only.

Without stored procedures

// Without stored procedures
$time = microtime(TRUE);
$mem = memory_get_usage();

$dsn = 'pgsql:host=localhost;dbname=gonzalo;port=5432';
$user = 'user';
$password = 'password';
$conn = new PDO($dsn, $user, $password);
$conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$conn->beginTransaction();
$stmt = $conn->prepare('delete from web.tbltest');
$stmt->execute();

$stmt = $conn->prepare('INSERT INTO web.tbltest (field1) values (?)');
foreach (range(0,1000) as $i) {
    $stmt->execute(array($i));
}
$conn->commit();

print_r(array('memory' => (memory_get_usage() - $mem) / (1024 * 1024), 'seconds' => microtime(TRUE) - $time));

With stored procedures:

// With stored procedures:
/*
CREATE OR REPLACE FUNCTION web.method1()
  RETURNS numeric AS
$BODY$
BEGIN
   DELETE FROM web.tbltest;
   FOR i IN 0..1000 LOOP
     INSERT INTO web.tbltest (field1) values (i);
   END LOOP;
   RETURN 1;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;
*/
$time = microtime(TRUE);
$mem = memory_get_usage();

$dsn = 'pgsql:host=localhost;dbname=gonzalo;port=5432';
$user = 'user';
$password = 'password';
$conn = new PDO($dsn, $user, $password);
$conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$conn->beginTransaction();
$stmt = $conn->prepare('SELECT web.method1()');
$stmt->execute();
$stmt->setFetchMode(PDO::FETCH_ASSOC);
$out = $stmt->fetchAll();
$conn->commit();

print_r(array('memory' => (memory_get_usage() - $mem) / (1024 * 1024), 'seconds' => microtime(TRUE) - $time));

Without stored procedures:

  • memory: 0.0023880004882812
  • seconds: 0.31109309196472

With stored procedures:

  • memory: 0.0020713806152344
  • seconds: 0.065021991729736

So my conclusion: stored procedures are not evil. The performance is really good. I know it can get a bit messy if we place business logic inside and outside the database at the same time, but with good design and architecture this problem is easy to solve. What do you think?
