Silex service provider for a Gearman wrapper

I’ve written a small wrapper for the Gearman API. I normally use Silex for the frontend. Today we’re going to build a service provider that lets us integrate the Gearman wrapper easily into our Silex applications.

Here I show you an example of how to use the service provider.

Imagine this simple worker:

use G\Gearman\Builder;

// Reverse whatever string arrives on "worker.example" and send it back to the client.
$reverse = function ($workload) {
    return strrev($workload);
};

$worker = Builder::createWorker();
$worker->on("worker.example", $reverse);
$worker->run();

And this is the Silex application using the service provider as a gearman client:

use G\Gearman\Client;
use G\GearmanServiceProvider;
use Silex\Application;

$app = new Application();
$app->register(new GearmanServiceProvider());

// The Client argument is resolved by the injector that GearmanServiceProvider registers.
$app->get("/", function (Client $client) {
    $reversed = $client->doNormal("worker.example", "Gonzalo");

    return "Hello " . $reversed;
});

$app->run();

I’m using the injector library to inject the Gearman services into the controllers. I’ve written about it here.

This is the code of the service provider:

namespace G;

use G\Gearman\Client;
use G\Gearman\Tasks;
use Injector\InjectorServiceProvider;
use Silex\Application;
use Silex\ServiceProviderInterface;

/**
 * Registers the Gearman wrapper in a Silex application.
 *
 * Defines the "gearmanClient" (G\Gearman\Client) and "gearmanTasks" (G\Gearman\Tasks) services,
 * both built on the same \GearmanClient. It also maps their class names through
 * InjectorServiceProvider so controllers can simply type-hint them.
 */
class GearmanServiceProvider implements ServiceProviderInterface
{
    /** @var \GearmanClient */
    private $client;

    /**
     * @param \GearmanClient|null $client  Pre-configured client; when null a new one is created.
     * @param string              $servers Server list passed to addServers() only when $client is null
     *                                     (defaults to the previously hard-coded "localhost:4730").
     */
    public function __construct(\GearmanClient $client = null, $servers = "localhost:4730")
    {
        if (is_null($client)) {
            $client = new \GearmanClient();
            $client->addServers($servers);
        }
        $this->client = $client;
    }

    public function register(Application $app)
    {
        $app->register(new InjectorServiceProvider([
            'G\Gearman\Client' => 'gearmanClient',
            'G\Gearman\Tasks'  => 'gearmanTasks',
        ]));

        // The service closures only need $this->client, so $app is not captured.
        $app['gearmanClient'] = function () {
            $client = new Client($this->client);
            // Identity callback: without it Client::do*() returns null instead of the worker's response.
            $client->onSuccess(function ($response) {
                return $response;
            });

            return $client;
        };

        $app['gearmanTasks'] = function () {
            return new Tasks($this->client);
        };
    }

    public function boot(Application $app)
    {
    }
}

The code is available in my GitHub account.

PHP Gearman Wrapper

I must admit that nowadays I’m using RabbitMQ more than Gearman, but I’m still a big fan of Gearman. PHP has a great API to connect to the Gearman job server, but sometimes I miss a, how to say, “cleaner” way. Because of that, I’ve created a Gearman wrapper. Let’s start.

I want to cover different areas: Workers, clients, background clients, and tasks.

Worker example:

use G\Gearman\Builder;

$worker = Builder::createWorker();

// Both "process" handlers log the incoming workload and the job's unique id the same way.
$log = function ($workload, \GearmanJob $job) {
    echo "Response: {$workload} unique: {$job->unique()}\n";
};

$worker->on("slow.process", function ($workload, \GearmanJob $job) use ($log) {
    $log($workload, $job);
    sleep(2); // emulate a slow process

    return $job->unique();
});

$worker->on("fast.process", function ($workload, \GearmanJob $job) use ($log) {
    $log($workload, $job);

    return $job->unique();
});

// Emulate a failing process.
$worker->on("exception.process", function () {
    throw new \Exception("Something wrong happens");
});

$worker->run();

And a client:

use G\Gearman\Builder;

$client = Builder::createClient();

// Print the worker's reply once the synchronous job completes.
$printReply = function ($reply) {
    echo $reply;
};
$client->onSuccess($printReply);

$client->doNormal('fast.process', "Hello");

One background client

use G\Gearman\Builder;

$client = Builder::createClient();

// Fire-and-forget: queue three slow jobs without waiting for their results.
foreach (["Hello1", "Hello2", "Hello3"] as $payload) {
    $client->doBackground('slow.process', $payload);
}

And finally, tasks

use G\Gearman\Builder;

$tasks = Builder::createTasks();

// A task's context may be a callable that formats its result; otherwise the raw data is shown.
$tasks->onSuccess(function (\GearmanTask $task, $context) {
    if (is_callable($context)) {
        $out = $context($task);
    } else {
        $out = $task->data();
    }
    echo "onSuccess response: " . $out . " id: {$task->unique()}\n";
});

$tasks->onException(function (\GearmanTask $task) {
    echo "onException response {$task->data()}\n";
});

$greet = function (\GearmanTask $task) {
    return "Hello " . $task->data();
};

$tasks->addTask('fast.process', "fast1", $greet, 'g1');
$tasks->addTaskHigh('slow.process', "slow1", null, 'xxxx');
$tasks->addTask('fast.process', "fast2");
$tasks->addTask('exception.process', 'hi');

$tasks->runTasks();

The library is just a wrapper around the official API. I’ve created a builder to simplify the creation of the instances:

namespace G\Gearman;

/**
 * Static factories that create wrapped Gearman objects connected to a server list.
 *
 * $servers is a comma-separated "host:port" list as accepted by addServers(). When it is null,
 * addServers() is called without arguments so the extension applies its own documented default
 * (127.0.0.1:4730 in the PHP manual). This avoids passing an explicit null, which is coerced to ""
 * and is deprecated for internal string parameters since PHP 8.1.
 */
class Builder
{
    /**
     * @param string|null $servers
     * @return Worker
     */
    public static function createWorker($servers = null)
    {
        return new Worker(self::withServers(new \GearmanWorker(), $servers));
    }

    /**
     * @param string|null $servers
     * @return Client
     */
    public static function createClient($servers = null)
    {
        return new Client(self::withServers(new \GearmanClient(), $servers));
    }

    /**
     * @param string|null $servers
     * @return Tasks
     */
    public static function createTasks($servers = null)
    {
        return new Tasks(self::withServers(new \GearmanClient(), $servers));
    }

    /**
     * Registers $servers on a \GearmanClient or \GearmanWorker and returns the same object.
     */
    private static function withServers($gearman, $servers)
    {
        if ($servers === null) {
            $gearman->addServers();
        } else {
            $gearman->addServers($servers);
        }

        return $gearman;
    }
}

That’s the worker wrapper:

namespace G\Gearman;

/**
 * Thin wrapper around \GearmanWorker.
 * Workloads are json_decode()d before they reach the handlers.
 */
class Worker
{
    private $worker;

    public function __construct(\GearmanWorker $worker)
    {
        $this->worker = $worker;
    }

    /**
     * Registers a handler for a Gearman function.
     *
     * @param string   $name     Gearman function name.
     * @param callable $callback Called with (json_decode(workload), \GearmanJob); its return value is the job result.
     * @param mixed    $context  Passed through to \GearmanWorker::addFunction().
     * @param int      $timeout  Passed through to \GearmanWorker::addFunction().
     */
    public function on($name, callable $callback, $context = null, $timeout = 0)
    {
        $this->worker->addFunction($name, function (\GearmanJob $job) use ($callback) {
            return call_user_func($callback, json_decode($job->workload()), $job);
        }, $context, $timeout);
    }

    /**
     * Processes jobs until work() returns a falsy value.
     *
     * An exception escaping a handler is printed and processing resumes. This is done
     * iteratively: retrying by recursion (as before) added a stack frame, and kept the
     * exception, for every failure, so a long-running worker would eventually exhaust memory.
     */
    public function run()
    {
        while (true) {
            try {
                $this->loop();

                return;
            } catch (\Exception $e) {
                echo $e->getMessage() . "\n";
            }
        }
    }

    private function loop()
    {
        while ($this->worker->work()) {
        }
    }
}

Now the client:

namespace G\Gearman;

/**
 * Wrapper around \GearmanClient for synchronous and background jobs.
 *
 * Every do*() method JSON-encodes the string form of the workload (Worker::on() json_decode()s it).
 * It throws when the call does not end with GEARMAN_SUCCESS. On success it returns the onSuccess()
 * callback's result for the raw \GearmanClient return value, or null if no callback is set.
 */
class Client
{
    private $onSuccessCallback;
    private $client;

    public function __construct(\GearmanClient $client)
    {
        $this->client = $client;
    }

    public function doHigh($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    public function doNormal($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    public function doLow($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    public function doBackground($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    public function doHighBackground($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    public function doLowBackground($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    /**
     * Calls the \GearmanClient method named $action.
     *
     * All four parameters are required. The former "$workload = null" default sat before the
     * required $unique, so it was never usable, and PHP 8 deprecates that signature.
     *
     * @throws \Exception With the client's error message and return code when the call fails.
     */
    private function doAction($action, $name, $workload, $unique)
    {
        $handle = $this->client->$action($name, json_encode((string) $workload), $unique);
        $returnCode = $this->client->returnCode();
        if ($returnCode !== \GEARMAN_SUCCESS) {
            throw new \Exception($this->client->error(), $returnCode);
        }
        if ($this->onSuccessCallback) {
            return call_user_func($this->onSuccessCallback, $handle);
        }

        return null;
    }

    /**
     * Sets the callback that receives the raw return value of every successful do*() call.
     */
    public function onSuccess(callable $callback)
    {
        $this->onSuccessCallback = $callback;
    }
}

And finally, the tasks:

namespace G\Gearman;

/**
 * Collects Gearman tasks and submits them together through a \GearmanClient.
 *
 * Each add*() method queues a task tagged with the \GearmanClient method that will submit it
 * (addTask, addTaskHigh, ...). Workloads are JSON-encoded when the tasks are submitted.
 */
class Tasks
{
    private $client;
    /** @var array[] Pending tasks as [method, name, workload, context, unique]. */
    private $tasks;

    public function __construct(\GearmanClient $client)
    {
        $this->tasks = [];
        $this->client = $client;
    }

    public function addTask($name, $workload = null, $context = null, $unique = null)
    {
        $this->queue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    public function addTaskHigh($name, $workload = null, $context = null, $unique = null)
    {
        $this->queue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    public function addTaskLow($name, $workload = null, $context = null, $unique = null)
    {
        $this->queue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    public function addTaskBackground($name, $workload = null, $context = null, $unique = null)
    {
        $this->queue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    public function addTaskHighBackground($name, $workload = null, $context = null, $unique = null)
    {
        $this->queue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    public function addTaskLowBackground($name, $workload = null, $context = null, $unique = null)
    {
        $this->queue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    /**
     * Submits every queued task to the client and runs them.
     *
     * The queue is emptied before submission, so each task is sent at most once. Previously a
     * second runTasks() call re-submitted every task ever queued.
     *
     * @return bool The result of \GearmanClient::runTasks(); callers that ignore it are unaffected.
     */
    public function runTasks()
    {
        $pending = $this->tasks;
        $this->tasks = [];
        foreach ($pending as list($method, $name, $workload, $context, $unique)) {
            $this->client->$method($name, json_encode($workload), $context, $unique);
        }

        return $this->client->runTasks();
    }

    public function onSuccess(callable $callback)
    {
        $this->client->setCompleteCallback($callback);
    }

    public function onException(callable $callback)
    {
        $this->client->setExceptionCallback($callback);
    }

    public function onFail(callable $callback)
    {
        $this->client->setFailCallback($callback);
    }

    /**
     * Appends one pending task; $method is the \GearmanClient method that will submit it.
     */
    private function queue($method, $name, $workload, $context, $unique)
    {
        $this->tasks[] = [$method, $name, $workload, $context, $unique];
    }
}

The library is available on Packagist, and the source code is in my GitHub account.

Reading Modbus devices with Python from a PHP/Silex Application via Gearman worker

Yes, I know. I never know how to write a good title for my posts. Let me show you an integration example that I’ve been working on these days. Let’s start.

In industrial automation there are several standard protocols. Modbus is one of them. Maybe it isn’t the coolest or the newest one (like OPC or OPC UA), but we can speak Modbus with a huge number of devices.

I need to read from one of them and show a couple of variables in a web frontend. Imagine the following fake Modbus server (it emulates my real Modbus device):

#!/usr/bin/env python

##
# Fake modbus server
# - exposes "Energy" 66706 = [1, 1170]
# - exposes "Power" 132242 = [2, 1170]
##

from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.server.async import StartTcpServer
import logging

logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)

hrData = [1, 1170, 2, 1170]
store = ModbusSlaveContext(hr=ModbusSequentialDataBlock(2, hrData))

context = ModbusServerContext(slaves=store, single=True)

StartTcpServer(context)

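This server exposes two variables, “Energy” and “Power”. It is a fake server, so it will always return 66706 for energy and 132242 for power. Modbus holding registers are 16 bits wide, so each 32-bit value spans two registers (high word first): 66706 = 1 × 65536 + 1170 = [1, 1170] and 132242 = 2 × 65536 + 1170 = [2, 1170].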

I can read Modbus from PHP, but I normally use Python for this kind of logic, and I’m not going to rewrite existing logic in PHP. I’m not crazy enough. Furthermore, my real Modbus device only accepts one active socket to retrieve information. That means that if two clients use the frontend at the same time, it will crash. In these situations, queues are our friends.

I’ll use a Gearman worker (written in Python) to read Modbus information.

from pyModbusTCP.client import ModbusClient
from gearman import GearmanWorker
import json

def reader(worker, job):
    """Gearman task handler: read energy and power from the Modbus device.

    Returns a JSON string {"energy": int, "power": int}, or None when the device
    cannot be reached or the register read fails. The socket is always closed
    before returning, because the real device accepts only one active socket.
    """
    c = ModbusClient(host="localhost", port=502)

    if not c.is_open() and not c.open():
        print("unable to connect to host")
        return None

    try:
        holdingRegisters = c.read_holding_registers(1, 4)
        # pyModbusTCP returns None when the read fails; guard before indexing.
        if not holdingRegisters or len(holdingRegisters) < 4:
            print("unable to read holding registers")
            return None

        # Imagine we've "energy" value in position 1 with two words (high word first)
        energy = (holdingRegisters[0] << 16) | holdingRegisters[1]

        # Imagine we've "power" value in position 3 with two words (high word first)
        power = (holdingRegisters[2] << 16) | holdingRegisters[3]

        out = {"energy": energy, "power": power}
        return json.dumps(out)
    finally:
        c.close()

worker = GearmanWorker(['127.0.0.1'])

worker.register_task('modbusReader', reader)

# Function-call form of print works on both Python 2 and Python 3
# (the statement form `print 'x'` is a SyntaxError on Python 3).
print('working...')
worker.work()

Our backend is ready. Now we’ll work with the frontend. In this example I’ll use PHP and Silex.

<?php
include __DIR__ . '/../vendor/autoload.php';

use Silex\Application;

$app = new Application(['debug' => true]);
$app->register(new Silex\Provider\TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

// Asks the Python "modbusReader" Gearman worker for the current readings
// and returns its JSON reply decoded as an associative array.
$app['modbusReader'] = $app->protect(function () {
    $client = new \GearmanClient();
    $client->addServer();
    $handle = $client->doNormal('modbusReader', 'modbusReader');
    $returnCode = $client->returnCode();
    if ($returnCode !== \GEARMAN_SUCCESS) {
        // This closure is not bound to an object, so the error must come from the local $client
        // ($this->client would be a fatal "Using $this when not in object context").
        throw new \Exception($client->error(), $returnCode);
    }

    $data = json_decode($handle, true);
    if (!is_array($data)) {
        // The worker returns nothing usable when it cannot reach the Modbus device.
        throw new \RuntimeException('Unexpected response from the modbusReader worker');
    }

    return $data;
});

$app->get("/", function (Application $app) {
    return $app['twig']->render('home.twig', $app['modbusReader']());
});

$app->run();

As we can see, the frontend is a simple Gearman client. It uses our Python worker to read information from Modbus and renders a simple HTML page with a Twig template:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Demo</title>
</head>
<body>
    {# Values come from the JSON returned by the modbusReader Gearman worker. #}
    {# Separate paragraphs: a bare newline collapses, so both values would render on one line. #}
    <p>Energy: {{ energy }}</p>
    <p>Power: {{ power }}</p>
</body>
</html>

And that’s all. You can see the full example in my github account

Watermarks in our images with PHP and Gearman

In my last post I tried to explain how to add dynamic watermarks to our images using PHP and the GD library. Someone told me in a comment that he had used something similar, but he had to disable it because it consumed huge resources. The solution from the previous post probably doesn’t scale well. If you have a high-traffic site, your memory and CPU usage will increase a lot because of the image transformation (especially if you work with big images). It’d be better if you could generate the watermarks offline, but if it is mandatory to create them dynamically (for example, because we need to place the current timestamp), there are other solutions.

In this second solution I will use a Gearman worker to generate the watermarks. The benefit of Gearman is the possibility of using a pool of workers. We can add or remove workers as our application scales. Those workers can even be placed on different hosts, and we can easily swap from one configuration to another. Imagine we have an application that uses a single worker on the same host as the web server. Maybe it’s enough for a small site, but suddenly our number of users increases. We can add new workers to our host. And if our single host is not enough, we can rent new hosts (with Amazon, for example) and our application will adapt easily to the new scenario. Gearman provides an easy way to scale out our applications.

Let’s start:

Now our main script, instead of doing the hard work, will be a Gearman client:

<?php
$filename = "/path/to/img.jpg";
$footer = date('d/m/Y H:i:s');

$gmclient = new GearmanClient();
$gmclient->addServer();

// doNormal() is the gearman extension's replacement for the deprecated GearmanClient::do().
$handle = $gmclient->doNormal("watermark", json_encode(array($filename, $footer)));

if ($gmclient->returnCode() != GEARMAN_SUCCESS) {
    // Signal the failure to the browser instead of answering 200 OK.
    http_response_code(500);
    echo "Ups something wrong happen";
} else {
    header('Content-Type: image/jpeg');
    echo $handle;
}

And our worker will do the hard work.

<?php
$gmw = new GearmanWorker();
$gmw->addServer();

// "watermark" job: the workload is a JSON [filename, footer] pair. Draws the footer text on a
// blue band at the bottom of the JPEG and returns the encoded JPEG bytes.
$gmw->addFunction("watermark", function ($job) {
    list($filename, $footer) = json_decode($job->workload());

    $stringSize = 3;                            // GD built-in font number, same as watermark.php
    $footerSize = ($stringSize == 1) ? 12 : 15; // footer band height in pixels
    list($width, $height) = getimagesize($filename);

    $im = imagecreatefromjpeg($filename);

    imagefilledrectangle(
        $im,
        0,
        $height,
        $width,
        $height - $footerSize,
        imagecolorallocate($im, 49, 49, 156)
    );

    imagestring(
        $im,
        $stringSize,
        $width - (imagefontwidth($stringSize) * strlen($footer)) - 2,
        $height - $footerSize,
        $footer,
        imagecolorallocate($im, 255, 255, 255)
    );

    // Encode as JPEG: the client sends these bytes with "Content-Type: image/jpeg".
    ob_start();
    imagejpeg($im);
    $img = ob_get_clean();

    // This worker is long-running, so release each GD image explicitly.
    imagedestroy($im);

    return $img;
});

while (1) {
    $gmw->work();
}

We must remember to start our worker(s) on our server:

# Start one watermark worker process (start more of them to scale out).
php -f /path/to/worker/worker.watermark.php

What do you think of this solution?

Watermarks in our images with PHP and mod_rewrite

Imagine you have a set of images and you need to add a watermark to all of them. You can do the work with an image editor (GIMP or Photoshop, for example). It’s easy work, but if your image library is big, the easy work becomes hard. Now imagine you need to add a dynamic watermark, let’s say the current timestamp. To solve this problem we can use PHP’s GD image functions.

The idea is simple. Instead of opening the image directly, we are going to open a PHP script. This PHP script will open the original image file with imagecreatefromjpeg, add the footer, and flush the new image to the browser with the proper headers.

But I don’t want to rewrite all the anchors’ hrefs within my HTML files. Because of that, I’m going to use the same trick I used in one of my posts. With a simple .htaccess in our JPEG folder, Apache’s mod_rewrite will do the work for us:

# Hand every request in this folder that is not a PHP script to watermark.php.
RewriteEngine On
RewriteRule !\.php$ watermark.php

And now our PHP script called watermark.php

<?php
// Serves a JPEG from this folder with the current timestamp drawn in a blue footer.
// Requests reach this script through the mod_rewrite rule in the folder's .htaccess.

$documentRoot = realpath($_SERVER['DOCUMENT_ROOT']);
// REQUEST_URI may include a query string and percent-encoded characters: keep only the decoded path.
$path = rawurldecode((string) parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH));
$filename = realpath($documentRoot . $path);

// Refuse this script itself and anything that does not resolve to a regular file inside the document root.
if ($documentRoot === false
    || $filename === false
    || $filename === realpath(__FILE__)
    || strpos($filename, $documentRoot . DIRECTORY_SEPARATOR) !== 0
    || !is_file($filename)
) {
    http_response_code(404);
    exit();
}

$imageInfo = getimagesize($filename);
if ($imageInfo === false || $imageInfo[2] !== IMAGETYPE_JPEG) {
    // imagecreatefromjpeg() below can only handle JPEG files.
    http_response_code(404);
    exit();
}
list($width, $height) = $imageInfo;

$stringSize = 3;                            // GD built-in font number
$footerSize = ($stringSize == 1) ? 12 : 15; // footer band height in pixels
$footer = date('d/m/Y H:i:s');

$im = imagecreatefromjpeg($filename);
imagefilledrectangle(
    $im,
    0,
    $height,
    $width,
    $height - $footerSize,
    imagecolorallocate($im, 49, 49, 156)
);

imagestring(
    $im,
    $stringSize,
    $width - (imagefontwidth($stringSize) * strlen($footer)) - 2,
    $height - $footerSize,
    $footer,
    imagecolorallocate($im, 255, 255, 255)
);

header('Content-Type: image/jpeg');
imagejpeg($im);
imagedestroy($im);

Simple, isn’t it? You can replace my blue footer with your logo; that’s easy with PHP’s GD functions. Of course, you need PHP compiled with GD support.

Original image:

After transformation:

Database connection pooling with PHP and gearman

Handling database connections with PHP is pretty straightforward: just open a database connection, perform the actions we need, and close it. There’s a little problem: we cannot create a pool of database connections. We need to create the connection in every request — create and destroy, again and again. There are some third-party solutions like SQL Relay or pgpool-II (if you use PostgreSQL like me). In this post I’m going to explain a personal experiment: creating a connection pool using Gearman. Another purpose of this experiment is to create a prepared-statement cache that is shared not only within the current request but across all requests. Let’s start.

This is an example of SELECT statement with PDO and PHP

// Plain PDO: connect, prepare, execute and fetch every row of $sql.
$pdo = new PDO('pgsql:dbname=pg1;host=localhost', 'user', 'pass');
$statement = $pdo->prepare($sql);
$statement->execute();
$data = $statement->fetchAll();

Basically, the idea is to use a Gearman worker to perform every database operation. As far as I know, we cannot pass PDO instances from a Gearman worker to a Gearman client, not even with object serialization (PDO objects cannot be serialized). That’s a problem.

My idea is to use the same interface as PDO, but delegate the database work to the worker and obtain a connection id instead of a real PDO connection.

This is the configuration class. It defines one database connection and two Gearman servers on the same host:

/**
 * Static configuration for the Gearman-based connection pool.
 */
class PoolConf
{
    const PG1 = 'PG1';

    /** PDO connection settings keyed by connection name. */
    public static $DB = [
        self::PG1 => [
            'dsn'      => "pgsql:dbname=gonzalo;host=localhost",
            'username' => 'user',
            'password' => 'pass',
            'options'  => null,
        ],
    ];

    /** Gearman job servers as [host, port] pairs. */
    public static $SERVERS = [
        ['127.0.0.1', 4730],
        ['127.0.0.1', 4731],
    ];
}

We start the Gearman job servers:

# Two gearmand job servers on the ports listed in PoolConf::$SERVERS.
# Use the long form --port=N: with the short option, "-p=4730" passes "=4730" as the port value.
gearmand -d --log-file=/var/log/gearman --user=gonzalo --port=4730
gearmand -d --log-file=/var/log/gearman --user=gonzalo --port=4731

How many servers do we need to start? It depends on your needs. We must realize that Gearman is not a pool; it’s a queue. But we can start as many servers as we want (OK, it depends on our RAM) and create a pool of queues. We need to remember that if we start only one Gearman server, we can only handle one database operation at a time (it’s a queue), and it will be a huge bottleneck if the application scales. Keep in mind, too, that each worker process handles one job at a time (its loop calls work() once per job), so the number of worker processes also limits concurrency. So you need to assess your site, evaluate how many concurrent operations you normally have, and start as many Gearman servers as you need.

Maybe it’s difficult to explain, but the final outcome will be something like this:

use Pool\Client;

// $conn and $stmt are proxies: the real PDO objects live inside the Gearman worker.
$conn = Client::singleton()->getConnection(PoolConf::PG1);
$stmt = $conn->prepare("SELECT * FROM TEST.TBL1");

$stmt->execute();
$data = $stmt->fetchAll();
printf("<p>count: %d</p>", count($data));

We must take into account that $stmt is not a “real” PHP statement. The real PHP statement is stored in a static variable within the worker. Our $stmt is an instance of the Pool\Server\Stmt class. This class has some public methods with the same names as the real statement’s methods (that’s why it behaves like a real statement), and internally those methods are calls to the Gearman worker. The same applies to the $conn variable: it’s not a real PDO connection but an instance of the Pool\Server\Connection class.

// Pool/Server/Stmt.php
namespace Pool\Server;

/**
 * Client-side proxy for a prepared statement that lives in the Gearman pool worker.
 * Every call is forwarded to the worker as a Gearman job with a serialized payload.
 */
class Stmt
{
    /**
     * @var mixed Statement id known to the worker. Declared explicitly: it used to be declared
     *            under a misspelled name, so every write created a dynamic property (deprecated in PHP 8.2).
     */
    private $_stmt = null;
    private $_cid = null;
    private $_client = null;

    /**
     * @param mixed  $stmtId Statement id issued by the worker.
     * @param mixed  $cid    Connection id the statement belongs to.
     * @param object $client Gearman client used to reach the worker (must provide do()).
     */
    public function __construct($stmtId, $cid, $client)
    {
        $this->_stmt = $stmtId;
        $this->_cid = $cid;
        $this->_client = $client;
    }

    /**
     * Executes the statement in the worker with the given bound parameters.
     *
     * @return $this For chaining, e.g. ->execute([...])->fetchAll().
     * @throws \Pool\Exception When the worker replies with a serialized \Pool\Exception.
     */
    public function execute($parameters = array())
    {
        $out = $this->_client->do('execute', serialize(array(
            'parameters' => $parameters,
            'stmt'       => $this->_stmt,
        )));
        // A serialized \Pool\Exception signals an error; any other reply becomes the id used from now on.
        // NOTE(review): unserialize() of a reply that is not serialized emits a notice/warning and yields false.
        $error = unserialize($out);
        if ($error instanceof \Pool\Exception) {
            throw $error;
        }
        $this->_stmt = $out;

        return $this;
    }

    /**
     * Fetches all rows of the executed statement from the worker.
     */
    public function fetchAll()
    {
        $data = $this->_client->do('fetchAll', serialize(array(
            'stmt' => $this->_stmt,
        )));

        return unserialize($data);
    }
}

The worker. The following code is an extract; you can see the full code here:

# Create our worker object and connect it to every configured Gearman server.
$worker = new GearmanWorker();
foreach (PoolConf::$SERVERS as $server) {
    list($host, $port) = $server;
    $worker->addServer($host, $port);
}

\Pool\Server::init();

// Each Gearman function is served by the global PHP function of the same name.
$functions = array(
    'getConnection', 'prepare', 'execute', 'fetchAll', 'info',
    'release', 'beginTransaction', 'commit', 'rollback',
);
foreach ($functions as $function) {
    $worker->addFunction($function, $function);
}

// Serve jobs until work() reports a non-success return code. An exception escaping a job
// is logged on its own line and the worker keeps serving.
while (true) {
    try {
        $worker->work();
        if ($worker->returnCode() != GEARMAN_SUCCESS) {
            break;
        }
    } catch (Exception $e) {
        echo $e->getMessage() . "\n";
    }
}

/**
 * "fetchAll" job. The workload is a serialized array with a 'stmt' key;
 * returns the serialized result of \Pool\Server::fetchAll().
 */
function fetchAll($job)
{
    echo __FUNCTION__ . "\n";
    // NOTE(review): unserialize() trusts every client that can submit Gearman jobs.
    $params = unserialize($job->workload());
    $stmtId = $params['stmt'];
    return serialize(\Pool\Server::fetchAll($stmtId));
}

/**
 * "execute" job. The workload is a serialized array with 'stmt' and 'parameters' keys;
 * returns whatever \Pool\Server::execute() returns.
 */
function execute($job)
{
    echo __FUNCTION__ . "\n";
    $params = unserialize($job->workload());
    $stmtId = $params['stmt'];
    $parameters = $params['parameters'];
    return \Pool\Server::execute($stmtId, $parameters);
}
...

The heart of the worker is the \Pool\Server class. This class performs all the real PDO operations and stores statements and connections in private static variables.

And now we can use the database pool reusing connections and prepared statements. You can see here a small performance test of reusing prepared statements in an older post.

I’ve also implemented some simple error handling. Errors in the worker are serialized and thrown on the client, simulating the normal behavior of standard PDO usage.

Now a set of examples:

Simple queries. And a simple error handling:

<?php
// Resolve the library files relative to this script, not the current working directory.
require_once __DIR__ . '/../conf/PoolConf.php';
require_once __DIR__ . '/../lib/Pool/Client.php';
require_once __DIR__ . '/../lib/Pool/Server.php';
require_once __DIR__ . '/../lib/Pool/Exception.php';
require_once __DIR__ . '/../lib/Pool/Server/Connection.php';
require_once __DIR__ . '/../lib/Pool/Server/Stmt.php';

use Pool\Client;

$conn = Client::singleton()->getConnection(PoolConf::PG1);

$sql = "SELECT * FROM TEST.TBL1";
$stmt = $conn->prepare($sql);

$stmt->execute();
$data = $stmt->fetchAll();
echo "<p>count: " . count($data) . "</p>";

// Deliberately query a missing table: the worker-side error is re-thrown here as an exception.
try {
    $sql = "SELECT * FROM TEST.NON_EXISTENT_TABLE";
    $stmt = $conn->prepare($sql);

    $stmt->execute();
    $data = $stmt->fetchAll();
    echo "<p>count: " . count($data) . "</p>";
} catch (Exception $e) {
    echo "ERROR: " . $e->getMessage();
}

print_r(Client::singleton()->info(PoolConf::PG1));

Now with bind parameters:

<?php
// Resolve the library files relative to this script, not the current working directory.
require_once __DIR__ . '/../conf/PoolConf.php';
require_once __DIR__ . '/../lib/Pool/Client.php';
require_once __DIR__ . '/../lib/Pool/Server.php';
require_once __DIR__ . '/../lib/Pool/Exception.php';
require_once __DIR__ . '/../lib/Pool/Server/Connection.php';
require_once __DIR__ . '/../lib/Pool/Server/Stmt.php';

use Pool\Client;

$conn = Client::singleton()->getConnection(PoolConf::PG1);

// prepare() and execute() return the statement proxy, so the calls chain like plain PDO.
$data = $conn->prepare("SELECT * FROM TEST.TBL1 WHERE SELECCION=:S")
    ->execute(array('S' => 1))
    ->fetchAll();

echo count($data);

print_r(Client::singleton()->info(PoolConf::PG1));

And now a transaction:

<?php
// Resolve the library files relative to this script, not the current working directory.
require_once __DIR__ . '/../conf/PoolConf.php';
require_once __DIR__ . '/../lib/Pool/Client.php';
require_once __DIR__ . '/../lib/Pool/Server.php';
require_once __DIR__ . '/../lib/Pool/Exception.php';
require_once __DIR__ . '/../lib/Pool/Server/Connection.php';
require_once __DIR__ . '/../lib/Pool/Server/Stmt.php';

use Pool\Client;

$conn = Client::singleton()->getConnection(PoolConf::PG1);

$conn->beginTransaction();
try {
    $data = $conn->prepare("SELECT * FROM TEST.TBL1 WHERE SELECCION=:S")
        ->execute(array('S' => 1))
        ->fetchAll();
} finally {
    // Always end the transaction, even if a query throws. The pooled connection stays
    // alive in the worker and is reused by later requests, so it must not be left open.
    $conn->rollback();
}

print_r(Client::singleton()->info(PoolConf::PG1));

Conclusion.

That’s a personal experiment. It works, indeed, but it’s probably full of bugs. You can use it in production if you are a brave developer :).

Source code.
The full source code is available here at Google Code.