Playing with threads and Python. Part 2

Today I want to keep on playing with python and threads (part1 here). The idea is create one simple script that prints one asterisk in the console each second. Simple, isn’t it?

from time import sleep
while True:
    print("*")
    sleep(1)

I want to keep this script running but I want to send one message externally, for example using RabbitMQ, and do something within the running script. In this demo, for example, stop the script.

In javascript we can do it with a single tread process using the setInterval function but, since the rabbit listener with pika is a blocking action, we need to use threads in Python (please tell me if I’m wrong). The idea is to create a circuit breaker condition in the main loop to check if I need to stop or not the main thread.

First I’ve created my Rabbit listener in a thread:

from queue import Queue, Empty
import threading
import pika
import os


class Listener(threading.Thread):
    def __init__(self, queue=Queue()):
        super(Listener, self).__init__()
        self.queue = queue
        self.daemon = True

    def run(self):
        channel = self._get_channel()
        channel.queue_declare(queue='stop')

        channel.basic_consume(
            queue='stop',
            on_message_callback=lambda ch, method, properties, body: self.queue.put(item=True),
            auto_ack=True)

        channel.start_consuming()

    def stop(self):
        try:
            return True if self.queue.get(timeout=0.05) is True else False
        except Empty:
            return False

    def _get_channel(self):
        credentials = pika.PlainCredentials(
            username=os.getenv('RABBITMQ_USER'),
            password=os.getenv('RABBITMQ_PASS'))

        parameters = pika.ConnectionParameters(
            host=os.getenv('RABBITMQ_HOST'),
            credentials=credentials)

        connection = pika.BlockingConnection(parameters=parameters)

        return connection.channel()

Now in the main process I start the Listener and I enter in one endless loop to print my asterisk each second but at the end of each loop I check if I need to stop the process or not

from Listener import Listener
from dotenv import load_dotenv
import logging
from time import sleep
import os

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

l = Listener()
l.start()


def main():
    while True:
        logging.info("*")
        sleep(1)
        if l.stop():
            break


if __name__ == '__main__':
    main()

As we can see int the stop function we’re using the queue.Queue package to communicate with our listener loop.

And that’s all. In the example I also provide a minimal RabbitMQ server in a docker container.

version: '3.4'

services:
  rabbit:
    image: rabbitmq:3-management
    restart: always
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS}
    ports:
      - "15672:15672"
      - "5672:5672"

Source code available in my github

Alexa skill example with Serverless framework

Today I want to play with Alexa and serverless framework. I’ve created a sample hello world skill. In fact this example is more or less the official hello-world sample.

const Alexa = require('ask-sdk-core')
const RequestInterceptor = require('./interceptors/RequestInterceptor')
const ResponseInterceptor = require('./interceptors/ResponseInterceptor')
const LaunchRequestHandler = require('./handlers/LaunchRequestHandler')
const HelloWorldIntentHandler = require('./handlers/HelloWorldIntentHandler')
const HelpIntentHandler = require('./handlers/HelpIntentHandler')
const CancelAndStopIntentHandler = require('./handlers/CancelAndStopIntentHandler')
const SessionEndedRequestHandler = require('./handlers/SessionEndedRequestHandler')
const FallbackHandler = require('./handlers/FallbackHandler')
const ErrorHandler = require('./handlers/ErrorHandler')

let skill

module.exports.handler = async (event, context) => {
  if (!skill) {
    skill = Alexa.SkillBuilders.custom().
      addRequestInterceptors(RequestInterceptor).
      addResponseInterceptors(ResponseInterceptor).
      addRequestHandlers(
        LaunchRequestHandler,
        HelloWorldIntentHandler,
        HelpIntentHandler,
        CancelAndStopIntentHandler,
        SessionEndedRequestHandler,
        FallbackHandler).
      addErrorHandlers(
        ErrorHandler).
      create()
  }

  return await skill.invoke(event, context)
}

It has one Intent that answers to hello command.

const HelloWorldIntentHandler = {
  canHandle (handlerInput) {
    return handlerInput.requestEnvelope.request.type === 'IntentRequest'
      && handlerInput.requestEnvelope.request.intent.name === 'HelloWorldIntent'
  },
  handle (handlerInput) {
    const speechOutput = 'Hello World'
    const cardTitle = 'Hello world'

    return handlerInput.responseBuilder.
      speak(speechOutput).
      reprompt(speechOutput).
      withSimpleCard(cardTitle, speechOutput).
      getResponse()
  }
}

module.exports = HelloWorldIntentHandler

I’m using Serverless framework to deploy the skill. I know that serverless frameworks has plugins for Alexa skills that help us to create the whole skill, but in this example I want to do it a little more manually (it’s the way that I learn new things).

First I create the skill in the Alexa developer console (or via ask cli). There’re a lot of tutorials about it. Then I take my alexaSkillId and I use this id within my serverless config file as the trigger event of my lambda function.

service: hello-world

provider:
  name: aws
  runtime: nodejs8.10
  region: ${opt:region, self:custom.defaultRegion}
  stage: ${opt:stage, self:custom.defaultStage}

custom:
  defaultRegion: eu-west-1
  defaultStage: prod

functions:
  info:
    handler: src/index.handler
    events:
      - alexaSkill: amzn1.ask.skill.my_skill

then I deploy the lambda function

npx serverless deploy –aws-s3-accelerate

And I take the ARN of the lambda function and I use this lambda as my endpoint in the Alexa developer console.

Also we can test our skill (at least the lambda function) using our favorite testing framework. I will use jest in this example.
Testing is very important, at least for me, when I’m working with lambdas and serverless. I want to test my script locally, instead of deploying to AWS again and again (it’s slow).

const when = require('./steps/when')
const { init } = require('./steps/init')

describe('When we invoke the skill', () => {
  beforeAll(() => {
    init()
  })

  test('launch intent', async () => {
    const res = await when.we_invoke_intent(require('./events/use_skill'))
    const card = res.response.card
    expect(card.title).toBe('Hello world')
    expect(card.content).toBe('Welcome to Hello world, you can say Hello or Help. Which would you like to try?')
  })

  test('help handler', async () => {
    const res = await when.we_invoke_intent(require('./events/help_handler'))
    console.log(res.response.outputSpeech.ssml)
    expect(res.response.outputSpeech.ssml).toBe('<speak>You can say hello to me! How can I help?</speak>')
  })

  test('Hello world handler', async () => {
    const res = await when.we_invoke_intent(require('./events/hello_world_handler'))
    const card = res.response.card
    expect(card.title).toBe('Hello world')
    expect(card.content).toBe('Hello World')
  })
})

Full code in my github account.

Playing with threads and Python

Today I want to play a little bit with Python and threads. This kind of post are one kind of cheat sheet that I like to publish, basically to remember how do do things.

I don’t like to use threads. They are very powerful but, as uncle Ben said: Great power comes great responsibility. I prefer decouple process with queues instead using threads, but sometimes I need to use them. Let’s start

First I will build a simple script without threads. This script will append three numbers (1, 2 and 3) to a list and it will sum them. The function that append numbers to the list sleeps a number of seconds equals to the number that we’re appending.

import time

start_time = time.time()

total = []


def sleeper(seconds):
    time.sleep(seconds)
    total.append(seconds)


for s in (1, 2, 3):
    sleeper(s)

end_time = time.time()

total_time = end_time - start_time

print("Total time: {:.3} seconds. Sum: {}".format(total_time, sum(total)))

The the outcome of the script is obviously 6 (1 + 2 + 3) and as we’re sleeping the script a number of seconds equals to the the number that we’re appending our script, it takes 6 seconds to finish.

Now we’re going to use a threading version of the script. We’re going to do the same, but we’re going to use one thread for each append (with the same sleep function)

import time
from queue import Queue
import threading

start_time = time.time()

queue = Queue()


def sleeper(seconds):
    time.sleep(seconds)
    queue.put(seconds)


threads = []
for s in (1, 2, 3):
    t = threading.Thread(target=sleeper, args=(s,))
    threads.append(t)
    t.start()

for one_thread in threads:
    one_thread.join()

total = 0
while not queue.empty():
    total = total + queue.get()

end_time = time.time()

total_time = end_time - start_time
print("Total time: {:.3} seconds. Sum: {}".format(total_time, total))

The outcome of our script is 6 again, but now it takes 3 seconds (the highest sleep).

Source code in my github

Playing with TOTP (2FA) and mobile applications with ionic

Today I want to play with Two Factor Authentication. When we speak about 2FA, TOTP come to our mind. There’re a lot of TOTP clients, for example Google Authenticator.

My idea with this prototype is to build one Mobile application (with ionic) and validate one totp token in a server (in this case a Python/Flask application). The token will be generated with a standard TOTP client. Let’s start

The sever will be a simple Flask server to handle routes. One route (GET /) will generate one QR code to allow us to configure or TOTP client. I’m using the library pyotp to handle totp operations.

from flask import Flask, jsonify, abort, render_template, request
import os
from dotenv import load_dotenv
from functools import wraps
import pyotp
from flask_qrcode import QRcode

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

totp = pyotp.TOTP(os.getenv('TOTP_BASE32_SECRET'))

app = Flask(__name__)
QRcode(app)


def verify(key):
    return totp.verify(key)


def authorize(f):
    @wraps(f)
    def decorated_function(*args, **kws):
        if not 'Authorization' in request.headers:
            abort(401)

        data = request.headers['Authorization']
        token = str.replace(str(data), 'Bearer ', '')

        if token != os.getenv('BEARER'):
            abort(401)

        return f(*args, **kws)

    return decorated_function


@app.route('/')
def index():
    return render_template('index.html', totp=pyotp.totp.TOTP(os.getenv('TOTP_BASE32_SECRET')).provisioning_uri("gonzalo123.com", issuer_name="TOTP Example"))


@app.route('/check/<key>', methods=['GET'])
@authorize
def alert(key):
    status = verify(key)
    return jsonify({'status': status})


if __name__ == "__main__":
    app.run(host='0.0.0.0')

I’ll use an standard TOTP client to generate the tokens but with pyotp we can easily create a client also

import pyotp
import time
import os
from dotenv import load_dotenv
import logging

logging.basicConfig(level=logging.INFO)

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

totp = pyotp.TOTP(os.getenv('TOTP_BASE32_SECRET'))

mem = None
while True:
    now = totp.now()
    if mem != now:
        logging.info(now)
        mem = now
        time.sleep(1)

And finally the mobile application. It’s a simple ionic application. That’s the view:

<ion-header>
  <ion-toolbar>
    <ion-title>
      TOTP Validation demo
    </ion-title>
  </ion-toolbar>
</ion-header>

<ion-content>
  <div class="ion-padding">
    <ion-item>
      <ion-label position="stacked">totp</ion-label>
      <ion-input placeholder="Enter value" [(ngModel)]="totp"></ion-input>
    </ion-item>
    <ion-button fill="solid" color="secondary" (click)="validate()" [disabled]="!totp">
      Validate
      <ion-icon slot="end" name="help-circle-outline"></ion-icon>
    </ion-button>
  </div>
</ion-content>

The controller:

import { Component } from '@angular/core'
import { ApiService } from '../sercices/api.service'
import { ToastController } from '@ionic/angular'

@Component({
  selector: 'app-home',
  templateUrl: 'home.page.html',
  styleUrls: ['home.page.scss']
})
export class HomePage {
  public totp

  constructor (private api: ApiService, public toastController: ToastController) {}

  validate () {
    this.api.get('/check/' + this.totp).then(data => this.alert(data.status))
  }

  async alert (status) {
    const toast = await this.toastController.create({
      message: status ? 'OK' : 'Not valid code',
      duration: 2000,
      color: status ? 'primary' : 'danger',
    })
    toast.present()
  }
}

I’ve also put a simple security system. In a real life application we’ll need something better, but here I’ve got a Auth Bearer harcoded and I send it en every http request. To do it I’ve created a simple api service

import { Injectable } from '@angular/core'
import { isDevMode } from '@angular/core'
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http'
import { CONF } from './conf'

@Injectable({
  providedIn: 'root'
})
export class ApiService {

  private isDev: boolean = isDevMode()
  private apiUrl: string

  constructor (private http: HttpClient) {
    this.apiUrl = this.isDev ? CONF.API_DEV : CONF.API_PROD
  }

  public get (uri: string, params?: Object): Promise<any> {
    return new Promise((resolve, reject) => {
      this.http.get(this.apiUrl + uri, {
        headers: ApiService.getHeaders(),
        params: ApiService.getParams(params)
      }).subscribe(
        res => {this.handleHttpNext(res), resolve(res)},
        err => {this.handleHttpError(err), reject(err)},
        () => this.handleHttpComplete()
      )
    })
  }

  private static getHeaders (): HttpHeaders {

    const headers = {
      'Content-Type': 'application/json'
    }

    headers['Authorization'] = 'Bearer ' + CONF.bearer

    return new HttpHeaders(headers)
  }

  private static getParams (params?: Object): HttpParams {
    let Params = new HttpParams()
    for (const key in params) {
      if (params.hasOwnProperty(key)) {
        Params = Params.set(key, params[key])
      }
    }

    return Params
  }

  private handleHttpError (err) {
    console.log('HTTP Error', err)
  }

  private handleHttpNext (res) {
    console.log('HTTP response', res)
  }

  private handleHttpComplete () {
    console.log('HTTP request completed.')
  }
}

And that’s all. Here one video with a working example of the prototype:

Source code here

PHP/Lumen data source for Grafana

Today I need to integrate a third party service into Grafana. I cannot access directly to the service’s database, so I will integrate via JSON datasource. Grafana allows us to build custom data sources but in this case I don’t need to create a new one. I can use the simple JSON datasource

grafana-cli plugins install grafana-simple-json-datasource

Now I need to create one REST server to serve the data that our JSON datasource needs. According to the documentation we need three routes:

  • GET / should return 200 ok.
  • POST /search used by the find metric options on the query tab in panels.
  • POST /query should return metrics based on input.
  • POST /annotations should return annotations.

We’re going to create a PHP/Lumen server. Basically the routes of the application are those ones:

<?php

use Laravel\Lumen\Routing\Router;
use App\Http\Middleware;
use Laravel\Lumen\Application;
use Dotenv\Dotenv;
use App\Http\Handlers;

require_once __DIR__ . '/../vendor/autoload.php';

(Dotenv::create(__DIR__ . '/../env/local'))->load();

$app = new Application(dirname(__DIR__));
$app->middleware([
    Middleware\CorsMiddleware::class,
]);

$app->router->group(['middleware' => Middleware\AuthMiddleware::class], function (Router $router) {
    $router->get('/', Handlers\HelloHandler::class);
    $router->post('/search', Handlers\SearchHandler::class);
    $router->post('/query', Handlers\QueryHandler::class);
    $router->post('/annotations', Handlers\AnnotationHandler::class);
});

return $app;

We need to take care with CORS. I will use the Middleware that I normally use in those cases

<?php

namespace App\Http\Middleware;

use Closure;

class CorsMiddleware
{
    public function handle($request, Closure $next)
    {
        $headers = [
            'Access-Control-Allow-Origin'      => 'http://localhost:3000',
            'Access-Control-Allow-Methods'     => 'POST, GET, OPTIONS, PUT, DELETE',
            'Access-Control-Allow-Credentials' => 'true',
            'Access-Control-Max-Age'           => '86400',
            'Access-Control-Allow-Headers'     => 'accept, content-type, Content-Type, Authorization, X-Requested-With',
        ];

        if ($request->isMethod('OPTIONS')) {
            return response()->json('{"method":"OPTIONS"}', 200, $headers);
        }

        $response = $next($request);
        foreach ($headers as $key => $value) {
            $response->header($key, $value);
        }

        return $response;
    }
}

I’ll use also a basic authentication so we’ll use a simple Http Basic Authentication middleware

<?php

namespace App\Http\Middleware;

use Closure;
use Illuminate\Http\Request;

class AuthMiddleware
{
    const NAME = 'auth.web';

    public function handle(Request $request, Closure $next)
    {
        if ($request->getUser() != env('HTTP_USER') || $request->getPassword() != env('HTTP_PASS')) {
            $headers = ['WWW-Authenticate' => 'Basic'];

            return response('Unauthorized', 401, $headers);
        }

        return $next($request);
    }
}

HelloHandler is a dummy route that the datasource needs to check the connection. We only need to answer with a 200-OK

<?php
namespace App\Http\Handlers;

class HelloHandler
{
    public function __invoke()
    {
        return "Ok";
    }
}

SearchHandler will return the list of available metrics that we´ll use within our grafana panels. They aren’t strictly necessary. We can return an empty array and use later one metric that it isn’t defined here (it’s only to fill the combo that grafana shows us)

<?php
namespace App\Http\Handlers;

class SearchHandler
{
    public function __invoke()
    {
        return [25, 50, 100];
    }
}
```

QueryHandler is an important one. Here we'll return the datapoints that we´ll show in grafana. For testing purposes I've created one handler that read the metric, and the date from and date to that grafana sends to the backend and return a random values for several metrics and fixed ones to the rest. It's basically to see something in grafana. Later, in the real life project, I'll query the database and return real data.
 

<?php

namespace App\Http\Handlers;

use Illuminate\Http\Request;

class QueryHandler
{
    public function __invoke(Request $request)
    {
        $json   = $request->json();
        $range  = $json->get('range');
        $target = $json->get('targets')[0]['target'];

        $tz   = new \DateTimeZone('Europe/Madrid');
        $from = \DateTimeImmutable::createFromFormat("Y-m-d\TH:i:s.uP", $range['from'], $tz);
        $to   = \DateTimeImmutable::createFromFormat("Y-m-d\TH:i:s.uP", $range['to'], $tz);

        return ['target' => $target, 'datapoints' => $this->getDataPoints($from, $to, $target)];
    }

    private function getDataPoints($from, $to, $target)
    {
        $interval = new \DateInterval('PT1H');
        $period   = new \DatePeriod($from, $interval, $to->add($interval));

        $dataPoints = [];
        foreach ($period as $date) {
            $value        = $target > 50 ? rand(0, 100) : $target;
            $dataPoints[] = [$value, strtotime($date->format('Y-m-d H:i:sP')) * 1000];
        }

        return $dataPoints;
    }
}

Also I’ll like to use annotations. It’s something similar. AnnotationHandler will handle this request. For this test I’ve created two types of annotations: One each hour and another one each 6 hours

<?php

namespace App\Http\Handlers;

use Illuminate\Http\Request;

class AnnotationHandler
{
    public function __invoke(Request $request)
    {
        $json       = $request->json();
        $annotation = $json->get('annotation');
        $range      = $json->get('range');
  
        return $this->getAnnotations($annotation, $range);
    }

    private function getAnnotations($annotation, $range)
    {
        return $this->getValues($range, 'PT' . $annotation['query'] . 'H');
    }


    private function getValues($range, $int)
    {
        $tz   = new \DateTimeZone('Europe/Madrid');
        $from = \DateTimeImmutable::createFromFormat("Y-m-d\TH:i:s.uP", $range['from'], $tz);
        $to   = \DateTimeImmutable::createFromFormat("Y-m-d\TH:i:s.uP", $range['to'], $tz);

        $annotation = [
            'name'       => $int,
            'enabled'    => true,
            'datasource' => "gonzalo datasource",
            'showLine'   => true,
        ];

        $interval = new \DateInterval($int);
        $period   = new \DatePeriod($from, $interval, $to->add($interval));

        $annotations = [];
        foreach ($period as $date) {
            $annotations[] = ['annotation' => $annotation, "title" => "H " . $date->format('H'), "time" => strtotime($date->format('Y-m-d H:i:sP')) * 1000, 'text' => "teeext"];
        }

        return $annotations;
    }
}

And that’s all. I’ve also put the whole example in a docker-compose file to test it

version: '2'

services:
  nginx:
    image: gonzalo123.nginx
    restart: always
    ports:
      - "80:80"
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-nginx
    volumes:
      - ./src/api:/code/src
  api:
    image: gonzalo123.api
    restart: always
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-lumen-dev
    environment:
      XDEBUG_CONFIG: remote_host=${MY_IP}
    volumes:
      - ./src/api:/code/src
  grafana:
    image: gonzalo123.grafana
    build:
      context: ./src
      dockerfile: .docker/Dockerfile-grafana
    restart: always
    environment:
      - GF_SECURITY_ADMIN_USER=${GF_SECURITY_ADMIN_USER}
      - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD}
      - GF_USERS_DEFAULT_THEME=${GF_USERS_DEFAULT_THEME}
      - GF_USERS_ALLOW_SIGN_UP=${GF_USERS_ALLOW_SIGN_UP}
      - GF_USERS_ALLOW_ORG_CREATE=${GF_USERS_ALLOW_ORG_CREATE}
      - GF_AUTH_ANONYMOUS_ENABLED=${GF_AUTH_ANONYMOUS_ENABLED}
      - GF_INSTALL_PLUGINS=${GF_INSTALL_PLUGINS}
    ports:
      - "3000:3000"
    volumes:
      - grafana-db:/var/lib/grafana
      - grafana-log:/var/log/grafana
      - grafana-conf:/etc/grafana
volumes:
  grafana-db:
    driver: local
  grafana-log:
    driver: local
  grafana-conf:
    driver: local

Here you can see the example in action:

Full code in my github

Playing with lambda, serverless and Python

Couple of weeks ago I attended to serverless course. I’ve played with lambdas from time to time (basically when AWS forced me to use them) but without knowing exactly what I was doing. After this course I know how to work with the serverless framework and I understand better lambda world. Today I want to hack a little bit and create a simple Python service to obtain random numbers. Let’s start

We don’t need Flask to create lambdas but as I’m very comfortable with it so we’ll use it here.
Basically I follow the steps that I’ve read here.

from flask import Flask

app = Flask(__name__)


@app.route("/", methods=["GET"])
def hello():
    return "Hello from lambda"


if __name__ == '__main__':
    app.run()

And serverless yaml to configure the service

service: random

plugins:
  - serverless-wsgi
  - serverless-python-requirements
  - serverless-pseudo-parameters

custom:
  defaultRegion: eu-central-1
  defaultStage: dev
  wsgi:
    app: app.app
    packRequirements: false
  pythonRequirements:
    dockerizePip: non-linux

provider:
  name: aws
  runtime: python3.7
  region: ${opt:region, self:custom.defaultRegion}
  stage: ${opt:stage, self:custom.defaultStage}

functions:
  home:
    handler: wsgi_handler.handler
    events:
      - http: GET /

We’re going to use serverless plugins. We need to install them:

npx serverless plugin install -n serverless-wsgi
npx serverless plugin install -n serverless-python-requirements
npx serverless plugin install -n serverless-pseudo-parameters

And that’s all. Our “Hello world” lambda service with Python and Flask is up and running.
Now We’re going to create a “more complex” service. We’re going to return a random number with random.randint function.
randint requires two parameters: start, end. We’re going to pass the end parameter to our service. The start value will be parameterized. I’ll parameterize it only because I want to play with AWS’s Parameter Store (SSM). It’s just an excuse.

Let’s start with the service:

from random import randint
from flask import Flask, jsonify
import boto3
from ssm_parameter_store import SSMParameterStore

import os
from dotenv import load_dotenv

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

app = Flask(__name__)

app.config.update(
    STORE=SSMParameterStore(
        prefix="{}/{}".format(os.environ.get('ssm_prefix'), os.environ.get('stage')),
        ssm_client=boto3.client('ssm', region_name=os.environ.get('region')),
        ttl=int(os.environ.get('ssm_ttl'))
    )
)


@app.route("/", methods=["GET"])
def hello():
    return "Hello from lambda"


@app.route("/random/<int:to_int>", methods=["GET"])
def get_random_quote(to_int):
    from_int = app.config['STORE']['from_int']
    return jsonify(randint(from_int, to_int))


if __name__ == '__main__':
    app.run()

Now the serverless configuration. I can use only one function, handling all routes and letting Flask do the job.

functions:
  app:
    handler: wsgi_handler.handler
    events:
      - http: ANY /
      - http: 'ANY {proxy+}'

But in this example I want to create two different functions. Only for fun (and to use different role statements and different logs in cloudwatch).

service: random

plugins:
  - serverless-wsgi
  - serverless-python-requirements
  - serverless-pseudo-parameters
  - serverless-iam-roles-per-function

custom:
  defaultRegion: eu-central-1
  defaultStage: dev
  wsgi:
    app: app.app
    packRequirements: false
  pythonRequirements:
    dockerizePip: non-linux

provider:
  name: aws
  runtime: python3.7
  region: ${opt:region, self:custom.defaultRegion}
  stage: ${opt:stage, self:custom.defaultStage}
  memorySize: 128
  environment:
    region: ${self:provider.region}
    stage: ${self:provider.stage}

functions:
  app:
    handler: wsgi_handler.handler
    events:
      - http: ANY /
      - http: 'ANY {proxy+}'
    iamRoleStatements:
      - Effect: Allow
        Action: ssm:DescribeParameters
        Resource: arn:aws:ssm:${self:provider.region}:#{AWS::AccountId}:*
      - Effect: Allow
        Action: ssm:GetParameter
        Resource: arn:aws:ssm:${self:provider.region}:#{AWS::AccountId}:parameter/random/*
  home:
    handler: wsgi_handler.handler
    events:
      - http: GET /

And that’s all. “npx serverless deploy” and my random generator is running.

Data Analysis with Python. Pivot tables with Pandas

One of the first post in my blog was about Pivot tables. I’d created a library to pivot tables in my PHP scripts. The library is not very beautiful (it throws a lot of warnings), but it works. These days I’m playing with Python Data Analysis and I’m using Pandas. The purpose of this post is something that I like a lot: Learn by doing. So I want to do the same operations that I did eight years ago in the post but now with Pandas. Let’s start.

I’ll start with the same datasource that I used almost ten years ago. One simple recordset with cliks and number of users

I create a dataframe with this data

import numpy as np
import pandas as pd

data = pd.DataFrame([
    {'host': 1, 'country': 'fr', 'year': 2010, 'month': 1, 'clicks': 123, 'users': 4},
    {'host': 1, 'country': 'fr', 'year': 2010, 'month': 2, 'clicks': 134, 'users': 5},
    {'host': 1, 'country': 'fr', 'year': 2010, 'month': 3, 'clicks': 341, 'users': 2},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 1, 'clicks': 113, 'users': 4},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 2, 'clicks': 234, 'users': 5},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 3, 'clicks': 421, 'users': 2},
    {'host': 1, 'country': 'es', 'year': 2010, 'month': 4, 'clicks': 22, 'users': 3},
    {'host': 2, 'country': 'es', 'year': 2010, 'month': 1, 'clicks': 111, 'users': 2},
    {'host': 2, 'country': 'es', 'year': 2010, 'month': 2, 'clicks': 2, 'users': 4},
    {'host': 3, 'country': 'es', 'year': 2010, 'month': 3, 'clicks': 34, 'users': 2},
    {'host': 3, 'country': 'es', 'year': 2010, 'month': 4, 'clicks': 1, 'users': 1}
])

Pivot_tables

Now we want to do a simple pivot operation. We want to pivot on host

pd.pivot_table(data,
   index=['host'],
   values=['users', 'clicks'],
   columns=['year', 'month'],
   fill_value=''
  )

Pivot_tables_2

We can add totals

pd.pivot_table(data,
               index=['host'],
               values=['users', 'clicks'],
               columns=['year', 'month'],
               fill_value='',
               aggfunc=np.sum,
               margins=True,
               margins_name='Total'
              )

Pivot_tables_3

We can also pivot on more than one column. For example host and country

pd.pivot_table(data,
               index=['host', 'country'],
               values=['users', 'clicks'],
               columns=['year', 'month'],
               fill_value=''
              )

Pivot_tables_4

and also with totals

pd.pivot_table(data,
               index=['host', 'country'],
               values=['users', 'clicks'],
               columns=['year', 'month'],
               aggfunc=np.sum,
               fill_value='',
               margins=True,
               margins_name='Total'
              )

Pivot_tables_5

We can group by dataframe and calculate subtotals

data.groupby(['host', 'country'])[('clicks', 'users')].sum()

Pivot_tables_6

data.groupby(['host', 'country'])[('clicks', 'users')].mean()

Pivot_tables_7

And finally we can mix totals and subtotals.

out = data.groupby('host').apply(lambda sub: sub.pivot_table(
    index=['host', 'country'],
    values=['users', 'clicks'],
    columns=['year', 'month'],
    aggfunc=np.sum,
    margins=True,
    margins_name='SubTotal',
))

out.loc[('', 'Max', '')] = out.max()
out.loc[('', 'Min', '')] = out.min()
out.loc[('', 'Total', '')] = out.sum()

out.index = out.index.droplevel(0)

out.fillna('', inplace=True)

Pivot_tables_8

And that’s all. A lot of to learn yet about data analysis, but Pandas will be definitely a good friend of mine.

You can see the Jupiter notebook in my github account

Monitoring the bandwidth (part 2) now with Python Nameko microservice

This days I’ve been playing with Nameko. The Python framework for building microservices. Today I want to upgrade one small pet project that I’ve got in my house to monitor the bandwidth of my internet connection. I want to use one nameko microservice using the Timer entrypoint.

That’s the worker:

from nameko.timer import timer
import datetime
import logging
import os
import speedtest
from dotenv import load_dotenv
from influxdb import InfluxDBClient

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))


class SpeedService:
    name = "speed"

    def __init__(self):
        self.influx_client = InfluxDBClient(
            host=os.getenv("INFLUXDB_HOST"),
            port=os.getenv("INFLUXDB_PORT"),
            database=os.getenv("INFLUXDB_DATABASE")
        )

    @timer(interval=3600)
    def speedTest(self):
        logging.info("speedTest")
        current_time = datetime.datetime.utcnow().isoformat()
        speed = self.get_speed()

        self.persists(measurement='download', fields={"value": speed['download']}, time=current_time)
        self.persists(measurement='upload', fields={"value": speed['upload']}, time=current_time)
        self.persists(measurement='ping', fields={"value": speed['ping']}, time=current_time)

    def persists(self, measurement, fields, time):
        logging.info("{} {} {}".format(time, measurement, fields))
        self.influx_client.write_points([{
            "measurement": measurement,
            "time": time,
            "fields": fields
        }])

    @staticmethod
    def get_speed():
        logging.info("Calculating speed ...")
        s = speedtest.Speedtest()
        s.get_best_server()
        s.download()
        s.upload()

        return s.results.dict()

I need to adapt my docker-compose file to include the RabbitMQ server (Nameko needs a RabbitMQ message broker)

version: '3'

services:
  speed.worker:
    container_name: speed.worker
    image: speed/worker
    restart: always
    build:
      context: ./src/speed.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  rabbit:
    container_name: speed.rabbit
    image: rabbitmq:3-management
    restart: always
    ports:
      - "15672:15672"
      - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
  influxdb:
    container_name: speed.influxdb
    image: influxdb:latest
    restart: always
    environment:
    - INFLUXDB_DB=${INFLUXDB_DB}
    - INFLUXDB_ADMIN_USER=${INFLUXDB_ADMIN_USER}
    - INFLUXDB_ADMIN_PASSWORD=${INFLUXDB_ADMIN_PASSWORD}
    - INFLUXDB_HTTP_AUTH_ENABLED=${INFLUXDB_HTTP_AUTH_ENABLED}
    volumes:
    - influxdb-data:/data
  grafana:
    container_name: speed.grafana
    build:
      context: ./src/grafana
      dockerfile: .docker/Dockerfile-grafana
    restart: always
    environment:
    - GF_SECURITY_ADMIN_USER=${GF_SECURITY_ADMIN_USER}
    - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD}
    - GF_USERS_DEFAULT_THEME=${GF_USERS_DEFAULT_THEME}
    - GF_USERS_ALLOW_SIGN_UP=${GF_USERS_ALLOW_SIGN_UP}
    - GF_USERS_ALLOW_ORG_CREATE=${GF_USERS_ALLOW_ORG_CREATE}
    - GF_AUTH_ANONYMOUS_ENABLED=${GF_AUTH_ANONYMOUS_ENABLED}
    ports:
    - "3000:3000"
    depends_on:
    - influxdb
volumes:
  influxdb-data:
    driver: local

And that’s all. Over engineering to control my Internet Connection? Maybe, but that’s the way I learn new stuff 🙂

Source code available in my github.

Using cache buster with OpenUI5 outside SCP

When we work with SPAs and web applications we need to handle with the browser’s cache. Sometimes we change our static files but the client’s browser uses a cached version of the file instead of the new one. We can tell the user: Please empty your cache to use the new version. But most of the times the user don’t know what we’re speaking about, and we have a problem. There’s a technique called cache buster used to bypass this issue. It consists on to change the name of the file (or adding an extra parameter), basically to ensure that the browser will send a different request to the server to prevent the browser from reusing the cached version of the file.

When we work with sapui5 application over SCP, we only need to use the cachebuster version of sap-ui-core

<script id="sap-ui-bootstrap"
      src="https://sapui5.hana.ondemand.com/resources/sap-ui-cachebuster/sap-ui-core.js"
      data-sap-ui-libs="sap.m"
      data-sap-ui-theme="sap_belize"
      data-sap-ui-compatVersion="edge"
      data-sap-ui-appCacheBuster=""
      data-sap-ui-preload="async"
      data-sap-ui-resourceroots='{"app": ""}'
      data-sap-ui-frameOptions="trusted">
</script>

With this configuration, our framework will use a “cache buster friendly” version of our files and SCP will serve them properly.

For example, when our framework wants the /dist/Component.js file, the browser will request /dist/~1541685070813~/Component.js to the server. And the server will server the file /dist/Component.js. As I said before when we work with SCP, our standard build process automatically takes care about it. It creates a file called sap-ui-cachebuster-info.json where we can find all our files with one kind of hash that our build process changes each time our file is changed.

{
  "Component-dbg.js": 1545316733136,
  "Component-preload.js": 1545316733226,
  "Component.js": 1541685070813,
  ...
}

It works like a charm but I not always use SCP. Sometimes I use OpenUI5 in one nginx server, for example. So cache buster “doesn’t work”. That’s a problem because I need to handle with browser caches again each time we deploy the new version of the application. I wanted to solve the issue. Let me explain how I did it.

Since I was using one Lumen/PHP server to the backend, my first idea was to create a dynamic route in Lumen to handle cache-buster urls. With this approach I know I can solve the problem but there’s something that I did not like: I’ll use a dynamic server to serve static content. I don’t have a huge traffic. I can use this approach but it isn’t beautiful.

My second approach was: Ok I’ve got a sap-ui-cachebuster-info.json file where I can see all the files that cache buster will use and their hashes. So, Why not I create those files in my build script. With this approach I will create the full static structure each time I deploy de application, without needing any server side scripting language to generate dynamic content. OpenUI5 uses grunt so I can create a simple grunt task to create my files.

'use strict';

var fs = require('fs');
var path = require('path');
var chalk = require('chalk');

module.exports = function(grunt) {
  var name = 'cacheBuster';
  var info = 'Generates Cache buster files';

  var cacheBuster = function() {
    var config = grunt.config.get(name);
    var data, t, src, dest, dir, prop;

    data = grunt.file.readJSON(config.src + '/sap-ui-cachebuster-info.json');
    for (prop in data) {
      if (data.hasOwnProperty(prop)) {
        t = data[prop];
        src = config.src + '/' + prop;
        dest = config.src + '/~' + t + '~/' + prop;
        grunt.verbose.writeln(
            name + ': ' + chalk.cyan(path.basename(src)) + ' to ' +
            chalk.cyan(dest) + '.');
        dir = path.dirname(dest);
        grunt.file.mkdir(dir);
        fs.copyFileSync(src, dest);
      }
    }
  };

  grunt.registerMultiTask(name, info, cacheBuster);
};

I deploy my grunt task to npm so when I need to use it I only need to:

Install the task

npm install gonzalo123-cachebuster

Add the task to my gruntfile

require('gonzalo123-cachebuster')(grunt);

and set up the path where ui5 task generates our dist files

  grunt.config.merge({
    pkg: grunt.file.readJSON('package.json'),
    ...
    cacheBuster: {
      src: 'dist'
    }
  });

And that’s all. My users with enjoy (or suffer) the new versions of my applications without having problems with cached files.

Grunt task available in my github

Playing with microservices, Docker, Python an Nameko

In the last projects that I’ve been involved with I’ve playing, in one way or another, with microservices, queues and things like that. I’m always facing the same tasks: Building RPCs, Workers, API gateways, … Because of that I’ve searching one framework to help me with those kind of stuff. Finally I discover Nameko. Basically Nameko is the Python tool that I’ve been looking for. In this post I will create a simple proof of concept to learn how to integrate Nameko within my projects. Let start.

The POC is a simple API gateway that gives me the localtime in iso format. I can create a simple Python script to do it

import datetime
import time

print(datetime.datetime.fromtimestamp(time()).isoformat())

We also can create a simple Flask API server to consume this information. The idea is create a rpc worker to generate this information and also generate another worker to send the localtime, but taken from a PostgreSQL database (yes I know it not very useful but it’s just an excuse to use a PG database in the microservice)

We’re going to create two rpc workers. One giving the local time:

from nameko.rpc import rpc
from time import time
import datetime


class TimeService:
    name = "local_time_service"

    @rpc
    def local(self):
        return datetime.datetime.fromtimestamp(time()).isoformat()

And another one with the date from PostgreSQL:

from nameko.rpc import rpc
from dotenv import load_dotenv
import os
from ext.pg import PgService

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))


class TimeService:
    name = "db_time_service"
    conn = PgService(os.getenv('DSN'))

    @rpc
    def db(self):
        with self.conn:
            with self.conn.cursor() as cur:
                cur.execute("select localtimestamp")
                timestamp = cur.fetchone()
        return timestamp[0]

I’ve created a service called PgService only to learn how to create dependency providers in nameko

from nameko.extensions import DependencyProvider
import psycopg2


class PgService(DependencyProvider):

    def __init__(self, dsn):
        self.dsn = dsn

    def get_dependency(self, worker_ctx):
        return psycopg2.connect(self.dsn)

Now we only need to setup the api gateway. With Nameko we can create http entrypoint also (in the same way than we create rpc) but I want to use it with Flask

from flask import Flask
from nameko.standalone.rpc import ServiceRpcProxy
from dotenv import load_dotenv
import os

current_dir = os.path.dirname(os.path.abspath(__file__))
load_dotenv(dotenv_path="{}/.env".format(current_dir))

app = Flask(__name__)


def rpc_proxy(service):
    config = {'AMQP_URI': os.getenv('AMQP_URI')}
    return ServiceRpcProxy(service, config)


@app.route('/')
def hello():
    return "Hello"


@app.route('/local')
def local_time():
    with rpc_proxy('local_time_service') as rpc:
        time = rpc.local()

    return time


@app.route('/db')
def db_time():
    with rpc_proxy('db_time_service') as rpc:
        time = rpc.db()

    return time


if __name__ == '__main__':
    app.run()

As well as I wanna run my POC with docker, here the docker-compose file to set up the project

version: '3.4'

services:
  api:
    image: nameko/api
    container_name: nameko.api
    hostname: api
    ports:
    - "8080:8080"
    restart: always
    links:
    - rabbit
    - db.worker
    - local.worker
    environment:
    - ENV=1
    - FLASK_APP=app.py
    - FLASK_DEBUG=1
    build:
      context: ./api
      dockerfile: .docker/Dockerfile-api
    #volumes:
    #- ./api:/usr/src/app:ro
    command: flask run --host=0.0.0.0 --port 8080
  db.worker:
    container_name: nameko.db.worker
    image: nameko/db.worker
    restart: always
    build:
      context: ./workers/db.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  local.worker:
    container_name:  nameko.local.worker
    image: nameko/local.worker
    restart: always
    build:
      context: ./workers/local.worker
      dockerfile: .docker/Dockerfile-worker
    command: /bin/bash run.sh
  rabbit:
    container_name: nameko.rabbit
    image: rabbitmq:3-management
    restart: always
    ports:
    - "15672:15672"
    - "5672:5672"
    environment:
      RABBITMQ_ERLANG_COOKIE:
      RABBITMQ_DEFAULT_VHOST: /
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
  pg:
    container_name: nameko.pg
    image: nameko/pg
    restart: always
    build:
      context: ./pg
      dockerfile: .docker/Dockerfile-pg
    #ports:
    #- "5432:5432"
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

And that’s all. Two nameko rpc services working together behind a api gateway

Code available in my github