Lately, message queues appear in one way or another in all my projects. Normally I work with AWS, where I use SQS and SNS, but I also use RabbitMQ and MQTT quite often. In AWS there's a pattern I use a lot to isolate services: the process that emits messages publishes to SNS, and I bind SNS to SQS. With this technique I can attach n SQS queues to the same SNS topic. I've used it here. In AWS it's pretty straightforward to do that. Today we're going to do the same with RabbitMQ. In fact it's very easy: we only need to follow the tutorial in the official RabbitMQ documentation.
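For reference, the AWS version of this fan-out is just an SNS topic with one subscription per SQS queue. A minimal boto3 sketch, with made-up topic and queue names (and note that in a real setup each queue also needs a policy allowing SNS to deliver to it):

import boto3

sns = boto3.client('sns')
sqs = boto3.client('sqs')

# Hypothetical names, only for illustration
topic_arn = sns.create_topic(Name='events')['TopicArn']

for queue_name in ('service1', 'service2'):
    queue_url = sqs.create_queue(QueueName=queue_name)['QueueUrl']
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['QueueArn'])['Attributes']['QueueArn']

    # Bind the queue to the topic: every message published to SNS
    # is delivered to each subscribed SQS queue
    sns.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)

# The emitter only knows about the topic
sns.publish(TopicArn=topic_arn, Message='hello, world')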
The script listens to an exchange and resends each message to a queue topic. Basically the same as we can see in the RabbitMQ documentation.
import settings
from lib.logger import logger
from lib.rabbit import get_channel

channel = get_channel()

channel.exchange_declare(
    exchange=settings.EXCHANGE,
    exchange_type='fanout')

result = channel.queue_declare(
    queue='',
    exclusive=True)
queue_name = result.method.queue

channel.queue_bind(
    exchange=settings.EXCHANGE,
    queue=queue_name)

logger.info(f' [*] Waiting for {settings.EXCHANGE}. To exit press CTRL+C')

channel.queue_declare(
    durable=settings.QUEUE_DURABLE,
    auto_delete=settings.QUEUE_AUTO_DELETE,
    queue=settings.QUEUE_TOPIC
)


def callback(ch, method, properties, body):
    channel.basic_publish(
        exchange='',
        routing_key=settings.QUEUE_TOPIC,
        body=body)
    logger.info(f'Message sent to topic {settings.QUEUE_TOPIC}. Message: {body}')


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True)

channel.start_consuming()
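In the snippet, get_channel comes from a small lib.rabbit helper that isn't shown here. A minimal sketch of it using pika, assuming settings also exposes RABBIT_HOST, RABBIT_USER and RABBIT_PASS (the same variables used in the stack below), could be:

import pika

import settings


def get_channel():
    # Assumed settings: RABBIT_HOST, RABBIT_USER, RABBIT_PASS
    credentials = pika.PlainCredentials(settings.RABBIT_USER, settings.RABBIT_PASS)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=settings.RABBIT_HOST, credentials=credentials))
    return connection.channel()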
We send a message to the exchange using rabbitmqadmin and see it arrive in the queue:
rabbitmqadmin -u username -p password publish exchange=exchange routing_key= payload="hello, world"
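We can also pull the message from the bound queue programmatically to check the fan-out worked. A quick one-off with pika, assuming the listener was started with QUEUE_TOPIC set to topic1 (as in the stack below):

from lib.rabbit import get_channel

# Fetch a single message from the forwarded queue
channel = get_channel()
method, properties, body = channel.basic_get(queue='topic1', auto_ack=True)
print(body)  # b'hello, world' if the message was forwarded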
My idea with this project is to deploy it into a Docker Swarm cluster and add/remove listeners just by adding new services to the stack:
...
  exchange2topic1:
    image: ${ECR}/exchange2queue:${VERSION}
    build:
      context: .
      dockerfile: Dockerfile
    deploy:
      restart_policy:
        condition: on-failure
    depends_on:
      - rabbit
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0
...
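QUEUE_DURABLE and QUEUE_AUTO_DELETE arrive as environment variables, so settings.py has to turn them into booleans before passing them to queue_declare. A possible sketch (the variable names are the ones used in the service definition; the defaults are an assumption):

import os

RABBIT_HOST = os.environ.get('RABBIT_HOST', 'localhost')
RABBIT_USER = os.environ.get('RABBIT_USER', 'guest')
RABBIT_PASS = os.environ.get('RABBIT_PASS', 'guest')

EXCHANGE = os.environ.get('EXCHANGE', 'exchange')
QUEUE_TOPIC = os.environ.get('QUEUE_TOPIC', 'topic1')

# '1' / '0' in docker-compose become booleans here
QUEUE_DURABLE = os.environ.get('QUEUE_DURABLE', '1') == '1'
QUEUE_AUTO_DELETE = os.environ.get('QUEUE_AUTO_DELETE', '0') == '1'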
With this approach it's very simple for me to add and remove listeners without touching the emitter.
Also, as a plus, I like to add a simple HTTP API that allows me to send messages to the exchange with a POST request, instead of using a RabbitMQ client. That's because sometimes I work with legacy systems where using an AMQP client isn't simple. That's a simple Flask API:
from flask import Flask, request
from flask import jsonify

import settings
from lib.auth import authorize_bearer
from lib.logger import logger
from lib.rabbit import get_channel
import json

app = Flask(__name__)


@app.route('/health')
def health():
    return jsonify({"status": "ok"})


@app.route('/publish/<path:exchange>', methods=['POST'])
@authorize_bearer(bearer=settings.API_TOKEN)
def publish(exchange):
    channel = get_channel()
    try:
        message = request.get_json()
        channel.basic_publish(
            exchange=exchange,
            routing_key='',
            body=json.dumps(message)
        )
        logger.info(f"message sent to exchange {exchange}")
        return jsonify({"status": "OK", "exchange": exchange})
    except:
        return jsonify({"status": "NOK", "exchange": exchange})
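The authorize_bearer decorator comes from lib.auth, which isn't shown here either. A minimal sketch, assuming it simply compares the Authorization header against the configured token:

from functools import wraps

from flask import request, jsonify


def authorize_bearer(bearer):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # Expect "Authorization: Bearer <token>"
            auth = request.headers.get('Authorization', '')
            if auth != f'Bearer {bearer}':
                return jsonify({"status": "unauthorized"}), 401
            return f(*args, **kwargs)
        return wrapper
    return decorator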
Now we can emit messages to the exchange using curl, Postman or any other HTTP client.
POST http://localhost:5000/publish/exchange
Content-Type: application/json
Authorization: Bearer super_secret_key

{
    "hello": "Gonzalo"
}
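Or the same request from Python with requests (using the same token and port as above):

import requests

response = requests.post(
    'http://localhost:5000/publish/exchange',
    json={'hello': 'Gonzalo'},
    headers={'Authorization': 'Bearer super_secret_key'})

print(response.json())  # {'status': 'OK', 'exchange': 'exchange'}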
Also, in the Docker stack I want to use a reverse proxy to serve my Flask application (served with gunicorn) and the RabbitMQ management console. I'm using nginx to do that:
upstream api {
    server api:5000;
}

server {
    listen 8000 default_server;
    listen [::]:8000;

    client_max_body_size 20M;

    location / {
        try_files $uri @proxy_to_api;
    }

    location ~* /rabbitmq/api/(.*?)/(.*) {
        proxy_pass http://rabbit:15672/api/$1/%2F/$2?$query_string;
        proxy_buffering off;
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location ~* /rabbitmq/(.*) {
        rewrite ^/rabbitmq/(.*)$ /$1 break;
        proxy_pass http://rabbit:15672;
        proxy_buffering off;
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location @proxy_to_api {
        proxy_set_header X-Forwarded-Proto https;
        proxy_set_header X-Url-Scheme $scheme;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_redirect off;
        proxy_pass http://api;
    }
}
And that's all. Here's the docker-compose.yml:
version: '3.6'

x-base: &base
  image: ${ECR}/exchange2queue:${VERSION}
  build:
    context: .
    dockerfile: Dockerfile
  deploy:
    restart_policy:
      condition: on-failure
  depends_on:
    - rabbit

services:
  rabbit:
    image: rabbitmq:3-management
    deploy:
      restart_policy:
        condition: on-failure
    ports:
      - 5672:5672
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}

  nginx:
    image: ${ECR}/exchange2queue_nginx:${VERSION}
    deploy:
      restart_policy:
        condition: on-failure
    build:
      context: .docker/nginx
      dockerfile: Dockerfile
    ports:
      - 8080:8000
    depends_on:
      - rabbit
      - api

  api:
    <<: *base
    container_name: front
    command: /bin/sh wait-for-it.sh rabbit:5672 -- gunicorn -w 4 api:app -b 0.0.0.0:5000
    deploy:
      restart_policy:
        condition: on-failure
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      API_TOKEN: ${API_TOKEN}

  exchange2topic1:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic1
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0

  exchange2topic2:
    <<: *base
    command: /bin/sh wait-for-it.sh rabbit:5672 -- python exchange2queue.py
    environment:
      RABBIT_HOST: rabbit
      RABBIT_USER: ${RABBITMQ_USER}
      RABBIT_PASS: ${RABBITMQ_PASS}
      EXCHANGE: exchange
      QUEUE_TOPIC: topic2
      QUEUE_DURABLE: 1
      QUEUE_AUTO_DELETE: 0
Full code in my GitHub.