P G Jones

How to use Postgres with Quart

2021-03-26

PostgreSQL is a fantastic database system that I've been using for a number of years and highly recommend. It is also really easy to use with Quart, which I'll show in this article. Note though that I don't use ORMs, as I prefer to write the SQL queries directly.

The asyncio ecosystem has a number of Postgres client libraries, of which asyncpg and aiopg are the most popular. I prefer to use asyncpg, as it has a pleasant API and is the most performant.

The downside I find with asyncpg is that it doesn't implement PEP 249, instead preferring its own custom paramstyle. Specifically, params are positional and $ prefixed, e.g.

await conn.execute("SELECT * FROM tbl WHERE id = $1", id)

this is problematic as it is very easy to mix up the order of the variables, e.g. this type of bug,

await conn.execute(
    "SELECT * FROM tbl WHERE id = $1 AND active = $2",
    active,  # Bug: bound to $1, which should be id
    id,  # Bug: bound to $2, which should be active
)

whereas a named paramstyle e.g.

await conn.execute(
    "SELECT * FROM tbl WHERE id = :id AND active = :active",
    {"id": id, "active": active},
)

is much harder to get wrong.

To enable a named paramstyle I use Databases, which wraps asyncpg and accepts SQLAlchemy text clauses, as used in the snippet above.
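To make the difference concrete, here is a minimal sketch of how a named query can be translated into asyncpg's positional form. Note that to_positional is a hypothetical helper written for this article, not part of asyncpg or Databases, and it is not how Databases does the translation internally (that goes via SQLAlchemy). It also naively ignores string literals and comments in the SQL, so treat it as an illustration only.

```python
import re
from typing import Any, Dict, List, Tuple

# Matches :name but not the second colon of a Postgres cast such as ::int.
_NAMED_PARAM = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def to_positional(query: str, values: Dict[str, Any]) -> Tuple[str, List[Any]]:
    """Rewrite :name placeholders as $1, $2, ... and order the args to match.

    Hypothetical helper for illustration only.
    """
    order: List[str] = []

    def replace(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in order:
            order.append(name)
        # A repeated name reuses the number it was first given.
        return f"${order.index(name) + 1}"

    positional_query = _NAMED_PARAM.sub(replace, query)
    return positional_query, [values[name] for name in order]


# The dict can be written in any order; the names decide the binding.
query, args = to_positional(
    "SELECT * FROM tbl WHERE id = :id AND active = :active",
    {"active": True, "id": 42},
)
print(query)
print(args)
```

The point is that the caller never has to count placeholders, as the name ties each value to its place in the query.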

Basic setup

I advocate this simple snippet to set up a Databases connection with Quart,

from typing import Any, Optional

from databases import Database
from quart import Quart


class QuartDatabases:
    def __init__(self, app: Optional[Quart] = None, **db_args: Any) -> None:
        self._db_args = db_args
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Quart) -> None:
        self._url = app.config["QUART_DATABASES_URI"]
        app.before_serving(self._before_serving)
        app.after_serving(self._after_serving)

    async def _before_serving(self) -> None:
        self._db = Database(url=self._url, **self._db_args)
        await self._db.connect()

    async def _after_serving(self) -> None:
        await self._db.disconnect()

    def __getattr__(self, name: str) -> Any:
        return getattr(self._db, name)

which allows usage like,

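app = Quart(__name__)
# Placeholder URL; point this at your own database.
app.config["QUART_DATABASES_URI"] = "postgresql://localhost/mydb"
db = QuartDatabases(app)

@app.route("/")
async def index():
    count = await db.fetch_val("SELECT COUNT(*) FROM mytable")
    # Quart cannot build a response from a bare int, so wrap it in a dict.
    return {"count": count}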

with all the core methods (fetch_one, fetch_all, execute, execute_many), transactions, and connection options supported. For example (following on from the above snippet),

from quart import request

@app.route("/<int:id_>/", methods=["POST"])
async def update(id_: int):
    data = await request.get_json()
    # Acquire one connection and run both queries on it.
    async with db.connection() as connection:
        count = await connection.fetch_val("SELECT COUNT(*) FROM mytable")
        await connection.execute(
            "UPDATE mytable SET clm = :val WHERE id = :id",
            values={"val": data["clm"], "id": id_},
        )
    return {"count": count}

Advanced; type conversion

asyncpg supports custom type conversion between Postgres and Python types. For example a JSON column can be automatically dumped when written to the database and loaded when read from it, or a Python Enum can be converted to a database enum and back again when loaded. For example if we have this DB structure,

CREATE TYPE TRAFFIC_LIGHT_T AS ENUM ('RED', 'AMBER', 'GREEN');

CREATE TABLE lights (
    id SERIAL PRIMARY KEY,
    details JSONB,
    state TRAFFIC_LIGHT_T
);

and run queries such as,

from enum import Enum

class TrafficLight(Enum):
    RED = "RED"
    AMBER = "AMBER"
    GREEN = "GREEN"

result = await db.fetch_one("SELECT details, state FROM lights LIMIT 1")
await db.execute(
    "INSERT INTO lights (details, state) VALUES (:details, :state)",
    values={"details": {"location": "London"}, "state": TrafficLight.RED},
)

it would be great if this worked, with result["details"] being a dict and result["state"] a TrafficLight instance. This is possible by defining how to encode and decode types to and from Postgres types using a type codec,

import json

async with db.connection() as connection:
    await connection.raw_connection.set_type_codec(
        "jsonb",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )

    await connection.raw_connection.set_type_codec(
        "traffic_light_t",
        encoder=lambda type_: type_.value,
        decoder=TrafficLight,
        schema="public",
        format="text",
    )

    ... # Run queries as above
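As a quick check of what these codecs do, the encoder and decoder pairs can be exercised on their own, without a database. This is only a sketch of the round trip: encode_light and decode_light are hypothetical named versions of the lambda and the TrafficLight callable used above, and with the text format asyncpg hands the decoder the value as a str.

```python
import json
from enum import Enum


class TrafficLight(Enum):
    RED = "RED"
    AMBER = "AMBER"
    GREEN = "GREEN"


def encode_light(light: TrafficLight) -> str:
    # Python -> Postgres: send the enum label as text.
    return light.value


def decode_light(text: str) -> TrafficLight:
    # Postgres -> Python: look the member up by its value.
    return TrafficLight(text)


details = {"location": "London"}

# Each decoder should undo its encoder.
assert json.loads(json.dumps(details)) == details
assert decode_light(encode_light(TrafficLight.RED)) is TrafficLight.RED
```

If a decoder does not invert its encoder here, it will not do so via asyncpg either, so this is a cheap thing to unit test.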

this though is a pain, as the type codecs have to be set each time a connection is used. Instead the asyncpg init argument, a coroutine that is called with each new connection when it is created, can be used to initialise the connection. Putting this together with the basic example gives,

from typing import Any, Callable, Optional

import asyncpg
from databases import Database
from quart import Quart


class QuartDatabases:
    def __init__(self, app: Optional[Quart] = None, **db_args: Any) -> None:
        self._db_args = db_args
        self._codecs = []
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Quart) -> None:
        self._url = app.config["QUART_DATABASES_URI"]
        app.before_serving(self._before_serving)
        app.after_serving(self._after_serving)

    def set_type_codec(
        self,
        type_: str,
        encoder: Callable,
        decoder: Callable,
        schema: str = "public",
        format: str = "text",
    ) -> None:
        # Store the codec so it can be registered on every new connection.
        self._codecs.append((type_, encoder, decoder, schema, format))

    async def _init(self, connection: asyncpg.Connection) -> None:
        # asyncpg takes everything after the type name as keyword-only.
        for type_, encoder, decoder, schema, format in self._codecs:
            await connection.set_type_codec(
                type_,
                schema=schema,
                encoder=encoder,
                decoder=decoder,
                format=format,
            )

    async def _before_serving(self) -> None:
        self._db = Database(url=self._url, init=self._init, **self._db_args)
        await self._db.connect()

    async def _after_serving(self) -> None:
        await self._db.disconnect()

    def __getattr__(self, name: str) -> Any:
        return getattr(self._db, name)
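The store-then-replay part of this class can be tried without Postgres. The sketch below is an assumption-laden stand-in rather than the real thing: FakeConnection is a hypothetical class that only mimics the keyword-only signature of asyncpg's set_type_codec, and CodecRegistry is a hypothetical cut-down version of the codec handling in QuartDatabases.

```python
import asyncio
import json
from typing import Any, Callable, Dict, List, Tuple


class FakeConnection:
    """Hypothetical stand-in for asyncpg.Connection that records codecs."""

    def __init__(self) -> None:
        self.codecs: Dict[str, Dict[str, Any]] = {}

    async def set_type_codec(
        self,
        typename: str,
        *,
        schema: str = "public",
        encoder: Callable,
        decoder: Callable,
        format: str = "text",
    ) -> None:
        self.codecs[typename] = {
            "schema": schema,
            "encoder": encoder,
            "decoder": decoder,
            "format": format,
        }


class CodecRegistry:
    """Hypothetical cut-down version of the codec handling above."""

    def __init__(self) -> None:
        self._codecs: List[Tuple[str, Callable, Callable, str, str]] = []

    def set_type_codec(
        self,
        type_: str,
        encoder: Callable,
        decoder: Callable,
        schema: str = "public",
        format: str = "text",
    ) -> None:
        # Nothing is connected yet, so only remember the codec.
        self._codecs.append((type_, encoder, decoder, schema, format))

    async def init(self, connection: Any) -> None:
        # Replay every stored codec onto the freshly created connection.
        for type_, encoder, decoder, schema, format in self._codecs:
            await connection.set_type_codec(
                type_,
                schema=schema,
                encoder=encoder,
                decoder=decoder,
                format=format,
            )


async def main() -> FakeConnection:
    registry = CodecRegistry()
    registry.set_type_codec("jsonb", json.dumps, json.loads, schema="pg_catalog")
    connection = FakeConnection()
    await registry.init(connection)
    return connection


connection = asyncio.run(main())
print(sorted(connection.codecs))
```

Storing the codecs up front matters because set_type_codec is called at import time, before the app is serving and before any connection exists.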

which then allows usage like,

import json
from enum import Enum

class TrafficLight(Enum):
    RED = "RED"
    AMBER = "AMBER"
    GREEN = "GREEN"

app = Quart(__name__)
db = QuartDatabases(app)

db.set_type_codec(
    "jsonb",
    encoder=json.dumps,
    decoder=json.loads,
    schema="pg_catalog",
)

db.set_type_codec(
    "traffic_light_t",
    encoder=lambda type_: type_.value,
    decoder=TrafficLight,
    schema="public",
    format="text",
)

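@app.route("/lights/<int:id_>/")
async def index(id_: int):
    # fetch_val would only return the first column, so fetch the whole row.
    result = await db.fetch_one(
        "SELECT details, state FROM lights WHERE id = :id",
        values={"id": id_},
    )
    # details is now a dict and state a TrafficLight, thanks to the codecs.
    return {"details": result["details"], "state": result["state"].value}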

I hope this helps! I often wonder if this should be an extension, so please let me know if you think it should be.