Alexa skill example with Serverless framework

Today I want to play with Alexa and the Serverless framework. I’ve created a sample hello world skill; in fact, this example is more or less the official hello-world sample.

const Alexa = require('ask-sdk-core')
const RequestInterceptor = require('./interceptors/RequestInterceptor')
const ResponseInterceptor = require('./interceptors/ResponseInterceptor')
const LaunchRequestHandler = require('./handlers/LaunchRequestHandler')
const HelloWorldIntentHandler = require('./handlers/HelloWorldIntentHandler')
const HelpIntentHandler = require('./handlers/HelpIntentHandler')
const CancelAndStopIntentHandler = require('./handlers/CancelAndStopIntentHandler')
const SessionEndedRequestHandler = require('./handlers/SessionEndedRequestHandler')
const FallbackHandler = require('./handlers/FallbackHandler')
const ErrorHandler = require('./handlers/ErrorHandler')

let skill

module.exports.handler = async (event, context) => {
  if (!skill) {
    skill = Alexa.SkillBuilders.custom().
      addRequestInterceptors(RequestInterceptor).
      addResponseInterceptors(ResponseInterceptor).
      addRequestHandlers(
        LaunchRequestHandler,
        HelloWorldIntentHandler,
        HelpIntentHandler,
        CancelAndStopIntentHandler,
        SessionEndedRequestHandler,
        FallbackHandler).
      addErrorHandlers(
        ErrorHandler).
      create()
  }

  return await skill.invoke(event, context)
}

It has one intent that answers the hello command.

const HelloWorldIntentHandler = {
  canHandle (handlerInput) {
    return handlerInput.requestEnvelope.request.type === 'IntentRequest'
      && handlerInput.requestEnvelope.request.intent.name === 'HelloWorldIntent'
  },
  handle (handlerInput) {
    const speechOutput = 'Hello World'
    const cardTitle = 'Hello world'

    return handlerInput.responseBuilder.
      speak(speechOutput).
      reprompt(speechOutput).
      withSimpleCard(cardTitle, speechOutput).
      getResponse()
  }
}

module.exports = HelloWorldIntentHandler
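To make the canHandle/handle contract concrete, here is a minimal sketch in plain JavaScript of the routing idea: handlers are tried in registration order, and the first one whose canHandle returns true produces the response. This is not the SDK's code. The dispatch function, the simplified handlers (they return plain strings instead of using a responseBuilder) and the envelopeFor helper are all assumptions made for illustration.

```javascript
// Hypothetical handlers following the same canHandle/handle shape as above.
const helloHandler = {
  canHandle (input) {
    const request = input.requestEnvelope.request
    return request.type === 'IntentRequest' && request.intent.name === 'HelloWorldIntent'
  },
  handle () {
    return 'Hello World'
  }
}

const fallbackHandler = {
  canHandle () {
    return true // accepts anything, so it must be registered last
  },
  handle () {
    return 'Sorry, I did not understand that'
  }
}

// Try the handlers in order and run the first one that accepts the input.
function dispatch (handlers, input) {
  const handler = handlers.find(h => h.canHandle(input))
  if (!handler) throw new Error('no handler can handle this request')
  return handler.handle(input)
}

// Build a minimal request envelope for a given intent name (illustrative only).
const envelopeFor = name => ({
  requestEnvelope: { request: { type: 'IntentRequest', intent: { name } } }
})

const handlers = [helloHandler, fallbackHandler]
console.log(dispatch(handlers, envelopeFor('HelloWorldIntent')))
console.log(dispatch(handlers, envelopeFor('SomethingElse')))
```

The order matters in this sketch: if the fallback were registered first it would swallow every request, which is why the catch-all handlers go at the end of the list.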

I’m using the Serverless framework to deploy the skill. I know that the Serverless framework has plugins for Alexa skills that help us create the whole skill, but in this example I want to do it a little more manually (it’s the way I learn new things).

First I create the skill in the Alexa developer console (or via the ask cli). There are a lot of tutorials about it. Then I take my alexaSkillId and use it in my serverless config file as the trigger event of my lambda function.

service: hello-world

provider:
  name: aws
  runtime: nodejs8.10
  region: ${opt:region, self:custom.defaultRegion}
  stage: ${opt:stage, self:custom.defaultStage}

custom:
  defaultRegion: eu-west-1
  defaultStage: prod

functions:
  info:
    handler: src/index.handler
    events:
      - alexaSkill: amzn1.ask.skill.my_skill

Then I deploy the lambda function:

npx serverless deploy --aws-s3-accelerate

Then I take the ARN of the lambda function and use it as my endpoint in the Alexa developer console.

We can also test our skill (at least the lambda function) using our favorite testing framework. I will use jest in this example.
Testing is very important, at least for me, when I’m working with lambdas and serverless. I want to test my code locally instead of deploying to AWS again and again (it’s slow).

const when = require('./steps/when')
const { init } = require('./steps/init')

describe('When we invoke the skill', () => {
  beforeAll(() => {
    init()
  })

  test('launch intent', async () => {
    const res = await when.we_invoke_intent(require('./events/use_skill'))
    const card = res.response.card
    expect(card.title).toBe('Hello world')
    expect(card.content).toBe('Welcome to Hello world, you can say Hello or Help. Which would you like to try?')
  })

  test('help handler', async () => {
    const res = await when.we_invoke_intent(require('./events/help_handler'))
    console.log(res.response.outputSpeech.ssml)
    expect(res.response.outputSpeech.ssml).toBe('<speak>You can say hello to me! How can I help?</speak>')
  })

  test('Hello world handler', async () => {
    const res = await when.we_invoke_intent(require('./events/hello_world_handler'))
    const card = res.response.card
    expect(card.title).toBe('Hello world')
    expect(card.content).toBe('Hello World')
  })
})
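The steps/when helper is not shown in this post. A minimal sketch of what such a helper could look like, assuming it simply calls the Lambda handler with a stored event and an empty context, is below. The handler here is a hypothetical stand-in for src/index.handler, and launchEvent stands in for the JSON files under ./events.

```javascript
// Hypothetical stand-in for the real Lambda handler (src/index.handler in this post).
const handler = async (event, context) => ({
  version: '1.0',
  response: {
    card: { type: 'Simple', title: 'Hello world', content: 'Got ' + event.request.type }
  }
})

// A possible shape for steps/when: call the handler directly with a stored event.
const when = {
  we_invoke_intent: event => handler(event, {})
}

// Minimal event, standing in for the JSON files under ./events.
const launchEvent = { request: { type: 'LaunchRequest' } }

when.we_invoke_intent(launchEvent).then(res => {
  console.log(res.response.card.title, '-', res.response.card.content)
})
```

Because the helper only calls a function, the tests run without any deployment and without network access.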

Full code in my github account.

Encrypt Websocket (socket.io) communications

I’m a big fan of WebSockets and socket.io. I’ve written a lot about them. In recent posts I’ve written about socket.io and authentication. Today we’re going to talk about communications.

Imagine we’ve got a websocket server and we connect our application to this server (even using https/wss). If we open our browser’s console we can inspect our WebSocket communications. We can also enable debugging. This works in a similar way to enabling promiscuous mode on a network interface: we will see every packet, not only the packets that the server is sending to us.

If we send sensitive information over websockets, that means that one logged-in user can see another user’s information. We can separate namespaces in our socket.io server. We can also do another thing: encrypt communications using crypto-js.

I’ve created one small wrapper to use it with socket.io.
We can install our server dependency:

npm install g-crypt

And install our client dependency with bower

bower install g-crypt

And use it in our server

var io = require('socket.io')(3000),
    Crypt = require("g-crypt"),
    passphrase = 'super-secret-passphrase',
    crypter = Crypt(passphrase);

io.on('connection', function (socket) {
    socket.on('counter', function (data) {
        var decriptedData = crypter.decrypt(data);
        setTimeout(function () {
            console.log("counter status: " + decriptedData.id);
            decriptedData.id++;
            socket.emit('counter', crypter.encrypt(decriptedData));
        }, 1000);
    });
});

And now a simple HTML client

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
Open console to see the messages

<script src="http://localhost:3000/socket.io/socket.io.js"></script>
<script src="assets/cryptojslib/rollups/aes.js"></script>
<script src="assets/g-crypt/src/Crypt.js"></script>
<script>
    var socket = io('http://localhost:3000/'),
        passphrase = 'super-secret-passphrase',
        crypter = Crypt(passphrase),
        id = 0;

    socket.on('connect', function () {
        console.log("connected! Let's start the counter with: " + id);
        socket.emit('counter', crypter.encrypt({id: id}));
    });

    socket.on('counter', function (data) {
        var decriptedData = crypter.decrypt(data);
        console.log("counter status: " + decriptedData.id);
        socket.emit('counter', crypter.encrypt({id: decriptedData.id}));
    });
</script>

</body>
</html>

Now our communications are encrypted and a logged-in user cannot read another user’s data.

The library is a simple wrapper:

Crypt = function (passphrase) {
    "use strict";
    var pass = passphrase;
    var CryptoJSAesJson = {
        parse: function (jsonStr) {
            var j = JSON.parse(jsonStr);
            var cipherParams = CryptoJS.lib.CipherParams.create({ciphertext: CryptoJS.enc.Base64.parse(j.ct)});
            if (j.iv) cipherParams.iv = CryptoJS.enc.Hex.parse(j.iv);
            if (j.s) cipherParams.salt = CryptoJS.enc.Hex.parse(j.s);
            return cipherParams;
        },
        stringify: function (cipherParams) {
            var j = {ct: cipherParams.ciphertext.toString(CryptoJS.enc.Base64)};
            if (cipherParams.iv) j.iv = cipherParams.iv.toString();
            if (cipherParams.salt) j.s = cipherParams.salt.toString();
            return JSON.stringify(j);
        }
    };

    return {
        decrypt: function (data) {
            return JSON.parse(CryptoJS.AES.decrypt(data, pass, {format: CryptoJSAesJson}).toString(CryptoJS.enc.Utf8));
        },
        encrypt: function (data) {
            return CryptoJS.AES.encrypt(JSON.stringify(data), pass, {format: CryptoJSAesJson}).toString();
        }
    };
};

if (typeof module !== 'undefined' && typeof module.exports !== 'undefined') {
    CryptoJS = require("crypto-js");
    module.exports = Crypt;
} else {
    window.Crypt = Crypt;
}
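The same encrypt/decrypt round trip can be tried without any dependency. The sketch below is not g-crypt: it swaps crypto-js for Node's built-in crypto module (AES-256-GCM with a key derived by scrypt), and makeCrypter, the fixed salt and the JSON envelope fields are assumptions chosen for illustration.

```javascript
const crypto = require('crypto');

function makeCrypter(passphrase) {
    // Derive a 32-byte key from the passphrase; the fixed salt is for illustration only.
    const key = crypto.scryptSync(passphrase, 'demo-salt', 32);

    return {
        encrypt: function (data) {
            const iv = crypto.randomBytes(12); // fresh IV for every message
            const cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
            const ct = Buffer.concat([cipher.update(JSON.stringify(data), 'utf8'), cipher.final()]);
            return JSON.stringify({
                iv: iv.toString('hex'),
                tag: cipher.getAuthTag().toString('hex'),
                ct: ct.toString('base64')
            });
        },
        decrypt: function (payload) {
            const j = JSON.parse(payload);
            const decipher = crypto.createDecipheriv('aes-256-gcm', key, Buffer.from(j.iv, 'hex'));
            decipher.setAuthTag(Buffer.from(j.tag, 'hex'));
            // final() throws if the key is wrong or the message was tampered with.
            const pt = Buffer.concat([decipher.update(Buffer.from(j.ct, 'base64')), decipher.final()]);
            return JSON.parse(pt.toString('utf8'));
        }
    };
}

const crypter = makeCrypter('super-secret-passphrase');
const message = crypter.encrypt({id: 1});
console.log(message);
console.log(crypter.decrypt(message));
```

In both versions the passphrase is shipped to every client, so the scheme only hides traffic from parties that do not have the page source.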

The library is available in my github, and we can also install it with npm and bower.

Notify events from PostgreSQL to external listeners

Sometimes we need to call external programs from our PostgreSQL database. We can send sockets from SQL statements. I’ve written about it. The problem with this approach is the following one: if the user rolls back the transaction, the socket has already been emitted. That’s a problem (or not, depending on our application). Also, nobody guarantees that the process behind the socket server has access to the data of the transaction. If we’re very fast, maybe the transaction isn’t committed yet. We can use a sleep function, but sleep functions are always a bad idea. PostgreSQL gives us another tool to decouple processes: LISTEN and NOTIFY.

Let me show you an example. First we create a table:

CREATE TABLE PUBLIC.TBLEXAMPLE
(
  KEY1 CHARACTER VARYING(10) NOT NULL,
  KEY2 CHARACTER VARYING(14) NOT NULL,

  VALUE1 CHARACTER VARYING(20),
  VALUE2 CHARACTER VARYING(20) NOT NULL,

  CONSTRAINT TBLEXAMPLE_PKEY PRIMARY KEY (KEY1, KEY2)
)

Now we add an “after insert” trigger to our table (the trigger function it calls must already exist when this statement runs):

CREATE TRIGGER TBLEXAMPLE_AFTER
AFTER INSERT
ON PUBLIC.TBLEXAMPLE
FOR EACH ROW
EXECUTE PROCEDURE PUBLIC.NOTIFY();

And now, within the trigger function, we send a notify event (‘myEvent’ in this case) with the row information. The payload of a notify event must be plain text, so we’ll use JSON to encode our row data.

CREATE OR REPLACE FUNCTION PUBLIC.NOTIFY() RETURNS trigger AS
$BODY$
BEGIN
  PERFORM pg_notify('myEvent', row_to_json(NEW)::text);
  RETURN new;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE COST 100;

Now we’re going to build a server-side example that connects to our PostgreSQL database and listens to the event. In this case we’re going to use nodejs to build the prototype. This example will also enqueue events into a gearman server.

var pg = require('pg'),
    gearmanode = require('gearmanode'),
    gearmanClient,
    conString = 'tcp://username:password@localhost:5432/gonzalo',
    pgClient;

gearmanode.Client.logger.transports.console.level = 'error';

gearmanClient = gearmanode.client();

console.log('LISTEN myEvent');
pgClient = new pg.Client(conString);
pgClient.connect();
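// The channel name must be quoted: an unquoted identifier is folded to lowercase
// and would not match the 'myEvent' channel used by pg_notify.
pgClient.query('LISTEN "myEvent"');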
pgClient.on('notification', function (data) {
    console.log("\033[34m" + new Date + '-\033[0m payload', data.payload);
    gearmanClient.submitJob('sms.sender.one', data.payload);
});

And that’s all. Now we only need to perform an INSERT statement on our table. This will fire our trigger, and our nodejs process will enqueue the job into a gearman queue.

INSERT INTO PUBLIC.TBLEXAMPLE(KEY1, KEY2, VALUE1, VALUE2) VALUES ('k1', 'k2', 'v1', 'v2');

It’s worth remarking that if our insert statement is inside a transaction and we roll it back, NOTIFY won’t send any message: notifications are only delivered when the transaction commits.
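On the listener side, the payload arrives as the JSON text produced by row_to_json, so it has to be parsed before use. A minimal sketch of that step follows. The toJob function and the shape of the job it returns are assumptions for illustration; the sample message only mimics the channel and payload fields that the notification handler above receives. The keys are lowercase because the unquoted column names of the table are folded to lowercase by PostgreSQL.

```javascript
// Turn a notification message into a job description; returns null for bad payloads.
function toJob(msg) {
    var row;
    try {
        row = JSON.parse(msg.payload);
    } catch (e) {
        return null;
    }
    if (!row || typeof row !== 'object') {
        return null;
    }
    return {
        channel: msg.channel,
        key: row.key1 + '/' + row.key2,
        values: [row.value1, row.value2]
    };
}

// Sample message shaped like the one passed to the 'notification' handler.
var sample = {
    channel: 'myEvent',
    payload: '{"key1":"k1","key2":"k2","value1":"v1","value2":"v2"}'
};

console.log(toJob(sample));
console.log(toJob({channel: 'myEvent', payload: 'not json'}));
```

Validating the payload before enqueuing keeps a malformed notification from crashing the listener process.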

Sharing authentication between socket.io and a PHP frontend

Normally, when I work with websockets, my stack is a socket.io server and a Silex frontend. Protecting a PHP frontend with one kind of authentication or another is pretty straightforward. But if we want to use websockets, we need to set up another server, and if we protect our frontend we need to protect our websocket server too.

If our frontend is node too (express for example), sharing authentication is easier, but this time we want to use two different servers (a node server and a PHP server). I’ve written about it too, but today we’ll see another solution. Let’s start.

Imagine we have this simple Silex application. It has three routes:

  • “/” a public route
  • “/login” to perform the login action
  • “/private” a private route. If we try to get here without a valid session we’ll get a 403 error

And this is the code. It’s basically one example using sessions taken from Silex documentation:

use Silex\Application;
use Silex\Provider\SessionServiceProvider;
use Silex\Provider\TwigServiceProvider;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;

$app = new Application();

$app->register(new SessionServiceProvider());
$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

$app->get('/', function (Application $app) {
    return $app['twig']->render('home.twig');
});

$app->get('/login', function () use ($app) {
    $username = $app['request']->server->get('PHP_AUTH_USER', false);
    $password = $app['request']->server->get('PHP_AUTH_PW');

    if ('gonzalo' === $username && 'password' === $password) {
        $app['session']->set('user', ['username' => $username]);

        return $app->redirect('/private');
    }

    $response = new Response();
    $response->headers->set('WWW-Authenticate', sprintf('Basic realm="%s"', 'site_login'));
    $response->setStatusCode(401, 'Please sign in.');

    return $response;
});

$app->get('/private', function () use ($app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app['twig']->render('private.twig', [
        'username'  => $user['username']
    ]);
});

$app->run();

Our “/private” route also creates a connection with our websocket server.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
Welcome {{ username }}!

<script src="http://localhost:3000/socket.io/socket.io.js"></script>
<script>
    var socket = io('http://localhost:3000/');
    socket.on('connect', function () {
        console.log("connected!");
    });
    socket.on('disconnect', function () {
        console.log("disconnected!");
    });
</script>

</body>
</html>

And that’s our socket.io server. A really simple one.

var io = require('socket.io')(3000);

It works. Our frontend is protected. We need to log in with our credentials (in this example “gonzalo/password”), but everyone can connect to our socket.io server. The idea is to use our PHP session to protect our socket.io server too. In fact, it’s very easy to do. First we need to pass our PHPSESSID to our socket.io server. To do that, when we open our socket.io connection in the frontend, we pass our session id:

<script>
    var socket = io('http://localhost:3000/', {
        query: 'token={{ sessionId }}'
    });
    socket.on('connect', function () {
        console.log("connected!");
    });
    socket.on('disconnect', function () {
        console.log("disconnect!");
    });
</script>

Since we’re using a twig template, we need to pass the sessionId variable:

$app->get('/private', function () use ($app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app['twig']->render('private.twig', [
        'username'  => $user['username'],
        'sessionId' => $app['session']->getId()
    ]);
});

Now we only need to validate the token before establishing the connection. Socket.io provides us with a middleware to perform this kind of operation. In this example we’re using PHP sessions out of the box. How can we validate it? The answer is easy. We only need to create an http client (in the socket.io server) and perform a request to a protected route (we’ll use “/private”). If you’re using a different provider to store your sessions (I hope you aren’t using Memcached to store PHP sessions), you’ll need to validate the sessionId against your provider.

var io = require('socket.io')(3000),
    http = require('http');

io.use(function (socket, next) {
    var options = {
        host: 'localhost',
        port: 8080,
        path: '/private',
        headers: {Cookie: 'PHPSESSID=' + socket.handshake.query.token}
    };

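    http.request(options, function (response) {
        // Only the status code matters; discard the body.
        response.resume();
        if (response.statusCode === 200) {
            next();
        } else {
            next(new Error("not authorized"));
        }
    }).on('error', function () {
        next(new Error("not authorized"));
    }).end();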
});

io.on('connection', function () {
    console.log("connected!");
});

OK. This example works, but we’re dynamically generating a js file that injects our PHPSESSID. If we want to extract the sessionId on the client we can use document.cookie, but sometimes it doesn’t work. That’s because of HttpOnly. HttpOnly is our friend if we want to protect our cookies against XSS attacks, but in this case our protection makes our task harder.

We can solve this problem by performing a simple request to our server. We’ll create a new route (a private route) called ‘getSessionID’ that gives us our sessionId.

$app->get('/getSessionID', function (Application $app) {
    $user = $app['session']->get('user');
    if (null === $user) {
        throw new AccessDeniedHttpException('Access Denied');
    }

    return $app->json($app['session']->getId());
});

So before establishing the websocket we just need to create a GET request to our new route to obtain the sessionID.

var io = require('socket.io')(3000),
    http = require('http');

io.use(function (socket, next) {
    var sessionId = socket.handshake.query.token,
        options = {
            host: 'localhost',
            port: 8080,
            path: '/getSessionID',
            headers: {Cookie: 'PHPSESSID=' + sessionId}
        };

    http.request(options, function (response) {
        response.on('error', function () {
            next(new Error("not authorized"));
        });
        response.on('data', function (chunk) {
            var sessionIdFromRequest;
            try {
                sessionIdFromRequest = JSON.parse(chunk.toString());
            } catch (e) {
                // Stop here so next() is not called a second time below.
                return next(new Error("not authorized"));
            }

            if (sessionId == sessionIdFromRequest) {
                next();
            } else {
                next(new Error("not authorized"));
            }
        });
    }).end();
});

io.on('connection', function (socket) {
    setInterval(function() {
        socket.emit('hello', {hello: 'world'});
    }, 1000);
});
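A response body can arrive in several 'data' events, so a more defensive variant of the middleware check waits for 'end' and decides exactly once. The sketch below shows that idea outside socket.io: checkSessionResponse and fakeResponse are hypothetical helpers, and the fake response is just an EventEmitter with a statusCode standing in for the real http response object.

```javascript
var EventEmitter = require('events').EventEmitter;

// Collect the whole body, then call next() exactly once with the verdict.
function checkSessionResponse(response, sessionId, next) {
    var chunks = [];
    response.on('data', function (chunk) {
        chunks.push(chunk);
    });
    response.on('end', function () {
        var idFromServer;
        if (response.statusCode !== 200) {
            return next(new Error("not authorized"));
        }
        try {
            idFromServer = JSON.parse(Buffer.concat(chunks).toString());
        } catch (e) {
            return next(new Error("not authorized"));
        }
        next(idFromServer === sessionId ? undefined : new Error("not authorized"));
    });
}

// Stand-in for the http response object (hypothetical helper for the demo).
function fakeResponse(statusCode) {
    var response = new EventEmitter();
    response.statusCode = statusCode;
    return response;
}

var response = fakeResponse(200);
checkSessionResponse(response, 'abc123', function (err) {
    console.log(err ? 'rejected' : 'accepted');
});
// The JSON-encoded session id arrives split in two chunks.
response.emit('data', Buffer.from('"abc'));
response.emit('data', Buffer.from('123"'));
response.emit('end');
```

In the real middleware the function would be called from the http.request callback with the actual response and socket.handshake.query.token.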

And that’s all. You can see the full example in my github account.

Enclosing socket.io Websocket connection inside a HTML5 SharedWorker

I really like WebSockets. I’ve written several posts about them. Today we’re going to speak about something related to WebSockets. Let me explain it a little bit.

Imagine that we build a Web Application with WebSockets. That means that when we start the application, we need to connect to the WebSockets server. If our application is a Single-page application, we’ll create one socket per application, but what happens if we open three tabs with the application within the browser? The answer is simple: we’ll create three sockets. Also, if we reload one tab (a full refresh) we’ll disconnect our socket and reconnect again. Maybe we can handle this situation, but we can easily bypass this disconnect-connect situation with an HTML5 feature called SharedWorkers.

Web Workers allow us to run JavaScript processes in the background. We can also create Shared Workers. SharedWorkers can be shared within our browser session. That means that we can enclose our WebSocket connection inside a SharedWorker, and if we open several tabs in our browser we only need one socket (one socket per session instead of one socket per tab).

I’ve written a simple library called gio to perform this operation. gio uses socket.io to create WebSockets. SharedWorker is a new HTML5 feature and it needs a modern browser, while socket.io also works with old browsers. gio checks whether SharedWorkers are available and, if they aren’t, creates a plain socket.io connection instead of enclosing it in a worker.

We can see how it works in a simple video. In the video we can see how sockets are created. Only one socket is created even if we open more than one tab in our browser. But if we open a new session (an incognito session, for example), a new socket is created.

Here we can see the SharedWorker code:

"use strict";

importScripts('socket.io.js');

var socket = io(self.name),
    ports = [];

addEventListener('connect', function (event) {
    var port = event.ports[0];
    ports.push(port);
    port.start();

    port.addEventListener("message", function (event) {
        // forEach gives every handler its own eventName; a var declared inside
        // a for loop is shared, so every handler would report the last name.
        event.data.events.forEach(function (eventName) {
            socket.on(eventName, function (e) {
                port.postMessage({type: eventName, message: e});
            });
        });
    });
});

socket.on('connect', function () {
    for (var i = 0; i < ports.length; i++) {
        ports[i].postMessage({type: '_connect'});
    }
});

socket.on('disconnect', function () {
    for (var i = 0; i < ports.length; i++) {
        ports[i].postMessage({type: '_disconnect'});
    }
});
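The fan-out idea in the worker (one socket, many ports) can be tried outside the browser with stand-in port objects. This is only a sketch: fakePort is a hypothetical helper that records what it receives, and addPort and broadcast are illustrative names, not part of gio.

```javascript
// Registry of connected ports; in the worker each browser tab contributes one port.
var ports = [];

function addPort(port) {
    ports.push(port);
}

// Send the same message to every registered port.
function broadcast(message) {
    for (var i = 0; i < ports.length; i++) {
        ports[i].postMessage(message);
    }
}

// Stand-in for a MessagePort that just records what it receives (hypothetical helper).
function fakePort(name) {
    return {
        name: name,
        received: [],
        postMessage: function (message) {
            this.received.push(message);
        }
    };
}

var tabA = fakePort('tab A');
var tabB = fakePort('tab B');
addPort(tabA);
addPort(tabB);

broadcast({type: '_connect'});
console.log(tabA.received, tabB.received);
```

One connect event from the single socket reaches both tabs, which is the behaviour the worker relies on.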

And here we can see the gio source code:

var gio = function (uri, onConnect, onDisConnect) {
    "use strict";
    var worker, onError, workerUri, events = {};

    function getKeys(obj) {
        var keys = [];

        for (var i in obj) {
            if (obj.hasOwnProperty(i)) {
                keys.push(i);
            }
        }

        return keys;
    }

    function onMessage(type, message) {
        switch (type) {
            case '_connect':
                if (onConnect) onConnect();
                break;
            case '_disconnect':
                if (onDisConnect) onDisConnect();
                break;
            default:
                if (events[type]) events[type](message);
        }
    }

    function startWorker() {
        worker = new SharedWorker(workerUri, uri);
        worker.port.addEventListener("message", function (event) {
            onMessage(event.data.type, event.data.message);

        }, false);

        worker.onerror = function (evt) {
            if (onError) onError(evt);
        };

        worker.port.start();
        worker.port.postMessage({events: getKeys(events)});
    }

    function startSocketIo() {
        var socket = io(uri);
        socket.on('connect', function () {
            if (onConnect) onConnect();
        });

        socket.on('disconnect', function () {
            if (onDisConnect) onDisConnect();
        });

        for (var eventName in events) {
            if (events.hasOwnProperty(eventName)) {
                socket.on(eventName, socketOnEventHandler(eventName));
            }
        }
    }

    function socketOnEventHandler(eventName) {
        return function (e) {
            onMessage(eventName, e);
        };
    }

    return {
        registerEvent: function (eventName, callback) {
            events[eventName] = callback;
        },

        start: function () {
            // typeof avoids a ReferenceError in browsers without SharedWorker support.
            if (typeof SharedWorker === 'undefined') {
                startSocketIo();
            } else {
                startWorker();
            }
        },

        onError: function (cbk) {
            onError = cbk;
        },

        setWorker: function (uri) {
            workerUri = uri;
        }
    };
};

And here the application code:

(function (gio) {
    "use strict";

    var onConnect = function () {
        console.log("connected!");
    };

    var onDisConnect = function () {
        console.log("disconnect!");
    };

    var ws = gio("http://localhost:8080", onConnect, onDisConnect);
    ws.setWorker("sharedWorker.js");

    ws.registerEvent("message", function (data) {
        console.log("message", data);
    });

    ws.onError(function (data) {
        console.log("error", data);
    });

    ws.start();
}(gio));

I’ve also created a simple WebSocket server with socket.io. In this small server there’s a setInterval function that broadcasts one message per second to all clients, so we can see the application working:

var io, connectedSockets;

io = require('socket.io').listen(8080);
connectedSockets = 0;

io.sockets.on('connection', function (socket) {
    connectedSockets++;
    console.log("Socket connected! Connected sockets:", connectedSockets);

    socket.on('disconnect', function () {
        connectedSockets--;
        console.log("Socket disconnect! Connected sockets:", connectedSockets);
    });
});

setInterval(function() {
    io.emit("message", "Hola " + new Date().getTime());
}, 1000); 

Source code is available in my github account.

Sending sockets from PostgreSQL triggers with Python

Picture this: we want to notify an external service each time a record is inserted into the database. We can find the place where the insert statement is done and create a TCP client there, but what happens if the application that inserts the data is a legacy application, or the change is too hard to make? If your database is PostgreSQL it’s pretty straightforward. With the “default” procedural language of PostgreSQL (plpgsql) we cannot do it, but PostgreSQL allows us to use other procedural languages, for example Python. With plpython we can use sockets in the same way as in regular Python scripts. It’s very simple. Let me show you how to do it.

First we need to create a plpython function with our TCP client:

CREATE OR REPLACE FUNCTION dummy.sendsocket(msg character varying, host character varying, port integer)
  RETURNS integer AS
$BODY$
  import _socket
  try:
    s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
    s.connect((host, port))
    s.sendall(msg)
    s.close()
    return 1
  except:
    return 0
$BODY$
  LANGUAGE plpython VOLATILE
  COST 100;
ALTER FUNCTION dummy.sendsocket(character varying, character varying, integer)
  OWNER TO username;

Now we create the trigger function that uses our socket client.

CREATE OR REPLACE FUNCTION dummy.myTriggerToSendSockets()
RETURNS trigger AS
$BODY$
   import json
   stmt = plpy.prepare("select dummy.sendSocket($1, $2, $3)", ["text", "text", "int"])
   rv = plpy.execute(stmt, [json.dumps(TD), "host", 26200])
$BODY$
LANGUAGE plpython VOLATILE
COST 100;

As you can see in my example, we are sending the whole record as a JSON string in the socket body.

And finally we attach the trigger to one table (or maybe we need to do it to more than one table)

CREATE TRIGGER myTrigger
  AFTER INSERT OR UPDATE OR DELETE
  ON dummy.myTable
  FOR EACH ROW
  EXECUTE PROCEDURE dummy.myTriggerToSendSockets();

And that’s all. Now we can use a simple TCP socket server to handle those requests. Let me show you examples of TCP servers in different languages. As we can see, all of them are different implementations of the Reactor pattern. We can use, for example:

node.js:

var net = require('net');

var host = 'localhost';
var port = 26200;

var server = net.createServer(function (socket) {
    socket.on('data', function(buffer) {
        // do whatever that we want with buffer
    });
});

server.listen(port, host);

python (with Twisted):

from twisted.internet import reactor, protocol

HOST = 'localhost'
PORT = 26200

class MyServer(protocol.Protocol):
    def dataReceived(self, data):
        # do whatever that we want with data
        pass

class MyServerFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return MyServer()

reactor.listenTCP(PORT, MyServerFactory(), interface=HOST)
reactor.run()

(I know that we can create the Python TCP server without Twisted, but if I don’t use it maybe someone will be angry with me. Probably they are angry right now because I put the node.js example first :))

php (with react):

<?php
include __DIR__ . '/vendor/autoload.php';

$host = 'localhost';
$port = 26200;

$loop   = React\EventLoop\Factory::create();
$socket = new React\Socket\Server($loop);

$socket->on('connection', function ($conn) {
    $conn->on('data', function ($data) {
        // do whatever we want with data
    });
});

$socket->listen($port, $host);
$loop->run();

You can also use xinetd to handle the inbound TCP connections.
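Whatever server we choose, a TCP payload may arrive in more than one chunk, so it is safer to collect the chunks and parse the JSON when the client closes the connection (the trigger function closes the socket right after sendall). A minimal node.js sketch of that idea follows. createCollector and createServer are hypothetical names, and the event and new keys in the sample are an assumption about what json.dumps(TD) contains.

```javascript
var net = require('net');

// Collect every chunk of one connection and parse the JSON once the client closes it.
function createCollector(onMessage) {
    var chunks = [];
    return {
        onData: function (buffer) {
            chunks.push(buffer);
        },
        onEnd: function () {
            var message;
            try {
                message = JSON.parse(Buffer.concat(chunks).toString('utf8'));
            } catch (e) {
                return; // ignore payloads that are not valid JSON
            }
            onMessage(message);
        }
    };
}

// Wiring for a real server (not started here): one collector per connection.
function createServer(onMessage) {
    return net.createServer(function (socket) {
        var collector = createCollector(onMessage);
        socket.on('data', collector.onData);
        socket.on('end', collector.onEnd);
    });
}

// Simulate a payload that arrives split in two chunks.
var collector = createCollector(function (message) {
    console.log(message.event, message.new);
});
collector.onData(Buffer.from('{"event": "INSERT", "new": {"id'));
collector.onData(Buffer.from('": 1}}'));
collector.onEnd();
```

To run it for real, something like createServer(handler).listen(26200, 'localhost') would match the port used in the trigger example.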

Building a simple TCP proxy server with node.js

Today we are going to build a simple TCP proxy server. The scenario is the following one. We have got one host (the client) that establishes a TCP connection to another one (the remote).

client -> remote

We want to set up a proxy server in the middle, so the client will establish the connection with the proxy and the proxy will forward it to the remote, relaying the remote’s response back as well.
With node.js it’s really simple to perform this kind of network operation.

client -> proxy -> remote

var net = require('net');

var LOCAL_PORT  = 6512;
var REMOTE_PORT = 6512;
var REMOTE_ADDR = "192.168.1.25";

var server = net.createServer(function (socket) {
    socket.on('data', function (msg) {
        console.log('  ** START **');
        console.log('<< From client to proxy ', msg.toString());
        var serviceSocket = new net.Socket();
        serviceSocket.connect(parseInt(REMOTE_PORT), REMOTE_ADDR, function () {
            console.log('>> From proxy to remote', msg.toString());
            serviceSocket.write(msg);
        });
        serviceSocket.on("data", function (data) {
            console.log('<< From remote to proxy', data.toString());
            socket.write(data);
            console.log('>> From proxy to client', data.toString());
        });
    });
});

server.listen(LOCAL_PORT);
console.log("TCP server accepting connection on port: " + LOCAL_PORT);

Simple, isn’t it?
The source code is on github.
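The proxy above opens a new connection to the remote for every chunk it receives. A variation, sketched here under my own assumptions rather than taken from the repository, keeps one remote connection per client connection and lets pipe() move the bytes in both directions. createProxy is a hypothetical name; the port and address in the usage comment are the ones from the example above.

```javascript
var net = require('net');

// One remote connection per client connection, with data piped both ways.
function createProxy(remotePort, remoteAddr) {
    return net.createServer(function (socket) {
        var remote = net.connect(remotePort, remoteAddr);

        socket.pipe(remote); // client -> proxy -> remote
        remote.pipe(socket); // remote -> proxy -> client

        // If one side fails or closes, tear down the other one too.
        socket.on('error', function () { remote.destroy(); });
        remote.on('error', function () { socket.destroy(); });
        socket.on('close', function () { remote.destroy(); });
        remote.on('close', function () { socket.destroy(); });
    });
}

// Usage (not started here):
// createProxy(6512, "192.168.1.25").listen(6512);
```

This version drops the logging of the original, but it avoids leaking one remote socket per message and handles connection errors on both sides.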

Web console with node.js

Continuing with my node.js experiments, this time I want to create a Web console. The idea is simple: I want to send a few commands to the server and display the output inside the browser. I can do it entirely with PHP, but I want to send the output to the browser as soon as it appears, without waiting for the command to finish. OK, we can do it by flushing the output in the server, but this solution normally crashes if we keep the application open for a long time. WebSockets to the rescue again. If we need a cross-browser implementation we need the socket.io library. Let’s start:

The node server is a simple websocket server. In this example we will launch each command with the spawn function (require('child_process').spawn) and send the output through the websocket. Simple and pretty straightforward.

var sys   = require('sys'),
http  = require('http'),
url   = require('url'),
spawn = require('child_process').spawn,
ws    = require('./ws.js');

var availableComands = ['ls', 'ps', 'uptime', 'tail', 'cat'];
ws.createServer(function(websocket) {
    websocket.on('connect', function(resource) {
        var parsedUrl = url.parse(resource, true);
        var rawCmd = parsedUrl.query.cmd;
        var cmd = rawCmd.split(" ");
        if (cmd[0] == 'help') {
            websocket.write("Available comands: \n");
            for (var i = 0; i < availableComands.length; i++) {
                websocket.write(availableComands[i]);
                if (i< availableComands.length - 1) {
                    websocket.write(", ");
                }
            }
            websocket.write("\n");

            websocket.end();
        } else if (availableComands.indexOf(cmd[0]) >= 0) {
            // slice(1) returns an empty array when the command has no arguments
            var options = cmd.slice(1);
            
            try {
                var process = spawn(cmd[0], options);
            } catch(err) {
                console.log(err);
                websocket.write("ERROR");
            }

            websocket.on('end', function() {
                process.kill();
            });

            process.stdout.on('data', function(data) {
                websocket.write(data);
            });

            process.stdout.on('end', function() {
                websocket.end();
            });
        } else {
             websocket.write("Comand not available. Type help for available comands\n");
             websocket.end();
        }
    });
  
}).listen(8880, '127.0.0.1');
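
The whitelist check can also be pulled out into a small pure function, which makes it easy to try without opening a socket. This is only a sketch of the idea: parseCommand and ALLOWED_COMMANDS are names I made up for the example, they are not part of the server above.

```javascript
// Hypothetical whitelist, same commands as in the server above.
var ALLOWED_COMMANDS = ['ls', 'ps', 'uptime', 'tail', 'cat'];

// Hypothetical helper: split a raw command string and check it against a whitelist.
function parseCommand(rawCmd, allowed) {
    // A missing or empty query parameter yields no command at all.
    if (typeof rawCmd !== 'string' || rawCmd.trim() === '') {
        return null;
    }
    // Split on runs of whitespace so "ls   -l" does not produce empty arguments.
    var parts = rawCmd.trim().split(/\s+/);
    if (allowed.indexOf(parts[0]) < 0) {
        return null;
    }
    return { name: parts[0], args: parts.slice(1) };
}

console.log(parseCommand('ls  -l /tmp', ALLOWED_COMMANDS));
console.log(parseCommand('rm -rf /', ALLOWED_COMMANDS));
```

The function returns null for anything that is not allowed, so the caller only needs one check before calling spawn.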

The web client is similar to the one from my previous post:

var timeout = 5000;
var wsServer = '127.0.0.1:8880';

var ws;


function cleanString(string) {
    return string.replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
}


function pad(n) {
    return ("0" + n).slice(-2);
}

var cmdHistory = [];
function send(msg) {
    if (msg == 'clear') {
        $('#log').html('');
        return;
    }
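    try {
        // Encode the command so spaces and & survive the query string
        ws = new WebSocket('ws://' + wsServer + '?cmd=' + encodeURIComponent(msg));
        $('#toolbar').css('background', '#933');
        $('#socketStatus').html("working ... [<a href='#' onClick='quit()'>X</a>]");
        cmdHistory.push(msg);
        // Escape the command before echoing it into the log
        $('#log').append("<div class='cmd'>" + cleanString(msg) + "</div>");
        console.log("startWs:");
    } catch (err) {
        // Without a socket there is nothing to attach the handlers to
        console.log(err);
        return;
    }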

    ws.onmessage = function(event) {
        $('#log').append(util.toStaticHTML(event.data));
        window.scrollBy(0, 100000000000000000);
    };

    ws.onclose = function(){
        //console.log("ws.onclose");
        $('#toolbar').css('background', '#65A33F');
        $('#socketStatus').html('Type your command:');
    }
}

function quit() {
    ws.close();
    window.scrollBy(0, 100000000000000000);
}
util = {
  urlRE: /https?:\/\/([-\w\.]+)+(:\d+)?(\/([^\s]*(\?\S+)?)?)?/g, 

  //  html sanitizer 
  toStaticHTML: function(inputHtml) {
    inputHtml = inputHtml.toString();
    // Escape first, then convert newlines, so the <br/> tags are not escaped
    return inputHtml.replace(/&/g, "&amp;")
                    .replace(/</g, "&lt;")
                    .replace(/>/g, "&gt;")
                    .replace(/\n/g, "<br/>");
  }, 

  //pads n with zeros on the left,
  //digits is minimum length of output
  //zeroPad(3, 5); returns "005"
  //zeroPad(2, 500); returns "500"
  zeroPad: function (digits, n) {
    n = n.toString();
    while (n.length < digits) 
      n = '0' + n;
    return n;
  },

  //it is almost 8 o'clock PM here
  //timeString(new Date); returns "19:49"
  timeString: function (date) {
    var minutes = date.getMinutes().toString();
    var hours = date.getHours().toString();
    return this.zeroPad(2, hours) + ":" + this.zeroPad(2, minutes);
  },

  //does the argument only contain whitespace?
  isBlank: function(text) {
    var blank = /^\s*$/;
    return (text.match(blank) !== null);
  }
};
$(document).ready(function() {
  //submit new messages when the user hits enter if the message isnt blank
  $("#entry").keypress(function (e) {
    if (e.keyCode != 13 /* Return */) return;
    // val() reads the current content of the input field
    var msg = $("#entry").val().replace("\n", "");
    if (!util.isBlank(msg)) send(msg);
    $("#entry").val(""); // clear the entry field.
  });
});
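
The command travels to the server inside the query string of the WebSocket URL, so characters like spaces or & need to be encoded on the way out. Here is a small sketch of that round trip. buildCommandUrl is a name invented for the example, and I'm assuming a runtime with the standard URL class (a modern browser or a recent node) for the decoding half.

```javascript
// Hypothetical helper: build the WebSocket URL for a command, encoding the query value.
function buildCommandUrl(server, msg) {
    return 'ws://' + server + '/?cmd=' + encodeURIComponent(msg);
}

var commandUrl = buildCommandUrl('127.0.0.1:8880', 'tail -n 5 a&b.log');
console.log(commandUrl);

// The receiving side can recover the exact string from the query parameter.
var decoded = new URL(commandUrl).searchParams.get('cmd');
console.log(decoded);
```

Without the encoding, the & in the file name would be read as the start of a second query parameter and the command would arrive truncated.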

And that’s all. In fact we don’t need a single line of PHP to build this web console. Last year I tried to do something similar with PHP and it was a big mess. With node these kinds of jobs are trivial. I don’t know if node.js is the future or just another hype, but it’s easy. And cool. Really cool.

You can see the full code on GitHub here. Anyway, take care if you run this application on your host: you are letting users execute raw Unix commands, so some kind of security layer would be necessary.
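
As a first idea for that security layer, the arguments could be checked as well as the command name. The sketch below only accepts short options and paths that stay inside one directory. BASE_DIR, isOption, isInsideBase and areArgsAllowed are all invented for the example, and the /var/log policy is just an assumption. It does not resolve symlinks, so treat it as a starting point and not as a complete solution.

```javascript
// POSIX path rules, so the check behaves the same on every platform.
var path = require('path').posix;

// Hypothetical policy: files must live under this directory.
var BASE_DIR = '/var/log';

// Returns true when the argument is a short option such as -l or -n.
function isOption(arg) {
    return /^-[A-Za-z0-9]+$/.test(arg);
}

// Returns true when the argument resolves to a path inside baseDir.
function isInsideBase(arg, baseDir) {
    var resolved = path.resolve(baseDir, arg);
    return resolved === baseDir || resolved.indexOf(baseDir + path.sep) === 0;
}

// Every argument must be either a short option or a path inside baseDir.
function areArgsAllowed(args, baseDir) {
    return args.every(function (arg) {
        return isOption(arg) || isInsideBase(arg, baseDir);
    });
}

console.log(areArgsAllowed(['-n', '5', 'syslog'], BASE_DIR));
console.log(areArgsAllowed(['../../etc/passwd'], BASE_DIR));
```

For relative paths to mean the same thing to the command, it would have to be started with BASE_DIR as its working directory, which spawn supports through its cwd option.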

Real time monitoring PHP applications with websockets and node.js

Inspecting the error logs is a common way to detect errors and bugs. We can also show errors on screen on our development server, or even use great tools like FirePHP to show our PHP errors and warnings inside the Firebug console. That’s cool, but we can only see the errors and warnings of our own session. If we want to see someone else’s errors we need to inspect the error log. tail -f is our friend, but we have to wade through the warnings of every session to find the ones we want. Because of that I want to build a tool to monitor my PHP applications in real time. Let’s start:

What’s the idea? The idea is to catch all PHP errors and warnings at run time and send them to a node.js HTTP server. This server works much like a chat server, but our clients will only be able to read the server’s logs. Basically the application has three parts: the node.js server, the web client (HTML5) and the server part (PHP). Let me explain each part a bit:

The node Server

Basically it has two parts: an HTTP server to handle the PHP errors/warnings and a WebSocket server to manage the real-time communication with the browser. Using websockets means the web client will only work in a browser with websocket support, like Chrome. Anyway, it’s pretty straightforward to swap the websocket server for a socket.io server to make it work with every browser. But websockets seem to be the future, so I will use websockets in this example.

The http server:

http.createServer(function (req, res) {
    var remoteAddress = req.socket.remoteAddress;
    if (allowedIP.indexOf(remoteAddress) >= 0) {
        try {
            var parsedUrl = url.parse(req.url, true);
            var type = parsedUrl.query.type;
            var logString = parsedUrl.query.logString;
            // The payload is JSON, so parse it instead of running it through eval
            var ip = JSON.parse(logString)[0];
            // No filter (undefined or empty) means every IP is shown
            if (!inspectingUrl || inspectingUrl == ip) {
                clients.forEach(function(client) {
                    client.write(logString);
                });
            }
            // Answer only once, after the log line has been handled
            res.writeHead(200, {
                'Content-Type': 'text/plain'
            });
            res.end('Ok\n');
        } catch(err) {
            console.log("500 to " + remoteAddress);
            res.writeHead(500, {
                'Content-Type': 'text/plain'
            });
            res.end('System Error\n');
        }
    } else {
        console.log("401 to " + remoteAddress);
        res.writeHead(401, {
            'Content-Type': 'text/plain'
        });
        res.end('Not Authorized\n');
    }
}).listen(httpConf.port, httpConf.host);
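
The log entry arrives as a JSON array in the logString query parameter, so it can be read with JSON.parse and checked before it is forwarded to the browsers. A minimal sketch of that step, using the standard URL class instead of url.parse. parseLogEntry and the sample values are made up for the example.

```javascript
// Hypothetical helper: extract and validate the log entry from a request URL.
function parseLogEntry(requestUrl) {
    // The base is only needed because req.url is a relative URL.
    var query = new URL(requestUrl, 'http://localhost').searchParams;
    var raw = query.get('logString');
    if (raw === null) {
        return null;
    }
    var entry;
    try {
        entry = JSON.parse(raw);
    } catch (err) {
        // Anything that is not valid JSON is rejected, never executed.
        return null;
    }
    // Expected shape: [ip, headline, optional detail].
    if (!Array.isArray(entry) || typeof entry[0] !== 'string') {
        return null;
    }
    return { ip: entry[0], raw: raw, entry: entry };
}

var sample = '/?logString=' + encodeURIComponent(
    JSON.stringify(['10.0.0.1', 'WARNING test.php:3', 'Division by zero']));
console.log(parseLogEntry(sample));
console.log(parseLogEntry('/?logString=process.exit()'));
```

The second call shows why eval is a bad fit here: a string like that would be run by eval, while JSON.parse simply refuses it.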

and the web socket server:

var inspectingUrl = undefined;

ws.createServer(function(websocket) {
    websocket.on('connect', function(resource) {
        var parsedUrl = url.parse(resource, true);
        inspectingUrl = parsedUrl.query.ip;
        clients.push(websocket);
    });

    websocket.on('close', function() {
        var pos = clients.indexOf(websocket);
        if (pos >= 0) {
            clients.splice(pos, 1);
        }
    });

}).listen(wsConf.port, wsConf.host);
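
One thing to keep in mind: inspectingUrl is a single variable shared by every connection, so the last browser that connects decides the filter for everybody. A possible variation keeps the filter next to each client. The sketch below uses fake sockets to show the logic. registry, addClient, removeClient, broadcast and fakeSocket are names invented for the example.

```javascript
// Hypothetical registry: each entry keeps the socket together with its own IP filter.
var registry = [];

function addClient(socket, ipFilter) {
    registry.push({ socket: socket, ipFilter: ipFilter || '' });
}

function removeClient(socket) {
    registry = registry.filter(function (entry) {
        return entry.socket !== socket;
    });
}

// Sends the message to every client with no filter or with a matching filter.
function broadcast(ip, message) {
    registry.forEach(function (entry) {
        if (entry.ipFilter === '' || entry.ipFilter === ip) {
            entry.socket.write(message);
        }
    });
}

// Fake sockets that only record what was written, to exercise the logic.
function fakeSocket() {
    return { sent: [], write: function (msg) { this.sent.push(msg); } };
}

var all = fakeSocket();
var onlyOne = fakeSocket();
addClient(all, '');
addClient(onlyOne, '10.0.0.1');
broadcast('10.0.0.1', 'first');
broadcast('10.0.0.2', 'second');
console.log(all.sent, onlyOne.sent);
```

With this layout two people can watch different IPs at the same time without stepping on each other.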

If you want to know more about node.js and see more examples, have a look at the great site http://nodetuts.com/. On this site Pedro Teixeira publishes node.js examples and tutorials. In fact my node.js HTTP + websocket server is a mix of two tutorials from this site.

The web client.

The web client is a simple WebSocket application. It handles the websocket connection, reconnects if it dies and a bit more. It’s based on the node.js chat demo:

<?php $ip = filter_input(INPUT_GET, 'ip', FILTER_SANITIZE_STRING); ?>

<title>Real time <?= $ip ?> monitor</title>
<script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.3/jquery.min.js"></script>
<script type="text/javascript">
    selectedIp = '<?= $ip ?>';
</script>
<script type="text/javascript" src="js.js"></script>
<div id="toolbar">
<ul id="status">
	<li>Socket status: <span id="socketStatus">Connecting ...</span></li>
	<li>IP: <?= $ip == '' ? 'all' : $ip . " <a href='?ip='>[all]</a>" ?></li>
	<li>count: <span id="count">0</span></li>
</ul>
</div>


And the javascript magic

var timeout = 5000;
var wsServer = '192.168.2.2:8880';
var unread = 0;
var focus = false;

var count = 0;
function updateCount() {
    count++;
    $("#count").text(count);
}

function cleanString(string) {
    return string.replace(/&/g,"&amp;").replace(/</g,"&lt;").replace(/>/g,"&gt;");
}

function updateUptime () {
    var now = new Date();
    $("#uptime").text(now.toRelativeTime());
}

function updateTitle(){
    if (unread) {
        document.title = "(" + unread.toString() + ") Real time " + selectedIp + " monitor";
    } else {
        document.title = "Real time " + selectedIp + " monitor";
    }
}

function pad(n) {
    return ("0" + n).slice(-2);
}

function startWs(ip) {
    try {
        ws = new WebSocket("ws://" + wsServer + "?ip=" + ip);
        $('#toolbar').css('background', '#65A33F');
        $('#socketStatus').html('Connected to ' + wsServer);
        //console.log("startWs:" + ip);
        //listen for browser events so we know to update the document title
        $(window).bind("blur", function() {
            focus = false;
            updateTitle();
        });

        $(window).bind("focus", function() {
            focus = true;
            unread = 0;
            updateTitle();
        });
    } catch (err) {
        //console.log(err);
        // Retry with the same IP filter, and skip the handlers below
        setTimeout(function() {startWs(ip)}, timeout);
        return;
    }

    ws.onmessage = function(event) {
        unread++;
        updateTitle();
        var now = new Date();
        var hh = pad(now.getHours());
        var mm = pad(now.getMinutes());
        var ss = pad(now.getSeconds());

        var timeMark = '[' + hh + ':' + mm + ':' + ss + '] ';
        // The server forwards a JSON array: [host, headline, detail]
        var logString = JSON.parse(event.data);
        var host = logString[0];
        var line = "<table class='message'><tr><td width='1%' class='date'>" + timeMark + "</td><td width='1%' valign='top' class='host'><a href='?ip=" + host + "'>" + host + "</a></td>";
        line += "<td class='msg-text' width='98%'>" + logString[1] + "</td></tr>";
        if (logString[2]) {
            line += "<tr><td>&nbsp;</td><td colspan='3' class='msg-text'>" + logString[2] + "</td></tr>";
        }
        line += "</table>";

        $('#log').append(line);
        updateCount();
        window.scrollBy(0, 100000000000000000);
    };

    ws.onclose = function(){
        //console.log("ws.onclose");
        $('#toolbar').css('background', '#933');
        $('#socketStatus').html('Disconnected');
        setTimeout(function() {startWs(selectedIp)}, timeout);
    }
}

$(document).ready(function() {
    startWs(selectedIp);
});
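
The part of onmessage that builds the HTML does not need the DOM at all, so it can be written as a pure function and tried in node. This is just a sketch: timeMark and renderLogLine are hypothetical names, the markup is simplified, and I'm assuming the headline and detail are already HTML produced by our own PHP library.

```javascript
// Formats a Date as [hh:mm:ss] with zero padding, followed by a space.
function timeMark(date) {
    function pad(n) {
        return ('0' + n).slice(-2);
    }
    return '[' + pad(date.getHours()) + ':' + pad(date.getMinutes()) + ':' +
        pad(date.getSeconds()) + '] ';
}

// Hypothetical helper: builds one table per log entry; entry is [host, headline, detail].
function renderLogLine(entry, date) {
    // The host ends up inside an attribute, so encode it.
    var host = encodeURIComponent(entry[0]);
    var html = "<table class='message'><tr>" +
        "<td class='date'>" + timeMark(date) + "</td>" +
        "<td class='host'><a href='?ip=" + host + "'>" + host + "</a></td>" +
        "<td class='msg-text'>" + entry[1] + "</td></tr>";
    if (entry[2]) {
        html += "<tr><td>&nbsp;</td><td colspan='2' class='msg-text'>" + entry[2] + "</td></tr>";
    }
    return html + "</table>";
}

var rendered = renderLogLine(
    ['10.0.0.1', 'WARNING test.php:3', 'Division by zero'],
    new Date(2020, 0, 1, 9, 5, 7));
console.log(rendered);
```

The browser code then only has to append the returned string to #log.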

The server part:

The server part silently handles all PHP warnings and errors and sends them to the node server. The idea is to place a minimal line of PHP code at the beginning of the application that we want to monitor. Imagine the following piece of PHP code:

$a = $var[1];
$a = 1/0;
class Dummy
{
    static function err()
    {
        throw new Exception("error");
    }
}
Dummy::err();

it will throw:
A notice: Undefined variable: var
A warning: Division by zero
An Uncaught exception ‘Exception’ with message ‘error’

So we will add our small library to catch those errors and send them to the node server

include('client/NodeLog.php');
NodeLog::init('192.168.2.2');

$a = $var[1];
$a = 1/0;
class Dummy
{
    static function err()
    {
        throw new Exception("error");
    }
}
Dummy::err();

The script will work in the same way as the first version, but if we start our node.js server in a console:

$ node server.js
HTTP server started at 192.168.2.2::5672
Web Socket server started at 192.168.2.2::8880

We will see those errors/warnings in real time when we open our browser.

Here we can see a small screencast with the working application:

This is the server side library:

class NodeLog
{
    const NODE_DEF_HOST = '127.0.0.1';
    const NODE_DEF_PORT = 5672;

    private $_host;
    private $_port;

    /**
     * @param String $host
     * @param Integer $port
     * @return NodeLog
     */
    static function connect($host = null, $port = null)
    {
        return new self(is_null($host) ? self::$_defHost : $host, is_null($port) ? self::$_defPort : $port);
    }

    function __construct($host, $port)
    {
        $this->_host = $host;
        $this->_port = $port;
    }

    /**
     * @param String $log
     * @return Array array($status, $response)
     */
    public function log($log)
    {
        list($status, $response) = $this->send(json_encode($log));
        return array($status, $response);
    }

    private function send($log)
    {
        $url = "http://{$this->_host}:{$this->_port}?logString=" . urlencode($log);
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, $url);
        curl_setopt($ch, CURLOPT_NOBODY, true);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

        $response = curl_exec($ch);
        $status   = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        return array($status, $response);
    }

    static function getip() {
        $realip = '0.0.0.0';
        if ($_SERVER) {
            if ( isset($_SERVER['HTTP_X_FORWARDED_FOR']) && $_SERVER['HTTP_X_FORWARDED_FOR'] ) {
                $realip = $_SERVER["HTTP_X_FORWARDED_FOR"];
            } elseif ( isset($_SERVER['HTTP_CLIENT_IP']) && $_SERVER["HTTP_CLIENT_IP"] ) {
                $realip = $_SERVER["HTTP_CLIENT_IP"];
            } else {
                $realip = $_SERVER["REMOTE_ADDR"];
            }
        } else {
            if ( getenv('HTTP_X_FORWARDED_FOR') ) {
                $realip = getenv('HTTP_X_FORWARDED_FOR');
            } elseif ( getenv('HTTP_CLIENT_IP') ) {
                $realip = getenv('HTTP_CLIENT_IP');
            } else {
                $realip = getenv('REMOTE_ADDR');
            }
        }
        return $realip;
    }

    public static function getErrorName($err)
    {
        $errors = array(
            E_ERROR             => 'ERROR',
            E_RECOVERABLE_ERROR => 'RECOVERABLE_ERROR',
            E_WARNING           => 'WARNING',
            E_PARSE             => 'PARSE',
            E_NOTICE            => 'NOTICE',
            E_STRICT            => 'STRICT',
            E_DEPRECATED        => 'DEPRECATED',
            E_CORE_ERROR        => 'CORE_ERROR',
            E_CORE_WARNING      => 'CORE_WARNING',
            E_COMPILE_ERROR     => 'COMPILE_ERROR',
            E_COMPILE_WARNING   => 'COMPILE_WARNING',
            E_USER_ERROR        => 'USER_ERROR',
            E_USER_WARNING      => 'USER_WARNING',
            E_USER_NOTICE       => 'USER_NOTICE',
            E_USER_DEPRECATED   => 'USER_DEPRECATED',
        );
        return $errors[$err];
    }

    private static function set_error_handler($nodeHost, $nodePort)
    {
        set_error_handler(function ($errno, $errstr, $errfile, $errline) use($nodeHost, $nodePort) {
            $err = NodeLog::getErrorName($errno);
            /*
            if (!(error_reporting() & $errno)) {
                // This error code is not included in error_reporting
                return;
            }
            */
            $log = array(
                NodeLog::getip(),
                "<strong class='{$err}'>{$err}</strong> {$errfile}:{$errline}",
                nl2br($errstr)
            );
            NodeLog::connect($nodeHost, $nodePort)->log($log);
            return false;
        });
    }

    private static function register_exceptionHandler($nodeHost, $nodePort)
    {
        set_exception_handler(function($exception) use($nodeHost, $nodePort) {
            $exceptionName = get_class($exception);
            $message = $exception->getMessage();
            $file = $exception->getFile();
            $line = $exception->getLine();
            $trace = $exception->getTraceAsString();

            // getTraceAsString() returns a string, so test its length
            $msg = strlen($trace) > 0 ? "Stack trace:\n{$trace}" : '';
            $log = array(
                NodeLog::getip(),
                nl2br("<strong class='ERROR'>Uncaught exception '{$exceptionName}'</strong> with message '{$message}' in {$file}:{$line}"),
                nl2br($msg)
            );
            NodeLog::connect($nodeHost, $nodePort)->log($log);
            return false;
        });
    }

    private static function register_shutdown_function($nodeHost, $nodePort)
    {
        register_shutdown_function(function() use($nodeHost, $nodePort) {
            $error = error_get_last();

            // error_get_last() returns null when no error happened
            if ($error !== null && $error['type'] == E_ERROR) {
                $err = NodeLog::getErrorName($error['type']);
                $log = array(
                    NodeLog::getip(),
                    "<strong class='{$err}'>{$err}</strong> {$error['file']}:{$error['line']}",
                    nl2br($error['message'])
                );
                NodeLog::connect($nodeHost, $nodePort)->log($log);
            }
            // end() only sends the closing log line; it returns nothing to echo
            NodeLog::connect($nodeHost, $nodePort)->end();
        });
    }

    private static $_defHost = self::NODE_DEF_HOST;
    private static $_defPort = self::NODE_DEF_PORT;

    /**
     * @param String $host
     * @param Integer $port
     * @return NodeLog
     */
    public static function init($host = self::NODE_DEF_HOST, $port = self::NODE_DEF_PORT)
    {
        self::$_defHost = $host;
        self::$_defPort = $port;

        self::register_exceptionHandler($host, $port);
        self::set_error_handler($host, $port);
        self::register_shutdown_function($host, $port);

        $node = self::connect($host, $port);
        $node->start();
        return $node;
    }

    private static $time;
    private static $mem;

    public function start()
    {
        self::$time = microtime(TRUE);
        self::$mem = memory_get_usage();
        $log = array(NodeLog::getip(), "<strong class='OK'>Start</strong> >>>> {$_SERVER['REQUEST_URI']}");
        $this->log($log);
    }

    public function end()
    {
        $mem = (memory_get_usage() - self::$mem) / (1024 * 1024);
        $time = microtime(TRUE) - self::$time;
        $log = array(NodeLog::getip(), "<strong class='OK'>End</strong> <<<< mem: {$mem} time {$time}");
        $this->log($log);
    }
}

And of course the full code is on GitHub: RealTimeMonitor