Implementing Industrial OPC UA Communication with Python and Asyncio

Today we’re going to work with an industrial protocol called OPC UA. We’ll be using the opcua-asyncio library to create a simple OPC UA server and client, and the `asyncio` library to handle the asynchronous communication between them. The idea is to build an OPC UA server that exposes a variable, and a client that reads and writes that variable.

To simulate a changing variable, I’ve created a simple script that updates a variable every second with the current time and persists it to a Redis database.

import logging
import time

import redis

from settings import REDIS_HOST, REDIS_PORT

logger = logging.getLogger(__name__)


def update_redis_variable_loop():
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    while True:
        timestamp_ms = int(time.time() * 1_000)
        r.set('ts', timestamp_ms)
        logger.info(f"Updated variable: {timestamp_ms}")
        time.sleep(1)

The server will have an authentication mechanism using a username and password, and it will also have a self-signed certificate and a private key to encrypt the communication. To generate the self-signed certificate and private key, you can use the following commands:

openssl genpkey -algorithm RSA -out private_key.pem
openssl req -new -key private_key.pem -out certificate.csr
openssl x509 -req -days 365 -in certificate.csr -signkey private_key.pem -out certificate.pem
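
The scripts below import their configuration from a settings module that isn’t shown in the listings. A minimal sketch of what it could contain (every value here is an assumption for illustration; adjust it to your environment):

# settings.py -- hypothetical values used by the examples below
REDIS_HOST = "localhost"
REDIS_PORT = 6379

OPC_ENDPOINT = "opc.tcp://0.0.0.0:4840/freeopcua/server/"
OPC_NAMESPACE = "http://examples.opcua.local"
OPC_CERTIFICATE = "certificate.pem"
OPC_PRIVATE_KEY = "private_key.pem"

# username/password pairs accepted by the server's UserManager
OPC_USERS_DB = {"admin": "secret"}

# credentials used by the client
OPC_USERNAME = "admin"
OPC_PASSWORD = "secret"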

This OPC UA server will expose the variable that we’re updating in the Redis database.

import asyncio
import logging

import redis.asyncio as redis
from asyncua import Server, ua
from asyncua.server.users import User, UserRole

from settings import (OPC_CERTIFICATE, OPC_ENDPOINT, OPC_NAMESPACE, OPC_PRIVATE_KEY,
                      OPC_USERS_DB, REDIS_HOST, REDIS_PORT)

logger = logging.getLogger(__name__)


class UserManager:
    def get_user(self, iserver, username=None, password=None, certificate=None):
        if certificate and OPC_USERS_DB.get(username, False) == password:
            logger.info(f"User '{username}' authenticated")
            return User(role=UserRole.User)
        return None


async def main():
    server = Server(user_manager=UserManager())
    await server.init()
    server.set_endpoint(OPC_ENDPOINT)

    await server.load_certificate(OPC_CERTIFICATE)
    await server.load_private_key(OPC_PRIVATE_KEY)
    server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])

    namespace_idx = await server.register_namespace(OPC_NAMESPACE)
    obj = await server.nodes.objects.add_object(namespace_idx, "Gonzalo")
    var = await obj.add_variable(namespace_idx, "T", 0, datatype=ua.VariantType.Int32)
    await var.set_writable(False)

    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    logger.info(f"Starting server on {OPC_ENDPOINT}")

    async with server:
        while True:
            await asyncio.sleep(1)
            value = await redis_client.get('ts')
            if value is not None:
                value = int(value)
                logger.info(f"Set value of {var} to {value}")
                await var.write_value(value)


def server(debug: bool = False):
    asyncio.run(main(), debug=debug)

And now we create an OPC UA client that reads the variable from the server and prints it to the console.

import asyncio
import logging

from asyncua import Client

from settings import OPC_ENDPOINT, OPC_CERTIFICATE, OPC_PRIVATE_KEY, OPC_USERNAME, OPC_PASSWORD

logger = logging.getLogger(__name__)


async def main():
    c = Client(url=OPC_ENDPOINT)
    c.set_user(OPC_USERNAME)
    c.set_password(OPC_PASSWORD)
    await c.set_security_string(f"Basic256Sha256,SignAndEncrypt,{OPC_CERTIFICATE},{OPC_PRIVATE_KEY}")

    async with c:
        node = c.get_node("ns=2;i=2")
        value = await node.read_value()
        logger.info(f"Value: {value}")


def client(debug: bool = False):
    asyncio.run(main(), debug=debug)

In our example we are using click to create a CLI that runs the backend, the server and the client; a sketch of the entry point is shown below.
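
A minimal sketch of what that cli.py entry point might look like (the module names backend, server and client are assumptions based on the commands below):

# cli.py -- hypothetical sketch of the click-based entry point
import logging

import click

logging.basicConfig(level=logging.INFO)


@click.group()
def cli():
    """Run the backend updater, the OPC UA server or the client."""


@cli.command()
def backend():
    """Update the Redis variable every second."""
    from backend import update_redis_variable_loop
    update_redis_variable_loop()


@cli.command("server")
@click.option("--debug", is_flag=True, default=False)
def run_server(debug):
    """Start the OPC UA server."""
    from server import server
    server(debug=debug)


@cli.command("client")
@click.option("--debug", is_flag=True, default=False)
def run_client(debug):
    """Read the variable from the OPC UA server."""
    from client import client
    client(debug=debug)


if __name__ == "__main__":
    cli()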

# Start Redis server
docker-compose up

# Start the process that updates the variable in Redis
python cli.py backend

# Run the server
python cli.py server

# Run the client
python cli.py client

Full code available in my GitHub account.

Creating a Real-Time Flask Application with Flask-SocketIO and Redis

Today, we’re going to create a simple Flask application with real-time communication using websockets and the SocketIO library. We’ll leverage the Flask-SocketIO extension for integration.

Here’s the plan: while websockets support bidirectional communication, we’ll use them exclusively for server-to-client messages. For client-to-server interactions, we’ll stick with traditional HTTP communication.

Our application will include session-based authentication. To simulate login, we’ve created a route called /login that establishes a session. This session-based authentication will also apply to our websocket connections.

A key objective of this tutorial is to enable sending websocket messages from outside the web application. For instance, you might want to send messages from a cron job or an external service. To achieve this, we’ll use a message queue to facilitate communication between the SocketIO server and the client application. We’ll utilize Redis as our message queue.

That’s the main application

from flask import Flask, render_template, session, request

from lib.ws import register_ws, emit_event, EmitWebsocketRequest
from settings import REDIS_HOST, WS_PATH

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

register_ws(app=app, socketio_path=WS_PATH, redis_host=REDIS_HOST)


@app.route('/')
def index():
    return render_template('index.html')


@app.route('/login')
def login():
    session['user'] = 'Gonzalo'
    return dict(name=session['user'])


@app.post('/api/')
def api():
    data = EmitWebsocketRequest(**request.json)
    emit_event(data.channel, data.body)

    return dict(status=True)
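
The /api/ endpoint can also be hit from outside the browser. For example, a quick test using requests (assuming the Flask app is running locally on port 5000):

import requests

# hypothetical quick test against the /api/ endpoint shown above
resp = requests.post(
    "http://localhost:5000/api/",
    json={"channel": "message", "body": "Hello from a script"},
)
print(resp.json())  # {'status': True}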

That’s the HTML template

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Flask-SocketIO Websocket Example</title>
    <script src="//cdn.socket.io/4.0.0/socket.io.min.js"></script>
</head>
<body>

<h1>Flask-SocketIO Websocket Example</h1>
<label for="message">Message:</label>
<input type="text" id="message" placeholder="type a message...">

<button onclick="sendMessage()">Send</button>
<ul id="messages"></ul>

<script>
    document.addEventListener("DOMContentLoaded", function () {
        let host = location.protocol + '//' + location.hostname + ':' + location.port
        let socket = io.connect(host, {
            path: '/ws/socket.io',
            reconnection: true,
            reconnectionDelayMax: 5000,
            reconnectionDelay: 1000
        });

        socket.on('connect', function () {
            console.log('Connected to ws');
        });

        socket.on('disconnect', function () {
            console.log('Disconnected from ws');
        });

        socket.on('message', function (msg) {
            let messages = document.getElementById('messages');
            let messageItem = document.createElement('li');
            messageItem.textContent = msg;
            messages.appendChild(messageItem);
        });

        window.sendMessage = async function () {
            const url = '/api/';
            const payload = {"channel": "message", "body": this.message.value};

            try {
                const response = await fetch(url, {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify(payload)
                });

                if (!response.ok) {
                    console.error('Error: ' + response.statusText);
                }

                await response.json();
            } catch (error) {
                console.error('Error:', error);
            }
        };
    });
</script>
</body>
</html>

The register_ws function binds SocketIO to our Flask server. To enable sending messages from outside our Flask application, we need to instantiate SocketIO in two different ways. For this purpose, I’ve created a ws.py file. Note: I’m using Pydantic to validate the HTTP requests.

import logging
from typing import Dict, Any, Union

from flask import session
from flask_socketio import SocketIO
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class Conf:
    def __init__(self, socketio=None):
        self._socketio = socketio

    @property
    def socketio(self):
        return self._socketio

    @socketio.setter
    def socketio(self, value):
        self._socketio = value


conf = Conf()


def emit_event(channel, body):
    conf.socketio.emit(channel, body)


class EmitWebsocketRequest(BaseModel):
    channel: str
    body: Union[Dict[str, Any], str]


def setup_ws(redis_host, redis_port=6379):
    conf.socketio = SocketIO(message_queue=f'redis://{redis_host}:{redis_port}')


def register_ws(
        app,
        redis_host,
        socketio_path='/ws/socket.io',
        redis_port=6379
):
    redis_url = f'redis://{redis_host}:{redis_port}' if redis_host else None
    conf.socketio = SocketIO(app, path=socketio_path, message_queue=redis_url)

    @conf.socketio.on('connect')
    def handle_connect():
        if not session.get("user"):
            raise ConnectionRefusedError('unauthorized!')
        logger.debug(f'Client connected: {session["user"]}')

    @conf.socketio.on('disconnect')
    def handle_disconnect():
        logger.debug('Client disconnected')

    return conf.socketio
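
The listings above don’t show how the application is actually served. With Flask-SocketIO you typically start it through the SocketIO instance rather than app.run(); here is a minimal sketch, assuming app.py calls register_ws() at import time as shown above:

# run.py -- hypothetical entry point; assumes register_ws() was called in app.py
from app import app
from lib.ws import conf

if __name__ == '__main__':
    # socketio.run() serves both the HTTP routes and the websocket endpoint
    conf.socketio.run(app, host='0.0.0.0', port=5000, debug=True)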

Now, we can emit an event from outside the Flask application.

from lib.ws import emit_event, setup_ws
from settings import REDIS_HOST

setup_ws(redis_host=REDIS_HOST)
emit_event('message', 'Hi')

The application needs a Redis server. I set it up using Docker.

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"

Source code available in my GitHub.

PHP application in SAP Cloud Platform. With PostgreSQL, Redis and Cloud Foundry

Keeping on with my study of SAP’s Cloud Platform (SCP) and Cloud Foundry, today I’m going to build a simple PHP application. This application serves a simple Bootstrap landing page and uses HTTP basic authentication; the credentials are validated against a PostgreSQL database. It also has an API to retrieve the localtimestamp from the database server (just to play with a database server). I also want to play with Redis in the cloud, so the API response will be cached with a time to live (TTL) of 5 seconds, using a Redis service.

First we create our services in Cloud Foundry. I’m using the free tier of SAP Cloud Foundry for this example. I’m not going to explain here how to do that; it’s pretty straightforward within SAP’s Cockpit. Some time ago I played with IBM’s Cloud Foundry too, and I remember it was also very simple.

Then we create our application (.bp-config/options.json)

{
  "WEBDIR": "www",
  "LIBDIR": "lib",
  "PHP_VERSION": "{PHP_70_LATEST}",
  "PHP_MODULES": ["cli"],
  "WEB_SERVER": "nginx"
}

If we want to use our PostgreSQL and Redis services with our PHP application, we need to connect those services to the application. This operation can also be done with SAP’s Cockpit.

Now it’s the turn of the PHP application. I normally use the Silex framework for my backends, but now there’s a problem: Silex is dead. I feel a little bit sad, but I’m not going to cry. It’s just a tool and there are other ones. I’ve got my example with Silex but, as an exercise, I will also do it with Lumen.

Let’s start with Silex. If you’re familiar with the Silex micro framework (or with any other microframework, indeed) you can see that there isn’t anything special.

use Symfony\Component\HttpKernel\Exception\HttpException;
use Symfony\Component\HttpFoundation\Request;
use Silex\Provider\TwigServiceProvider;
use Silex\Application;
use Predis\Client;

if (php_sapi_name() == "cli-server") {
    // when I run the built-in server on my local machine, vendors are in a different path
    require __DIR__ . '/../vendor/autoload.php';
    // and also I mock VCAP_SERVICES env
    $env   = file_get_contents(__DIR__ . "/../conf/vcap_services.json");
    $debug = true;
} else {
    require 'vendor/autoload.php';
    $env   = $_ENV["VCAP_SERVICES"];
    $debug = false;
}

$vcapServices = json_decode($env, true);

$app = new Application(['debug' => $debug, 'ttl' => 5]);

$app->register(new TwigServiceProvider(), [
    'twig.path' => __DIR__ . '/../views',
]);

$app['db'] = function () use ($vcapServices) {
    $dbConf = $vcapServices['postgresql'][0]['credentials'];
    $dsn    = "pgsql:dbname={$dbConf['dbname']};host={$dbConf['hostname']};port={$dbConf['port']}";
    $dbh    = new PDO($dsn, $dbConf['username'], $dbConf['password']);
    $dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    $dbh->setAttribute(PDO::ATTR_CASE, PDO::CASE_UPPER);
    $dbh->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);

    return $dbh;
};

$app['redis'] = function () use ($vcapServices) {
    $redisConf = $vcapServices['redis'][0]['credentials'];

    return new Client([
        'scheme'   => 'tcp',
        'host'     => $redisConf['hostname'],
        'port'     => $redisConf['port'],
        'password' => $redisConf['password'],
    ]);
};

$app->get("/", function (Application $app) {
    return $app['twig']->render('index.html.twig', [
        'user' => $app['user'],
        'ttl'  => $app['ttl'],
    ]);
});

$app->get("/timestamp", function (Application $app) {
    if (!$app['redis']->exists('timestamp')) {
        $stmt = $app['db']->prepare('SELECT localtimestamp');
        $stmt->execute();
        $app['redis']->set('timestamp', $stmt->fetch()['TIMESTAMP'], 'EX', $app['ttl']);
    }

    return $app->json($app['redis']->get('timestamp'));
});

$app->before(function (Request $request) use ($app) {
    $username = $request->server->get('PHP_AUTH_USER', false);
    $password = $request->server->get('PHP_AUTH_PW');

    $stmt = $app['db']->prepare('SELECT name, surname FROM public.user WHERE username=:USER AND pass=:PASS');
    $stmt->execute(['USER' => $username, 'PASS' => md5($password)]);
    $row = $stmt->fetch();
    if ($row !== false) {
        $app['user'] = $row;
    } else {
        header("WWW-Authenticate: Basic realm='RIS'");
        throw new HttpException(401, 'Please sign in.');
    }
}, 0);

$app->run();

Maybe the only special thing is the way the autoloader is initialized. We initialize the autoloader in two different ways: one when the application runs in the cloud and another one when the application runs locally with PHP’s built-in server. That’s because vendors are located in different paths depending on which environment the application lives in. When Cloud Foundry connects services to applications, it injects environment variables with the service configuration (credentials, host, …) through the VCAP_SERVICES env var.

I use the built-in server to run the application locally. When I’m doing that, I don’t have the VCAP_SERVICES variable, and my service information is different from what it is when the application runs in the cloud. Maybe an environment variable would be better, but I’m using this trick:

if (php_sapi_name() == "cli-server") {
    // I'm running the application locally
} else {
    // I'm in the cloud
}

So when I’m running locally, I mock VCAP_SERVICES with my local values and also, for example, configure the Silex application in debug mode.

Sometimes I want to run my application locally but use the cloud services. I cannot connect directly to those services, but I can do it over SSH through our connected application. For example, if our PostgreSQL service is running on 10.11.241.0:48825, we can map this remote port (in a private network) to a local port with this command:

cf ssh -N -T -L 48825:10.11.241.0:48825 silex

You can see more information about this command here.

Now we can use pgAdmin, for example, on our local machine to connect to the cloud server.

We can do the same with Redis

cf ssh -N -T -L 54266:10.11.241.9:54266 silex

And basically that’s all. Now we’ll do the same with Lumen. The idea is to create the same application with Lumen instead of Silex. It’s a dummy application, but it covers the tasks that I normally use. I will also reuse the Redis and PostgreSQL services from the previous project.

use App\Http\Middleware;
use Laravel\Lumen\Application;
use Laravel\Lumen\Routing\Router;
use Predis\Client;

if (php_sapi_name() == "cli-server") {
    require __DIR__ . '/../vendor/autoload.php';
    $env = 'dev';
} else {
    require 'vendor/autoload.php';
    $env = 'prod';
}

(new Dotenv\Dotenv(__DIR__ . "/../env/{$env}"))->load();

$app = new Application();

$app->routeMiddleware([
    'auth' => Middleware\AuthMiddleware::class,
]);

$app->register(App\Providers\VcapServiceProvider::class);
$app->register(App\Providers\StdoutLogServiceProvider::class);
$app->register(App\Providers\DbServiceProvider::class);
$app->register(App\Providers\RedisServiceProvider::class);

$app->router->group(['middleware' => 'auth'], function (Router $router) {
    $router->get("/", function () {
        return view("index", [
            'user' => config("user"),
            'ttl'  => getenv('TTL'),
        ]);
    });

    $router->get("/timestamp", function (Client $redis, PDO $conn) {
        if (!$redis->exists('timestamp')) {
            $stmt = $conn->prepare('SELECT localtimestamp');
            $stmt->execute();
            $redis->set('timestamp', $stmt->fetch()['TIMESTAMP'], 'EX', getenv('TTL'));
        }

        return response()->json($redis->get('timestamp'));
    });
});

$app->run();

I’ve created four service providers. One handles database connections (I don’t like ORMs):

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use PDO;

class DbServiceProvider extends ServiceProvider
{
    public function register()
    {
    }

    public function boot()
    {
        $vcapServices = app('vcap_services');

        $dbConf = $vcapServices['postgresql'][0]['credentials'];
        $dsn    = "pgsql:dbname={$dbConf['dbname']};host={$dbConf['hostname']};port={$dbConf['port']}";
        $dbh    = new PDO($dsn, $dbConf['username'], $dbConf['password']);
        $dbh->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $dbh->setAttribute(PDO::ATTR_CASE, PDO::CASE_UPPER);
        $dbh->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);

        $this->app->bind(PDO::class, function ($app) use ($dbh) {
            return $dbh;
        });
    }
}

Another one is for Redis. I need to study Lumen a little bit more; I know that Lumen has a built-in tool to work with Redis.

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Predis\Client;

class RedisServiceProvider extends ServiceProvider
{
    public function register()
    {
    }

    public function boot()
    {
        $vcapServices = app('vcap_services');
        $redisConf    = $vcapServices['redis'][0]['credentials'];

        $redis = new Client([
            'scheme'   => 'tcp',
            'host'     => $redisConf['hostname'],
            'port'     => $redisConf['port'],
            'password' => $redisConf['password'],
        ]);

        $this->app->bind(Client::class, function ($app) use ($redis) {
            return $redis;
        });
    }
}

Another one tells Monolog to send logs to stdout:

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Monolog;

class StdoutLogServiceProvider extends ServiceProvider
{
    public function register()
    {
        app()->configureMonologUsing(function (Monolog\Logger $monolog) {
            return $monolog->pushHandler(new \Monolog\Handler\ErrorLogHandler());
        });
    }
}

And the last one works with the VCAP environment variables. I probably need to integrate it with the dotenv files:

namespace App\Providers;

use Illuminate\Support\ServiceProvider;

class VcapServiceProvider extends ServiceProvider
{
    public function register()
    {
        if (php_sapi_name() == "cli-server") {
            $env = file_get_contents(__DIR__ . "/../../conf/vcap_services.json");
        } else {
            $env = $_ENV["VCAP_SERVICES"];
        }

        $vcapServices = json_decode($env, true);

        $this->app->bind('vcap_services', function ($app) use ($vcapServices) {
            return $vcapServices;
        });
    }
}

We also need to handle authentication (HTTP basic auth in this case), so we’ll create a simple middleware:

namespace App\Http\Middleware;

use Closure;
use Illuminate\Http\Request;
use PDO;

class AuthMiddleware
{
    public function handle(Request $request, Closure $next)
    {
        $user = $request->getUser();
        $pass = $request->getPassword();

        $db = app(PDO::class);
        $stmt = $db->prepare('SELECT name, surname FROM public.user WHERE username=:USER AND pass=:PASS');
        $stmt->execute(['USER' => $user, 'PASS' => md5($pass)]);
        $row = $stmt->fetch();
        if ($row !== false) {
            config(['user' => $row]);
        } else {
            $headers = ['WWW-Authenticate' => 'Basic'];
            return response('Admin Login', 401, $headers);
        }

        return $next($request);
    }
}

In summary: Lumen is cool. The interface is very similar to Silex, so I can switch from thinking in Silex to thinking in Lumen easily. Blade instead of Twig: no problem. Service providers are very similar, routing is almost the same, and middlewares are much better. Nowadays the backend is a commodity for me, so I don’t want to spend too much time working on it. I want something that just works. Lumen looks like that.

Both projects, Silex and Lumen, are available in my GitHub.

PHP Redis cache wrapper

Today I want to share a simple Redis wrapper for PHP that I’m using in one project. Nowadays I don’t like reinventing the wheel as much as I did years ago. I’m not going to create a new Redis library for PHP; there are too many already. I normally use Predis (it’s the de facto standard, indeed).

As I said before, I don’t like to reinvent the wheel, but I do like to create wrappers to adapt libraries to my common operations. For example, with Redis I very often read a key from the cache (and create the key in the cache if it doesn’t exist). Because of that, I’ve created this simple wrapper to encapsulate this common operation.

The idea is to use something like this:

use G\Redis\Cache;
use Predis\Client;

$cache = new Cache(new Client(json_decode(file_get_contents(__DIR__ . '/conf.json'), true)));

$value = $cache->get("aaa.aaa.aaa", function () {
    return [
        'a' => 1,
    ];
});

$cache->delete("aaa.aaa.aaa");

It’s very simple. I create a new instance (injecting a Predis client), and when I want to get a key from Redis I also pass a callback that builds whatever I need to create the key in the cache if it doesn’t exist.

The wrapper’s code is also very simple

namespace G\Redis;

use Exception;
use Predis\ClientInterface;

class Cache
{
    private $redis;

    public function __construct(ClientInterface $redis)
    {
        $this->redis = $redis;
    }

    public function get($key, callable $callback = null, $ttl = null)
    {
        if ($this->redis->exists($key)) {
            return json_decode($this->redis->get($key), true);
        }
        if (!is_callable($callback)) {
            throw new Exception("Key value '{$key}' not in cache and no callback available to populate the cache");
        }
        $out = call_user_func($callback);
        if (!is_null($ttl)) {
            $this->redis->set($key, json_encode($out), 'EX', $ttl);
        } else {
            $this->redis->set($key, json_encode($out));
        }

        return $out;
    }

    public function delete($key)
    {
        $keys = $this->redis->keys($key);
        if (count($keys) > 0) {
            $this->redis->del($keys);
        }

        return $keys;
    }
}

The code is available in my GitHub and also on Packagist.