Transforming TCP sockets to MQTT with Go





In the last post we’ve created one proxy to upgrade one legacy application that sends raw TCP sockets to a HTTP server without changing the original application.

Now we’re going to do the same but instead sending HTTP request we’re going to connect to a MQTT broker. Probably try to change the legacy application to connect to a MQTT broker can be a nightmare but with with this approach is pretty straightforward.

The idea is the same. We’re going to send our TCP sockets to localhost. Then we’re going to build a go client that reads the TCP sockets and send the information to the MQTT broker.

We’re going to use Mosquitto as MQTT broker. We can set up easily with docker:

version: '2'

services:
  mosquitto:
    image: eclipse-mosquitto
    hostname: mosquitto
    container_name: mosquitto
    build:
      context: .docker/mosquitto
      dockerfile: Dockerfile
    expose:
      - "1883"
      - "9001"
    ports:
      - "1883:1883"
      - "9001:9001"

We can also set up our Mosquitto server with user and password with mosquitto.conf and users.txt. For this example we’re going to use the credentials: username:password

username:$6$6jOr4vVqaKxisTls$4KVYh8NBZdP+z4S/YbuoSHKlJ+5F1DxiE7XtWWXVHQ+7PlCI+b6LhqSbj8lL45HnGlo4D5t0AVFYrYGjb5lTxg==

Our Go program is very similar than the http version:

package main

import (
	"bufio"
	"encoding/json"
	"flag"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"log"
	"net"
	"os"
	"strings"
	"time"
)

func main() {
	port, closeConnection, topic, broker := parseFlags()
	openSocket(*port, *closeConnection, *topic, *broker, onMessage)
}

func openSocket(port string, closeConnection bool, topic string, broker string, onMessage func(url string, topic string, buffer string)) {
	PORT := "localhost:" + port
	l, err := net.Listen("tcp4", PORT)
	log.Printf("Serving %s\n", l.Addr().String())
	if err != nil {
		log.Fatalln(err)
	}
	defer l.Close()

	for {
		c, err := l.Accept()
		if err != nil {
			log.Fatalln(err)
		}
		go handleConnection(c, closeConnection, topic, broker, onMessage)
	}
}

func createClientOptions(url string) *mqtt.ClientOptions {
	opts := mqtt.NewClientOptions()
	opts.AddBroker(url)
	opts.SetUsername(os.Getenv("MQTT_USERNAME"))
	opts.SetPassword(os.Getenv("MQTT_PASSWORD"))
	return opts
}

func connect(url string) mqtt.Client {
	opts := createClientOptions(url)
	client := mqtt.NewClient(opts)
	token := client.Connect()
	for !token.WaitTimeout(3 * time.Second) {
	}
	if err := token.Error(); err != nil {
		log.Fatal(err)
	}
	return client
}

func onMessage(url string, topic string, buffer string) {
	client := connect(url)
	client.Publish(topic, 0, false, buffer)
}

func parseFlags() (*string, *bool, *string, *string) {
	port := flag.String("port", "7777", "port number")
	closeConnection := flag.Bool("close", true, "Close connection")
	topic := flag.String("topic", "topic", "mqtt topic")
	broker := flag.String("broker", "tcp://localhost:1883", "mqtt topic")
	flag.Parse()

	return port, closeConnection, topic, broker
}

func handleConnection(c net.Conn, closeConnection bool, topic string, broker string, onMessage func(url string, topic string, buffer string)) {
	log.Printf("Accepted connection from %s\n", c.RemoteAddr().String())
	for {
		ip, port, err := net.SplitHostPort(c.RemoteAddr().String())
		netData, err := bufio.NewReader(c).ReadString('\n')
		if err != nil {
			log.Println(err)
		}

		message := map[string]interface{}{
			"body":   strings.TrimSpace(netData),
			"ipFrom": ip,
			"port":   port,
		}

		log.Printf("sending to topic %s message:%s\n", topic, message)
		bytesRepresentation, err := json.Marshal(message)
		if err != nil {
			log.Println(err)
		} else {
			onMessage(broker, topic, string(bytesRepresentation))
		}

		if closeConnection {
			c.Close()
			return
		}
	}
	c.Close()
}

And that’s all. Our legacy application can now speak MQTT without problems.

Source code available in my github

Transforming TCP sockets to HTTP with Go

Sometimes we need to work with legacy applications. Legacy application that are hard to rewrite and hard to change. Imagine, for example, this application is sending raw TCP sockets to communicate with another process. Raw TCP sockets are fast but they have various problems, for example all data is sent in plain text over the network and without authentication (if we don’t implement one protocol).

One solution is use https connections instead. We can also authenticate those requests with an Authentication Bearer. For example I’ve created one simple http server with Python and Flask:

import logging
import os
from functools import wraps

from flask import Flask, request, abort
from flask import jsonify

logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)
app = Flask(__name__)


def authorize_bearer(bearer):
    def authorize(f):
        @wraps(f)
        def decorated_function(*args, **kws):
            if 'Authorization' not in request.headers:
                abort(401)

            data = request.headers['Authorization']

            if str.replace(str(data), 'Bearer ', '') != bearer:
                abort(401)

            return f(*args, **kws)

        return decorated_function

    return authorize


@app.route('/register', methods=['POST'])
@authorize_bearer(bearer=os.getenv('TOKEN'))
def hello_world():
    req_data = request.get_json()
    logger.info(req_data)
    return jsonify({"status": "OK", "request_data": req_data})

Now we only need to change our legacy application to use one http client instead raw TCP sockets. But sometimes it’s not possible. Imagine, for example, if this application runs on a old OS without https support or we cannot find and compile an http client in the legacy application.

One possible solution is isolate the application and change only the destination of the TCP socket. Instead the original ip address whe can use localhost and we can create a proxy at localhost that listen to TCP sockets and send the information to the HTTP server.

We’re going to build this proxy in Go. We can do it with any language (Python, C#, Javascript, …). My Kung Fu in Go is not so good (I’m more comfortable with Python) but it’s not so difficult and we can build a binary with our proxy for Windows, Linux and Mac without any problem. Then we only need to copy the binary into the target host and it works (no installation, no SDK, nothing. Just copy and run)

package main

import (
	"bufio"
	"encoding/json"
	"flag"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
)

func main() {
	port, closeConnection, url := parseFlags()
	openSocket(*port, *closeConnection, *url, onMessage)
}

func onMessage(url string, buffer string) {
	bearer := os.Getenv("TOKEN")
	client := &http.Client{}
	req, _ := http.NewRequest("POST", url, strings.NewReader(buffer))
	req.Header.Add("Authorization", "Bearer "+bearer)
	req.Header.Add("content-type", "application/json")
	resp, err := client.Do(req)

	if err != nil {
		log.Println(err)
	} else {
		if resp.Status == "200" {
			var result map[string]interface{}
			json.NewDecoder(resp.Body).Decode(&result)
			log.Println(result["status"])
		} else {
			log.Println("Response status: " + resp.Status)
		}
		defer resp.Body.Close()
	}
}

func parseFlags() (*string, *bool, *string) {
	port := flag.String("port", "7777", "port number")
	closeConnection := flag.Bool("close", true, "Close connection")
	url := flag.String("url", "http://localhost:5000/register", "Destination endpoint")
	flag.Parse()
	return port, closeConnection, url
}

func openSocket(port string, closeConnection bool, url string, onMessage func(url string, buffer string)) {
	PORT := "localhost:" + port
	l, err := net.Listen("tcp4", PORT)
	log.Printf("Serving %s\n", l.Addr().String())
	if err != nil {
		log.Fatalln(err)
	}
	defer l.Close()

	for {
		c, err := l.Accept()
		if err != nil {
			log.Fatalln(err)
		}
		go handleConnection(c, closeConnection, url, onMessage)
	}
}

func handleConnection(c net.Conn, closeConnection bool, url string, onMessage func(url string, buffer string)) {
	log.Printf("Accepted connection from %s\n", c.RemoteAddr().String())
	for {
		ip, port, err := net.SplitHostPort(c.RemoteAddr().String())
		netData, err := bufio.NewReader(c).ReadString('\n')
		if err != nil {
			log.Println(err)
		}

		message := map[string]interface{}{
			"body":   strings.TrimSpace(netData),
			"ipFrom": ip,
			"port":   port,
		}

		log.Printf("Making request with %s\n", message)
		bytesRepresentation, err := json.Marshal(message)
		if err != nil {
			log.Println(err)
		} else {
			//buffer := bytes.NewBuffer(bytesRepresentation)
			onMessage(url, string(bytesRepresentation))
		}

		if closeConnection {
			c.Close()
			return
		}
	}
	c.Close()

And that’s all. We can upgrade our legacy application without almost changing the code.

Source code available in my github

Playing with Go and file system watchers

Let me explain the idea. I want to emit one RabbitMQ message each time new file is generated in a folder. The problem is that I cannot modify the code of the software that generate the files. The idea is generate a filesystem watcher that emits the message. Let’s start.

I’m not a Go expert, but Go it’s cool for those kind of task. You can create a executable file and just copy in the desired server and that’s all. Just works.

Also there’s a fsnotify package to listen filesystem events. I’ve used fsnotify in the past with PHP and Python.

func main() {
	config := GetConf()
	watcher, _ = fsnotify.NewWatcher()
	defer watcher.Close()

	if err := filepath.Walk(config.Path, watchDir); err != nil {
		fmt.Println("ERROR", err)
	}
	done := make(chan bool)

	go func() {
		for {
			select {
			case event := <-watcher.Events:
				processEvent(event, *config)
			case err := <-watcher.Errors:
				fmt.Println("ERROR", err)
			}
		}
	}()
	<-done
}

func processEvent(event fsnotify.Event, conf Conf) {
	filePath := event.Name
	fileName := filepath.Base(filePath)
	switch op := event.Op.String(); op {
	case "CREATE":
		if strings.HasSuffix(filePath, conf.Extension) {
			bytes, _ := copy(event.Name, conf.CopyTo+fileName)
			if bytes > 0 {
				emitEvent(fileName, conf)
			}
		}
	default:
		fmt.Println("Unhandled event: ", op, " on file: ", fileName)
	}
}

And basically that’s all. We only need to take care with filesystem events. Fsnotify is very low level and when we use it we realized how programs write files. Some of them create a temp file, then it writes it and finally it renames the file. Sometimes the program creates an empty file and finally writes the file. Basically it’s the same but the events are different. In my example I only listen to "CREATE" just enough for my test.

Emit event to RabbitMQ it’s also simple. Well documented within documentation.

func emitEvent(fileName string, conf Conf) {
	fmt.Println("Event on file", fileName)
	message := map[string]interface{}{
		"fileName": fileName,
	}
	bytesRepresentation, err := json.Marshal(message)
	if err != nil {
		log.Println(err)
	} else {
		emmitToRabbit(conf, bytesRepresentation)
	}
}

In this example I also want to use a YAML file to store configuration. Just for learn how to read YAML files in go.

type Conf struct {
	Path        string `yaml:"path"`
	CopyTo      string `yaml:"copy_to"`
	Extension   string `yaml:"extension"`
	BrokerTopic string `yaml:"brokerTopic"`
	Broker      string `yaml:"broker"`
}

func readConf(filename string) (*Conf, error) {
	buf, err := ioutil.ReadFile(filename)
	if err != nil {
		return nil, err
	}

	c := &Conf{}
	err = yaml.Unmarshal(buf, c)
	if err != nil {
		return nil, fmt.Errorf("in file %q: %v", filename, err)
	}

	return c, nil
}

And that’s all. My binary executable is ready.

Full code in my github