Blog Archives

Playing with RabbitMQ (part 2). Now with Python

Do you remember the last post about RabbitMQ? In that post we created a small wrapper library to use RabbitMQ with node and PHP. I also work with Python and I want to use the same RabbitMQ wrapper there. There are several Python libraries for working with Rabbit; I’ll use pika.

The idea is the same as in the previous post. I want to use queues, exchanges and RPCs. So let’s start with queues:

We can create a queue receiver called ‘queue.backend’

from rabbit import builder

# Connection settings for the RabbitMQ broker, handed to the builder.
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}

def onData(data):
    # Invoked once per received message with the JSON-decoded payload.
    print data['aaa']

# Declare 'queue.backend' and block, dispatching every message to onData.
builder.queue('queue.backend', server).receive(onData)

and emit messages to the queue

from rabbit import builder

# Connection settings for the RabbitMQ broker, handed to the builder.
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}

queue = builder.queue('queue.backend', server)

# Each emit() JSON-encodes its payload and publishes it to 'queue.backend'.
queue.emit({"aaa": 1})
queue.emit({"aaa": 2})
queue.emit({"aaa": 3})

The library (like the PHP and node ones) uses a builder to create our instances

from queue import Queue
from rpc import RPC
from exchange import Exchange

# Default declaration/consumer options, keyed by wrapper kind ('queue',
# 'exchange', 'rpc'). The builder functions below hand the matching section
# to the wrapper class, which maps the camelCase keys onto pika keyword
# arguments (e.g. 'autoDelete' -> auto_delete, 'noAck' -> no_ack).
# NOTE(review): 'nowait' and 'noLocal' are not read by the Python classes
# shown in this post; presumably kept for parity with the PHP/node wrappers.
defaults = {
    # Plain work queue: durable and kept when consumers disconnect.
    'queue': {
        'queue': {
            'passive': False,
            'durable': True,
            'exclusive': False,
            'autoDelete': False,
            'nowait': False
        },
        'consumer': {
            'noLocal': False,
            'noAck': False,
            'exclusive': False,
            'nowait': False
        }
    },
    # Topic exchange plus the queue each receiver binds to it.
    'exchange': {
        'exchange': {
            'passive': False,
            'durable': True,
            'autoDelete': True,
            'internal': False,
            'nowait': False
        },
        'queue': {
            'passive': False,
            'durable': True,
            'exclusive': False,
            'autoDelete': True,
            'nowait': False
        },
        'consumer': {
            'noLocal': False,
            'noAck': False,
            'exclusive': False,
            'nowait': False
        }
    },
    # RPC: the same queue options are used for the request and reply queues.
    'rpc': {
        'queue': {
            'passive': False,
            'durable': True,
            'exclusive': False,
            'autoDelete': True,
            'nowait': False
        },
        'consumer': {
            'noLocal': False,
            'noAck': False,
            'exclusive': False,
            'nowait': False
        }
    }
}

def queue(name, server):
    """Build a Queue wrapper for the queue called *name*.

    *server* is a dict with the broker's 'host', 'port', 'user' and 'pass'.
    The default queue/consumer options are copied for every instance, so
    adding 'server' no longer mutates the module-level ``defaults`` and two
    queues built for different brokers do not overwrite each other's
    settings.
    """
    conf = dict((section, dict(options)) for section, options in defaults['queue'].items())
    conf['server'] = server

    return Queue(name, conf)

def rpc(name, server):
    """Build an RPC wrapper bound to the request queue called *name*.

    *server* is a dict with the broker's 'host', 'port', 'user' and 'pass'.
    The default queue/consumer options are copied for every instance, so
    adding 'server' no longer mutates the module-level ``defaults`` and two
    RPC objects built for different brokers do not overwrite each other's
    settings.
    """
    conf = dict((section, dict(options)) for section, options in defaults['rpc'].items())
    conf['server'] = server

    return RPC(name, conf)

def exchange(name, server):
    """Build an Exchange wrapper for the exchange called *name*.

    *server* is a dict with the broker's 'host', 'port', 'user' and 'pass'.
    The default exchange/queue/consumer options are copied for every
    instance, so adding 'server' no longer mutates the module-level
    ``defaults`` and two exchanges built for different brokers do not
    overwrite each other's settings.
    """
    conf = dict((section, dict(options)) for section, options in defaults['exchange'].items())
    conf['server'] = server

    return Exchange(name, conf)

And our Queue class

import pika
import json
import time

class Queue:
    """Work queue on top of a pika blocking connection.

    *conf* must provide three sections: 'server' (host, port, user, pass),
    'queue' (passive, durable, exclusive, autoDelete) and 'consumer'
    (noAck, exclusive).
    """

    def __init__(self, name, conf):
        self.name = name
        self.conf = conf

    def _connect(self):
        """Open a new blocking connection using the 'server' settings."""
        server = self.conf['server']
        credentials = pika.PlainCredentials(server['user'], server['pass'])
        parameters = pika.ConnectionParameters(host=server['host'], port=server['port'], credentials=credentials)
        return pika.BlockingConnection(parameters)

    def _declare(self, channel):
        """Declare the queue on *channel* with the configured options."""
        queueConf = self.conf['queue']
        channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])

    def emit(self, data=None):
        """JSON-encode *data* and publish it to the queue as a persistent message."""
        connection = self._connect()
        try:
            channel = connection.channel()
            self._declare(channel)
            # delivery_mode=2 marks the message as persistent.
            channel.basic_publish(exchange='', routing_key=self.name, body=json.dumps(data), properties=pika.BasicProperties(delivery_mode=2))
        finally:
            # Release the connection even when declaring or publishing fails.
            connection.close()

    def receive(self, callback):
        """Consume the queue forever, calling *callback* with each decoded message.

        Blocks in ``start_consuming()``. One message is prefetched at a time.
        """
        connection = self._connect()
        channel = connection.channel()
        self._declare(channel)
        consumerConf = self.conf['consumer']

        def _callback(ch, method, properties, body):
            callback(json.loads(body))
            # Only acknowledge in manual-ack mode: acking a delivery that was
            # consumed with no_ack=True is a protocol error on the channel.
            if not consumerConf['noAck']:
                ch.basic_ack(delivery_tag=method.delivery_tag)
            print("%s %s::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]"), self.name, body))

        print("%s Queue '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]"), self.name))
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(_callback, self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])

        channel.start_consuming()

We also want to use exchanges to emit messages without waiting for answers, just as an event broadcast. We can emit messages:

from rabbit import builder

# Connection settings for the RabbitMQ broker, handed to the builder.
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}

exchange = builder.exchange('process.log', server)

# emit(routingKey, data): data is JSON-encoded and published to the
# 'process.log' exchange under the given routing key.
exchange.emit("xxx.log", "aaaa")
exchange.emit("xxx.log", ["11", "aaaa"])
exchange.emit("yyy.log", "aaaa")

And listen to messages

from rabbit import builder

# Connection settings for the RabbitMQ broker, handed to the builder.
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}

def onData(routingKey, data):
    # Receives the message's routing key and its JSON-decoded payload.
    print routingKey, data

# Bind to 'process.log' with binding key "yyy.log" and block while consuming.
builder.exchange('process.log', server).receive("yyy.log", onData)

That’s the class

import pika
import json
import time

class Exchange:
    """Topic exchange on top of a pika blocking connection.

    *conf* must provide four sections: 'server' (host, port, user, pass),
    'exchange' (passive, durable, autoDelete, internal), 'queue' (passive,
    durable, exclusive, autoDelete) and 'consumer' (noAck, exclusive).
    """

    def __init__(self, name, conf):
        self.name = name
        self.conf = conf

    def _connect(self):
        """Open a new blocking connection using the 'server' settings."""
        server = self.conf['server']
        credentials = pika.PlainCredentials(server['user'], server['pass'])
        parameters = pika.ConnectionParameters(host=server['host'], port=server['port'], credentials=credentials)
        return pika.BlockingConnection(parameters)

    def _declare_exchange(self, channel):
        """Declare the topic exchange on *channel* with the configured options."""
        exchangeConf = self.conf['exchange']
        channel.exchange_declare(exchange=self.name, type='topic', passive=exchangeConf['passive'], durable=exchangeConf['durable'], auto_delete=exchangeConf['autoDelete'], internal=exchangeConf['internal'])

    def emit(self, routingKey, data=None):
        """JSON-encode *data* and publish it to the exchange under *routingKey*."""
        connection = self._connect()
        try:
            channel = connection.channel()
            self._declare_exchange(channel)
            channel.basic_publish(exchange=self.name, routing_key=routingKey, body=json.dumps(data))
        finally:
            # Release the connection even when declaring or publishing fails.
            connection.close()

    def receive(self, bindingKey, callback):
        """Consume messages matching *bindingKey* forever.

        A server-named queue is declared and bound to the exchange; *callback*
        is called with ``(routing_key, decoded_payload)`` for each message.
        Blocks in ``start_consuming()``.
        """
        connection = self._connect()
        channel = connection.channel()
        self._declare_exchange(channel)

        queueConf = self.conf['queue']
        # No queue name is given, so the broker generates one.
        result = channel.queue_declare(passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
        queue_name = result.method.queue

        channel.queue_bind(exchange=self.name, queue=queue_name, routing_key=bindingKey)

        print("%s Exchange '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]"), self.name))

        consumerConf = self.conf['consumer']

        def _callback(ch, method, properties, body):
            callback(method.routing_key, json.loads(body))
            # Only acknowledge in manual-ack mode: acking a delivery that was
            # consumed with no_ack=True is a protocol error on the channel.
            if not consumerConf['noAck']:
                ch.basic_ack(delivery_tag=method.delivery_tag)
            print("%s %s:::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]"), self.name, body))

        channel.basic_consume(_callback, queue=queue_name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
        channel.start_consuming()

And finally we can use RPCs. Emit

from rabbit import builder

# Connection settings for the RabbitMQ broker, handed to the builder.
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}

# call() sends the positional arguments as a JSON array to 'rpc.hello',
# blocks until the matching reply arrives and returns the reply body.
print builder.rpc('rpc.hello', server).call("Gonzalo", "Ayuso")

And the server side

from rabbit import builder

# Connection settings for the RabbitMQ broker, handed to the builder.
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}

def onData(name, surname):
    # Called with the unpacked request arguments; the return value is
    # JSON-encoded and sent back to the caller.
    return "Hello %s %s" % (name, surname)

# Serve 'rpc.hello' requests forever.
builder.rpc('rpc.hello', server).server(onData)

And that’s the class

import pika
import json
import time
import uuid

class RPC:
    """Request/reply (RPC) wrapper on top of a pika blocking connection.

    *conf* must provide three sections: 'server' (host, port, user, pass),
    'queue' (passive, durable, exclusive, autoDelete) and 'consumer'
    (noAck, exclusive).
    """

    def __init__(self, name, conf):
        self.name = name
        self.conf = conf

    def _connect(self):
        """Open a new blocking connection using the 'server' settings.

        The credentials are passed to ConnectionParameters; previously they
        were built and thrown away, so the configured user/pass were never
        sent to the broker.
        """
        server = self.conf['server']
        credentials = pika.PlainCredentials(server['user'], server['pass'])
        parameters = pika.ConnectionParameters(host=server['host'], port=server['port'], credentials=credentials)
        return pika.BlockingConnection(parameters)

    def call(self, *params):
        """Invoke the remote procedure with *params* and wait for its reply.

        The parameters are sent as a JSON array to the queue named after this
        RPC. Blocks until a reply carrying the same correlation id arrives.

        Returns the raw reply body as published by the server.
        NOTE(review): the server side JSON-encodes its response, so callers
        receive JSON text here (the PHP and node clients decode it); left
        as-is to keep the return value backward compatible.
        """
        connection = self._connect()
        try:
            channel = connection.channel()

            queueConf = self.conf['queue']
            # Empty name: the broker generates the reply queue's name.
            result = channel.queue_declare(queue='', passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
            callback_queue = result.method.queue
            consumerConf = self.conf['consumer']
            # Consume from the reply queue explicitly rather than from ''.
            channel.basic_consume(self.on_call_response, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'], queue=callback_queue)

            self.response = None
            self.corr_id = str(uuid.uuid4())
            channel.basic_publish(exchange='', routing_key=self.name, properties=pika.BasicProperties(reply_to=callback_queue, correlation_id=self.corr_id), body=json.dumps(params))
            while self.response is None:
                connection.process_data_events()
            return self.response
        finally:
            # Every call opens its own connection; close it so repeated
            # calls do not accumulate open connections.
            connection.close()

    def on_call_response(self, ch, method, props, body):
        """Store *body* as the response when its correlation id matches ours."""
        if self.corr_id == props.correlation_id:
            self.response = body

    def server(self, callback):
        """Serve requests forever, replying with ``callback(*decoded_params)``.

        Each request body is decoded as a JSON array and unpacked into
        *callback*; the return value is JSON-encoded and published to the
        request's ``reply_to`` queue with the same correlation id. Blocks in
        ``start_consuming()``.
        """
        connection = self._connect()
        channel = connection.channel()

        queueConf = self.conf['queue']
        channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])

        channel.basic_qos(prefetch_count=1)
        consumerConf = self.conf['consumer']

        def on_server_request(ch, method, props, body):
            response = callback(*json.loads(body))

            ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=json.dumps(response))
            # Only acknowledge in manual-ack mode: acking a delivery that was
            # consumed with no_ack=True is a protocol error on the channel.
            if not consumerConf['noAck']:
                ch.basic_ack(delivery_tag=method.delivery_tag)
            print("%s %s::req => '%s' response => '%s'" % (time.strftime("[%d/%m/%Y-%H:%M:%S]"), self.name, body, response))

        channel.basic_consume(on_server_request, queue=self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])

        print("%s RPC '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]"), self.name))
        channel.start_consuming()

And that’s all. Full project is available within my github account

Playing with RabbitMQ, PHP and node

I need to use RabbitMQ in one project. I’m a big fan of Gearman, but I must admit Rabbit is much more powerful. In this project I need to work with both PHP and node code, so I want to build a wrapper for those two languages. I don’t want to re-invent the wheel so I will use existing libraries (php-amqplib and amqplib for node).

Basically I need to use three things: First I need to create exchange channels to log different actions. I need to decouple those actions from the main code. I also need to create work queues to ensure those works are executed. It doesn’t matter if work is executed later but it must be executed. And finally RPC commands.

Let’s start with the queues. I want to push events to a queue in PHP

use G\Rabbit\Builder;
// Connection settings for the RabbitMQ broker.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// Build a Queue wrapper and push one JSON-encoded message onto 'queue.backend'.
$queue = Builder::queue('queue.backend', $server);
$queue->emit(["aaa" => 1]);

and also with node

// Connection settings for the RabbitMQ broker.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

// NOTE(review): `builder` is not defined in this snippet; the later examples
// load it with require('../../src/Builder').
var queue = builder.queue('queue.backend', server);
// Publishes the object, JSON-encoded, onto 'queue.backend'.
queue.emit({aaa: 1});

And I also want to register workers to those queues with PHP and node

use G\Rabbit\Builder;
// Connection settings for the RabbitMQ broker.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// Register a worker: the closure receives each message decoded to an array.
Builder::queue('queue.backend', $server)->receive(function ($data) {
    error_log(json_encode($data));
});
// Connection settings for the RabbitMQ broker.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

// NOTE(review): `builder` is not defined in this snippet; the later examples
// load it with require('../../src/Builder').
var queue = builder.queue('queue.backend', server);
// Register a worker: the callback receives each message's parsed JSON payload.
queue.receive(function (data) {
    console.log(data);
});

Both implementations use one builder. In this case we are using Queue:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Work queue backed by a named RabbitMQ queue.
 *
 * $conf must provide three sections: 'server' (host, port, user, pass),
 * 'queue' (passive, durable, exclusive, auto_delete, nowait) and
 * 'consumer' (no_local, no_ack, exclusive, nowait).
 */
class Queue
{
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    /** Open a new connection using the 'server' settings. */
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    /** Declare the queue on $channel with the configured options. */
    private function declareQueue($channel)
    {
        $conf = $this->conf['queue'];
        $channel->queue_declare($this->name, $conf['passive'], $conf['durable'], $conf['exclusive'],
            $conf['auto_delete'], $conf['nowait']);
    }
    /** JSON-encode $data and publish it to the queue as a persistent message. */
    public function emit($data = null)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $this->declareQueue($channel);
        $msg = new AMQPMessage(json_encode($data),
            ['delivery_mode' => 2] # make message persistent
        );
        $channel->basic_publish($msg, '', $this->name);
        $channel->close();
        $connection->close();
    }
    /**
     * Consume the queue, calling $callback(decodedPayload, queueName) per message.
     *
     * Blocks while the channel has registered consumer callbacks.
     */
    public function receive(callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $this->declareQueue($channel);
        $consumer = $this->conf['consumer'];
        if ($consumer['no_ack'] === false) {
            $channel->basic_qos(null, 1, null);
        }
        $channel->basic_consume($this->name, '', $consumer['no_local'], $consumer['no_ack'], $consumer['exclusive'],
            $consumer['nowait'],
            function ($msg) use ($callback, $consumer) {
                call_user_func($callback, json_decode($msg->body, true), $this->name);
                // Only acknowledge in manual-ack mode: acking a delivery that
                // was consumed with no_ack enabled is a channel-level error.
                if (!$consumer['no_ack']) {
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s')."] {$this->name}::".$msg->body, "\n";
            });
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] Queue '{$this->name}' initialized \n";
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
var amqp = require('amqplib/callback_api');

/**
 * Work-queue wrapper around amqplib's callback API.
 *
 * @param {string} name queue name
 * @param {object} conf `server` ({host, port, user, pass}), `queue` (options
 *                      passed to assertQueue) and `consumer` (options passed
 *                      to consume)
 */
var Queue = function (name, conf) {
    // Build the AMQP URL from conf.server each time a connection is opened.
    var connectionUrl = function () {
        return `amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`;
    };

    return {
        // Publish `data` as JSON. With `close` (default true) the connection is
        // closed and the process exits 500 ms after connecting.
        emit: function (data, close=true) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    // Surface the real connection error instead of failing
                    // later on an undefined `conn`.
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    var msg = JSON.stringify(data);

                    ch.assertQueue(name, conf.queue);
                    // Buffer.from replaces the deprecated `new Buffer(string)`.
                    ch.sendToQueue(name, Buffer.from(msg));
                });
                if (close) {
                    setTimeout(function () {
                        conn.close();
                        process.exit(0)
                    }, 500);
                }
            });
        },
        // Consume the queue; `callback(parsedPayload, routingKey)` runs per message.
        receive: function (callback) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    ch.assertQueue(name, conf.queue);
                    console.log(new Date().toString() + ' Queue ' + name + ' initialized');
                    ch.consume(name, function (msg) {
                        console.log(new Date().toString() + " Received %s", msg.content.toString());
                        if (callback) {
                            callback(JSON.parse(msg.content.toString()), msg.fields.routingKey)
                        }
                        // Acknowledge only when the consumer is in manual-ack mode.
                        if (conf.consumer.noAck === false) {
                            ch.ack(msg);
                        }
                    }, conf.consumer);
                });
            });
        }
    };
};

module.exports = Queue;

We also want to emit messages using an exchange

Emitter:

use G\Rabbit\Builder;
// Connection settings for the RabbitMQ broker.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
$exchange = Builder::exchange('process.log', $server);
// emit(routingKey, data): data is JSON-encoded before being published.
$exchange->emit("xxx.log", "aaaa");
$exchange->emit("xxx.log", ["11", "aaaa"]);
$exchange->emit("yyy.log", "aaaa");
var builder = require('../../src/Builder');

// Connection settings for the RabbitMQ broker.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var exchange = builder.exchange('process.log', server);

// emit(routingKey, data): data is JSON-encoded before being published.
exchange.emit("xxx.log", "aaaa");
exchange.emit("xxx.log", ["11", "aaaa"]);
exchange.emit("yyy.log", "aaaa");

and receiver:

use G\Rabbit\Builder;
// Connection settings for the RabbitMQ broker.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// Listen on 'process.log' with binding key "yyy.log"; the closure receives
// the message's routing key and its decoded payload.
Builder::exchange('process.log', $server)->receive("yyy.log", function ($routingKey, $data) {
    error_log($routingKey." - ".json_encode($data));
});
// Connection settings for the RabbitMQ broker.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

// NOTE(review): `builder` is not defined in this snippet; the emitter example
// loads it with require('../../src/Builder').
var exchange = builder.exchange('process.log', server);

// Listen with binding key "yyy.log"; the callback receives the message's
// routing key and its parsed JSON payload.
exchange.receive("yyy.log", function (routingKey, data) {
    console.log(routingKey, data);
});

And that’s the PHP implementation:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Topic exchange wrapper.
 *
 * $conf must provide four sections: 'server' (host, port, user, pass),
 * 'exchange' (passive, durable, auto_delete, internal, nowait), 'queue'
 * (passive, durable, exclusive, auto_delete, nowait) and 'consumer'
 * (no_local, no_ack, exclusive, nowait).
 */
class Exchange
{
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    /** Open a new connection using the 'server' settings. */
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    /** JSON-encode $data and publish it to the exchange under $routingKey. */
    public function emit($routingKey, $data = null)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $conf = $this->conf['exchange'];
        $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
            $conf['internal'], $conf['nowait']);
        $msg = new AMQPMessage(json_encode($data), [
            'delivery_mode' => 2, # make message persistent
        ]);
        $channel->basic_publish($msg, $this->name, $routingKey);
        $channel->close();
        $connection->close();
    }
    /**
     * Consume messages matching $bindingKey.
     *
     * Declares a server-named queue, binds it to the exchange and calls
     * $callback(routingKey, decodedPayload) for each message. Blocks while
     * the channel has registered consumer callbacks.
     */
    public function receive($bindingKey, callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $conf = $this->conf['exchange'];
        $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
            $conf['internal'], $conf['nowait']);
        $queueConf = $this->conf['queue'];
        // Empty queue name: the broker generates one and returns it.
        list($queue_name, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
            $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
        $channel->queue_bind($queue_name, $this->name, $bindingKey);
        $consumerConf = $this->conf['consumer'];
        $channel->basic_consume($queue_name, '', $consumerConf['no_local'], $consumerConf['no_ack'],
            $consumerConf['exclusive'], $consumerConf['nowait'],
            function ($msg) use ($callback, $consumerConf) {
                call_user_func($callback, $msg->delivery_info['routing_key'], json_decode($msg->body, true));
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.':'.$msg->delivery_info['routing_key'].'::', $msg->body, "\n";
                // Only acknowledge in manual-ack mode: acking a delivery that
                // was consumed with no_ack enabled is a channel-level error.
                if (!$consumerConf['no_ack']) {
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
            });
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] Exchange '{$this->name}' initialized \n";
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}

And node:

var amqp = require('amqplib/callback_api');

/**
 * Topic-exchange wrapper around amqplib's callback API.
 *
 * @param {string} name exchange name
 * @param {object} conf `server` ({host, port, user, pass}), `exchange`
 *                      (options passed to assertExchange), `queue` (options
 *                      passed to assertQueue) and `consumer` (options passed
 *                      to consume)
 */
var Exchange = function (name, conf) {
    // Build the AMQP URL from conf.server each time a connection is opened.
    var connectionUrl = function () {
        return `amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`;
    };

    return {
        // Publish `data` as JSON under `routingKey`. With `close` (default true)
        // the connection is closed and the process exits 500 ms after connecting.
        emit: function (routingKey, data, close = true) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    // Surface the real connection error instead of failing
                    // later on an undefined `conn`.
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    var msg = JSON.stringify(data);
                    ch.assertExchange(name, 'topic', conf.exchange);
                    // Buffer.from replaces the deprecated `new Buffer(string)`.
                    ch.publish(name, routingKey, Buffer.from(msg));
                });
                if (close) {
                    setTimeout(function () {
                        conn.close();
                        process.exit(0)
                    }, 500);
                }
            });
        },
        // Bind a server-named queue with `bindingKey` and call
        // `callback(routingKey, parsedPayload)` for each message.
        receive: function (bindingKey, callback) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    ch.assertExchange(name, 'topic', conf.exchange);
                    console.log(new Date().toString() + ' Exchange ' + name + ' initialized');
                    ch.assertQueue('', conf.queue, function (err, q) {
                        if (err) {
                            throw err;
                        }

                        ch.bindQueue(q.queue, name, bindingKey);

                        ch.consume(q.queue, function (msg) {
                            console.log(new Date().toString(), name, ":", msg.fields.routingKey, "::", msg.content.toString());
                            if (callback) {
                                callback(msg.fields.routingKey, JSON.parse(msg.content.toString()))
                            }
                            // Acknowledge only when the consumer is in manual-ack mode.
                            if (conf.consumer.noAck === false) {
                                ch.ack(msg);
                            }
                        }, conf.consumer);
                    });
                });
            });
        }
    };
};

module.exports = Exchange;

Finally we want to use RPC commands. In fact the RPC implementation is similar to Queue, but in this case the client will receive an answer.

Client side

use G\Rabbit\Builder;
// Connection settings for the RabbitMQ broker.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// call() sends its arguments to 'rpc.hello' and blocks until the reply arrives.
echo Builder::rpc('rpc.hello', $server)->call("Gonzalo", "Ayuso");
var builder = require('../../src/Builder');

// Connection settings for the RabbitMQ broker.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var rpc = builder.rpc('rpc.hello', server);
// The last argument is the callback that receives the parsed reply; the
// preceding arguments are forwarded to the remote procedure.
rpc.call("Gonzalo", "Ayuso", function (data) {
    console.log(data);
});

Server side:

use G\Rabbit\Builder;
// Connection settings for the RabbitMQ broker.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// Serve 'rpc.hello': the closure receives the request arguments and its
// return value is sent back to the caller.
// NOTE(review): $server is captured via use() but not used inside the closure.
Builder::rpc('rpc.hello', $server)->server(function ($name, $surname) use ($server) {
    return "Hello {$name} {$surname}";
});
var builder = require('../../src/Builder');

// Connection settings for the RabbitMQ broker.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var rpc = builder.rpc('rpc.hello', server);

// Serve 'rpc.hello': the callback receives the request arguments and its
// return value is sent back to the caller.
rpc.server(function (name, surname) {
    return "Hello " + name + " " + surname;
});

And Implementations:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Request/reply (RPC) wrapper.
 *
 * $conf must provide three sections: 'server' (host, port, user, pass),
 * 'queue' (passive, durable, exclusive, auto_delete, nowait) and
 * 'consumer' (no_local, no_ack, exclusive, nowait).
 */
class RPC
{
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    /** Open a new connection using the 'server' settings. */
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    /**
     * Invoke the remote procedure with the given arguments and wait for the reply.
     *
     * The arguments are sent as a JSON array to the queue named after this
     * RPC; the reply with the matching correlation id is JSON-decoded and
     * returned.
     */
    public function call()
    {
        $params = (array)func_get_args();
        $response = null;
        $corr_id = uniqid();
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $queueConf = $this->conf['queue'];
        // Empty queue name: the broker generates the reply queue's name.
        list($callback_queue, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
            $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
        $consumerConf = $this->conf['consumer'];
        $channel->basic_consume($callback_queue, '', $consumerConf['no_local'], $consumerConf['no_ack'],
            $consumerConf['exclusive'], $consumerConf['nowait'], function ($rep) use (&$corr_id, &$response) {
                if ($rep->get('correlation_id') == $corr_id) {
                    $response = $rep->body;
                }
            });
        $msg = new AMQPMessage(json_encode($params), [
            'correlation_id' => $corr_id,
            'reply_to'       => $callback_queue,
        ]);
        $channel->basic_publish($msg, '', $this->name);
        // Compare against null explicitly: a valid reply body such as "0" is
        // falsy in PHP and would otherwise keep this loop waiting forever.
        while ($response === null) {
            $channel->wait();
        }
        // Every call opens its own connection; release it before returning.
        $channel->close();
        $connection->close();
        return json_decode($response, true);
    }
    /**
     * Serve requests, replying with the JSON-encoded result of $callback.
     *
     * Each request body is decoded and its values are passed to $callback as
     * positional arguments. Blocks while the channel has registered consumer
     * callbacks.
     */
    public function server(callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $queueConf = $this->conf['queue'];
        $channel->queue_declare($this->name, $queueConf['passive'], $queueConf['durable'], $queueConf['exclusive'],
            $queueConf['auto_delete'], $queueConf['nowait']);
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] RPC server '{$this->name}' initialized \n";
        $channel->basic_qos(null, 1, null);
        $consumerConf = $this->conf['consumer'];
        $channel->basic_consume($this->name, '', $consumerConf['no_local'], $consumerConf['no_ack'],
            $consumerConf['exclusive'],
            $consumerConf['nowait'], function ($req) use ($callback) {
                $response = json_encode(call_user_func_array($callback, array_values(json_decode($req->body, true))));
                $msg = new AMQPMessage($response, [
                    'correlation_id' => $req->get('correlation_id'),
                    'delivery_mode'  => 2, # make message persistent
                ]);
                $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
                $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.":: req => '{$req->body}' response=> '{$response}'\n";
            });
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
var amqp = require('amqplib/callback_api');

/**
 * Request/reply (RPC) wrapper around amqplib's callback API.
 *
 * @param {string} name request queue name
 * @param {object} conf `server` ({host, port, user, pass}), `queue` (options
 *                      passed to assertQueue) and `consumer` (options passed
 *                      to consume)
 */
var RPC = function (name, conf) {
    // Correlation id used to match a reply to its request.
    var generateUuid = function () {
        return Math.random().toString() +
            Math.random().toString() +
            Math.random().toString();
    };

    // Build the AMQP URL from conf.server each time a connection is opened.
    var connectionUrl = function () {
        return `amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`;
    };

    return {
        // call(arg1, ..., argN, callback): forwards the leading arguments to the
        // remote procedure and passes the parsed reply to `callback`. The
        // connection is closed and the process exits 500 ms after the reply.
        call: function () {
            // Everything but the last argument is a parameter. Slicing avoids
            // the implicit global loop counter the manual loop relied on.
            var params = Array.prototype.slice.call(arguments, 0, -1);
            var callback = arguments[arguments.length - 1];

            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    // Surface the real connection error instead of failing
                    // later on an undefined `conn`.
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    // Empty name: the broker generates the reply queue's name.
                    ch.assertQueue('', conf.queue, function (err, q) {
                        if (err) {
                            throw err;
                        }
                        var corr = generateUuid();

                        ch.consume(q.queue, function (msg) {
                            if (msg.properties.correlationId == corr) {
                                callback(JSON.parse(msg.content.toString()));
                                setTimeout(function () {
                                    conn.close();
                                    process.exit(0)
                                }, 500);
                            }
                        }, conf.consumer);
                        // Buffer.from replaces the deprecated `new Buffer(string)`.
                        ch.sendToQueue(name,
                            Buffer.from(JSON.stringify(params)),
                            {correlationId: corr, replyTo: q.queue});
                    });
                });
            });
        },
        // Serve requests: `callback` is applied to the parsed argument array and
        // its JSON-encoded result is sent to the request's replyTo queue.
        server: function (callback) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    ch.assertQueue(name, conf.queue);
                    console.log(new Date().toString() + ' RPC ' + name + ' initialized');
                    ch.prefetch(1);
                    ch.consume(name, function reply(msg) {
                        console.log(new Date().toString(), msg.fields.routingKey, " :: ", msg.content.toString());
                        var response = JSON.stringify(callback.apply(this, JSON.parse(msg.content.toString())));
                        ch.sendToQueue(msg.properties.replyTo,
                            Buffer.from(response),
                            {correlationId: msg.properties.correlationId});

                        ch.ack(msg);

                    }, conf.consumer);
                });
            });
        }
    };
};

module.exports = RPC;

You can see whole projects at github: RabbitMQ-php, RabbitMQ-node

Sending logs to a remote server using RabbitMQ

Some time ago I wrote an article showing how to send Silex logs to a remote server. Today I want to use a messaging queue to do it. Normally, when I need queues, I use Gearman, but today I want to play with RabbitMQ.

When we work with web applications it’s important to have, in one way or another, a way to decouple operations from the main request. Messaging queues are great tools to perform those operations. They even allow us to write our workers in a different language than the main request. These days, for example, I’m working with modbus devices. The whole modbus logic is written in Python and I want to use a frontend written in PHP. I could rewrite the modbus logic in PHP (there are PHP libraries to connect to modbus devices), but I’m not so crazy. Queues are our friends.

The idea in this post is the same as in the previous post. We’ll use the event dispatcher to emit events and we’ll send those events to a RabbitMQ queue. We’ll use a Service Provider called LoggerServiceProvider.

<?php
include __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use RabbitLogger\LoggerServiceProvider;
use Silex\Application;
use Symfony\Component\HttpKernel\Event;
use Symfony\Component\HttpKernel\KernelEvents;

// Open the AMQP connection and channel that the logger service publishes through.
$amqpConnection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$amqpChannel    = $amqpConnection->channel();

$app = new Application(['debug' => true]);
$app->register(new LoggerServiceProvider($amqpConnection, $amqpChannel));

// Kernel events to trace, mapped to the message logged when each one fires.
$tracedEvents = [
    KernelEvents::TERMINATE      => 'TERMINATE',
    KernelEvents::CONTROLLER     => 'CONTROLLER',
    KernelEvents::EXCEPTION      => 'EXCEPTION',
    KernelEvents::FINISH_REQUEST => 'FINISH_REQUEST',
    KernelEvents::RESPONSE       => 'RESPONSE',
    KernelEvents::REQUEST        => 'REQUEST',
    KernelEvents::VIEW           => 'VIEW',
];

// Register one listener per traced event; each listener only logs its label,
// so the event object passed by the dispatcher is not needed.
foreach ($tracedEvents as $eventName => $label) {
    $app->on($eventName, function () use ($app, $label) {
        $app['rabbit.logger']->info($label);
    });
}

// Single demo route: logs from inside the controller and returns a plain body.
$app->get('/', function (Application $app) {
    $app['rabbit.logger']->info('inside route');

    return "HELLO";
});

$app->run();

Here we can see the service provider:

<?php
namespace RabbitLogger;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Silex\Application;
use Silex\ServiceProviderInterface;

/**
 * Silex service provider that exposes a RabbitMQ-backed logger
 * as the shared service $app['rabbit.logger'].
 */
class LoggerServiceProvider implements ServiceProviderInterface
{
    /** @var AMQPStreamConnection connection handed to the Logger */
    private $connection;

    /** @var AMQPChannel channel handed to the Logger */
    private $channel;

    /**
     * @param AMQPStreamConnection $connection
     * @param AMQPChannel          $channel
     */
    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel)
    {
        $this->channel    = $channel;
        $this->connection = $connection;
    }

    /**
     * Registers the shared 'rabbit.logger' service.
     *
     * The queue name is read from $app['logger.channel.name'] when that key
     * is set, and falls back to 'logger.channel' otherwise.
     *
     * @param Application $app
     */
    public function register(Application $app)
    {
        $connection = $this->connection;
        $channel    = $this->channel;

        $factory = function () use ($app, $connection, $channel) {
            $queueName = 'logger.channel';
            if (isset($app['logger.channel.name'])) {
                $queueName = $app['logger.channel.name'];
            }

            return new Logger($connection, $channel, $queueName);
        };

        $app['rabbit.logger'] = $app->share($factory);
    }

    /**
     * Required by ServiceProviderInterface; there is nothing to do at boot time.
     *
     * @param Application $app
     */
    public function boot(Application $app)
    {
    }
}

And here the logger:

<?php
namespace RabbitLogger;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Silex\Application;

/**
 * PSR-3 logger that publishes every log record to a RabbitMQ queue.
 *
 * Each record is sent as a JSON array of the form [message, context, level].
 * The queue is declared in the constructor, and the channel and connection
 * are closed when the logger is destroyed.
 */
class Logger implements LoggerInterface
{
    /** @var AMQPStreamConnection */
    private $connection;

    /** @var AMQPChannel */
    private $channel;

    /** @var string name of the queue the records are published to */
    private $queueName;

    /**
     * @param AMQPStreamConnection $connection
     * @param AMQPChannel          $channel
     * @param string               $queueName queue to declare and publish to
     */
    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel, $queueName = 'logger')
    {
        $this->queueName  = $queueName;
        $this->connection = $connection;
        $this->channel    = $channel;

        // Make sure the target queue exists before anything is published to it.
        $this->channel->queue_declare($queueName, false, false, false, false);
    }

    /**
     * Closes the channel first, then the connection it belongs to.
     */
    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }

    public function emergency($message, array $context = [])
    {
        $this->publish(LogLevel::EMERGENCY, $message, $context);
    }

    public function alert($message, array $context = [])
    {
        $this->publish(LogLevel::ALERT, $message, $context);
    }

    public function critical($message, array $context = [])
    {
        $this->publish(LogLevel::CRITICAL, $message, $context);
    }

    public function error($message, array $context = [])
    {
        $this->publish(LogLevel::ERROR, $message, $context);
    }

    public function warning($message, array $context = [])
    {
        $this->publish(LogLevel::WARNING, $message, $context);
    }

    public function notice($message, array $context = [])
    {
        $this->publish(LogLevel::NOTICE, $message, $context);
    }

    public function info($message, array $context = [])
    {
        $this->publish(LogLevel::INFO, $message, $context);
    }

    public function debug($message, array $context = [])
    {
        $this->publish(LogLevel::DEBUG, $message, $context);
    }

    public function log($level, $message, array $context = [])
    {
        $this->publish($level, $message, $context);
    }

    /**
     * Serialises one log record and publishes it to the configured queue.
     *
     * @param mixed  $level   log level, normally one of the LogLevel constants
     * @param string $message log message
     * @param array  $context extra data sent along with the message
     */
    private function publish($level, $message, array $context)
    {
        // The payload order (message, context, level) is the wire format consumers read.
        $payload = json_encode([$message, $context, $level]);

        // delivery_mode 2 marks the message as persistent.
        $amqpMessage = new AMQPMessage($payload, ['delivery_mode' => 2]);

        // Empty exchange name = default exchange, which routes by queue name.
        $this->channel->basic_publish($amqpMessage, '', $this->queueName);
    }
}

And finally the RabbitMQ Worker to process our logs

<?php
// Worker that consumes the log records published to the 'logger.channel'
// queue and prints each one to standard output.
require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

// Declare the queue so the worker can start before any producer has created it.
$channel->queue_declare('logger.channel', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";

    // The consumer below is registered with no_ack = false, so every delivery
    // must be acknowledged explicitly; otherwise the broker keeps the message
    // as "unacked" and redelivers it when the worker reconnects.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// Hand this worker one message at a time until the previous one is acknowledged.
$channel->basic_qos(null, 1, null);

// Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback.
$channel->basic_consume('logger.channel', '', false, false, false, false, $callback);

// Block and dispatch deliveries for as long as a consumer is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

To run the example we must:

Start RabbitMQ server

rabbitmq-server

start Silex server

php -S 0.0.0.0:8080 -t www

start worker

php worker/worker.php

You can see the whole project in my GitHub account