Playing with Go and file system watchers

Let me explain the idea. I want to emit one RabbitMQ message each time a new file is generated in a folder. The problem is that I cannot modify the code of the software that generates the files. The idea is to create a filesystem watcher that emits the message. Let’s start.

I’m not a Go expert, but Go is cool for this kind of task. You can create an executable file, just copy it to the desired server and that’s all. It just works.

Also there’s the fsnotify package to listen to filesystem events. I’ve used this kind of filesystem watcher in the past with PHP and Python.

// main loads the YAML configuration, creates a filesystem watcher,
// registers every directory under the configured path (via watchDir)
// and then dispatches filesystem events forever.
func main() {
	config := GetConf()

	// watcher is a package-level variable; do not discard the error:
	// without a watcher nothing below can work.
	var err error
	watcher, err = fsnotify.NewWatcher()
	if err != nil {
		fmt.Println("ERROR", err)
		return
	}
	defer watcher.Close()

	// watchDir adds each visited directory to the watcher.
	if err := filepath.Walk(config.Path, watchDir); err != nil {
		fmt.Println("ERROR", err)
	}

	// Never written to: keeps main blocked while the goroutine works.
	done := make(chan bool)

	go func() {
		for {
			select {
			case event := <-watcher.Events:
				processEvent(event, *config)
			case err := <-watcher.Errors:
				fmt.Println("ERROR", err)
			}
		}
	}()
	<-done
}

// processEvent handles one fsnotify event. Only CREATE events for files
// with the configured extension are acted on: the file is copied to
// conf.CopyTo and a RabbitMQ message is emitted for it.
func processEvent(event fsnotify.Event, conf Conf) {
	filePath := event.Name
	fileName := filepath.Base(filePath)
	switch op := event.Op.String(); op {
	case "CREATE":
		if strings.HasSuffix(filePath, conf.Extension) {
			// copy is a project helper (it shadows the builtin): it
			// copies the file and reports the number of bytes written.
			// Don't discard its error — a failed copy must not emit.
			bytes, err := copy(event.Name, filepath.Join(conf.CopyTo, fileName))
			if err != nil {
				fmt.Println("ERROR", err)
				return
			}
			if bytes > 0 {
				emitEvent(fileName, conf)
			}
		}
	default:
		fmt.Println("Unhandled event: ", op, " on file: ", fileName)
	}
}

And basically that’s all. We only need to take care with filesystem events. Fsnotify is very low level and when we use it we realize how programs write files. Some of them create a temp file, write to it and finally rename the file. Sometimes the program creates an empty file and writes the content afterwards. Basically it’s the same but the events are different. In my example I only listen to "CREATE", which is enough for my test.

Emitting an event to RabbitMQ is also simple, and it’s well covered in the official documentation.

// emitEvent builds the JSON payload describing a freshly created file
// and forwards it to RabbitMQ.
func emitEvent(fileName string, conf Conf) {
	fmt.Println("Event on file", fileName)

	payload := map[string]interface{}{
		"fileName": fileName,
	}
	encoded, err := json.Marshal(payload)
	if err != nil {
		log.Println(err)
		return
	}
	emmitToRabbit(conf, encoded)
}

In this example I also want to use a YAML file to store configuration. Just for learn how to read YAML files in go.

// Conf holds the watcher configuration loaded from the YAML file.
type Conf struct {
	Path        string `yaml:"path"`        // directory tree to watch
	CopyTo      string `yaml:"copy_to"`     // destination folder for matched files
	Extension   string `yaml:"extension"`   // file suffix that triggers the event
	BrokerTopic string `yaml:"brokerTopic"` // NOTE(review): camelCase while the other tags are snake_case — confirm the YAML file really uses this key
	Broker      string `yaml:"broker"`      // broker connection string
}

// readConf loads the YAML configuration stored in filename and returns
// the populated Conf, or an error mentioning the offending file.
func readConf(filename string) (*Conf, error) {
	buf, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, err
	}

	c := &Conf{}
	// %w (not %v) keeps the original YAML error reachable through
	// errors.Unwrap / errors.Is for callers that need it.
	if err := yaml.Unmarshal(buf, c); err != nil {
		return nil, fmt.Errorf("in file %q: %w", filename, err)
	}

	return c, nil
}

And that’s all. My binary executable is ready.

Full code in my github

Playing with RabbitMQ, Python and Docker. From Exchanges to Queues

Lately in all my projects message queues appear in one way or another. Normally I work with AWS and I use SQS and SNS, but I also use, quite often, RabbitMQ and MQTT. In AWS there’s something that I use a lot to isolate services. The process that emits messages emits them to SNS and I bind SNS to SQS. With this technique I can attach n SQS queues to the same SNS topic. I’ve used it here. In AWS it is pretty straightforward to do that. Today we’re going to do the same with RabbitMQ. In fact it’s very easy to do it in RabbitMQ. We only need to follow the tutorial in the official RabbitMQ documentation.

The script listens to an exchange and resends the message to a queue topic. Basically the same that we can see within the RabbitMQ documentation.

# Bridge script: consumes every message broadcast by a fanout exchange
# and republishes it into a named queue (exchange -> queue fan-in).
import settings
from lib.logger import logger
from lib.rabbit import get_channel

channel = get_channel()
# Make sure the fanout exchange exists before binding to it.
channel.exchange_declare(
    exchange=settings.EXCHANGE,
    exchange_type='fanout')

# Anonymous exclusive queue: the server picks the name and deletes the
# queue when this consumer disconnects.
result = channel.queue_declare(
    queue='',
    exclusive=True)

queue_name = result.method.queue

# Fanout exchange: every bound queue receives a copy of each message.
channel.queue_bind(
    exchange=settings.EXCHANGE,
    queue=queue_name)

logger.info(f' [*] Waiting for {settings.EXCHANGE}. To exit press CTRL+C')

# Destination queue where the consumed messages are re-published.
channel.queue_declare(
    durable=settings.QUEUE_DURABLE,
    auto_delete=settings.QUEUE_AUTO_DELETE,
    queue=settings.QUEUE_TOPIC
)


def callback(ch, method, properties, body):
    # Re-publish through the default exchange straight into the topic queue.
    channel.basic_publish(
        exchange='',
        routing_key=settings.QUEUE_TOPIC,
        body=body)
    logger.info(f'Message sent to topic {settings.QUEUE_TOPIC}. Message: {body}')


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True)

# Blocks forever processing deliveries.
channel.start_consuming()

We send one message to the exchange using rabbitmqadmin and see the message in the queue:

rabbitmqadmin -u username -p password publish exchange=exchange routing_key= payload="hello, world"

My idea with this project is to deploy it into a docker swarm cluster and add/remove listener only adding new services to the stack:

...
  exchange2topic1:
    image: ${ECR}/exchange2queue:${VERSION}
    # build/deploy/depends_on are siblings of image (they were nested one
    # level too deep in the original snippet, which is invalid YAML for
    # a compose service definition — compare with the full compose file).
    build:
      context: .
      dockerfile: Dockerfile
    deploy:
      restart_policy:
        condition: on-failure
    depends_on:
      - rabbit
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0
  ...

With this approach it’s very simple for me to add and remove new listeners without touching the emitter.

Also, as plus, I like to add a simple http api to allow me to send messages to the exchange with a post request, instead of using a RabbitMQ client. That’s because sometimes I work with legacy systems where using a AMQP client isn’t simple. That’s a simple Flask API

from flask import Flask, request
from flask import jsonify

import settings
from lib.auth import authorize_bearer
from lib.logger import logger
from lib.rabbit import get_channel
import json

app = Flask(__name__)


@app.route('/health')
def health():
    """Liveness probe: always answers with an OK payload."""
    payload = {"status": "ok"}
    return jsonify(payload)


@app.route('/publish/<path:exchange>', methods=['POST'])
@authorize_bearer(bearer=settings.API_TOKEN)
def publish(exchange):
    """Publish the POSTed JSON body to the given RabbitMQ exchange.

    Returns {"status": "OK"} on success, or {"status": "NOK"} with an
    HTTP 500 when publishing fails.
    """
    channel = get_channel()
    try:
        message = request.get_json()
        channel.basic_publish(
            exchange=exchange,
            routing_key='',
            body=json.dumps(message)
        )
        logger.info(f"message sent to exchange {exchange}")
        return jsonify({"status": "OK", "exchange": exchange})
    except Exception:
        # A bare "except:" would also swallow SystemExit/KeyboardInterrupt
        # and hide the real failure. Log it and signal the error to the
        # client with a 500 instead of a silent 200.
        logger.exception(f"cannot publish to exchange {exchange}")
        return jsonify({"status": "NOK", "exchange": exchange}), 500

Now we can emit messages to the exchange using curl, postman or any other http client.

POST http://localhost:5000/publish/exchange
Content-Type: application/json
Authorization: Bearer super_secret_key

{
  "hello": "Gonzalo"
}

Also, in the docker stack I want to use a reverse proxy to server my Flask application (served with gunicorn) and the RabbitMQ management console. I’m using nginx to do that:

# Gunicorn-served Flask API behind nginx.
upstream api {
    server api:5000;
}

server {
    listen 8000 default_server;
    listen [::]:8000;

    client_max_body_size 20M;

    # Everything that is not a file goes to the Flask API.
    location / {
        try_files $uri @proxy_to_api;
    }

    # RabbitMQ HTTP API: %2F is the URL-encoded default vhost "/".
    location ~* /rabbitmq/api/(.*?)/(.*) {
        proxy_pass http://rabbit:15672/api/$1/%2F/$2?$query_string;
        proxy_buffering                    off;
        proxy_set_header Host              $http_host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # RabbitMQ management console, served under the /rabbitmq/ prefix.
    location ~* /rabbitmq/(.*) {
        rewrite ^/rabbitmq/(.*)$ /$1 break;
        proxy_pass http://rabbit:15672;
        proxy_buffering                    off;
        proxy_set_header Host              $http_host;
        proxy_set_header X-Real-IP         $remote_addr;
        proxy_set_header X-Forwarded-For   $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # Named location used by the / fallback above.
    location @proxy_to_api {
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Url-Scheme $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;

        proxy_pass http://api;
    }
}

And that’s all. Here the docker-compose.yml

version: '3.6'

# Shared service definition reused (via YAML anchor/merge) by the api
# and exchange2topicN services below.
x-base: &base
  image: ${ECR}/exchange2queue:${VERSION}
  build:
    context: .
    dockerfile: Dockerfile
  deploy:
    restart_policy:
      condition: on-failure
  depends_on:
    - rabbit

services:
  # RabbitMQ broker with the management plugin.
  rabbit:
    image: rabbitmq:3-management
    deploy:
      restart_policy:
        condition: on-failure
    ports:
      - 5672:5672
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}

  # Reverse proxy in front of the Flask API and the RabbitMQ console.
  nginx:
    image: ${ECR}/exchange2queue_nginx:${VERSION}
    deploy:
      restart_policy:
        condition: on-failure
    build:
      context: .docker/nginx
      dockerfile: Dockerfile
    ports:
      - 8080:8000
    depends_on:
      - rabbit
      - api

  # HTTP front door: lets legacy clients publish via POST instead of AMQP.
  api:
    <<: *base
    container_name: front
    command: /bin/sh wait-for-it.sh rabbit:5672 -- gunicorn -w 4 api:app -b 0.0.0.0:5000
    deploy:
      restart_policy:
        condition: on-failure
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      API_TOKEN: ${API_TOKEN}

  # One exchange->queue bridge per topic; add/remove services to scale.
  exchange2topic1:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0

  exchange2topic2:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic2
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0

Full code in my github

Playing with threads and Python. Part 2

Today I want to keep on playing with python and threads (part1 here). The idea is create one simple script that prints one asterisk in the console each second. Simple, isn’t it?

[sourcecode language=”python”]
from time import sleep

# Print one asterisk per second, forever (body indentation restored —
# the original snippet lost it and would not parse).
while True:
    print("*")
    sleep(1)
[/sourcecode]

I want to keep this script running but I want to send one message externally, for example using RabbitMQ, and do something within the running script. In this demo, for example, stop the script.

In javascript we can do it with a single-thread process using the setInterval function but, since the rabbit listener with pika is a blocking action, we need to use threads in Python (please tell me if I’m wrong). The idea is to create a circuit-breaker condition in the main loop to check whether or not to stop the main thread.

First I’ve created my Rabbit listener in a thread:

[sourcecode language=”python”]
from queue import Queue, Empty
import threading
import pika
import os


class Listener(threading.Thread):
    """Daemon thread that consumes the RabbitMQ 'stop' queue and exposes
    the received stop signal to the main thread through a queue.Queue."""

    def __init__(self, queue=None):
        super(Listener, self).__init__()
        # Avoid a mutable default argument: build the queue per instance.
        self.queue = queue if queue is not None else Queue()
        self.daemon = True

    def run(self):
        """Block consuming the 'stop' queue; each message pushes True."""
        channel = self._get_channel()
        channel.queue_declare(queue='stop')

        channel.basic_consume(
            queue='stop',
            on_message_callback=lambda ch, method, properties, body: self.queue.put(item=True),
            auto_ack=True)

        channel.start_consuming()

    def stop(self):
        """Near-non-blocking check: True only once a stop message arrived."""
        try:
            return True if self.queue.get(timeout=0.05) is True else False
        except Empty:
            return False

    def _get_channel(self):
        # Connection settings come from the environment (.env file).
        credentials = pika.PlainCredentials(
            username=os.getenv('RABBITMQ_USER'),
            password=os.getenv('RABBITMQ_PASS'))

        parameters = pika.ConnectionParameters(
            host=os.getenv('RABBITMQ_HOST'),
            credentials=credentials)

        connection = pika.BlockingConnection(parameters=parameters)

        return connection.channel()
[/sourcecode]

Now in the main process I start the Listener and I enter in one endless loop to print my asterisk each second but at the end of each loop I check if I need to stop the process or not

[sourcecode language=”python”]
from Listener import Listener
from dotenv import load_dotenv
import logging
from time import sleep
import os

logging.basicConfig(level=logging.INFO)

# Load RABBITMQ_* settings from the .env file next to this script.
current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

l = Listener()
l.start()


def main():
    # Print one mark per second until the listener reports a stop message.
    while True:
        logging.info("*")
        sleep(1)
        if l.stop():
            break


if __name__ == '__main__':
    main()
[/sourcecode]

As we can see int the stop function we’re using the queue.Queue package to communicate with our listener loop.

And that’s all. In the example I also provide a minimal RabbitMQ server in a docker container.

[sourcecode language=”xml”]
version: '3.4'

# Minimal RabbitMQ broker for the demo (straight quotes and list dashes
# restored — the original snippet was mangled by the blog engine).
services:
  rabbit:
    image: rabbitmq:3-management
    restart: always
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    ports:
      - "15672:15672"
      - "5672:5672"

[/sourcecode]

Source code available in my github

Playing with microservices, Docker, Python an Nameko

In the last projects that I’ve been involved with I’ve been playing, in one way or another, with microservices, queues and things like that. I’m always facing the same tasks: building RPCs, workers, API gateways, … Because of that I’ve been searching for a framework to help me with that kind of stuff. Finally I discovered Nameko. Basically Nameko is the Python tool that I’ve been looking for. In this post I will create a simple proof of concept to learn how to integrate Nameko within my projects. Let’s start.

The POC is a simple API gateway that gives me the localtime in iso format. I can create a simple Python script to do it

[sourcecode language=”python”]
import datetime
import time

# time.time(), not time(): the module (not the function) is imported,
# so the original call raised TypeError.
print(datetime.datetime.fromtimestamp(time.time()).isoformat())
[/sourcecode]

We also can create a simple Flask API server to consume this information. The idea is create a rpc worker to generate this information and also generate another worker to send the localtime, but taken from a PostgreSQL database (yes I know it not very useful but it’s just an excuse to use a PG database in the microservice)

We’re going to create two rpc workers. One giving the local time:

[sourcecode language=”python”]
from nameko.rpc import rpc
from time import time
import datetime


class TimeService:
    """Nameko RPC service answering with the current local time."""

    name = "local_time_service"

    @rpc
    def local(self):
        # Current local time, ISO-8601 formatted.
        return datetime.datetime.fromtimestamp(time()).isoformat()

[/sourcecode]

And another one with the date from PostgreSQL:

[sourcecode language=”python”]
from nameko.rpc import rpc
from dotenv import load_dotenv
import os
from ext.pg import PgService

# DSN comes from the .env file next to this module.
current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))


class TimeService:
    """Nameko RPC service answering with PostgreSQL's local timestamp."""

    name = "db_time_service"
    # PgService is a DependencyProvider: each worker gets a connection.
    conn = PgService(os.getenv('DSN'))

    @rpc
    def db(self):
        with self.conn:
            with self.conn.cursor() as cur:
                cur.execute("select localtimestamp")
                timestamp = cur.fetchone()
                return timestamp[0]
[/sourcecode]

I’ve created a service called PgService only to learn how to create dependency providers in nameko

[sourcecode language=”python”]
from nameko.extensions import DependencyProvider
import psycopg2


class PgService(DependencyProvider):
    """Nameko dependency provider handing each worker a psycopg2 connection."""

    def __init__(self, dsn):
        self.dsn = dsn

    def get_dependency(self, worker_ctx):
        # Called per worker; returns the object injected into the service.
        return psycopg2.connect(self.dsn)
[/sourcecode]

Now we only need to setup the api gateway. With Nameko we can create http entrypoint also (in the same way than we create rpc) but I want to use it with Flask

[sourcecode language=”python”]
from flask import Flask
from nameko.standalone.rpc import ServiceRpcProxy
from dotenv import load_dotenv
import os

# AMQP_URI comes from the .env file next to this module.
current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

app = Flask(__name__)


def rpc_proxy(service):
    # Standalone proxy: lets this non-nameko process call nameko services.
    config = {'AMQP_URI': os.getenv('AMQP_URI')}
    return ServiceRpcProxy(service, config)


@app.route('/')
def hello():
    return "Hello"


@app.route('/local')
def local_time():
    with rpc_proxy('local_time_service') as rpc:
        time = rpc.local()

    return time


@app.route('/db')
def db_time():
    with rpc_proxy('db_time_service') as rpc:
        time = rpc.db()

    return time


if __name__ == '__main__':
    app.run()
[/sourcecode]

As well as I wanna run my POC with docker, here the docker-compose file to set up the project

[sourcecode language=”xml”]
version: '3.4'

# Formatting reconstructed: smart quotes, en-dashes ("–") and stripped
# indentation restored, and "flask run –host" fixed to proper "--" flags.
services:
  api:
    image: nameko/api
    container_name: nameko.api
    hostname: api
    ports:
      - "8080:8080"
    restart: always
    links:
      - rabbit
      - db.worker
      - local.worker
    environment:
      - ENV=1
      - FLASK_APP=app.py
      - FLASK_DEBUG=1
    build:
      context: ./api
      dockerfile: .docker/Dockerfile-api
    #volumes:
    #  - ./api:/usr/src/app:ro
    command: flask run --host=0.0.0.0 --port 8080
  db.worker:
    container_name: nameko.db.worker
    image: nameko/db.worker
    restart: always
    build:
      context: ./workers/db.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  local.worker:
    container_name: nameko.local.worker
    image: nameko/local.worker
    restart: always
    build:
      context: ./workers/local.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  rabbit:
    container_name: nameko.rabbit
    image: rabbitmq:3-management
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
  pg:
    container_name: nameko.pg
    image: nameko/pg
    restart: always
    build:
      context: ./pg
      dockerfile: .docker/Dockerfile-pg
    #ports:
    #  - "5432:5432"
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata
[/sourcecode]

And that’s all. Two nameko rpc services working together behind a api gateway

Code available in my github

Working with SAPUI5 locally (part 3). Adding more services in Docker

In the previous project we moved one project to docker. The idea was to move exactly the same functionality (even without touching anything within the source code). Now we’re going to add more services. Yes, I know, it looks like overengineering (it’s exactly overengineering, indeed), but I want to build something with different services working together. Let’s start.

We’re going to change a little bit our original project. Now our frontend will only have one button. This button will increment the number of clicks but we’re going to persists this information in a PostgreSQL database. Also, instead of incrementing the counter in the backend, our backend will emit one event to a RabbitMQ message broker. We’ll have one worker service listening to this event and this worker will persist the information. The communication between the worker and the frontend (to show the incremented value), will be via websockets.

With those premises we are going to need:

  • Frontend: UI5 application
  • Backend: PHP/lumen application
  • Worker: nodejs application which is listening to a RabbitMQ event and serving the websocket server (using socket.io)
  • Nginx server
  • PosgreSQL database.
  • RabbitMQ message broker.

As the previous examples, our PHP backend will be server via Nginx and PHP-FPM.

Here we can see to docker-compose file to set up all the services

[sourcecode language=”xml”]
version: '3.4'

# Development stack: UI5 frontend, PHP/lumen API behind nginx, node
# socket.io worker, PostgreSQL and RabbitMQ, all on one bridge network.
# (Formatting reconstructed: smart quotes/en-dashes and indentation.)
services:
  nginx:
    image: gonzalo123.nginx
    restart: always
    ports:
      - "8080:80"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-nginx
    volumes:
      - ./src/backend:/code/src
      - ./src/.docker/web/site.conf:/etc/nginx/conf.d/default.conf
    networks:
      - app-network
  api:
    image: gonzalo123.api
    restart: always
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-lumen-dev
    environment:
      XDEBUG_CONFIG: remote_host=${MY_IP}
    volumes:
      - ./src/backend:/code/src
    networks:
      - app-network
  ui5:
    image: gonzalo123.ui5
    ports:
      - "8000:8000"
    restart: always
    volumes:
      - ./src/frontend:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-ui5
    networks:
      - app-network
  io:
    image: gonzalo123.io
    ports:
      - "9999:9999"
    restart: always
    volumes:
      - ./src/io:/code/src
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-io
    networks:
      - app-network
  pg:
    image: gonzalo123.pg
    restart: always
    ports:
      - "5432:5432"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-pg
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata
    networks:
      - app-network
  rabbit:
    image: rabbitmq:3-management
    container_name: gonzalo123.rabbit
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
    networks:
      - app-network
networks:
  app-network:
    driver: bridge
[/sourcecode]

We’re going to use the same docker files than in the previous post but we also need new ones for worker, database server and message queue:

Worker:
[sourcecode language=”xml”]
FROM node:alpine

# NOTE(review): the compose file publishes port 9999 for this service
# while this image EXPOSEs 8000 — confirm which port the worker listens on.
EXPOSE 8000

WORKDIR /code/src
COPY ./io .
RUN npm install
ENTRYPOINT ["npm", "run", "serve"]
[/sourcecode]

The worker script is simple script that serves the socket.io server and emits a websocket within every message to the RabbitMQ queue.

[sourcecode language=”js”]
// socket.io worker: on every RabbitMQ message it updates the click
// counter in PostgreSQL and broadcasts the new value over websockets.
// (Quotes and indentation reconstructed from the mangled listing.)
var amqp = require('amqp'),
    httpServer = require('http').createServer(),
    io = require('socket.io')(httpServer, {
        origins: '*:*',
    }),
    pg = require('pg');

require('dotenv').config();
var pgClient = new pg.Client(process.env.DB_DSN);

rabbitMq = amqp.createConnection({
    host: process.env.RABBIT_HOST,
    port: process.env.RABBIT_PORT,
    login: process.env.RABBIT_USER,
    password: process.env.RABBIT_PASS,
});

var sql = 'SELECT clickCount FROM docker.clicks';

// Please don't do this. Use lazy connections
// I'm 'lazy' to do it in this POC :)
pgClient.connect(function(err) {
    // New websocket client: send it the current counter value.
    io.on('connection', function() {
        pgClient.query(sql, function(err, result) {
            var count = result.rows[0]['clickcount'];
            io.emit('click', {count: count});
        });
    });

    rabbitMq.on('ready', function() {
        var queue = rabbitMq.queue('ui5');
        queue.bind('#');

        // Each queued message carries an increment; persist and broadcast.
        queue.subscribe(function(message) {
            pgClient.query(sql, function(err, result) {
                var count = parseInt(result.rows[0]['clickcount']);
                count = count + parseInt(message.data.toString('utf8'));
                pgClient.query('UPDATE docker.clicks SET clickCount = $1', [count],
                    function(err) {
                        io.emit('click', {count: count});
                    });
            });
        });
    });
});

httpServer.listen(process.env.IO_PORT);
[/sourcecode]

Database server:
[sourcecode language=”xml”]
FROM postgres:9.6-alpine
# Scripts in /docker-entrypoint-initdb.d run on first container start,
# creating the schema below.
COPY pg/init.sql /docker-entrypoint-initdb.d/
[/sourcecode]

As we can see, we’re going to generate the database structure on the first build
[sourcecode language=”sql”]
CREATE SCHEMA docker;

-- Single-row table holding the global click counter.
CREATE TABLE docker.clicks (
    clickCount numeric(8) NOT NULL
);

ALTER TABLE docker.clicks
    OWNER TO username;

-- Seed the counter so the first SELECT/UPDATE always finds a row.
INSERT INTO docker.clicks(clickCount) values (0);
[/sourcecode]

With the RabbitMQ server we’re going to use the official docker image so we don’t need to create one Dockerfile

We also have changed a little bit our Nginx configuration. We want to use Nginx to serve backend and also socket.io server. That’s because we don’t want to expose different ports to internet.

[sourcecode language=”xml”]
server {
    listen 80;
    index index.php index.html;
    server_name localhost;
    error_log /var/log/nginx/error.log;
    access_log /var/log/nginx/access.log;
    root /code/src/www;

    # socket.io is proxied through nginx so only one port faces clients.
    location /socket.io/ {
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        # Fixed: the published listing contained a literal "&quot;" HTML
        # entity here instead of the closing double quote.
        proxy_pass "http://io:9999";
    }

    location / {
        try_files $uri $uri/ /index.php?$query_string;
    }

    # PHP is handled by the api service over FastCGI.
    location ~ \.php$ {
        try_files $uri =404;
        fastcgi_split_path_info ^(.+\.php)(/.+)$;
        fastcgi_pass api:9000;
        fastcgi_index index.php;
        include fastcgi_params;
        fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
        fastcgi_param PATH_INFO $fastcgi_path_info;
    }
}
[/sourcecode]

To avoid CORS issues we can also use SCP destination (the localneo proxy in this example), to serve socket.io also. So we need to:

  • change our neo-app.json file
  • [sourcecode language=”js”]
    "routes": [
        {
            "path": "/socket.io",
            "target": {
                "type": "destination",
                "name": "SOCKETIO"
            },
            "description": "SOCKETIO"
        }
    ],
    [/sourcecode]

    And basically that’s all. Here also we can use a “production” docker-copose file without exposing all ports and mapping the filesystem to our local machine (useful when we’re developing)

    [sourcecode language=”xml”]
    version: '3.4'

    # "Production" variant: no volume mounts into the host and only the
    # frontend port is published. (Formatting reconstructed.)
    services:
      nginx:
        image: gonzalo123.nginx
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-nginx
        networks:
          - app-network
      api:
        image: gonzalo123.api
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-lumen
        networks:
          - app-network
      ui5:
        image: gonzalo123.ui5
        ports:
          - "80:8000"
        restart: always
        volumes:
          - ./src/frontend:/code/src
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-ui5
        networks:
          - app-network
      io:
        image: gonzalo123.io
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-io
        networks:
          - app-network
      pg:
        image: gonzalo123.pg
        restart: always
        build:
          context: ./src
          dockerfile: .docker/Dockerfile-pg
        environment:
          POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
          POSTGRES_USER: ${POSTGRES_USER}
          POSTGRES_DB: ${POSTGRES_DB}
          PGDATA: /var/lib/postgresql/data/pgdata
        networks:
          - app-network
      rabbit:
        image: rabbitmq:3-management
        restart: always
        environment:
          RABBITMQ_ERLANG_COOKIE:
          RABBITMQ_DEFAULT_VHOST: /
          RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
          RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
        networks:
          - app-network
    networks:
      app-network:
        driver: bridge
    [/sourcecode]

    And that’s all. The full project is available in my github account

    Happy logins. Only the happy user will pass

Login forms are boring. In this example we’re going to create a special login form. Only for happy users. Happiness is something complicated, but at least a smile is easier to obtain, and everything is better with a smile :). Our login form will only appear if the user smiles. Let’s start.

    I must admit that this project is just an excuse to play with different technologies that I wanted to play. Weeks ago I discovered one library called face_classification. With this library I can perform emotion classification from a picture. The idea is simple. We create RabbitMQ RPC server script that answers with the emotion of the face within a picture. Then we obtain on frame from the video stream of the webcam (with HTML5) and we send this frame using websocket to a socket.io server. This websocket server (node) ask to the RabbitMQ RPC the emotion and it sends back to the browser the emotion and a the original picture with a rectangle over the face.

    Frontend

    As well as we’re going to use socket.io for websockets we will use the same script to serve the frontend (the login and the HTML5 video capture)

    [sourcecode language=”html”]
    <!doctype html>
    <html>
    <head>
        <title>Happy login</title>
        <link rel="stylesheet" href="css/app.css">
    </head>
    <body>

    <!-- Login form: kept hidden until the webcam detects a smile -->
    <div id="login-page" class="login-page">
        <div class="form">
            <h1 id="nonHappy" style="display: block;">Only the happy user will pass</h1>
            <form id="happyForm" class="login-form" style="display: none" onsubmit="return false;">
                <input id="user" type="text" placeholder="username"/>
                <input id="pass" type="password" placeholder="password"/>
                <button id="login">login</button>
                <p></p>
                <img id="smile" width="426" height="320" src=""/>
            </form>
            <div id="video">
                <video style="display:none;"></video>
                <canvas id="canvas" style="display:none"></canvas>
                <canvas id="canvas-face" width="426" height="320"></canvas>
            </div>
        </div>
    </div>

    <div id="private" style="display: none;">
        <h1>Private page</h1>
    </div>

    <!-- Fixed: the published listing had "&quot;"/"&gt;" entities inside
         the jquery src attribute and the sweetalert closing tag -->
    <script src="https://code.jquery.com/jquery-3.2.1.min.js" integrity="sha256-hwg4gsxgFZhOsEEamdOYGBf13FyQuiTwlAQgxVSNgt4=" crossorigin="anonymous"></script>
    <script src="https://unpkg.com/sweetalert/dist/sweetalert.min.js"></script>
    <script type="text/javascript" src="/socket.io/socket.io.js"></script>
    <script type="text/javascript" src="/js/app.js"></script>
    </body>
    </html>
    [/sourcecode]

    Here we’ll connect to the websocket and we’ll emit the webcam frame to the server. We´ll also be listening to one event called ‘response’ where server will notify us when one emotion has been detected.

    [sourcecode language=”js”]
    // Webcam capture + websocket client for the "happy login" demo.
    // (Quotes and indentation reconstructed from the mangled listing.)
    let socket = io.connect(location.origin),
        img = new Image(),
        canvasFace = document.getElementById('canvas-face'),
        context = canvasFace.getContext('2d'),
        canvas = document.getElementById('canvas'),
        width = 640,
        height = 480,
        delay = 1000,
        jpgQuality = 0.6,
        isHappy = false;

    // Server response: emotion classification plus the annotated frame.
    socket.on('response', function (r) {
        let data = JSON.parse(r);
        if (data.length > 0 && data[0].hasOwnProperty('emotion')) {
            if (isHappy === false && data[0]['emotion'] === 'happy') {
                isHappy = true;
                swal({
                    title: "Good!",
                    text: "All is better with one smile!",
                    icon: "success",
                    buttons: false,
                    timer: 2000,
                });

                $('#nonHappy').hide();
                $('#video').hide();
                $('#happyForm').show();
                $('#smile')[0].src = 'data:image/png;base64,' + data[0].image;
            }

            img.onload = function () {
                context.drawImage(this, 0, 0, canvasFace.width, canvasFace.height);
            };

            img.src = 'data:image/png;base64,' + data[0].image;
        }
    });

    navigator.getMedia = (navigator.getUserMedia || navigator.webkitGetUserMedia || navigator.mozGetUserMedia);

    // Grab one webcam frame every `delay` ms and push it over the websocket.
    navigator.getMedia({video: true, audio: false}, (mediaStream) => {
        let video = document.getElementsByTagName('video')[0];
        video.src = window.URL.createObjectURL(mediaStream);
        video.play();
        setInterval(((video) => {
            return function () {
                let context = canvas.getContext('2d');
                canvas.width = width;
                canvas.height = height;
                context.drawImage(video, 0, 0, width, height);
                socket.emit('img', canvas.toDataURL('image/jpeg', jpgQuality));
            }
        })(video), delay)
    }, error => console.log(error));

    $(() => {
        $('#login').click(() => {
            $('#login-page').hide();
            $('#private').show();
        })
    });
    [/sourcecode]

    Backend
    Finally we’ll work in the backend. Basically I’ve check the examples that we can see in face_classification project and tune it a bit according to my needs.

    [sourcecode language=”python”]
    # RabbitMQ RPC server: receives a base64 jpeg, classifies the emotion of
    # each detected face and answers with the annotated image + emotion.
    # (Indentation and quotes reconstructed from the mangled listing.)
    from rabbit import builder
    import logging
    import numpy as np
    from keras.models import load_model
    from utils.datasets import get_labels
    from utils.inference import detect_faces
    from utils.inference import draw_text
    from utils.inference import draw_bounding_box
    from utils.inference import apply_offsets
    from utils.inference import load_detection_model
    from utils.inference import load_image
    from utils.preprocessor import preprocess_input
    import cv2
    import json
    import base64

    detection_model_path = 'trained_models/detection_models/haarcascade_frontalface_default.xml'
    emotion_model_path = 'trained_models/emotion_models/fer2013_mini_XCEPTION.102-0.66.hdf5'
    emotion_labels = get_labels('fer2013')
    font = cv2.FONT_HERSHEY_SIMPLEX

    # hyper-parameters for bounding boxes shape
    emotion_offsets = (20, 40)

    # loading models
    face_detection = load_detection_model(detection_model_path)
    emotion_classifier = load_model(emotion_model_path, compile=False)

    # getting input model shapes for inference
    emotion_target_size = emotion_classifier.input_shape[1:3]


    def format_response(response):
        decoded_json = json.loads(response)
        return "Hello {}".format(decoded_json['name'])


    def on_data(data):
        # Persist the incoming base64 frame so the cv2 helpers can load it.
        f = open('current.jpg', 'wb')
        f.write(base64.decodebytes(data))
        f.close()
        image_path = "current.jpg"

        out = []
        # loading images
        rgb_image = load_image(image_path, grayscale=False)
        gray_image = load_image(image_path, grayscale=True)
        gray_image = np.squeeze(gray_image)
        gray_image = gray_image.astype('uint8')

        faces = detect_faces(face_detection, gray_image)
        for face_coordinates in faces:
            x1, x2, y1, y2 = apply_offsets(face_coordinates, emotion_offsets)
            gray_face = gray_image[y1:y2, x1:x2]

            try:
                gray_face = cv2.resize(gray_face, (emotion_target_size))
            except:
                continue

            gray_face = preprocess_input(gray_face, True)
            gray_face = np.expand_dims(gray_face, 0)
            gray_face = np.expand_dims(gray_face, -1)
            emotion_label_arg = np.argmax(emotion_classifier.predict(gray_face))
            emotion_text = emotion_labels[emotion_label_arg]
            color = (0, 0, 255)

            # Annotate the original frame with the box and the label.
            draw_bounding_box(face_coordinates, rgb_image, color)
            draw_text(face_coordinates, rgb_image, emotion_text, color, 0, -50, 1, 2)
            bgr_image = cv2.cvtColor(rgb_image, cv2.COLOR_RGB2BGR)

            cv2.imwrite('predicted.png', bgr_image)
            data = open('predicted.png', 'rb').read()
            encoded = base64.encodebytes(data).decode('utf-8')
            out.append({
                'image': encoded,
                'emotion': emotion_text,
            })

        return out


    logging.basicConfig(level=logging.WARN)
    rpc = builder.rpc("image.check", {'host': 'localhost', 'port': 5672})
    rpc.server(on_data)
    [/sourcecode]

    Here you can see the working prototype in action

    Maybe we could do the same with other tools, and even more simply, but as I said before this example is just an excuse to play with those technologies:

    • Send webcam frames via websockets
    • Connect one web application to a Python application via RabbitMQ RPC
    • Play with face classification script

    Please don’t use this script in production. It’s just a proof of concept. With smiles, but still a proof of concept 🙂

    You can see the project in my GitHub account

    Playing with RabbitMQ (part 2). Now with Python

    Do you remember the last post about RabbitMQ? In that post we created a small wrapper library to use RabbitMQ with node and PHP. I also work with Python and I want to use the same RabbitMQ wrapper there. With Python there are several libraries to use Rabbit. I’ll use pika.

    The idea is the same as in the previous post. I want to use queues, exchanges and RPCs. So let’s start with queues:

    We can create a queue receiver called ‘queue.backend’
    [sourcecode language=”python”]
    from rabbit import builder

    server = {
    ‘host’: ‘localhost’,
    ‘port’: 5672,
    ‘user’: ‘guest’,
    ‘pass’: ‘guest’,
    }

    def onData(data):
    print data[‘aaa’]

    builder.queue(‘queue.backend’, server).receive(onData)
    [/sourcecode]

    and emit messages to the queue
    [sourcecode language=”python”]
    from rabbit import builder

    server = {
    ‘host’: ‘localhost’,
    ‘port’: 5672,
    ‘user’: ‘guest’,
    ‘pass’: ‘guest’,
    }

    queue = builder.queue(‘queue.backend’, server)

    queue.emit({"aaa": 1})
    queue.emit({"aaa": 2})
    queue.emit({"aaa": 3})
    [/sourcecode]

    The library (like the PHP and node ones) uses a builder class to create our instances
    [sourcecode language=”python”]
    from queue import Queue
    from rpc import RPC
    from exchange import Exchange

    defaults = {
    ‘queue’: {
    ‘queue’: {
    ‘passive’: False,
    ‘durable’: True,
    ‘exclusive’: False,
    ‘autoDelete’: False,
    ‘nowait’: False
    },
    ‘consumer’: {
    ‘noLocal’: False,
    ‘noAck’: False,
    ‘exclusive’: False,
    ‘nowait’: False
    }
    },
    ‘exchange’: {
    ‘exchange’: {
    ‘passive’: False,
    ‘durable’: True,
    ‘autoDelete’: True,
    ‘internal’: False,
    ‘nowait’: False
    },
    ‘queue’: {
    ‘passive’: False,
    ‘durable’: True,
    ‘exclusive’: False,
    ‘autoDelete’: True,
    ‘nowait’: False
    },
    ‘consumer’: {
    ‘noLocal’: False,
    ‘noAck’: False,
    ‘exclusive’: False,
    ‘nowait’: False
    }
    },
    ‘rpc’: {
    ‘queue’: {
    ‘passive’: False,
    ‘durable’: True,
    ‘exclusive’: False,
    ‘autoDelete’: True,
    ‘nowait’: False
    },
    ‘consumer’: {
    ‘noLocal’: False,
    ‘noAck’: False,
    ‘exclusive’: False,
    ‘nowait’: False
    }
    }
    }

    def queue(name, server):
    conf = defaults[‘queue’]
    conf[‘server’] = server

    return Queue(name, conf)

    def rpc(name, server):
    conf = defaults[‘rpc’]
    conf[‘server’] = server

    return RPC(name, conf)

    def exchange(name, server):
    conf = defaults[‘exchange’]
    conf[‘server’] = server

    return Exchange(name, conf)
    [/sourcecode]

    And our Queue class
    [sourcecode language=”python”]
    import pika
    import json
    import time

    class Queue:
    def __init__(self, name, conf):
    self.name = name
    self.conf = conf

    def emit(self, data=None):
    credentials = pika.PlainCredentials(self.conf[‘server’][‘user’], self.conf[‘server’][‘pass’])
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf[‘server’][‘host’], port=self.conf[‘server’][‘port’], credentials=credentials))
    channel = connection.channel()

    queueConf = self.conf[‘queue’]
    channel.queue_declare(queue=self.name, passive=queueConf[‘passive’], durable=queueConf[‘durable’], exclusive=queueConf[‘exclusive’], auto_delete=queueConf[‘autoDelete’])

    channel.basic_publish(exchange=”, routing_key=self.name, body=json.dumps(data), properties=pika.BasicProperties(delivery_mode=2))
    connection.close()

    def receive(self, callback):
    credentials = pika.PlainCredentials(self.conf[‘server’][‘user’], self.conf[‘server’][‘pass’])
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf[‘server’][‘host’], port=self.conf[‘server’][‘port’], credentials=credentials))
    channel = connection.channel()

    queueConf = self.conf[‘queue’]
    channel.queue_declare(queue=self.name, passive=queueConf[‘passive’], durable=queueConf[‘durable’], exclusive=queueConf[‘exclusive’], auto_delete=queueConf[‘autoDelete’])

    def _callback(ch, method, properties, body):
    callback(json.loads(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print "%s %s::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)

    print "%s Queue ‘%s’ initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
    consumerConf = self.conf[‘consumer’]
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(_callback, self.name, no_ack=consumerConf[‘noAck’], exclusive=consumerConf[‘exclusive’])

    channel.start_consuming()
    [/sourcecode]

    We also want to use exchanges to emit messages without waiting for answers, just as an event broadcast. We can emit messages:
    [sourcecode language=”python”]
    from rabbit import builder

    server = {
    ‘host’: ‘localhost’,
    ‘port’: 5672,
    ‘user’: ‘guest’,
    ‘pass’: ‘guest’,
    }

    exchange = builder.exchange(‘process.log’, server)

    exchange.emit("xxx.log", "aaaa")
    exchange.emit("xxx.log", ["11", "aaaa"])
    exchange.emit("yyy.log", "aaaa")
    [/sourcecode]

    And listen to messages
    [sourcecode language=”python”]
    from rabbit import builder

    server = {
    ‘host’: ‘localhost’,
    ‘port’: 5672,
    ‘user’: ‘guest’,
    ‘pass’: ‘guest’,
    }

    def onData(routingKey, data):
    print routingKey, data

    builder.exchange(‘process.log’, server).receive("yyy.log", onData)
    [/sourcecode]

    That’s the class
    [sourcecode language=”python”]
    import pika
    import json
    import time

    class Exchange:
    def __init__(self, name, conf):
    self.name = name
    self.conf = conf

    def emit(self, routingKey, data=None):
    credentials = pika.PlainCredentials(self.conf[‘server’][‘user’], self.conf[‘server’][‘pass’])
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf[‘server’][‘host’], port=self.conf[‘server’][‘port’], credentials=credentials))
    channel = connection.channel()

    exchangeConf = self.conf[‘exchange’]
    channel.exchange_declare(exchange=self.name, type=’topic’, passive=exchangeConf[‘passive’], durable=exchangeConf[‘durable’], auto_delete=exchangeConf[‘autoDelete’], internal=exchangeConf[‘internal’])
    channel.basic_publish(exchange=self.name, routing_key=routingKey, body=json.dumps(data))
    connection.close()

    def receive(self, bindingKey, callback):
    credentials = pika.PlainCredentials(self.conf[‘server’][‘user’], self.conf[‘server’][‘pass’])
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf[‘server’][‘host’], port=self.conf[‘server’][‘port’], credentials=credentials))
    channel = connection.channel()

    exchangeConf = self.conf[‘exchange’]
    channel.exchange_declare(exchange=self.name, type=’topic’, passive=exchangeConf[‘passive’], durable=exchangeConf[‘durable’], auto_delete=exchangeConf[‘autoDelete’], internal=exchangeConf[‘internal’])

    queueConf = self.conf[‘queue’]
    result = channel.queue_declare(passive=queueConf[‘passive’], durable=queueConf[‘durable’], exclusive=queueConf[‘exclusive’], auto_delete=queueConf[‘autoDelete’])
    queue_name = result.method.queue

    channel.queue_bind(exchange=self.name, queue=queue_name, routing_key=bindingKey)

    print "%s Exchange ‘%s’ initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)

    def _callback(ch, method, properties, body):
    callback(method.routing_key, json.loads(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print "%s %s:::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)

    consumerConf = self.conf[‘consumer’]
    channel.basic_consume(_callback, queue=queue_name, no_ack=consumerConf[‘noAck’], exclusive=consumerConf[‘exclusive’])
    channel.start_consuming()
    [/sourcecode]

    And finally we can use RPCs. Emit
    [sourcecode language=”python”]
    from rabbit import builder

    server = {
    ‘host’: ‘localhost’,
    ‘port’: 5672,
    ‘user’: ‘guest’,
    ‘pass’: ‘guest’,
    }

    print builder.rpc(‘rpc.hello’, server).call("Gonzalo", "Ayuso")
    [/sourcecode]

    And the server side
    [sourcecode language=”python”]
    from rabbit import builder

    server = {
    ‘host’: ‘localhost’,
    ‘port’: 5672,
    ‘user’: ‘guest’,
    ‘pass’: ‘guest’,
    }

    def onData(name, surname):
    return "Hello %s %s" % (name, surname)

    builder.rpc(‘rpc.hello’, server).server(onData)
    [/sourcecode]

    And that’s the class
    [sourcecode language=”python”]
    import pika
    import json
    import time
    import uuid

    class RPC:
    def __init__(self, name, conf):
    self.name = name
    self.conf = conf

    def call(self, *params):
    pika.PlainCredentials(self.conf[‘server’][‘user’], self.conf[‘server’][‘pass’])
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf[‘server’][‘host’], port=self.conf[‘server’][‘port’]))
    channel = connection.channel()

    queueConf = self.conf[‘queue’]
    result = channel.queue_declare(queue=”, passive=queueConf[‘passive’], durable=queueConf[‘durable’], exclusive=queueConf[‘exclusive’], auto_delete=queueConf[‘autoDelete’])
    callback_queue = result.method.queue
    consumerConf = self.conf[‘consumer’]
    channel.basic_consume(self.on_call_response, no_ack=consumerConf[‘noAck’], exclusive=consumerConf[‘exclusive’], queue=”)

    self.response = None
    self.corr_id = str(uuid.uuid4())
    channel.basic_publish(exchange=”, routing_key=self.name, properties=pika.BasicProperties(reply_to=callback_queue, correlation_id=self.corr_id), body=json.dumps(params))
    while self.response is None:
    connection.process_data_events()
    return self.response

    def on_call_response(self, ch, method, props, body):
    if self.corr_id == props.correlation_id:
    self.response = body

    def server(self, callback):
    pika.PlainCredentials(self.conf[‘server’][‘user’], self.conf[‘server’][‘pass’])
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf[‘server’][‘host’], port=self.conf[‘server’][‘port’]))
    channel = connection.channel()

    queueConf = self.conf[‘queue’]
    channel.queue_declare(self.name, passive=queueConf[‘passive’], durable=queueConf[‘durable’], exclusive=queueConf[‘exclusive’], auto_delete=queueConf[‘autoDelete’])

    channel.basic_qos(prefetch_count=1)
    consumerConf = self.conf[‘consumer’]

    def on_server_request(ch, method, props, body):
    response = callback(*json.loads(body))

    ch.basic_publish(exchange=”, routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=json.dumps(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print "%s %s::req => ‘%s’ response => ‘%s’" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body, response)

    channel.basic_consume(on_server_request, queue=self.name, no_ack=consumerConf[‘noAck’], exclusive=consumerConf[‘exclusive’])

    print "%s RPC ‘%s’ initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
    channel.start_consuming()
    [/sourcecode]

    And that’s all. The full project is available in my GitHub account

    Playing with RabbitMQ, PHP and node

    I need to use RabbitMQ in one project. I’m a big fan of Gearman, but I must admit Rabbit is much more powerful. In this project I need to work with PHP code and node, so I want to build a wrapper for those two languages. I don’t want to reinvent the wheel so I will use existing libraries (php-amqplib and amqplib for node).

    Basically I need three things: first I need to create exchange channels to log different actions. I need to decouple those actions from the main code. I also need to create work queues to ensure those jobs are executed. It doesn’t matter if a job is executed later, but it must be executed. And finally RPC commands.

    Let’s start with the queues. I want to push events to a queue in PHP
    [sourcecode language=”php”]
    use G\Rabbit\Builder;
    $server = [
    ‘host’ => ‘localhost’,
    ‘port’ => 5672,
    ‘user’ => ‘guest’,
    ‘pass’ => ‘guest’,
    ];
    $queue = Builder::queue(‘queue.backend’, $server);
    $queue->emit(["aaa" => 1]);
    [/sourcecode]

    and also with node
    [sourcecode language=”js”]
    var server = {
    host: ‘localhost’,
    port: 5672,
    user: ‘guest’,
    pass: ‘guest’
    };

    var queue = builder.queue(‘queue.backend’, server);
    queue.emit({aaa: 1});
    [/sourcecode]

    And I also want to register workers to those queues with PHP and node

    [sourcecode language=”php”]
    use G\Rabbit\Builder;
    $server = [
    ‘host’ => ‘localhost’,
    ‘port’ => 5672,
    ‘user’ => ‘guest’,
    ‘pass’ => ‘guest’,
    ];
    Builder::queue(‘queue.backend’, $server)->receive(function ($data) {
    error_log(json_encode($data));
    });
    [/sourcecode]

    [sourcecode language=”js”]
    var server = {
    host: ‘localhost’,
    port: 5672,
    user: ‘guest’,
    pass: ‘guest’
    };

    var queue = builder.queue(‘queue.backend’, server);
    queue.receive(function (data) {
    console.log(data);
    });
    [/sourcecode]

    Both implementations use one builder. In this case we are using Queue:
    [sourcecode language=”php”]
    namespace G\Rabbit;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    class Queue
    {
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
    $this->name = $name;
    $this->conf = $conf;
    }
    private function createConnection()
    {
    $server = $this->conf[‘server’];
    return new AMQPStreamConnection($server[‘host’], $server[‘port’], $server[‘user’], $server[‘pass’]);
    }
    private function declareQueue($channel)
    {
    $conf = $this->conf[‘queue’];
    $channel->queue_declare($this->name, $conf[‘passive’], $conf[‘durable’], $conf[‘exclusive’],
    $conf[‘auto_delete’], $conf[‘nowait’]);
    }
    public function emit($data = null)
    {
    $connection = $this->createConnection();
    $channel = $connection->channel();
    $this->declareQueue($channel);
    $msg = new AMQPMessage(json_encode($data),
    [‘delivery_mode’ => 2] # make message persistent
    );
    $channel->basic_publish($msg, ”, $this->name);
    $channel->close();
    $connection->close();
    }
    public function receive(callable $callback)
    {
    $connection = $this->createConnection();
    $channel = $connection->channel();
    $this->declareQueue($channel);
    $consumer = $this->conf[‘consumer’];
    if ($consumer[‘no_ack’] === false) {
    $channel->basic_qos(null, 1, null);
    }
    $channel->basic_consume($this->name, ”, $consumer[‘no_local’], $consumer[‘no_ack’], $consumer[‘exclusive’],
    $consumer[‘nowait’],
    function ($msg) use ($callback) {
    call_user_func($callback, json_decode($msg->body, true), $this->name);
    $msg->delivery_info[‘channel’]->basic_ack($msg->delivery_info[‘delivery_tag’]);
    $now = new \DateTime();
    echo ‘[‘.$now->format(‘d/m/Y H:i:s’)."] {$this->name}::".$msg->body, "\n";
    });
    $now = new \DateTime();
    echo ‘[‘.$now->format(‘d/m/Y H:i:s’)."] Queue ‘{$this->name}’ initialized \n";
    while (count($channel->callbacks)) {
    $channel->wait();
    }
    $channel->close();
    $connection->close();
    }
    }
    [/sourcecode]

    [sourcecode language=”js”]
    var amqp = require(‘amqplib/callback_api’);

    var Queue = function (name, conf) {
    return {
    emit: function (data, close=true) {
    amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
    conn.createChannel(function (err, ch) {
    var msg = JSON.stringify(data);

    ch.assertQueue(name, conf.queue);
    ch.sendToQueue(name, new Buffer(msg));
    });
    if (close) {
    setTimeout(function () {
    conn.close();
    process.exit(0)
    }, 500);
    }
    });
    },
    receive: function (callback) {
    amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
    conn.createChannel(function (err, ch) {
    ch.assertQueue(name, conf.queue);
    console.log(new Date().toString() + ‘ Queue ‘ + name + ‘ initialized’);
    ch.consume(name, function (msg) {
    console.log(new Date().toString() + " Received %s", msg.content.toString());
    if (callback) {
    callback(JSON.parse(msg.content.toString()), msg.fields.routingKey)
    }
    if (conf.consumer.noAck === false) {
    ch.ack(msg);
    }
    }, conf.consumer);
    });
    });
    }
    };
    };

    module.exports = Queue;
    [/sourcecode]

    We also want to emit messages using an exchange

    Emitter:
    [sourcecode language=”php”]
    use G\Rabbit\Builder;
    $server = [
    ‘host’ => ‘localhost’,
    ‘port’ => 5672,
    ‘user’ => ‘guest’,
    ‘pass’ => ‘guest’,
    ];
    $exchange = Builder::exchange(‘process.log’, $server);
    $exchange->emit("xxx.log", "aaaa");
    $exchange->emit("xxx.log", ["11", "aaaa"]);
    $exchange->emit("yyy.log", "aaaa");
    [/sourcecode]

    [sourcecode language=”js”]
    var builder = require(‘../../src/Builder’);

    var server = {
    host: ‘localhost’,
    port: 5672,
    user: ‘guest’,
    pass: ‘guest’
    };

    var exchange = builder.exchange(‘process.log’, server);

    exchange.emit("xxx.log", "aaaa");
    exchange.emit("xxx.log", ["11", "aaaa"]);
    exchange.emit("yyy.log", "aaaa");
    [/sourcecode]

    and receiver:
    [sourcecode language=”php”]
    use G\Rabbit\Builder;
    $server = [
    ‘host’ => ‘localhost’,
    ‘port’ => 5672,
    ‘user’ => ‘guest’,
    ‘pass’ => ‘guest’,
    ];
    Builder::exchange(‘process.log’, $server)->receive("yyy.log", function ($routingKey, $data) {
    error_log($routingKey." – ".json_encode($data));
    });
    [/sourcecode]

    [sourcecode language=”js”]
    var server = {
    host: ‘localhost’,
    port: 5672,
    user: ‘guest’,
    pass: ‘guest’
    };

    var exchange = builder.exchange(‘process.log’, server);

    exchange.receive("yyy.log", function (routingKey, data) {
    console.log(routingKey, data);
    });
    [/sourcecode]

    And that’s the PHP implementation:
    [sourcecode language=”php”]
    namespace G\Rabbit;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    class Exchange
    {
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
    $this->name = $name;
    $this->conf = $conf;
    }
    private function createConnection()
    {
    $server = $this->conf[‘server’];
    return new AMQPStreamConnection($server[‘host’], $server[‘port’], $server[‘user’], $server[‘pass’]);
    }
    public function emit($routingKey, $data = null)
    {
    $connection = $this->createConnection();
    $channel = $connection->channel();
    $conf = $this->conf[‘exchange’];
    $channel->exchange_declare($this->name, ‘topic’, $conf[‘passive’], $conf[‘durable’], $conf[‘auto_delete’],
    $conf[‘internal’], $conf[‘nowait’]);
    $msg = new AMQPMessage(json_encode($data), [
    ‘delivery_mode’ => 2, # make message persistent
    ]);
    $channel->basic_publish($msg, $this->name, $routingKey);
    $channel->close();
    $connection->close();
    }
    public function receive($bindingKey, callable $callback)
    {
    $connection = $this->createConnection();
    $channel = $connection->channel();
    $conf = $this->conf[‘exchange’];
    $channel->exchange_declare($this->name, ‘topic’, $conf[‘passive’], $conf[‘durable’], $conf[‘auto_delete’],
    $conf[‘internal’], $conf[‘nowait’]);
    $queueConf = $this->conf[‘queue’];
    list($queue_name, ,) = $channel->queue_declare("", $queueConf[‘passive’], $queueConf[‘durable’],
    $queueConf[‘exclusive’], $queueConf[‘auto_delete’], $queueConf[‘nowait’]);
    $channel->queue_bind($queue_name, $this->name, $bindingKey);
    $consumerConf = $this->conf[‘consumer’];
    $channel->basic_consume($queue_name, ”, $consumerConf[‘no_local’], $consumerConf[‘no_ack’],
    $consumerConf[‘exclusive’], $consumerConf[‘nowait’],
    function ($msg) use ($callback) {
    call_user_func($callback, $msg->delivery_info[‘routing_key’], json_decode($msg->body, true));
    $now = new \DateTime();
    echo ‘[‘.$now->format(‘d/m/Y H:i:s’).’] ‘.$this->name.’:’.$msg->delivery_info[‘routing_key’].’::’, $msg->body, "\n";
    $msg->delivery_info[‘channel’]->basic_ack($msg->delivery_info[‘delivery_tag’]);
    });
    $now = new \DateTime();
    echo ‘[‘.$now->format(‘d/m/Y H:i:s’)."] Exchange ‘{$this->name}’ initialized \n";
    while (count($channel->callbacks)) {
    $channel->wait();
    }
    $channel->close();
    $connection->close();
    }
    }
    [/sourcecode]

    And node:
    [sourcecode language=”js”]
    var amqp = require(‘amqplib/callback_api’);

    var Exchange = function (name, conf) {
    return {
    emit: function (routingKey, data, close = true) {
    amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
    conn.createChannel(function (err, ch) {
    var msg = JSON.stringify(data);
    ch.assertExchange(name, ‘topic’, conf.exchange);
    ch.publish(name, routingKey, new Buffer(msg));
    });
    if (close) {
    setTimeout(function () {
    conn.close();
    process.exit(0)
    }, 500);
    }
    });
    },
    receive: function (bindingKey, callback) {
    amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
    conn.createChannel(function (err, ch) {
    ch.assertExchange(name, ‘topic’, conf.exchange);
    console.log(new Date().toString() + ‘ Exchange ‘ + name + ‘ initialized’);
    ch.assertQueue(”, conf.queue, function (err, q) {

    ch.bindQueue(q.queue, name, bindingKey);

    ch.consume(q.queue, function (msg) {
    console.log(new Date().toString(), name, ":", msg.fields.routingKey, "::", msg.content.toString());
    if (callback) {
    callback(msg.fields.routingKey, JSON.parse(msg.content.toString()))
    }
    if (conf.consumer.noAck === false) {
    ch.ack(msg);
    }
    }, conf.consumer);
    });
    });
    });
    }
    };
    };

    module.exports = Exchange;
    [/sourcecode]

    Finally we want to use RPC commands. In fact the RPC implementation is similar to the Queue one, but in this case the client will receive an answer.

    Client side
    [sourcecode language=”php”]
    use G\Rabbit\Builder;
    $server = [
    ‘host’ => ‘localhost’,
    ‘port’ => 5672,
    ‘user’ => ‘guest’,
    ‘pass’ => ‘guest’,
    ];
    echo Builder::rpc(‘rpc.hello’, $server)->call("Gonzalo", "Ayuso");
    [/sourcecode]

    [sourcecode language=”js”]
    var builder = require(‘../../src/Builder’);

    var server = {
    host: ‘localhost’,
    port: 5672,
    user: ‘guest’,
    pass: ‘guest’
    };

    var rpc = builder.rpc(‘rpc.hello’, server);
    rpc.call("Gonzalo", "Ayuso", function (data) {
    console.log(data);
    });
    [/sourcecode]

    Server side:

    [sourcecode language=”php”]
    use G\Rabbit\Builder;
    $server = [
    ‘host’ => ‘localhost’,
    ‘port’ => 5672,
    ‘user’ => ‘guest’,
    ‘pass’ => ‘guest’,
    ];
    Builder::rpc(‘rpc.hello’, $server)->server(function ($name, $surname) use ($server) {
    return "Hello {$name} {$surname}";
    });
    [/sourcecode]

    [sourcecode language=”js”]
    var builder = require(‘../../src/Builder’);

    var server = {
    host: ‘localhost’,
    port: 5672,
    user: ‘guest’,
    pass: ‘guest’
    };

    var rpc = builder.rpc(‘rpc.hello’, server);

    rpc.server(function (name, surname) {
    return "Hello " + name + " " + surname;
    });
    [/sourcecode]

    And Implementations:

    [sourcecode language=”php”]
    namespace G\Rabbit;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    class RPC
    {
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
    $this->name = $name;
    $this->conf = $conf;
    }
    private function createConnection()
    {
    $server = $this->conf[‘server’];
    return new AMQPStreamConnection($server[‘host’], $server[‘port’], $server[‘user’], $server[‘pass’]);
    }
    public function call()
    {
    $params = (array)func_get_args();
    $response = null;
    $corr_id = uniqid();
    $connection = $this->createConnection();
    $channel = $connection->channel();
    $queueConf = $this->conf[‘queue’];
    list($callback_queue, ,) = $channel->queue_declare("", $queueConf[‘passive’], $queueConf[‘durable’],
    $queueConf[‘exclusive’], $queueConf[‘auto_delete’], $queueConf[‘nowait’]);
    $consumerConf = $this->conf[‘consumer’];
    $channel->basic_consume($callback_queue, ”, $consumerConf[‘no_local’], $consumerConf[‘no_ack’],
    $consumerConf[‘exclusive’], $consumerConf[‘nowait’], function ($rep) use (&$corr_id, &$response) {
    if ($rep->get(‘correlation_id’) == $corr_id) {
    $response = $rep->body;
    }
    });
    $msg = new AMQPMessage(json_encode($params), [
    ‘correlation_id’ => $corr_id,
    ‘reply_to’ => $callback_queue,
    ]);
    $channel->basic_publish($msg, ”, $this->name);
    while (!$response) {
    $channel->wait();
    }
    return json_decode($response, true);
    }
    public function server(callable $callback)
    {
    $connection = $this->createConnection();
    $channel = $connection->channel();
    $queueConf = $this->conf[‘queue’];
    $channel->queue_declare($this->name, $queueConf[‘passive’], $queueConf[‘durable’], $queueConf[‘exclusive’],
    $queueConf[‘auto_delete’], $queueConf[‘nowait’]);
    $now = new \DateTime();
    echo ‘[‘.$now->format(‘d/m/Y H:i:s’)."] RPC server ‘{$this->name}’ initialized \n";
    $channel->basic_qos(null, 1, null);
    $consumerConf = $this->conf[‘consumer’];
    $channel->basic_consume($this->name, ”, $consumerConf[‘no_local’], $consumerConf[‘no_ack’],
    $consumerConf[‘exclusive’],
    $consumerConf[‘nowait’], function ($req) use ($callback) {
    $response = json_encode(call_user_func_array($callback, array_values(json_decode($req->body, true))));
    $msg = new AMQPMessage($response, [
    ‘correlation_id’ => $req->get(‘correlation_id’),
    ‘delivery_mode’ => 2, # make message persistent
    ]);
    $req->delivery_info[‘channel’]->basic_publish($msg, ”, $req->get(‘reply_to’));
    $req->delivery_info[‘channel’]->basic_ack($req->delivery_info[‘delivery_tag’]);
    $now = new \DateTime();
    echo ‘[‘.$now->format(‘d/m/Y H:i:s’).’] ‘.$this->name.":: req => ‘{$req->body}’ response=> ‘{$response}’\n";
    });
    while (count($channel->callbacks)) {
    $channel->wait();
    }
    $channel->close();
    $connection->close();
    }
    }
    [/sourcecode]

    [sourcecode language=”js”]
    var amqp = require(‘amqplib/callback_api’);

    var RPC = function (name, conf) {
    var generateUuid = function () {
    return Math.random().toString() +
    Math.random().toString() +
    Math.random().toString();
    };

    return {
    call: function () {
    var params = [];
    for (i = 0; i < arguments.length – 1; i++) {
    params.push(arguments[i]);
    }
    var callback = arguments[arguments.length – 1];

    amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
    conn.createChannel(function (err, ch) {
    ch.assertQueue(”, conf.queue, function (err, q) {
    var corr = generateUuid();

    ch.consume(q.queue, function (msg) {
    if (msg.properties.correlationId == corr) {
    callback(JSON.parse(msg.content.toString()));
    setTimeout(function () {
    conn.close();
    process.exit(0)
    }, 500);
    }
    }, conf.consumer);
    ch.sendToQueue(name,
    new Buffer(JSON.stringify(params)),
    {correlationId: corr, replyTo: q.queue});
    });
    });
    });
    },
    server: function (callback) {
    amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
    conn.createChannel(function (err, ch) {
    ch.assertQueue(name, conf.queue);
    console.log(new Date().toString() + ‘ RPC ‘ + name + ‘ initialized’);
    ch.prefetch(1);
    ch.consume(name, function reply(msg) {
    console.log(new Date().toString(), msg.fields.routingKey, " :: ", msg.content.toString());
    var response = JSON.stringify(callback.apply(this, JSON.parse(msg.content.toString())));
    ch.sendToQueue(msg.properties.replyTo,
    new Buffer(response),
    {correlationId: msg.properties.correlationId});

    ch.ack(msg);

    }, conf.consumer);
    });
    });
    }
    };
    };

    module.exports = RPC;
    [/sourcecode]

    You can see whole projects at github: RabbitMQ-php, RabbitMQ-node

    Sending logs to a remote server using RabbitMQ

    Some time ago I wrote an article showing how to send Silex logs to a remote server. Today I want to use a messaging queue to do it. Normally, when I need queues, I use Gearman, but today I want to play with RabbitMQ.

    When we work with web applications it’s important to have, in one way or another, a way to decouple operations from the main request. Messaging queues are great tools to perform those operations. They even allow us to write our workers in different languages than the main request. These days, for example, I’m working with modbus devices. The whole modbus logic is written in Python and I want to use a frontend built with PHP. I could rewrite the modbus logic in PHP (there are PHP libraries to connect with modbus devices), but I’m not that crazy. Queues are our friends.

    The idea in this post is the same as in the previous post. We’ll use the event dispatcher to emit events and we’ll send those events to a RabbitMQ queue. We’ll wrap the logic in a custom Service Provider.

    [sourcecode language=”php”]
    <?php
    include __DIR__ . ‘/../vendor/autoload.php’;

    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use RabbitLogger\LoggerServiceProvider;
    use Silex\Application;
    use Symfony\Component\HttpKernel\Event;
    use Symfony\Component\HttpKernel\KernelEvents;

    $connection = new AMQPStreamConnection(‘localhost’, 5672, ‘guest’, ‘guest’);
    $channel = $connection->channel();

    $app = new Application([‘debug’ => true]);
    $app->register(new LoggerServiceProvider($connection, $channel));

    $app->on(KernelEvents::TERMINATE, function (Event\PostResponseEvent $event) use ($app) {
    $app[‘rabbit.logger’]->info(‘TERMINATE’);
    });

    $app->on(KernelEvents::CONTROLLER, function (Event\FilterControllerEvent $event) use ($app) {
    $app[‘rabbit.logger’]->info(‘CONTROLLER’);
    });

    $app->on(KernelEvents::EXCEPTION, function (Event\GetResponseForExceptionEvent $event) use ($app) {
    $app[‘rabbit.logger’]->info(‘EXCEPTION’);
    });

    $app->on(KernelEvents::FINISH_REQUEST, function (Event\FinishRequestEvent $event) use ($app) {
    $app[‘rabbit.logger’]->info(‘FINISH_REQUEST’);
    });

    $app->on(KernelEvents::RESPONSE, function (Event\FilterResponseEvent $event) use ($app) {
    $app[‘rabbit.logger’]->info(‘RESPONSE’);
    });

    $app->on(KernelEvents::REQUEST, function (Event\GetResponseEvent $event) use ($app) {
    $app[‘rabbit.logger’]->info(‘REQUEST’);
    });

    $app->on(KernelEvents::VIEW, function (Event\GetResponseForControllerResultEvent $event) use ($app) {
    $app[‘rabbit.logger’]->info(‘VIEW’);
    });

    $app->get(‘/’, function (Application $app) {
    $app[‘rabbit.logger’]->info(‘inside route’);
    return "HELLO";
    });

    $app->run();
    [/sourcecode]

    Here we can see the service provider:

    [sourcecode language=”php”]
    <?php
    namespace RabbitLogger;

    use PhpAmqpLib\Channel\AMQPChannel;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use Silex\Application;
    use Silex\ServiceProviderInterface;

    class LoggerServiceProvider implements ServiceProviderInterface
    {
    private $connection;
    private $channel;

    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel)
    {
    $this->connection = $connection;
    $this->channel = $channel;
    }

    public function register(Application $app)
    {
    $app[‘rabbit.logger’] = $app->share(
    function () use ($app) {
    $channelName = isset($app[‘logger.channel.name’]) ? $app[‘logger.channel.name’] : ‘logger.channel’;
    return new Logger($this->connection, $this->channel, $channelName);
    }
    );
    }

    public function boot(Application $app)
    {
    }
    }
    [/sourcecode]

    And here the logger:
    [sourcecode language=”php”]
    <?php
    namespace RabbitLogger;

    use PhpAmqpLib\Channel\AMQPChannel;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    use Psr\Log\LoggerInterface;
    use Psr\Log\LogLevel;
    use Silex\Application;

    class Logger implements LoggerInterface
    {
    private $connection;
    private $channel;
    private $queueName;

    public function __construct(AMQPStreamConnection $connection, AMQPChannel $channel, $queueName = ‘logger’)
    {
    $this->connection = $connection;
    $this->channel = $channel;
    $this->queueName = $queueName;
    $this->channel->queue_declare($queueName, false, false, false, false);
    }

    function __destruct()
    {
    $this->channel->close();
    $this->connection->close();
    }

    public function emergency($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::EMERGENCY);
    }

    public function alert($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::ALERT);
    }

    public function critical($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::CRITICAL);
    }

    public function error($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::ERROR);
    }

    public function warning($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::WARNING);
    }

    public function notice($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::NOTICE);
    }

    public function info($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::INFO);
    }

    public function debug($message, array $context = [])
    {
    $this->sendLog($message, $context, LogLevel::DEBUG);
    }
    public function log($level, $message, array $context = [])
    {
    $this->sendLog($message, $context, $level);
    }

    private function sendLog($message, array $context = [], $level = LogLevel::INFO)
    {
    $msg = new AMQPMessage(json_encode([$message, $context, $level]), [‘delivery_mode’ => 2]);
    $this->channel->basic_publish($msg, ”, $this->queueName);
    }
    }
    [/sourcecode]

    And finally the RabbitMQ Worker to process our logs

    [sourcecode language=”php”]
    require_once __DIR__ . ‘/../vendor/autoload.php’;
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    $connection = new AMQPStreamConnection(‘localhost’, 5672, ‘guest’, ‘guest’);
    $channel = $connection->channel();
    $channel->queue_declare(‘logger.channel’, false, false, false, false);
    echo ‘ [*] Waiting for messages. To exit press CTRL+C’, "\n";
    $callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    //$msg->delivery_info[‘channel’]->basic_ack($msg->delivery_info[‘delivery_tag’]);
    };
    //$channel->basic_qos(null, 1, null);
    $channel->basic_consume(‘logger.channel’, ”, false, false, false, false, $callback);
    while(count($channel->callbacks)) {
    $channel->wait();
    }
    $channel->close();
    $connection->close();
    [/sourcecode]

    To run the example we must:

    Start RabbitMQ server
    [sourcecode]
    rabbitmq-server
    [/sourcecode]

    start Silex server

    [sourcecode]
    php -S 0.0.0.0:8080 -t www
    [/sourcecode]

    start worker

    [sourcecode]
    php worker/worker.php
    [/sourcecode]

    You can see whole project in my github account