Monthly Archives: April 2016

PHP Gearman Wrapper

I must admit that nowadays I’m using RabbitMQ more than Gearman, but I’m still a big fan of Gearman. PHP has a great API to connect to a Gearman job server, but sometimes I miss another, how to say, “clean” way. Because of that I’ve created a Gearman wrapper. Let’s start.

I want to cover different areas: Workers, clients, background clients, and tasks.

Worker example:

use G\Gearman\Builder;

$worker = Builder::createWorker();

// Shared reporter: prints the decoded workload next to the job's unique id.
$report = function ($payload, \GearmanJob $job) {
    echo "Response: " . $payload . " unique: " . $job->unique() . "\n";
};

// Slow job: report, then pause two seconds to emulate a long-running process.
$worker->on("slow.process", function ($payload, \GearmanJob $job) use ($report) {
    $report($payload, $job);
    sleep(2);

    return $job->unique();
});

// Fast job: report and answer immediately with the job's unique id.
$worker->on("fast.process", function ($payload, \GearmanJob $job) use ($report) {
    $report($payload, $job);

    return $job->unique();
});

// Failing job: always throws, to emulate a process that goes wrong.
$worker->on("exception.process", function () {
    throw new \Exception("Something wrong happens");
});

// Start processing jobs.
$worker->run();

And a client:

use G\Gearman\Builder;

$client = Builder::createClient();

// Callback invoked with the job result once the job has succeeded.
$printResult = function ($response) {
    echo $response;
};
$client->onSuccess($printResult);

// Run "fast.process" with the payload "Hello"; the result goes to $printResult.
$client->doNormal('fast.process', "Hello");

One background client

use G\Gearman\Builder;

$client = Builder::createClient();

// Submit three background jobs to the same function, one per payload.
foreach (["Hello1", "Hello2", "Hello3"] as $payload) {
    $client->doBackground('slow.process', $payload);
}

And finally, tasks

use G\Gearman\Builder;

$tasks = Builder::createTasks();

// Success handler: when the task's context is callable it is used to build the
// output from the task; otherwise the raw task data is printed.
$tasks->onSuccess(function (\GearmanTask $task, $context) {
    if (is_callable($context)) {
        $out = $context($task);
    } else {
        $out = $task->data();
    }
    echo "onSuccess response: " . $out . " id: {$task->unique()}\n";
});

// Exception handler: print the data attached to the failing task.
$tasks->onException(function (\GearmanTask $task) {
    echo "onException response {$task->data()}\n";
});

// Context passed along with the first task; it decorates that task's result.
$greet = function (\GearmanTask $task) {
    return "Hello " . $task->data();
};

$tasks->addTask('fast.process', "fast1", $greet, 'g1');
$tasks->addTaskHigh('slow.process', "slow1", null, 'xxxx');
$tasks->addTask('fast.process', "fast2");
$tasks->addTask('exception.process', 'hi');

// Submit everything queued above and run it.
$tasks->runTasks();

The library is just a wrapper around the official API. I’ve created a builder to simplify the creation of the instances:

namespace G\Gearman;
/**
 * Factory for the wrapper objects (Worker, Client, Tasks).
 *
 * Each factory method creates the underlying Gearman extension object,
 * registers the job servers on it and hands it to the matching wrapper.
 */
class Builder
{
    /**
     * Creates a Worker wrapper.
     *
     * @param string|null $servers Server list forwarded to GearmanWorker::addServers();
     *                             null lets the extension use its default server.
     * @return Worker
     */
    public static function createWorker($servers = null)
    {
        $worker = new \GearmanWorker();
        self::registerServers($worker, $servers);
        return new Worker($worker);
    }

    /**
     * Creates a Client wrapper.
     *
     * @param string|null $servers Server list forwarded to GearmanClient::addServers();
     *                             null lets the extension use its default server.
     * @return Client
     */
    public static function createClient($servers = null)
    {
        $client = new \GearmanClient();
        self::registerServers($client, $servers);
        return new Client($client);
    }

    /**
     * Creates a Tasks wrapper.
     *
     * @param string|null $servers Server list forwarded to GearmanClient::addServers();
     *                             null lets the extension use its default server.
     * @return Tasks
     */
    public static function createTasks($servers = null)
    {
        $client = new \GearmanClient();
        self::registerServers($client, $servers);
        return new Tasks($client);
    }

    /**
     * Registers the job servers on a Gearman client or worker.
     *
     * The argument is omitted entirely when no servers were given: passing an
     * explicit null to a non-nullable internal parameter is deprecated on
     * PHP 8.1+, while calling without arguments uses the documented default.
     *
     * @param \GearmanClient|\GearmanWorker $connection
     * @param string|null                   $servers
     */
    private static function registerServers($connection, $servers)
    {
        if ($servers === null) {
            $connection->addServers();
        } else {
            $connection->addServers($servers);
        }
    }
}

That’s the worker wrapper:

namespace G\Gearman;
/**
 * Thin wrapper around \GearmanWorker that registers callbacks by function
 * name and keeps processing jobs even when a callback throws.
 */
class Worker
{
    private $worker;

    /**
     * @param \GearmanWorker $worker Worker the calls are delegated to.
     */
    public function __construct(\GearmanWorker $worker)
    {
        $this->worker = $worker;
    }

    /**
     * Registers a callback for the given function name.
     *
     * The callback receives the JSON-decoded workload as first argument and
     * the \GearmanJob as second; its return value is returned to Gearman.
     *
     * @param string   $name     Function name to register.
     * @param callable $callback Handler invoked for every job of that function.
     * @param mixed    $context  Forwarded unchanged to GearmanWorker::addFunction().
     * @param int      $timeout  Forwarded unchanged to GearmanWorker::addFunction().
     */
    public function on($name, callable $callback, $context = null, $timeout = 0)
    {
        $this->worker->addFunction($name, function (\GearmanJob $job) use ($callback) {
            return call_user_func($callback, json_decode($job->workload()), $job);
        }, $context, $timeout);
    }

    /**
     * Processes jobs until the underlying worker stops.
     *
     * An exception thrown while working is printed and processing resumes.
     * This is a loop rather than a recursive call so that a long-lived worker
     * does not add a stack frame for every failed job.
     */
    public function run()
    {
        while (true) {
            try {
                $this->loop();
                // work() reported false: the worker is done.
                return;
            } catch (\Exception $e) {
                echo $e->getMessage() . "\n";
            }
        }
    }

    /**
     * Keeps calling work() for as long as it reports success.
     */
    private function loop()
    {
        while ($this->worker->work()) {
        }
    }
}

Now the client one

namespace G\Gearman;
/**
 * Thin wrapper around \GearmanClient for single jobs.
 *
 * Every do*() method JSON-encodes the workload, submits the job through the
 * GearmanClient method of the same name and, on success, passes the value
 * returned by that method to the callback registered with onSuccess().
 */
class Client
{
    private $onSuccessCallback;
    private $client;

    /**
     * @param \GearmanClient $client Client the calls are delegated to.
     */
    public function __construct(\GearmanClient $client)
    {
        $this->client = $client;
    }

    /** @see Client::doAction() */
    public function doHigh($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    /** @see Client::doAction() */
    public function doNormal($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    /** @see Client::doAction() */
    public function doLow($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    /** @see Client::doAction() */
    public function doBackground($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    /** @see Client::doAction() */
    public function doHighBackground($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    /** @see Client::doAction() */
    public function doLowBackground($name, $workload = null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }

    /**
     * Submits one job and dispatches the outcome.
     *
     * $unique now has a default: a required parameter after an optional one
     * is deprecated since PHP 8.0.
     *
     * @param string      $action   Name of the GearmanClient method to call.
     * @param string      $name     Function name of the job.
     * @param mixed       $workload Cast to string, then JSON-encoded.
     * @param string|null $unique   Unique id forwarded to the GearmanClient method.
     * @return mixed Return value of the onSuccess callback, or null when no
     *               callback is registered.
     * @throws \Exception When the client's return code is not GEARMAN_SUCCESS;
     *                    message and code come from the client.
     */
    private function doAction($action, $name, $workload = null, $unique = null)
    {
        $workload = (string)$workload;
        $handle = $this->client->$action($name, json_encode($workload), $unique);
        $returnCode = $this->client->returnCode();
        if ($returnCode != \GEARMAN_SUCCESS) {
            throw new \Exception($this->client->error(), $returnCode);
        }
        if ($this->onSuccessCallback) {
            return call_user_func($this->onSuccessCallback, $handle);
        }
        return null;
    }

    /**
     * Registers the callback that receives the result of every successful job.
     *
     * @param callable $callback
     */
    public function onSuccess(callable $callback)
    {
        $this->onSuccessCallback = $callback;
    }
}

and finally the tasks

namespace G\Gearman;
/**
 * Collects tasks and submits them together through a \GearmanClient.
 *
 * The add*() methods only record the task; nothing is sent to the client
 * until runTasks() is called.
 */
class Tasks
{
    private $client;
    private $tasks = [];

    /**
     * @param \GearmanClient $client Client the tasks are submitted through.
     */
    public function __construct(\GearmanClient $client)
    {
        $this->client = $client;
    }

    /** @see Tasks::enqueue() */
    public function addTask($name, $workload = null, $context = null, $unique = null)
    {
        $this->enqueue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    /** @see Tasks::enqueue() */
    public function addTaskHigh($name, $workload = null, $context = null, $unique = null)
    {
        $this->enqueue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    /** @see Tasks::enqueue() */
    public function addTaskLow($name, $workload = null, $context = null, $unique = null)
    {
        $this->enqueue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    /** @see Tasks::enqueue() */
    public function addTaskBackground($name, $workload = null, $context = null, $unique = null)
    {
        $this->enqueue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    /** @see Tasks::enqueue() */
    public function addTaskHighBackground($name, $workload = null, $context = null, $unique = null)
    {
        $this->enqueue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    /** @see Tasks::enqueue() */
    public function addTaskLowBackground($name, $workload = null, $context = null, $unique = null)
    {
        $this->enqueue(__FUNCTION__, $name, $workload, $context, $unique);
    }

    /**
     * Submits every queued task to the client and runs them.
     *
     * The queue is emptied once its tasks have been handed to the client, so
     * calling runTasks() again does not resubmit tasks from an earlier run.
     *
     * @return mixed Whatever GearmanClient::runTasks() returns.
     */
    public function runTasks()
    {
        foreach ($this->tasks as list($actionName, $name, $workload, $context, $unique)) {
            $this->client->$actionName($name, json_encode($workload), $context, $unique);
        }
        $this->tasks = [];

        return $this->client->runTasks();
    }

    /**
     * Registers the callback the client invokes when a task completes.
     *
     * @param callable $callback
     */
    public function onSuccess(callable $callback)
    {
        $this->client->setCompleteCallback($callback);
    }

    /**
     * Registers the callback the client invokes when a task raises an exception.
     *
     * @param callable $callback
     */
    public function onException(callable $callback)
    {
        $this->client->setExceptionCallback($callback);
    }

    /**
     * Registers the callback the client invokes when a task fails.
     *
     * @param callable $callback
     */
    public function onFail(callable $callback)
    {
        $this->client->setFailCallback($callback);
    }

    /**
     * Records one task for the next runTasks() call.
     *
     * @param string      $actionName Name of the GearmanClient method to call later.
     * @param string      $name       Function name of the task.
     * @param mixed       $workload   JSON-encoded when the task is submitted.
     * @param mixed       $context    Passed to the client together with the task.
     * @param string|null $unique     Unique id passed to the client.
     */
    private function enqueue($actionName, $name, $workload, $context, $unique)
    {
        $this->tasks[] = [$actionName, $name, $workload, $context, $unique];
    }
}

The library is available on Packagist and the source code is in my GitHub account.

Advertisements

Reading Modbus devices with Python from a PHP/Silex Application via Gearman worker

Yes. I know. I never know how to write a good title for my posts. Let me show one integration example that I’ve been working on these days. Let’s start.

In industrial automation there are several standard protocols. Modbus is one of them. Maybe it isn’t the coolest or the newest one (like OPC or OPC/UA), but we can speak Modbus with a huge number of devices.

I need to read from one of them, and show a couple of variables in a Web frontend. Imagine the following fake Modbus server (it emulates my real Modbus device)

#!/usr/bin/env python

##
# Fake modbus server
# - exposes "Energy" 66706 = [1, 1170]
# - exposes "Power" 132242 = [2, 1170]
##

from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.server.async import StartTcpServer
import logging

# Configure logging and set the root logger to DEBUG.
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)

# Four holding registers, two words per value (high word first):
#   [1, 1170] -> (1 << 16) | 1170 = 66706   ("Energy")
#   [2, 1170] -> (2 << 16) | 1170 = 132242  ("Power")
hrData = [1, 1170, 2, 1170]
# The data block starts at address 2.
# NOTE(review): the worker reads from address 1; presumably the slave context
# applies a one-based offset -- confirm against the pymodbus version in use.
store = ModbusSlaveContext(hr=ModbusSequentialDataBlock(2, hrData))

# single=True: the one store above is used as the only slave context.
context = ModbusServerContext(slaves=store, single=True)

# Start serving the context over TCP.
StartTcpServer(context)

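This server exposes two variables, “Energy” and “Power”. This is a fake server and it will always return 66706 for energy and 132242 for power. Modbus is a binary protocol whose registers are 16-bit words, so each value is split into two registers: 66706 = [1, 1170] (1 × 65536 + 1170) and 132242 = [2, 1170] (2 × 65536 + 1170).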

I can read Modbus from PHP, but I normally use Python for this kind of logic. I’m not going to rewrite existing logic in PHP. I’m not crazy enough. Furthermore, my real Modbus device only accepts one active socket to retrieve information. That means that if two clients use the frontend at the same time, it will crash. In these situations queues are our friends.

I’ll use a Gearman worker (written in Python) to read Modbus information.

from pyModbusTCP.client import ModbusClient
from gearman import GearmanWorker
import json

def reader(worker, job):
    """Gearman task: read "energy" and "power" from the Modbus device.

    Connects to the Modbus server on localhost:502, reads four holding
    registers starting at address 1 and combines each pair of 16-bit words
    (high word first) into one integer.

    Args:
        worker: the Gearman worker running the task (unused).
        job: the Gearman job being processed (unused).

    Returns:
        A JSON string of the form {"energy": <int>, "power": <int>}, or None
        when the connection or the register read fails.
    """
    c = ModbusClient(host="localhost", port=502)

    if not c.is_open() and not c.open():
        print("unable to connect to host")
        return None

    try:
        holdingRegisters = c.read_holding_registers(1, 4)
    finally:
        # The device accepts only one active socket, so release it as soon as
        # the read is done instead of leaving it to garbage collection.
        c.close()

    # A failed read yields no register list; indexing it would raise.
    if not holdingRegisters or len(holdingRegisters) < 4:
        print("unable to read holding registers")
        return None

    # "energy" is in position 1 and spans two words
    energy = (holdingRegisters[0] << 16) | holdingRegisters[1]

    # "power" is in position 3 and spans two words
    power = (holdingRegisters[2] << 16) | holdingRegisters[3]

    out = {"energy": energy, "power": power}
    return json.dumps(out)

# Connect the worker to the Gearman job server on 127.0.0.1.
worker = GearmanWorker(['127.0.0.1'])

# Jobs submitted as 'modbusReader' are handled by reader().
worker.register_task('modbusReader', reader)

# print() call form: same output on Python 2, and valid syntax on Python 3.
print('working...')
worker.work()

Our backend is ready. Now we’ll work with the frontend. In this example I’ll use PHP and Silex.

<?php
include __DIR__ . '/../vendor/autoload.php';
use Silex\Application;
$app = new Application(['debug' => true]);
$app->register(new Silex\Provider\TwigServiceProvider(), array(
    'twig.path' => __DIR__.'/../views',
));

// Protected closure: asks the 'modbusReader' Gearman worker for the current
// values and returns them as an associative array.
$app['modbusReader'] = $app->protect(function() {
    $client = new \GearmanClient();
    $client->addServer();
    $response = $client->doNormal('modbusReader', 'modbusReader');
    $returnCode = $client->returnCode();
    if ($returnCode != \GEARMAN_SUCCESS) {
        // Use the local client: the closure has no $this to read it from.
        throw new \Exception($client->error(), $returnCode);
    }

    $data = json_decode($response, true);
    if (!is_array($data)) {
        // The worker returns nothing usable when it cannot read the device;
        // fail here instead of passing a non-array context to Twig.
        throw new \Exception('Invalid response from modbusReader worker');
    }

    return $data;
});

// Home page: render the values read from the worker.
$app->get("/", function(Application $app) {
    return $app['twig']->render('home.twig', $app['modbusReader']());
});
$app->run();

As we can see, the frontend is a simple Gearman client. It uses our Python worker to read information from Modbus and renders a simple HTML page with a Twig template:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Demo</title>
</head>
<body>
    {# "energy" and "power" come from the context array passed to render() by the "/" route. #}
    Energy: {{ energy }}
    Power: {{ power }}
</body>
</html>

And that’s all. You can see the full example in my github account