Category Archives: node.js

Notify events from PostgreSQL to external listeners

Sometimes we need to call external programs from our PostgreSQL database. We can send sockets from SQL statements. I’ve written about it. The problem with this approach the following one. If user rollbacks the transaction the socket has been already emitted. That’s a problem (or not. Depending on our application). Nobody also guarantees that the process behind the socket server has access to the data of the transaction. If we’re very fast maybe the transaction isn’t commited yet. We can use one sleep function but sleep functions are always a bad idea. PostgreSQL gives us another tool to decouple processes: LISTEN and NOTIFY.

Let me show you and example. First we create a table:

CREATE TABLE PUBLIC.TBLEXAMPLE
(
  KEY1 CHARACTER VARYING(10) NOT NULL,
  KEY2 CHARACTER VARYING(14) NOT NULL,

  VALUE1 CHARACTER VARYING(20),
  VALUE2 CHARACTER VARYING(20) NOT NULL,

  CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
)

Now we add a “after insert” trigger to our table

CREATE TRIGGER TBLEXAMPLE_AFTER
AFTER INSERT
ON PUBLIC.TBLEXAMPLE
FOR EACH ROW
EXECUTE PROCEDURE PUBLIC.NOTIFY();

And now, within the trigger function, we send a notify event (‘myEvent’ in this case) with the row information. We need to send plain text in the notify event so we’ll use JSON to encode our row data.

CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
  PERFORM pg_notify('myEvent', row_to_json(NEW)::text);
  RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;

Now we’re going to build a server side example that connects to our PostgreSQL database and listen to the event. In this case we’re going to use nodejs to build the prototype. This example also will enqueue events into a gearman server.

var pg = require('pg'),
    gearmanode = require('gearmanode'),
    gearmanClient,
    conString = 'tcp://username:password@localhost:5432/gonzalo',
    pgClient;

gearmanode.Client.logger.transports.console.level = 'error';

gearmanClient = gearmanode.client();

console.log('LISTEN myEvent');
pgClient = new pg.Client(conString);
pgClient.connect();
pgClient.query('LISTEN myEvent');
pgClient.on('notification', function (data) {
    console.log("\033[34m" + new Date + '-\033[0m payload', data.payload);
    gearmanClient.submitJob('sms.sender.one', data.payload);
});

And that’s all. Now we only need to perform an INSERT statement into our table. This process will trigger our event and our nodejs will enqueue the process into a gearman queue.

INSERT INTO PUBLIC.TBLEXAMPLE(KEY1, KEY2, VALUE1, VALUE2) VALUES ('k1', 'k2', 'v1', 'v2');

It’s good to remark that if our insert statement is inside a transaction and we rollback it, notify won’t send any message.

Sharing authentication between socket.io and a PHP frontend (using JSON Web Tokens)

I’ve written a previous post about Sharing authentication between socket.io and a PHP frontend but after publish the post a colleague (hi @mariotux) told me that I can use JSON Web Tokens (jwt) to do this. I had never used jwt before so I decided to study a little bit.

JWT are pretty straightforward. You only need to create the token and send it to the client. You don’t need to store this token within a database. Client can decode and validate it on its own. You also can use any programming language to encode and decode tokens (jwt is available in the most common ones)

We’re going to create the same example than the previous post. Today, with jwt, we don’t need to pass the PHP session and perform a http request to validate it. We’ll only pass the token. Our nodejs server will validate by its own.

var io = require('socket.io')(3000),
    jwt = require('jsonwebtoken'),
    secret = "my_super_secret_key";

// middleware to perform authorization
io.use(function (socket, next) {
    var token = socket.handshake.query.token,
        decodedToken;
    try {
        decodedToken = jwt.verify(token, secret);
        console.log("token valid for user", decodedToken.user);
        socket.connectedUser = decodedToken.user;
        next();
    } catch (err) {
        console.log(err);
        next(new Error("not valid token"));
        //socket.disconnect();
    }
});

io.on('connection', function (socket) {
    console.log('Connected! User: ', socket.connectedUser);
});

That’s the client:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
Welcome {{ user }}!

<script src="http://localhost:3000/socket.io/socket.io.js"></script>
<script src="/assets/jquery/dist/jquery.js"></script>

<script>
    var socket;
    $(function () {
        $.getJSON("/getIoConnectionToken", function (jwt) {
            socket = io('http://localhost:3000', {
                query: 'token=' + jwt
            });

            socket.on('connect', function () {
                console.log("connected!");
            });

            socket.on('error', function (err) {
                console.log(err);
            });
        });
    });
</script>

</body>
</html>

And here the backend. A simple Silex server very similar than the previous post one. JWT has also several reserved claims. For example “exp” to set up an expiration timestamp. It’s very useful. We only set one value and validator will reject tokens with incorrect timestamp. In this example I’m not using expiration date. That’s means that my token will never expires. And never means never. In my first prototype I set up an small expiration date (10 seconds). That means my token is only available during 10 seconds. Sounds great. My backend generate tokens that are going to be used immediately. That’s the normal situation but, what happens if I restart the socket.io server? The client will try to reconnect again using the token but it’s expired. We’ll need to create a new jwt before reconnecting. Because of that I’ve removed expiration date in this example but remember: Without expiration date your generated tokens will be always valid (al always is a very big period of time)

<?php
include __DIR__ . "/../vendor/autoload.php";

use Firebase\JWT\JWT;
use Silex\Application;
use Silex\Provider\SessionServiceProvider;
use Silex\Provider\TwigServiceProvider;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;

$app = new Application([
    'secret' => "my_super_secret_key",
    'debug' => true
]);
$app->register(new SessionServiceProvider());
$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

$app->get('/', function (Application $app) {
    return $app['twig']->render('home.twig');
});
$app->get('/login', function (Application $app) {
    $username = $app['request']->server->get('PHP_AUTH_USER', false);
    $password = $app['request']->server->get('PHP_AUTH_PW');
    if ('gonzalo' === $username && 'password' === $password) {
        $app['session']->set('user', ['username' => $username]);

        return $app->redirect('/private');
    }
    $response = new Response();
    $response->headers->set('WWW-Authenticate', sprintf('Basic realm="%s"', 'site_login'));
    $response->setStatusCode(401, 'Please sign in.');

    return $response;
});

$app->get('/getIoConnectionToken', function (Application $app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    $jwt = JWT::encode([
        // I can use "exp" reserved claim. It's cool. My connection token is only available
        // during a period of time. The problem is if I restart the io server. Client will
        // try to re-connect using this token and it's expired.
        //"exp"  => (new \DateTimeImmutable())->modify('+10 second')->getTimestamp(),
        "user" => $user
    ], $app['secret']);

    return $app->json($jwt);
});

$app->get('/private', function (Application $app) {
    $user = $app['session']->get('user');

    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    $userName = $user['username'];

    return $app['twig']->render('private.twig', [
        'user'  => $userName
    ]);
});
$app->run();

Full project in my github.

Sharing authentication between socket.io and a PHP frontend

Normally, when I work with websockets, my stack is a socket.io server and a Silex frontend. Protect a PHP frontend with one kind of authentication of another is pretty straightforward. But if we want to use websockets, we need to set up another server and if we protect our frontend we need to protect our websocket server too.

If our frontend is node too (express for example), sharing authentication is more easy but at this time we we want to use two different servers (a node server and a PHP server). I’ve written about it too but today we`ll see another solution. Let’s start.

Imagine we have this simple Silex application. It has three routes:

  • “/” a public route
  • “/login” to perform the login action
  • “/private” a private route. If we try to get here without a valid session we’ll get a 403 error

And this is the code. It’s basically one example using sessions taken from Silex documentation:

use Silex\Application;
use Silex\Provider\SessionServiceProvider;
use Silex\Provider\TwigServiceProvider;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;

$app = new Application();

$app->register(new SessionServiceProvider());
$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

$app->get('/', function (Application $app) {
    return $app['twig']->render('home.twig');
});

$app->get('/login', function () use ($app) {
    $username = $app['request']->server->get('PHP_AUTH_USER', false);
    $password = $app['request']->server->get('PHP_AUTH_PW');

    if ('gonzalo' === $username && 'password' === $password) {
        $app['session']->set('user', ['username' => $username]);

        return $app->redirect('/private');
    }

    $response = new Response();
    $response->headers->set('WWW-Authenticate', sprintf('Basic realm="%s"', 'site_login'));
    $response->setStatusCode(401, 'Please sign in.');

    return $response;
});

$app->get('/private', function () use ($app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app['twig']->render('private.twig', [
        'username'  => $user['username']
    ]);
});

$app->run();

Our “/private” route also creates a connection with our websocket server.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
Welcome {{ username }}!

<script src="http://localhost:3000/socket.io/socket.io.js"></script>
<script>
    var socket = io('http://localhost:3000/');
    socket.on('connect', function () {
        console.log("connected!");
    });
    socket.on('disconnect', function () {
        console.log("disconnected!");
    });
</script>

</body>
</html>

And that’s our socket.io server. A really simple one.

var io = require('socket.io')(3000);

It works. Our frontend is protected. We need to login with our credentials (in this example “gonzalo/password”), but everyone can connect to our socket.io server. The idea is to use our PHP session to protect our socket.io server too. In fact is very easy how to do it. First we need to pass our PHPSESSID to our socket.io server. To do it, when we perform our socket.io connection in the frontend, we pass our session id

<script>
    var socket = io('http://localhost:3000/', {
        query: 'token={{ sessionId }}'
    });
    socket.on('connect', function () {
        console.log("connected!");
    });
    socket.on('disconnect', function () {
        console.log("disconnect!");
    });
</script>

As well as we’re using a twig template we need to pass sessionId variable

$app->get('/private', function () use ($app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app['twig']->render('private.twig', [
        'username'  => $user['username'],
        'sessionId' => $app['session']->getId()
    ]);
});

Now we only need to validate the token before stabilising connection. Socket.io provides us a middleware to perform those kind of operations. In this example we’re using PHP sessions out of the box. How can we validate it? The answer is easy. We only need to create a http client (in the socket.io server) and perform a request to a protected route (we’ll use “/private”). If we’re using a different provider to store our sessions (I hope you aren’t using Memcached to store PHP session, indeed) you’ll need to validate our sessionId against your provider.

var io = require('socket.io')(3000),
    http = require('http');

io.use(function (socket, next) {
    var options = {
        host: 'localhost',
        port: 8080,
        path: '/private',
        headers: {Cookie: 'PHPSESSID=' + socket.handshake.query.token}
    };

    http.request(options, function (response) {
        response.on('error', function () {
            next(new Error("not authorized"));
        }).on('data', function () {
            next();
        });
    }).end();
});

io.on('connection', function () {
    console.log("connected!");
});

Ok. This example works but we’re generating dynamically a js file injecting our PHPSESSID. If we want to extract the sessionId from the request we can use document.cookie but sometimes it doesn’t work. That’s because HttpOnly. HttpOnly is our friend if we want to protect our cookies against XSS attacks but in this case our protection difficults our task.

We can solve this problem performing a simple request to our server. We’ll create a new route (a private route) called ‘getSessionID’ that gives us our sessionId.

$app->get('/getSessionID', function (Application $app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app->json($app['session']->getId());
});

So before establishing the websocket we just need to create a GET request to our new route to obtain the sessionID.

var io = require('socket.io')(3000),
    http = require('http');

io.use(function (socket, next) {
    var sessionId = socket.handshake.query.token,
        options = {
            host: 'localhost',
            port: 8080,
            path: '/getSessionID',
            headers: {Cookie: 'PHPSESSID=' + sessionId}
        };

    http.request(options, function (response) {
        response.on('error', function () {
            next(new Error("not authorized"));
        });
        response.on('data', function (chunk) {
            var sessionIdFromRequest;
            try {
                sessionIdFromRequest = JSON.parse(chunk.toString());
            } catch (e) {
                next(new Error("not authorized"));
            }

            if (sessionId == sessionIdFromRequest) {
                next();
            } else {
                next(new Error("not authorized"));
            }
        });
    }).end();
});

io.on('connection', function (socket) {
    setInterval(function() {
        socket.emit('hello', {hello: 'world'});
    }, 1000);
});

And thats all. You can see the full example in my github account.

PHP Dumper using Websockets

Another crazy idea. I want to dump my backend output in the browser’s console. There’re several PHP dumpers. For example Raul Fraile’s LadyBug. There’re also libraries to do exactly what I want to do, such as Chrome Logger. But I wanted to use Websockets and dump values in real time, without waiting to the end of backend script. Why? The answer is simple: Because I wanted to it:)

I’ve written several post about Websockets, Silex, PHP. In this case I’ll use a similar approach than the previous posts. First I’ve created a simple Webscocket server with socket.io. This server also starts a Express server to handle internal messages from the Silex Backend

var CONF = {
        IO: {HOST: '0.0.0.0', PORT: 8888},
        EXPRESS: {HOST: '0.0.0.0', PORT: 26300}
    },
    express = require('express'),
    expressApp = express(),
    server = require('http').Server(expressApp),
    io = require('socket.io')(server, {origins: 'localhost:*'})
    ;

expressApp.get('/:type/:session/:message', function (req, res) {
    console.log(req.params);
    var session = req.params.session,
        type = req.params.type,
        message = req.params.message;

    io.sockets.emit('dumper.' + session, {title: type, data: JSON.parse(message)});
    res.json('OK');
});

io.sockets.on('connection', function (socket) {
    console.log("Socket connected!");
});

expressApp.listen(CONF.EXPRESS.PORT, CONF.EXPRESS.HOST, function () {
    console.log('Express started');
});

server.listen(CONF.IO.PORT, CONF.IO.HOST, function () {
    console.log('IO started');
});

Now we create a simple Service provider to connect our Silex Backend to our Express server (and send the dumper’s messages using the websocket connection)

<?php

namespace Dumper\Silex\Provider;

use Silex\Application;
use Silex\ServiceProviderInterface;
use Dumper\Dumper;
use Silex\Provider\SessionServiceProvider;
use GuzzleHttp\Client;

class DumperServiceProvider implements ServiceProviderInterface
{
    private $wsConnector;
    private $client;

    public function __construct(Client $client, $wsConnector)
    {
        $this->client = $client;
        $this->wsConnector = $wsConnector;
    }

    public function register(Application $app)
    {
        $app->register(new SessionServiceProvider());

        $app['dumper'] = function () use ($app) {
            return new Dumper($this->client, $this->wsConnector, $app['session']->get('uid'));
        };

        $app['dumper.init'] = $app->protect(function ($uid) use ($app) {
            $app['session']->set('uid', $uid);
        });

        $app['dumper.uid'] = function () use ($app) {
            return $app['session']->get('uid');
        };
    }

    public function boot(Application $app)
    {
    }
}

Finally our Silex Application looks like that:

include __DIR__ . '/../vendor/autoload.php';

use Silex\Application;
use Silex\Provider\TwigServiceProvider;
use Dumper\Silex\Provider\DumperServiceProvider;
use GuzzleHttp\Client;

$app = new Application([
    'debug' => true
]);

$app->register(new DumperServiceProvider(new Client(), 'http://192.168.1.104:26300'));

$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

$app->get("/", function (Application $app) {
    $uid = uniqid();

    $app['dumper.init']($uid);

    return $app['twig']->render('index.twig', [
        'uid' => $uid
    ]);
});

$app->get('/api/hello', function (Application $app) {
    $app['dumper']->error("Hello world1");
    $app['dumper']->info([1,2,3]);

    return $app->json('OK');
});


$app->run();

In the client side we have one index.html. I’ve created Twig template to pass uid to the dumper object (the websocket channel to listen to), but we also can fetch this uid from the backend with one ajax call.

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title>Dumper example</title>
</head>
<body>

<a href="#" onclick="api('hello')">hello</a>

<!-- We use jQuery just for the demo. Library doesn't need jQuery -->
<script src="assets/jquery/dist/jquery.min.js"></script>
<!-- We load the library -->
<script src="js/dumper.js"></script>

<script>
    dumper.startSocketIo('{{ uid }}', '//localhost:8888');
    function api(name) {
        // we perform a remote api ajax call that triggers websockets
        $.getJSON('/api/' + name, function (data) {
            // Doing nothing. We only call the api to test php dumper
        });
    }
</script>
</body>
</html>

I use jQuery to handle ajax request and to connect to the websocket dumper object (it doesn’t deppend on jQuery, only depend on socket.io)

var dumper = (function () {
    var socket, sessionUid, socketUri, init;

    init = function () {
        if (typeof(io) === 'undefined') {
            setTimeout(init, 100);
        } else {
            socket = io(socketUri);

            socket.on('dumper.' + sessionUid, function (data) {
                console.group('Dumper:', data.title);
                switch (data.title) {
                    case 'emergency':
                    case 'alert':
                    case 'critical':
                    case 'error':
                        console.error(data.data);
                        break;
                    case 'warning':
                        console.warn(data.data);
                        break;
                    case 'notice':
                    case 'info':
                    //case 'debug':
                        console.info(data.data);
                        break;
                    default:
                        console.log(data.data);
                }
                console.groupEnd();
            });
        }
    };

    return {
        startSocketIo: function (uid, uri) {
            var script = document.createElement('script');
            var node = document.getElementsByTagName('script')[0];

            sessionUid = uid;
            socketUri = uri;
            script.src = socketUri + '/socket.io/socket.io.js';
            node.parentNode.insertBefore(script, node);

            init();
        }
    };
})();

Source code is available in my github account

Enclosing socket.io Websocket connection inside a HTML5 SharedWorker

I really like WebSockets. I’ve written several posts about them. Today we’re going to speak about something related to WebSockets. Let me explain it a little bit.

Imagine that we build a Web Application with WebSockets. That’s means that when we start the application, we need to connect to the WebSockets server. If our application is a Single-page application, we’ll create one socket per application, but: What happens if we open three tabs with the application within the browser? The answer is simple, we’ll create three sockets. Also, if we reload one tab (a full refresh) we’ll disconnect our socket and reconnect again. Maybe we can handle this situation, but we can easily bypass this disconnect-connect situation with a HTML5 feature called SharedWorkers.

Web Workers allows us to run JavaScript process in background. We also can create Shared Workers. SharedWorkers can be shared within our browser session. That’s means that we can enclose our WebSocket server inside s SharedWorker, and if we open various tabs with our browser we only need one Socket (one socket per session instead one socket per tab).

I’ve written a simple library called gio to perform this operation. gio uses socket.io to create WebSockets. WebWorker is a new HTML5 feature and it needs a modern browser. Socket.io works also with old browsers. It checks if WebWorkers are available and if they isn’t, then gio creates a WebSocket connection instead of using a WebWorker to enclose the WebSockets.

We can see one simple video to see how it works. In the video we can see how sockets are created. Only one socket is created even if we open more than one tab in our browser. But if we open a new session (one incognito session for example), a new socket is created

Here we can see the SharedWorker code:

"use strict";

importScripts('socket.io.js');

var socket = io(self.name),
    ports = [];

addEventListener('connect', function (event) {
    var port = event.ports[0];
    ports.push(port);
    port.start();

    port.addEventListener("message", function (event) {
        for (var i = 0; i < event.data.events.length; ++i) {
            var eventName = event.data.events[i];

            socket.on(event.data.events[i], function (e) {
                port.postMessage({type: eventName, message: e});
            });
        }
    });
});

socket.on('connect', function () {
    for (var i = 0; i < ports.length; i++) {
        ports[i].postMessage({type: '_connect'});
    }
});

socket.on('disconnect', function () {
    for (var i = 0; i < ports.length; i++) {
        ports[i].postMessage({type: '_disconnect'});
    }
});

And here we can see the gio source code:

var gio = function (uri, onConnect, onDisConnect) {
    "use strict";
    var worker, onError, workerUri, events = {};

    function getKeys(obj) {
        var keys = [];

        for (var i in obj) {
            if (obj.hasOwnProperty(i)) {
                keys.push(i);
            }
        }

        return keys;
    }

    function onMessage(type, message) {
        switch (type) {
            case '_connect':
                if (onConnect) onConnect();
                break;
            case '_disconnect':
                if (onDisConnect) onDisConnect();
                break;
            default:
                if (events[type]) events[type](message);
        }
    }

    function startWorker() {
        worker = new SharedWorker(workerUri, uri);
        worker.port.addEventListener("message", function (event) {
            onMessage(event.data.type, event.data.message);

        }, false);

        worker.onerror = function (evt) {
            if (onError) onError(evt);
        };

        worker.port.start();
        worker.port.postMessage({events: getKeys(events)});
    }

    function startSocketIo() {
        var socket = io(uri);
        socket.on('connect', function () {
            if (onConnect) onConnect();
        });

        socket.on('disconnect', function () {
            if (onDisConnect) onDisConnect();
        });

        for (var eventName in events) {
            if (events.hasOwnProperty(eventName)) {
                socket.on(eventName, socketOnEventHandler(eventName));
            }
        }
    }

    function socketOnEventHandler(eventName) {
        return function (e) {
            onMessage(eventName, e);
        };
    }

    return {
        registerEvent: function (eventName, callback) {
            events[eventName] = callback;
        },

        start: function () {
            if (!SharedWorker) {
                startSocketIo();
            } else {
                startWorker();
            }
        },

        onError: function (cbk) {
            onError = cbk;
        },

        setWorker: function (uri) {
            workerUri = uri;
        }
    };
};

And here the application code:

(function (gio) {
    "use strict";

    var onConnect = function () {
        console.log("connected!");
    };

    var onDisConnect = function () {
        console.log("disconnect!");
    };

    var ws = gio("http://localhost:8080", onConnect, onDisConnect);
    ws.setWorker("sharedWorker.js");

    ws.registerEvent("message", function (data) {
        console.log("message", data);
    });

    ws.onError(function (data) {
        console.log("error", data);
    });

    ws.start();
}(gio));

I’ve also created a simple webSocket server with socket.io. In this small server there’s a setInterval function broadcasting one message to all clients per second to see the application working

var io, connectedSockets;

io = require('socket.io').listen(8080);
connectedSockets = 0;

io.sockets.on('connection', function (socket) {
    connectedSockets++;
    console.log("Socket connected! Conected sockets:", connectedSockets);

    socket.on('disconnect', function () {
        connectedSockets--;
        console.log("Socket disconnect! Conected sockets:", connectedSockets);
    });
});

setInterval(function() {
    io.emit("message", "Hola " + new Date().getTime());
}, 1000); 

Source code is available in my github account.

Yet Another example of WebSockets, socket.io and AngularJs working with a Silex backend

Remember my last post about WebSockets and AngularJs? Today we’re going to play with something similar. I want to create a key-value interface to play with websockets. Let me explain it a little bit.

First we’re going to see the backend. One Silex application with two routes: a get one and a post one:

<?php

include __DIR__ . '/../../vendor/autoload.php';
include __DIR__ . '/SqlLiteStorage.php';

use Silex\Application;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Silex\Provider\DoctrineServiceProvider;

$app = new Application([
    'debug'      => true,
    'ioServer'   => 'http://localhost:3000',
    'httpServer' => 'http://localhost:3001',
]);

$app->after(function (Request $request, Response $response) {
    $response->headers->set('Access-Control-Allow-Origin', '*');
});

$app->register(new G\Io\EmitterServiceProvider($app['httpServer']));
$app->register(new DoctrineServiceProvider(), [
    'db.options' => [
        'driver' => 'pdo_sqlite',
        'path'   => __DIR__ . '/../../db/app.db.sqlite',
    ],
]);
$app->register(new G\Io\Storage\Provider(new SqlLiteStorage($app['db'])));

$app->get('conf', function (Application $app, Request $request) {
    $chanel = $request->get('token');
    return $app->json([
        'ioServer' => $app['ioServer'],
        'chanel'   => $chanel
    ]);
});

$app->get('/{key}', function (Application $app, $key) {
    return $app->json($app['gdb.get']($key));
});

$app->post('/{key}', function (Application $app, Request $request, $key) {
    $content = json_decode($request->getContent(), true);

    $chanel = $content['token'];
    $app->json($app['gdb.post']($key, $content['value']));

    $app['io.emit']($chanel, [
        'key'   => $key,
        'value' => $content['value']
    ]);

    return $app->json(true);
});

$app->run();

As we can see we register one service provider:

$app->register(new G\Io\Storage\Provider(new SqlLiteStorage($app['db'])));

This provider needs an instance of StorageIface

namespace G\Io\Storage;

interface StorageIface
{
    public function get($key);

    public function post($key, $value);
}

Our implementation uses SqlLite, but it’s pretty straightforward to change to another Database Storage or even a NoSql Database.

use Doctrine\DBAL\Connection;
use G\Io\Storage\StorageIface;

class SqlLiteStorage implements StorageIface
{
    private $db;

    public function __construct(Connection $db)
    {
        $this->db = $db;
    }

    public function get($key)
    {
        $statement = $this->db->executeQuery('select value from storage where key = :KEY', ['KEY' => $key]);
        $data      = $statement->fetchAll();

        return isset($data[0]['value']) ? $data[0]['value'] : null;
    }

    public function post($key, $value)
    {
        $this->db->beginTransaction();

        $statement = $this->db->executeQuery('select value from storage where key = :KEY', ['KEY' =>; $key]);
        $data      = $statement->fetchAll();

        if (count($data) > 0) {
            $this->db->update('storage', ['value' => $value], ['key' => $key]);
        } else {
            $this->db->insert('storage', ['key' => $key, 'value' => $value]);
        }

        $this->db->commit();

        return $value;
    }
}

We also register another Service provider:

$app->register(new G\Io\EmitterServiceProvider($app['httpServer']));

This provider’s responsibility is to notify to the websocket’s server when anything changes within the storage:

namespace G\Io;

use Pimple\Container;
use Pimple\ServiceProviderInterface;

class EmitterServiceProvider implements ServiceProviderInterface
{
    private $server;

    public function __construct($url)
    {
        $this->server = $url;
    }

    public function register(Container $app)
    {
        $app['io.emit'] = $app->protect(function ($chanel, $params) use ($app) {
            $s = curl_init();
            curl_setopt($s, CURLOPT_URL, '{$this->server}/emit/?' . http_build_query($params) . '&_chanel=' . $chanel);
            curl_setopt($s, CURLOPT_RETURNTRANSFER, true);
            $content = curl_exec($s);
            $status  = curl_getinfo($s, CURLINFO_HTTP_CODE);
            curl_close($s);

            if ($status != 200) throw new \Exception();

            return $content;
        });
    }
}

The Websocket server is a simple socket.io server as well as a Express server to handle the backend’s triggers.

var
    express = require('express'),
    expressApp = express(),
    server = require('http').Server(expressApp),
    io = require('socket.io')(server, {origins: 'localhost:*'})
    ;

expressApp.get('/emit', function (req, res) {
    io.sockets.emit(req.query._chanel, req.query);
    res.json('OK');
});

expressApp.listen(3001);

server.listen(3000);

Our client application is an AngularJs application:

<!doctype html>
<html ng-app="app">
<head>
    <script src="//localhost:3000/socket.io/socket.io.js"></script>
    <script src="assets/angularjs/angular.js"></script>
    <script src="js/app.js"></script>
    <script src="js/gdb.js"></script>
</head>
<body>
 
<div ng-controller="MainController">
    <input type="text" ng-model="key">
    <button ng-click="change()">change</button>
</div>
 
</body>
</html>
angular.module('app', ['Gdb'])

    .run(function (Gdb) {
        Gdb.init({
            server: 'http://localhost:8080/gdb',
            token: '4b96716bcb3d42fc01ff421ea2cfd757'
        });
    })

    .controller('MainController', function ($scope, Gdb) {
        $scope.change = function () {
            Gdb.set('key', $scope.key).then(function() {
                console.log(&quot;Value set&quot;);
            });
        };

        Gdb.get('key').then(function (data) {
            $scope.key = data;
        });

        Gdb.watch('key', function (value) {
            console.log(&quot;Value updated&quot;);
            $scope.key = value;
        });
    })
;

As we can see the AngularJs application uses one small library called Gdb to handle the communications with the backend and WebSockets:

angular.module('Gdb', [])
    .factory('Gdb', function ($http, $q, $rootScope) {

        var socket,
            gdbServer,
            token,
            watches = {};

        var Gdb = {
            init: function (conf) {
                gdbServer = conf.server;
                token = conf.token;

                $http.get(gdbServer + '/conf', {params: {token: token}}).success(function (data) {
                    socket = io.connect(data.ioServer);
                    socket.on(data.chanel, function (data) {
                        watches.hasOwnProperty(data.key) ? watches[data.key](data.value) : null;
                        $rootScope.$apply();
                    });
                });
            },

            set: function (key, value) {
                var deferred = $q.defer();

                $http.post(gdbServer + '/' + key, {value: value, token: token}).success(function (data) {
                    deferred.resolve(data);
                });

                return deferred.promise;
            },

            get: function (key) {
                var deferred = $q.defer();

                $http.get(gdbServer + '/' + key, {params: {token: token}}).success(function (data) {
                    deferred.resolve(JSON.parse(data));
                });

                return deferred.promise;
            },

            watch: function (key, closure) {
                watches[key] = closure;
            }
        };

        return Gdb;
    });

And that’s all. You can see the whole project at github.

Playing with websockets, angularjs and socket.io

I’m a big fan of websockets. I’ve got various post about them (here, here). Last months I’m working with angularjs projects and because of that I wanna play a little bit with websockets (with socket.io) and angularjs.

I want to build one angular service.

angular.module('io.service', []).
    factory('io', function ($http) {
        var socket,
            apiServer,
            ioEvent,
            watches = {};

        return {
            init: function (conf) {
                apiServer = conf.apiServer;
                ioEvent = conf.ioEvent;

                socket = io.connect(conf.ioServer);
                socket.on(ioEvent, function (data) {
                    return watches.hasOwnProperty(data.item) ? watches[data.item](data) : null;
                });
            },

            emit: function (arguments) {
                return $http.get(apiServer + '/request', {params: arguments});
            },

            watch: function (item, closure) {
                watches[item] = closure;
            },

            unWatch: function (item) {
                delete watches[item];
            }
        };
    });

And now we can build the application

angular.module('myApp', ['io.service']).

    run(function (io) {
        io.init({
            ioServer: 'http://localhost:3000',
            apiServer: 'http://localhost:8080/api',
            ioEvent: 'io.response'
        });
    }).

    controller('MainController', function ($scope, io) {
        $scope.$watch('question', function (newValue, oldValue) {
            if (newValue != oldValue) {
                io.emit({item: 'question', newValue: newValue, oldValue: oldValue});
            }
        });

        io.watch('answer', function (data) {
            $scope.answer = data.value;
            $scope.$apply();
        });
    });

And this’s the html

<!doctype html>
<html>

<head>
    <title>ws experiment</title>
</head>

<body ng-app="myApp">

<div ng-controller="MainController">

    <input type="text" ng-model="question">
    <hr>
    <h1>Hello {{answer}}!</h1>
</div>

<script src="assets/angular/angular.min.js"></script>
<script src="//localhost:3000/socket.io/socket.io.js"></script>

<script src="js/io.js"></script>
<script src="js/app.js"></script>

</body>
</html>

The idea of the application is to watch one model’s variable (‘question’ in this example) and each time it changes we will send the value to the websocket server and we’ll so something (we will convert the string to upper case in our example)

As you can read in one of my previous post I don’t like to send messages from the web browser to the websocket server directly (due do to authentication issues commented here). I prefer to use one server (a Silex server in this example)

include __DIR__ . '/../../vendor/autoload.php';

use Silex\Application;
use Symfony\Component\HttpFoundation\Request;

$app = new Application(['debug' => true]);
$app->register(new G\Io\EmitterServiceProvider());

$app->get('/request', function (Application $app, Request $request) {

    $params = [
        'item'     => $request->get('item'),
        'newValue' => strtoupper($request->get('newValue')),
        'oldValue' => $request->get('oldValue'),
    ];

    try {
        $app['io.emit']($params);
        $params['status'] = true;
    } catch (\Exception $e) {
        $params['status'] = false;
    }

    return $app->json($params);
});

$app->run();

You can see the code within my github account.

Sending sockets from PostgreSQL triggers with Python

Picture this: We want to notify to one external service each time that one record is inserted in the database. We can find the place where the insert statement is done and create a TCP client there, but: What happens if the application that inserts the data within the database is a legacy application?, or maybe it is too hard to do?. If your database is PostgreSQL it’s pretty straightforward. With the “default” procedural language of PostgreSQL (pgplsql) we cannot do it, but PostgreSQL allows us to use more procedural languages than plpgsql, for example Python. With plpython we can use sockets in the same way than we use it within Python scripts. It’s very simple. Let me show you how to do it.

First we need to create one plpython with our TCP client

CREATE OR REPLACE FUNCTION dummy.sendsocket(msg character varying, host character varying, port integer)
  RETURNS integer AS
$BODY$
  import _socket
  try:
    s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
    s.connect((host, port))
    s.sendall(msg)
    s.close()
    return 1
  except:
    return 0
$BODY$
  LANGUAGE plpython VOLATILE
  COST 100;
ALTER FUNCTION dummy.sendsocket(character varying, character varying, integer)
  OWNER TO username;

Now we create the trigger that use our socket client.

CREATE OR REPLACE FUNCTION dummy.myTriggerToSendSockets()
RETURNS trigger AS
$BODY$
   import json
   stmt = plpy.prepare("select dummy.sendSocket($1, $2, $3)", ["text", "text", "int"])
   rv = plpy.execute(stmt, [json.dumps(TD), "host", 26200])
$BODY$
LANGUAGE plpython VOLATILE
COST 100;

As you can see in my example we are sending all the record as a JSON string in the socket body.

And finally we attach the trigger to one table (or maybe we need to do it to more than one table)

CREATE TRIGGER myTrigger
  AFTER INSERT OR UPDATE OR DELETE
  ON dummy.myTable
  FOR EACH ROW
  EXECUTE PROCEDURE dummy.myTriggerToSendSockets();

And that’s all. Now we can use one simple TCP socket server to handle those requests. Let me show you different examples of TCP servers with different languages. As we can see all are different implementations of Reactor pattern. We can use, for example:

node.js:

var net = require('net');

var host = 'localhost';
var port = 26200;

var server = net.createServer(function (socket) {
    socket.on('data', function(buffer) {
        // do whatever that we want with buffer
    });
});

server.listen(port, host);

python (with Twisted):

from twisted.internet import reactor, protocol

HOST = 'localhost'
PORT = 26200

class MyServer(protocol.Protocol):
    def dataReceived(self, data):
        # do whatever that we want with data
        pass

class MyServerFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return MyServer()

reactor.listenTCP(PORT, MyServerFactory(), interface=HOST)
reactor.run()

(I know that we can create the Python’s TCP server without Twisted, but if don’t use it maybe someone will angry with me. Probably he is angry right now because I put the node.js example first :))

php (with react):

<?php
include __DIR__ . '/vendor/autoload.php';

$host = 'localhost';
$port = 26200;

$loop   = React\EventLoop\Factory::create();
$socket = new React\Socket\Server($loop);

$socket->on('connection', function ($conn) {
    $conn->on('data', function ($data) {
        // do whatever we want with data
        }
    );
});

$socket->listen($port, $host);
$loop->run();

You also can use xinet.d to handle the TCP inbound connections.

Live changes within node.js scripts without stop/start

Imagine that you are working within a nodejs project. This simple script:

var CONF = ['item1', 'item2', 'item3'];

var last;
setInterval(function () {
    var next = CONF.indexOf(last) + 1;
    last = (CONF[next] == undefined) ? CONF[0] : CONF[next];
    console.log(last);
}, 1000);

If we run this script, we will see in the console one element of CONF each second. Simple, isn’t it?. OK, imagine now we want to add one new element to the list (let’s say item4). We can easily change the script, stop the execution and run again. OK but imagine that we cannot stop/start the script as many times as we want. What can we do?. We can store the CONF data into one external storage (Redis, for example), but today we are going to do something more easy. We are going to modify CONF in execution time. The idea is to open a TCP socket and let change CONF with a simple protocol.

If we change the script to:

var net = require('net');
var CONF_PORT = 9730;
var CONF = ['item1', 'item2', 'item3'];

var last;
setInterval(function () {
    var next = CONF.indexOf(last) + 1;
    last = (CONF[next] == undefined) ? CONF[0] : CONF[next];
    console.log(last);
}, 1000);

var serverConf = net.createServer(function (confSocket) {
    confSocket.on("data", function (data) {
        var dataAsString = data.toString().trim();
        switch (dataAsString.substr(0, 1)) {
            case '+':
                var userVariable = dataAsString.substr(1);
                if (CONF.indexOf(userVariable) < 0) {
                    CONF.push(userVariable);
                    confSocket.write("+ " + userVariable + " added\n");
                } else {
                    confSocket.write("+ " + userVariable + " already added\n");
                }
                break;
            case '-':
                var userVariable = dataAsString.substr(1);
                if (CONF.indexOf(userVariable) >= 0) {
                    CONF.splice(CONF.indexOf(userVariable), 1)
                    confSocket.write("- " + userVariable + " deleted\n");
                } else {
                    confSocket.write("- " + userVariable + " don't exists\n");
                }
                break;
            case '=':
                for (var i in CONF) {
                    confSocket.write(CONF[i] + "\n");
                }
                break;
        }
        confSocket.write("\n");
        confSocket.write("Number of elements: " + CONF.length + "\n");
        confSocket.end("\n");
    });
});
serverConf.listen(CONF_PORT);

You can see the script in action here:

How to send the output of Symfony’s process Component to a node.js server in Real Time with Socket.io

Today another crazy idea. Do you know Symfony Process Component? The Process Component is a simple component that executes commands in sub-processes. I like to use it when I need to execute commands in the operating system. The documentation is pretty straightforward. Normally when I want to collect the output of the script (imagine we run those scripts within a crontab) I save the output in a log file and I can check it even in real time with tail -f command.

This approach works, but I want to do it in a browser (call me crazy :)). I’ve written a couple of posts with something similar. What I want to do now? The idea is simple:

First I want to execute custom commands with process. Just follow the process documentation:

<?php
use Symfony\Component\Process\Process;

$process = new Process('ls -lsa');
$process->setTimeout(3600);
$process->run();
if (!$process->isSuccessful()) {
    throw new \RuntimeException($process->getErrorOutput());
}

print $process->getOutput();

Process has one cool feature, we can give feedback in real-time by passing an anonymous function to the run() method:

<?php
use Symfony\Component\Process\Process;

$process = new Process('ls -lsa');
$process->run(function ($type, $buffer) {
    if ('err' === $type) {
        echo 'ERR > '.$buffer;
    } else {
        echo 'OUT > '.$buffer;
    }
});

The idea now is to use this callback to send a TCP socket to one server with node.js

var LOCAL_PORT = 5600;
var server = require('net').createServer(function (socket) {
    socket.on('data', function (msg) {
        console.log(msg);
    });
}).listen(LOCAL_PORT);

server.on('listening', function () {
    console.log("TCP server accepting connection on port: " + LOCAL_PORT);
});

Now we change our php script to

<?php
include __DIR__ . "/../vendor/autoload.php";

use Symfony\Component\Process\Process;

function runProcess($command)
{
    $address = 'localhost';
    $port = 5600;
    $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
    socket_connect($socket, $address, $port);

    $process = new Process($command);
    $process->setTimeout(3600);
    $process->run(
        function ($type, $buffer) use ($socket) {
            if ('err' === $type) {
                socket_write($socket, "ERROR\n", strlen("ERROR\n"));
                socket_write($socket, $buffer, strlen($buffer));
            } else {

                socket_write($socket, $buffer, strlen($buffer));
            }
        }
    );
    if (!$process->isSuccessful()) {
        throw new \RuntimeException($process->getErrorOutput());
    }
    socket_close($socket);
}

runProcess('ls -latr /');

Now with the node.js started, if we run the php script, we will see the output of the process command in the node’s terminal. But we want to show it in a browser. What can we do? Of course, socket.io. We change the node.js command to:

var LOCAL_PORT = 5600;
var SOKET_IO_PORT = 8000;
var ioClients = [];

var io = require('socket.io').listen(SOKET_IO_PORT);

var server = require('net').createServer(function (socket) {
    socket.on('data', function (msg) {
        ioClients.forEach(function (ioClient) {
            ioClient.emit('log', msg.toString().trim());
        });
    });
}).listen(LOCAL_PORT);

io.sockets.on('connection', function (socketIo) {
    ioClients.push(socketIo);
});

server.on('listening', function () {
    console.log("TCP server accepting connection on port: " + LOCAL_PORT);
});

and finally we create a simple web client:

<script src="http://localhost:8000/socket.io/socket.io.js"></script>
<script>
    var socket = io.connect('http://localhost:8000');
    socket.on('log', function (data) {
        console.log(data);
    });
</script>

Now if we start one browser we will see the output of our command line process within the console tab of the browser.

If you want something like webConsole, I also have created the example, With a Web UI enabling to send custom commands. You can see it in github.

Obviously that’s only one experiment with a lot of security issues that we need to take into account if we want to use it in production. What do you think?

Follow

Get every new post delivered to your Inbox.

Join 1,141 other followers