Skip to content
Snippets Groups Projects
Commit b3eec7a2 authored by Gavin M. Roy's avatar Gavin M. Roy
Browse files

Metric Reporting

- Keep track of the number of query executions and exceptions per connection
- Add report function to both Pool and PoolManager
- Add methods for returning a list of busy, closed, executing,  and locked connections in a Pool
- Rename Pool._connection to Pool.connection_handle
parent 80a3958e
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -2,6 +2,7 @@
Connection Pooling
 
"""
import datetime
import logging
import os
import threading
Loading
Loading
@@ -26,6 +27,8 @@ class Connection(object):
def __init__(self, handle):
self.handle = handle
self.used_by = None
self.executions = 0
self.exceptions = 0
 
def close(self):
"""Close the connection
Loading
Loading
@@ -171,6 +174,15 @@ class Pool(object):
self.connections[id(connection)] = Connection(connection)
LOGGER.debug('Pool %s added connection %s', self.id, id(connection))
 
@property
def busy_connections(self):
"""Return a list of active/busy connections
:rtype: list
"""
return [c for c in self.connections if c.busy and not c.closed]
def clean(self):
"""Clean the pool by removing any closed connections and if the pool's
idle has exceeded its idle TTL, remove all connections.
Loading
Loading
@@ -193,6 +205,34 @@ class Pool(object):
self.remove(self.connections[cid].handle)
LOGGER.debug('Pool %s closed', self.id)
 
@property
def closed_connections(self):
"""Return a list of closed connections
:rtype: list
"""
return [c for c in self.connections.values() if c.closed]
def connection_handle(self, connection):
"""Return a connection object for the given psycopg2 connection
:param connection: The connection to return a parent for
:type connection: psycopg2.extensions.connection
:rtype: Connection
"""
return self.connections[id(connection)]
@property
def executing_connections(self):
"""Return a list of connections actively executing queries
:rtype: list
"""
return [c for c in self.connections if c.executing]
def free(self, connection):
"""Free the connection from use by the session that was using it.
 
Loading
Loading
@@ -203,7 +243,7 @@ class Pool(object):
"""
LOGGER.debug('Pool %s freeing connection %s', self.id, id(connection))
try:
self._connection(connection).free()
self.connection_handle(connection).free()
except KeyError:
raise ConnectionNotFoundError(self.id, id(connection))
 
Loading
Loading
@@ -246,9 +286,8 @@ class Pool(object):
:rtype: list
 
"""
return [self.connections[k] for k in self.connections if
not self.connections[k].busy and
not self.connections[k].closed]
return [c for c in self.connections.values()
if not c.busy and not c.closed]
 
@property
def idle_duration(self):
Loading
Loading
@@ -281,7 +320,7 @@ class Pool(object):
"""
cid = id(connection)
try:
self._connection(connection).lock(session)
self.connection_handle(connection).lock(session)
except KeyError:
raise ConnectionNotFoundError(self.id, cid)
else:
Loading
Loading
@@ -290,6 +329,15 @@ class Pool(object):
self.idle_start = None
LOGGER.debug('Pool %s locked connection %s', self.id, cid)
 
@property
def locked_connections(self):
"""Return a list of all locked connections
:rtype: list
"""
return [c for c in self.connections.values() if c.locked]
def remove(self, connection):
"""Remove the connection from the pool
 
Loading
Loading
@@ -302,11 +350,38 @@ class Pool(object):
cid = id(connection)
if cid not in self.connections:
raise ConnectionNotFoundError(self.id, cid)
self._connection(connection).close()
self.connection_handle(connection).close()
with self._lock:
del self.connections[cid]
LOGGER.debug('Pool %s removed connection %s', self.id, cid)
 
def report(self):
"""Return a report about the pool state and configuration.
:rtype: dict
"""
return {
'connections': {
'busy': len(self.busy_connections),
'closed': len(self.closed_connections),
'executing': len([c for c in self.connections
if c.executing()]),
'idle': len(self.idle_connections),
'locked': len(self.busy_connections)
},
'exceptions': sum([c.exceptions
for c in self.connections.values()]),
'executions': sum([c.executed
for c in self.connections.values()]),
'full': self.is_full,
'idle': {
'duration': self.idle_duration,
'ttl': self.idle_ttl
},
'max_size': self.max_size
}
def shutdown(self):
"""Forcefully shutdown the entire pool, closing all non-executing
connections.
Loading
Loading
@@ -341,16 +416,6 @@ class Pool(object):
with self._lock:
self.max_size = size
 
def _connection(self, connection):
"""Return a connection object for the given psycopg2 connection
:param connection: The connection to return a parent for
:type connection: psycopg2.extensions.connection
:rtype: Connection
"""
return self.connections[id(connection)]
 
class PoolManager(object):
"""The connection pool object implements behavior around connections and
Loading
Loading
@@ -441,6 +506,20 @@ class PoolManager(object):
LOGGER.debug("Creating Pool: %s (%i/%i)", pid, idle_ttl, max_size)
cls._pools[pid] = Pool(pid, idle_ttl, max_size, time_method)
 
@classmethod
def free(cls, pid, connection):
"""Free a connection that was locked by a session
:param str pid: The pool ID
:param connection: The connection to remove
:type connection: psycopg2.extensions.connection
"""
with cls._lock:
LOGGER.debug('Freeing %s from pool %s', id(connection), pid)
cls._ensure_pool_exists(pid)
cls._pools[pid].free(connection)
@classmethod
def get(cls, pid, session):
"""Get an idle, unused connection from the pool. Once a connection has
Loading
Loading
@@ -456,18 +535,18 @@ class PoolManager(object):
return cls._pools[pid].get(session)
 
@classmethod
def free(cls, pid, connection):
"""Free a connection that was locked by a session
def get_connection(cls, pid, connection):
"""Return the specified :class:`~queries.pool.Connection` from the
pool.
 
:param str pid: The pool ID
:param connection: The connection to remove
:param connection: The connection to return for
:type connection: psycopg2.extensions.connection
:rtype: queries.pool.Connection
 
"""
with cls._lock:
LOGGER.debug('Freeing %s from pool %s', id(connection), pid)
cls._ensure_pool_exists(pid)
cls._pools[pid].free(connection)
return cls._pools[pid].connection_handle(connection)
 
@classmethod
def has_connection(cls, pid, connection):
Loading
Loading
@@ -589,6 +668,19 @@ class PoolManager(object):
cls._ensure_pool_exists(pid)
return len(cls._pools[pid])
 
@classmethod
def report(cls):
"""Return the state of the all of the registered pools.
:rtype: dict
"""
return {
'timestamp': datetime.datetime.utcnow().isoformat(),
'process': os.getpid(),
'pools': dict([(i, p.report()) for i, p in cls._pools.items()])
}
@classmethod
def _ensure_pool_exists(cls, pid):
"""Raise an exception if the pool has yet to be created or has been
Loading
Loading
Loading
Loading
@@ -119,7 +119,13 @@ class Session(object):
:raises: queries.ProgrammingError
 
"""
self._cursor.callproc(name, args)
try:
self._cursor.callproc(name, args)
except psycopg2.Error as err:
self._incr_exceptions()
raise err
finally:
self._incr_executions()
return results.Results(self._cursor)
 
def close(self):
Loading
Loading
@@ -203,7 +209,13 @@ class Session(object):
:raises: queries.ProgrammingError
 
"""
self._cursor.execute(sql, parameters)
try:
self._cursor.execute(sql, parameters)
except psycopg2.Error as err:
self._incr_exceptions()
raise err
finally:
self._incr_executions()
return results.Results(self._cursor)
 
def set_encoding(self, value=DEFAULT_ENCODING):
Loading
Loading
@@ -311,6 +323,14 @@ class Session(object):
cursor.withhold = True
return cursor
 
def _incr_exceptions(self):
"""Increment the number of exceptions for the current connection."""
self._pool_manager.get_connection(self.pid, self._conn).exceptions += 1
def _incr_executions(self):
"""Increment the number of executions for the current connection."""
self._pool_manager.get_connection(self.pid, self._conn).executions += 1
def _psycopg2_connect(self, kwargs):
"""Return a psycopg2 connection for the specified kwargs. Extend for
use in async session adapters.
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment