Blog Archives

Silex service provider for a Gearman wrapper

I’ve written an small wrapper for the gearman api. Normally I use Silex in the frontend. Today we’re going to build a service provider to allow us to integrate the gearman wrapper easily within our Silex applications.

Here I show you an example of how to use the service provider.

Imagine this simple worker:

use G\Gearman\Builder;

$worker = Builder::createWorker();

$worker->on("worker.example", function ($response) {
    return strrev($response);
});

$worker->run();

And this is the Silex application using the service provider as a gearman client:

use G\Gearman\Client;
use G\GearmanServiceProvider;
use Silex\Application;

$app = new Application();

$app->register(new GearmanServiceProvider());

$app->get("/", function (Client $client) {
    return "Hello " . $client->doNormal("worker.example", "Gonzalo");
});

$app->run();

I’m using injector library to inject providers. I’ve written about it here.

This is the code of the service provider

namespace G;
use G\Gearman\Client;
use G\Gearman\Tasks;
use Injector\InjectorServiceProvider;
use Silex\Application;
use Silex\ServiceProviderInterface;
class GearmanServiceProvider implements ServiceProviderInterface
{
    private $client;
    public function __construct(\GearmanClient $client = null)
    {
        if (is_null($client)) {
            $client = new \GearmanClient();
            $client->addServers("localhost:4730");
        }
        $this->client = $client;
    }
    public function register(Application $app)
    {
        $app->register(new InjectorServiceProvider([
            'G\Gearman\Client' => 'gearmanClient',
            'G\Gearman\Tasks'  => 'gearmanTasks',
        ]));
        $app['gearmanClient'] = function () use ($app) {
            $client = new Client($this->client);
            $client->onSuccess(function ($response) {
                return $response;
            });
            return $client;
        };
        $app['gearmanTasks'] = function () use ($app) {
            return new Tasks($this->client);
        };
    }
    public function boot(Application $app)
    {
    }
}

The code is available in my github account

Advertisements

PHP Gearman Wrapper

I must admit that nowadays I’m using RabbitMQ more than Gearman but I’m still a big fan of gearman. PHP has a great api to connect to gearman work server but sometimes I miss another, how to say, “clean” way. Because of that I’ve creates a gearman wrapper. Let’s start.

I want to cover different areas: Workers, clients, background clients, and tasks.

Worker example:

use G\Gearman\Builder;

$worker = Builder::createWorker();

$worker->on("slow.process", function ($response, \GearmanJob $job) {
    echo "Response: {$response} unique: {$job->unique()}\n";
    // we emulate a slow process with a sleep
    sleep(2);

    return $job->unique();
});

$worker->on("fast.process", function ($response, \GearmanJob $job) {
    echo "Response: {$response} unique: {$job->unique()}\n";

    return $job->unique();
});

$worker->on("exception.process", function () {
    // we emulate a failing process
    throw new \Exception("Something wrong happens");
});

$worker->run();

And a client:

use G\Gearman\Builder;

$client = Builder::createClient();

$client->onSuccess(function ($response) {
    echo $response;
});

$client->doNormal('fast.process', "Hello");

One background client

use G\Gearman\Builder;

$client = Builder::createClient();

$client->doBackground('slow.process', "Hello1");
$client->doBackground('slow.process', "Hello2");
$client->doBackground('slow.process', "Hello3");

And finally, tasks

use G\Gearman\Builder;

$tasks = Builder::createTasks();

$tasks->onSuccess(function (\GearmanTask $task, $context) {
    $out = is_callable($context) ? $context($task) : $task->data();
    echo "onSuccess response: " . $out . " id: {$task->unique()}\n";
});

$tasks->onException(function (\GearmanTask $task) {
    echo "onException response {$task->data()}\n";
});

$responseParser = function (\GearmanTask $task) {
    return "Hello " . $task->data();
};

$tasks->addTask('fast.process', "fast1", $responseParser, 'g1');
$tasks->addTaskHigh('slow.process', "slow1", null, 'xxxx');
$tasks->addTask('fast.process', "fast2");
$tasks->addTask('exception.process', 'hi');

$tasks->runTasks();

The library is just a wrapper to the official api. I’ve create a builder to simplify the creation of the instances:

namespace G\Gearman;
class Builder
{
    static function createWorker($servers=null)
    {
        $worker = new \GearmanWorker();
        $worker->addServers($servers);
        return new Worker($worker);
    }
    static function createClient($servers=null)
    {
        $client = new \GearmanClient();
        $client->addServers($servers);
        return new Client($client);
    }
    static function createTasks($servers=null)
    {
        $client = new \GearmanClient();
        $client->addServers($servers);
        return new Tasks($client);
    }
}

that’s the worker wrapper

namespace G\Gearman;
class Worker
{
    private $worker;
    public function __construct(\GearmanWorker $worker)
    {
        $this->worker = $worker;
    }
    public function on($name, callable $callback, $context = null, $timeout = 0)
    {
        $this->worker->addFunction($name, function (\GearmanJob $job) use ($callback) {
            return call_user_func($callback, json_decode($job->workload()), $job);
        }, $context, $timeout);
    }
    public function run()
    {
        try {
            $this->loop();
        } catch (\Exception $e) {
            echo $e->getMessage() . "\n";
            $this->run();
        }
    }
    private function loop()
    {
        while ($this->worker->work()) {
        }
    }
}

Now the client one

namespace G\Gearman;
class Client
{
    private $onSuccessCallback;
    private $client;
    public function __construct(\GearmanClient $client)
    {
        $this->client = $client;
    }
    public function doHigh($name, $workload=null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }
    public function doNormal($name, $workload=null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }
    public function doLow($name, $workload=null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }
    public function doBackground($name, $workload=null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }
    public function doHighBackground($name, $workload=null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }
    public function doLowBackground($name, $workload=null, $unique = null)
    {
        return $this->doAction(__FUNCTION__, $name, $workload, $unique);
    }
    private function doAction($action, $name, $workload=null, $unique)
    {
        $workload = (string)$workload;
        $handle = $this->client->$action($name, json_encode($workload), $unique);
        $returnCode = $this->client->returnCode();
        if ($returnCode != \GEARMAN_SUCCESS) {
            throw new \Exception($this->client->error(), $returnCode);
        } else {
            if ($this->onSuccessCallback) {
                return call_user_func($this->onSuccessCallback, $handle);
            }
        }
        return null;
    }
    public function onSuccess(callable $callback)
    {
        $this->onSuccessCallback = $callback;
    }
}

and finally the tasks

namespace G\Gearman;
class Tasks
{
    private $client;
    private $tasks;
    public function __construct(\GearmanClient $client)
    {
        $this->tasks = [];
        $this->client = $client;
    }
    public function addTask($name, $workload=null, $context = null, $unique = null)
    {
        $this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
    }
    public function addTaskHigh($name, $workload=null, $context = null, $unique = null)
    {
        $this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
    }
    public function addTaskLow($name, $workload=null, $context = null, $unique = null)
    {
        $this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
    }
    public function addTaskBackground($name, $workload=null, $context = null, $unique = null)
    {
        $this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
    }
    public function addTaskHighBackground($name, $workload=null, $context = null, $unique = null)
    {
        $this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
    }
    public function addTaskLowBackground($name, $workload=null, $context = null, $unique = null)
    {
        $this->tasks[] = [__FUNCTION__, $name, $workload, $context, $unique];
    }
    public function runTasks()
    {
        foreach ($this->tasks as list($actionName, $name, $workload, $context, $unique)) {
            $this->client->$actionName($name, json_encode($workload), $context, $unique);
        }
        $this->client->runTasks();
    }
    public function onSuccess(callable $callback)
    {
        $this->client->setCompleteCallback($callback);
    }
    public function onException(callable $callback)
    {
        $this->client->setExceptionCallback($callback);
    }
    public function onFail(callable $callback)
    {
        $this->client->setFailCallback($callback);
    }
}

Library is available in packagist and source code in my github account.