Skip to content
Snippets Groups Projects
Commit d0adb0a4 authored by Gavin M. Roy's avatar Gavin M. Roy
Browse files

pylint cleanup

parent 321b21b5
No related branches found
No related tags found
No related merge requests found
"""
Core IO for rabbitpy
Core Threaded IO implementation used to communicate with RabbitMQ at the socket
level.
 
"""
import collections
import errno
import logging
try:
import queue
except ImportError:
import Queue as queue
import select
import socket
import ssl
import threading
 
LOGGER = logging.getLogger(__name__)
from pamqp import frame
from pamqp import exceptions as pamqp_exceptions
from pamqp import specification
Loading
Loading
@@ -24,6 +19,8 @@ from rabbitpy import base
from rabbitpy import events
from rabbitpy import exceptions
 
LOGGER = logging.getLogger(__name__)
MAX_READ = specification.FRAME_MAX_SIZE
MAX_WRITE = specification.FRAME_MAX_SIZE
 
Loading
Loading
@@ -31,13 +28,20 @@ MAX_WRITE = specification.FRAME_MAX_SIZE
POLL_TIMEOUT = 1.0
 
 
class SelectPoller(object):
class _SelectPoller(object):
def __init__(self, fd, write_trigger):
self.read = [[fd, write_trigger], [], [fd], POLL_TIMEOUT]
self.write = [[fd, write_trigger], [fd], [fd], POLL_TIMEOUT]
 
def poll(self, write_wanted):
"""Invoke select.select, waiting for it to return with the per action
list of file descriptors that are returned to the IO Loop.
:param bool write_wanted: Is there data pending to be written
:rtype: tuple(list, list, list)
:return: (read, write, error)
"""
try:
if write_wanted:
rlist, wlist, xlist = select.select(*self.write)
Loading
Loading
@@ -50,7 +54,7 @@ class SelectPoller(object):
[sock.fileno() for sock in xlist])
 
 
class KQueuePoller(object):
class _KQueuePoller(object):
 
MAX_EVENTS = 1000
 
Loading
Loading
@@ -58,8 +62,7 @@ class KQueuePoller(object):
self._fd = fd
self._write_trigger = write_trigger
self._kqueue = select.kqueue()
self._kqueue.control([select.kevent(fd,
select.KQ_FILTER_READ,
self._kqueue.control([select.kevent(fd, select.KQ_FILTER_READ,
select.KQ_EV_ADD)], 0)
self._kqueue.control([select.kevent(write_trigger,
select.KQ_FILTER_READ,
Loading
Loading
@@ -67,14 +70,24 @@ class KQueuePoller(object):
self._write_in_last_poll = False
 
def poll(self, write_wanted):
"""Update the KQueue object with the desired actions to block on,
waiting until the KQueue.control method returns events and then returns
the list of actions containing file descriptors to act on for those
actions.
:param bool write_wanted: Is there data pending to be written
:rtype: tuple(list, list, list)
:return: (read, write, error)
"""
rlist, wlist, xlist = [], [], []
try:
events = self._kqueue.control(self._changelist(write_wanted),
self.MAX_EVENTS, POLL_TIMEOUT)
kq_events = self._kqueue.control(self._changelist(write_wanted),
self.MAX_EVENTS, POLL_TIMEOUT)
except select.error as error:
LOGGER.debug('kqueue.control error: %s', error)
return [], [], []
for event in events:
for event in kq_events:
if event.filter == select.KQ_FILTER_READ:
rlist.append(event.ident)
elif event.filter == select.KQ_FILTER_WRITE:
Loading
Loading
@@ -90,30 +103,26 @@ class KQueuePoller(object):
def _changelist(self, write_wanted):
if write_wanted and not self._write_in_last_poll:
self._write_in_last_poll = True
return [select.kevent(self._fd,
select.KQ_FILTER_WRITE,
select.KQ_EV_ADD)]
return [select.kevent(self._fd, select.KQ_FILTER_WRITE,
select.KQ_EV_ADD)]
elif self._write_in_last_poll and not write_wanted:
self._write_in_last_poll = False
return [select.kevent(self._fd,
select.KQ_FILTER_WRITE,
return [select.kevent(self._fd, select.KQ_FILTER_WRITE,
select.KQ_EV_DELETE)]
return None
 
def _cleanup(self):
self._kqueue.control([select.kevent(self._fd,
select.KQ_FILTER_READ,
self._kqueue.control([select.kevent(self._fd, select.KQ_FILTER_READ,
select.KQ_EV_DELETE)], 0)
self._kqueue.control([select.kevent(self._write_trigger,
select.KQ_FILTER_READ,
select.KQ_EV_DELETE)], 0)
if self._write_in_last_poll:
return [select.kevent(self._fd,
select.KQ_FILTER_WRITE,
return [select.kevent(self._fd, select.KQ_FILTER_WRITE,
select.KQ_EV_DELETE)]
 
 
class PollPoller(object):
class _PollPoller(object):
 
# Register constants to prevent platform specific errors
POLLIN = 1
Loading
Loading
@@ -130,23 +139,23 @@ class PollPoller(object):
self._poll.register(write_trigger, self.READ)
self._write_in_last_poll = False
 
def _update_poll(self, write_wanted):
if self._write_in_last_poll:
if not write_wanted:
self._write_in_last_poll = False
self._poll.modify(self._fd, self.READ)
else:
self._write_in_last_poll = True
self._poll.modify(self._fd, self.WRITE)
def poll(self, write_wanted):
"""Update the Poll object with the desired actions to block on, waiting
until the poll returns events and then returns the list of actions
containing file descriptors to act on for those actions.
:param bool write_wanted: Is there data pending to be written
:rtype: tuple(list, list, list)
:return: (read, write, error)
"""
self._update_poll(write_wanted)
rlist, wlist, xlist = [], [], []
try:
events = self._poll.poll(POLL_TIMEOUT * 1000)
poll_events = self._poll.poll(POLL_TIMEOUT * 1000)
except select.error:
return [], [], []
for fileno, event in events:
for fileno, event in poll_events:
if event & self.POLLIN:
rlist.append(fileno)
if event & self.POLLOUT:
Loading
Loading
@@ -155,9 +164,21 @@ class PollPoller(object):
xlist.append(fileno)
return rlist, wlist, xlist
 
def _update_poll(self, write_wanted):
if self._write_in_last_poll:
if not write_wanted:
self._write_in_last_poll = False
self._poll.modify(self._fd, self.READ)
else:
self._write_in_last_poll = True
self._poll.modify(self._fd, self.WRITE)
 
class IOLoop(object):
class _IOLoop(object):
"""Generic base IOLoop implementation that leverages different types of
Polling (select, KQueue, poll).
 
"""
def __init__(self, fd, error_callback, read_callback, write_queue,
event_obj, write_trigger, exception_stack):
self._data = threading.local()
Loading
Loading
@@ -171,30 +192,23 @@ class IOLoop(object):
self._data.write_queue = write_queue
self._data.write_trigger = write_trigger
self._exceptions = exception_stack
self._poller = self._create_poller(fd, write_trigger)
@staticmethod
def _create_poller(fd, write_trigger):
if hasattr(select, 'poll'):
LOGGER.debug('Returning PollPoller')
return PollPoller(fd, write_trigger)
if hasattr(select, 'kqueue'):
LOGGER.debug('Returning KQueuePoller')
return KQueuePoller(fd, write_trigger)
else:
LOGGER.debug('Returning SelectPoller')
return SelectPoller(fd, write_trigger)
self._poller = self._create_poller()
 
def run(self):
"""Run the IOLoop, blocking until the socket is closed or there is
another exception.
"""
self._data.running = True
while self._data.running:
try:
self._poll()
except EnvironmentError as exception:
if (getattr(exception, 'errno', None) == errno.EINTR or
(isinstance(getattr(exception, 'args', None), tuple) and
len(exception.args) == 2 and
exception.args[0] == errno.EINTR)):
if getattr(exception, 'errno') == errno.EINTR:
continue
elif (isinstance(getattr(exception, 'args'), tuple) and
len(exception.args) == 2 and
exception.args[0] == errno.EINTR):
continue
if self._data.events.is_set(events.SOCKET_CLOSE):
LOGGER.debug('Exiting due to closed socket')
Loading
Loading
@@ -203,6 +217,7 @@ class IOLoop(object):
LOGGER.debug('Exiting IOLoop.run')
 
def stop(self):
"""Stop the IOLoop."""
LOGGER.debug('Stopping IOLoop')
self._data.running = False
try:
Loading
Loading
@@ -210,6 +225,17 @@ class IOLoop(object):
except socket.error:
pass
 
def _create_poller(self):
if hasattr(select, 'poll'):
LOGGER.debug('Returning PollPoller')
return _PollPoller(self._data.fd, self._data.write_trigger)
if hasattr(select, 'kqueue'):
LOGGER.debug('Returning KQueuePoller')
return _KQueuePoller(self._data.fd, self._data.write_trigger)
else:
LOGGER.debug('Returning SelectPoller')
return _SelectPoller(self._data.fd, self._data.write_trigger)
def _poll(self):
# Poll select with the materialized lists
if not self._data.running:
Loading
Loading
@@ -282,15 +308,26 @@ class IOLoop(object):
 
 
class IO(threading.Thread, base.StatefulObject):
"""IO is the primary IO thread that is responsible for communicating with
RabbitMQ at the socket level and adds demashalled frames to the appropriate
thread-safe data structures.
"""
CONTENT_METHODS = ['Basic.Deliver', 'Basic.GetOk']
READ_BUFFER_SIZE = specification.FRAME_MAX_SIZE
SSL_KWARGS = {'keyfile': 'keyfile',
'certfile': 'certfile',
'cert_reqs': 'verify',
'ssl_version': 'ssl_version',
'ca_certs': 'cacertfile'}
def __init__(self, group=None, target=None, name=None, args=(),
SSL_KWARGS = {
'keyfile': 'keyfile',
'certfile': 'certfile',
'cert_reqs': 'verify',
'ssl_version': 'ssl_version',
'ca_certs': 'cacertfile'
}
def __init__(self,
group=None,
target=None,
name=None,
args=(),
kwargs=None):
if kwargs is None:
kwargs = dict()
Loading
Loading
@@ -333,13 +370,14 @@ class IO(threading.Thread, base.StatefulObject):
return self._bytes_read
 
def run(self):
"""
"""The blocking method to execute the core IO object, that connects
and then blocks on the IOLoop, exiting when the IOLoop stops.
 
"""
self._connect()
if not self._socket:
LOGGER.warning('Could not connect to %s:%s',
self._args['host'], self._args['port'])
LOGGER.warning('Could not connect to %s:%s', self._args['host'],
self._args['port'])
return
 
LOGGER.debug('Socket connected')
Loading
Loading
@@ -347,15 +385,11 @@ class IO(threading.Thread, base.StatefulObject):
# Create the remote name
local_socket = self._socket.getsockname()
peer_socket = self._socket.getpeername()
self._remote_name = '%s:%s -> %s:%s' % (local_socket[0],
local_socket[1],
peer_socket[0],
peer_socket[1])
self._loop = IOLoop(self._socket, self.on_error, self.on_read,
self._write_queue,
self._events,
self._write_listener,
self._exceptions)
self._remote_name = '%s:%s -> %s:%s' % (
local_socket[0], local_socket[1], peer_socket[0], peer_socket[1])
self._loop = _IOLoop(self._socket, self.on_error, self.on_read,
self._write_queue, self._events,
self._write_listener, self._exceptions)
self._loop.run()
if not self._exceptions.empty() and \
not self._events.is_set(events.EXCEPTION_RAISED):
Loading
Loading
@@ -377,7 +411,12 @@ class IO(threading.Thread, base.StatefulObject):
self._events.set(events.EXCEPTION_RAISED)
 
def on_read(self, data):
# Append the data to the buffer
"""Append the data that is read to the buffer and try and parse
frames out of it.
:param bytes data: The data that has been read in
"""
self._buffer += data
 
while self._buffer:
Loading
Loading
@@ -404,13 +443,16 @@ class IO(threading.Thread, base.StatefulObject):
self._add_frame_to_queue(value[0], value[1])
 
def stop(self):
"""Stop the IO Layer due to exception or other problem.
"""
"""Stop the IO Layer due to exception or other problem."""
self._close()
 
@property
def write_trigger(self):
"""Return the write trigger socket.
:rtype: socket.socket
"""
return self._write_trigger
 
def _add_frame_to_queue(self, channel_id, frame_value):
Loading
Loading
@@ -418,13 +460,15 @@ class IO(threading.Thread, base.StatefulObject):
expectations and then add it to the list.
 
:param int channel_id: The channel id the frame was received on
:param pamqp.specification.Frame frame_value: The frame to add
:param frame_value: The frame to add
:type frame_value: :class:`~pamqp.specification.Frame`
 
"""
# LOGGER.debug('Adding %s to channel %s', frame_value.name, channel_id)
self._channels[channel_id][1].put(frame_value)
 
def _close(self):
"""Close the socket and set the proper event states"""
self._set_state(self.CLOSING)
if hasattr(self, '_socket') and self._socket:
self._socket.close()
Loading
Loading
@@ -433,7 +477,13 @@ class IO(threading.Thread, base.StatefulObject):
self._set_state(self.CLOSED)
 
def _connect_socket(self, sock, address):
"""Connect the socket to the specified host and port."""
"""Connect the socket to the specified host and port, setting the
timeout.
:param socket.socket sock: The socket to connect
:param (str, int) address: The address tuple to connect to
"""
LOGGER.debug('Connecting to %r', address)
sock.settimeout(self._args['timeout'])
sock.connect(address)
Loading
Loading
@@ -446,10 +496,11 @@ class IO(threading.Thread, base.StatefulObject):
"""
self._set_state(self.OPENING)
sock = None
for (af, socktype, proto,
canonname, sockaddr) in self._get_addr_info():
# pylint: disable=unused-variable
for (address_family, socktype,
proto, cname, sockaddr) in self._get_addr_info():
try:
sock = self._create_socket(af, socktype, proto)
sock = self._create_socket(address_family, socktype, proto)
self._connect_socket(sock, sockaddr)
break
except socket.error as error:
Loading
Loading
@@ -468,17 +519,19 @@ class IO(threading.Thread, base.StatefulObject):
self._events.set(events.SOCKET_OPENED)
self._set_state(self.OPEN)
 
def _create_socket(self, af, socktype, proto):
def _create_socket(self, address_family, socktype, protocol):
"""Create the new socket, optionally with SSL support.
 
:param int address_family: The address family to use when creating
:param int socktype: The type of socket to create
:param int protocol: The protocol to use
:rtype: socket.socket or ssl.SSLSocket
 
"""
sock = socket.socket(af, socktype, proto)
sock = socket.socket(address_family, socktype, protocol)
if self._args['ssl']:
kwargs = {'sock': sock,
'server_side': False}
for argv, key in self.SSL_KWARGS.iteritems():
kwargs = {'sock': sock, 'server_side': False}
for argv, key in self.SSL_KWARGS.items():
if self._args[key]:
kwargs[argv] = self._args[key]
LOGGER.debug('Wrapping socket for SSL: %r', kwargs)
Loading
Loading
@@ -494,14 +547,11 @@ class IO(threading.Thread, base.StatefulObject):
if not socket.has_ipv6:
family = socket.AF_INET
try:
res = socket.getaddrinfo(self._args['host'],
self._args['port'],
family,
socket.SOCK_STREAM,
0)
res = socket.getaddrinfo(self._args['host'], self._args['port'],
family, socket.SOCK_STREAM, 0)
except socket.error as error:
LOGGER.error('Could not resolve %s: %s',
self._args['host'], error, exc_info=True)
LOGGER.error('Could not resolve %s: %s', self._args['host'], error,
exc_info=True)
return []
return res
 
Loading
Loading
@@ -540,12 +590,14 @@ class IO(threading.Thread, base.StatefulObject):
block the IO loop unless the exception is caught.
 
:param int channel_id: The channel to remote close
:param pamqp.specification.Frame frame_value: The Channel.Close frame
:param frame_value: The Channel.Close frame
:type frame_value: :class:`~pamqp.specification.Frame`
 
"""
self._channels[channel_id][0].on_remote_close(frame_value)
 
def _socketpair(self):
@staticmethod
def _socketpair():
"""Return a socket pair regardless of platform.
 
:rtype: (socket, socket)
Loading
Loading
@@ -558,24 +610,26 @@ class IO(threading.Thread, base.StatefulObject):
LOGGER.debug('Falling back to emulated socketpair behavior')
 
# Create the listening server socket & bind it to a random port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 0))
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind(('127.0.0.1', 0))
 
# Get the port for the notifying socket to connect to
port = s.getsockname()[1]
port = server_sock.getsockname()[1]
 
# Create the notifying client socket and connect using a timer
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
def connect():
"""Connect to the client to the server socket pair"""
client.connect(('127.0.0.1', port))
 
t = threading.Timer(0.01, connect)
t.start()
timer = threading.Timer(0.01, connect)
timer.start()
 
# Have the listening server socket listen and accept the connect
s.listen(0)
server, _unused = s.accept()
server_sock.listen(0)
# pylint: disable=unused-variable
server, _unused = server_sock.accept()
 
# Don't block on either socket
server.setblocking(0)
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment