Skip to content
Snippets Groups Projects
Commit ee7f6cfb authored by Gavin M. Roy's avatar Gavin M. Roy Committed by GitHub
Browse files

Merge pull request #25 from gmr/autoclean

Autoclean
parents 8560f4ae 23aec156
No related branches found
No related tags found
No related merge requests found
Version History
===============
- Next Release
- Log a warning when a tornado_session.Result is ``__del__'d`` without ``free`` being called.
- Free when tornado_session.Result is ``__del__'d`` without ``free`` being called.
- Auto-clean the pool after Results.free TTL+1 in tornado_session.TornadoSession
- Dont raise NotImplementedError in Results.free for synchronous use, just treat as a noop
- 1.9.1 2016-10-25
- Add better exception handling around connections and getting the logged in user
- 1.9.0 2016-07-01
Loading
Loading
Loading
Loading
@@ -8,7 +8,7 @@ The core `queries.Queries` class will automatically register support for UUIDs,
Unicode and Unicode arrays.
 
"""
__version__ = '1.9.1'
__version__ = '1.10.0'
version = __version__
 
import logging
Loading
Loading
Loading
Loading
@@ -132,11 +132,13 @@ class Pool(object):
def __init__(self,
pool_id,
idle_ttl=DEFAULT_IDLE_TTL,
max_size=DEFAULT_MAX_SIZE):
max_size=DEFAULT_MAX_SIZE,
time_method=None):
self.connections = {}
self._id = pool_id
self.idle_ttl = idle_ttl
self.max_size = max_size
self.time_method = time_method or time.time
 
def __contains__(self, connection):
"""Return True if the pool contains the connection"""
Loading
Loading
@@ -207,7 +209,7 @@ class Pool(object):
 
if self.idle_connections == list(self.connections.values()):
with self._lock:
self.idle_start = time.time()
self.idle_start = self.time_method()
LOGGER.debug('Pool %s freed connection %s', self.id, id(connection))
 
def get(self, session):
Loading
Loading
@@ -258,7 +260,7 @@ class Pool(object):
"""
if self.idle_start is None:
return 0
return time.time() - self.idle_start
return self.time_method() - self.idle_start
 
@property
def is_full(self):
Loading
Loading
@@ -375,6 +377,8 @@ class PoolManager(object):
"""Only allow a single PoolManager instance to exist, returning the
handle for it.
 
:param callable time_method: Override the default :py:meth`time.time`
method for time calculations. Only applied on first invocation.
:rtype: PoolManager
 
"""
Loading
Loading
@@ -407,13 +411,11 @@ class PoolManager(object):
with cls._lock:
cls._ensure_pool_exists(pid)
cls._pools[pid].clean()
# If the pool has no open connections, remove it
if not len(cls._pools[pid]):
del cls._pools[pid]
cls._maybe_remove_pool(pid)
 
@classmethod
def create(cls, pid, idle_ttl=DEFAULT_IDLE_TTL, max_size=DEFAULT_MAX_SIZE):
def create(cls, pid, idle_ttl=DEFAULT_IDLE_TTL, max_size=DEFAULT_MAX_SIZE,
time_method=None):
"""Create a new pool, with the ability to pass in values to override
the default idle TTL and the default maximum size.
 
Loading
Loading
@@ -426,6 +428,8 @@ class PoolManager(object):
:param str pid: The pool ID
:param int idle_ttl: Time in seconds for the idle TTL
:param int max_size: The maximum pool size
:param callable time_method: Override the use of :py:meth:`time.time`
method for time values.
:raises: KeyError
 
"""
Loading
Loading
@@ -433,7 +437,7 @@ class PoolManager(object):
raise KeyError('Pool %s already exists' % pid)
with cls._lock:
LOGGER.debug("Creating Pool: %s (%i/%i)", pid, idle_ttl, max_size)
cls._pools[pid] = Pool(pid, idle_ttl, max_size)
cls._pools[pid] = Pool(pid, idle_ttl, max_size, time_method)
 
@classmethod
def get(cls, pid, session):
Loading
Loading
@@ -595,6 +599,16 @@ class PoolManager(object):
if pid not in cls._pools:
raise KeyError('Pool %s has not been created' % pid)
 
@classmethod
def _maybe_remove_pool(cls, pid):
"""If the pool has no open connections, remove it
:param str pid: The pool id to clean
"""
if not len(cls._pools[pid]):
del cls._pools[pid]
 
class QueriesException(Exception):
"""Base Exception for all other Queries exceptions"""
Loading
Loading
Loading
Loading
@@ -92,7 +92,7 @@ class Results(object):
connections.
 
"""
raise NotImplementedError
LOGGER.debug('Invoking synchronous free has no effect')
 
def items(self):
"""Return all of the rows that are in the result set.
Loading
Loading
Loading
Loading
@@ -30,15 +30,10 @@ import psycopg2
from psycopg2 import extensions
from psycopg2 import extras
 
from queries import pool
from queries import results
from queries import utils
from queries import pool, results, utils, DEFAULT_URI, PYPY
 
LOGGER = logging.getLogger(__name__)
 
from queries import DEFAULT_URI
from queries import PYPY
DEFAULT_ENCODING = 'UTF8'
 
 
Loading
Loading
Loading
Loading
@@ -109,7 +109,6 @@ class Results(results.Results):
self._fd = fd
self._freed = False
 
@gen.coroutine
def free(self):
"""Release the results and connection lock from the TornadoSession
object. This **must** be called after you finish processing the results
Loading
Loading
@@ -124,7 +123,8 @@ class Results(results.Results):
 
def __del__(self):
if not self._freed:
LOGGER.warning('%s not freed - %r', self.__class__.__name__, self)
LOGGER.warning('Auto-freeing result on deletion')
self.free()
 
 
class TornadoSession(session.Session):
Loading
Loading
@@ -162,15 +162,18 @@ class TornadoSession(session.Session):
"""
self._connections = dict()
self._futures = dict()
self._cleanup_callback = None
self._pool_idle_ttl = pool_idle_ttl
 
self._cursor_factory = cursor_factory
self._ioloop = io_loop or ioloop.IOLoop.instance()
self._ioloop = io_loop or ioloop.IOLoop.current()
self._pool_manager = pool.PoolManager.instance()
self._uri = uri
 
# Ensure the pool exists in the pool manager
if self.pid not in self._pool_manager:
self._pool_manager.create(self.pid, pool_idle_ttl, pool_max_size)
self._pool_manager.create(self.pid, pool_idle_ttl, pool_max_size,
self._ioloop.time)
 
@property
def connection(self):
Loading
Loading
@@ -438,6 +441,14 @@ class TornadoSession(session.Session):
self._pool_manager.free(self.pid, self._connections[fd])
self._ioloop.remove_handler(fd)
 
# If the cleanup callback exists, remove it
if self._cleanup_callback:
self._ioloop.remove_timeout(self._cleanup_callback)
# Create a new cleanup callback to clean the pool of idle connections
self._cleanup_callback = self._ioloop.add_timeout(
self._pool_idle_ttl + 1, self._pool_manager.clean, self.pid)
if fd in self._connections:
del self._connections[fd]
if fd in self._futures:
Loading
Loading
[flake8]
exclude = .git,build,dist,docs,env
[nosetests]
with-coverage=1
cover-package=queries
cover-branches=1
cover-erase=1
cover-branches = 1
cover-erase = 1
cover-html = 1
cover-html-dir = build/coverage
cover-package = queries
logging-level = DEBUG
verbosity = 2
with-coverage = 1
Loading
Loading
@@ -20,16 +20,15 @@ classifiers = ['Development Status :: 5 - Production/Stable',
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Database',
'Topic :: Software Development :: Libraries']
 
setup(name='queries',
version='1.9.1',
version='1.10.0',
description="Simplified PostgreSQL client built upon Psycopg2",
maintainer="Gavin M. Roy",
maintainer_email="gavinmroy@gmail.com",
Loading
Loading
coverage
flake8
mock
nose
codecov
Loading
Loading
Loading
Loading
@@ -45,7 +45,8 @@ class ResultsTestCase(unittest.TestCase):
self.cursor.rowcount = 0
with mock.patch.object(self.obj, '_rewind') as rewind:
[x for x in self.obj]
assert not rewind.called, '_rewind should not be called on empty result'
assert not rewind.called, \
'_rewind should not be called on empty result'
 
def test_iter_rewinds(self):
self.cursor.__iter__ = mock.Mock(return_value=iter([1, 2, 3]))
Loading
Loading
@@ -102,15 +103,13 @@ class ResultsTestCase(unittest.TestCase):
self.cursor.rowcount = 2
self.assertEqual(self.obj.count(), 2)
 
def test_free_raises_exception(self):
self.assertRaises(NotImplementedError, self.obj.free)
def test_items_returns_on_empty(self):
self.cursor.rowcount = 0
self.cursor.scroll = mock.Mock()
self.cursor.fetchall = mock.Mock()
self.obj.items()
assert not self.cursor.scroll.called, 'Cursor.scroll should not be called on empty result'
assert not self.cursor.scroll.called, \
'Cursor.scroll should not be called on empty result'
 
def test_items_invokes_scroll(self):
self.cursor.scroll = mock.Mock()
Loading
Loading
Loading
Loading
@@ -47,11 +47,11 @@ class SessionInitTests(unittest.TestCase):
def setUp(self):
self.obj = tornado_session.TornadoSession()
 
#def test_creates_empty_callback_dict(self):
# self.assertDictEqual(self.obj._futures, {})
def test_creates_empty_callback_dict(self):
self.assertDictEqual(self.obj._futures, {})
 
# def test_creates_empty_connections_dict(self):
# self.assertDictEqual(self.obj._connections, {})
def test_creates_empty_connections_dict(self):
self.assertDictEqual(self.obj._connections, {})
 
def test_sets_default_cursor_factory(self):
self.assertEqual(self.obj._cursor_factory, extras.RealDictCursor)
Loading
Loading
@@ -105,8 +105,8 @@ class SessionConnectTests(testing.AsyncTestCase):
conn = yield self.obj._connect()
self.obj._pool_manager.add(self.obj.pid, conn)
with mock.patch.object(self.obj._pool_manager, 'get') as get:
with mock.patch.object(self.io_loop, 'add_handler') as add_handler:
second_result = yield self.obj._connect()
with mock.patch.object(self.io_loop, 'add_handler'):
yield self.obj._connect()
get.assert_called_once_with(self.obj.pid, self.obj)
 
@testing.gen_test
Loading
Loading
@@ -116,7 +116,7 @@ class SessionConnectTests(testing.AsyncTestCase):
with mock.patch.object(self.obj._pool_manager, 'get') as get:
get.return_value = self.conn
with mock.patch.object(self.io_loop, 'add_handler') as add_handler:
second_result = yield self.obj._connect()
yield self.obj._connect()
add_handler.assert_called_once_with(self.conn.fileno(),
self.obj._on_io_events,
ioloop.IOLoop.WRITE)
Loading
Loading
@@ -232,5 +232,5 @@ class SessionPublicMethodTests(testing.AsyncTestCase):
future.set_result(connection)
_connect.return_value = future
obj = tornado_session.TornadoSession(io_loop=self.io_loop)
result = yield obj.validate()
yield obj.validate()
_connect.assert_called_once_with()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment