Blog Archives

NFC tag reader with Arduino

Today I want to use the NFC tag reader module with my Arduino. The idea is to build a simple prototype to read NFC tags and validate them against a remote server (for example a node tcp server). Depending on the tag we’ll trigger one digital output or another. In the example we’ll connect LEDs to those outputs, but in real life we could open a door or something similar.

We’ll use a MFRC522 module. It’s a cheap Mifare RFID/NFC tag reader and writer.

MFRC522 Connection:

  • sda: 10 (*) -> 8
  • sck: 13
  • Mosi: 11
  • Miso: 12
  • Rq: —
  • Gnd: Gnd
  • Rst: 9
  • 3.3V: 3.3V

In this example we’ll use an Ethernet shield to connect our Arduino board to the LAN. We must take care with it. If we use an Ethernet shield with a MFRC522 there’s an SPI conflict (due to the Ethernet shield’s SD card reader). We need to use another SDA pin (here I’m using pin 8 instead of 10) and disable the w5100 SPI before configuring Ethernet.

// disable w5100 SPI
// As explained above, the Ethernet shield conflicts with the MFRC522 on the
// SPI bus, so pin 10 is made an output and driven HIGH before Ethernet is
// configured (the reader itself uses pin 8 as SDA instead of 10).
pinMode(10, OUTPUT);
digitalWrite(10, HIGH);

Here is the Arduino code

#include <SPI.h>
#include <MFRC522.h>
#include <Ethernet.h>
#include <EthernetClient.h>

#define RST_PIN 9
#define SS_PIN  8
#define ERROR_PIN 7
#define OPEN_PIN 6
#define OPEN_DELAY 2000

char server[] = "192.168.1.104";
int port = 28001;

signed long timeout;

byte mac[] = { 0x00, 0xAA, 0xBB, 0xCC, 0xDE, 0x02 };
MFRC522 mfrc522(SS_PIN, RST_PIN);
EthernetClient client;

// Writes each byte of `buffer` to Serial in hex, preceded by a space and
// padded with a leading zero when the value is below 0x10.
void printArray(byte *buffer, byte bufferSize) {
  byte idx = 0;
  while (idx < bufferSize) {
    const byte value = buffer[idx++];
    if (value < 0x10) {
      Serial.print(" 0");
    } else {
      Serial.print(" ");
    }
    Serial.print(value, HEX);
  }
}

// Returns the bytes of `buffer` as one upper-case hex string with two digits
// per byte and no separators (e.g. {0x3C, 0x86} -> "3C86").
String dump_byte_array(byte *buffer, byte bufferSize) {
  String hex = "";
  for (byte pos = 0; pos < bufferSize; pos++) {
    const byte value = buffer[pos];
    if (value < 0x10) {
      hex += "0";  // keep every byte two digits wide
    }
    hex += String(value, HEX);
  }
  hex.toUpperCase();

  return hex;
}

void resetLeds() {
  digitalWrite(OPEN_PIN, LOW);
  digitalWrite(ERROR_PIN, LOW);
}

// Reports "OPEN!" on Serial and holds OPEN_PIN HIGH for OPEN_DELAY ms.
// Blocks for the whole pulse.
void open() {
  const int pin = OPEN_PIN;

  Serial.println("OPEN!");
  digitalWrite(pin, HIGH);
  delay(OPEN_DELAY);
  digitalWrite(pin, LOW);
}

// Reports "ERROR!" on Serial and holds ERROR_PIN HIGH for OPEN_DELAY ms.
// Blocks for the whole pulse.
void error() {
  const int pin = ERROR_PIN;

  Serial.println("ERROR!");
  digitalWrite(pin, HIGH);
  delay(OPEN_DELAY);
  digitalWrite(pin, LOW);
}

void scanCard() {
  byte status;
  byte buffer[18];
  int err = 0;
  byte size = sizeof(buffer);
  EthernetClient c;
      
  if (mfrc522.PICC_IsNewCardPresent()) {
    if (mfrc522.PICC_ReadCardSerial()) {
      const String ID = dump_byte_array(mfrc522.uid.uidByte, mfrc522.uid.size);
      Serial.println("New tag read: " + ID);
      mfrc522.PICC_HaltA();
     
      if (client.connect(server, port)) {
        timeout = millis() + 3000;
        client.println("OPEN:" + ID);
        delay(10);

        while(client.available() == 0) {
          if (timeout - millis() < 1000) {
              error();
              goto close;
          }
        } 
        int size;
        bool status;
        
        while((size = client.available()) > 0) {
          uint8_t* msg = (uint8_t*)malloc(size);
          size = client.read(msg,size);
          //Serial.write(msg, size);
          // 4F4B   -> OK
          // 4E4F4B -> NOK
          status = dump_byte_array(msg, size) == "4F4B";
          free(msg);
        }
        
        Serial.println(status ? "OK!" : "NOK!");
        if (status) {
          open();
        } else {
          error();
        }
close:
        client.stop();
      } else {
        Serial.println("Connection Error");
        error();
      }
    }
  }
}

void setup()
{
  resetLeds();
  Serial.begin(9600);
  Serial.println("Setup ...");

  // disable w5100 SPI
  pinMode(10, OUTPUT);
  digitalWrite(10, HIGH);

  SPI.begin();
  mfrc522.PCD_Init();

  if (Ethernet.begin(mac) == 0) {
    Serial.println("DHCP Error");
    error();
    while (true) {}
  }
  Serial.print("My IP: ");
  for (byte B = 0; B < 4; B++) {
    Serial.print(Ethernet.localIP()[B], DEC);
    Serial.print(".");
  }
  Serial.println();
  Serial.println("Finish setup");
  timeout = 0;
}

// Main loop: clear both LEDs, try to read and validate one tag, then pause
// 200 ms before polling again.
void loop()
{
  const unsigned long pollIntervalMs = 200;

  resetLeds();
  scanCard();
  delay(pollIntervalMs);
}

Now we only need to create a simple tcp server with node to validate our NFC tags.

var net = require('net');

const LOCAL_PORT = 28001;
const validTags = ['X3C86AD9'];

/**
 * Tells whether a tag id is on the allow-list.
 *
 * @param {string} tag - Tag id as received from the reader.
 * @returns {boolean} true when `tag` is one of `validTags`.
 */
function validateTag(tag) {
    return validTags.includes(tag);
}

// TCP server speaking a one-line protocol: the client sends "OPEN:<tag>" and
// receives "OK" or "NOK"; the connection is closed after the first message.
var server = net.createServer(function (socket) {
    console.log(socket.remoteAddress + ":" + socket.remotePort);

    // Without an 'error' listener a client reset would crash the process.
    socket.on('error', function (err) {
        console.log("socket error:", err.message);
    });

    socket.on('data', function (msg) {
        // Declared locally: the original assignment created implicit globals.
        const [action, tag] = msg.toString().toUpperCase().trim().split(":");
        console.log(action, tag);
        switch (action) {
            case 'OPEN': {
                const out = validateTag(tag) ? "OK" : "NOK";
                console.log(out);
                // end() flushes the reply before closing; calling destroy()
                // right after write() may discard data still being sent.
                socket.end(out);
                break;
            }
            default:
                console.log("unknown action:", action);
                socket.end();
        }
    });
});

server.listen(LOCAL_PORT, '0.0.0.0');

And that’s all. Here a video with the working example

And full code available in my github account.

References about rfid and Arduino: here, here and here

    Advertisements

Control humidity with a Raspberry Pi and IoT devices

I’ve got a Wemo switch and a BeeWi temperature/humidity sensor. I’ve used them in previous projects. Today I want to control the humidity level in a room. The idea is to switch a dehumidifier (plugged into the Wemo switch) on and off depending on the humidity (from the BeeWi sensor). Let’s start.

I’ve got one script (node) that reads humidity from the sensor (via BTLE)

#!/usr/bin/env node
// Declared with var: the original assignment created an implicit global.
var noble = require('noble');

var status = false;
// MAC address of the sensor to read, taken from the first CLI argument.
var address = process.argv[2];

if (!address) {
    // This script is reader.js (that is how dehumidifier.py invokes it).
    console.log('Usage "./reader.js <sensor mac address>"');
    // Non-zero exit so callers can tell a usage error from a reading.
    process.exit(1);
}

/**
 * Interprets a hex string as a signed two's-complement integer whose width
 * is the number of whole bytes in the string (odd lengths are left-padded
 * with one "0").
 *
 * @param {string} hex - Hex digits, e.g. "ff" or "8000".
 * @returns {number} Signed value ("ff" -> -1, "7f" -> 127).
 */
function hexToInt(hex) {
    const padded = hex.length % 2 === 0 ? hex : "0" + hex;
    const bits = padded.length / 2 * 8;
    const range = Math.pow(2, bits);
    const unsigned = parseInt(padded, 16);

    // Values in the upper half of the range have the sign bit set.
    return unsigned > range / 2 - 1 ? unsigned - range : unsigned;
}

// Remembers whether the Bluetooth adapter reported 'poweredOn'.
function onStateChange(state) {
    status = (state === 'poweredOn');
}

// When the advertised device is the requested sensor, prints the value of
// the byte at hex offset 14 of the manufacturer data (capped at 100) and
// terminates the process.
function onDiscover(peripheral) {
    if (peripheral.address != address) {
        return;
    }
    var data = peripheral.advertisement.manufacturerData.toString('hex');
    var reading = parseInt(data.substr(14, 2), 16);
    console.log(Math.min(100, reading));
    noble.stopScanning();
    process.exit();
}

// Stops any scan in progress and starts a fresh one.
function restartScanning() {
    noble.stopScanning();
    noble.startScanning();
}

noble.on('stateChange', onStateChange);

noble.on('discover', onDiscover);

noble.on('scanStop', function() {
    noble.stopScanning();
});

// Scanning begins three seconds after start-up.
setTimeout(restartScanning, 3000);

Now I’ve got another script to control the switch. A Python script using ouimeaux library

#!/usr/bin/env python
from ouimeaux.environment import Environment
from subprocess import check_output
import sys
import os

threshold = 3

def action(switch):
    # Discovery callback: switches the dehumidifier plug on when the humidity
    # is above `expected` and off when it falls below `expected - threshold`.
    #
    # switch: device object handed over by the ouimeaux Environment.

    # Act only on the switch named on the command line; the original compared
    # against the literal "Switch1" and ignored the `mySwitch` argument.
    if switch.name != mySwitch:
        return

    # Read the sensor only for the matching switch (one BTLE scan per run).
    humidity = int(check_output(["%s/reader.js" % os.path.dirname(sys.argv[0]), sensorMac]))
    botton = expected - threshold
    isOn = switch.get_state() != 0
    log = ""

    if isOn and humidity < botton:
        switch.basicevent.SetBinaryState(BinaryState=0)
        log = "humidity < %s Switch to OFF" % botton
    elif not isOn and humidity > expected:
        switch.basicevent.SetBinaryState(BinaryState=1)
        log = "humidity > %s Switch to ON" % expected

    print("Humidity: %s Switch is OK (%s) %s" % (humidity, 'On' if isOn else 'Off', log))

if __name__ == '__main__':
    # Command line: <sensorMac> <switch name> <expected humidity>
    try:
        sensorMac = sys.argv[1]
        mySwitch = sys.argv[2]
        expected = int(sys.argv[3])
    except (IndexError, ValueError):
        # Missing argument or non-numeric humidity. A bare `except` would also
        # swallow KeyboardInterrupt and SystemExit.
        print('Usage "./dehumidifier.py <sensorMac> <switch name> <expected humidity>"')
        sys.exit(1)

    # `action` is called for the switches found during the 3-second discovery.
    env = Environment(action)
    env.start()
    env.discover(seconds=3)

And that’s all. Now I only need to configure my Raspberry Pi’s crontab and run the script each minute

*/1 * * * *     /mnt/media/projects/hum/dehumidifier.py ff:ff:ff:ff:ff:ff Switch1 50

Project is available in my github account.

Nowadays I’m involved with Arduino and IoT, so I want to do something similar with cheaper Arduino stuff.

Playing with RabbitMQ, PHP and node

I need to use RabbitMQ in one project. I’m a big fan of Gearman, but I must admit Rabbit is much more powerful. In this project I need to work with both PHP code and node, so I want to build a wrapper for those two languages. I don’t want to re-invent the wheel so I will use existing libraries (php-amqplib and amqplib for node).

Basically I need to use three things: First I need to create exchange channels to log different actions. I need to decouple those actions from the main code. I also need to create work queues to ensure those works are executed. It doesn’t matter if work is executed later but it must be executed. And finally RPC commands.

Let’s start with the queues. I want to push events to a queue in PHP

// Producer example: publish one event to the 'queue.backend' work queue.
use G\Rabbit\Builder;
// RabbitMQ connection settings.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
$queue = Builder::queue('queue.backend', $server);
// The payload is JSON-encoded by Queue::emit().
$queue->emit(["aaa" => 1]);

and also with node

// Producer example: publish one event to the 'queue.backend' work queue.
// `builder` was used without being defined; it is loaded the same way the
// other node examples in this post load it.
var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var queue = builder.queue('queue.backend', server);
queue.emit({aaa: 1});

And I also want to register workers to those queues with PHP and node

// Worker example: consume 'queue.backend' and log every decoded payload.
use G\Rabbit\Builder;
// RabbitMQ connection settings.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// receive() blocks and invokes the callback once per message.
Builder::queue('queue.backend', $server)->receive(function ($data) {
    error_log(json_encode($data));
});
// Worker example: consume 'queue.backend' and print every decoded payload.
// `builder` was used without being defined; it is loaded the same way the
// other node examples in this post load it.
var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var queue = builder.queue('queue.backend', server);
queue.receive(function (data) {
    console.log(data);
});

Both implementations use one builder. In this case we are using Queue:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Work-queue helper on top of php-amqplib: emit() publishes one persistent
 * JSON message, receive() consumes the queue and runs a callback per message.
 *
 * $conf is expected to contain the sections 'server', 'queue' and 'consumer'.
 */
class Queue
{
    private $name;
    private $conf;
    /**
     * @param string $name Queue name.
     * @param array  $conf Configuration ('server', 'queue', 'consumer').
     */
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    /** Opens a new broker connection from the 'server' settings. */
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    /** Declares this queue on the channel using the 'queue' settings. */
    private function declareQueue($channel)
    {
        $conf = $this->conf['queue'];
        $channel->queue_declare($this->name, $conf['passive'], $conf['durable'], $conf['exclusive'],
            $conf['auto_delete'], $conf['nowait']);
    }
    /**
     * Publishes $data (JSON-encoded) to the queue and closes the connection.
     *
     * @param mixed $data Payload to encode.
     */
    public function emit($data = null)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $this->declareQueue($channel);
        $msg = new AMQPMessage(json_encode($data),
            ['delivery_mode' => 2] # make message persistent
        );
        $channel->basic_publish($msg, '', $this->name);
        $channel->close();
        $connection->close();
    }
    /**
     * Consumes the queue, blocking while the consumer is registered.
     *
     * @param callable $callback Called with (decoded payload, queue name).
     */
    public function receive(callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $this->declareQueue($channel);
        $consumer = $this->conf['consumer'];
        $noAck = $consumer['no_ack'];
        if ($noAck === false) {
            // One unacknowledged message at a time per worker.
            $channel->basic_qos(null, 1, null);
        }
        $channel->basic_consume($this->name, '', $consumer['no_local'], $noAck, $consumer['exclusive'],
            $consumer['nowait'],
            function ($msg) use ($callback, $noAck) {
                call_user_func($callback, json_decode($msg->body, true), $this->name);
                // Acknowledge only in manual-ack mode: with no_ack enabled the
                // broker has already settled the delivery and an explicit ack
                // is a channel error. The node implementation does the same.
                if (!$noAck) {
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s')."] {$this->name}::".$msg->body, "\n";
            });
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] Queue '{$this->name}' initialized \n";
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
var amqp = require('amqplib/callback_api');

/**
 * Work-queue helper on top of amqplib's callback API.
 *
 * @param {string} name - Queue name.
 * @param {object} conf - Configuration with `server`, `queue` and `consumer`.
 * @returns {{emit: Function, receive: Function}}
 */
var Queue = function (name, conf) {
    // Broker URL, built from conf.server on every call.
    var connectionUrl = function () {
        return `amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`;
    };

    return {
        /**
         * Publishes `data` as JSON. With `close` (default) the connection is
         * closed and the process exits 500 ms after connecting.
         */
        emit: function (data, close=true) {
            amqp.connect(connectionUrl(), function (err, conn) {
                // Surface the real connection error instead of failing later
                // with a TypeError on an undefined `conn`.
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    var msg = JSON.stringify(data);

                    ch.assertQueue(name, conf.queue);
                    // Buffer.from() replaces the deprecated Buffer constructor.
                    ch.sendToQueue(name, Buffer.from(msg));
                });
                if (close) {
                    setTimeout(function () {
                        conn.close();
                        process.exit(0)
                    }, 500);
                }
            });
        },
        /**
         * Consumes the queue; `callback` receives (decoded payload, routing key).
         */
        receive: function (callback) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    ch.assertQueue(name, conf.queue);
                    console.log(new Date().toString() + ' Queue ' + name + ' initialized');
                    ch.consume(name, function (msg) {
                        console.log(new Date().toString() + " Received %s", msg.content.toString());
                        if (callback) {
                            callback(JSON.parse(msg.content.toString()), msg.fields.routingKey)
                        }
                        // Ack only when the consumer runs in manual-ack mode.
                        if (conf.consumer.noAck === false) {
                            ch.ack(msg);
                        }
                    }, conf.consumer);
                });
            });
        }
    };
};

module.exports = Queue;

We also want to emit messages using an exchange

Emitter:

// Emitter example: publish three messages to the 'process.log' exchange
// under two different routing keys.
use G\Rabbit\Builder;
// RabbitMQ connection settings.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
$exchange = Builder::exchange('process.log', $server);
// emit(routingKey, payload)
$exchange->emit("xxx.log", "aaaa");
$exchange->emit("xxx.log", ["11", "aaaa"]);
$exchange->emit("yyy.log", "aaaa");
// Emitter example: publish three messages to the 'process.log' exchange
// under two different routing keys.
var builder = require('../../src/Builder');

// RabbitMQ connection settings.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var exchange = builder.exchange('process.log', server);

// emit(routingKey, payload)
exchange.emit("xxx.log", "aaaa");
exchange.emit("xxx.log", ["11", "aaaa"]);
exchange.emit("yyy.log", "aaaa");

and receiver:

// Receiver example: listen on 'process.log' for the binding key "yyy.log"
// and log each message together with its routing key.
use G\Rabbit\Builder;
// RabbitMQ connection settings.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
Builder::exchange('process.log', $server)->receive("yyy.log", function ($routingKey, $data) {
    error_log($routingKey." - ".json_encode($data));
});
// Receiver example: listen on 'process.log' for the binding key "yyy.log"
// and print each message together with its routing key.
// `builder` was used without being defined; it is loaded the same way the
// other node examples in this post load it.
var builder = require('../../src/Builder');

var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var exchange = builder.exchange('process.log', server);

exchange.receive("yyy.log", function (routingKey, data) {
    console.log(routingKey, data);
});

And that’s the PHP implementation:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Topic-exchange helper on top of php-amqplib: emit() publishes one
 * persistent JSON message under a routing key, receive() binds a
 * server-named queue to a binding key and runs a callback per message.
 *
 * $conf is expected to contain 'server', 'exchange', 'queue' and 'consumer'.
 */
class Exchange
{
    private $name;
    private $conf;
    /**
     * @param string $name Exchange name.
     * @param array  $conf Configuration sections (see class comment).
     */
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    /** Opens a new broker connection from the 'server' settings. */
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    /**
     * Publishes $data (JSON-encoded) under $routingKey and closes the connection.
     *
     * @param string $routingKey Routing key of the message.
     * @param mixed  $data       Payload to encode.
     */
    public function emit($routingKey, $data = null)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $conf = $this->conf['exchange'];
        $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
            $conf['internal'], $conf['nowait']);
        $msg = new AMQPMessage(json_encode($data), [
            'delivery_mode' => 2, # make message persistent
        ]);
        $channel->basic_publish($msg, $this->name, $routingKey);
        $channel->close();
        $connection->close();
    }
    /**
     * Consumes messages matching $bindingKey, blocking while the consumer is
     * registered.
     *
     * @param string   $bindingKey Topic binding key.
     * @param callable $callback   Called with (routing key, decoded payload).
     */
    public function receive($bindingKey, callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $conf = $this->conf['exchange'];
        $channel->exchange_declare($this->name, 'topic', $conf['passive'], $conf['durable'], $conf['auto_delete'],
            $conf['internal'], $conf['nowait']);
        $queueConf = $this->conf['queue'];
        // An empty name asks the broker to generate the queue name.
        list($queue_name, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
            $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
        $channel->queue_bind($queue_name, $this->name, $bindingKey);
        $consumerConf = $this->conf['consumer'];
        $noAck = $consumerConf['no_ack'];
        $channel->basic_consume($queue_name, '', $consumerConf['no_local'], $noAck,
            $consumerConf['exclusive'], $consumerConf['nowait'],
            function ($msg) use ($callback, $noAck) {
                call_user_func($callback, $msg->delivery_info['routing_key'], json_decode($msg->body, true));
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.':'.$msg->delivery_info['routing_key'].'::', $msg->body, "\n";
                // Acknowledge only in manual-ack mode: with no_ack enabled the
                // broker has already settled the delivery and an explicit ack
                // is a channel error. The node implementation does the same.
                if (!$noAck) {
                    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
                }
            });
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] Exchange '{$this->name}' initialized \n";
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}

And node:

var amqp = require('amqplib/callback_api');

/**
 * Topic-exchange helper on top of amqplib's callback API.
 *
 * @param {string} name - Exchange name.
 * @param {object} conf - Configuration with `server`, `exchange`, `queue`
 *                        and `consumer`.
 * @returns {{emit: Function, receive: Function}}
 */
var Exchange = function (name, conf) {
    // Broker URL, built from conf.server on every call.
    var connectionUrl = function () {
        return `amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`;
    };

    return {
        /**
         * Publishes `data` as JSON under `routingKey`. With `close` (default)
         * the connection is closed and the process exits 500 ms after
         * connecting.
         */
        emit: function (routingKey, data, close = true) {
            amqp.connect(connectionUrl(), function (err, conn) {
                // Surface the real connection error instead of failing later
                // with a TypeError on an undefined `conn`.
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    var msg = JSON.stringify(data);
                    ch.assertExchange(name, 'topic', conf.exchange);
                    // Buffer.from() replaces the deprecated Buffer constructor.
                    ch.publish(name, routingKey, Buffer.from(msg));
                });
                if (close) {
                    setTimeout(function () {
                        conn.close();
                        process.exit(0)
                    }, 500);
                }
            });
        },
        /**
         * Binds a server-named queue to `bindingKey`; `callback` receives
         * (routing key, decoded payload) for every message.
         */
        receive: function (bindingKey, callback) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    ch.assertExchange(name, 'topic', conf.exchange);
                    console.log(new Date().toString() + ' Exchange ' + name + ' initialized');
                    ch.assertQueue('', conf.queue, function (err, q) {
                        if (err) {
                            throw err;
                        }

                        ch.bindQueue(q.queue, name, bindingKey);

                        ch.consume(q.queue, function (msg) {
                            console.log(new Date().toString(), name, ":", msg.fields.routingKey, "::", msg.content.toString());
                            if (callback) {
                                callback(msg.fields.routingKey, JSON.parse(msg.content.toString()))
                            }
                            // Ack only when the consumer runs in manual-ack mode.
                            if (conf.consumer.noAck === false) {
                                ch.ack(msg);
                            }
                        }, conf.consumer);
                    });
                });
            });
        }
    };
};

module.exports = Exchange;

Finally we want to use RPC commands. In fact, the RPC implementation is similar to Queue, but in this case the client will receive an answer.

Client side

// RPC client example: call 'rpc.hello' with two arguments and print the reply.
use G\Rabbit\Builder;
// RabbitMQ connection settings.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// call() blocks until the reply arrives and returns it JSON-decoded.
echo Builder::rpc('rpc.hello', $server)->call("Gonzalo", "Ayuso");
// RPC client example: call 'rpc.hello' with two arguments and print the reply.
var builder = require('../../src/Builder');

// RabbitMQ connection settings.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var rpc = builder.rpc('rpc.hello', server);
// The last argument is the callback that receives the decoded reply.
rpc.call("Gonzalo", "Ayuso", function (data) {
    console.log(data);
});

Server side:

// RPC server example: answer 'rpc.hello' requests with a greeting.
use G\Rabbit\Builder;
// RabbitMQ connection settings.
$server = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'guest',
    'pass' => 'guest',
];
// The callback receives the caller's arguments; its return value is the reply.
// NOTE(review): $server is imported into the closure but not used in its body.
Builder::rpc('rpc.hello', $server)->server(function ($name, $surname) use ($server) {
    return "Hello {$name} {$surname}";
});
// RPC server example: answer 'rpc.hello' requests with a greeting.
var builder = require('../../src/Builder');

// RabbitMQ connection settings.
var server = {
    host: 'localhost',
    port: 5672,
    user: 'guest',
    pass: 'guest'
};

var rpc = builder.rpc('rpc.hello', server);

// The callback receives the caller's arguments; its return value is the reply.
rpc.server(function (name, surname) {
    return "Hello " + name + " " + surname;
});

And Implementations:

namespace G\Rabbit;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * RPC helper on top of php-amqplib: call() sends a request and blocks for
 * the correlated reply, server() answers requests with a callback's result.
 *
 * $conf is expected to contain the sections 'server', 'queue' and 'consumer'.
 */
class RPC
{
    private $name;
    private $conf;
    /**
     * @param string $name Name of the request queue.
     * @param array  $conf Configuration ('server', 'queue', 'consumer').
     */
    public function __construct($name, $conf)
    {
        $this->name = $name;
        $this->conf = $conf;
    }
    /** Opens a new broker connection from the 'server' settings. */
    private function createConnection()
    {
        $server = $this->conf['server'];
        return new AMQPStreamConnection($server['host'], $server['port'], $server['user'], $server['pass']);
    }
    /**
     * Sends all arguments to the RPC server and waits for its reply.
     *
     * @return mixed The JSON-decoded reply.
     */
    public function call()
    {
        $params = (array)func_get_args();
        $response = null;
        $corr_id = uniqid();
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $queueConf = $this->conf['queue'];
        // Server-named reply queue for this call.
        list($callback_queue, ,) = $channel->queue_declare("", $queueConf['passive'], $queueConf['durable'],
            $queueConf['exclusive'], $queueConf['auto_delete'], $queueConf['nowait']);
        $consumerConf = $this->conf['consumer'];
        $channel->basic_consume($callback_queue, '', $consumerConf['no_local'], $consumerConf['no_ack'],
            $consumerConf['exclusive'], $consumerConf['nowait'], function ($rep) use ($corr_id, &$response) {
                // Ignore replies that belong to another request.
                if ($rep->get('correlation_id') == $corr_id) {
                    $response = $rep->body;
                }
            });
        $msg = new AMQPMessage(json_encode($params), [
            'correlation_id' => $corr_id,
            'reply_to'       => $callback_queue,
        ]);
        $channel->basic_publish($msg, '', $this->name);
        // Compare with null explicitly: a valid reply body such as "0" is
        // falsy in PHP and would keep a `!$response` loop waiting forever.
        while ($response === null) {
            $channel->wait();
        }
        // Release the channel and connection, as the other methods do.
        $channel->close();
        $connection->close();
        return json_decode($response, true);
    }
    /**
     * Serves requests, blocking while the consumer is registered.
     *
     * @param callable $callback Called with the request arguments; its return
     *                           value is JSON-encoded and sent as the reply.
     */
    public function server(callable $callback)
    {
        $connection = $this->createConnection();
        $channel = $connection->channel();
        $queueConf = $this->conf['queue'];
        $channel->queue_declare($this->name, $queueConf['passive'], $queueConf['durable'], $queueConf['exclusive'],
            $queueConf['auto_delete'], $queueConf['nowait']);
        $now = new \DateTime();
        echo '['.$now->format('d/m/Y H:i:s')."] RPC server '{$this->name}' initialized \n";
        $channel->basic_qos(null, 1, null);
        $consumerConf = $this->conf['consumer'];
        $noAck = $consumerConf['no_ack'];
        $channel->basic_consume($this->name, '', $consumerConf['no_local'], $noAck,
            $consumerConf['exclusive'],
            $consumerConf['nowait'], function ($req) use ($callback, $noAck) {
                $response = json_encode(call_user_func_array($callback, array_values(json_decode($req->body, true))));
                $msg = new AMQPMessage($response, [
                    'correlation_id' => $req->get('correlation_id'),
                    'delivery_mode'  => 2, # make message persistent
                ]);
                $req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
                // Acknowledge only in manual-ack mode: with no_ack enabled the
                // broker has already settled the delivery and an explicit ack
                // is a channel error.
                if (!$noAck) {
                    $req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
                }
                $now = new \DateTime();
                echo '['.$now->format('d/m/Y H:i:s').'] '.$this->name.":: req => '{$req->body}' response=> '{$response}'\n";
            });
        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
var amqp = require('amqplib/callback_api');

/**
 * RPC helper on top of amqplib's callback API.
 *
 * @param {string} name - Name of the request queue.
 * @param {object} conf - Configuration with `server`, `queue` and `consumer`.
 * @returns {{call: Function, server: Function}}
 */
var RPC = function (name, conf) {
    // Correlation id used to match a reply to its request.
    var generateUuid = function () {
        return Math.random().toString() +
            Math.random().toString() +
            Math.random().toString();
    };

    // Broker URL, built from conf.server on every call.
    var connectionUrl = function () {
        return `amqp://${conf.server.user}:${conf.server.pass}@${conf.server.host}:${conf.server.port}`;
    };

    return {
        /**
         * call(arg1, ..., argN, callback): sends the arguments to the RPC
         * server and passes the decoded reply to `callback`. The process
         * exits 500 ms after the reply has been handled.
         */
        call: function () {
            // Every argument but the last is a parameter; the last one is the
            // callback. (The original loop leaked `i` as an implicit global.)
            var params = Array.prototype.slice.call(arguments);
            var callback = params.pop();

            amqp.connect(connectionUrl(), function (err, conn) {
                // Surface the real connection error instead of failing later
                // with a TypeError on an undefined `conn`.
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    ch.assertQueue('', conf.queue, function (err, q) {
                        if (err) {
                            throw err;
                        }
                        var corr = generateUuid();

                        ch.consume(q.queue, function (msg) {
                            // Ignore replies that belong to another request.
                            if (msg.properties.correlationId === corr) {
                                callback(JSON.parse(msg.content.toString()));
                                setTimeout(function () {
                                    conn.close();
                                    process.exit(0)
                                }, 500);
                            }
                        }, conf.consumer);
                        // Buffer.from() replaces the deprecated Buffer constructor.
                        ch.sendToQueue(name,
                            Buffer.from(JSON.stringify(params)),
                            {correlationId: corr, replyTo: q.queue});
                    });
                });
            });
        },
        /**
         * Serves requests: `callback` is applied to the request arguments and
         * its return value is sent back as the JSON reply.
         */
        server: function (callback) {
            amqp.connect(connectionUrl(), function (err, conn) {
                if (err) {
                    throw err;
                }
                conn.createChannel(function (err, ch) {
                    if (err) {
                        throw err;
                    }
                    ch.assertQueue(name, conf.queue);
                    console.log(new Date().toString() + ' RPC ' + name + ' initialized');
                    ch.prefetch(1);
                    ch.consume(name, function reply(msg) {
                        console.log(new Date().toString(), msg.fields.routingKey, " :: ", msg.content.toString());
                        var response = JSON.stringify(callback.apply(this, JSON.parse(msg.content.toString())));
                        ch.sendToQueue(msg.properties.replyTo,
                            Buffer.from(response),
                            {correlationId: msg.properties.correlationId});

                        ch.ack(msg);

                    }, conf.consumer);
                });
            });
        }
    };
};

module.exports = RPC;

You can see whole projects at github: RabbitMQ-php, RabbitMQ-node

Playing with Docker, Silex, Python, Node and WebSockets

I’m learning Docker. In this post I want to share a little experiment that I have done. I know the code looks like over-engineering but it’s just an excuse to build something with docker and containers. Let me explain it a little bit.

The idea is to build a time clock in the browser. Something like this:

Clock

Yes I know. We can do it only with js, css and html but we want to hack a little bit more. The idea is to create:

  • A Silex/PHP frontend
  • A WebSocket server with socket.io/node
  • A Python script to obtain the current time

WebSocket server will open 2 ports: One port to serve webSockets (socket.io) and another one as a http server (express). Python script will get the current time and it’ll send it to the webSocket server. Finally one frontend(silex) will be listening to WebSocket’s event and it will render the current time.

That’s the WebSocket server (with socket.io and express)

// Two listeners, as described in the post:
//   * 8080 - public socket.io endpoint used by the browser
//   * 6400 - internal HTTP endpoint the Python script calls on every tick
//
// The socket.io server is attached to its own plain HTTP server. Previously
// it wrapped the express app, which also exposed the /tic route on the
// public 8080 port and let any outside client broadcast arbitrary "time"
// values.
var
    express = require('express'),
    expressApp = express(),
    server = require('http').createServer(function (req, res) {
        // Anything that is not a socket.io request is not served here.
        res.statusCode = 404;
        res.end();
    }),
    io = require('socket.io')(server, {origins: 'localhost:*'})
    ;

// Broadcasts the `time` query parameter to every connected socket.
expressApp.get('/tic', function (req, res) {
    io.sockets.emit('time', req.query.time);
    res.json('OK');
});

expressApp.listen(6400, '0.0.0.0');

server.listen(8080);

That’s our Python script

from time import gmtime, strftime, sleep
import httplib2

# One HTTP client reused for every request.
h = httplib2.Http()
while True:
    # Current UTC time as HH:MM:SS, pushed to the node container once a second.
    now = strftime("%H:%M:%S", gmtime())
    (resp, content) = h.request("http://node:6400/tic?time=" + now)
    sleep(1)

And our Silex frontend

// Silex front controller: a single route that renders the clock page.
use Silex\Application;
use Silex\Provider\TwigServiceProvider;

// NOTE(review): debug mode is enabled here; confirm before any real deployment.
$app = new Application(['debug' => true]);
// Twig templates are loaded from ../views relative to this file.
$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

// "/" renders index.twig with no template variables.
$app->get("/", function (Application $app) {
    return $app['twig']->render('index.twig', []);
});

$app->run();

using this twig template

<!DOCTYPE html>
<!-- Clock page: shows the value of the last "time" event received over socket.io. -->
<html lang="en">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>Docker example</title>
    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
    <link href="css/app.css" rel="stylesheet">
    <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
    <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
</head>
<body>
<div class="site-wrapper">
    <div class="site-wrapper-inner">
        <div class="cover-container">
            <div class="inner cover">
                <h1 class="cover-heading">
                    <div id="display">
                        display
                    </div>
                </h1>
            </div>
        </div>
    </div>
</div>
<script src="//localhost:8080/socket.io/socket.io.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.12.4/jquery.min.js"></script>
<script>
var socket = io.connect('//localhost:8080');

$(function () {
    socket.on('time', function (data) {
        // The value originates from an HTTP query parameter, so it is
        // inserted as plain text; .html() would execute any markup in it.
        $('#display').text(data);
    });
});
</script>
</body>
</html>

The idea is to use one Docker container for each process. I like to have all the code in one place so all containers will share the same volume with source code.

First the node container (WebSocket server)

# WebSocket server image, based on the node:argon tag.
FROM node:argon

# The shared source tree is mounted at /mnt/src by docker-compose; the
# container works from its node/ subdirectory.
RUN mkdir -p /mnt/src
WORKDIR /mnt/src/node

# 8080: socket.io, 6400: HTTP endpoint called by the Python container.
EXPOSE 8080 6400

Now the python container

# Time producer image, based on the python:2 tag.
FROM python:2

# HTTP client library imported by the tic script.
RUN pip install httplib2

# The shared source tree is mounted at /mnt/src by docker-compose; the
# container works from its python/ subdirectory.
RUN mkdir -p /mnt/src
WORKDIR /mnt/src/python

And finally the frontend container (apache2 with Ubuntu 16.04)

# Frontend image: Apache 2 + mod_php on Ubuntu 16.04.
FROM ubuntu:16.04

ENV DEBIAN_FRONTEND=noninteractive

# Update, install and clean up in ONE layer: a cached `apt-get update` layer
# can otherwise be paired with a later `install` and fetch stale package
# lists, and a separate `clean` layer does not shrink the image.
# `locales` is installed explicitly so locale-gen is available regardless of
# what the base image ships.
RUN apt-get update -y \
 && apt-get install --no-install-recommends -y locales apache2 php libapache2-mod-php \
 && apt-get clean -y \
 && rm -rf /var/lib/apt/lists/*

RUN locale-gen es_ES.UTF-8
RUN update-locale LANG=es_ES.UTF-8

COPY ./apache2/sites-available/000-default.conf /etc/apache2/sites-available/000-default.conf

# Mount point for the shared source volume (see docker-compose).
RUN mkdir -p /mnt/src

RUN a2enmod rewrite
RUN a2enmod proxy
RUN a2enmod mpm_prefork

RUN chown -R www-data:www-data /mnt/src
# Variables Apache expects when started directly (the compose entrypoint runs
# /usr/sbin/apache2 -D FOREGROUND).
ENV APACHE_RUN_USER www-data
ENV APACHE_RUN_GROUP www-data
ENV APACHE_LOG_DIR /var/log/apache2
ENV APACHE_LOCK_DIR /var/lock/apache2
ENV APACHE_PID_FILE /var/run/apache2/apache2.pid
ENV APACHE_SERVERADMIN admin@localhost
ENV APACHE_SERVERNAME localhost

EXPOSE 80

Now we’ve got the three containers but we want to use all together. We’ll use a docker-compose.yml file. The web container will expose port 80 and node container 8080. Node container also opens 6400 but this port is an internal port. We don’t need to access to this port outside. Only Python container needs to access to this port. Because of that 6400 is not mapped to any port in docker-compose

version: '2'

services:
  # Silex/PHP frontend served by Apache; published on host port 80.
  web:
    image: gonzalo123/example_web
    container_name: example_web
    ports:
      - "80:80"
    restart: always
    depends_on:
      - node
    build:
      context: ./images/php
      dockerfile: Dockerfile
    entrypoint:
      - /usr/sbin/apache2
      - -D
      - FOREGROUND
    volumes:
      - ./src:/mnt/src

  # WebSocket server; only 8080 is published, 6400 stays internal.
  node:
    image: gonzalo123/example_node
    container_name: example_node
    ports:
      - "8080:8080"
    restart: always
    build:
      context: ./images/node
      dockerfile: Dockerfile
    entrypoint:
      - npm
      - start
    volumes:
      - ./src:/mnt/src

  # Time producer; reaches the node service over the compose network.
  python:
    image: gonzalo123/example_python
    container_name: example_python
    restart: always
    depends_on:
      - node
    build:
      context: ./images/python
      dockerfile: Dockerfile
    entrypoint:
      - python
      - tic.py
    volumes:
      - ./src:/mnt/src

And that’s all. We only need to start our containers

docker-compose up --build -d

and open our browser at: http://localhost to see our Time clock

Full source code available within my github account

Home automation pet project. Playing with IoT, temperature sensors, fans and Telegram bots

Summer holidays are over. Besides my bush walks I’ve also been hacking a little bit on one idea that I had in mind. Summer means high temperatures and I wanted to control my fan. For example, turn on the fan when the temperature is over a threshold. I could do it using an Arduino board and a temperature sensor, but I don’t have an Arduino board. I do have several devices. For example, a Wemo switch. With this device connected to my Wifi network I can switch my fan on and off remotely from my mobile phone (using its android app) or even from my Pebble watch using the API. I also have a BeeWi temperature/humidity sensor. It’s a BTLE device. It comes with its own app for android, but there’s also an API. Yes, I know that one Arduino board with a couple of sensors can be cheaper than one of these devices, but when I’m in a shop and I’ve got one of these devices in my hands I cannot resist.

I also have a new Raspberry pi 3. I’ve recently upgraded my home multimedia server from a rpi2 to the new rpi3. Basically I use it as multimedia server and now also as retro console. This new rpi3 has Bluetooth so I wanted to do something with it. Read temperature from the Bluetooth sensor sounds good so I started to hack a little bit.

I found this post. I started working with Python. The script almost works but it uses a Bluetooth connection and, as someone said in the comments, it uses a lot of battery. So I switched to a BTLE version. I found a simple node library to connect BTLE devices called noble, really simple to use. In one afternoon I had one small script ready. The idea was to put this script in my RP3’s crontab, scan the temperature each minute (via noble) and, if the temperature was over a threshold, switch on the wemo device (via ouimeaux). I also wanted to be informed when my fan is switched on and off. The easiest way to do it was via Telegram (I already knew the telebot library).

var noble = require('noble'),
    Wemo = require('wemo-client'),
    TeleBot = require('telebot'),
    fs = require('fs'),
    beeWiData,
    wemo,
    threshold,
    address,
    bot,
    chatId,
    wemoDevice,
    configuration,
    confPath;

if (process.argv.length <= 2) {
    console.log("Usage: " + __filename + " conf.json");
    process.exit(-1);
}

confPath = process.argv[2];
try {
    configuration = JSON.parse(
        fs.readFileSync(process.argv[2])
    );
} catch (e) {
    console.log("configuration file not valid");
    process.exit(-1);
}

bot = new TeleBot(configuration.telegramBotAPIKey);
address = configuration.beeWiAddress;
threshold = configuration.threshold;
wemoDevice = configuration.wemoDevice;
chatId = configuration.telegramChatId;

/**
 * Stores the latest sensor reading inside the configuration object and
 * rewrites the configuration file synchronously with the result.
 */
function persists() {
    var serialized;

    configuration.beeWiData = beeWiData;
    serialized = JSON.stringify(configuration);
    fs.writeFileSync(confPath, serialized);
}

/**
 * Discovers Wemo devices and sets the binary state of the one whose friendly
 * name matches the configured `wemoDevice`.
 *
 * @param {number} state    Value handed to the client's setBinaryState().
 * @param {Function} callback Invoked (without arguments) each time the matched
 *                            device emits a 'binaryState' event.
 */
function setSwitchState(state, callback) {
    wemo = new Wemo();
    wemo.discover(function(deviceInfo) {
        var device;

        // Ignore every discovered device except the configured one.
        if (deviceInfo.friendlyName != wemoDevice) {
            return;
        }

        console.log("device found:", deviceInfo.friendlyName, "setting the state to", state);
        device = wemo.client(deviceInfo);

        // Listeners are registered before the state change is requested.
        device.on('binaryState', function(value) {
            callback();
        });
        device.on('statusChange', function(a) {
            console.log("statusChange", a);
        });

        device.setBinaryState(state);
    });
}

beeWiData = {temperature: undefined, humidity: undefined, batery: undefined};

/**
 * Interprets a hexadecimal string as a signed (two's complement) integer
 * whose width is the number of whole bytes in the string.
 *
 * @param {string} hex Hex digits; an odd-length string is left-padded with "0".
 * @returns {number} The signed value, e.g. "ff" -> -1, "7f" -> 127.
 */
function hexToInt(hex) {
    var digits = hex.length % 2 === 0 ? hex : "0" + hex;
    var range = Math.pow(2, digits.length / 2 * 8);
    var value = parseInt(digits, 16);

    // Values in the upper half of the range represent negative numbers.
    return value > range / 2 - 1 ? value - range : value;
}

// Any adapter state change stops the current scan; a fresh scan is started
// only once the adapter reports 'poweredOn'.
noble.on('stateChange', function(state) {
    noble.stopScanning();
    if (state === 'poweredOn') {
        noble.startScanning();
    }
});

// When scanning stops, derive the desired fan state from the last reading,
// apply it to the Wemo switch, and notify Telegram only when the threshold was
// crossed relative to the previously persisted reading.
noble.on('scanStop', function() {
    var message, state;
    if (beeWiData.temperature > threshold) {
        state = 1;
        message = "temperature (" + beeWiData.temperature + ") over threshold (" + threshold + "). Fan ON. Humidity: " + beeWiData.humidity;
    } else {
        message = "temperature (" + beeWiData.temperature + ") under threshold (" + threshold + "). Fan OFF. Humidity: " + beeWiData.humidity;
        state = 0;
    }
    setSwitchState(state, function() {
        // configuration.beeWiData only exists after persists() has written it
        // once, so a fresh configuration file has no previous reading. Read it
        // defensively: with no previous temperature both comparisons below are
        // false and no notification is sent.
        var previous = configuration.beeWiData || {};
        var previousTemperature = previous.temperature;
        var crossedThreshold =
            (previousTemperature < threshold && state === 1) ||
            (previousTemperature > threshold && state === 0);

        if (crossedThreshold) {
            console.log("Notify to telegram bot", message);
            // Exit whether or not the message could be delivered.
            bot.sendMessage(chatId, message).then(function() {
                process.exit(0);
            }, function(e) {
                console.error(e);
                process.exit(0);
            });
            persists();
        } else {
            console.log(message);
            persists();
            process.exit(0);
        }
    });
});

// Decode the advertisement of the configured BeeWi sensor and stop scanning
// as soon as one reading has been captured.
noble.on('discover', function(peripheral) {
    var payload, temperatureHex;

    if (peripheral.address != address) {
        return;
    }

    payload = peripheral.advertisement.manufacturerData.toString('hex');
    // The two temperature bytes are swapped before decoding; the decoded value
    // is divided by 10 and stored as a string with one decimal.
    temperatureHex = payload.substr(10, 2) + payload.substr(8, 2);
    beeWiData.temperature = parseFloat(hexToInt(temperatureHex) / 10).toFixed(1);
    // Humidity is capped at 100.
    beeWiData.humidity = Math.min(100, parseInt(payload.substr(14, 2), 16));
    beeWiData.batery = parseInt(payload.substr(24, 2), 16);
    beeWiData.date = new Date();
    noble.stopScanning();
});

// Watchdog: give up if nothing has ended the process within five seconds.
function abortOnTimeout() {
    console.error("timeout exceded!");
    process.exit(0);
}

setTimeout(abortOnTimeout, 5000);

The script is here.

It works but I wanted to keep on hacking. One Sunday morning I read this post. I don’t have an amazon button, but I wanted to do something similar. I started to play with the scapy library, sniffing ARP packets in my home network. I realized that I can detect when my Kindle connects to the network, my tv, or even my mobile phone. Then I had an idea: detect when my mobile phone connects to my wifi. My mobile phone connects to my wifi before I enter my house, so my idea was simple: detect when I’m close to my home’s door and send me a telegram message saying “Wellcome home” in addition to the temperature inside my house at this moment.

#!/usr/bin/env python

import sys
from scapy.all import *
import telebot
import gearman
import json
from StringIO import StringIO

BUFFER_SIZE = 1024

# Load the settings from the JSON file named by the first CLI argument.
# Any failure is reported and then re-raised, which aborts the script.
try:
    with open(sys.argv[1]) as data_file:
        data = json.load(data_file)
    myPhone = data['myPhone']
    routerIP = data['routerIP']
    TOKEN = data['telegramBotAPIKey']
    chatID = data['telegramChatId']
    gearmanServer = data['gearmanServer']
except:
    print("Unexpected error:", sys.exc_info()[0])
    raise

def getSensorData():
    """Submit a "temp" job to the gearman server and return its decoded JSON result."""
    client = gearman.GearmanClient([gearmanServer])
    job = client.submit_job("temp", '')

    return json.loads(job.result)

tb = telebot.TeleBot(TOKEN)

def arp_display(pkt):
    if pkt[ARP].op == 1 and pkt[ARP].hwsrc == myPhone and pkt[ARP].pdst == routerIP:
        sensorData = getSensorData()
        message = "Wellcome home Gonzalo! Temperature: %s humidity: %s" % (sensorData['temperature'], sensorData['humidity'])
        tb.send_message(chatID, message)
        print message

print sniff(prn=arp_display, filter='arp', store=0)

I have one node script to read temperature and one Python script to sniff my network. I can find how to read temperature from Python and use only one script but I was lazy (remember that I was on holiday) so I turned the node script that reads temperature into a gearman worker.

var noble = require('noble'),
    fs = require('fs'),
    Gearman = require('node-gearman'),
    beeWiData,
    address,
    bot,
    configuration,
    confPath,
    status,
    callback;

var gearman = new Gearman();

// The worker takes exactly one argument: the path to the JSON configuration.
if (process.argv.length <= 2) {
    console.log("Usage: " + __filename + " conf.json");
    process.exit(-1);
}

confPath = process.argv[2];
try {
    configuration = JSON.parse(
        fs.readFileSync(confPath)
    );
} catch (e) {
    console.log("configuration file not valid", e);
    process.exit(-1);
}

address = configuration.beeWiAddress;
// Declared with var so they no longer leak as implicit globals (which would
// also throw under strict mode).
var delay = configuration.tempServerDelayMinutes * 60 * 1000;
var tcpPort = configuration.tempServerPort;

beeWiData = {};

/**
 * Converts a hex string to a signed integer, treating the string as a
 * two's complement number that is as many bytes wide as the (even-padded)
 * string is long.
 *
 * @param {string} hex Hexadecimal digits.
 * @returns {number} Signed integer value.
 */
function hexToInt(hex) {
    var padded = (hex.length % 2 !== 0) ? "0" + hex : hex;
    var modulus = Math.pow(2, padded.length / 2 * 8);
    var parsed = parseInt(padded, 16);

    if (parsed > modulus / 2 - 1) {
        // Upper half of the range: wrap around to the negative value.
        return parsed - modulus;
    }
    return parsed;
}

// Track whether the Bluetooth adapter is powered on; workerCallback consults
// `status` before starting a scan.
noble.on('stateChange', function(state) {
    status = (state === 'poweredOn');
    if (status) {
        console.log("stateChange:poweredOn");
    }
});

// Capture one reading from the configured sensor's advertisement, then stop
// scanning.
noble.on('discover', function(peripheral) {
    if (peripheral.address != address) {
        return;
    }

    var hex = peripheral.advertisement.manufacturerData.toString('hex');
    // Temperature bytes are swapped before decoding and scaled down by 10.
    var rawTemperature = hexToInt(hex.slice(10, 12) + hex.slice(8, 10));
    var rawHumidity = parseInt(hex.slice(14, 16), 16);

    beeWiData.temperature = parseFloat(rawTemperature / 10).toFixed(1);
    beeWiData.humidity = Math.min(100, rawHumidity);
    beeWiData.batery = parseInt(hex.slice(24, 26), 16);
    beeWiData.date = new Date();
    noble.stopScanning();
});

// When a scan stops, hand the collected reading to the pending gearman job.
noble.on('scanStop', function() {
    // Take the pending completion callback and clear it first, so a scanStop
    // that arrives with no job waiting (or a repeated scanStop for the same
    // job) neither throws on an undefined callback nor completes a job twice.
    var done = callback;
    callback = undefined;

    console.log(beeWiData);
    noble.stopScanning();
    if (typeof done === 'function') {
        done();
    }
});

var worker;

/**
 * Gearman "temp" job handler: resets the reading, registers the completion
 * callback used by the 'scanStop' handler and (re)starts a BTLE scan.
 *
 * @param {*} payload Job payload (not used, only forwarded on retry).
 * @param {Object} worker Gearman worker handle; its end() receives the JSON
 *                        encoded reading.
 */
function workerCallback(payload, worker) {
    callback = function() {
        worker.end(JSON.stringify(beeWiData));
    };

    beeWiData = {temperature: undefined, humidity: undefined, batery: undefined};

    if (status) {
        noble.stopScanning();
        noble.startScanning();
    } else {
        // Adapter not powered on yet: retry once in a second. A one-shot
        // timer is required here; a repeating interval would keep firing
        // after the adapter is ready and would spawn another interval on
        // every tick while it is not.
        setTimeout(function() {
            workerCallback(payload, worker);
        }, 1000);
    }
}

gearman.registerWorker("temp", workerCallback);

Now I only need to call this worker from my Python sniffer and thats all.

I wanted to play a little bit. I also wanted to ask the temperature on demand. Since I was using Telegram I had an idea. Create a Telegram bot running in my RP3. And that’s my summer pet project. Basically it has three parts:

worker.js
It’s a gearman worker. It reads temperature and humidity from my BeeWi sensor via BTLE

bot.py
It’s a Telegram bot with the following commands available:

/switchInfo: get switch info
/switchOFF: switch OFF the switch
/help: Gives you information about the available commands
/temp: Get temperature
/switchON: switch ON the switch

sniff.py
It’s just a ARP sniffer. It detects when I’m close to my home and sends me a message via Telegram with the temperature. It detects when my mobile phone sends a ARP package to my router (aka when I connect to my Wifi). It happens before I enter in my house, so the Telegram message arrives before I put the key in the door 🙂

I run all my scripts on my Raspberry Pi 3. To ensure all scripts are up and running I use supervisor.

All the scripts are available in my github account

Building a simple TCP proxy server with node.js

Today we are going to build a simple TCP proxy server. The scenario is the following one. We have got one host (the client) that establishes a TCP connection to another one (the remote).

client —> remote

We want to set up a proxy server in the middle, so the client will establish the connection with the proxy and the proxy will forward it to the remote, keeping in mind the remote response also.
With node.js is really simple to perform those kind of network operations.

client —> proxy -> remote

var net = require('net');

var LOCAL_PORT  = 6512;
var REMOTE_PORT = 6512;
var REMOTE_ADDR = "192.168.1.25";

// Accept client connections and relay traffic in both directions through ONE
// remote connection per client (instead of opening a new remote socket for
// every chunk the client sends).
var server = net.createServer(function (socket) {
    var serviceSocket = new net.Socket();

    // Data written before the remote connection is established is queued by
    // the socket and flushed once it connects.
    serviceSocket.connect(parseInt(REMOTE_PORT, 10), REMOTE_ADDR, function () {
        console.log('  ** START **');
    });

    socket.on('data', function (msg) {
        console.log('<< From client to proxy ', msg.toString());
        serviceSocket.write(msg);
        console.log('>> From proxy to remote', msg.toString());
    });

    serviceSocket.on('data', function (data) {
        console.log('<< From remote to proxy', data.toString());
        socket.write(data);
        console.log('>> From proxy to client', data.toString());
    });

    // When either side closes, close the other so no socket is leaked.
    socket.on('close', function () {
        serviceSocket.end();
    });
    serviceSocket.on('close', function () {
        socket.end();
    });

    // An 'error' event without a listener would take the whole proxy down.
    socket.on('error', function (err) {
        console.log('client socket error', err);
        serviceSocket.destroy();
    });
    serviceSocket.on('error', function (err) {
        console.log('remote socket error', err);
        socket.destroy();
    });
});

server.listen(LOCAL_PORT);
console.log("TCP server accepting connection on port: " + LOCAL_PORT);

Simple, isn’t it?
Source code in github

Talk about node.js and WebSockets

Last friday I spoke about node.js and Websockets with the people of The Mêlée. The talk was an introduction to node.js and focused in the new HTML5 feature: the WebSockets.

When I spoke about Websockets I also introduced the great library socket.io. The jQuery of WebSockets.

Using node.js to store PHP sessions

We use sessions when we want to preserve certain data across subsequent accesses. PHP allows us to use different handlers when we’re using sessions. The default one is filesystem, but we can change it with session.save_handler in the php.ini. session.save_handler defines the name of the handler which is used for storing and retrieving data associated with a session. We also can create our own handler to manage sessions. In this post we’re going to create a custom handler to store sessions in a node.js service. Let’s start:

Imagine we’ve got the following php script:

// Start (or resume) the session.
session_start();

// Counter kept in the session: created on the first request, incremented on
// every request.
if (!isset($_SESSION["gonzalo"])) $_SESSION["gonzalo"] = 0;
$_SESSION["gonzalo"]++;
// Also store a nested array holding a freshly generated unique id.
$_SESSION["arr"] = array('key' => uniqid());
var_dump($_SESSION);

A simple usage of sessions with PHP. If we reload the page our counter will be incremented by one. We’re using the default session handler. It works without any problem.

The idea is create a custom handler to use a server with node.js to store the session information instead of filesystem. To create custom handlers we need to use the PHP function: session_set_save_handler and rewrite the callbacks for: open, close, read, write, destroy and gc. PHP’s documentation is great. My proposal is the following one:

Our custom handler:

/**
 * Session save handler that stores PHP sessions in a node.js HTTP service.
 *
 * Every handler callback is translated into a GET request whose query string
 * carries an "action" (the callback name) plus its arguments.
 */
class NodeSession
{
    const NODE_DEF_HOST = '127.0.0.1';
    const NODE_DEF_PORT = 5672;

    /** @var string Host of the node.js session service. */
    private $_host;

    /** @var int Port of the node.js session service. */
    private $_port;

    /**
     * Registers a NodeSession instance as the session save handler and
     * starts the session.
     *
     * @param string $host
     * @param int    $port
     * @return NodeSession The registered handler instance.
     */
    static function start($host = self::NODE_DEF_HOST, $port = self::NODE_DEF_PORT)
    {
        $obj = new self($host, $port);
        session_set_save_handler(
            array($obj, "open"),
            array($obj, "close"),
            array($obj, "read"),
            array($obj, "write"),
            array($obj, "destroy"),
            array($obj, "gc"));
        session_start();
        return $obj;
    }

    /**
     * Decodes PHP's serialized session payload ("key|value;key|value...")
     * into an associative array.
     *
     * @param string $data Raw session payload.
     * @return array Session variables keyed by name; empty for empty input.
     */
    private function unserializeSession($data)
    {
        if(  strlen( $data) == 0) {
            return array();
        }

        // match all the session keys and offsets
        preg_match_all('/(^|;|\})([a-zA-Z0-9_]+)\|/i', $data, $matchesarray, PREG_OFFSET_CAPTURE);
        $returnArray = array();

        $lastOffset = null;
        $currentKey = '';
        foreach ( $matchesarray[2] as $value ) {
            $offset = $value[1];
            // The text between the previous key and this key is the
            // serialized value of the previous key.
            if(!is_null( $lastOffset)) {
                $valueText = substr($data, $lastOffset, $offset - $lastOffset );
                $returnArray[$currentKey] = unserialize($valueText);
            }
            $currentKey = $value[0];

            // Skip over the key name and the "|" separator.
            $lastOffset = $offset + strlen( $currentKey )+1;
        }

        // Whatever follows the last key is that key's value.
        $valueText = substr($data, $lastOffset );
        $returnArray[$currentKey] = unserialize($valueText);

        return $returnArray;
    }

    /**
     * @param string $host
     * @param int    $port
     */
    function __construct($host = self::NODE_DEF_HOST, $port = self::NODE_DEF_PORT)
    {
        $this->_host = $host;
        $this->_port = $port;
    }

    /** Nothing to open: every operation is an independent HTTP request. */
    function open($save_path, $session_name)
    {
        return true;
    }

    /** Nothing to close. */
    function close()
    {
        return true;
    }

    /**
     * @param string $id Session id.
     * @return string Serialized session data, or '' when the session does not
     *                exist or the service cannot be reached.
     */
    public function read($id)
    {
        try {
            return (string) $this->send(__FUNCTION__, array('id' => $id));
        } catch (Exception $e) {
            return '';
        }
    }

    /**
     * Sends the raw payload, the current time and a JSON rendering of the
     * session variables to the service.
     *
     * @param string $id
     * @param string $data Serialized session payload.
     * @return bool False when the service could not be reached.
     */
    public function write($id, $data)
    {
        try {
            $this->send(__FUNCTION__, array(
                'id'       => $id,
                'data'     => $data,
                'time'     => time(),
                'dataJSON' => json_encode($this->unserializeSession($data))));
            return true;
        } catch (Exception $e) {
            return false;
        }
    }

    /**
     * @param string $id
     * @return bool False when the service could not be reached.
     */
    public function destroy($id)
    {
        try {
            $this->send(__FUNCTION__, array('id' => $id));
        } catch (Exception $e) {
            return false;
        }
         return true;
    }

    /**
     * @param int $maxlifetime Maximum session lifetime, forwarded together
     *                         with the current time.
     * @return bool False when the service could not be reached.
     */
    function gc($maxlifetime)
    {
        try {
            $this->send(__FUNCTION__, array('maxlifetime' => $maxlifetime, 'time' => time()));
        } catch (Exception $e) {
            return false;
        }
        return true;
    }

    /**
     * Performs the GET request for one handler action.
     *
     * @param string $action Handler callback name.
     * @param array  $params Query parameters for the action.
     * @return string Response body.
     * @throws RuntimeException When the request fails. file_get_contents()
     *         only returns false on failure, so without this check the
     *         callers' catch blocks could never run and failures would be
     *         reported as success.
     */
    private function send($action, $params)
    {
        $params = array('action' => $action) + $params;
        $response = file_get_contents("http://{$this->_host}:{$this->_port}?" . http_build_query($params));
        if ($response === false) {
            throw new RuntimeException("NodeSession: request for action '{$action}' failed");
        }
        return $response;
    }
}

Our node.js server:

var http = require('http'),
    url  = require('url'),
    session = require('nodePhpSessions').SessionHandler;

var sessionHandler = new session();

// Every request is answered with 200 text/plain; the body is whatever the
// session handler returns for the parsed query string.
function handleRequest(req, res) {
    var query = url.parse(req.url, true).query;

    res.writeHead(200, {'Content-Type': 'text/plain'});
    res.end(sessionHandler.run(query));
}

var server = http.createServer(handleRequest);

server.listen(5672, "127.0.0.1", function() {
  console.log("opened server on %j", server.address());
});

As we can see we need the node.js module nodePhpSessions. You can easily install with:

npm install nodePhpSessions

You can see nodePhpSessions library here.

The library is tested with nodeunit. Without TDD it is very hard to test things such as the garbage collector:

var session = require('nodePhpSessions').SessionHandler;
var sessionHandler = new session();
var parsedUrl;

// NOTE: all tests share the single sessionHandler instance, so they depend on
// the state left behind by the tests that ran before them.

// Reading a session id that was never written yields undefined.
exports["testReadUndefinedSession"] = function(test){
    parsedUrl = { action: 'read', id: 'ts49vmf0p732iafr25mdu8gvg2' };
    test.equal(sessionHandler.run(parsedUrl), undefined);
    test.done();
};

// After a write, 'readAsArray' exposes the values sent in dataJSON.
exports["oneSessionShouldReturns1"] = function(test){
    parsedUrl = {
        action: 'write',
        id: 'ts49vmf0p732iafr25mdu8gvg2',
        data: 'gonzalo|i:1;arr|a:1:{s:3:"key";s:13:"4e2b1a40d136a";}',
        time: '1311447616',
        dataJSON: '{"gonzalo":1,"arr":{"key":"4e2b1a40d136a"}}' };
    sessionHandler.run(parsedUrl);

    parsedUrl = { action: 'readAsArray', id: 'ts49vmf0p732iafr25mdu8gvg2' };
    test.equal(sessionHandler.run(parsedUrl).gonzalo, 1);
    test.done();
};

// A second write to the same id replaces the stored values.
exports["oneSessionShouldReturns2"] = function(test){
    parsedUrl = {
        action: 'write',
        id: 'ts49vmf0p732iafr25mdu8gvg2',
        data: 'gonzalo|i:2;arr|a:1:{s:3:"key";s:13:"4e2b1a40d136a";}',
        time: '1311447616',
        dataJSON: '{"gonzalo":2,"arr":{"key":"4e2b1a40d136a"}}' };
    sessionHandler.run(parsedUrl);
    parsedUrl = { action: 'readAsArray', id: 'ts49vmf0p732iafr25mdu8gvg2' };
    test.equal(sessionHandler.run(parsedUrl).gonzalo, 2);
    test.done();
};

// A destroyed session can no longer be read.
exports["destroySession"] = function(test){
    parsedUrl = {
        action: 'destroy',
        id: 'ts49vmf0p732iafr25mdu8gvg2'};
    sessionHandler.run(parsedUrl);

    parsedUrl = { action: 'readAsArray', id: 'ts49vmf0p732iafr25mdu8gvg2' };
    test.equal(sessionHandler.run(parsedUrl), undefined);

	test.done();
};

// gc at time 1111111210 with maxlifetime 100: session2 (written at
// 1111111100) is expected to be removed, session1 (written at 1111111200)
// is expected to survive.
exports["garbageColector"] = function(test){
    parsedUrl = {
        action: 'write',
        id: 'session1',
        data: 'gonzalo|i:1;arr|a:1:{s:3:"key";s:13:"4e2b1a40d136a";}',
        time: '1111111200',
        dataJSON: '{"gonzalo":1,"arr":{"key":"4e2b1a40d136a"}}' };
    sessionHandler.run(parsedUrl);

    parsedUrl = {
        action: 'write',
        id: 'session2',
        data: 'gonzalo|i:1;arr|a:1:{s:3:"key";s:13:"4e2b1a40d136a";}',
        time: '1111111100',
        dataJSON: '{"gonzalo":1,"arr":{"key":"4e2b1a40d136a"}}' };
    sessionHandler.run(parsedUrl);

    parsedUrl = { action: 'gc', maxlifetime: '100', time: '1111111210'};
    sessionHandler.run(parsedUrl);

    parsedUrl = { action: 'readAsArray', id: 'session2' };
    test.equal(sessionHandler.run(parsedUrl), undefined);

    parsedUrl = { action: 'readAsArray', id: 'session1' };
    test.equal(sessionHandler.run(parsedUrl).gonzalo, 1);

    test.done();
};

Here you can see the output of the tests:

nodeunit testNodeSessions.js 

testNodeSessions.js
✔ testReadUndefinedSession
✔ oneSessionShouldReturns1
✔ oneSessionShouldReturns2
✔ destroySession
✔ garbageColector

OK: 6 assertions (5ms)

Now we change the original PHP script to:

// Register the node.js-backed save handler and start the session.
include_once 'NodeSessions.php';
NodeSession::start();

// Same counter demo as before; only the way the session is started changed.
if (!isset($_SESSION["gonzalo"])) $_SESSION["gonzalo"] = 0;
$_SESSION["gonzalo"]++;
$_SESSION["arr"] = array('key' => uniqid());
var_dump($_SESSION);

We start the node.js server:

node serverSessions.js 

Now if we reload our script in the browser we will see the same behaviour, but now our sessions are stored in the node.js server.

array(2) {
  ["gonzalo"]=>
  int(16)
  ["arr"]=>
  array(1) {
    ["key"]=>
    string(13) "4e2a9f6a966f4"
  }
}

This kind of techniques are good when clustering PHP applications.

Full code is available on github (node server, PHP handler, tests and examples) here.

Web console with node.js

Continuing with my experiments of node.js, this time I want to create a Web console. The idea is simple. I want to send a few command to the server and I display the output inside the browser. I can do it entirely with PHP but I want to send the output to the browser as fast as they appear without waiting for the end of the command. OK we can do it flushing the output in the server but this solution normally crashes if we keep the application open for a long time. WebSockets again to the rescue. If we need a cross-browser implementation we need the socket.io library. Let’s start:

The node server is a simple websocket server. In this example we will launch each command with spawn function (require(‘child_process’).spawn) and send the output within the websoket. Simple and pretty straightforward.

var sys   = require('sys'),
http  = require('http'),
url   = require('url'),
spawn = require('child_process').spawn,
ws    = require('./ws.js');

// Commands a client is allowed to run.
var availableComands = ['ls', 'ps', 'uptime', 'tail', 'cat'];

// One websocket connection runs one command: the command line arrives in the
// "cmd" query parameter and the command's stdout is streamed back.
ws.createServer(function(websocket) {
    websocket.on('connect', function(resource) {
        var parsedUrl = url.parse(resource, true);
        // A connection without ?cmd= is treated as an empty (unknown) command
        // instead of crashing on undefined.split().
        var rawCmd = parsedUrl.query.cmd || "";
        var cmd = rawCmd.split(" ");
        var finished = false;
        var options, child, i;

        // Ends the websocket at most once, whichever event gets there first.
        function finish() {
            if (!finished) {
                finished = true;
                websocket.end();
            }
        }

        if (cmd[0] == 'help') {
            websocket.write("Available comands: \n");
            for (i = 0; i < availableComands.length; i++) {
                websocket.write(availableComands[i]);
                if (i < availableComands.length - 1) {
                    websocket.write(", ");
                }
            }
            websocket.write("\n");

            finish();
        } else if (availableComands.indexOf(cmd[0]) >= 0) {
            // Everything after the command name is passed as arguments.
            options = cmd.slice(1);

            try {
                // Named `child` so the global `process` object is not shadowed.
                child = spawn(cmd[0], options);
            } catch(err) {
                console.log(err);
                websocket.write("ERROR");
                finish();
                return;
            }

            // Spawn failures are reported through an 'error' event; without a
            // listener they would crash the server.
            child.on('error', function(err) {
                console.log(err);
                websocket.write("ERROR");
                finish();
            });

            // Client went away: stop the command.
            websocket.on('end', function() {
                child.kill();
            });

            child.stdout.on('data', function(data) {
                websocket.write(data);
            });

            child.stdout.on('end', function() {
                finish();
            });
        } else {
             websocket.write("Comand not available. Type help for available comands\n");
             finish();
        }
    });

}).listen(8880, '127.0.0.1');

The web client is similar than the example of my previous post

var timeout = 5000;
var wsServer = '127.0.0.1:8880';

var ws;


/**
 * Escapes the characters &, < and > as HTML entities.
 *
 * @param {string} string Text to escape.
 * @returns {string} Escaped text.
 */
function cleanString(string) {
    var entities = { "&": "&amp;", "<": "&lt;", ">": "&gt;" };

    return string.replace(/[&<>]/g, function (ch) {
        return entities[ch];
    });
}


/**
 * Left-pads a value with one zero and keeps the last two characters,
 * e.g. 5 -> "05", 12 -> "12".
 *
 * @param {*} n Value to format.
 * @returns {string} The last two characters of "0" + n.
 */
function pad(n) {
    var padded = "0" + n;

    return padded.substring(Math.max(0, padded.length - 2));
}

// Commands sent so far, in order.
var cmdHistory = [];

/**
 * Runs one command on the server: opens a websocket carrying the command in
 * its query string and appends everything the server sends to #log.
 * The special command "clear" only empties the log.
 *
 * @param {string} msg Command line typed by the user.
 */
function send(msg) {
    if (msg == 'clear') {
        $('#log').html('');
        return;
    }
    try {
        // Encode the command so spaces, "&", "#", ... survive the query string.
        ws = new WebSocket('ws://' + wsServer + '?cmd=' + encodeURIComponent(msg));
    } catch (err) {
        // Without a socket there is nothing to attach handlers to.
        console.log(err);
        return;
    }

    $('#toolbar').css('background', '#933');
    $('#socketStatus').html("working ... [<a href='#' onClick='quit()'>X</a>]");
    cmdHistory.push(msg);
    // Escape the echoed command so typed markup is shown, not interpreted.
    $('#log').append("<div class='cmd'>" + cleanString(msg) + "</div>");
    console.log("startWs:");

    ws.onmessage = function(event) {
        $('#log').append(util.toStaticHTML(event.data));
        window.scrollBy(0, 100000000000000000);
    };

    ws.onclose = function(){
        //console.log("ws.onclose");
        $('#toolbar').css('background', '#65A33F');
        $('#socketStatus').html('Type your comand:');
    }
}

/**
 * Aborts the running command by closing the websocket, then scrolls the
 * window down by a very large amount.
 */
function quit() {
    var farDown = 100000000000000000;

    ws.close();
    window.scrollBy(0, farDown);
}
// Small helper collection used by the console page. Declared with var so it
// is no longer an implicit global.
var util = {
  urlRE: /https?:\/\/([-\w\.]+)+(:\d+)?(\/([^\s]*(\?\S+)?)?)?/g, 

  //  html sanitizer: escapes &, < and >, then turns newlines into <br/>.
  //  The newline replacement must come last (and use a global regex),
  //  otherwise the inserted <br/> would itself be escaped.
  toStaticHTML: function(inputHtml) {
    inputHtml = inputHtml.toString();
    return inputHtml.replace(/&/g, "&amp;")
                    .replace(/</g, "&lt;")
                    .replace(/>/g, "&gt;")
                    .replace(/\n/g, "<br/>");
  }, 

  //pads n with zeros on the left,
  //digits is minimum length of output
  //zeroPad(3, 5); returns "005"
  //zeroPad(2, 500); returns "500"
  zeroPad: function (digits, n) {
    n = n.toString();
    while (n.length < digits) 
      n = '0' + n;
    return n;
  },

  //it is almost 8 o'clock PM here
  //timeString(new Date); returns "19:49"
  timeString: function (date) {
    var minutes = date.getMinutes().toString();
    var hours = date.getHours().toString();
    return this.zeroPad(2, hours) + ":" + this.zeroPad(2, minutes);
  },

  //does the argument only contain whitespace?
  isBlank: function(text) {
    var blank = /^\s*$/;
    return (text.match(blank) !== null);
  }
};
$(document).ready(function() {
  //submit new messages when the user hits enter if the message isnt blank
  $("#entry").keypress(function (e) {
    console.log(e);
    if (e.keyCode != 13 /* Return */) return;
    // .val() reads the text currently typed into the field; .attr("value")
    // only does so in old jQuery releases.
    var msg = $("#entry").val().replace("\n", "");
    if (!util.isBlank(msg)) send(msg);
    $("#entry").val(""); // clear the entry field.
  });
});

And that’s all. In fact we don’t need any line of PHP to perform this web console. Last year I tried to do something similar with PHP but it was a big mess. With node those kind of jobs are trivial. I don’t know if node.js is the future or is just another hype, but it’s easy. And cool. Really cool.

You can see the full code at Github here. Anyway you must take care if you run this application on your host. You are letting user to execute raw unix commands. A bit of security layer would be necessary.

Real time monitoring PHP applications with websockets and node.js

The inspection of the error logs is a common way to detect errors and bugs. We also can show errors on-screen within our developement server, or we even can use great tools like firePHP to show our PHP errors and warnings inside our firebug console. That’s cool, but we only can see our session errors/warnings. If we want to see another’s errors we need to inspect the error log. tail -f is our friend, but we need to surf against all the warnings of all sessions to see our desired ones. Because of that I want to build a tool to monitor my PHP applications in real-time. Let’s start:

What’s the idea? The idea is catch all PHP’s errors and warnings at run time and send them to a node.js HTTP server. This server will work similar than a chat server but our clients will only be able to read the server’s logs. Basically the applications have three parts: the node.js server, the web client (html5) and the server part (PHP). Let me explain a bit each part:

The node Server

Basically it has two parts: a http server to handle the PHP errors/warnings and a websocket server to manage the realtime communications with the browser. When I say that I’m using websockets that’s means the web client will only work with a browser with websocket support like chrome. Anyway it’s pretty straightforward swap from a websocket sever to a socket.io server to use it with every browser. But websockets seems to be the future, so I will use websockets in this example.

The http server:

// HTTP endpoint the PHP applications report to. Requests from addresses in
// allowedIP carry a JSON array in the "logString" query parameter; it is
// relayed verbatim to every connected websocket client when its first element
// matches the IP currently being inspected (or no IP filter is set).
http.createServer(function (req, res) {
    var remoteAdrress = req.socket.remoteAddress;
    var logString, ip;

    if (allowedIP.indexOf(remoteAdrress) < 0) {
        console.log("401 to " + remoteAdrress);
        res.writeHead(401, {
            'Content-Type': 'text/plain'
        });
        res.end('Not Authorized\n');
        return;
    }

    try {
        logString = url.parse(req.url, true).query.logString;
        // SECURITY: this used to be eval(logString), which executes whatever
        // code the request carries. The payload is JSON, so parse it as JSON.
        ip = JSON.parse(logString)[0];
    } catch(err) {
        // Answer 500 *instead of* 200: the response is only written once the
        // payload is known to be valid, so headers are never sent twice.
        console.log("500 to " + remoteAdrress);
        res.writeHead(500, {
            'Content-Type': 'text/plain'
        });
        res.end('System Error\n');
        return;
    }

    res.writeHead(200, {
        'Content-Type': 'text/plain'
    });
    res.end('Ok\n');

    if (inspectingUrl == "" ||  inspectingUrl == ip) {
        clients.forEach(function(client) {
            try {
                client.write(logString);
            } catch(err) {
                // One broken client must not stop delivery to the others.
                console.log("write to client failed", err);
            }
        });
    }
}).listen(httpConf.port, httpConf.host);

and the web socket server:

// IP filter requested by the most recently connected client.
var inspectingUrl = undefined;

ws.createServer(function(websocket) {
    // Remember the requested IP filter and register the client.
    function onConnect(resource) {
        inspectingUrl = url.parse(resource, true).query.ip;
        clients.push(websocket);
    }

    // Drop the client from the broadcast list, if it is still in it.
    function onClose() {
        var index = clients.indexOf(websocket);
        if (index < 0) {
            return;
        }
        clients.splice(index, 1);
    }

    websocket.on('connect', onConnect);
    websocket.on('close', onClose);

}).listen(wsConf.port, wsConf.host);

If you want to know more about node.js and see more examples, have a look to the great site: http://nodetuts.com/. In this site Pedro Teixeira will show examples and node.js tutorials. In fact my node.js http + websoket server is a mix of two tutorials from this site.

The web client.

The web client is a simple websockets application. We will handle the websockets connection, reconnect if it dies and a bit more. It’s based on the node.js chat demo.

<?php $ip = filter_input(INPUT_GET, 'ip', FILTER_SANITIZE_STRING); ?>

        Real time <?= $ip ?> monitor
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.3/jquery.min.js"></script><script type="text/javascript">// <![CDATA[
            selectedIp = '<?= $ip ?>';

// ]]></script>
<script type="text/javascript" src="js.js"></script>
</pre>
<div id="toolbar">
<ul id="status">
	<li>Socket status: <span id="socketStatus">Conecting ...</span></li>
	<?php // Show the filtered IP plus a link back to the unfiltered view, or "all" when no filter is set. ?>
	<li>IP: <?= $ip == '' ? 'all' : $ip . " <a href='?ip='>[all]</a>" ?></li>
	<li>count: <span id="count">0</span></li>
</ul>
</div>
<pre>


And the javascript magic

var timeout = 5000;
var wsServer = '192.168.2.2:8880';
var unread = 0;
var focus = false;

// Number of log entries received so far.
var count = 0;

/** Increments the entry counter and shows the new value in #count. */
function updateCount() {
    count += 1;
    $("#count").text(count);
}

/**
 * Replaces &, < and > with their HTML entities (in that order).
 *
 * @param {string} string Text to escape.
 * @returns {string} Escaped text.
 */
function cleanString(string) {
    var escaped = string.replace(/&/g, "&amp;");
    escaped = escaped.replace(/</g, "&lt;");
    escaped = escaped.replace(/>/g, "&gt;");
    return escaped;
}

/**
 * Writes the relative-time rendering of the current date into #uptime.
 * Relies on Date.prototype.toRelativeTime being provided elsewhere (it is not
 * a built-in Date method).
 */
function updateUptime () {
    var label = new Date().toRelativeTime();
    $("#uptime").text(label);
}

/**
 * Refreshes the document title; while there are unread entries their count is
 * shown in parentheses in front of the title.
 */
function updateTitle(){
    var title = "Real time " + selectedIp + " monitor";

    document.title = unread ? "(" + unread.toString() + ") " + title : title;
}

/**
 * Formats a value as (at least) two characters by prefixing a zero when
 * needed; longer values keep only their last two characters.
 *
 * @param {*} n Value to format.
 * @returns {string} e.g. 7 -> "07", 42 -> "42".
 */
function pad(n) {
    var text = "" + n;

    return text.length >= 2 ? text.slice(-2) : "0" + text;
}

// Current websocket connection (declared so it is not an implicit global).
var ws;
// The window focus/blur handlers must be bound once, not on every reconnect.
var windowEventsBound = false;

/**
 * Opens the websocket to the log server, renders every received log entry
 * into #log and reconnects after `timeout` ms whenever the socket closes.
 *
 * @param {string} ip IP address to filter on ('' for all hosts).
 */
function startWs(ip) {
    try {
        ws = new WebSocket("ws://" + wsServer + "?ip=" + encodeURIComponent(ip));
    } catch (err) {
        //console.log(err);
        // Retry with the same filter; there is no socket to configure yet.
        setTimeout(function() {startWs(ip)}, timeout);
        return;
    }

    if (!windowEventsBound) {
        windowEventsBound = true;
        //listen for browser events so we know to update the document title
        $(window).bind("blur", function() {
            focus = false;
            updateTitle();
        });

        $(window).bind("focus", function() {
            focus = true;
            unread = 0;
            updateTitle();
        });
    }

    // Report "connected" only once the connection is actually established.
    ws.onopen = function() {
        $('#toolbar').css('background', '#65A33F');
        $('#socketStatus').html('Connected to ' + wsServer);
    };

    ws.onmessage = function(event) {
        var entry, host, line;
        var now = new Date();
        var timeMark = '[' + pad(now.getHours()) + ':' + pad(now.getMinutes()) + ':' + pad(now.getSeconds()) + '] ';

        // SECURITY: this used to be eval(event.data). The payload is a JSON
        // array [host, message, extra], so parse it instead of executing it.
        try {
            entry = JSON.parse(event.data);
        } catch (err) {
            return;
        }

        // Entries that arrive while the window has focus are not "unread".
        if (!focus) {
            unread++;
        }
        updateTitle();

        // All fields are escaped before being inserted as HTML, so any markup
        // inside a log message is displayed literally.
        host = cleanString(String(entry[0]));
        line = "<table class='message'><tr><td width='1%' class='date'>" + timeMark + "</td><td width='1%' valign='top' class='host'><a href='?ip=" + encodeURIComponent(entry[0]) + "'>" + host + "</a></td>";
        line += "<td class='msg-text' width='98%'>" + cleanString(String(entry[1])) + "</td></tr>";
        if (entry[2]) {
            line += "<tr><td>&nbsp;</td><td colspan='3' class='msg-text'>" + cleanString(String(entry[2])) + "</td></tr>";
        }
        line += "</table>";

        $('#log').append(line);
        updateCount();
        window.scrollBy(0, 100000000000000000);
    };

    ws.onclose = function(){
        //console.log("ws.onclose");
        $('#toolbar').css('background', '#933');
        $('#socketStatus').html('Disconected');
        setTimeout(function() {startWs(selectedIp)}, timeout);
    }
}

$(document).ready(function() {
    startWs(selectedIp);
});

The server part:

The server part will handle silently all PHP warnings and errors and it will send them to the node server. The idea is to place a minimal PHP line of code at the beginning of the application that we want to monitor. Imagine the following piece of PHP code

// Notice: $var is not defined.
$a = $var[1];
// Warning: division by zero.
$a = 1/0;
class Dummy
{
    static function err()
    {
        throw new Exception("error");
    }
}
// Uncaught exception. The call must name the class declared above; a call on
// an undeclared class would fail before err() could throw.
Dummy::err();

it will throw:
A notice: Undefined variable: var
A warning: Division by zero
An Uncaught exception ‘Exception’ with message ‘error’

So we will add our small library to catch those errors and send them to the node server

// Load the NodeLog client and register its error, exception and shutdown
// handlers, pointing them at the node server on 192.168.2.2.
include('client/NodeLog.php');
NodeLog::init('192.168.2.2');

// Reads an undefined variable: produces the "Undefined variable" notice.
$a = $var[1];
// Divides by zero: produces the "Division by zero" warning.
$a = 1/0;
class Dummy
{
    static function err()
    {
        throw new Exception("error");
    }
}
// Call the class declared above (it was misspelt as Dummy1, which fails with
// a "class not found" fatal error instead of the intended uncaught exception).
Dummy::err();

The script will work in the same way as the first version, but if we start our node.js server in a console:

$ node server.js
HTTP server started at 192.168.2.2::5672
Web Socket server started at 192.168.2.2::8880

We will see those errors/warnings in real-time when we start our browser

Here we can see a small screencast with the working application:

This is the server side library:

/**
 * Small client that reports PHP errors, warnings and uncaught exceptions to a
 * remote log server over HTTP.
 *
 * Typical use is a single call to NodeLog::init() at the top of the script:
 * it registers an error handler, an exception handler and a shutdown
 * function, and sends a "Start" marker. Every report is an array
 * (client IP, HTML headline[, HTML detail]) that is JSON-encoded and passed
 * in the "logString" query parameter.
 */
class NodeLog
{
    const NODE_DEF_HOST = '127.0.0.1';
    const NODE_DEF_PORT = 5672;

    private $_host;
    private $_port;

    /**
     * Builds a client, falling back to the defaults set by init() (or the
     * class constants) for every argument that is null.
     *
     * @param String $host
     * @param Integer $port
     * @return NodeLog
     */
    static function connect($host = null, $port = null)
    {
        return new self(is_null($host) ? self::$_defHost : $host, is_null($port) ? self::$_defPort : $port);
    }

    /**
     * @param String $host
     * @param Integer $port
     */
    function __construct($host, $port)
    {
        $this->_host = $host;
        $this->_port = $port;
    }

    /**
     * JSON-encodes $log and sends it to the log server.
     *
     * @param mixed $log usually array(ip, headline[, detail])
     * @return Array array($status, $response)
     */
    public function log($log)
    {
        return $this->send(json_encode($log));
    }

    /**
     * Performs the HTTP request that carries the encoded log entry.
     *
     * @param String $log already JSON-encoded payload
     * @return Array array($status, $response)
     */
    private function send($log)
    {
        $url = "http://{$this->_host}:{$this->_port}?logString=" . urlencode($log);
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        // The payload travels in the query string, so no response body is requested.
        curl_setopt($ch, CURLOPT_NOBODY, true);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

        $response = curl_exec($ch);
        $status   = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        return array($status, $response);
    }

    /**
     * Best-effort client IP: X-Forwarded-For, then Client-IP, then
     * REMOTE_ADDR, read from $_SERVER or (when $_SERVER is empty) from the
     * environment. Returns '0.0.0.0' when none of them is available, e.g.
     * when the script runs from the command line.
     *
     * @return String
     */
    static function getip() {
        $realip = '0.0.0.0';
        if ($_SERVER) {
            if (!empty($_SERVER['HTTP_X_FORWARDED_FOR'])) {
                $realip = $_SERVER['HTTP_X_FORWARDED_FOR'];
            } elseif (!empty($_SERVER['HTTP_CLIENT_IP'])) {
                $realip = $_SERVER['HTTP_CLIENT_IP'];
            } elseif (!empty($_SERVER['REMOTE_ADDR'])) {
                // Guarded: REMOTE_ADDR is not set outside a web request.
                $realip = $_SERVER['REMOTE_ADDR'];
            }
        } else {
            if (getenv('HTTP_X_FORWARDED_FOR')) {
                $realip = getenv('HTTP_X_FORWARDED_FOR');
            } elseif (getenv('HTTP_CLIENT_IP')) {
                $realip = getenv('HTTP_CLIENT_IP');
            } elseif (getenv('REMOTE_ADDR')) {
                // getenv() returns false for a missing variable; keep the default then.
                $realip = getenv('REMOTE_ADDR');
            }
        }
        return $realip;
    }

    /**
     * Maps a PHP error level constant to a short label.
     *
     * @param Integer $err one of the E_* constants
     * @return String the label, or 'UNKNOWN' for a level that is not listed
     */
    public static function getErrorName($err)
    {
        $errors = array(
            E_ERROR             => 'ERROR',
            E_RECOVERABLE_ERROR => 'RECOVERABLE_ERROR',
            E_WARNING           => 'WARNING',
            E_PARSE             => 'PARSE',
            E_NOTICE            => 'NOTICE',
            E_STRICT            => 'STRICT',
            E_DEPRECATED        => 'DEPRECATED',
            E_CORE_ERROR        => 'CORE_ERROR',
            E_CORE_WARNING      => 'CORE_WARNING',
            E_COMPILE_ERROR     => 'COMPILE_ERROR',
            E_COMPILE_WARNING   => 'COMPILE_WARNING',
            E_USER_ERROR        => 'USER_ERROR',
            E_USER_WARNING      => 'USER_WARNING',
            E_USER_NOTICE       => 'USER_NOTICE',
            E_USER_DEPRECATED   => 'USER_DEPRECATED',
        );
        return isset($errors[$err]) ? $errors[$err] : 'UNKNOWN';
    }

    /**
     * Installs an error handler that reports every error/warning/notice and
     * then returns false so PHP's standard error handling still runs.
     */
    private static function set_error_handler($nodeHost, $nodePort)
    {
        set_error_handler(function ($errno, $errstr, $errfile, $errline) use($nodeHost, $nodePort) {
            $err = NodeLog::getErrorName($errno);
            $log = array(
                NodeLog::getip(),
                "<strong class=\"{$err}\">{$err}</strong> {$errfile}:{$errline}",
                nl2br($errstr)
            );
            NodeLog::connect($nodeHost, $nodePort)->log($log);
            return false;
        });
    }

    /**
     * Installs an exception handler that reports uncaught exceptions together
     * with their stack trace.
     */
    private static function register_exceptionHandler($nodeHost, $nodePort)
    {
        set_exception_handler(function($exception) use($nodeHost, $nodePort) {
            $exceptionName = get_class($exception);
            $message = $exception->getMessage();
            $file = $exception->getFile();
            $line = $exception->getLine();
            $trace = $exception->getTraceAsString();

            // getTraceAsString() returns a string, so test it as one
            // (count() is only meaningful for arrays/Countable).
            $msg = $trace !== '' ? "Stack trace:\n{$trace}" : '';
            $log = array(
                NodeLog::getip(),
                nl2br("<strong class=\"ERROR\">Uncaught exception '{$exceptionName}'</strong> with message '{$message}' in {$file}:{$line}"),
                nl2br($msg)
            );
            NodeLog::connect($nodeHost, $nodePort)->log($log);
            return false;
        });
    }

    /**
     * Registers a shutdown function that reports a fatal error (E_ERROR), if
     * the script died with one, and always sends the "End" marker.
     */
    private static function register_shutdown_function($nodeHost, $nodePort)
    {
        register_shutdown_function(function() use($nodeHost, $nodePort) {
            $error = error_get_last();

            // error_get_last() returns null when no error has occurred.
            if ($error !== null && $error['type'] == E_ERROR) {
                $err = NodeLog::getErrorName($error['type']);
                $log = array(
                    NodeLog::getip(),
                    "<strong class=\"{$err}\">{$err}</strong> {$error['file']}:{$error['line']}",
                    nl2br($error['message'])
                );
                NodeLog::connect($nodeHost, $nodePort)->log($log);
            }
            // end() returns nothing, so there is nothing to echo.
            NodeLog::connect($nodeHost, $nodePort)->end();
        });
    }

    private static $_defHost = self::NODE_DEF_HOST;
    private static $_defPort = self::NODE_DEF_PORT;

    /**
     * Stores the host/port as defaults for connect(), registers the three
     * handlers and sends the "Start" marker.
     *
     * @param String $host
     * @param Integer $port
     * @return NodeLog
     */
    public static function init($host = self::NODE_DEF_HOST, $port = self::NODE_DEF_PORT)
    {
        self::$_defHost = $host;
        self::$_defPort = $port;

        self::register_exceptionHandler($host, $port);
        self::set_error_handler($host, $port);
        self::register_shutdown_function($host, $port);

        $node = self::connect($host, $port);
        $node->start();
        return $node;
    }

    private static $time;
    private static $mem;

    /**
     * Records the current time and memory usage and sends a "Start" entry
     * with the request URI (empty when there is none, e.g. on the CLI).
     */
    public function start()
    {
        self::$time = microtime(TRUE);
        self::$mem = memory_get_usage();
        $uri = isset($_SERVER['REQUEST_URI']) ? $_SERVER['REQUEST_URI'] : '';
        $log = array(NodeLog::getip(), "<strong class=\"OK\">Start</strong> >>>> {$uri}");
        $this->log($log);
    }

    /**
     * Sends an "End" entry with the memory growth (in MiB) and the seconds
     * elapsed since start().
     */
    public function end()
    {
        $mem = (memory_get_usage() - self::$mem) / (1024 * 1024);
        $time = microtime(TRUE) - self::$time;
        $log = array(NodeLog::getip(), "<strong class=\"OK\">End</strong> <<<< mem: {$mem} time {$time}");
        $this->log($log);
    }
}

And of course the full code is on GitHub: RealTimeMonitor