Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • gmr/queries
1 result
Show changes
Commits on Source (20)
Showing with 472 additions and 263 deletions
sudo: false
language: python
python:
- 2.6
- 2.7
- pypy
- 3.3
- 3.4
- 3.5
services:
- postgresql
- 2.7
- 3.4
- 3.5
- 3.6
- pypy
- pypy3
env:
global:
- PATH=$HOME/.local/bin:$PATH
- AWS_DEFAULT_REGION=us-east-1
- secure: "inURdx4ldkJqQXL1TyvKImC3EnL5TixC1DlNMBYi5ttygwAk+mSSSw8Yc7klB6D1m6q79xUlHRk06vbz23CsXTM4AClC5Emrk6XN2GlUKl5WI+z+A2skI59buEhLWe7e2KzhB/AVx2E3TfKa0oY7raM0UUnaOkpV1Cj+mHKPIT0="
- secure: "H32DV3713a6UUuEJujrG7SfUX4/5WrwQy/3DxeptC6L7YPlTYxHBdEsccTfN5z806EheIl4BdIoxoDtq7PU/tWQoG1Lp2ze60mpwrniHajhFnjk7zP6pHvkhGLr8flhSmAb6CQBreNFOHTLWBMGPfi7k1Q9Td9MHbRo/FsTxqsM="
install:
- pip install -r test-requirements.txt
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi
- python setup.py develop
- pip install awscli
- pip install -r requires/testing.txt
- python setup.py develop
stages:
- test
- name: upload_coverage
if: branch = master
- name: deploy
if: tag IS present
script: nosetests
after_success:
- codecov
deploy:
distributions: sdist
provider: pypi
on:
python: 2.7
tags: true
all_branches: true
user: crad
password:
secure: UWQWui+QhAL1cz6oW/vqjEEp6/EPn1YOlItNJcWHNOO/WMMOlaTVYVUuXp+y+m52B+8PtYZZCTHwKCUKe97Grh291FLxgd0RJCawA40f4v1gmOFYLNKyZFBGfbC69/amxvGCcDvOPtpChHAlTIeokS5EQneVcAhXg2jXct0HTfI=
- aws s3 cp .coverage "s3://com-gavinroy-travis/queries/$TRAVIS_BUILD_NUMBER/.coverage.${TRAVIS_PYTHON_VERSION}"
jobs:
include:
- stage: test
services:
- postgres
script: nosetests
- stage: upload coverage
python: 3.6
install:
- pip install awscli coverage codecov
script:
- mkdir coverage
- aws s3 cp --recursive s3://com-gavinroy-travis/queries/$TRAVIS_BUILD_NUMBER/
coverage
- cd coverage
- coverage combine
- cd ..
- mv coverage/.coverage .
- coverage report
after_success: codecov
- stage: deploy
python: 3.6
deploy:
distributions: sdist bdist_wheel
provider: pypi
user: crad
on:
tags: true
all_branches: true
password:
secure: UWQWui+QhAL1cz6oW/vqjEEp6/EPn1YOlItNJcWHNOO/WMMOlaTVYVUuXp+y+m52B+8PtYZZCTHwKCUKe97Grh291FLxgd0RJCawA40f4v1gmOFYLNKyZFBGfbC69/amxvGCcDvOPtpChHAlTIeokS5EQneVcAhXg2jXct0HTfI=
Loading
Loading
@@ -12,7 +12,7 @@ both fast and easy. Check out the `Usage`_ section below to see how easy it can
Key features include:
 
- Simplified API
- Support of Python 2.6+ and 3.3+
- Support of Python 2.7+ and 3.4+
- PyPy support via psycopg2cffi_
- Asynchronous support for Tornado_
- Connection information provided by URI
Loading
Loading
#!/bin/sh
# vim: set ts=2 sts=2 sw=2 et:
# Bootstrap a local docker-compose test environment: start the postgres
# container, wait until it accepts connections, then write the connection
# settings to build/test-environment for the test suite to source.
test -n "$SHELLDEBUG" && set -x
# Only a local Docker daemon (unix socket) is supported.
if test -e /var/run/docker.sock
then
DOCKER_IP=127.0.0.1
else
echo "Docker environment not detected."
exit 1
fi
set -e
# Derive a compose project prefix from the working directory name unless
# the caller provided one.
# NOTE(review): ${CWD/_/} is a bash pattern-substitution, not POSIX sh;
# confirm the target shell for the #!/bin/sh shebang.
if test -z "$DOCKER_COMPOSE_PREFIX"
then
CWD=${PWD##*/}
DOCKER_COMPOSE_PREFIX=${CWD/_/}
fi
COMPOSE_ARGS="-p ${DOCKER_COMPOSE_PREFIX}"
test -d build || mkdir build
# Print the host port docker-compose published for service $1's container port $2.
get_exposed_port() {
docker-compose ${COMPOSE_ARGS} port $1 $2 | cut -d: -f2
}
# Recreate the environment from scratch so no stale state leaks between runs.
docker-compose ${COMPOSE_ARGS} down --volumes --remove-orphans
docker-compose ${COMPOSE_ARGS} pull
docker-compose ${COMPOSE_ARGS} up -d --no-recreate
CONTAINER="${DOCKER_COMPOSE_PREFIX}_postgres_1"
PORT=$(get_exposed_port postgres 5432)
echo "Waiting for ${CONTAINER} \c"
# NOTE(review): exports an empty/unset PG variable — looks truncated
# (PGHOST/PGPORT are written to the environment file below); confirm intent.
export PG
# Poll until postgres answers a trivial query.
until psql -U postgres -h ${DOCKER_IP} -p ${PORT} -c 'SELECT 1' > /dev/null 2> /dev/null; do
echo ".\c"
sleep 1
done
echo " done"
# Environment file consumed by the test harness.
cat > build/test-environment<<EOF
export DOCKER_COMPOSE_PREFIX=${DOCKER_COMPOSE_PREFIX}
export PGHOST=${DOCKER_IP}
export PGPORT=${PORT}
EOF
%YAML 1.2
---
# Integration-test database service. Container port 5432 is published on a
# random host port; the bootstrap script discovers it via
# `docker-compose port postgres 5432`.
postgres:
  image: postgres
  ports:
    - 5432
Loading
Loading
@@ -29,14 +29,6 @@ add a widget, call PUT on /widget, to update a widget call POST on /widget/[SKU]
"""
self.session = queries.TornadoSession()
 
@gen.coroutine
def prepare(self):
try:
yield self.session.validate()
except queries.OperationalError as error:
logging.error('Error connecting to the database: %s', error)
raise web.HTTPError(503)
def options(self, *args, **kwargs):
"""Let the caller know what methods are supported
 
Loading
Loading
Loading
Loading
@@ -15,13 +15,6 @@ request and will wait until all queries are complete before progressing:
self.session = queries.TornadoSession()
 
@gen.coroutine
def prepare(self):
try:
yield self.session.validate()
except queries.OperationalError as error:
logging.error('Error connecting to the database: %s', error)
raise web.HTTPError(503)
@gen.coroutine
def get(self, *args, **kwargs):
 
Loading
Loading
Version History
===============
- 1.10.4 2018-01-10
- Implement ``Results.__bool__`` to be explicit about Python 3 support.
- Catch any exception raised when using TornadoSession and invoking the execute function in psycopg2 for exceptions raised prior to sending the query to Postgres.
This could be psycopg2.Error, IndexError, KeyError, or who knows, it's not documented in psycopg2.
- 1.10.3 2017-11-01
- Remove the functionality from ``TornadoSession.validate`` and make it raise a ``DeprecationWarning``
- Catch the ``KeyError`` raised when ``PoolManager.clean()`` is invoked for a pool that doesn't exist
- 1.10.2 2017-10-26
- Ensure the pool exists when executing a query in TornadoSession, the new timeout behavior prevented that from happening.
- 1.10.1 2017-10-24
- Use an absolute time in the call to ``add_timeout``
- 1.10.0 2017-09-27
- Free when tornado_session.Result is ``__del__'d`` without ``free`` being called.
- Auto-clean the pool after Results.free TTL+1 in tornado_session.TornadoSession
 - Don't raise NotImplementedError in Results.free for synchronous use, just treat as a noop
- 1.9.1 2016-10-25
- Add better exception handling around connections and getting the logged in user
- 1.9.0 2016-07-01
- Handle a potential race condition in TornadoSession when too many simultaneous new connections are made and a pool fills up
- Increase logging in various places to be more informative
- Restructure queries specific exceptions to all extend off of a base QueriesException
- Trivial code cleanup
- 1.8.10 2016-06-14
- Propagate PoolManager exceptions from TornadoSession (#20) - Fix by Dave Shawley
- 1.8.9 2015-11-11
- Move to psycopg2cffi for PyPy support
- 1.7.5 2015-09-03
- Don't let Session and TornadoSession share connections
- 1.7.1 2015-03-25
- Fix TornadoSession's use of cleanup (#8) - Fix by Oren Itamar
- 1.7.0 2015-01-13
- Implement :py:meth:`Pool.shutdown <queries.pool.Pool.shutdown>` and :py:meth:`PoolManager.shutdown <queries.pool.PoolManager.shutdown>` to
cleanly shutdown all open, non-executing connections across a Pool or all pools. Update locks in Pool operations to ensure atomicity.
- 1.6.1 2015-01-09
- Fixes an iteration error when closing a pool (#7) - Fix by Chris McGuire
- 1.6.0 2014-11-20
- Handle URI encoded password values properly
- 1.5.0 2014-10-07
- Handle empty query results in the iterator (#4) - Fix by Den Teresh
- 1.4.0 2014-09-04
- Address exception handling in tornado_session
2.0.0 2018-01-23
-----------------
- REMOVED support for Python 2.6
- FIXED CPU Pegging bug: Cleanup IOLoop and internal stack in ``TornadoSession``
on connection error. In the case of a connection error, the failure to do this
caused CPU to peg @ 100% utilization looping on a non-existent file descriptor.
Thanks to `cknave <https://github.com/cknave>`_ for his work on identifying the
issue, proposing a fix, and writing a working test case.
- Move the integration tests to use a local docker development environment
- Added new methods ``queries.pool.Pool.report`` and
  ``queries.pool.PoolManager.report`` for reporting pool status.
- Added new methods to ``queries.pool.Pool`` for returning a list of
busy, closed, executing, and locked connections.
1.10.4 2018-01-10
-----------------
- Implement ``Results.__bool__`` to be explicit about Python 3 support.
- Catch any exception raised when using TornadoSession and invoking the execute function in psycopg2 for exceptions raised prior to sending the query to Postgres.
This could be psycopg2.Error, IndexError, KeyError, or who knows, it's not documented in psycopg2.
1.10.3 2017-11-01
-----------------
- Remove the functionality from ``TornadoSession.validate`` and make it raise a ``DeprecationWarning``
- Catch the ``KeyError`` raised when ``PoolManager.clean()`` is invoked for a pool that doesn't exist
1.10.2 2017-10-26
-----------------
- Ensure the pool exists when executing a query in TornadoSession, the new timeout behavior prevented that from happening.
1.10.1 2017-10-24
-----------------
- Use an absolute time in the call to ``add_timeout``
1.10.0 2017-09-27
-----------------
- Free when tornado_session.Result is ``__del__'d`` without ``free`` being called.
- Auto-clean the pool after Results.free TTL+1 in tornado_session.TornadoSession
- Don't raise NotImplementedError in Results.free for synchronous use, just treat as a noop
1.9.1 2016-10-25
----------------
- Add better exception handling around connections and getting the logged in user
1.9.0 2016-07-01
----------------
- Handle a potential race condition in TornadoSession when too many simultaneous new connections are made and a pool fills up
- Increase logging in various places to be more informative
- Restructure queries specific exceptions to all extend off of a base QueriesException
- Trivial code cleanup
1.8.10 2016-06-14
-----------------
- Propagate PoolManager exceptions from TornadoSession (#20) - Fix by Dave Shawley
1.8.9 2015-11-11
----------------
- Move to psycopg2cffi for PyPy support
1.7.5 2015-09-03
----------------
- Don't let Session and TornadoSession share connections
1.7.1 2015-03-25
----------------
- Fix TornadoSession's use of cleanup (#8) - Fix by Oren Itamar
1.7.0 2015-01-13
----------------
- Implement :py:meth:`Pool.shutdown <queries.pool.Pool.shutdown>` and :py:meth:`PoolManager.shutdown <queries.pool.PoolManager.shutdown>` to
cleanly shutdown all open, non-executing connections across a Pool or all pools. Update locks in Pool operations to ensure atomicity.
1.6.1 2015-01-09
----------------
- Fixes an iteration error when closing a pool (#7) - Fix by Chris McGuire
1.6.0 2014-11-20
-----------------
- Handle URI encoded password values properly
1.5.0 2014-10-07
----------------
- Handle empty query results in the iterator (#4) - Fix by Den Teresh
1.4.0 2014-09-04
----------------
- Address exception handling in tornado_session
Loading
Loading
@@ -14,7 +14,7 @@ both fast and easy.
*Key features include*:
 
- Simplified API
- Support of Python 2.6+ and 3.3+
- Support of Python 2.7+ and 3.4+
- PyPy support via psycopg2cffi_
- Asynchronous support for Tornado_
- Connection information provided by URI
Loading
Loading
@@ -79,9 +79,6 @@ Indices and tables
.. |Version| image:: https://img.shields.io/pypi/v/queries.svg?
:target: https://pypi.python.org/pypi/queries
 
.. |Downloads| image:: https://img.shields.io/pypi/dm/queries.svg?
:target: https://pypi.python.org/pypi/queries
.. |License| image:: https://img.shields.io/github/license/gmr/queries.svg?
:target: https://github.com/gmr/queries
 
Loading
Loading
Loading
Loading
@@ -8,14 +8,6 @@ class ExampleHandler(web.RequestHandler):
def initialize(self):
self.session = queries.TornadoSession()
 
@gen.coroutine
def prepare(self):
try:
yield self.session.validate()
except queries.OperationalError as error:
logging.error('Error connecting to the database: %s', error)
raise web.HTTPError(503)
@gen.coroutine
def get(self):
try:
Loading
Loading
@@ -29,10 +21,10 @@ class ExampleHandler(web.RequestHandler):
 
 
application = web.Application([
(r"/", ExampleHandler),
(r'/', ExampleHandler),
])
 
if __name__ == "__main__":
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
application.listen(8888)
ioloop.IOLoop.instance().start()
Loading
Loading
@@ -8,67 +8,27 @@ The core `queries.Queries` class will automatically register support for UUIDs,
Unicode and Unicode arrays.
 
"""
__version__ = '1.10.4'
version = __version__
import logging
import platform
# Import PyPy compatibility
PYPY = False
target = platform.python_implementation()
if target == 'PyPy': # pragma: no cover
import psycopg2cffi.compat
psycopg2cffi.compat.register()
PYPY = True
import sys
 
# Add a Null logging handler to prevent logging output when un-configured
try:
from logging import NullHandler
except ImportError: # pragma: no cover
class NullHandler(logging.Handler):
"""Python 2.6 does not have a NullHandler"""
def emit(self, record):
"""Emit a record
:param record record: The record to emit
import psycopg2cffi
import psycopg2cffi.extras
import psycopg2cffi.extensions
except ImportError:
pass
else:
sys.modules['psycopg2'] = psycopg2cffi
sys.modules['psycopg2.extras'] = psycopg2cffi.extras
sys.modules['psycopg2.extensions'] = psycopg2cffi.extensions
 
"""
pass
logging.getLogger('queries').addHandler(NullHandler())
# Defaults
DEFAULT_URI = 'postgresql://localhost:5432'
# Mappings to queries classes and methods
from queries.results import Results
from queries.session import Session
try:
from queries.tornado_session import TornadoSession
except ImportError: # pragma: no cover
except ImportError: # pragma: nocover
TornadoSession = None
def uri(host='localhost', port=5432, dbname='postgres', user='postgres',
password=None):
"""Return a PostgreSQL connection URI for the specified values.
:param str host: Host to connect to
:param int port: Port to connect on
:param str dbname: The database name
:param str user: User to connect as
:param str password: The password to use, None for no password
:return str: The PostgreSQL connection URI
"""
if port:
host = '%s:%s' % (host, port)
if password:
return 'postgresql://%s:%s@%s/%s' % (user, password, host, dbname)
return 'postgresql://%s@%s/%s' % (user, host, dbname)
from queries.utils import uri
 
# For ease of access to different cursor types
from psycopg2.extras import DictCursor
Loading
Loading
@@ -90,3 +50,9 @@ from psycopg2 import OperationalError
from psycopg2 import ProgrammingError
from psycopg2.extensions import QueryCanceledError
from psycopg2.extensions import TransactionRollbackError
__version__ = '2.0.0'
version = __version__
# Add a Null logging handler to prevent logging output when un-configured
logging.getLogger('queries').addHandler(logging.NullHandler())
Loading
Loading
@@ -2,6 +2,7 @@
Connection Pooling
 
"""
import datetime
import logging
import os
import threading
Loading
Loading
@@ -26,6 +27,8 @@ class Connection(object):
def __init__(self, handle):
    """Wrap a raw psycopg2 connection handle for tracking by the pool.

    :param handle: The psycopg2 connection to wrap
    """
    self.handle = handle
    # Session currently holding the connection; None when unlocked
    # (presumably managed by lock()/free() — confirm against Pool code).
    self.used_by = None
    # Counters incremented by the session layer, surfaced in pool reports.
    self.executions = 0
    self.exceptions = 0
 
def close(self):
"""Close the connection
Loading
Loading
@@ -171,6 +174,15 @@ class Pool(object):
self.connections[id(connection)] = Connection(connection)
LOGGER.debug('Pool %s added connection %s', self.id, id(connection))
 
@property
def busy_connections(self):
    """Return a list of active/busy connections

    :rtype: list

    """
    # self.connections maps id(handle) -> Connection, so iterate the
    # values; iterating the dict directly yields the integer keys.
    return [c for c in self.connections.values()
            if c.busy and not c.closed]
def clean(self):
"""Clean the pool by removing any closed connections and if the pool's
idle has exceeded its idle TTL, remove all connections.
Loading
Loading
@@ -193,6 +205,34 @@ class Pool(object):
self.remove(self.connections[cid].handle)
LOGGER.debug('Pool %s closed', self.id)
 
@property
def closed_connections(self):
    """Return a list of closed connections

    :rtype: list

    """
    closed = []
    for conn in self.connections.values():
        if conn.closed:
            closed.append(conn)
    return closed
def connection_handle(self, connection):
    """Return the pool's :class:`Connection` wrapper for the given raw
    psycopg2 connection.

    :param connection: The connection to return a parent for
    :type connection: psycopg2.extensions.connection
    :rtype: Connection

    """
    key = id(connection)
    return self.connections[key]
@property
def executing_connections(self):
    """Return a list of connections actively executing queries

    :rtype: list

    """
    # self.connections maps id(handle) -> Connection, so iterate the
    # values; iterating the dict directly yields the integer keys.
    return [c for c in self.connections.values() if c.executing]
def free(self, connection):
"""Free the connection from use by the session that was using it.
 
Loading
Loading
@@ -203,7 +243,7 @@ class Pool(object):
"""
LOGGER.debug('Pool %s freeing connection %s', self.id, id(connection))
try:
self._connection(connection).free()
self.connection_handle(connection).free()
except KeyError:
raise ConnectionNotFoundError(self.id, id(connection))
 
Loading
Loading
@@ -246,9 +286,8 @@ class Pool(object):
:rtype: list
 
"""
return [self.connections[k] for k in self.connections if
not self.connections[k].busy and
not self.connections[k].closed]
return [c for c in self.connections.values()
if not c.busy and not c.closed]
 
@property
def idle_duration(self):
Loading
Loading
@@ -281,7 +320,7 @@ class Pool(object):
"""
cid = id(connection)
try:
self._connection(connection).lock(session)
self.connection_handle(connection).lock(session)
except KeyError:
raise ConnectionNotFoundError(self.id, cid)
else:
Loading
Loading
@@ -290,6 +329,15 @@ class Pool(object):
self.idle_start = None
LOGGER.debug('Pool %s locked connection %s', self.id, cid)
 
@property
def locked_connections(self):
    """Return a list of all locked connections

    :rtype: list

    """
    return list(filter(lambda conn: conn.locked,
                       self.connections.values()))
def remove(self, connection):
"""Remove the connection from the pool
 
Loading
Loading
@@ -302,11 +350,38 @@ class Pool(object):
cid = id(connection)
if cid not in self.connections:
raise ConnectionNotFoundError(self.id, cid)
self._connection(connection).close()
self.connection_handle(connection).close()
with self._lock:
del self.connections[cid]
LOGGER.debug('Pool %s removed connection %s', self.id, cid)
 
def report(self):
    """Return a report about the pool state and configuration.

    :rtype: dict

    """
    # self.connections maps id(handle) -> Connection; aggregate over the
    # values. The *_connections properties already do so — use them
    # consistently instead of re-deriving the lists here.
    connections = list(self.connections.values())
    return {
        'connections': {
            'busy': len(self.busy_connections),
            'closed': len(self.closed_connections),
            # executing is accessed as an attribute, matching how the
            # executing_connections property treats it
            'executing': len(self.executing_connections),
            'idle': len(self.idle_connections),
            # was busy_connections; locked and busy are distinct states
            'locked': len(self.locked_connections)
        },
        'exceptions': sum(c.exceptions for c in connections),
        # Connection tracks this counter as ``executions`` (set in
        # Connection.__init__); ``executed`` does not exist
        'executions': sum(c.executions for c in connections),
        'full': self.is_full,
        'idle': {
            'duration': self.idle_duration,
            'ttl': self.idle_ttl
        },
        'max_size': self.max_size
    }
def shutdown(self):
"""Forcefully shutdown the entire pool, closing all non-executing
connections.
Loading
Loading
@@ -341,16 +416,6 @@ class Pool(object):
with self._lock:
self.max_size = size
 
def _connection(self, connection):
"""Return a connection object for the given psycopg2 connection
:param connection: The connection to return a parent for
:type connection: psycopg2.extensions.connection
:rtype: Connection
"""
return self.connections[id(connection)]
 
class PoolManager(object):
"""The connection pool object implements behavior around connections and
Loading
Loading
@@ -441,6 +506,20 @@ class PoolManager(object):
LOGGER.debug("Creating Pool: %s (%i/%i)", pid, idle_ttl, max_size)
cls._pools[pid] = Pool(pid, idle_ttl, max_size, time_method)
 
@classmethod
def free(cls, pid, connection):
    """Free a connection that was locked by a session

    :param str pid: The pool ID
    :param connection: The connection to remove
    :type connection: psycopg2.extensions.connection
    """
    # Hold the manager lock for the whole operation so the existence
    # check and the free happen atomically with respect to other threads.
    with cls._lock:
        LOGGER.debug('Freeing %s from pool %s', id(connection), pid)
        # Raises if pid is unknown (see _ensure_pool_exists)
        cls._ensure_pool_exists(pid)
        cls._pools[pid].free(connection)
@classmethod
def get(cls, pid, session):
"""Get an idle, unused connection from the pool. Once a connection has
Loading
Loading
@@ -456,18 +535,18 @@ class PoolManager(object):
return cls._pools[pid].get(session)
 
@classmethod
def free(cls, pid, connection):
"""Free a connection that was locked by a session
def get_connection(cls, pid, connection):
"""Return the specified :class:`~queries.pool.Connection` from the
pool.
 
:param str pid: The pool ID
:param connection: The connection to remove
:param connection: The connection to return for
:type connection: psycopg2.extensions.connection
:rtype: queries.pool.Connection
 
"""
with cls._lock:
LOGGER.debug('Freeing %s from pool %s', id(connection), pid)
cls._ensure_pool_exists(pid)
cls._pools[pid].free(connection)
return cls._pools[pid].connection_handle(connection)
 
@classmethod
def has_connection(cls, pid, connection):
Loading
Loading
@@ -589,6 +668,19 @@ class PoolManager(object):
cls._ensure_pool_exists(pid)
return len(cls._pools[pid])
 
@classmethod
def report(cls):
    """Return the state of all of the registered pools.

    :rtype: dict

    """
    return {
        # Naive UTC ISO-8601 timestamp; kept as utcnow() so the emitted
        # format (no offset suffix) is unchanged for consumers
        'timestamp': datetime.datetime.utcnow().isoformat(),
        'process': os.getpid(),
        # dict comprehension instead of dict([(k, v) ...]) wrapper
        'pools': {i: p.report() for i, p in cls._pools.items()}
    }
@classmethod
def _ensure_pool_exists(cls, pid):
"""Raise an exception if the pool has yet to be created or has been
Loading
Loading
Loading
Loading
@@ -27,14 +27,14 @@ import hashlib
import logging
 
import psycopg2
from psycopg2 import extensions
from psycopg2 import extras
from psycopg2 import extensions, extras
 
from queries import pool, results, utils, DEFAULT_URI, PYPY
from queries import pool, results, utils
 
LOGGER = logging.getLogger(__name__)
 
DEFAULT_ENCODING = 'UTF8'
DEFAULT_URI = 'postgresql://localhost:5432'
 
 
class Session(object):
Loading
Loading
@@ -119,7 +119,13 @@ class Session(object):
:raises: queries.ProgrammingError
 
"""
self._cursor.callproc(name, args)
try:
self._cursor.callproc(name, args)
except psycopg2.Error as err:
self._incr_exceptions()
raise err
finally:
self._incr_executions()
return results.Results(self._cursor)
 
def close(self):
Loading
Loading
@@ -203,7 +209,13 @@ class Session(object):
:raises: queries.ProgrammingError
 
"""
self._cursor.execute(sql, parameters)
try:
self._cursor.execute(sql, parameters)
except psycopg2.Error as err:
self._incr_exceptions()
raise err
finally:
self._incr_executions()
return results.Results(self._cursor)
 
def set_encoding(self, value=DEFAULT_ENCODING):
Loading
Loading
@@ -285,7 +297,7 @@ class Session(object):
# Added in because psycopg2ct connects and leaves the connection in
# a weird state: consts.STATUS_DATESTYLE, returning from
# Connection._setup without setting the state as const.STATUS_OK
if PYPY:
if utils.PYPY:
connection.reset()
 
# Register the custom data types
Loading
Loading
@@ -311,6 +323,14 @@ class Session(object):
cursor.withhold = True
return cursor
 
def _incr_exceptions(self):
"""Increment the number of exceptions for the current connection."""
self._pool_manager.get_connection(self.pid, self._conn).exceptions += 1
def _incr_executions(self):
"""Increment the number of executions for the current connection."""
self._pool_manager.get_connection(self.pid, self._conn).executions += 1
def _psycopg2_connect(self, kwargs):
"""Return a psycopg2 connection for the specified kwargs. Extend for
use in async session adapters.
Loading
Loading
Loading
Loading
@@ -26,16 +26,11 @@ import logging
import socket
import warnings
 
import psycopg2
from psycopg2 import extras, extensions
from tornado import concurrent, ioloop
from psycopg2 import extras, extensions
import psycopg2
 
from queries import pool
from queries import results
from queries import session
from queries import utils
from queries import DEFAULT_URI
from queries import PYPY
from queries import pool, results, session, utils
 
LOGGER = logging.getLogger(__name__)
 
Loading
Loading
@@ -141,7 +136,7 @@ class TornadoSession(session.Session):
:param int pool_max_size: The maximum size of the pool to use
 
"""
def __init__(self, uri=DEFAULT_URI,
def __init__(self, uri=session.DEFAULT_URI,
cursor_factory=extras.RealDictCursor,
pool_idle_ttl=pool.DEFAULT_IDLE_TTL,
pool_max_size=DEFAULT_MAX_POOL_SIZE,
Loading
Loading
@@ -254,7 +249,7 @@ class TornadoSession(session.Session):
:raises: pool.NoIdleConnectionsError
 
"""
future = concurrent.TracebackFuture()
future = concurrent.Future()
 
# Attempt to get a cached connection from the connection pool
try:
Loading
Loading
@@ -300,6 +295,7 @@ class TornadoSession(session.Session):
 
"""
if cf.exception():
self._cleanup_fd(fd)
future.set_exception(cf.exception())
 
else:
Loading
Loading
@@ -308,18 +304,19 @@ class TornadoSession(session.Session):
# Add the connection to the pool
LOGGER.debug('Connection established for %s', self.pid)
self._pool_manager.add(self.pid, connection)
except (ValueError, pool.PoolException) as error:
except (ValueError, pool.PoolException) as err:
LOGGER.exception('Failed to add %r to the pool', self.pid)
future.set_exception(error)
self._cleanup_fd(fd)
future.set_exception(err)
return
 
self._pool_manager.lock(self.pid, connection, self)
 
# Added in because psycopg2ct connects and leaves the
# Added in because psycopg2cffi connects and leaves the
# connection in a weird state: consts.STATUS_DATESTYLE,
# returning from Connection._setup without setting the state
# as const.STATUS_OK
if PYPY:
if utils.PYPY:
connection.status = extensions.STATUS_READY
 
# Register the custom data types
Loading
Loading
@@ -330,7 +327,7 @@ class TornadoSession(session.Session):
future.set_result(connection)
 
# Add a future that fires once connected
self._futures[fd] = concurrent.TracebackFuture()
self._futures[fd] = concurrent.Future()
self._ioloop.add_future(self._futures[fd], on_connected)
 
# Add the connection to the IOLoop
Loading
Loading
@@ -360,7 +357,7 @@ class TornadoSession(session.Session):
:raises: queries.ProgrammingError
 
"""
future = concurrent.TracebackFuture()
future = concurrent.Future()
 
def on_connected(cf):
"""Invoked by the future returned by self._connect"""
Loading
Loading
@@ -375,16 +372,16 @@ class TornadoSession(session.Session):
def completed(qf):
"""Invoked by the IOLoop when the future has completed"""
if qf.exception():
error = qf.exception()
LOGGER.debug('Cleaning cursor due to exception: %r', error)
err = qf.exception()
LOGGER.debug('Cleaning cursor due to exception: %r', err)
self._exec_cleanup(cursor, conn.fileno())
future.set_exception(error)
future.set_exception(err)
else:
value = Results(cursor, self._exec_cleanup, conn.fileno())
future.set_result(value)
 
# Setup a callback to wait on the query result
self._futures[conn.fileno()] = concurrent.TracebackFuture()
self._futures[conn.fileno()] = concurrent.Future()
 
# Add the future to the IOLoop
self._ioloop.add_future(self._futures[conn.fileno()],
Loading
Loading
@@ -420,8 +417,7 @@ class TornadoSession(session.Session):
except (psycopg2.Error, psycopg2.Warning) as error:
LOGGER.debug('Error closing the cursor: %s', error)
 
self._pool_manager.free(self.pid, self._connections[fd])
self._ioloop.remove_handler(fd)
self._cleanup_fd(fd)
 
# If the cleanup callback exists, remove it
if self._cleanup_callback:
Loading
Loading
@@ -432,7 +428,20 @@ class TornadoSession(session.Session):
self._ioloop.time() + self._pool_idle_ttl + 1,
self._pool_manager.clean, self.pid)
 
def _cleanup_fd(self, fd):
    """Ensure the socket is removed from the IOLoop and that the
    connection stack and futures stack no longer reference the fd.

    :param int fd: The fd # to cleanup
    """
    self._ioloop.remove_handler(fd)
    if fd in self._connections:
        try:
            # Return the connection to the pool; it may already have
            # been removed from the pool, in which case there is
            # nothing left to free.
            self._pool_manager.free(self.pid, self._connections[fd])
        except pool.ConnectionNotFoundError:
            pass
        # Close the handle before dropping our last reference to it
        self._connections[fd].close()
        del self._connections[fd]
    if fd in self._futures:
        del self._futures[fd]
Loading
Loading
Loading
Loading
@@ -3,16 +3,18 @@ Utility functions for access to OS level info and URI parsing
 
"""
import collections
import getpass
import logging
import os
import platform
 
# All systems do not support pwd module
try:
import pwd
except ImportError:
pwd = None
import getpass
 
# Python 2 & 3 compatibility
try:
from urllib import parse as _urlparse
except ImportError:
Loading
Loading
@@ -28,6 +30,8 @@ PARSED = collections.namedtuple('Parsed',
'scheme,netloc,path,params,query,fragment,'
'username,password,hostname,port')
 
PYPY = platform.python_implementation().lower() == 'pypy'
KEYWORDS = ['connect_timeout',
'client_encoding',
'options',
Loading
Loading
@@ -75,6 +79,25 @@ def parse_qs(query_string):
return _urlparse.parse_qs(query_string)
 
 
def uri(host='localhost', port=5432, dbname='postgres', user='postgres',
        password=None):
    """Build a PostgreSQL connection URI from its individual parts.

    :param str host: Host to connect to
    :param int port: Port to connect on (omitted from the URI when falsy)
    :param str dbname: The database name
    :param str user: User to connect as
    :param str password: The password to use, ``None`` for no password
    :return str: The PostgreSQL connection URI

    """
    netloc = '{0}:{1}'.format(host, port) if port else host
    credentials = '{0}:{1}'.format(user, password) if password else user
    return 'postgresql://{0}@{1}/{2}'.format(credentials, netloc, dbname)
def uri_to_kwargs(uri):
"""Return a URI as kwargs for connecting to PostgreSQL with psycopg2,
applying default values for non-specified areas of the URI.
Loading
Loading
@@ -113,13 +136,7 @@ def urlparse(url):
"""
value = 'http%s' % url[5:] if url[:5] == 'postgresql' else url
parsed = _urlparse.urlparse(value)
# Python 2.6 hack
if not parsed.query and '?' in parsed.path:
path, query = parsed.path.split('?')
else:
path, query = parsed.path, parsed.query
path, query = parsed.path, parsed.query
hostname = parsed.hostname if parsed.hostname else ''
return PARSED(parsed.scheme.replace('http', 'postgresql'),
parsed.netloc,
Loading
Loading
File moved
Loading
Loading
@@ -7,6 +7,9 @@ cover-erase = 1
cover-html = 1
cover-html-dir = build/coverage
cover-package = queries
cover-tests = 1
logging-level = DEBUG
stop = 1
verbosity = 2
with-coverage = 1
detailed-errors = 1
Loading
Loading
@@ -2,7 +2,7 @@ from setuptools import setup
import os
import platform
 
# Include the proper requirements
# PYPY vs cpython
if platform.python_implementation() == 'PyPy':
install_requires = ['psycopg2cffi>=2.7.2,<3']
else:
Loading
Loading
@@ -17,7 +17,6 @@ classifiers = ['Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: BSD License',
'Operating System :: OS Independent',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
Loading
Loading
@@ -28,14 +27,14 @@ classifiers = ['Development Status :: 5 - Production/Stable',
'Topic :: Software Development :: Libraries']
 
setup(name='queries',
version='1.10.4',
version='2.0.0',
description="Simplified PostgreSQL client built upon Psycopg2",
maintainer="Gavin M. Roy",
maintainer_email="gavinmroy@gmail.com",
url="https://github.com/gmr/queries",
install_requires=install_requires,
extras_require={'tornado': 'tornado'},
license=open('LICENSE').read(),
license='BSD',
package_data={'': ['LICENSE', 'README.rst']},
packages=['queries'],
classifiers=classifiers,
Loading
Loading
"""
Tests for functionality in the __init__.py module
"""
import platform
try:
import unittest2 as unittest
except ImportError:
import unittest
import queries
class PYPYDetectionTests(unittest.TestCase):
def test_pypy_flag(self):
"""PYPY flag is set properly"""
self.assertEqual(queries.PYPY,
platform.python_implementation() == 'PyPy')
class URICreationTests(unittest.TestCase):
def test_uri_with_password(self):
expectation = 'postgresql://foo:bar@baz:5433/qux'
self.assertEqual(queries.uri('baz', 5433, 'qux', 'foo', 'bar'),
expectation)
def test_uri_without_password(self):
expectation = 'postgresql://foo@baz:5433/qux'
self.assertEqual(queries.uri('baz', 5433, 'qux', 'foo'),
expectation)
def test_default_uri(self):
expectation = 'postgresql://postgres@localhost:5432/postgres'
self.assertEqual(queries.uri(), expectation)
import datetime
try:
import unittest2 as unittest
except ImportError:
import unittest
import os
import unittest
from tornado import gen, testing
 
import queries
from tornado import testing
 
 
class SessionIntegrationTests(unittest.TestCase):
class URIMixin(object):
    """Mixin supplying the PostgreSQL URI used by the integration tests."""

    @property
    def pg_uri(self):
        """Return a connection URI built from the PGHOST/PGPORT environment
        variables, defaulting to localhost:5432 and the postgres database.

        :rtype: str
        """
        return queries.uri(os.getenv('PGHOST', 'localhost'),
                           int(os.getenv('PGPORT', '5432')), 'postgres')
class SessionIntegrationTests(URIMixin, unittest.TestCase):
 
def setUp(self):
uri = queries.uri('localhost', 5432, 'postgres', 'postgres')
try:
self.session = queries.Session(uri, pool_max_size=10)
self.session = queries.Session(self.pg_uri, pool_max_size=10)
except queries.OperationalError as error:
raise unittest.SkipTest(str(error).split('\n')[0])
 
def tearDown(self):
self.session.close()
def test_query_returns_results_object(self):
self.assertIsInstance(self.session.query('SELECT 1 AS value'),
queries.Results)
Loading
Loading
@@ -40,26 +49,28 @@ class SessionIntegrationTests(unittest.TestCase):
self.assertEqual(6 % 4, result[0]['mod'])
 
 
class TornadoSessionIntegrationTests(testing.AsyncTestCase):
class TornadoSessionIntegrationTests(URIMixin, testing.AsyncTestCase):
 
def setUp(self):
super(TornadoSessionIntegrationTests, self).setUp()
self.session = queries.TornadoSession(queries.uri('localhost',
5432,
'postgres',
'postgres'),
self.session = queries.TornadoSession(self.pg_uri,
pool_max_size=10,
io_loop=self.io_loop)
 
@testing.gen_test
def test_query_returns_results_object(self):
@gen.coroutine
def assertPostgresConnected(self):
try:
result = yield self.session.query('SELECT 1 AS value')
except queries.OperationalError:
raise unittest.SkipTest('PostgreSQL is not running')
self.assertIsInstance(result, queries.Results)
self.assertEqual(len(result), 1)
result.free()
 
@testing.gen_test
def test_successful_connection_and_query(self):
yield self.assertPostgresConnected()
@testing.gen_test
def test_query_result_value(self):
try:
Loading
Loading
@@ -72,7 +83,7 @@ class TornadoSessionIntegrationTests(testing.AsyncTestCase):
@testing.gen_test
def test_query_multirow_result_has_at_least_three_rows(self):
try:
result = yield self.session.query('SELECT * FROM pg_stat_database')
result = yield self.session.query('SELECT * FROM pg_class')
except queries.OperationalError:
raise unittest.SkipTest('PostgreSQL is not running')
self.assertGreaterEqual(result.count(), 3)
Loading
Loading
@@ -96,3 +107,26 @@ class TornadoSessionIntegrationTests(testing.AsyncTestCase):
raise unittest.SkipTest('PostgreSQL is not running')
self.assertEqual(6 % 4, result[0]['mod'])
result.free()
@testing.gen_test
def test_polling_stops_after_connection_error(self):
# Abort the test right away if postgres isn't running.
yield self.assertPostgresConnected()
# Use an invalid user to force an OperationalError during connection
bad_uri = queries.uri(os.getenv('PGHOST', 'localhost'),
int(os.getenv('PGPORT', '5432')), 'invalid')
session = queries.TornadoSession(bad_uri)
self.count = 0
real_poll_connection = session._poll_connection
def count_polls(*args, **kwargs):
self.count += 1
real_poll_connection(*args, **kwargs)
session._poll_connection = count_polls
with self.assertRaises(queries.OperationalError):
yield session.query('SELECT 1')
yield gen.sleep(0.05)
self.assertLess(self.count, 20)
Loading
Loading
@@ -3,10 +3,7 @@ Tests for Connection class in the pool module
 
"""
import mock
try:
import unittest2 as unittest
except ImportError:
import unittest
import unittest
import weakref
 
from queries import pool
Loading
Loading