Sprockets Postgres¶
An set of mixins and classes for interacting with PostgreSQL using asyncio
in Tornado
/
sprockets.http
applications using
aiopg
.
Python versions supported: 3.7+
Installation¶
sprockets-postgres
is available on the Python package index and is installable via pip:
pip3 install sprockets-postgres
Documentation¶
Configuration¶
Configuration of sprockets-postgres
is done by
using of environment variables or tornado.web.Application.settings
dictionary. The sprockets_postgres.ApplicationMixin
will use configuration
as applied to the settings dictionary, falling back to the environment variable
if the value is not set in the dictionary. Keys in the settings dictionary are
lowercase, and if provided as environment variables, are uppercase. For example
to set the Postgres URL in a tornado.web.Application.settings
,
you’d do the following:
settings = {'postgres_url': 'postgresql://postgres@localhost:5432/postgres'}
app = web.Application([routes], settings=settings)
and as an environment variable:
POSTGRES_URL=postgresql://postgres@localhost:5432/postgres
Available Settings¶
The following table details the available configuration options:
Variable |
Definition |
Type |
Default |
---|---|---|---|
|
The PostgreSQL URL to connect to |
str |
|
|
Maximum connection count to Postgres per backend |
int |
|
|
Minimum or starting pool size. |
int |
|
|
The maximum time in seconds to spend attempting to create a new connection. |
int |
|
|
Time-to-life in seconds for a pooled connection. |
int |
|
|
Maximum execution time for a query in seconds. |
int |
|
|
Enable HSTORE support in the client. |
bool |
|
|
Enable JSON support in the client. |
bool |
|
|
Enable UUID support in the client. |
bool |
|
If postgres_url
uses a scheme of postgresql+srv
, a SRV DNS lookup will be
performed and the lowest priority record with the highest weight will be selected
for connecting to Postgres.
AWS’s ECS service discovery does not follow the SRV standard, but creates SRV
records. If postgres_url
uses a scheme of aws+srv
, a SRV DNS lookup will be
performed and the URL will be constructed containing all host and port combinations
in priority and weighted order, utilizing libpq’s supoprt
for multiple hosts in a URL.
Application Mixin¶
-
class
sprockets_postgres.
ApplicationMixin
(*args, **kwargs)[source]¶ sprockets.http.app.Application
mixin for handling the connection to Postgres and exporting functions for querying the database, getting the status, and proving a cursor.Automatically creates and shuts down
aiopg.Pool
on startup and shutdown by installing on_start and shutdown callbacks into theApplication
instance.-
postgres_connector
(on_error=None, on_duration=None, timeout=None, _attempt=1)[source]¶ Asynchronous context-manager that returns a
PostgresConnector
instance from the connection pool with a cursor.Note
This function is designed to work in conjunction with the
RequestHandlerMixin
and is generally not invoked directly.- Parameters
on_error (
typing.Optional
[typing.Callable
]) – A callback function that is invoked on exception. If an exception is returned from that function, it will raise it.on_duration (
typing.Optional
[typing.Callable
]) – An optional callback function that is invoked after a query has completed to record the duration that encompasses both executing the query and retrieving the returned records, if any.timeout (
Timeout
) – Used to override the default query timeout.
- Raises
asyncio.TimeoutError – when the request to retrieve a connection from the pool times out.
sprockets_postgres.ConnectionException – when the application can not connect to the configured Postgres instance.
psycopg2.Error – when Postgres raises an exception during the creation of the cursor.
- Parameters
_attempt (
int
) –- Return type
typing.AbstractAsyncContextManager
[sprockets_postgres.PostgresConnector
]
-
async
postgres_status
()[source]¶ Invoke from the
/status
RequestHandler to check that there is a Postgres connection handler available and return info about the pool.The
available
item in the dictionary indicates that the application was able to perform aSELECT 1
against the database using aPostgresConnector
instance.The
pool_size
item indicates the current quantity of open connections to Postgres.The
pool_free
item indicates the current number of idle connections available to process queries.Example return value
{ 'available': True, 'pool_size': 10, 'pool_free': 8 }
- Return type
-
RequestHandler Mixin¶
The RequestHandlerMixin
is a Tornado
tornado.web.RequestHandler
mixin that provides easy to use
functionality for interacting with PostgreSQL.
-
class
sprockets_postgres.
RequestHandlerMixin
[source]¶ A RequestHandler mixin class exposing functions for querying the database, recording the duration to either
sprockets-influxdb
orsprockets.mixins.metrics
, and handling exceptions.-
async
postgres_callproc
(name, parameters=None, metric_name='', *, timeout=None)[source]¶ Execute a stored procedure / function
- Parameters
name (
str
) – The stored procedure / function name to callparameters (
QueryParameters
) – Query parameters to pass when callingmetric_name (
str
) – The metric name for duration recording and loggingtimeout (
Timeout
) – Timeout value to override the default or the value specified when creating thePostgresConnector
.
- Raises
asyncio.TimeoutError – when there is a query or network timeout
psycopg2.Error – when there is an exception raised by Postgres
- Return type
-
async
postgres_execute
(sql, parameters=None, metric_name='', *, timeout=None)[source]¶ Execute a query, specifying a name for the query, the SQL statement, and optional positional arguments to pass in with the query.
Parameters may be provided as sequence or mapping and will be bound to variables in the operation. Variables are specified either with positional
%s
or named%({name})s
placeholders.- Parameters
sql (
str
) – The SQL statement to executeparameters (
QueryParameters
) – Query parameters to pass as part of the executionmetric_name (
str
) – The metric name for duration recording and loggingtimeout (
Timeout
) – Timeout value to override the default or the value specified when creating thePostgresConnector
.
- Raises
asyncio.TimeoutError – when there is a query or network timeout
psycopg2.Error – when there is an exception raised by Postgres
- Return type
-
postgres_transaction
(timeout=None)[source]¶ asynchronous context-manager function that implements full
BEGIN
,COMMIT
, andROLLBACK
semantics. If there is apsycopg2.Error
raised during the transaction, the entire transaction will be rolled back.If no exception is raised, the transaction will be committed when exiting the context manager.
Usage Example
class RequestHandler(sprockets_postgres.RequestHandlerMixin, web.RequestHandler): async def post(self): async with self.postgres_transaction() as transaction: result1 = await transaction.execute(QUERY_ONE) result2 = await transaction.execute(QUERY_TWO) result3 = await transaction.execute(QUERY_THREE)
- Parameters
timeout (
Timeout
) – Timeout value to override the default or the value specified when creating thePostgresConnector
.- Raises
asyncio.TimeoutError – when there is a query or network timeout when starting the transaction
psycopg2.Error – when there is an exception raised by Postgres when starting the transaction
- Return type
typing.AbstractAsyncContextManager
[sprockets_postgres.PostgresConnector
]
-
on_postgres_error
(metric_name, exc)[source]¶ Invoked when an error occurs when executing a query
If tornado-problem-details is available,
problemdetails.Problem
will be raised instead oftornado.web.HTTPError
.Override for different error handling behaviors.
Return an exception if you would like for it to be raised, or swallow it here.
- Parameters
- Return type
-
on_postgres_timing
(metric_name, duration)[source]¶ Override for custom metric recording. As a default behavior it will attempt to detect sprockets-influxdb and sprockets.mixins.metrics and record the metrics using them if they are available. If they are not available, it will record the query duration to the DEBUG log.
-
async
The StatusRequestHandler
is a Tornado
tornado.web.RequestHandler
that can be used for application health
monitoring. If the Postgres connection is unavailable, it will report the
API as unavailable and return a 503 status code.
-
class
sprockets_postgres.
StatusRequestHandler
(application, request, **kwargs)[source]¶ A RequestHandler that can be used to expose API health or status
- Parameters
application (
tornado.web.Application
) –request (
tornado.httputil.HTTPServerRequest
) –kwargs (
typing.Any
) –
Query Result¶
A QueryResult
instance is created for every query
and contains the count of rows effected by the query and either the row
as
a dict
or rows
as a list of dict
. For queries that do
not return any data, both row
and rows
will be None
.
-
class
sprockets_postgres.
QueryResult
(row_count, row, rows)[source]¶ Contains the results of the query that was executed.
- Parameters
row_count (
int
) – The quantity of rows impacted by the queryrow (
typing.Optional
[dict
]) – If a single row is returned, the data for that rowrows (
typing.Optional
[typing.List
[dict
]]) – If more than one row is returned, this attribute is set as the list of rows, in order.
-
property
rows
¶ Return the result as a list of one or more rows
- Return type
Postgres Connector¶
A PostgresConnector
instance contains a cursor for
a Postgres connection and methods to execute queries.
-
class
sprockets_postgres.
PostgresConnector
(cursor, on_error=None, on_duration=None, timeout=None)[source]¶ Wraps a
aiopg.Cursor
instance for creating explicit transactions, calling stored procedures, and executing queries.Unless the
transaction()
asynchronous context-manager is used, each call tocallproc()
andexecute()
is an explicit transaction.Note
PostgresConnector
instances are created byApplicationMixin.postgres_connector
and should not be created directly.- Parameters
cursor (aiopg.Cursor) – The cursor to use in the connector
on_error (
typing.Optional
[typing.Callable
]) – The callback to invoke when an exception is caughton_duration (
typing.Optional
[typing.Callable
]) – The callback to invoke when a query is complete and all of the data has been returned.timeout (
Timeout
) – A timeout value in seconds for executing queries. If unspecified, defaults to the configured query timeout of 120 seconds.
-
async
callproc
(name, parameters=None, metric_name='', *, timeout=None)[source]¶ Execute a stored procedure / function
- Parameters
name (
str
) – The stored procedure / function name to callparameters (
QueryParameters
) – Query parameters to pass when callingmetric_name (
str
) – The metric name for duration recording and loggingtimeout (
Timeout
) – Timeout value to override the default or the value specified when creating thePostgresConnector
.
- Raises
asyncio.TimeoutError – when there is a query or network timeout
psycopg2.Error – when there is an exception raised by Postgres
- Return type
-
async
execute
(sql, parameters=None, metric_name='', *, timeout=None)[source]¶ Execute a query, specifying a name for the query, the SQL statement, and optional positional arguments to pass in with the query.
Parameters may be provided as sequence or mapping and will be bound to variables in the operation. Variables are specified either with positional
%s
or named%({name})s
placeholders.- Parameters
sql (
str
) – The SQL statement to executeparameters (
QueryParameters
) – Query parameters to pass as part of the executionmetric_name (
str
) – The metric name for duration recording and loggingtimeout (
Timeout
) – Timeout value to override the default or the value specified when creating thePostgresConnector
.
- Raises
asyncio.TimeoutError – when there is a query or network timeout
psycopg2.Error – when there is an exception raised by Postgres
- Return type
-
transaction
()[source]¶ asynchronous context-manager function that implements full
BEGIN
,COMMIT
, andROLLBACK
semantics. If there is apsycopg2.Error
raised during the transaction, the entire transaction will be rolled back.If no exception is raised, the transaction will be committed when exiting the context manager.
Note
This method is provided for edge case usage. As a generalization
sprockets_postgres.RequestHandlerMixin.postgres_transaction()
should be used instead.Usage Example
class RequestHandler(sprockets_postgres.RequestHandlerMixin, web.RequestHandler): async def post(self): async with self.postgres_transaction() as transaction: result1 = await transaction.execute(QUERY_ONE) result2 = await transaction.execute(QUERY_TWO) result3 = await transaction.execute(QUERY_THREE)
- Raises
asyncio.TimeoutError – when there is a query or network timeout when starting the transaction
psycopg2.Error – when there is an exception raised by Postgres when starting the transaction
- Return type
typing.AbstractAsyncContextManager
[sprockets_postgres.PostgresConnector
]
Type Annotations¶
-
sprockets_postgres.
QueryParameters
= typing.Union[dict, list, tuple, NoneType]¶ Type annotation for query parameters
-
sprockets_postgres.
Timeout
= typing.Union[int, float, NoneType]¶ Type annotation for timeout values
Exceptions¶
Example Web Application¶
The following code provides a simple example for using the
import sprockets.http
import sprockets_postgres as postgres
from sprockets.http import app
class RequestHandler(postgres.RequestHandlerMixin, web.RequestHandler):
GET_SQL = """\
SELECT foo_id, bar, baz, qux
FROM public.foo
WHERE foo_id = %(foo_id)s;"""
async def get(self, foo_id: str) -> None:
result = await self.postgres_execute(self.GET_SQL, {'foo_id': foo_id})
await self.finish(result.row)
class Application(postgres.ApplicationMixin, app.Application):
"""
The :class:`sprockets_postgres.ApplicationMixin` provides the foundation
for the :class:`sprockets_postgres.RequestHandlerMixin` to properly function
and will automatically setup the pool to connect to PostgreSQL and will
shutdown the connections cleanly when the application stops.
It should be used in conjunction with :class:`sprockets.http.app.Application`
and not directly with :class:`tornado.web.Application`.
"""
def make_app(**settings):
return Application([
web.url(r'/foo/(?P<foo_id>.*)', FooRequestHandler)
], **settings)
if __name__ == '__main__':
sprockets.http.run(make_app)