
Playing with IoT, MQTT, Arduino and Raspberry Pi. Building a dashboard with OpenUI5

I’ve been playing with MQTT in previous posts. Today I want to build a simple dashboard, basically because I’ve got a 3.5-inch display for my Raspberry Pi and I want to use it. The idea is to set up my Raspberry Pi as a web kiosk and display the MQTT variables in real time using WebSockets. Let’s start.

Setting up a Raspberry Pi as a web kiosk is pretty straightforward. You only need to follow the instructions detailed here. Now we will prepare the MQTT inputs. Today we’re going to reuse an example from a previous post: a potentiometer controlled by a NodeMCU microcontroller connected to our MQTT server via Wi-Fi.

We will also build another circuit using an Arduino board and an Ethernet shield.

With this circuit we’ll read the temperature (using an LM35 temperature sensor) and the light level (using a CdS photoresistor), and drive a relay to switch a light bulb on and off. The idea of the circuit is to emit the temperature and light level to the Mosquitto MQTT server and to listen for the switch status from the MQTT server to fire the relay. Here’s the Arduino code:

#include <SPI.h>
#include <Ethernet.h>
#include <PubSubClient.h>

const int photocellPin = 1;
const int tempPin = 0;
const int relayPin = 9;
bool lightStatus = false;

const byte mac[] = { 0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED };

// mqtt configuration
const char* mqttServer = "192.168.1.104";
const int mqttPort = 1883;
const String topicLightChange = "sensors/arduino/light/change";
const String topicLightStatus = "sensors/arduino/light/status";
const String topicTemp = "sensors/arduino/temperature/room1";
const String topicLight = "sensors/arduino/light/room1";
const char* clientName = "com.gonzalo123.arduino";

EthernetClient ethClient;
PubSubClient client(ethClient);

void mqttReConnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    if (client.connect(clientName)) {
      Serial.println("connected");
      client.subscribe(topicLightChange.c_str());
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void mqttEmit(String topic, String value) {
  if (client.publish((char*) topic.c_str(), (char*) value.c_str())) {
    //Serial.print("Publish ok (topic: ");
    //Serial.print(topic);
    //Serial.print(", value: ");
    //Serial.print(value);
    //Serial.println(")");
  } else {
    Serial.println("Publish failed");
  }
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] payload: ");
  String data;
  for (unsigned int i = 0; i < length; i++) {
    data += (char)payload[i];
  }

  if (strcmp(topic, topicLightChange.c_str()) == 0) {
    lightStatus = (data == "1") ? true : false;
    Serial.print(data);
  }
  
  Serial.println("");
}

void setup()
{
  Serial.begin(9600);
  pinMode(relayPin, OUTPUT);
  digitalWrite(relayPin, LOW);
  
  client.setServer(mqttServer, mqttPort);
  client.setCallback(callback);
  if (Ethernet.begin(mac) == 0) {
    Serial.println("Failed to configure Ethernet using DHCP");
  }

  delay(1500);
}

void loop()
{
  if (!client.connected()) {
    mqttReConnect();
  }

  client.loop();

  if (lightStatus == 1) {
    digitalWrite(relayPin, HIGH);
  } else {
    digitalWrite(relayPin, LOW);
  }
  mqttEmit(topicLightStatus, lightStatus ? "1" : "0");
  mqttEmit(topicLight, (String) analogRead(photocellPin));
  mqttEmit(topicTemp, (String) ((5.0 * analogRead(tempPin) * 100.0) / 1024.0));

  delay(500);
}
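
The temperature expression in loop() turns the raw ADC reading into degrees Celsius: the 10-bit converter spreads 0 to 5 V over its range (the sketch divides by 1024), and the LM35 outputs 10 mV per degree. Here is the same arithmetic sketched in JavaScript so it can be checked away from the board. The function name and the 5 V reference default are assumptions for illustration, not part of the Arduino sketch.

```javascript
// Convert a raw 10-bit ADC reading from an LM35 into degrees Celsius.
// Mirrors the Arduino expression (5.0 * reading * 100.0) / 1024.0.
// `lm35ToCelsius` and `vref` are illustrative names (assumptions).
function lm35ToCelsius(reading, vref = 5.0) {
    var volts = (vref * reading) / 1024.0; // ADC counts to volts
    return volts * 100.0;                  // LM35: 10 mV per degree, so 100 degrees per volt
}

console.log(lm35ToCelsius(51).toFixed(1)); // 24.9
```

A reading of 51 counts is about 0.249 V, which is roughly a 25 ºC room.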

Now we’re going to work on the dashboard. These days I’m working with OpenUI5 in various projects, so we’ll use this library to build the dashboard. We’ll build something like this:

Basically it’s a view:

<mvc:View
        controllerName="gonzalo123.controller.Controller"
        height="100%"
        width="100%"
        xmlns="sap.m"
        xmlns:mvc="sap.ui.core.mvc"
        xmlns:app="http://schemas.sap.com/sapui5/extension/sap.ui.core.CustomData/1"
>
    <IconTabBar expandable="false"
                stretchContentHeight="true"
                class="sapUiResponsiveContentPadding">
        <items>
            <IconTabFilter icon="sap-icon://bbyd-dashboard">
                <TileContainer>
                    <StandardTile
                            icon="sap-icon://explorer"
                            number="{/potentiometer}"
                            numberUnit="%"
                            title="{i18n>potentiometer}"/>
                    <StandardTile
                            icon="sap-icon://temperature"
                            number="{/temperature}"
                            numberUnit="ºC"
                            title="{i18n>temperature}"/>
                    <StandardTile
                            icon="sap-icon://lightbulb"
                            number="{/light/level}"
                            title="{i18n>light}"/>
                </TileContainer>
            </IconTabFilter>
            <IconTabFilter icon="sap-icon://lightbulb">
                <Page showHeader="false"
                      enableScrolling="true">
                    <List>
                        <InputListItem label="{i18n>light}">
                            <Switch state="{/light/status}"
                                    change="onStatusChange"/>
                        </InputListItem>
                    </List>
                </Page>
            </IconTabFilter>
        </items>
    </IconTabBar>
</mvc:View>

And a controller:

sap.ui.define([
        'jquery.sap.global',
        'sap/ui/core/mvc/Controller',
        'sap/ui/model/json/JSONModel',
        "sap/ui/model/resource/ResourceModel",
        'gonzalo123/model/io'
    ],

    function (jQuery, Controller, JSONModel, ResourceModel, io) {
        "use strict";

        io.connect("//192.168.1.104:3000/");

        return Controller.extend("gonzalo123.controller.Controller", {
            model: new JSONModel({
                light: {
                    status: false,
                    level: undefined
                },
                potentiometer: undefined,
                temperature: undefined
            }),

            onInit: function () {
                var model = this.model;
                io.on('mqtt', function (data) {
                    switch (data.topic) {
                        case 'sensors/arduino/temperature/room1':
                            model.setProperty("/temperature", data.payload);
                            break;
                        case 'sensors/arduino/light/room1':
                            model.setProperty("/light/level", data.payload);
                            break;
                        case 'sensors/nodemcu/potentiometer/room1':
                            model.setProperty("/potentiometer", data.payload);
                            break;
                        case 'sensors/arduino/light/status':
                            model.setProperty("/light/status", data.payload == "1");
                            break;
                    }
                });

                this.getView().setModel(this.model);

                var i18nModel = new ResourceModel({
                    bundleName: "gonzalo123.i18n.i18n"
                });

                this.getView().setModel(i18nModel, "i18n");
            },

            onStatusChange: function () {
                io.emit('mqtt', {
                    topic: 'sensors/arduino/light/change',
                    payload: (this.getView().getModel().getProperty("/light/status") ? "1" : "0")
                });
            }
        });
    }
);
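
The switch in onInit maps each MQTT topic to one model path. The same routing can be sketched as a lookup table, which keeps the topic list in one place. The topics and paths are the ones from the controller; `routes`, `applyMessage` and the fake model are hypothetical names, and converting numeric payloads with Number is my assumption (the controller stores the strings as they arrive).

```javascript
// Map each MQTT topic to a model path and a payload converter.
// `routes` and `applyMessage` are illustrative names (assumptions).
var routes = {
    'sensors/arduino/temperature/room1': {path: '/temperature', parse: Number},
    'sensors/arduino/light/room1': {path: '/light/level', parse: Number},
    'sensors/nodemcu/potentiometer/room1': {path: '/potentiometer', parse: Number},
    'sensors/arduino/light/status': {path: '/light/status', parse: function (p) { return p === '1'; }}
};

// Apply one message to anything exposing setProperty(path, value),
// such as a JSONModel. Returns false for topics that are not routed.
function applyMessage(model, data) {
    if (!Object.prototype.hasOwnProperty.call(routes, data.topic)) {
        return false;
    }
    var route = routes[data.topic];
    model.setProperty(route.path, route.parse(data.payload));
    return true;
}

// Tiny stand-in for JSONModel so the sketch runs outside OpenUI5.
var fakeModel = {
    values: {},
    setProperty: function (path, value) { this.values[path] = value; }
};

applyMessage(fakeModel, {topic: 'sensors/arduino/light/status', payload: '1'});
console.log(fakeModel.values['/light/status']); // true
```

Inside the controller, the io.on('mqtt', ...) handler would then reduce to a single applyMessage(model, data) call.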

For the real-time part we need a gateway between WebSockets and MQTT. We’ll use socket.io. Here is the server:

var mqtt = require('mqtt');
var mqttClient = mqtt.connect('mqtt://192.168.1.104');
var httpServer = require('http').createServer();
var io = require('socket.io')(httpServer, {origins: '*:*'});

io.on('connection', function(client){
    client.on('mqtt', function(msg){
        console.log("ws", msg);
        mqttClient.publish(msg.topic, msg.payload.toString());
    })
});

mqttClient.on('connect', function () {
    mqttClient.subscribe('sensors/#');
});

mqttClient.on('message', function (topic, message) {
    console.log("mqtt", topic, message.toString());
    io.sockets.emit('mqtt', {
        topic: topic,
        payload: message.toString()
    });
});

httpServer.listen(3000, '0.0.0.0');
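
The gateway subscribes to 'sensors/#', so every topic under sensors/ is forwarded to the browser. As a rough illustration of how MQTT topic filters work, here is a small matcher for the two wildcards: '+' stands for exactly one level and '#' for all remaining levels. `topicMatches` is a hypothetical helper, not part of the mqtt package; it assumes a well-formed filter ('#' only as the last level) and ignores the special rules for topics starting with '$'.

```javascript
// Check whether an MQTT topic matches a subscription filter.
// Illustrative helper (assumption), not an API of the mqtt package.
function topicMatches(filter, topic) {
    var f = filter.split('/');
    var t = topic.split('/');
    for (var i = 0; i < f.length; i++) {
        if (f[i] === '#') {
            return true; // multi-level wildcard: matches the rest, including nothing
        }
        if (i >= t.length) {
            return false; // filter has more levels than the topic
        }
        if (f[i] !== '+' && f[i] !== t[i]) {
            return false; // literal level differs
        }
    }
    return f.length === t.length; // no wildcard tail: lengths must agree
}

console.log(topicMatches('sensors/#', 'sensors/arduino/light/room1'));      // true
console.log(topicMatches('sensors/+/light/room1', 'sensors/arduino/temperature/room1')); // false
```

With this in mind, a narrower subscription such as 'sensors/arduino/#' would keep the potentiometer readings from the NodeMCU out of the stream.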

Hardware

  • 1 Arduino Uno
  • 1 NodeMCU (V3)
  • 1 potentiometer
  • 1 Servo (SG90)
  • 1 Raspberry Pi 3
  • 3.5inch Display Hat for Raspberry Pi
  • LM35
  • CDS
  • pull down resistor

Source code is available in my GitHub account


Playing with RabbitMQ, PHP and node

I need to use RabbitMQ in one project. I’m a big fan of Gearman, but I must admit RabbitMQ is much more powerful. In this project I need to work with both PHP and Node code, so I want to build a wrapper for those two languages. I don’t want to reinvent the wheel, so I will use existing libraries (php-amqplib for PHP and amqplib for Node).

Basically I need three things. First, I need exchanges to log different actions, decoupling those actions from the main code. I also need work queues to ensure jobs get executed: it doesn’t matter if a job runs later, but it must run. And finally, RPC commands.

Let’s start with the queues. I want to push events to a queue in PHP

use G\Rabbit\Builder;
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
$queue = Builder::queue('queue.backend', $server);
$queue->emit(["aaa" => 1]);

and also with node

var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var queue = builder.queue('queue.backend', server);
queue.emit({aaa: 1});

And I also want to register workers to those queues with PHP and node

use G\Rabbit\Builder;
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
Builder::queue('queue.backend', $server)->receive(function ($data) {
    error_log(json_encode($data));
});

And node:

var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var queue = builder.queue('queue.backend', server);
queue.receive(function (data) {
    console.log(data);
});

Both implementations use one builder. In this case we are using Queue:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class Queue
{
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    private function declareQueue($channel)
    {
        $conf = $this->conf['queue'];
        $channel->queue_declare($this->name, $conf['passive'], $conf['durable'], $conf['exclusive'],
            $conf['auto_delete'], $conf['nowait']);
    }
    public function emit($data = null)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $this->declareQueue($channel);
        $msg = new AMQPMessage(json_encode($data),
            ['delivery_mode' => 2] # make message persistent
        );
        $channel->basic_publish($msg, '', $this->name);
        $channel->close();
        $connection->close();
    }
    public function receive(callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $this->declareQueue($channel);
        $consumer = $this->conf['consumer'];
        if ($consumer['no_ack'] === false) {
            $channel->basic_qos(null, 1, null);
        }
        $channel->basic_consume($this->name, '', $consumer['no_local'], $consumer['no_ack'], $consumer['exclusive'],
            $consumer['nowait'],
            function ($msg) use ($callback) {
                call_user_func($callback, json_decode($msg->body, true), $this->name);
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s')."] {$this->name}::".$msg->body, "\n";
            });
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] Queue '{$this->name}' initialized \n";
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}

And node:

var amqp = require('amqplib/callback_api');

var Queue = function (name, conf) {
    return {
        emit: function (data, close=true) {
            amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                conn.createChannel(function (err, ch) {
                    var msg = JSON.stringify(data);

                    ch.assertQueue(name, conf.queue);
                    ch.sendToQueue(name, Buffer.from(msg));
                });
                if (close) {
                    setTimeout(function () {
                        conn.close();
                        process.exit(0)
                    }, 500);
                }
            });
        },
        receive: function (callback) {
            amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                conn.createChannel(function (err, ch) {
                    ch.assertQueue(name, conf.queue);
                    console.log(new Date().toString() + ' Queue ' + name + ' initialized');
                    ch.consume(name, function (msg) {
                        console.log(new Date().toString() + " Received %s", msg.content.toString());
                        if (callback) {
                            callback(JSON.parse(msg.content.toString()), msg.fields.routingKey)
                        }
                        if (conf.consumer.noAck === false) {
                            ch.ack(msg);
                        }
                    }, conf.consumer);
                });
            });
        }
    };
};

module.exports = Queue;
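
Both emit and receive build the connection string by interpolating the credentials directly, which breaks as soon as a password contains a character such as '@' or '/'. A minimal sketch of a helper that percent-encodes them, assuming the same server object shape used in the examples; `amqpUrl` is a hypothetical name and not part of amqplib.

```javascript
// Build an AMQP connection URL from the `server` settings object.
// User and password are percent-encoded so reserved characters
// cannot be confused with URL delimiters.
// `amqpUrl` is an illustrative helper (assumption).
function amqpUrl(server) {
    return 'amqp://' +
        encodeURIComponent(server.user) + ':' +
        encodeURIComponent(server.pass) + '@' +
        server.host + ':' + server.port;
}

console.log(amqpUrl({host: 'localhost', port: 5672, user: 'guest', pass: 'guest'}));
// amqp://guest:guest@localhost:5672
```

The result could be passed to amqp.connect() in place of the template string, which also removes the duplication between emit and receive.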

We also want to emit messages using an exchange

Emitter:

use G\Rabbit\Builder;
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
$exchange = Builder::exchange('process.log', $server);
$exchange->emit("xxx.log", "aaaa");
$exchange->emit("xxx.log", ["11", "aaaa"]);
$exchange->emit("yyy.log", "aaaa");

And node:

var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var exchange = builder.exchange('process.log', server);

exchange.emit("xxx.log", "aaaa");
exchange.emit("xxx.log", ["11", "aaaa"]);
exchange.emit("yyy.log", "aaaa");

and receiver:

use G\Rabbit\Builder;
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
Builder::exchange('process.log', $server)->receive("yyy.log", function ($routingKey, $data) {
    error_log($routingKey." - ".json_encode($data));
});

And node:

var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var exchange = builder.exchange('process.log', server);

exchange.receive("yyy.log", function (routingKey, data) {
    console.log(routingKey, data);
});

And that’s the PHP implementation:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class Exchange
{
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    public function emit($routingKey, $data = null)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $conf = $this->conf['exchange'];
        $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
            $conf['internal'], $conf['nowait']);
        $msg = new AMQPMessage(json_encode($data), [
            'delivery_mode' => 2, # make message persistent
        ]);
        $channel->basic_publish($msg, $this->name, $routingKey);
        $channel->close();
        $connection->close();
    }
    public function receive($bindingKey, callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $conf = $this->conf['exchange'];
        $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
            $conf['internal'], $conf['nowait']);
        $queueConf = $this->conf['queue'];
        list($queue_name, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
            $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
        $channel->queue_bind($queue_name, $this->name, $bindingKey);
        $consumerConf = $this->conf['consumer'];
        $channel->basic_consume($queue_name, '', $consumerConf['no_local'], $consumerConf['no_ack'],
            $consumerConf['exclusive'], $consumerConf['nowait'],
            function ($msg) use ($callback) {
                call_user_func($callback, $msg->delivery_info['routing_key'], json_decode($msg->body, true));
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.':'.$msg->delivery_info['routing_key'].'::', $msg->body, "\n";
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            });
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] Exchange '{$this->name}' initialized \n";
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}

And node:

var amqp = require('amqplib/callback_api');

var Exchange = function (name, conf) {
    return {
        emit: function (routingKey, data, close = true) {
            amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                conn.createChannel(function (err, ch) {
                    var msg = JSON.stringify(data);
                    ch.assertExchange(name, 'topic', conf.exchange);
                    ch.publish(name, routingKey, Buffer.from(msg));
                });
                if (close) {
                    setTimeout(function () {
                        conn.close();
                        process.exit(0)
                    }, 500);
                }
            });
        },
        receive: function (bindingKey, callback) {
            amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                conn.createChannel(function (err, ch) {
                    ch.assertExchange(name, 'topic', conf.exchange);
                    console.log(new Date().toString() + ' Exchange ' + name + ' initialized');
                    ch.assertQueue('', conf.queue, function (err, q) {

                        ch.bindQueue(q.queue, name, bindingKey);

                        ch.consume(q.queue, function (msg) {
                            console.log(new Date().toString(), name, ":", msg.fields.routingKey, "::", msg.content.toString());
                            if (callback) {
                                callback(msg.fields.routingKey, JSON.parse(msg.content.toString()))
                            }
                            if (conf.consumer.noAck === false) {
                                ch.ack(msg);
                            }
                        }, conf.consumer);
                    });
                });
            });
        }
    };
};

module.exports = Exchange;
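
Both implementations declare the exchange with type 'topic', so the binding key passed to receive may contain wildcards: '*' stands for exactly one dot-separated word and '#' for zero or more words. The broker does this matching itself; the sketch below only illustrates the rule so it is easier to predict which receiver gets which message. `bindingMatches` is a hypothetical helper, not part of amqplib or php-amqplib.

```javascript
// Decide whether a routing key would be delivered to a queue bound
// with the given binding key on a topic exchange.
// Illustrative helper (assumption); RabbitMQ does this on the broker.
function bindingMatches(bindingKey, routingKey) {
    var pattern = bindingKey.split('.');
    var words = routingKey.split('.');

    function match(p, w) {
        if (p === pattern.length) {
            return w === words.length; // both exhausted at the same time
        }
        if (pattern[p] === '#') {
            // '#' may absorb zero or more words: try every split point
            for (var k = w; k <= words.length; k++) {
                if (match(p + 1, k)) {
                    return true;
                }
            }
            return false;
        }
        if (w === words.length) {
            return false; // pattern still expects a word
        }
        if (pattern[p] === '*' || pattern[p] === words[w]) {
            return match(p + 1, w + 1);
        }
        return false;
    }

    return match(0, 0);
}

console.log(bindingMatches('yyy.log', 'xxx.log')); // false
console.log(bindingMatches('*.log', 'xxx.log'));   // true
```

So the receiver bound to "yyy.log" only sees the third message from the emitter example, while a binding of "*.log" would see all three.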

Finally we want to use RPC commands. The RPC implementation is similar to Queue, but in this case the client receives an answer.

Client side

use G\Rabbit\Builder;
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
echo Builder::rpc('rpc.hello', $server)->call("Gonzalo", "Ayuso");

And node:

var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var rpc = builder.rpc('rpc.hello', server);
rpc.call("Gonzalo", "Ayuso", function (data) {
    console.log(data);
});

Server side:

use G\Rabbit\Builder;
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
Builder::rpc('rpc.hello', $server)->server(function ($name, $surname) use ($server) {
    return "Hello {$name} {$surname}";
});

And node:

var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var rpc = builder.rpc('rpc.hello', server);

rpc.server(function (name, surname) {
    return "Hello " + name + " " + surname;
});

And the implementations:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class RPC
{
    private $name;
    private $conf;
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    public function call()
    {
        $params = (array)func_get_args();
        $response = null;
        $corr_id = uniqid();
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $queueConf = $this->conf['queue'];
        list($callback_queue, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
            $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
        $consumerConf = $this->conf['consumer'];
        $channel->basic_consume($callback_queue, '', $consumerConf['no_local'], $consumerConf['no_ack'],
            $consumerConf['exclusive'], $consumerConf['nowait'], function ($rep) use (&$corr_id, &$response) {
                if ($rep->get('correlation_id') == $corr_id) {
                    $response = $rep->body;
                }
            });
        $msg = new AMQPMessage(json_encode($params), [
            'correlation_id' => $corr_id,
            'reply_to'       => $callback_queue,
        ]);
        $channel->basic_publish($msg, '', $this->name);
        while (!$response) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
        return json_decode($response, true);
    }
    public function server(callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $queueConf = $this->conf['queue'];
        $channel->queue_declare($this->name, $queueConf['passive'], $queueConf['durable'], $queueConf['exclusive'],
            $queueConf['auto_delete'], $queueConf['nowait']);
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] RPC server '{$this->name}' initialized \n";
        $channel->basic_qos(null, 1, null);
        $consumerConf = $this->conf['consumer'];
        $channel->basic_consume($this->name, '', $consumerConf['no_local'], $consumerConf['no_ack'],
            $consumerConf['exclusive'],
            $consumerConf['nowait'], function ($req) use ($callback) {
                $response = json_encode(call_user_func_array($callback, array_values(json_decode($req->body, true))));
                $msg = new AMQPMessage($response, [
                    'correlation_id' => $req->get('correlation_id'),
                    'delivery_mode'  => 2, # make message persistent
                ]);
                $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
                $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.":: req => '{$req->body}' response=> '{$response}'\n";
            });
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}

And node:

var amqp = require('amqplib/callback_api');

var RPC = function (name, conf) {
    var generateUuid = function () {
        return Math.random().toString() +
            Math.random().toString() +
            Math.random().toString();
    };

    return {
        call: function () {
            var params = [];
            for (var i = 0; i < arguments.length - 1; i++) {
                params.push(arguments[i]);
            }
            var callback = arguments[arguments.length - 1];

            amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                conn.createChannel(function (err, ch) {
                    ch.assertQueue('', conf.queue, function (err, q) {
                        var corr = generateUuid();

                        ch.consume(q.queue, function (msg) {
                            if (msg.properties.correlationId == corr) {
                                callback(JSON.parse(msg.content.toString()));
                                setTimeout(function () {
                                    conn.close();
                                    process.exit(0)
                                }, 500);
                            }
                        }, conf.consumer);
                        ch.sendToQueue(name,
                            Buffer.from(JSON.stringify(params)),
                            {correlationId: corr, replyTo: q.queue});
                    });
                });
            });
        },
        server: function (callback) {
            amqp.connect(`amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`, function (err, conn) {
                conn.createChannel(function (err, ch) {
                    ch.assertQueue(name, conf.queue);
                    console.log(new Date().toString() + ' RPC ' + name + ' initialized');
                    ch.prefetch(1);
                    ch.consume(name, function reply(msg) {
                        console.log(new Date().toString(), msg.fields.routingKey, " :: ", msg.content.toString());
                        var response = JSON.stringify(callback.apply(this, JSON.parse(msg.content.toString())));
                        ch.sendToQueue(msg.properties.replyTo,
                            Buffer.from(response),
                            {correlationId: msg.properties.correlationId});

                        ch.ack(msg);

                    }, conf.consumer);
                });
            });
        }
    };
};

module.exports = RPC;
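
The key piece of both RPC clients is the correlation id: the request carries an id and a reply queue, and the client only accepts a reply whose id matches. Here is that bookkeeping in isolation, with an in-memory function standing in for RabbitMQ. `createRpcClient`, `send` and `onReply` are hypothetical names for this sketch, and the sequential ids replace the random ones used above only to keep the example deterministic.

```javascript
// Match replies to pending requests by correlation id.
// `send` is whatever delivers the request (here an in-memory stub).
// All names are illustrative (assumptions), not amqplib APIs.
function createRpcClient(send) {
    var pending = {}; // correlationId -> callback
    var nextId = 0;

    return {
        call: function (params, callback) {
            var correlationId = 'req-' + (++nextId);
            pending[correlationId] = callback;
            send({correlationId: correlationId, body: JSON.stringify(params)});
        },
        onReply: function (reply) {
            var callback = pending[reply.correlationId];
            if (!callback) {
                return false; // unknown or already answered: ignore it
            }
            delete pending[reply.correlationId];
            callback(JSON.parse(reply.body));
            return true;
        }
    };
}

// In-memory stand-in for the broker plus the 'rpc.hello' server.
var client = createRpcClient(function (request) {
    var args = JSON.parse(request.body);
    var answer = 'Hello ' + args[0] + ' ' + args[1];
    client.onReply({correlationId: request.correlationId, body: JSON.stringify(answer)});
});

client.call(['Gonzalo', 'Ayuso'], function (data) {
    console.log(data); // Hello Gonzalo Ayuso
});
```

Keeping a map of pending calls, rather than a single id, would let one connection serve several outstanding requests instead of opening a connection per call.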

You can see the whole projects on GitHub: RabbitMQ-php, RabbitMQ-node

Playing with Docker, Silex, Python, Node and WebSockets

I’m learning Docker. In this post I want to share a little experiment that I have done. I know the code looks like over-engineering but it’s just an excuse to build something with docker and containers. Let me explain it a little bit.

The idea is to build a clock in the browser. Something like this:

Clock

Yes, I know: we could do it with just JS, CSS and HTML, but we want to hack a little bit more. The idea is to create:

  • A Silex/PHP frontend
  • A WebSocket server with socket.io/node
  • A Python script to obtain the current time

The WebSocket server will open two ports: one to serve WebSockets (socket.io) and another as an HTTP server (express). The Python script will get the current time and send it to the WebSocket server. Finally, the frontend (Silex) will listen for the WebSocket event and render the current time.

That’s the WebSocket server (with socket.io and express)

var
    express = require('express'),
    expressApp = express(),
    server = require('http').Server(expressApp),
    io = require('socket.io')(server, {origins: 'localhost:*'})
    ;

expressApp.get('/tic', function (req, res) {
    io.sockets.emit('time', req.query.time);
    res.json('OK');
});

expressApp.listen(6400, '0.0.0.0');

server.listen(8080);

That’s our Python script

from time import gmtime, strftime, sleep
import httplib2

h = httplib2.Http()
while True:
    (resp, content) = h.request("http://node:6400/tic?time=" + strftime("%H:%M:%S", gmtime()))
    sleep(1)
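
The script formats the current UTC time as HH:MM:SS and passes it in the time query parameter. For comparison, the same request URL can be sketched in JavaScript. The host and path come from the Python script; `formatUtcTime` and `ticUrl` are hypothetical names, and percent-encoding the value is my addition (the Python script sends the colons as they are).

```javascript
// Format a Date as HH:MM:SS in UTC, like strftime("%H:%M:%S", gmtime()).
// `formatUtcTime` and `ticUrl` are illustrative names (assumptions).
function formatUtcTime(date) {
    function pad(n) { return (n < 10 ? '0' : '') + n; }
    return pad(date.getUTCHours()) + ':' +
        pad(date.getUTCMinutes()) + ':' +
        pad(date.getUTCSeconds());
}

// Build the URL the WebSocket server expects on its internal HTTP port.
function ticUrl(date) {
    return 'http://node:6400/tic?time=' + encodeURIComponent(formatUtcTime(date));
}

console.log(ticUrl(new Date(Date.UTC(2017, 0, 1, 9, 5, 3))));
// http://node:6400/tic?time=09%3A05%3A03
```

Express decodes the query string, so req.query.time on the server side is the plain 09:05:03 either way.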

And our Silex frontend

use Silex\Application;
use Silex\Provider\TwigServiceProvider;

$app = new Application(['debug' => true]);
$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

$app->get("/", function (Application $app) {
    return $app['twig']->render('index.twig', []);
});

$app->run();

using this twig template

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Docker example</title>
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
    <link href="css/app.css" rel="stylesheet">
    <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
    <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
</head>
<body>
<div class="site-wrapper">
    <div class="site-wrapper-inner">
        <div class="cover-container">
            <div class="inner cover">
                <h1 class="cover-heading">
                    <div id="display">
                        display
                    </div>
                </h1>
            </div>
        </div>
    </div>
</div>
<script src="//localhost:8080/socket.io/socket.io.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.12.4/jquery.min.js"></script>
<script>
var socket = io.connect('//localhost:8080');

$(function () {
    socket.on('time', function (data) {
        $('#display').html(data);
    });
});
</script>
</body>
</html>

The idea is to use one Docker container for each process. I like to have all the code in one place so all containers will share the same volume with source code.

First the node container (WebSocket server)

FROM node:argon

RUN mkdir -p /mnt/src
WORKDIR /mnt/src/node

EXPOSE 8080 6400

Now the python container

FROM python:2

RUN pip install httplib2

RUN mkdir -p /mnt/src
WORKDIR /mnt/src/python

And finally the frontend container (apache2 with Ubuntu 16.04)

FROM ubuntu:16.04

RUN locale-gen es_ES.UTF-8
RUN update-locale LANG=es_ES.UTF-8
ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update -y
RUN apt-get install --no-install-recommends -y apache2 php libapache2-mod-php
RUN apt-get clean -y

COPY ./apache2/sites-available/000-default.conf /etc/apache2/sites-available/000-default.conf

RUN mkdir -p /mnt/src

RUN a2enmod rewrite
RUN a2enmod proxy
RUN a2enmod mpm_prefork

RUN chown -R www-data:www-data /mnt/src
ENV APACHE_RUN_USER www-data
ENV APACHE_RUN_GROUP www-data
ENV APACHE_LOG_DIR /var/log/apache2
ENV APACHE_LOCK_DIR /var/lock/apache2
ENV APACHE_PID_FILE /var/run/apache2/apache2.pid
ENV APACHE_SERVERADMIN admin@localhost
ENV APACHE_SERVERNAME localhost

EXPOSE 80

Now we’ve got the three containers, but we want to use them all together. We’ll use a docker-compose.yml file. The web container will expose port 80 and the node container 8080. The node container also opens 6400, but this is an internal port. We don’t need to access it from outside; only the Python container needs it. Because of that, 6400 is not mapped to any port in docker-compose

version: '2'

services:
  web:
    image: gonzalo123/example_web
    container_name: example_web
    ports:
     - "80:80"
    restart: always
    depends_on:
      - node
    build:
      context: ./images/php
      dockerfile: Dockerfile
    entrypoint:
      - /usr/sbin/apache2
      - -D
      - FOREGROUND
    volumes:
     - ./src:/mnt/src

  node:
    image: gonzalo123/example_node
    container_name: example_node
    ports:
     - "8080:8080"
    restart: always
    build:
      context: ./images/node
      dockerfile: Dockerfile
    entrypoint:
      - npm
      - start
    volumes:
     - ./src:/mnt/src

  python:
    image: gonzalo123/example_python
    container_name: example_python
    restart: always
    depends_on:
      - node
    build:
      context: ./images/python
      dockerfile: Dockerfile
    entrypoint:
      - python
      - tic.py
    volumes:
     - ./src:/mnt/src

And that’s all. We only need to start our containers

docker-compose up --build -d

and open our browser at: http://localhost to see our Time clock

Full source code available within my github account

Playing with arduino, IoT, crossbar and websockets

Yes. Finally I’ve got an arduino board. It’s time to hack a little bit. Today I want to try different things. I want to display one value from my arduino board in a webpage, for example an analog reading from a potentiometer. Let’s start.

We are going to use one potentiometer. A potentiometer is a resistor with a rotating contact that forms an adjustable voltage divider. It has three pins. If we connect one pin to the 5V power source of our arduino, another one to ground and the third one to A0 (analog input 0), we can read different values depending on the position of the potentiometer’s rotating contact.

[Image: arduino_analog]

Arduino has 10-bit analog resolution. That means 1024 possible values, from 0 to 1023. So when our potentiometer gives us 5 volts we’ll read 1023 and when it gives us 0V we’ll read 0. Here we can see a simple arduino program to read this analog input and send data via serial port:

int mem;

void setup() {
  Serial.begin(9600);
}

void loop() {
  int value = analogRead(A0);
  if (value != mem) {
    Serial.println(value);
  }
  mem = value;

  delay(100);
}

This program is a simple loop with a delay of 100 milliseconds. It reads A0 and, if the value differs from the previous reading (to avoid sending the same value when nobody is touching the potentiometer), sends it via the serial port (at 9600 baud).
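On the receiving side, the raw reading usually has to be turned into something more meaningful. A minimal sketch, assuming a 5V reference and that full scale reads 1023 (`adcToVolts` and `adcToPercent` are hypothetical helper names, not part of the project):

```javascript
// Sketch: convert a raw 10-bit ADC reading (0..1023) into volts or a percentage.
// VREF and ADC_MAX are assumptions matching a 5V, 10-bit Arduino setup.
var VREF = 5.0;
var ADC_MAX = 1023;

function adcToVolts(raw) {
    // Reject anything that cannot come out of a 10-bit converter.
    if (!Number.isInteger(raw) || raw < 0 || raw > ADC_MAX) {
        throw new RangeError('reading out of range: ' + raw);
    }
    return raw * VREF / ADC_MAX;
}

function adcToPercent(raw) {
    // Position of the potentiometer as a rounded percentage of full scale.
    return Math.round(adcToVolts(raw) / VREF * 100);
}
```

For example, `adcToPercent(Number(line))` could be applied to each line read from the serial port before it is displayed.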

We can test our program using the serial monitor of our arduino IDE or using another serial monitor.

Now we’re going to create one script to read this serial port data. We’re going to use Python. I’ll use my laptop and my serial port is /dev/tty.usbmodem14231

import serial

arduino = serial.Serial('/dev/tty.usbmodem14231', 9600)

while 1:
  print arduino.readline().strip()

Basically we’ve got our backend running. Now we can create a simple frontend.

...
<div id='display'></div>
...

We’ll need websockets. I normally use socket.io but today I’ll use Crossbar.io. I’ve wanted to try it since I heard about it in one of Ronny’s talks at the deSymfony conference.

I’ll change our backend a little bit to emit one event

import serial
from os import environ
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import LoopingCall
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner

arduino = serial.Serial('/dev/tty.usbmodem14231', 9600)

class SeriaReader(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        def publish():
            return self.publish(u'iot.serial.reader', arduino.readline().strip())

        yield LoopingCall(publish).start(0.1)

if __name__ == '__main__':
    runner = ApplicationRunner(environ.get("GONZALO_ROUTER", u"ws://127.0.0.1:8080/ws"), u"iot")
    runner.run(SeriaReader)

Now I only need to create a crossbar.io server. I will use node to do it

var autobahn = require('autobahn'),
    connection = new autobahn.Connection({
            url: 'ws://0.0.0.0:8080/ws',
            realm: 'iot'
        }
    );

connection.open();

And now we only need to connect our frontend to the websocket server

$(function () {
    var connection = new autobahn.Connection({
        url: "ws://192.168.1.104:8080/ws",
        realm: "iot"
    });

    connection.onopen = function (session) {
        session.subscribe('iot.serial.reader', function (args) {
            $('#display').html(args[0]);
        });
    };

    connection.open();
});

It works but there’s a problem. The first time we connect with our browser we won’t see the display value until we change the position of the potentiometer. That’s because the ‘iot.serial.reader’ event is only emitted when the potentiometer changes. No change means no new value. To solve this problem we only need to change our crossbar.io server a little bit. We’ll “memorize” the last value and we’ll expose one method, ‘iot.serial.get’, to ask for this value

var autobahn = require('autobahn'),
    connection = new autobahn.Connection({
            url: 'ws://0.0.0.0:8080/ws',
            realm: 'iot'
        }
    ),
    mem;

connection.onopen = function (session) {
    session.register('iot.serial.get', function () {
        return mem;
    });

    session.subscribe('iot.serial.reader', function (args) {
        mem = args[0];
    });
};

connection.open();

And now in the frontend we ask for ‘iot.serial.get’ when we connect to the socket

$(function () {
    var connection = new autobahn.Connection({
        url: "ws://192.168.1.104:8080/ws",
        realm: "iot"
    });

    connection.onopen = function (session) {
        session.subscribe('iot.serial.reader', function (args) {
            $('#display').html(args[0]);
        }).then(function () {
                session.call('iot.serial.get').then(
                    function (result) {
                        $('#display').html(result);
                    }
                );
            }
        );
    };
    connection.open();
});

And that’s all. The source code is available in my github account. You can also see a demo of the working prototype here
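The “memorize the last value” idea is independent of crossbar.io. A minimal in-memory sketch of the same pattern, where `createTopic` is a hypothetical helper and not part of autobahn’s API, shows why a late subscriber needs the cached value:

```javascript
// Hypothetical in-memory topic with a last-value cache.
// This is not autobahn or crossbar.io code, only an illustration of the pattern.
function createTopic() {
    var subscribers = [],
        hasValue = false,
        last;

    return {
        publish: function (value) {
            // Remember the value before fanning it out.
            last = value;
            hasValue = true;
            subscribers.forEach(function (fn) {
                fn(value);
            });
        },
        subscribe: function (fn) {
            subscribers.push(fn);
            // Replay the remembered value so a late subscriber is not left blank.
            if (hasValue) {
                fn(last);
            }
        }
    };
}

var topic = createTopic(),
    seen = [];

topic.publish(512);                  // emitted before anybody is listening
topic.subscribe(function (value) {   // the late subscriber still gets 512
    seen.push(value);
});
topic.publish(600);
```

In the post the same effect is achieved with two separate steps (subscribe, then call ‘iot.serial.get’); the sketch merges them only to keep the example short.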

Encrypt Websocket (socket.io) communications

I’m a big fan of WebSockets and socket.io. I’ve written a lot about them. In recent posts I’ve written about socket.io and authentication. Today we’re going to speak about communications.

Imagine we’ve got a websocket server and we connect our application to this server (even using https/wss). If we open our browser’s console we can inspect our WebSocket communications. We can also enable debugging. This works in a similar way to enabling promiscuous mode on our network interface. We will see every packet, not only the packets that the server is sending to us.

If we send sensitive information over websockets, that means that one logged-in user can see another user’s information. We can separate namespaces in our socket.io server. We can also do another thing: encrypt communications using crypto-js.

I’ve created one small wrapper to use it with socket.io.
We can install our server dependency

npm install g-crypt

And install our client dependency with bower

bower install g-crypt

And use it in our server

var io = require('socket.io')(3000),
    Crypt = require("g-crypt"),
    passphrase = 'super-secret-passphrase',
    crypter = Crypt(passphrase);

io.on('connection', function (socket) {
    socket.on('counter', function (data) {
        var decriptedData = crypter.decrypt(data);
        setTimeout(function () {
            console.log("counter status: " + decriptedData.id);
            decriptedData.id++;
            socket.emit('counter', crypter.encrypt(decriptedData));
        }, 1000);
    });
});

And now a simple HTTP application

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
Open console to see the messages

<script src="http://localhost:3000/socket.io/socket.io.js"></script>
<script src="assets/cryptojslib/rollups/aes.js"></script>
<script src="assets/g-crypt/src/Crypt.js"></script>
<script>
    var socket = io('http://localhost:3000/'),
        passphrase = 'super-secret-passphrase',
        crypter = Crypt(passphrase),
        id = 0;

    socket.on('connect', function () {
        console.log("connected! Let's start the counter with: " + id);
        socket.emit('counter', crypter.encrypt({id: id}));
    });

    socket.on('counter', function (data) {
        var decriptedData = crypter.decrypt(data);
        console.log("counter status: " + decriptedData.id);
        socket.emit('counter', crypter.encrypt({id: decriptedData.id}));
    });
</script>

</body>
</html>

Now our communications are encrypted and a logged-in user cannot read another user’s data.

The library is a simple wrapper

Crypt = function (passphrase) {
    "use strict";
    var pass = passphrase;
    var CryptoJSAesJson = {
        parse: function (jsonStr) {
            var j = JSON.parse(jsonStr);
            var cipherParams = CryptoJS.lib.CipherParams.create({ciphertext: CryptoJS.enc.Base64.parse(j.ct)});
            if (j.iv) cipherParams.iv = CryptoJS.enc.Hex.parse(j.iv);
            if (j.s) cipherParams.salt = CryptoJS.enc.Hex.parse(j.s);
            return cipherParams;
        },
        stringify: function (cipherParams) {
            var j = {ct: cipherParams.ciphertext.toString(CryptoJS.enc.Base64)};
            if (cipherParams.iv) j.iv = cipherParams.iv.toString();
            if (cipherParams.salt) j.s = cipherParams.salt.toString();
            return JSON.stringify(j);
        }
    };

    return {
        decrypt: function (data) {
            return JSON.parse(CryptoJS.AES.decrypt(data, pass, {format: CryptoJSAesJson}).toString(CryptoJS.enc.Utf8));
        },
        encrypt: function (data) {
            return CryptoJS.AES.encrypt(JSON.stringify(data), pass, {format: CryptoJSAesJson}).toString();
        }
    };
};

if (typeof module !== 'undefined' && typeof module.exports !== 'undefined') {
    CryptoJS = require("crypto-js");
    module.exports = Crypt;
} else {
    window.Crypt = Crypt;
}

The library is available in my github account, and we can also install it with npm and bower.
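If pulling in crypto-js on the server is not an option, the same encrypt/decrypt contract can be sketched with Node’s built-in crypto module. This is a different technique from the wrapper above (AES-256-GCM with an scrypt-derived key instead of the CryptoJS passphrase mode), so its payloads are not compatible with g-crypt. `createCrypter` and the `tag` field are hypothetical names chosen for this example:

```javascript
var crypto = require('crypto');

// Hypothetical helper, NOT g-crypt's API: same encrypt/decrypt shape,
// implemented with Node's built-in crypto (AES-256-GCM + scrypt).
function createCrypter(passphrase) {
    return {
        encrypt: function (data) {
            var salt = crypto.randomBytes(16),
                iv = crypto.randomBytes(12),
                key = crypto.scryptSync(passphrase, salt, 32),
                cipher = crypto.createCipheriv('aes-256-gcm', key, iv),
                ct = Buffer.concat([
                    cipher.update(JSON.stringify(data), 'utf8'),
                    cipher.final()
                ]);

            // The auth tag is only available after final() has been called.
            return JSON.stringify({
                ct: ct.toString('base64'),
                iv: iv.toString('hex'),
                s: salt.toString('hex'),
                tag: cipher.getAuthTag().toString('hex')
            });
        },
        decrypt: function (payload) {
            var j = JSON.parse(payload),
                key = crypto.scryptSync(passphrase, Buffer.from(j.s, 'hex'), 32),
                decipher = crypto.createDecipheriv('aes-256-gcm', key, Buffer.from(j.iv, 'hex'));

            // final() throws if the passphrase is wrong or the data was tampered with.
            decipher.setAuthTag(Buffer.from(j.tag, 'hex'));
            return JSON.parse(Buffer.concat([
                decipher.update(Buffer.from(j.ct, 'base64')),
                decipher.final()
            ]).toString('utf8'));
        }
    };
}
```

Usage mirrors the server example: `socket.emit('counter', crypter.encrypt({id: 1}))` on one side and `crypter.decrypt(data)` on the other, with both sides sharing the passphrase.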

Home automation pet project. Playing with IoT, temperature sensors, fans and Telegram bots

Summer holidays are over. Besides my bush walks I’ve also been hacking a little bit on one idea that I had in mind. Summer means high temperatures and I wanted to control my fan, for example turning it on when the temperature is over a threshold. I could do it using an Arduino board and a temperature sensor, but I don’t have an Arduino board. I do have several other devices, for example a Wemo switch. With this device connected to my Wifi network I can switch my fan on and off remotely from my mobile phone (using its android app) or even from my Pebble watch using the API. I also have a BeeWi temperature/humidity sensor. It’s a BTLE device. It comes with its own app for android, but there’s also an API. Yes, I know that one Arduino board with a couple of sensors can be cheaper than one of these devices, but when I’m in a shop and I’ve got one of these devices in my hands I cannot resist.

I also have a new Raspberry Pi 3. I’ve recently upgraded my home multimedia server from a rpi2 to the new rpi3. Basically I use it as a multimedia server and now also as a retro console. This new rpi3 has Bluetooth, so I wanted to do something with it. Reading the temperature from the Bluetooth sensor sounded good, so I started to hack a little bit.

I found this post. I started working with Python. The script almost works, but it uses a Bluetooth connection and, as someone said in the comments, it uses a lot of battery. So I switched to a BTLE version. I found a simple node library to connect to BTLE devices called noble, really simple to use. In one afternoon I had one small script ready. The idea was to put this script in my rpi3’s crontab, scan the temperature each minute (via noble) and, if the temperature was over a threshold, switch on the wemo device (via ouimeaux). I also wanted to be informed when my fan is switched on and off. The easiest way to do it was via Telegram (I already knew the telebot library).

var noble = require('noble'),
    Wemo = require('wemo-client'),
    TeleBot = require('telebot'),
    fs = require('fs'),
    beeWiData,
    wemo,
    threshold,
    address,
    bot,
    chatId,
    wemoDevice,
    configuration,
    confPath;

if (process.argv.length <= 2) {
    console.log("Usage: " + __filename + " conf.json");
    process.exit(-1);
}

confPath = process.argv[2];
try {
    configuration = JSON.parse(
        fs.readFileSync(process.argv[2])
    );
} catch (e) {
    console.log("configuration file not valid");
    process.exit(-1);
}

bot = new TeleBot(configuration.telegramBotAPIKey);
address = configuration.beeWiAddress;
threshold = configuration.threshold;
wemoDevice = configuration.wemoDevice;
chatId = configuration.telegramChatId;

function persists() {
    configuration.beeWiData = beeWiData;
    fs.writeFileSync(confPath, JSON.stringify(configuration));
}

function setSwitchState(state, callback) {
    wemo = new Wemo();
    wemo.discover(function(deviceInfo) {
        if (deviceInfo.friendlyName == wemoDevice) {
            console.log("device found:", deviceInfo.friendlyName, "setting the state to", state);
            var client = wemo.client(deviceInfo);
            client.on('binaryState', function(value) {
                callback();
            });

            client.on('statusChange', function(a) {
                console.log("statusChange", a);
            });
            client.setBinaryState(state);
        }
    });
}

beeWiData = {temperature: undefined, humidity: undefined, batery: undefined};

function hexToInt(hex) {
    if (hex.length % 2 !== 0) {
        hex = "0" + hex;
    }
    var num = parseInt(hex, 16);
    var maxVal = Math.pow(2, hex.length / 2 * 8);
    if (num > maxVal / 2 - 1) {
        num = num - maxVal;
    }
    return num;
}

noble.on('stateChange', function(state) {
    if (state === 'poweredOn') {
        noble.stopScanning();
        noble.startScanning();
    } else {
        noble.stopScanning();
    }
});

noble.on('scanStop', function() {
    var message, state;
    if (beeWiData.temperature > threshold) {
        state = 1;
        message = "temperature (" + beeWiData.temperature + ") over threshold (" + threshold + "). Fan ON. Humidity: " + beeWiData.humidity;
    } else {
        message = "temperature (" + beeWiData.temperature + ") under threshold (" + threshold + "). Fan OFF. Humidity: " + beeWiData.humidity;
        state = 0;
    }
    setSwitchState(state, function() {
        // Previous reading persisted by the last run (missing on the very first run).
        var previous = configuration.beeWiData || {};
        if (previous.hasOwnProperty('temperature') && (previous.temperature < threshold && state === 1 || previous.temperature > threshold && state === 0)) {
            console.log("Notify to telegram bot", message);
            bot.sendMessage(chatId, message).then(function() {
                process.exit(0);
            }, function(e) {
                console.error(e);
                process.exit(0);
            });
            persists();
        } else {
            console.log(message);
            persists();
            process.exit(0);
        }
    });
});

noble.on('discover', function(peripheral) {
    if (peripheral.address == address) {
        var data = peripheral.advertisement.manufacturerData.toString('hex');
        beeWiData.temperature = parseFloat(hexToInt(data.substr(10, 2)+data.substr(8, 2))/10).toFixed(1);
        beeWiData.humidity = Math.min(100,parseInt(data.substr(14, 2),16));
        beeWiData.batery = parseInt(data.substr(24, 2),16);
        beeWiData.date = new Date();
        noble.stopScanning();
    }
});

setTimeout(function() {
    console.error("timeout exceeded!");
    process.exit(0);
}, 5000);

The script is here.
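The notification rule in the script (only tell Telegram when the fan actually changes state between two runs) can be isolated in a small function. A sketch, assuming the previous reading comes from the persisted configuration; `fanState` and `shouldNotify` are hypothetical helpers, and unlike the original condition they treat a previous reading exactly equal to the threshold as “fan off”:

```javascript
// Sketch only: fanState and shouldNotify are hypothetical helpers,
// not functions taken from the script above.
function fanState(temperature, threshold) {
    // Readings may arrive as strings (toFixed returns a string), so coerce first.
    return Number(temperature) > threshold ? 1 : 0;
}

function shouldNotify(previousTemperature, currentTemperature, threshold) {
    // No stored reading yet (first run): there is nothing to compare against.
    if (previousTemperature === undefined || previousTemperature === null) {
        return false;
    }
    // Notify only when the reading crossed the threshold between two runs.
    return fanState(previousTemperature, threshold) !== fanState(currentTemperature, threshold);
}
```

Keeping the rule in one place makes it easy to try out other policies, for example a small hysteresis band so the fan does not flap when the temperature hovers around the threshold.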

It works but I wanted to keep on hacking. One Sunday morning I read this post. I don’t have an amazon button, but I wanted to do something similar. I started to play with the scapy library, sniffing ARP packets in my home network. I realized that I can detect when my Kindle, my tv, or even my mobile phone connects to the network. Then I had an idea: detect when my mobile phone connects to my wifi. My mobile phone connects to my wifi before I enter my house, so my idea was simple: detect when I’m close to my home’s door and send me a Telegram message saying “Welcome home” along with the temperature inside my house at that moment.

#!/usr/bin/env python

import sys
from scapy.all import *
import telebot
import gearman
import json
from StringIO import StringIO

BUFFER_SIZE = 1024

try:
    with open(sys.argv[1]) as data_file:
        data = json.load(data_file)
        myPhone = data['myPhone']
        routerIP = data['routerIP']
        TOKEN = data['telegramBotAPIKey']
        chatID = data['telegramChatId']
        gearmanServer = data['gearmanServer']
except:
    print("Unexpected error:", sys.exc_info()[0])
    raise

def getSensorData():
    gm_client = gearman.GearmanClient([gearmanServer])
    completed_job_request = gm_client.submit_job("temp", '')
    io = StringIO(completed_job_request.result)

    return json.load(io)

tb = telebot.TeleBot(TOKEN)

def arp_display(pkt):
    if pkt[ARP].op == 1 and pkt[ARP].hwsrc == myPhone and pkt[ARP].pdst == routerIP:
        sensorData = getSensorData()
        message = "Welcome home Gonzalo! Temperature: %s humidity: %s" % (sensorData['temperature'], sensorData['humidity'])
        tb.send_message(chatID, message)
        print message

print sniff(prn=arp_display, filter='arp', store=0)

I have one node script to read the temperature and one Python script to sniff my network. I could have figured out how to read the temperature from Python and used only one script, but I was lazy (remember that I was on holiday), so I turned the node script that reads the temperature into a gearman worker.

var noble = require('noble'),
    fs = require('fs'),
    Gearman = require('node-gearman'),
    beeWiData,
    address,
    bot,
    configuration,
    confPath,
    status,
    callback;

var gearman = new Gearman();

if (process.argv.length <= 2) {
    console.log("Usage: " + __filename + " conf.json");
    process.exit(-1);
}

confPath = process.argv[2];
try {
    configuration = JSON.parse(
        fs.readFileSync(process.argv[2])
    );
} catch (e) {
    console.log("configuration file not valid", e);
    process.exit(-1);
}

address = configuration.beeWiAddress;
delay = configuration.tempServerDelayMinutes * 60 * 1000;
tcpPort = configuration.tempServerPort;

beeWiData = {};

function hexToInt(hex) {
    if (hex.length % 2 !== 0) {
        hex = "0" + hex;
    }
    var num = parseInt(hex, 16);
    var maxVal = Math.pow(2, hex.length / 2 * 8);
    if (num > maxVal / 2 - 1) {
        num = num - maxVal;
    }
    return num;
}

noble.on('stateChange', function(state) {
    if (state === 'poweredOn') {
        console.log("stateChange:poweredOn");
        status = true;
    } else {
        status = false;
    }
});

noble.on('discover', function(peripheral) {
    if (peripheral.address == address) {
        var data = peripheral.advertisement.manufacturerData.toString('hex');
        beeWiData.temperature = parseFloat(hexToInt(data.substr(10, 2)+data.substr(8, 2))/10).toFixed(1);
        beeWiData.humidity = Math.min(100,parseInt(data.substr(14, 2),16));
        beeWiData.batery = parseInt(data.substr(24, 2),16);
        beeWiData.date = new Date();
        noble.stopScanning();
    }
});

noble.on('scanStop', function() {
    console.log(beeWiData);
    noble.stopScanning();
    callback();
});

var worker;

function workerCallback(payload, worker) {
    callback = function() {
        worker.end(JSON.stringify(beeWiData));
    }

    beeWiData = {temperature: undefined, humidity: undefined, batery: undefined};

    if (status) {
        noble.stopScanning();
        noble.startScanning();
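    } else {
        // Bluetooth adapter not ready yet: retry once in a second.
        // (setTimeout rather than setInterval, so retries don't pile up.)
        setTimeout(function() {
            workerCallback(payload, worker);
        }, 1000);
    }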
}

gearman.registerWorker("temp", workerCallback);

Now I only need to call this worker from my Python sniffer and that’s all.
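In both scripts the sensor values are cut out of a hex string with substr(). The same decoding can be sketched with Node’s Buffer readers, assuming the byte offsets implied by those substr() calls: temperature as a signed little-endian 16-bit value at byte 4 (in tenths of a degree), humidity at byte 7 and battery at byte 12. `decodeBeeWi` and the sample payloads are made up for illustration, and the sketch returns the temperature as a number rather than the string produced by toFixed():

```javascript
// Hypothetical helper: decode BeeWi manufacturer data from a Buffer.
// Offsets are assumptions derived from the substr() calls in the scripts above.
function decodeBeeWi(manufacturerData) {
    return {
        // Signed 16-bit little-endian, tenths of a degree (handles sub-zero values).
        temperature: manufacturerData.readInt16LE(4) / 10,
        humidity: Math.min(100, manufacturerData.readUInt8(7)),
        batery: manufacturerData.readUInt8(12) // key spelled as in the original scripts
    };
}

// Made-up sample payloads (13 bytes each), not captured from a real sensor.
var warm = decodeBeeWi(Buffer.from('0d000501e7000227000000005a', 'hex'));
var cold = decodeBeeWi(Buffer.from('0d000501f6ff0227000000005a', 'hex'));
```

With noble, `peripheral.advertisement.manufacturerData` is already a Buffer, so it could be passed to such a function without the intermediate hex string.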

I wanted to play a little bit more. I also wanted to ask for the temperature on demand. Since I was using Telegram I had an idea: create a Telegram bot running on my rpi3. And that’s my summer pet project. Basically it has three parts:

worker.js
It’s a gearman worker. It reads temperature and humidity from my BeeWi sensor via BTLE

bot.py
It’s a Telegram bot with the following commands available:

/switchInfo: get switch info
/switchOFF: switch OFF the switch
/help: Gives you information about the available commands
/temp: Get temperature
/switchON: switch ON the switch

sniff.py
It’s just an ARP sniffer. It detects when I’m close to my home and sends me a message via Telegram with the temperature. It detects when my mobile phone sends an ARP packet to my router (i.e. when I connect to my Wifi). It happens before I enter my house, so the Telegram message arrives before I put the key in the door 🙂

I run all my scripts on my Raspberry Pi 3. To ensure all scripts are up and running I use supervisor

All the scripts are available in my github account

Notify events from PostgreSQL to external listeners

Sometimes we need to call external programs from our PostgreSQL database. We can send data over sockets from SQL statements. I’ve written about it. The problem with this approach is the following: if the user rolls back the transaction, the socket message has already been emitted. That may or may not be a problem, depending on our application. Also, nothing guarantees that the process behind the socket server has access to the data of the transaction. If we’re very fast, the transaction may not be committed yet. We could use a sleep function, but sleep functions are always a bad idea. PostgreSQL gives us another tool to decouple processes: LISTEN and NOTIFY.

Let me show you an example. First we create a table:

CREATE TABLE PUBLIC.TBLEXAMPLE
(
  KEY1 CHARACTER VARYING(10) NOT NULL,
  KEY2 CHARACTER VARYING(14) NOT NULL,

  VALUE1 CHARACTER VARYING(20),
  VALUE2 CHARACTER VARYING(20) NOT NULL,

  CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
);

Now we add an “after insert” trigger to our table

CREATE TRIGGER TBLEXAMPLE_AFTER
AFTER INSERT
ON PUBLIC.TBLEXAMPLE
FOR EACH ROW
EXECUTE PROCEDURE PUBLIC.NOTIFY();

And now, within the trigger function, we send a notify event (‘myEvent’ in this case) with the row information. The function has to exist before the trigger above can be created, so run this statement first. We need to send plain text in the notify event, so we’ll use JSON to encode our row data.

CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
  PERFORM pg_notify('myEvent', row_to_json(NEW)::text);
  RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;

Now we’re going to build a server-side example that connects to our PostgreSQL database and listens to the event. In this case we’re going to use nodejs to build the prototype. This example will also enqueue events into a gearman server.

var pg = require('pg'),
    gearmanode = require('gearmanode'),
    gearmanClient,
    conString = 'tcp://username:password@localhost:5432/gonzalo',
    pgClient;

gearmanode.Client.logger.transports.console.level = 'error';

gearmanClient = gearmanode.client();

console.log('LISTEN myEvent');
pgClient = new pg.Client(conString);
pgClient.connect();
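// Quote the channel name: an unquoted identifier is folded to lowercase (myevent),
// which would not match the 'myEvent' channel used by pg_notify().
pgClient.query('LISTEN "myEvent"');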
pgClient.on('notification', function (data) {
    console.log("\033[34m" + new Date + '-\033[0m payload', data.payload);
    gearmanClient.submitJob('sms.sender.one', data.payload);
});

And that’s all. Now we only need to perform an INSERT statement into our table. This process will trigger our event and our nodejs will enqueue the process into a gearman queue.

INSERT INTO PUBLIC.TBLEXAMPLE(KEY1, KEY2, VALUE1, VALUE2) VALUES ('k1', 'k2', 'v1', 'v2');

It’s worth noting that if our insert statement is inside a transaction and we roll it back, notify won’t send any message.
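On the listener side the payload is just a string, so it is worth decoding it defensively before passing it on. A minimal sketch, assuming the notification object carries `channel` and `payload` fields as in node-postgres and that row_to_json() emits lowercase column names for our unquoted columns; `parseNotification` is a hypothetical helper:

```javascript
// Hypothetical helper: turn a notification into {channel, row},
// or null when the payload is missing or is not valid JSON.
function parseNotification(msg) {
    if (!msg || typeof msg.payload !== 'string') {
        return null;
    }
    try {
        return {channel: msg.channel, row: JSON.parse(msg.payload)};
    } catch (e) {
        return null;
    }
}

// Example notification as it could look after the INSERT above (made-up object).
var parsed = parseNotification({
    channel: 'myEvent',
    payload: '{"key1":"k1","key2":"k2","value1":"v1","value2":"v2"}'
});
```

NOTIFY payloads are limited in size (under 8000 bytes in the default configuration), so for wide rows it may be safer to send only the key columns and let the listener fetch the rest.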

Sharing authentication between socket.io and a PHP frontend

Normally, when I work with websockets, my stack is a socket.io server and a Silex frontend. Protecting a PHP frontend with one kind of authentication or another is pretty straightforward. But if we want to use websockets, we need to set up another server, and if we protect our frontend we need to protect our websocket server too.

If our frontend is node too (express for example), sharing authentication is easier, but this time we want to use two different servers (a node server and a PHP server). I’ve written about it before, but today we’ll see another solution. Let’s start.

Imagine we have this simple Silex application. It has three routes:

  • “/” a public route
  • “/login” to perform the login action
  • “/private” a private route. If we try to get here without a valid session we’ll get a 403 error

And this is the code. It’s basically one example using sessions taken from Silex documentation:

use Silex\Application;
use Silex\Provider\SessionServiceProvider;
use Silex\Provider\TwigServiceProvider;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;

$app = new Application();

$app->register(new SessionServiceProvider());
$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

$app->get('/', function (Application $app) {
    return $app['twig']->render('home.twig');
});

$app->get('/login', function () use ($app) {
    $username = $app['request']->server->get('PHP_AUTH_USER', false);
    $password = $app['request']->server->get('PHP_AUTH_PW');

    if ('gonzalo' === $username && 'password' === $password) {
        $app['session']->set('user', ['username' => $username]);

        return $app->redirect('/private');
    }

    $response = new Response();
    $response->headers->set('WWW-Authenticate', sprintf('Basic realm="%s"', 'site_login'));
    $response->setStatusCode(401, 'Please sign in.');

    return $response;
});

$app->get('/private', function () use ($app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app['twig']->render('private.twig', [
        'username'  => $user['username']
    ]);
});

$app->run();

Our “/private” route also creates a connection with our websocket server.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
Welcome {{ username }}!

<script src="http://localhost:3000/socket.io/socket.io.js"></script>
<script>
    var socket = io('http://localhost:3000/');
    socket.on('connect', function () {
        console.log("connected!");
    });
    socket.on('disconnect', function () {
        console.log("disconnected!");
    });
</script>

</body>
</html>

And that’s our socket.io server. A really simple one.

var io = require('socket.io')(3000);

It works. Our frontend is protected. We need to log in with our credentials (in this example “gonzalo/password”), but everyone can connect to our socket.io server. The idea is to use our PHP session to protect our socket.io server too. In fact, it’s very easy to do. First we need to pass our PHPSESSID to our socket.io server. To do it, when we perform our socket.io connection in the frontend, we pass our session id

<script>
    var socket = io('http://localhost:3000/', {
        query: 'token={{ sessionId }}'
    });
    socket.on('connect', function () {
        console.log("connected!");
    });
    socket.on('disconnect', function () {
        console.log("disconnect!");
    });
</script>

Since we’re using a twig template, we need to pass the sessionId variable

$app->get('/private', function () use ($app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app['twig']->render('private.twig', [
        'username'  => $user['username'],
        'sessionId' => $app['session']->getId()
    ]);
});

Now we only need to validate the token before establishing the connection. Socket.io provides us with a middleware to perform this kind of operation. In this example we’re using PHP sessions out of the box. How can we validate the token? The answer is easy. We only need to create an http client (in the socket.io server) and perform a request to a protected route (we’ll use “/private”). If you’re using a different provider to store your sessions (I hope you aren’t using Memcached to store PHP sessions), you’ll need to validate the sessionId against your provider.

var io = require('socket.io')(3000),
    http = require('http');

io.use(function (socket, next) {
    var options = {
        host: 'localhost',
        port: 8080,
        path: '/private',
        headers: {Cookie: 'PHPSESSID=' + socket.handshake.query.token}
    };

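    http.request(options, function (response) {
        // "/private" answers 200 only for a valid session (403 otherwise).
        if (response.statusCode === 200) {
            next();
        } else {
            next(new Error("not authorized"));
        }
        response.resume(); // discard the body
    }).on('error', function () {
        next(new Error("not authorized"));
    }).end();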
});

io.on('connection', function () {
    console.log("connected!");
});

Ok. This example works, but we’re dynamically generating a js file that injects our PHPSESSID. If we want to extract the sessionId from the request we can use document.cookie, but sometimes it doesn’t work. That’s because of HttpOnly. HttpOnly is our friend if we want to protect our cookies against XSS attacks, but in this case our protection makes our task harder.

We can solve this problem performing a simple request to our server. We’ll create a new route (a private route) called ‘getSessionID’ that gives us our sessionId.

$app->get('/getSessionID', function (Application $app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app->json($app['session']->getId());
});

So before establishing the websocket we just need to create a GET request to our new route to obtain the sessionID.

var io = require('socket.io')(3000),
    http = require('http');

io.use(function (socket, next) {
    var sessionId = socket.handshake.query.token,
        options = {
            host: 'localhost',
            port: 8080,
            path: '/getSessionID',
            headers: {Cookie: 'PHPSESSID=' + sessionId}
        };

    http.request(options, function (response) {
        var body = '';
        // Collect the whole body: it may arrive in more than one chunk.
        response.on('data', function (chunk) {
            body += chunk;
        });
        response.on('end', function () {
            var sessionIdFromRequest;
            try {
                sessionIdFromRequest = JSON.parse(body);
            } catch (e) {
                // Stop here so next() is never called twice.
                return next(new Error("not authorized"));
            }

            if (response.statusCode === 200 && sessionId === sessionIdFromRequest) {
                next();
            } else {
                next(new Error("not authorized"));
            }
        });
    }).on('error', function () {
        next(new Error("not authorized"));
    }).end();
});

io.on('connection', function (socket) {
    setInterval(function() {
        socket.emit('hello', {hello: 'world'});
    }, 1000);
});

And that’s all. You can see the full example in my github account.
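The decision taken inside the middleware can also be written as a pure function, which makes it easy to test without a running PHP server. A sketch under the assumption that the ‘/getSessionID’ route answers 200 with the session id as a JSON string and anything else for an invalid session; `isAuthorized` is a hypothetical helper name:

```javascript
// Hypothetical helper: decide whether a handshake token is valid, given the
// HTTP status and body returned by the '/getSessionID' route.
function isAuthorized(statusCode, body, token) {
    var sessionIdFromRequest;

    // An empty token or a non-200 answer is never accepted.
    if (statusCode !== 200 || typeof token !== 'string' || token === '') {
        return false;
    }
    try {
        sessionIdFromRequest = JSON.parse(body);
    } catch (e) {
        // An HTML error page (or any non-JSON body) means "not authorized".
        return false;
    }
    return sessionIdFromRequest === token;
}
```

Inside `io.use` the result would simply map to `next()` or `next(new Error("not authorized"))`.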

Working with Ionic and PHP Backends. Remote debugging with PHP7 and Xdebug working with real devices

Sometimes I speak with PHP developers and they don’t use remote debugging in their development environments. Some people don’t like to use remote debugging. They prefer to use TDD and rely on unit tests. That’s a good point of view, but sometimes they don’t use remote debugging only because they don’t know how to do it, and that’s inadmissible. A remote debugger is a powerful tool, especially for dealing with legacy applications. I’ve been using xdebug for years on my linux workstation. These days I’m using a Mac and it’s also very simple to set up xdebug here.

First we need to install PHP:

brew install php70

Then Xdebug

brew install php70-xdebug

(On an Ubuntu box we only need to use apt-get instead of brew.)

Now we need to set up Xdebug to enable remote debugging.
In a standard installation the Xdebug configuration is located at: /usr/local/etc/php/7.0/conf.d/ext-xdebug.ini

[xdebug]
zend_extension="/usr/local/opt/php70-xdebug/xdebug.so"

xdebug.remote_enable=1
xdebug.remote_port=9000
xdebug.profiler_enable=0
xdebug.profiler_output_dir="/tmp"
xdebug.idekey="PHPSTORM"
xdebug.remote_connect_back=1
xdebug.max_nesting_level=250

And basically that’s all. The debugger is triggered by a cookie called XDEBUG_SESSION. To set/unset this cookie you can use a bookmarklet in your browser (you can generate your bookmarklets here), or use a Chrome extension to enable Xdebug.

Now we only need to start the built-in server with

php -S 0.0.0.0:8080

And remote debugging will be available.

The remote debugger works this way:

  • We open one port within our IDE. In my case PHPStorm (it happens when we click on “Start listening for PHP debug connections”)
  • We set one cookie in our browser (it happens when we click on the Chrome extension)
  • When our server receives one request with the cookie, it connects to the port that our IDE opens (usually port 9000). If you use a personal firewall in your workstation, ensure that you allow incoming connections to this port.
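
The cookie step in the list above can be sketched in a few lines of JavaScript, which is roughly what a bookmarklet would do. The helper names are hypothetical; only the cookie name XDEBUG_SESSION and the PHPSTORM key come from the configuration shown earlier.

```javascript
// Hypothetical helpers: build the cookie strings that start and stop a debug session.
function xdebugCookie(ideKey) {
    return 'XDEBUG_SESSION=' + encodeURIComponent(ideKey) + '; path=/';
}

function xdebugCookieExpired() {
    // An expiry date in the past makes the browser drop the cookie.
    return 'XDEBUG_SESSION=; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT';
}

// Only touch document.cookie when running inside a browser.
if (typeof document !== 'undefined') {
    document.cookie = xdebugCookie('PHPSTORM');
}
```

Assigning the result of `xdebugCookieExpired()` to `document.cookie` stops the debug session again.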

Nowadays I’m involved in several projects building hybrid applications with Apache Cordova. I’m using Ionic in the frontend and Silex in the backend. When I work with hybrid applications I normally go through two phases.

In the first one I build a working prototype. To do this I run a local server and use my browser to develop the application. This phase is very similar to a traditional web development process. If we also set up LiveReload properly, our application will be reloaded each time we change a JavaScript file. The Ionic framework integrates LiveReload, so we only need to run:

ionic serve -l

to start our application. We also need to start our backend server. For example

php -S 0.0.0.0:8080 -t api/www

Now we can debug our backend with the remote debugger and our frontend with Chrome’s developer tools. Chrome also allows us to edit frontend files and save them to the filesystem using workspaces. This phase is the easy one, but sooner or later we’ll need to start working with a real device. We need a real device mainly when we use plugins such as the Camera plugin, the Geolocation plugin and things like that. OK, there are emulators, but emulators usually don’t let us use all plugins in the same way as a real device does. Chrome also allows us to see the device’s console logs from our workstation. OK, we can see all the logs of our plugged-in Android device using “adb logcat”, but following the flow of our logs with logcat is like trying to read the Matrix code. It’s a mess.

If we plug our Android device into our computer and open this URL with Chrome:

chrome://inspect/#devices

We can see our device’s console, use breakpoints and things like that. Cool, isn’t it? Of course, it only works if we compile our application without the “--release” option. We can do something similar with Safari and iOS devices.

With Ionic, if we want to use LiveReload from the real device instead of recompiling and reinstalling our application each time we change our JavaScript files, we can run the application using

ionic run android --device -l

When we’re in this phase we also need to deal with CORS. CORS isn’t a problem when we run our hybrid application in production: when the application runs from the device, its “origin” is the local filesystem, which means CORS doesn’t apply. But when we run the application on the device while serving it from our computer (when we use the “-l” option), our origin is no longer the local filesystem. So if our backend is served from another origin, we need to enable CORS.

We can enable CORS in the backend (I’ve written about it here), but the Ionic people offer us an easier way. We can set up a local proxy that serves our backend through the same origin as the application and forget about CORS. Here we can read a good article about it.

Anyway, if we want to start the remote debugger we need to create a cookie called XDEBUG_SESSION. In the browser we can use the Chrome extension, but when we inspect the plugged-in device it isn’t so simple. It would be cool if the Ionic people allowed us to inject cookies into our proxy server. I’ve tried to see how to do it with ionic-cli. Maybe it’s possible, but I couldn’t figure out how to do it. Because of that I’ve created a simple AngularJS service to inject this cookie. Then, if I start listening for debug connections in my IDE, I’ll be able to use the remote debugger just as I do when I work with the browser.
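
To see why the proxy removes the CORS problem, here is a small sketch of the same-origin rule. The IP address, port 8100 and the /api path are assumptions for illustration, and `sameOrigin` is a hypothetical helper, not part of Ionic.

```javascript
// Returns true when two URLs share scheme, host and port (the same-origin rule).
function sameOrigin(a, b) {
    var ua = new URL(a),
        ub = new URL(b);
    return ua.protocol === ub.protocol &&
        ua.hostname === ub.hostname &&
        ua.port === ub.port;
}

// Assumed addresses, for illustration only.
var app = 'http://192.168.1.10:8100/index.html';    // application served with "-l"
var backend = 'http://192.168.1.10:8080/login';     // php -S 0.0.0.0:8080
var proxied = 'http://192.168.1.10:8100/api/login'; // backend behind a proxy path

console.log(sameOrigin(app, backend)); // false
console.log(sameOrigin(app, proxied)); // true
```

Only the port differs between the application and the backend, but that is enough to make the request cross-origin. Behind the proxy both share one origin.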

First we need to install the service via Bower:

bower install ng-xdebugger --save

Now we need to include the JavaScript files

<script src="lib/angular-cookies/angular-cookies.min.js"></script>
<script src="lib/ng-xdebugger/dist/gonzalo123.xdebugger.min.js"></script>

then we add our service to the project.

angular.module("starter", ["ionic", "gonzalo123.xdebugger"])

Now we only need to configure our application and set the debugger key (it must be the same key that we use in the server-side Xdebug configuration)

.config(function (xdebuggerProvider) {
    xdebuggerProvider.setKey('PHPSTORM');
})

And that’s all. The service is very simple. It only uses one HTTP interceptor to set the cookie, which the browser then sends along with our HTTP requests:

(function () {
    "use strict";

    angular.module("gonzalo123.xdebugger", ["ngCookies"])
        .provider("xdebugger", ['$httpProvider', function ($httpProvider) {
            var debugKey;

            this.$get = function () {
                return {
                    getDebugKey: function () {
                        return debugKey;
                    }
                };
            };

            this.setKey = function (string) {
                if (string) {
                    debugKey = string;
                    $httpProvider.interceptors.push("xdebuggerCookieInterceptor");
                }
            };
        }])

        .factory("xdebuggerCookieInterceptor", ['$cookieStore', 'xdebugger', function ($cookieStore, xdebugger) {
            return {
                // the cookie is stored when a response arrives,
                // so it travels with every request made after the first one
                response: function (response) {
                    $cookieStore.put("XDEBUG_SESSION", xdebugger.getDebugKey());

                    return response;
                }
            };
        }])
    ;
})();

And of course you can see the whole project in my github account.

Book review: Socket.IO Cookbook

Last summer I collaborated as a technical reviewer on the book “Socket.IO Cookbook”, written by Tyson Cadenhead, and I’ve finally got the book in my hands.

I’m a big fan of real-time technologies and I’m a regular Socket.IO user. Because of that, when the people at Packt Publishing contacted me to join the project as a technical reviewer, my answer was yes. Nowadays I have serious problems finding time for pet projects and extra activities, but if there are WebSockets inside I cannot resist.

The book is correct and it’s a good starting point for event-based communication with JavaScript. I normally don’t like beginners’ books (even if I’m a beginner in the technology). I don’t like books where the author explains how to do something that I can already learn from the technology’s own website. This book isn’t one of those. The writer doesn’t assume the reader is a total newbie. Because of that, newbies can sometimes get lost in some chapters, but this is exactly the way we all learn new technologies. I like the way Tyson introduces the concepts of Socket.IO.

The book is focused on JavaScript and also uses JavaScript for the backend (with Node). Maybe I miss the integration with non-JavaScript environments, but as Socket.IO is a JavaScript library, I understand that using JavaScript throughout the application lifecycle is a good approach.


Also, in those days I was reading about and playing a little bit with WebRTC, and the book has one chapter about it! #cool