Skip to content
Snippets Groups Projects
Commit 51c4b9fa authored by Gavin M. Roy's avatar Gavin M. Roy
Browse files

Cleanup and fix related to errors connection errors (#28)

parent fcb6962f
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -254,7 +254,7 @@ class TornadoSession(session.Session):
:raises: pool.NoIdleConnectionsError
 
"""
future = concurrent.TracebackFuture()
future = concurrent.Future()
 
# Attempt to get a cached connection from the connection pool
try:
Loading
Loading
@@ -300,6 +300,7 @@ class TornadoSession(session.Session):
 
"""
if cf.exception():
self._cleanup_fd(fd)
future.set_exception(cf.exception())
 
else:
Loading
Loading
@@ -308,9 +309,10 @@ class TornadoSession(session.Session):
# Add the connection to the pool
LOGGER.debug('Connection established for %s', self.pid)
self._pool_manager.add(self.pid, connection)
except (ValueError, pool.PoolException) as error:
except (ValueError, pool.PoolException) as err:
LOGGER.exception('Failed to add %r to the pool', self.pid)
future.set_exception(error)
self._cleanup_fd(fd)
future.set_exception(err)
return
 
self._pool_manager.lock(self.pid, connection, self)
Loading
Loading
@@ -330,7 +332,7 @@ class TornadoSession(session.Session):
future.set_result(connection)
 
# Add a future that fires once connected
self._futures[fd] = concurrent.TracebackFuture()
self._futures[fd] = concurrent.Future()
self._ioloop.add_future(self._futures[fd], on_connected)
 
# Add the connection to the IOLoop
Loading
Loading
@@ -360,7 +362,7 @@ class TornadoSession(session.Session):
:raises: queries.ProgrammingError
 
"""
future = concurrent.TracebackFuture()
future = concurrent.Future()
 
def on_connected(cf):
"""Invoked by the future returned by self._connect"""
Loading
Loading
@@ -375,16 +377,16 @@ class TornadoSession(session.Session):
def completed(qf):
"""Invoked by the IOLoop when the future has completed"""
if qf.exception():
error = qf.exception()
LOGGER.debug('Cleaning cursor due to exception: %r', error)
err = qf.exception()
LOGGER.debug('Cleaning cursor due to exception: %r', err)
self._exec_cleanup(cursor, conn.fileno())
future.set_exception(error)
future.set_exception(err)
else:
value = Results(cursor, self._exec_cleanup, conn.fileno())
future.set_result(value)
 
# Setup a callback to wait on the query result
self._futures[conn.fileno()] = concurrent.TracebackFuture()
self._futures[conn.fileno()] = concurrent.Future()
 
# Add the future to the IOLoop
self._ioloop.add_future(self._futures[conn.fileno()],
Loading
Loading
@@ -420,8 +422,7 @@ class TornadoSession(session.Session):
except (psycopg2.Error, psycopg2.Warning) as error:
LOGGER.debug('Error closing the cursor: %s', error)
 
self._pool_manager.free(self.pid, self._connections[fd])
self._ioloop.remove_handler(fd)
self._cleanup_fd(fd)
 
# If the cleanup callback exists, remove it
if self._cleanup_callback:
Loading
Loading
@@ -432,7 +433,20 @@ class TornadoSession(session.Session):
self._ioloop.time() + self._pool_idle_ttl + 1,
self._pool_manager.clean, self.pid)
 
def _cleanup_fd(self, fd):
"""Ensure the socket socket is removed from the IOLoop, the
connection stack, and futures stack.
:param int fd: The fd # to cleanup
"""
self._ioloop.remove_handler(fd)
if fd in self._connections:
try:
self._pool_manager.free(self.pid, self._connections[fd])
except pool.ConnectionNotFoundError:
pass
self._connections[fd].close()
del self._connections[fd]
if fd in self._futures:
del self._futures[fd]
Loading
Loading
Loading
Loading
@@ -104,3 +104,26 @@ class TornadoSessionIntegrationTests(URIMixin, testing.AsyncTestCase):
raise unittest.SkipTest('PostgreSQL is not running')
self.assertEqual(6 % 4, result[0]['mod'])
result.free()
@testing.gen_test
def test_polling_stops_after_connection_error(self):
# Abort the test right away if postgres isn't running.
yield self.assertPostgresConnected()
# Use an invalid user to force an OperationalError during connection
bad_uri = queries.uri(os.getenv('PGHOST', 'localhost'),
int(os.getenv('PGPORT', '5432')), 'invalid')
session = queries.TornadoSession(bad_uri)
self.count = 0
real_poll_connection = session._poll_connection
def count_polls(*args, **kwargs):
self.count += 1
real_poll_connection(*args, **kwargs)
session._poll_connection = count_polls
with self.assertRaises(queries.OperationalError):
yield session.query('SELECT 1')
yield gen.sleep(0.05)
self.assertLess(self.count, 20)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment