Sprockets Postgres

An set of mixins and classes for interacting with PostgreSQL using asyncio in Tornado / sprockets.http applications using aiopg.

Python versions supported: 3.7+

Installation

sprockets-postgres is available on the Python package index and is installable via pip:

pip3 install sprockets-postgres

Documentation

Configuration

Configuration of sprockets-postgres is done by using of environment variables or tornado.web.Application.settings dictionary. The sprockets_postgres.ApplicationMixin will use configuration as applied to the settings dictionary, falling back to the environment variable if the value is not set in the dictionary. Keys in the settings dictionary are lowercase, and if provided as environment variables, are uppercase. For example to set the Postgres URL in a tornado.web.Application.settings, you’d do the following:

settings = {'postgres_url': 'postgresql://postgres@localhost:5432/postgres'}
app = web.Application([routes], settings=settings)

and as an environment variable:

POSTGRES_URL=postgresql://postgres@localhost:5432/postgres

Available Settings

The following table details the available configuration options:

Variable

Definition

Type

Default

postgres_url

The PostgreSQL URL to connect to

str

postgres_max_pool_size

Maximum connection count to Postgres per backend

int

10

postgres_min_pool_size

Minimum or starting pool size.

int

1

postgres_connection_timeout

The maximum time in seconds to spend attempting to create a new connection.

int

10

postgres_connection_ttl

Time-to-life in seconds for a pooled connection.

int

300

postgres_query_timeout

Maximum execution time for a query in seconds.

int

60

postgres_hstore

Enable HSTORE support in the client.

bool

FALSE

postgres_json

Enable JSON support in the client.

bool

FALSE

postgres_uuid

Enable UUID support in the client.

bool

TRUE

If postgres_url uses a scheme of postgresql+srv, a SRV DNS lookup will be performed and the lowest priority record with the highest weight will be selected for connecting to Postgres.

AWS’s ECS service discovery does not follow the SRV standard, but creates SRV records. If postgres_url uses a scheme of aws+srv, a SRV DNS lookup will be performed and the URL will be constructed containing all host and port combinations in priority and weighted order, utilizing libpq’s supoprt for multiple hosts in a URL.

Application Mixin

class sprockets_postgres.ApplicationMixin(*args, **kwargs)[source]

sprockets.http.app.Application mixin for handling the connection to Postgres and exporting functions for querying the database, getting the status, and proving a cursor.

Automatically creates and shuts down aiopg.Pool on startup and shutdown by installing on_start and shutdown callbacks into the Application instance.

postgres_connector(on_error=None, on_duration=None, timeout=None, _attempt=1)[source]

Asynchronous context-manager that returns a PostgresConnector instance from the connection pool with a cursor.

Note

This function is designed to work in conjunction with the RequestHandlerMixin and is generally not invoked directly.

Parameters
  • on_error (typing.Optional[typing.Callable]) – A callback function that is invoked on exception. If an exception is returned from that function, it will raise it.

  • on_duration (typing.Optional[typing.Callable]) – An optional callback function that is invoked after a query has completed to record the duration that encompasses both executing the query and retrieving the returned records, if any.

  • timeout (Timeout) – Used to override the default query timeout.

Raises
Parameters

_attempt (int) –

Return type

typing.AbstractAsyncContextManager[sprockets_postgres.PostgresConnector]

property postgres_is_connected

Returns True if Postgres is currently connected

Return type

bool

async postgres_status()[source]

Invoke from the /status RequestHandler to check that there is a Postgres connection handler available and return info about the pool.

The available item in the dictionary indicates that the application was able to perform a SELECT 1 against the database using a PostgresConnector instance.

The pool_size item indicates the current quantity of open connections to Postgres.

The pool_free item indicates the current number of idle connections available to process queries.

Example return value

{
    'available': True,
    'pool_size': 10,
    'pool_free': 8
}
Return type

dict

RequestHandler Mixin

The RequestHandlerMixin is a Tornado tornado.web.RequestHandler mixin that provides easy to use functionality for interacting with PostgreSQL.

class sprockets_postgres.RequestHandlerMixin[source]

A RequestHandler mixin class exposing functions for querying the database, recording the duration to either sprockets-influxdb or sprockets.mixins.metrics, and handling exceptions.

async postgres_callproc(name, parameters=None, metric_name='', *, timeout=None)[source]

Execute a stored procedure / function

Parameters
  • name (str) – The stored procedure / function name to call

  • parameters (QueryParameters) – Query parameters to pass when calling

  • metric_name (str) – The metric name for duration recording and logging

  • timeout (Timeout) – Timeout value to override the default or the value specified when creating the PostgresConnector.

Raises
Return type

QueryResult

async postgres_execute(sql, parameters=None, metric_name='', *, timeout=None)[source]

Execute a query, specifying a name for the query, the SQL statement, and optional positional arguments to pass in with the query.

Parameters may be provided as sequence or mapping and will be bound to variables in the operation. Variables are specified either with positional %s or named %({name})s placeholders.

Parameters
  • sql (str) – The SQL statement to execute

  • parameters (QueryParameters) – Query parameters to pass as part of the execution

  • metric_name (str) – The metric name for duration recording and logging

  • timeout (Timeout) – Timeout value to override the default or the value specified when creating the PostgresConnector.

Raises
Return type

QueryResult

postgres_transaction(timeout=None)[source]

asynchronous context-manager function that implements full BEGIN, COMMIT, and ROLLBACK semantics. If there is a psycopg2.Error raised during the transaction, the entire transaction will be rolled back.

If no exception is raised, the transaction will be committed when exiting the context manager.

Usage Example

class RequestHandler(sprockets_postgres.RequestHandlerMixin,
                     web.RequestHandler):

async def post(self):
    async with self.postgres_transaction() as transaction:
        result1 = await transaction.execute(QUERY_ONE)
        result2 = await transaction.execute(QUERY_TWO)
        result3 = await transaction.execute(QUERY_THREE)
Parameters

timeout (Timeout) – Timeout value to override the default or the value specified when creating the PostgresConnector.

Raises
  • asyncio.TimeoutError – when there is a query or network timeout when starting the transaction

  • psycopg2.Error – when there is an exception raised by Postgres when starting the transaction

Return type

typing.AbstractAsyncContextManager[sprockets_postgres.PostgresConnector]

on_postgres_error(metric_name, exc)[source]

Invoked when an error occurs when executing a query

If tornado-problem-details is available, problemdetails.Problem will be raised instead of tornado.web.HTTPError.

Override for different error handling behaviors.

Return an exception if you would like for it to be raised, or swallow it here.

Parameters
Return type

typing.Optional[Exception]

on_postgres_timing(metric_name, duration)[source]

Override for custom metric recording. As a default behavior it will attempt to detect sprockets-influxdb and sprockets.mixins.metrics and record the metrics using them if they are available. If they are not available, it will record the query duration to the DEBUG log.

Parameters
  • metric_name (str) – The name of the metric to record

  • duration (float) – The duration to record for the metric

Return type

None

_postgres_connection_check()[source]

Ensures Postgres is connected, exiting the request in error if not

Raises

problemdetails.Problem

Raises

web.HTTPError

The StatusRequestHandler is a Tornado tornado.web.RequestHandler that can be used for application health monitoring. If the Postgres connection is unavailable, it will report the API as unavailable and return a 503 status code.

class sprockets_postgres.StatusRequestHandler(application, request, **kwargs)[source]

A RequestHandler that can be used to expose API health or status

Parameters
async get(*_args, **_kwarg)[source]

Query Result

A QueryResult instance is created for every query and contains the count of rows effected by the query and either the row as a dict or rows as a list of dict. For queries that do not return any data, both row and rows will be None.

class sprockets_postgres.QueryResult(row_count, row, rows)[source]

Contains the results of the query that was executed.

Parameters
  • row_count (int) – The quantity of rows impacted by the query

  • row (typing.Optional[dict]) – If a single row is returned, the data for that row

  • rows (typing.Optional[typing.List[dict]]) – If more than one row is returned, this attribute is set as the list of rows, in order.

property row_count

Return the number of rows for the result

Return type

int

property rows

Return the result as a list of one or more rows

Return type

typing.List[dict]

Postgres Connector

A PostgresConnector instance contains a cursor for a Postgres connection and methods to execute queries.

class sprockets_postgres.PostgresConnector(cursor, on_error=None, on_duration=None, timeout=None)[source]

Wraps a aiopg.Cursor instance for creating explicit transactions, calling stored procedures, and executing queries.

Unless the transaction() asynchronous context-manager is used, each call to callproc() and execute() is an explicit transaction.

Note

PostgresConnector instances are created by ApplicationMixin.postgres_connector and should not be created directly.

Parameters
  • cursor (aiopg.Cursor) – The cursor to use in the connector

  • on_error (typing.Optional[typing.Callable]) – The callback to invoke when an exception is caught

  • on_duration (typing.Optional[typing.Callable]) – The callback to invoke when a query is complete and all of the data has been returned.

  • timeout (Timeout) – A timeout value in seconds for executing queries. If unspecified, defaults to the configured query timeout of 120 seconds.

async callproc(name, parameters=None, metric_name='', *, timeout=None)[source]

Execute a stored procedure / function

Parameters
  • name (str) – The stored procedure / function name to call

  • parameters (QueryParameters) – Query parameters to pass when calling

  • metric_name (str) – The metric name for duration recording and logging

  • timeout (Timeout) – Timeout value to override the default or the value specified when creating the PostgresConnector.

Raises
Return type

QueryResult

async execute(sql, parameters=None, metric_name='', *, timeout=None)[source]

Execute a query, specifying a name for the query, the SQL statement, and optional positional arguments to pass in with the query.

Parameters may be provided as sequence or mapping and will be bound to variables in the operation. Variables are specified either with positional %s or named %({name})s placeholders.

Parameters
  • sql (str) – The SQL statement to execute

  • parameters (QueryParameters) – Query parameters to pass as part of the execution

  • metric_name (str) – The metric name for duration recording and logging

  • timeout (Timeout) – Timeout value to override the default or the value specified when creating the PostgresConnector.

Raises
Return type

QueryResult

transaction()[source]

asynchronous context-manager function that implements full BEGIN, COMMIT, and ROLLBACK semantics. If there is a psycopg2.Error raised during the transaction, the entire transaction will be rolled back.

If no exception is raised, the transaction will be committed when exiting the context manager.

Note

This method is provided for edge case usage. As a generalization sprockets_postgres.RequestHandlerMixin.postgres_transaction() should be used instead.

Usage Example

class RequestHandler(sprockets_postgres.RequestHandlerMixin,
                     web.RequestHandler):

    async def post(self):
        async with self.postgres_transaction() as transaction:
            result1 = await transaction.execute(QUERY_ONE)
            result2 = await transaction.execute(QUERY_TWO)
            result3 = await transaction.execute(QUERY_THREE)
Raises
  • asyncio.TimeoutError – when there is a query or network timeout when starting the transaction

  • psycopg2.Error – when there is an exception raised by Postgres when starting the transaction

Return type

typing.AbstractAsyncContextManager[sprockets_postgres.PostgresConnector]

Type Annotations

sprockets_postgres.QueryParameters = typing.Union[dict, list, tuple, NoneType]

Type annotation for query parameters

sprockets_postgres.Timeout = typing.Union[int, float, NoneType]

Type annotation for timeout values

Exceptions

class sprockets_postgres.ConnectionException[source]

Raised when the connection to Postgres can not be established

Example Web Application

The following code provides a simple example for using the

import sprockets.http
import sprockets_postgres as postgres
from sprockets.http import app


class RequestHandler(postgres.RequestHandlerMixin, web.RequestHandler):

    GET_SQL = """\
    SELECT foo_id, bar, baz, qux
      FROM public.foo
     WHERE foo_id = %(foo_id)s;"""

    async def get(self, foo_id: str) -> None:
        result = await self.postgres_execute(self.GET_SQL, {'foo_id': foo_id})
        await self.finish(result.row)


class Application(postgres.ApplicationMixin, app.Application):
    """
    The :class:`sprockets_postgres.ApplicationMixin` provides the foundation
    for the :class:`sprockets_postgres.RequestHandlerMixin` to properly function
    and will automatically setup the pool to connect to PostgreSQL and will
    shutdown the connections cleanly when the application stops.

    It should be used in conjunction with :class:`sprockets.http.app.Application`
    and not directly with :class:`tornado.web.Application`.

    """


def make_app(**settings):
    return Application([
       web.url(r'/foo/(?P<foo_id>.*)', FooRequestHandler)
    ], **settings)


if __name__ == '__main__':
    sprockets.http.run(make_app)

Index