Database utils for psycopg2


I often need to run raw queries when I’m working with Python. Even when I’m using Django and its ORM, I need to execute SQL against the database. I use PostgreSQL almost always and, with it (as we all do), psycopg2. Psycopg2 is very complete and easy to use, but sometimes I need a few helpers to make my usual tasks easier. I don’t want to create a complex library on top of psycopg2, only a helper.

Something that I miss from PHP and the DBAL library is the way DBAL helps me with simple CRUD operations. The idea of these helper functions is to do something similar. Let’s start.

I’ve created procedural functions for everything, and also a Db class. Normally every function needs the cursor as a parameter:

def fetch_all(cursor, sql, params=None):
    cursor.execute(
        query=sql,
        vars={} if params is None else params)

    return cursor.fetchall()

The Db class accepts the cursor in the constructor:

db = Db(cursor=cursor)
data = db.fetch_all("SELECT * FROM table")
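
To make the relation between the procedural functions and the class more concrete, here is a minimal sketch. It assumes that Db simply keeps the cursor and delegates to the function with the same name. StubCursor is a hypothetical stand-in so the example runs without a database, and this Db is a cut-down illustration, not the class shipped in the library.

```python
class StubCursor:
    """Hypothetical stand-in for a psycopg2 cursor (no database needed)."""

    def __init__(self, rows):
        self.rows = rows
        self.executed = []

    def execute(self, query, vars=None):
        # Record the call instead of talking to PostgreSQL.
        self.executed.append((query, vars))

    def fetchall(self):
        return self.rows


def fetch_all(cursor, sql, params=None):
    cursor.execute(query=sql, vars={} if params is None else params)
    return cursor.fetchall()


class Db:
    """Assumed shape: store the cursor once, then delegate to the functions."""

    def __init__(self, cursor):
        self.cursor = cursor

    def fetch_all(self, sql, params=None):
        return fetch_all(cursor=self.cursor, sql=sql, params=params)


cursor = StubCursor(rows=[("user1@email.com", "user1")])
db = Db(cursor=cursor)
data = db.fetch_all("SELECT email, name FROM users")
```

With this shape the class only saves me from passing the cursor on every call; the behaviour lives in the functions.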

Connection

Nothing special in the connection. I like to use "connection_factory=NamedTupleConnection", which lets me access each row of the recordset as an object. I’ve created a simple factory helper to create connections:

def get_conn(dsn, named_tuple=False, autocommit=False):
    conn = psycopg2.connect(
        dsn=dsn,
        connection_factory=NamedTupleConnection if named_tuple else None,
    )
    conn.autocommit = autocommit

    return conn

Sometimes I use the named tuple factory in the cursor instead of the connection, so I’ve created a simple helper:

def get_cursor(conn, named_tuple=True):
    return conn.cursor(cursor_factory=NamedTupleCursor if named_tuple else None)

Fetch all

db = Db(cursor)
for reg in db.fetch_all(sql="SELECT email, name from users"):
    assert 'user1' == reg.name
    assert 'user1@email.com' == reg.email

Fetch one

data = db.fetch_one(sql=SQL)

Select

data = db.select(
        table='users',
        where={'email': 'user1@email.com'})

This helper only allows simple WHERE clauses (conditions joined with AND). If I need a complex WHERE, I use the fetch helpers with raw SQL.
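
As an illustration of what "joined with AND" means, this is a dependency-free sketch of how a where dictionary could become SQL. The names build_select and quote_ident are hypothetical. The library itself builds its statements with psycopg2's sql module, so take this only as a picture of the generated query.

```python
def quote_ident(name):
    # Standard SQL identifier quoting: wrap in double quotes, double inner quotes.
    return '"' + name.replace('"', '""') + '"'


def build_select(table, where=None):
    """Return (sql, params) for a SELECT whose conditions are joined with AND."""
    where = where or {}
    sql = "SELECT * FROM " + quote_ident(table)
    if where:
        # One '"column" = %(column)s' fragment per key, in dictionary order.
        conditions = [f"{quote_ident(col)} = %({col})s" for col in where]
        sql += " WHERE " + " AND ".join(conditions)
    return sql, dict(where)


sql, params = build_select(
    "users", {"email": "user1@email.com", "name": "user1"})
print(sql)
```

Anything that does not fit this shape (an OR, a range, a LIKE) goes through fetch_all with a hand-written query and its own parameters.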

Insert

Insert one row:

db.insert(
        table='users',
        values={'email': 'user1@email.com', 'name': 'user1'})

Or insert multiple rows (using psycopg2’s executemany):

db.insert(
        table='users',
        values=[
            {'email': 'user2@email.com', 'name': 'user2'},
            {'email': 'user3@email.com', 'name': 'user3'}
        ])

We can also use insert_batch to insert all the rows with a single command:

db.insert_batch(
        table='users',
        values=[
            {'email': 'user2@email.com', 'name': 'user2'},
            {'email': 'user3@email.com', 'name': 'user3'}
        ])
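
The difference from executemany is that executemany effectively runs the statement once per row, while a batch sends one statement carrying all the rows. The sketch below shows one way to build such a statement; build_batch_insert and quote_ident are hypothetical names, and this is not necessarily how insert_batch is implemented (psycopg2.extras also offers execute_values for this kind of job).

```python
def quote_ident(name):
    # Standard SQL identifier quoting: wrap in double quotes, double inner quotes.
    return '"' + name.replace('"', '""') + '"'


def build_batch_insert(table, rows):
    """Return (sql, params) for one INSERT ... VALUES (...), (...) statement."""
    keys = list(rows[0].keys())
    fields = ", ".join(quote_ident(k) for k in keys)
    row_placeholder = "(" + ", ".join(["%s"] * len(keys)) + ")"
    values = ", ".join([row_placeholder] * len(rows))
    # Flatten the rows in column order so each %s lines up with its value.
    params = [row[k] for row in rows for k in keys]
    sql = f"INSERT INTO {quote_ident(table)} ({fields}) VALUES {values}"
    return sql, params


sql, params = build_batch_insert(
    "users",
    [
        {"email": "user2@email.com", "name": "user2"},
        {"email": "user3@email.com", "name": "user3"},
    ])
print(sql)
```

The resulting pair can be passed to a single cursor.execute(sql, params) call.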

Update

db.update(
        table='users',
        data={'name': 'xxxx'},
        identifier={'email': 'user1@email.com'},
    )

Delete

db.delete(table='users', where={'email': 'user1@email.com'})

Upsert

Sometimes we need to insert a row, or update it if the primary key already exists. We can do that with two statements: one select and, if there isn’t any result, one insert; otherwise one update. But we can also do it with a single SQL statement. I’ve created a helper to do that:

def _get_upsert_sql(data, identifier, table):
    raw_sql = """
        WITH
            upsert AS (
                UPDATE {tbl}
                SET {t_set}
                WHERE {t_where}
                RETURNING {tbl}.*),
            inserted AS (
                INSERT INTO {tbl} ({t_fields})
                SELECT {t_select_fields}
                WHERE NOT EXISTS (SELECT 1 FROM upsert)
                RETURNING *)
        SELECT * FROM upsert
        UNION ALL
        SELECT * FROM inserted
    """
    merger_data = {**data, **identifier}
    sql = psycopg2_sql.SQL(raw_sql).format(
        tbl=psycopg2_sql.Identifier(table),
        t_set=psycopg2_sql.SQL(', ').join(_get_conditions(data)),
        t_where=psycopg2_sql.SQL(' and ').join(_get_conditions(identifier)),
        t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, merger_data.keys())),
        t_select_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, merger_data.keys())))

    return merger_data, sql


def upsert(cursor, table, data, identifier):
    merger_data, sql = _get_upsert_sql(data, identifier, table)

    cursor.execute(sql, merger_data)
    return cursor.rowcount

Now we can execute an upsert in a simple way:

db.upsert(
        table='users',
        data={'name': 'yyyy'},
        identifier={'email': 'user1@email.com'})
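
To see what the template turns into for this call, here is a plain-string rendering of the same statement. It is only a sketch: render_upsert, conditions and quote_ident are hypothetical names, and I am assuming that _get_conditions produces fragments of the form "column" = %(column)s, since that helper is not shown above.

```python
UPSERT_TEMPLATE = """
WITH
    upsert AS (
        UPDATE {tbl}
        SET {t_set}
        WHERE {t_where}
        RETURNING {tbl}.*),
    inserted AS (
        INSERT INTO {tbl} ({t_fields})
        SELECT {t_select_fields}
        WHERE NOT EXISTS (SELECT 1 FROM upsert)
        RETURNING *)
SELECT * FROM upsert
UNION ALL
SELECT * FROM inserted
"""


def quote_ident(name):
    return '"' + name.replace('"', '""') + '"'


def conditions(mapping):
    # Assumed fragment shape: "column" = %(column)s
    return [f"{quote_ident(k)} = %({k})s" for k in mapping]


def render_upsert(table, data, identifier):
    merged = {**data, **identifier}
    sql = UPSERT_TEMPLATE.format(
        tbl=quote_ident(table),
        t_set=", ".join(conditions(data)),
        t_where=" and ".join(conditions(identifier)),
        t_fields=", ".join(quote_ident(k) for k in merged),
        t_select_fields=", ".join(f"%({k})s" for k in merged),
    )
    return sql, merged


sql, params = render_upsert(
    table="users",
    data={"name": "yyyy"},
    identifier={"email": "user1@email.com"})
print(sql)
```

The UPDATE runs first. If it touched a row, the NOT EXISTS check is false and the INSERT adds nothing; if it touched none, the INSERT creates the row from the merged values.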

Stored procedures

data = db.sp_fetch_one(
    function='hello',
    params={'name': 'Gonzalo'})
data = db.sp_fetch_all(
    function='hello',
    params={'name': 'Gonzalo'})

Transactions

In PHP and DBAL, we can create a transaction with this syntax:

<?php
$conn->transactional(function($conn) {
    // do stuff
});

In Python we can do the same with a context manager, and I prefer to use this syntax:

with transactional(conn) as db:
    assert 1 == db.insert(
        table='users',
        values={'email': 'user1@email.com', 'name': 'user1'})

The transactional function looks like this (I’ve created two functions: one with the raw cursor and another one with my Db class):

from contextlib import contextmanager


def _get_transactional(conn, named_tuple, callback):
    try:
        # "with conn" commits on success and rolls back on error (psycopg2 behaviour)
        with conn as connection:
            with get_cursor(conn=connection, named_tuple=named_tuple) as cursor:
                yield callback(cursor)
            conn.commit()
    except Exception:
        conn.rollback()
        raise

@contextmanager
def transactional(conn, named_tuple=False):
    return _get_transactional(conn, named_tuple, lambda cursor: Db(cursor))


@contextmanager
def transactional_cursor(conn, named_tuple=False):
    return _get_transactional(conn, named_tuple, lambda cursor: cursor)
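
The commit and rollback behaviour can be checked without a database. The sketch below is a simplified version of the same pattern, not the library code: FakeConnection is a hypothetical object that only records which method was called, and transactional_sketch is an illustrative name.

```python
from contextlib import contextmanager


class FakeConnection:
    """Hypothetical stand-in that records commit/rollback calls."""

    def __init__(self):
        self.events = []

    def cursor(self):
        return "cursor"  # a real connection would return a cursor object

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")


@contextmanager
def transactional_sketch(conn):
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()    # reached only if the with-body finished normally
    except Exception:
        conn.rollback()  # the with-body raised: undo everything
        raise


ok = FakeConnection()
with transactional_sketch(ok):
    pass  # the inserts, updates, ... would go here

failed = FakeConnection()
try:
    with transactional_sketch(failed):
        raise ValueError("boom")
except ValueError:
    pass

print(ok.events, failed.events)
```

The exception is re-raised after the rollback, so the caller still sees the original error.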

Return values

The select and fetch helpers return the recordset, but insert, update, delete and upsert return the number of affected rows. For example, this is the insert function:

def _get_insert_sql(table, values):
    keys = values[0].keys() if type(values) is list else values.keys()

    raw_sql = "insert into {tbl} ({t_fields}) values ({t_values})"
    return psycopg2_sql.SQL(raw_sql).format(
        tbl=psycopg2_sql.Identifier(table),
        t_fields=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Identifier, keys)),
        t_values=psycopg2_sql.SQL(', ').join(map(psycopg2_sql.Placeholder, keys)))


def insert(cursor, table, values):
    sql = _get_insert_sql(table=table, values=values)
    if type(values) is list:
        # a list of dicts: run the statement once per row
        cursor.executemany(sql, values)
    else:
        cursor.execute(sql, values)

    return cursor.rowcount

Unit tests

You can try the library by running the unit tests. I’ve also provided a docker-compose.yml file to set up a PostgreSQL database to run the tests:

version: '3.6'

services:
  pg:
    build:
      context: .docker/pg
      dockerfile: Dockerfile
    ports:
      - 5432:5432
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_DB: ${POSTGRES_DB}
      PGDATA: /var/lib/postgresql/data/pgdata

Install

You can install the library using pip:

pip install dbutils-gonzalo123

The full code is on my GitHub.
