Blog Archives

Working with SAPUI5 locally (part 3). Adding more services in Docker

In the previous project we moved one project to docker. The idea was to move exactly the same functionality (even without touching anything within the source code). Now we’re going to add more services. Yes, I know, it looks like overenginering (it’s exactly overenginering, indeed), but I want to build something with different services working together. Let start.

We’re going to change a little bit our original project. Now our frontend will only have one button. This button will increment the number of clicks but we’re going to persists this information in a PostgreSQL database. Also, instead of incrementing the counter in the backend, our backend will emit one event to a RabbitMQ message broker. We’ll have one worker service listening to this event and this worker will persist the information. The communication between the worker and the frontend (to show the incremented value), will be via websockets.

With those premises we are going to need:

  • Frontend: UI5 application
  • Backend: PHP/lumen application
  • Worker: nodejs application which is listening to a RabbitMQ event and serving the websocket server (using socket.io)
  • Nginx server
  • PosgreSQL database.
  • RabbitMQ message broker.

As the previous examples, our PHP backend will be server via Nginx and PHP-FPM.

Here we can see to docker-compose file to set up all the services

version: '3.4'

services:
  nginx:
    image: gonzalo123.nginx
    restart: always
    ports:
    - "8080:80"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-nginx
    volumes:
    - ./src/backend:/code/src
    - ./src/.docker/web/site.conf:/etc/nginx/conf.d/default.conf
    networks:
    - app-network
  api:
    image: gonzalo123.api
    restart: always
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-lumen-dev
    environment:
      XDEBUG_CONFIG: remote_host=${MY_IP}
    volumes:
    - ./src/backend:/code/src
    networks:
    - app-network
  ui5:
    image: gonzalo123.ui5
    ports:
    - "8000:8000"
    restart: always
    volumes:
    - ./src/frontend:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-ui5
    networks:
    - app-network
  io:
    image: gonzalo123.io
    ports:
    - "9999:9999"
    restart: always
    volumes:
    - ./src/io:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-io
    networks:
    - app-network
  pg:
    image: gonzalo123.pg
    restart: always
    ports:
    - "5432:5432"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-pg
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata
    networks:
    - app-network
  rabbit:
    image: rabbitmq:3-management
    container_name: gonzalo123.rabbit
    restart: always
    ports:
    - "15672:15672"
    - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
    networks:
    - app-network
networks:
  app-network:
    driver: bridge

We’re going to use the same docker files than in the previous post but we also need new ones for worker, database server and message queue:

Worker:

FROM node:alpine

EXPOSE 8000

WORKDIR /code/src
COPY ./io .
RUN npm install
ENTRYPOINT ["npm", "run", "serve"]

The worker script is simple script that serves the socket.io server and emits a websocket within every message to the RabbitMQ queue.

var amqp = require('amqp'),
  httpServer = require('http').createServer(),
  io = require('socket.io')(httpServer, {
    origins: '*:*',
  }),
  pg = require('pg')
;

require('dotenv').config();
var pgClient = new pg.Client(process.env.DB_DSN);

rabbitMq = amqp.createConnection({
  host: process.env.RABBIT_HOST,
  port: process.env.RABBIT_PORT,
  login: process.env.RABBIT_USER,
  password: process.env.RABBIT_PASS,
});

var sql = 'SELECT clickCount FROM docker.clicks';

// Please don't do this. Use lazy connections
// I'm 'lazy' to do it in this POC 🙂
pgClient.connect(function(err) {
  io.on('connection', function() {
    pgClient.query(sql, function(err, result) {
      var count = result.rows[0]['clickcount'];
      io.emit('click', {count: count});
    });

  });

  rabbitMq.on('ready', function() {
    var queue = rabbitMq.queue('ui5');
    queue.bind('#');

    queue.subscribe(function(message) {
      pgClient.query(sql, function(err, result) {
        var count = parseInt(result.rows[0]['clickcount']);
        count = count + parseInt(message.data.toString('utf8'));
        pgClient.query('UPDATE docker.clicks SET clickCount = $1', [count],
          function(err) {
            io.emit('click', {count: count});
          });
      });
    });
  });
});

httpServer.listen(process.env.IO_PORT);

Database server:

FROM postgres:9.6-alpine
COPY pg/init.sql /docker-entrypoint-initdb.d/

As we can see we’re going to generate the database estructure in the first build

CREATE SCHEMA docker;

CREATE TABLE docker.clicks (
clickCount numeric(8) NOT NULL
);

ALTER TABLE docker.clicks
OWNER TO username;

INSERT INTO docker.clicks(clickCount) values (0);

With the RabbitMQ server we’re going to use the official docker image so we don’t need to create one Dockerfile

We also have changed a little bit our Nginx configuration. We want to use Nginx to serve backend and also socket.io server. That’s because we don’t want to expose different ports to internet.

server {
    listen 80;
    index index.php index.html;
    server_name localhost;
    error_log  /var/log/nginx/error.log;
    access_log /var/log/nginx/access.log;
    root /code/src/www;

    location /socket.io/ {
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_pass "http://io:9999";
    }

    location / {
        try_files $uri $uri/ /index.php?$query_string;
    }

    location ~ \.php$ {
        try_files $uri =404;
        fastcgi_split_path_info ^(.+\.php)(/.+)$;
        fastcgi_pass api:9000;
        fastcgi_index index.php;
        include fastcgi_params;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        fastcgi_param PATH_INFO $fastcgi_path_info;
    }
}

To avoid CORS issues we can also use SCP destination (the localneo proxy in this example), to serve socket.io also. So we need to:

  • change our neo-app.json file
  • "routes": [
        ...
        {
          "path": "/socket.io",
          "target": {
            "type": "destination",
            "name": "SOCKETIO"
          },
          "description": "SOCKETIO"
        }
      ],
    

    And basically that’s all. Here also we can use a “production” docker-copose file without exposing all ports and mapping the filesystem to our local machine (useful when we’re developing)

    version: '3.4'
    
    services:
      nginx:
        image: gonzalo123.nginx
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-nginx
        networks:
        - app-network
      api:
        image: gonzalo123.api
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-lumen
        networks:
        - app-network
      ui5:
        image: gonzalo123.ui5
        ports:
        - "80:8000"
        restart: always
        volumes:
        - ./src/frontend:/code/src
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-ui5
        networks:
        - app-network
      io:
        image: gonzalo123.io
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-io
        networks:
        - app-network
      pg:
        image: gonzalo123.pg
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-pg
        environment:
          POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
          POSTGRES_USER: ${POSTGRES_USER}
          POSTGRES_DB: ${POSTGRES_DB}
          PGDATA: /var/lib/postgresql/data/pgdata
        networks:
        - app-network
      rabbit:
        image: rabbitmq:3-management
        restart: always
        environment:
          RABBITMQ_ERLANG_COOKIE:
          RABBITMQ_DEFAULT_VHOST: /
          RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
          RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
        networks:
        - app-network
    networks:
      app-network:
        driver: bridge
    

    And that’s all. The full project is available in my github account

    Advertisements

    Happy logins. Only the happy user will pass

    Login forms are bored. In this example we’re going to create an especial login form. Only for happy users. Happiness is something complicated, but at least, one smile is more easy to obtain, and all is better with one smile :). Our login form will only appear if the user smiles. Let’s start.

    I must admit that this project is just an excuse to play with different technologies that I wanted to play. Weeks ago I discovered one library called face_classification. With this library I can perform emotion classification from a picture. The idea is simple. We create RabbitMQ RPC server script that answers with the emotion of the face within a picture. Then we obtain on frame from the video stream of the webcam (with HTML5) and we send this frame using websocket to a socket.io server. This websocket server (node) ask to the RabbitMQ RPC the emotion and it sends back to the browser the emotion and a the original picture with a rectangle over the face.

    Frontend

    As well as we’re going to use socket.io for websockets we will use the same script to serve the frontend (the login and the HTML5 video capture)

    <!doctype html>
    <html>
    <head>
        <title>Happy login</title>
        <link rel="stylesheet" href="css/app.css">
    </head>
    <body>
    
    <div id="login-page" class="login-page">
        <div class="form">
            <h1 id="nonHappy" style="display: block;">Only the happy user will pass</h1>
            <form id="happyForm" class="login-form" style="display: none" onsubmit="return false;">
                <input id="user" type="text" placeholder="username"/>
                <input id="pass" type="password" placeholder="password"/>
                <button id="login">login</button>
                <p></p>
                <img id="smile" width="426" height="320" src=""/>
            </form>
            <div id="video">
                <video style="display:none;"></video>
                <canvas id="canvas" style="display:none"></canvas>
                <canvas id="canvas-face" width="426" height="320"></canvas>
            </div>
        </div>
    </div>
    
    <div id="private" style="display: none;">
        <h1>Private page</h1>
    </div>
    
    <script src="https://code.jquery.com/jquery-3.2.1.min.js" integrity="sha256-hwg4gsxgFZhOsEEamdOYGBf13FyQuiTwlAQgxVSNgt4=" crossorigin="anonymous"></script>
    <script src="https://unpkg.com/sweetalert/dist/sweetalert.min.js"></script>
    <script type="text/javascript" src="/socket.io/socket.io.js"></script>
    <script type="text/javascript" src="/js/app.js"></script>
    </body>
    </html>
    

    Here we’ll connect to the websocket and we’ll emit the webcam frame to the server. We´ll also be listening to one event called ‘response’ where server will notify us when one emotion has been detected.

    let socket = io.connect(location.origin),
        img = new Image(),
        canvasFace = document.getElementById('canvas-face'),
        context = canvasFace.getContext('2d'),
        canvas = document.getElementById('canvas'),
        width = 640,
        height = 480,
        delay = 1000,
        jpgQuality = 0.6,
        isHappy = false;
    
    socket.on('response', function (r) {
        let data = JSON.parse(r);
        if (data.length > 0 && data[0].hasOwnProperty('emotion')) {
            if (isHappy === false && data[0]['emotion'] === 'happy') {
                isHappy = true;
                swal({
                    title: "Good!",
                    text: "All is better with one smile!",
                    icon: "success",
                    buttons: false,
                    timer: 2000,
                });
    
                $('#nonHappy').hide();
                $('#video').hide();
                $('#happyForm').show();
                $('#smile')[0].src = 'data:image/png;base64,' + data[0].image;
            }
    
            img.onload = function () {
                context.drawImage(this, 0, 0, canvasFace.width, canvasFace.height);
            };
    
            img.src = 'data:image/png;base64,' + data[0].image;
        }
    });
    
    navigator.getMedia = (navigator.getUserMedia || navigator.webkitGetUserMedia || navigator.mozGetUserMedia);
    
    navigator.getMedia({video: true, audio: false}, (mediaStream) => {
        let video = document.getElementsByTagName('video')[0];
        video.src = window.URL.createObjectURL(mediaStream);
        video.play();
        setInterval(((video) => {
            return function () {
                let context = canvas.getContext('2d');
                canvas.width = width;
                canvas.height = height;
                context.drawImage(video, 0, 0, width, height);
                socket.emit('img', canvas.toDataURL('image/jpeg', jpgQuality));
            }
        })(video), delay)
    }, error => console.log(error));
    
    $(() => {
        $('#login').click(() => {
            $('#login-page').hide();
            $('#private').show();
        })
    });
    

    Backend
    Finally we’ll work in the backend. Basically I’ve check the examples that we can see in face_classification project and tune it a bit according to my needs.

    from rabbit import builder
    import logging
    import numpy as np
    from keras.models import load_model
    from utils.datasets import get_labels
    from utils.inference import detect_faces
    from utils.inference import draw_text
    from utils.inference import draw_bounding_box
    from utils.inference import apply_offsets
    from utils.inference import load_detection_model
    from utils.inference import load_image
    from utils.preprocessor import preprocess_input
    import cv2
    import json
    import base64
    
    detection_model_path = 'trained_models/detection_models/haarcascade_frontalface_default.xml'
    emotion_model_path = 'trained_models/emotion_models/fer2013_mini_XCEPTION.102-0.66.hdf5'
    emotion_labels = get_labels('fer2013')
    font = cv2.FONT_HERSHEY_SIMPLEX
    
    # hyper-parameters for bounding boxes shape
    emotion_offsets = (20, 40)
    
    # loading models
    face_detection = load_detection_model(detection_model_path)
    emotion_classifier = load_model(emotion_model_path, compile=False)
    
    # getting input model shapes for inference
    emotion_target_size = emotion_classifier.input_shape[1:3]
    
    
    def format_response(response):
        decoded_json = json.loads(response)
        return "Hello {}".format(decoded_json['name'])
    
    
    def on_data(data):
        f = open('current.jpg', 'wb')
        f.write(base64.decodebytes(data))
        f.close()
        image_path = "current.jpg"
    
        out = []
        # loading images
        rgb_image = load_image(image_path, grayscale=False)
        gray_image = load_image(image_path, grayscale=True)
        gray_image = np.squeeze(gray_image)
        gray_image = gray_image.astype('uint8')
    
        faces = detect_faces(face_detection, gray_image)
        for face_coordinates in faces:
            x1, x2, y1, y2 = apply_offsets(face_coordinates, emotion_offsets)
            gray_face = gray_image[y1:y2, x1:x2]
    
            try:
                gray_face = cv2.resize(gray_face, (emotion_target_size))
            except:
                continue
    
            gray_face = preprocess_input(gray_face, True)
            gray_face = np.expand_dims(gray_face, 0)
            gray_face = np.expand_dims(gray_face, -1)
            emotion_label_arg = np.argmax(emotion_classifier.predict(gray_face))
            emotion_text = emotion_labels[emotion_label_arg]
            color = (0, 0, 255)
    
            draw_bounding_box(face_coordinates, rgb_image, color)
            draw_text(face_coordinates, rgb_image, emotion_text, color, 0, -50, 1, 2)
            bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)
    
            cv2.imwrite('predicted.png', bgr_image)
            data = open('predicted.png', 'rb').read()
            encoded = base64.encodebytes(data).decode('utf-8')
            out.append({
                'image': encoded,
                'emotion': emotion_text,
            })
    
        return out
    
    logging.basicConfig(level=logging.WARN)
    rpc = builder.rpc("image.check", {'host': 'localhost', 'port': 5672})
    rpc.server(on_data)
    

    Here you can see in action the working prototype

    Maybe we can do the same with another tools and even more simple but as I said before this example is just an excuse to play with those technologies:

    • Send webcam frames via websockets
    • Connect one web application to a Pyhon application via RabbitMQ RPC
    • Play with face classification script

    Please don’t use this script in production. It’s just a proof of concepts. With smiles but a proof of concepts 🙂

    You can see the project in my github account

    Playing with RabbitMQ (part 2). Now with Python

    Do you remember the las post about RabbitMQ? In that post we created a small wrapper library to use RabbitMQ with node and PHP. I also work with Python and I also want to use the same RabbitMQ wrapper here. With Python there’re several libraries to use Rabbit. I’ll use pika.

    The idea is the same than the another post. I want to use queues, exchanges and RPCs. So let’s start with queues:

    We can create a queue receiver called ‘queue.backend’

    from rabbit import builder
    
    server = {
        'host': 'localhost',
        'port': 5672,
        'user': 'guest',
        'pass': 'guest',
    }
    
    def onData(data):
        print data['aaa']
    
    builder.queue('queue.backend', server).receive(onData)
    

    and emit messages to the queue

    from rabbit import builder
    
    server = {
        'host': 'localhost',
        'port': 5672,
        'user': 'guest',
        'pass': 'guest',
    }
    
    queue = builder.queue('queue.backend', server)
    
    queue.emit({"aaa": 1})
    queue.emit({"aaa": 2})
    queue.emit({"aaa": 3})
    

    The library (as the PHP and ones). Uses a builder class to create our instances

    from queue import Queue
    from rpc import RPC
    from exchange import Exchange
    
    defaults = {
        'queue': {
            'queue': {
                'passive': False,
                'durable': True,
                'exclusive': False,
                'autoDelete': False,
                'nowait': False
            },
            'consumer': {
                'noLocal': False,
                'noAck': False,
                'exclusive': False,
                'nowait': False
            }
        },
        'exchange': {
            'exchange': {
                'passive': False,
                'durable': True,
                'autoDelete': True,
                'internal': False,
                'nowait': False
            },
            'queue': {
                'passive': False,
                'durable': True,
                'exclusive': False,
                'autoDelete': True,
                'nowait': False
            },
            'consumer': {
                'noLocal': False,
                'noAck': False,
                'exclusive': False,
                'nowait': False
            }
        },
        'rpc': {
            'queue': {
                'passive': False,
                'durable': True,
                'exclusive': False,
                'autoDelete': True,
                'nowait': False
            },
            'consumer': {
                'noLocal': False,
                'noAck': False,
                'exclusive': False,
                'nowait': False
            }
        }
    }
    
    def queue(name, server):
        conf = defaults['queue']
        conf['server'] = server
    
        return Queue(name, conf)
    
    def rpc(name, server):
        conf = defaults['rpc']
        conf['server'] = server
    
        return RPC(name, conf)
    
    def exchange(name, server):
        conf = defaults['exchange']
        conf['server'] = server
    
        return Exchange(name, conf)
    

    And our Queue class

    import pika
    import json
    import time
    
    class Queue:
        def __init__(self, name, conf):
            self.name = name
            self.conf = conf
    
        def emit(self, data=None):
            credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
            channel = connection.channel()
    
            queueConf = self.conf['queue']
            channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
    
            channel.basic_publish(exchange='', routing_key=self.name, body=json.dumps(data), properties=pika.BasicProperties(delivery_mode=2))
            connection.close()
    
        def receive(self, callback):
            credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
            channel = connection.channel()
    
            queueConf = self.conf['queue']
            channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
    
            def _callback(ch, method, properties, body):
                callback(json.loads(body))
                ch.basic_ack(delivery_tag=method.delivery_tag)
                print "%s %s::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)
    
            print "%s Queue '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
            consumerConf = self.conf['consumer']
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(_callback, self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
    
            channel.start_consuming()
    

    We also want to use exchanges to emit messages without waiting for answers, just as a event broadcast. We can emit messages:

    from rabbit import builder
    
    server = {
        'host': 'localhost',
        'port': 5672,
        'user': 'guest',
        'pass': 'guest',
    }
    
    exchange = builder.exchange('process.log', server)
    
    exchange.emit("xxx.log", "aaaa")
    exchange.emit("xxx.log", ["11", "aaaa"])
    exchange.emit("yyy.log", "aaaa")
    

    And listen to messages

    from rabbit import builder
    
    server = {
        'host': 'localhost',
        'port': 5672,
        'user': 'guest',
        'pass': 'guest',
    }
    
    def onData(routingKey, data):
        print routingKey, data
    
    builder.exchange('process.log', server).receive("yyy.log", onData)
    

    That’s the class

    import pika
    import json
    import time
    
    class Exchange:
        def __init__(self, name, conf):
            self.name = name
            self.conf = conf
    
        def emit(self, routingKey, data=None):
            credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
            channel = connection.channel()
    
            exchangeConf = self.conf['exchange']
            channel.exchange_declare(exchange=self.name, type='topic', passive=exchangeConf['passive'], durable=exchangeConf['durable'], auto_delete=exchangeConf['autoDelete'], internal=exchangeConf['internal'])
            channel.basic_publish(exchange=self.name, routing_key=routingKey, body=json.dumps(data))
            connection.close()
    
        def receive(self, bindingKey, callback):
            credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
            channel = connection.channel()
    
            exchangeConf = self.conf['exchange']
            channel.exchange_declare(exchange=self.name, type='topic', passive=exchangeConf['passive'], durable=exchangeConf['durable'], auto_delete=exchangeConf['autoDelete'], internal=exchangeConf['internal'])
    
            queueConf = self.conf['queue']
            result = channel.queue_declare(passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
            queue_name = result.method.queue
    
            channel.queue_bind(exchange=self.name, queue=queue_name, routing_key=bindingKey)
    
            print "%s Exchange '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
    
            def _callback(ch, method, properties, body):
                callback(method.routing_key, json.loads(body))
                ch.basic_ack(delivery_tag=method.delivery_tag)
                print "%s %s:::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)
    
            consumerConf = self.conf['consumer']
            channel.basic_consume(_callback, queue=queue_name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
            channel.start_consuming()
    

    And finally we can use RPCs. Emit

    from rabbit import builder
    
    server = {
        'host': 'localhost',
        'port': 5672,
        'user': 'guest',
        'pass': 'guest',
    }
    
    print builder.rpc('rpc.hello', server).call("Gonzalo", "Ayuso")
    

    And the server side

    from rabbit import builder
    
    server = {
        'host': 'localhost',
        'port': 5672,
        'user': 'guest',
        'pass': 'guest',
    }
    
    def onData(name, surname):
        return "Hello %s %s" % (name, surname)
    
    builder.rpc('rpc.hello', server).server(onData)
    

    And that’s the class

    import pika
    import json
    import time
    import uuid
    
    class RPC:
        def __init__(self, name, conf):
            self.name = name
            self.conf = conf
    
        def call(self, *params):
            pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port']))
            channel = connection.channel()
    
            queueConf = self.conf['queue']
            result = channel.queue_declare(queue='', passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
            callback_queue = result.method.queue
            consumerConf = self.conf['consumer']
            channel.basic_consume(self.on_call_response, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'], queue='')
    
            self.response = None
            self.corr_id = str(uuid.uuid4())
            channel.basic_publish(exchange='', routing_key=self.name, properties=pika.BasicProperties(reply_to=callback_queue, correlation_id=self.corr_id), body=json.dumps(params))
            while self.response is None:
                connection.process_data_events()
            return self.response
    
        def on_call_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def server(self, callback):
            pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port']))
            channel = connection.channel()
    
            queueConf = self.conf['queue']
            channel.queue_declare(self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
    
            channel.basic_qos(prefetch_count=1)
            consumerConf = self.conf['consumer']
    
            def on_server_request(ch, method, props, body):
                response = callback(*json.loads(body))
    
                ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=json.dumps(response))
                ch.basic_ack(delivery_tag=method.delivery_tag)
                print "%s %s::req => '%s' response => '%s'" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body, response)
    
            channel.basic_consume(on_server_request, queue=self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
    
            print "%s RPC '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
            channel.start_consuming()
    

    And that’s all. Full project is available within my github account

    Playing with RabbitMQ, PHP and node

    I need to use RabbitMQ in one project. I’m a big fan of Gearman, but I must admit Rabbit is much more powerful. In this project I need to handle with PHP code and node, so I want to build a wrapper for those two languages. I don’t want to re-invent the wheel so I will use existing libraries (php-amqplib and amqplib for node).

    Basically I need to use three things: First I need to create exchange channels to log different actions. I need to decouple those actions from the main code. I also need to create work queues to ensure those works are executed. It doesn’t matter if work is executed later but it must be executed. And finally RPC commands.

    Let’s start with the queues. I want to push events to a queue in PHP

    use G\Rabbit\Builder;
    $server = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'pass' => 'guest',
    ];
    $queue = Builder::queue('queue.backend', $server);
    $queue->emit(["aaa" => 1]);
    

    and also with node

    var server = {
        host: 'localhost',
        port: 5672,
        user: 'guest',
        pass: 'guest'
    };
    
    var queue = builder.queue('queue.backend', server);
    queue.emit({aaa: 1});
    

    And I also want to register workers to those queues with PHP and node

    use G\Rabbit\Builder;
    $server = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'pass' => 'guest',
    ];
    Builder::queue('queue.backend', $server)->receive(function ($data) {
        error_log(json_encode($data));
    });
    
    var server = {
        host: 'localhost',
        port: 5672,
        user: 'guest',
        pass: 'guest'
    };
    
    var queue = builder.queue('queue.backend', server);
    queue.receive(function (data) {
        console.log(data);
    });
    

    Both implementations use one builder. In this case we are using Queue:

    namespace G\Rabbit;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    class Queue
    {
        private $name;
        private $conf;
        public function __construct($name, $conf)
        {
            $this->name = $name;
            $this->conf = $conf;
        }
        private function createConnection()
        {
            $server = $this->conf['server'];
            return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
        }
        private function declareQueue($channel)
        {
            $conf = $this->conf['queue'];
            $channel->queue_declare($this->name, $conf['passive'], $conf['durable'], $conf['exclusive'],
                $conf['auto_delete'], $conf['nowait']);
        }
        public function emit($data = null)
        {
            $connection = $this->createConnection();
            $channel = $connection->channel();
            $this->declareQueue($channel);
            $msg = new AMQPMessage(json_encode($data),
                ['delivery_mode' => 2] # make message persistent
            );
            $channel->basic_publish($msg, '', $this->name);
            $channel->close();
            $connection->close();
        }
        public function receive(callable $callback)
        {
            $connection = $this->createConnection();
            $channel = $connection->channel();
            $this->declareQueue($channel);
            $consumer = $this->conf['consumer'];
            if ($consumer['no_ack'] === false) {
                $channel->basic_qos(null, 1, null);
            }
            $channel->basic_consume($this->name, '', $consumer['no_local'], $consumer['no_ack'], $consumer['exclusive'],
                $consumer['nowait'],
                function ($msg) use ($callback) {
                    call_user_func($callback, json_decode($msg->body, true), $this->name);
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                    $now = new \DateTime();
                    echo '['.$now->format('d/m/Y H:i:s')."] {$this->name}::".$msg->body, "\n";
                });
            $now = new \DateTime();
            echo '['.$now->format('d/m/Y H:i:s')."] Queue '{$this->name}' initialized \n";
            while (count($channel->callbacks)) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    }
    
    var amqp = require('amqplib/callback_api');
    
    var Queue = function (name, conf) {
        return {
            emit: function (data, close=true) {
                amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                    conn.createChannel(function (err, ch) {
                        var msg = JSON.stringify(data);
    
                        ch.assertQueue(name, conf.queue);
                        ch.sendToQueue(name, new Buffer(msg));
                    });
                    if (close) {
                        setTimeout(function () {
                            conn.close();
                            process.exit(0)
                        }, 500);
                    }
                });
            },
            receive: function (callback) {
                amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                    conn.createChannel(function (err, ch) {
                        ch.assertQueue(name, conf.queue);
                        console.log(new Date().toString() + ' Queue ' + name + ' initialized');
                        ch.consume(name, function (msg) {
                            console.log(new Date().toString() + " Received %s", msg.content.toString());
                            if (callback) {
                                callback(JSON.parse(msg.content.toString()), msg.fields.routingKey)
                            }
                            if (conf.consumer.noAck === false) {
                                ch.ack(msg);
                            }
                        }, conf.consumer);
                    });
                });
            }
        };
    };
    
    module.exports = Queue;
    

    We also want to emit messages using an exchange

    Emiter:

    use G\Rabbit\Builder;
    $server = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'pass' => 'guest',
    ];
    $exchange = Builder::exchange('process.log', $server);
    $exchange->emit("xxx.log", "aaaa");
    $exchange->emit("xxx.log", ["11", "aaaa"]);
    $exchange->emit("yyy.log", "aaaa");
    
    var builder = require('../../src/Builder');
    
    var server = {
        host: 'localhost',
        port: 5672,
        user: 'guest',
        pass: 'guest'
    };
    
    var exchange = builder.exchange('process.log', server);
    
    exchange.emit("xxx.log", "aaaa");
    exchange.emit("xxx.log", ["11", "aaaa"]);
    exchange.emit("yyy.log", "aaaa");
    

    and receiver:

    use G\Rabbit\Builder;
    $server = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'pass' => 'guest',
    ];
    Builder::exchange('process.log', $server)->receive("yyy.log", function ($routingKey, $data) {
        error_log($routingKey." - ".json_encode($data));
    });
    
    var server = {
        host: 'localhost',
        port: 5672,
        user: 'guest',
        pass: 'guest'
    };
    
    var exchange = builder.exchange('process.log', server);
    
    exchange.receive("yyy.log", function (routingKey, data) {
        console.log(routingKey, data);
    });
    

    And that’s the PHP implementation:

    namespace G\Rabbit;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    class Exchange
    {
        private $name;
        private $conf;
        public function __construct($name, $conf)
        {
            $this->name = $name;
            $this->conf = $conf;
        }
        private function createConnection()
        {
            $server = $this->conf['server'];
            return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
        }
        public function emit($routingKey, $data = null)
        {
            $connection = $this->createConnection();
            $channel = $connection->channel();
            $conf = $this->conf['exchange'];
            $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
                $conf['internal'], $conf['nowait']);
            $msg = new AMQPMessage(json_encode($data), [
                'delivery_mode' => 2, # make message persistent
            ]);
            $channel->basic_publish($msg, $this->name, $routingKey);
            $channel->close();
            $connection->close();
        }
        public function receive($bindingKey, callable $callback)
        {
            $connection = $this->createConnection();
            $channel = $connection->channel();
            $conf = $this->conf['exchange'];
            $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
                $conf['internal'], $conf['nowait']);
            $queueConf = $this->conf['queue'];
            list($queue_name, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
                $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
            $channel->queue_bind($queue_name, $this->name, $bindingKey);
            $consumerConf = $this->conf['consumer'];
            $channel->basic_consume($queue_name, '', $consumerConf['no_local'], $consumerConf['no_ack'],
                $consumerConf['exclusive'], $consumerConf['nowait'],
                function ($msg) use ($callback) {
                    call_user_func($callback, $msg->delivery_info['routing_key'], json_decode($msg->body, true));
                    $now = new \DateTime();
                    echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.':'.$msg->delivery_info['routing_key'].'::', $msg->body, "\n";
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                });
            $now = new \DateTime();
            echo '['.$now->format('d/m/Y H:i:s')."] Exchange '{$this->name}' initialized \n";
            while (count($channel->callbacks)) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    }
    

    And node:

    var amqp = require('amqplib/callback_api');
    
    var Exchange = function (name, conf) {
        return {
            emit: function (routingKey, data, close = true) {
                amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                    conn.createChannel(function (err, ch) {
                        var msg = JSON.stringify(data);
                        ch.assertExchange(name, 'topic', conf.exchange);
                        ch.publish(name, routingKey, new Buffer(msg));
                    });
                    if (close) {
                        setTimeout(function () {
                            conn.close();
                            process.exit(0)
                        }, 500);
                    }
                });
            },
            receive: function (bindingKey, callback) {
                amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                    conn.createChannel(function (err, ch) {
                        ch.assertExchange(name, 'topic', conf.exchange);
                        console.log(new Date().toString() + ' Exchange ' + name + ' initialized');
                        ch.assertQueue('', conf.queue, function (err, q) {
    
                            ch.bindQueue(q.queue, name, bindingKey);
    
                            ch.consume(q.queue, function (msg) {
                                console.log(new Date().toString(), name, ":", msg.fields.routingKey, "::", msg.content.toString());
                                if (callback) {
                                    callback(msg.fields.routingKey, JSON.parse(msg.content.toString()))
                                }
                                if (conf.consumer.noAck === false) {
                                    ch.ack(msg);
                                }
                            }, conf.consumer);
                        });
                    });
                });
            }
        };
    };
    
    module.exports = Exchange;
    

    Finally we want to use RPC commands. In fact RPC implementations is similar than Queue but in this case client will receive an answer.

    Client side

    use G\Rabbit\Builder;
    $server = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'pass' => 'guest',
    ];
    echo Builder::rpc('rpc.hello', $server)->call("Gonzalo", "Ayuso");
    
    var builder = require('../../src/Builder');
    
    var server = {
        host: 'localhost',
        port: 5672,
        user: 'guest',
        pass: 'guest'
    };
    
    var rpc = builder.rpc('rpc.hello', server);
    rpc.call("Gonzalo", "Ayuso", function (data) {
        console.log(data);
    });
    

    Server side:

    use G\Rabbit\Builder;
    $server = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'pass' => 'guest',
    ];
    Builder::rpc('rpc.hello', $server)->server(function ($name, $surname) use ($server) {
        return "Hello {$name} {$surname}";
    });
    
    var builder = require('../../src/Builder');
    
    var server = {
        host: 'localhost',
        port: 5672,
        user: 'guest',
        pass: 'guest'
    };
    
    var rpc = builder.rpc('rpc.hello', server);
    
    rpc.server(function (name, surname) {
        return "Hello " + name + " " + surname;
    });
    

    And Implementations:

    namespace G\Rabbit;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    class RPC
    {
        private $name;
        private $conf;
        public function __construct($name, $conf)
        {
            $this->name = $name;
            $this->conf = $conf;
        }
        private function createConnection()
        {
            $server = $this->conf['server'];
            return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
        }
        public function call()
        {
            $params = (array)func_get_args();
            $response = null;
            $corr_id = uniqid();
            $connection = $this->createConnection();
            $channel = $connection->channel();
            $queueConf = $this->conf['queue'];
            list($callback_queue, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
                $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
            $consumerConf = $this->conf['consumer'];
            $channel->basic_consume($callback_queue, '', $consumerConf['no_local'], $consumerConf['no_ack'],
                $consumerConf['exclusive'], $consumerConf['nowait'], function ($rep) use (&$corr_id, &$response) {
                    if ($rep->get('correlation_id') == $corr_id) {
                        $response = $rep->body;
                    }
                });
            $msg = new AMQPMessage(json_encode($params), [
                'correlation_id' => $corr_id,
                'reply_to'       => $callback_queue,
            ]);
            $channel->basic_publish($msg, '', $this->name);
            while (!$response) {
                $channel->wait();
            }
            return json_decode($response, true);
        }
        public function server(callable $callback)
        {
            $connection = $this->createConnection();
            $channel = $connection->channel();
            $queueConf = $this->conf['queue'];
            $channel->queue_declare($this->name, $queueConf['passive'], $queueConf['durable'], $queueConf['exclusive'],
                $queueConf['auto_delete'], $queueConf['nowait']);
            $now = new \DateTime();
            echo '['.$now->format('d/m/Y H:i:s')."] RPC server '{$this->name}' initialized \n";
            $channel->basic_qos(null, 1, null);
            $consumerConf = $this->conf['consumer'];
            $channel->basic_consume($this->name, '', $consumerConf['no_local'], $consumerConf['no_ack'],
                $consumerConf['exclusive'],
                $consumerConf['nowait'], function ($req) use ($callback) {
                    $response = json_encode(call_user_func_array($callback, array_values(json_decode($req->body, true))));
                    $msg = new AMQPMessage($response, [
                        'correlation_id' => $req->get('correlation_id'),
                        'delivery_mode'  => 2, # make message persistent
                    ]);
                    $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
                    $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
                    $now = new \DateTime();
                    echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.":: req => '{$req->body}' response=> '{$response}'\n";
                });
            while (count($channel->callbacks)) {
                $channel->wait();
            }
            $channel->close();
            $connection->close();
        }
    }
    
    var amqp = require('amqplib/callback_api');
    
    var RPC = function (name, conf) {
        var generateUuid = function () {
            return Math.random().toString() +
                Math.random().toString() +
                Math.random().toString();
        };
    
        return {
            call: function () {
                var params = [];
                for (i = 0; i < arguments.length - 1; i++) {
                    params.push(arguments[i]);
                }
                var callback = arguments[arguments.length - 1];
    
                amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                    conn.createChannel(function (err, ch) {
                        ch.assertQueue('', conf.queue, function (err, q) {
                            var corr = generateUuid();
    
                            ch.consume(q.queue, function (msg) {
                                if (msg.properties.correlationId == corr) {
                                    callback(JSON.parse(msg.content.toString()));
                                    setTimeout(function () {
                                        conn.close();
                                        process.exit(0)
                                    }, 500);
                                }
                            }, conf.consumer);
                            ch.sendToQueue(name,
                                new Buffer(JSON.stringify(params)),
                                {correlationId: corr, replyTo: q.queue});
                        });
                    });
                });
            },
            server: function (callback) {
                amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                    conn.createChannel(function (err, ch) {
                        ch.assertQueue(name, conf.queue);
                        console.log(new Date().toString() + ' RPC ' + name + ' initialized');
                        ch.prefetch(1);
                        ch.consume(name, function reply(msg) {
                            console.log(new Date().toString(), msg.fields.routingKey, " :: ", msg.content.toString());
                            var response = JSON.stringify(callback.apply(this, JSON.parse(msg.content.toString())));
                            ch.sendToQueue(msg.properties.replyTo,
                                new Buffer(response),
                                {correlationId: msg.properties.correlationId});
    
                            ch.ack(msg);
    
                        }, conf.consumer);
                    });
                });
            }
        };
    };
    
    module.exports = RPC;
    

    You can see whole projects at github: RabbitMQ-php, RabbitMQ-node

    Sending logs to a remote server using RabbitMQ

    Time ago I wrote an article to show how to send Silex logs to a remote server. Today I want to use a messaging queue to do it. Normally, when I need queues, I use Gearman but today I want to play with RabbitMQ.

    When we work with web applications it’s important to have, in some way or another, one way to decouple operations from the main request. Messaging queues are great tools to perform those operations. They even allow us to create our workers with a different languages than the main request. This days, for example, I’m working with modbus devices. The whole modbus logic is written in Python and I want to use a Frontend with PHP. I can rewrite the modbus logic with PHP (there’re PHP libraries to connect with modbus devices), but I’m not so crazy. Queues are our friends.

    The idea in this post is the same than the previous post. We’ll use event dispatcher to emit events and we’ll send those events to a RabitMQ queue. We’ll use a Service Provider called.

    <?php
    include __DIR__ . '/../vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use RabbitLogger\LoggerServiceProvider;
    use Silex\Application;
    use Symfony\Component\HttpKernel\Event;
    use Symfony\Component\HttpKernel\KernelEvents;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel    = $connection->channel();
    
    $app = new Application(['debug' => true]);
    $app->register(new LoggerServiceProvider($connection, $channel));
    
    $app->on(KernelEvents::TERMINATE, function (Event\PostResponseEvent $event) use ($app) {
        $app['rabbit.logger']->info('TERMINATE');
    });
    
    $app->on(KernelEvents::CONTROLLER, function (Event\FilterControllerEvent $event) use ($app) {
        $app['rabbit.logger']->info('CONTROLLER');
    });
    
    $app->on(KernelEvents::EXCEPTION, function (Event\GetResponseForExceptionEvent $event) use ($app) {
        $app['rabbit.logger']->info('EXCEPTION');
    });
    
    $app->on(KernelEvents::FINISH_REQUEST, function (Event\FinishRequestEvent $event) use ($app) {
        $app['rabbit.logger']->info('FINISH_REQUEST');
    });
    
    $app->on(KernelEvents::RESPONSE, function (Event\FilterResponseEvent $event) use ($app) {
        $app['rabbit.logger']->info('RESPONSE');
    });
    
    $app->on(KernelEvents::REQUEST, function (Event\GetResponseEvent $event) use ($app) {
        $app['rabbit.logger']->info('REQUEST');
    });
    
    $app->on(KernelEvents::VIEW, function (Event\GetResponseForControllerResultEvent $event) use ($app) {
        $app['rabbit.logger']->info('VIEW');
    });
    
    $app->get('/', function (Application $app) {
        $app['rabbit.logger']->info('inside route');
        return "HELLO";
    });
    
    $app->run();
    

    Here we can see the service provider:

    <?php
    namespace RabbitLogger;
    
    use PhpAmqpLib\Channel\AMQPChannel;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use Silex\Application;
    use Silex\ServiceProviderInterface;
    
    class LoggerServiceProvider implements ServiceProviderInterface
    {
        private $connection;
        private $channel;
    
        public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel)
        {
            $this->connection = $connection;
            $this->channel    = $channel;
        }
    
        public function register(Application $app)
        {
            $app['rabbit.logger'] = $app->share(
                function () use ($app) {
                    $channelName = isset($app['logger.channel.name']) ? $app['logger.channel.name'] : 'logger.channel';
                    return new Logger($this->connection, $this->channel, $channelName);
                }
            );
        }
    
        public function boot(Application $app)
        {
        }
    }
    

    And here the logger:

    <?php
    namespace RabbitLogger;
    
    use PhpAmqpLib\Channel\AMQPChannel;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use Psr\Log\LoggerInterface;
    use Psr\Log\LogLevel;
    use Silex\Application;
    
    class Logger implements LoggerInterface
    {
        private $connection;
        private $channel;
        private $queueName;
    
        public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel, $queueName = 'logger')
        {
            $this->connection = $connection;
            $this->channel    = $channel;
            $this->queueName  = $queueName;
            $this->channel->queue_declare($queueName, false, false, false, false);
        }
    
        function __destruct()
        {
            $this->channel->close();
            $this->connection->close();
        }
    
        public function emergency($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::EMERGENCY);
        }
    
        public function alert($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::ALERT);
        }
    
        public function critical($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::CRITICAL);
        }
    
        public function error($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::ERROR);
        }
    
        public function warning($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::WARNING);
        }
    
        public function notice($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::NOTICE);
        }
    
        public function info($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::INFO);
        }
    
        public function debug($message, array $context = [])
        {
            $this->sendLog($message, $context, LogLevel::DEBUG);
        }
        public function log($level, $message, array $context = [])
        {
            $this->sendLog($message, $context, $level);
        }
    
        private function sendLog($message, array $context = [], $level = LogLevel::INFO)
        {
            $msg = new AMQPMessage(json_encode([$message, $context, $level]), ['delivery_mode' => 2]);
            $this->channel->basic_publish($msg, '', $this->queueName);
        }
    }
    

    And finally the RabbitMQ Worker to process our logs

    require_once __DIR__ . '/../vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('logger.channel', false, false, false, false);
    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
    $callback = function($msg){
        echo " [x] Received ", $msg->body, "\n";
        //$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };
    //$channel->basic_qos(null, 1, null);
    $channel->basic_consume('logger.channel', '', false, false, false, false, $callback);
    while(count($channel->callbacks)) {
        $channel->wait();
    }
    $channel->close();
    $connection->close();
    

    To run the example we must:

    Start RabbitMQ server

    rabbitmq-server
    

    start Silex server

    php -S 0.0.0.0:8080 -t www
    

    start worker

    php worker/worker.php
    

    You can see whole project in my github account