Skip to content
Snippets Groups Projects
Commit 39ec1b37 authored by Gavin M. Roy's avatar Gavin M. Roy
Browse files

pylint/flake8 cleanup

parent 8393fc35
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -2,27 +2,7 @@
rabbitpy, a pythonic RabbitMQ client
 
"""
__version__ = '0.27.0'
version = __version__
import logging
import platform
PYPY = platform.python_implementation() == 'PyPy'
try:
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
"""Python 2.6 does not have a NullHandler"""
def emit(self, record):
"""Emit a record
:param record record: The record to emit
"""
pass
logging.getLogger('rabbitpy').addHandler(NullHandler())
from rabbitpy.version import __version__
 
from rabbitpy.amqp import AMQP
from rabbitpy.connection import Connection
Loading
Loading
@@ -46,3 +26,45 @@ from rabbitpy.simple import create_fanout_exchange
from rabbitpy.simple import create_headers_exchange
from rabbitpy.simple import create_topic_exchange
from rabbitpy.simple import delete_exchange
import logging
from rabbitpy.utils import NullHandler
logging.getLogger('rabbitpy').addHandler(NullHandler())
VERSION = __version__
__all__ = [
'__version__',
'VERSION',
'amqp_queue',
'channel',
'connection',
'exceptions',
'exchange',
'message',
'simple',
'tx',
'AMQP',
'Connection',
'Channel',
'Exchange',
'DirectExchange',
'FanoutExchange',
'HeadersExchange',
'TopicExchange',
'Message',
'Queue',
'Tx',
'consume',
'get',
'publish',
'create_queue',
'delete_queue',
'create_direct_exchange',
'create_fanout_exchange',
'create_headers_exchange',
'create_topic_exchange',
'delete_exchange'
]
Loading
Loading
@@ -4,12 +4,14 @@ AMQP Adapter
"""
from pamqp import specification as spec
 
from rabbitpy import PYPY
from rabbitpy import base
from rabbitpy import message
from rabbitpy import exceptions
from rabbitpy import utils
 
 
class AMQP(object):
# pylint: disable=too-many-public-methods
class AMQP(base.ChannelWriter):
"""The AMQP Adapter provides a more generic, non-opinionated interface to
RabbitMQ by providing methods that map to the AMQP API.
 
Loading
Loading
@@ -17,10 +19,10 @@ class AMQP(object):
 
"""
def __init__(self, channel):
self.channel = channel
super(AMQP, self).__init__(channel)
self.consumer_tag = 'rabbitpy.%s.%s' % (self.channel.id, id(self))
self._consuming = False
def basic_ack(self, delivery_tag=0, multiple=False):
"""Acknowledge one or more messages
 
Loading
Loading
@@ -28,7 +30,8 @@ class AMQP(object):
or Get-Ok methods. The client can ask to confirm a single message or a
set of messages up to and including a specific message.
 
:param int/long delivery_tag: Server-assigned delivery tag
:param delivery_tag: Server-assigned delivery tag
:type delivery_tag: int|long
:param bool multiple: Acknowledge multiple messages
 
"""
Loading
Loading
@@ -55,7 +58,7 @@ class AMQP(object):
message.ack()
 
:param str queue: The queue name to consume from
:param str consumer-tag: The consumer tag
:param str consumer_tag: The consumer tag
:param bool no_local: Do not deliver own messages
:param bool no_ack: No acknowledgement needed
:param bool exclusive: Request exclusive access
Loading
Loading
@@ -65,12 +68,14 @@ class AMQP(object):
"""
if not consumer_tag:
consumer_tag = self.consumer_tag
# pylint: disable=protected-access
self.channel._consumers[consumer_tag] = (self, no_ack)
self._rpc(spec.Basic.Consume(0, queue, consumer_tag, no_local, no_ack,
exclusive, nowait, arguments))
self._consuming = True
try:
while self._consuming:
# pylint: disable=protected-access
msg = self.channel._consume_message()
if msg:
yield msg
Loading
Loading
@@ -95,10 +100,11 @@ class AMQP(object):
:param bool nowait: Do not send a reply method
 
"""
if PYPY and not self._consuming:
if utils.PYPY and not self._consuming:
return
if not self._consuming:
raise exceptions.NotConsumingError()
# pylint: disable=protected-access
self.channel._cancel_consumer(self, consumer_tag, nowait)
self._consuming = False
 
Loading
Loading
@@ -125,7 +131,8 @@ class AMQP(object):
unhandled messages. If a publisher receives this method, it probably
needs to republish the offending messages.
 
:param int/long delivery_tag: Server-assigned delivery tag
:param delivery_tag: Server-assigned delivery tag
:type delivery_tag: int|long
:param bool multiple: Reject multiple messages
:param bool requeue: Requeue the message
 
Loading
Loading
@@ -143,6 +150,9 @@ class AMQP(object):
 
:param str exchange: The exchange name
:param str routing_key: Message routing key
:param body: The message body
:type body: str|bytes
:param dict properties: AMQP message properties
:param bool mandatory: Indicate mandatory routing
:param bool immediate: Request immediate delivery
:return: bool or None
Loading
Loading
@@ -162,7 +172,8 @@ class AMQP(object):
could in principle apply to both peers, it is currently meaningful only
for the server.
 
:param int/long prefetch_size: Prefetch window in octets
:param prefetch_size: Prefetch window in octets
:type prefetch_size: int|long
:param int prefetch_count: Prefetch window in messages
:param bool global_flag: Apply to entire connection
 
Loading
Loading
@@ -176,7 +187,8 @@ class AMQP(object):
interrupt and cancel large incoming messages, or return untreatable
messages to their original queue.
 
:param int/long delivery_tag: Server-assigned delivery tag
:param delivery_tag: Server-assigned delivery tag
:type delivery_tag: int|long
:param bool requeue: Requeue the message
 
"""
Loading
Loading
@@ -251,8 +263,8 @@ class AMQP(object):
:param dict arguments: Optional arguments
 
"""
self._rpc(spec.Exchange.Bind(0, destination, source, routing_key, nowait,
arguments))
self._rpc(spec.Exchange.Bind(0, destination, source, routing_key,
nowait, arguments))
 
def exchange_unbind(self, destination='', source='',
routing_key='', nowait=False, arguments=None):
Loading
Loading
@@ -350,7 +362,8 @@ class AMQP(object):
:param dict arguments: Arguments of binding
 
"""
self._rpc(spec.Queue.Unbind(0, queue, exchange, routing_key, arguments))
self._rpc(spec.Queue.Unbind(0, queue, exchange, routing_key,
arguments))
 
def tx_select(self):
"""Select standard transaction mode
Loading
Loading
@@ -383,22 +396,3 @@ class AMQP(object):
 
"""
self._rpc(spec.Tx.Rollback())
def _rpc(self, frame_value):
"""Execute the RPC command for the frame.
:param pamqp.specification.Frame frame_value: The frame to send
:rtype: pamqp.specification.Frame or pamqp.message.Message
"""
if self.channel.closed:
raise exceptions.ChannelClosedException()
return self.channel.rpc(frame_value)
def _write_frame(self, frame_value):
"""Write a frame to the channel's connection
:param pamqp.specification.Frame frame_value: The frame to send
"""
self.channel.write_frame(frame_value)
Loading
Loading
@@ -34,17 +34,15 @@ from pamqp import specification
from rabbitpy import base
from rabbitpy import exceptions
from rabbitpy import utils
from rabbitpy import PYPY
 
LOGGER = logging.getLogger(__name__)
 
 
class Queue(base.AMQPClass):
"""Create and manage RabbitMQ queues.
 
:param channel: The channel object to communicate on
:type channel: :py:class:`rabbitpy.channel.Channel`
:type channel: :class:`~rabbitpy.channel.Channel`
:param str name: The name of the queue
:param exclusive: Queue can only be used by this channel and will
auto-delete once the channel is closed.
Loading
Loading
@@ -62,8 +60,8 @@ class Queue(base.AMQPClass):
:type dead_letter_routing_key: str
:param dict arguments: Custom arguments for the queue
 
:raises: rabbitpy.exceptions.RemoteClosedChannelException
:raises: rabbitpy.exceptions.RemoteCancellationException
:raises: :exc:`~rabbitpy.exceptions.RemoteClosedChannelException`
:raises: :exc:`~rabbitpy.exceptions.RemoteCancellationException`
 
"""
arguments = dict()
Loading
Loading
@@ -76,6 +74,7 @@ class Queue(base.AMQPClass):
max_length = None
message_ttl = None
 
# pylint: disable=too-many-arguments
def __init__(self, channel, name='',
durable=False, exclusive=False, auto_delete=False,
max_length=None, message_ttl=None, expires=None,
Loading
Loading
@@ -111,7 +110,6 @@ class Queue(base.AMQPClass):
"""
return self.consume()
 
def __len__(self):
"""Return the pending number of messages in the queue by doing a
passive Queue declare.
Loading
Loading
@@ -139,12 +137,11 @@ class Queue(base.AMQPClass):
 
if (name in ['max_length', 'message_ttl', 'expires'] and
not isinstance(value, int)):
raise ValueError('%s must be an int' % name)
raise ValueError('%s must be an int' % name)
 
if (name in ['dead_letter_exchange', 'dead_letter_routing_key'] and
not utils.is_string(value)):
raise ValueError('%s must be a str, bytes or unicode' %
name)
raise ValueError('%s must be a str, bytes or unicode' % name)
 
if name == 'arguments' and not isinstance(value, dict):
raise ValueError('arguments must be a dict')
Loading
Loading
@@ -194,6 +191,7 @@ class Queue(base.AMQPClass):
self._consume(no_ack, prefetch, priority)
try:
while self.consuming:
# pylint: disable=protected-access
message = self.channel._consume_message()
if message:
yield message
Loading
Loading
@@ -228,6 +226,7 @@ class Queue(base.AMQPClass):
DeprecationWarning)
return self.consume(no_ack, prefetch, priority)
 
# pylint: disable=no-self-use, unused-argument
def consumer(self, no_ack=False, prefetch=None, priority=None):
"""Method for returning the contextmanager for consuming messages. You
should not use this directly.
Loading
Loading
@@ -284,7 +283,8 @@ class Queue(base.AMQPClass):
"""
self._write_frame(specification.Basic.Get(queue=self.name,
no_ack=not acknowledge))
return self.channel._get_message()
return self.channel._get_message() # pylint: disable=protected-access
 
def ha_declare(self, nodes=None):
"""Declare a the queue as highly available, passing in a list of nodes
Loading
Loading
@@ -318,11 +318,11 @@ class Queue(base.AMQPClass):
will return None instead of a :py:class:`rabbitpy.Message`.
 
"""
if PYPY and not self.consuming:
if utils.PYPY and not self.consuming:
return
if not self.consuming:
raise exceptions.NotConsumingError()
self.channel._cancel_consumer(self)
self.channel._cancel_consumer(self) # pylint: disable=protected-access
self.consuming = False
 
def unbind(self, source, routing_key=None):
Loading
Loading
@@ -352,6 +352,7 @@ class Queue(base.AMQPClass):
"""
if prefetch:
self.channel.prefetch_count(prefetch, False)
# pylint: disable=protected-access
self.channel._consume(self, no_ack, priority)
self.consuming = True
 
Loading
Loading
Loading
Loading
@@ -3,42 +3,29 @@ Base classes for various parts of rabbitpy
 
"""
import logging
try:
import queue
except ImportError:
import Queue as queue
import socket
import threading
import time
 
from pamqp import specification
 
from rabbitpy import exceptions
from rabbitpy import utils
from rabbitpy.utils import queue
 
LOGGER = logging.getLogger(__name__)
 
 
class AMQPClass(object):
"""Base Class object for wrapping the specification.Frame classes
class ChannelWriter(object): # pylint: disable=too-few-public-methods
 
"""
def __init__(self, channel, name):
"""Create a new ClassObject.
"""The AMQP Adapter provides a more generic, non-opinionated interface to
RabbitMQ by providing methods that map to the AMQP API.
 
:param rabbitpy.Channel channel: The channel to execute commands on
:param str name: Set the name
:raises: ValueError
"""
# Use type so there's not a circular dependency
if channel.__class__.__name__ != 'Channel':
raise ValueError('channel must be a valid rabbitpy Channel object')
if not utils.is_string(name):
raise ValueError('name must be str, bytes or unicode')
:param channel: The channel to use
:type channel: rabbitpy.channel.Channel
 
"""
def __init__(self, channel):
self.channel = channel
self.name = name
 
def _rpc(self, frame_value):
"""Execute the RPC command for the frame.
Loading
Loading
@@ -61,6 +48,26 @@ class AMQPClass(object):
self.channel.write_frame(frame_value)
 
 
class AMQPClass(ChannelWriter): # pylint: disable=too-few-public-methods
"""Base Class object AMQP object classes"""
def __init__(self, channel, name):
"""Create a new ClassObject.
:param channel: The channel to execute commands on
:type channel: rabbitpy.Channel
:param str name: Set the name
:raises: ValueError
"""
super(AMQPClass, self).__init__(channel)
# Use type so there's not a circular dependency
if channel.__class__.__name__ != 'Channel':
raise ValueError('channel must be a valid rabbitpy Channel object')
elif not utils.is_string(name):
raise ValueError('name must be str, bytes or unicode')
self.name = name
class StatefulObject(object):
"""Base object for rabbitpy classes that need to maintain state such as
connection and channel.
Loading
Loading
@@ -149,13 +156,13 @@ class StatefulObject(object):
 
 
class AMQPChannel(StatefulObject):
"""Base AMQP Channel Object"""
 
CLOSE_REQUEST_FRAME = specification.Channel.Close
DEFAULT_CLOSE_CODE = 200
DEFAULT_CLOSE_REASON = 'Normal Shutdown'
REMOTE_CLOSED = 0x04
 
def __init__(self, exception_queue, write_trigger, blocking_read=False):
super(AMQPChannel, self).__init__()
if blocking_read:
Loading
Loading
@@ -178,6 +185,7 @@ class AMQPChannel(StatefulObject):
return self._channel_id
 
def close(self):
"""Close the AMQP channel"""
if self.closed:
if self._is_debugging:
LOGGER.debug('AMQPChannel %i close invoked and already closed',
Loading
Loading
@@ -254,8 +262,10 @@ class AMQPChannel(StatefulObject):
return
self._check_for_exceptions()
if self._is_debugging:
LOGGER.debug('Writing frames: %r', [frame.name for frame in frames])
LOGGER.debug('Writing frames: %r',
[frame.name for frame in frames])
with self._write_lock:
# pylint: disable=expression-not-assigned
[self._write_queue.put((self._channel_id, frame))
for frame in frames]
self._trigger_write()
Loading
Loading
@@ -318,6 +328,10 @@ class AMQPChannel(StatefulObject):
consuming. Will wait until the ``_wait_on_frame_interrupt`` is cleared
to make this a blocking operation.
 
:param callback: The method to call
:type callback: typing.Callable
:param list args: Args to pass to the callback
"""
if not self._waiting:
if self._is_debugging:
Loading
Loading
@@ -333,6 +347,7 @@ class AMQPChannel(StatefulObject):
return self._interrupt['event'].is_set()
 
def _on_interrupt_set(self):
# pylint: disable=not-an-iterable,not-callable
self._interrupt['callback'](*self._interrupt['args'])
self._interrupt['event'].clear()
self._interrupt['callback'] = None
Loading
Loading
@@ -381,16 +396,14 @@ class AMQPChannel(StatefulObject):
to a local socket.
 
"""
try:
self._write_trigger.send(b'0')
except socket.error:
pass
utils.trigger_write(self._write_trigger)
 
def _validate_frame_type(self, frame_value, frame_type):
"""Validate the frame value against the frame type. The frame type can
be an individual frame type or a list of frame types.
 
:param pamqp.specification.Frame frame_value: The frame to check
:param frame_value: The frame to check
:type frame_value: pamqp.specification.Frame
:param frame_type: The frame(s) to check against
:type frame_type: pamqp.specification.Frame or list
:rtype: bool
Loading
Loading
@@ -421,7 +434,7 @@ class AMQPChannel(StatefulObject):
call the method.
 
:param frame_type: The name or list of names of the frame type(s)
:type frame_type: str or list or pamqp.specification.Frame
:type frame_type: str|list|pamqp.specification.Frame
:rtype: Frame
 
"""
Loading
Loading
Loading
Loading
@@ -6,10 +6,6 @@ the communication between the IO thread and the higher-level objects.
 
"""
import logging
try:
import queue as Queue
except ImportError:
import Queue
 
from pamqp import specification as spec
from pamqp import PYTHON3
Loading
Loading
@@ -17,6 +13,7 @@ from pamqp import PYTHON3
from rabbitpy import base
from rabbitpy import exceptions
from rabbitpy import message
from rabbitpy.utils import queue
 
LOGGER = logging.getLogger(__name__)
 
Loading
Loading
@@ -272,11 +269,11 @@ class Channel(base.AMQPChannel):
del self._consumers[value.consumer_tag]
raise exceptions.RemoteCancellationException(value.consumer_tag)
 
def _consume(self, queue, no_ack, priority=None):
def _consume(self, obj, no_ack, priority=None):
"""Register a Queue object as a consumer, issuing Basic.Consume.
 
:param queue: The queue to consume
:type queue: rabbitpy.amqp_queue.Queue
:param obj: The queue to consume
:type obj: rabbitpy.amqp_queue.Queue
:param bool no_ack: no_ack mode
:param int priority: Consumer priority
:raises: ValueError
Loading
Loading
@@ -289,11 +286,11 @@ class Channel(base.AMQPChannel):
if not isinstance(priority, int):
raise ValueError('Consumer priority must be an int')
args['x-priority'] = priority
self.rpc(spec.Basic.Consume(queue=queue.name,
consumer_tag=queue.consumer_tag,
self.rpc(spec.Basic.Consume(queue=obj.name,
consumer_tag=obj.consumer_tag,
no_ack=no_ack,
arguments=args))
self._consumers[queue.consumer_tag] = (queue, no_ack)
self._consumers[obj.consumer_tag] = (obj, no_ack)
 
def _consume_message(self):
"""Get a message from the stack, blocking while doing so. If a consumer
Loading
Loading
@@ -345,7 +342,7 @@ class Channel(base.AMQPChannel):
"""
try:
frame_value = self._read_queue.get(False)
except Queue.Empty:
except queue.Empty:
return None
 
try:
Loading
Loading
Loading
Loading
@@ -5,23 +5,20 @@ client on channel 0.
"""
import locale
import logging
try:
import queue
except ImportError:
import Queue as queue
import sys
 
from pamqp import header
from pamqp import heartbeat
from pamqp import specification
 
from rabbitpy import __version__
from rabbitpy import base
from rabbitpy import events
from rabbitpy import exceptions
from rabbitpy import version
from rabbitpy.utils import queue
 
LOGGER = logging.getLogger(__name__)
default_locale = locale.getdefaultlocale()
DEFAULT_LOCALE = locale.getdefaultlocale()
del locale
 
 
Loading
Loading
@@ -30,6 +27,16 @@ class Channel0(base.AMQPChannel):
processing and dispatching events on channel 0 once connected.
 
:param dict connection_args: Data required to negotiate the connection
:param events_obj: The shared events coordination object
:type events_obj: rabbitpy.events.Events
:param exception_queue: The queue where any pending exceptions live
:type exception_queue: queue.Queue
:param heartbeat_checker: The object that implements heartbeat check logic
:type heartbeat_checker: rabbitpy.heartbeat.Checker
:param write_queue: The queue to place data to write in
:type write_queue: queue.Queue
:param write_trigger: The socket to write to, to trigger IO writes
:type write_trigger: socket.socket
 
"""
CHANNEL = 0
Loading
Loading
@@ -61,14 +68,17 @@ class Channel0(base.AMQPChannel):
 
@property
def heartbeat_interval(self):
"""Return the AMQP heartbeat interval for the connection"""
return self._heartbeat_interval
 
@property
def maximum_channels(self):
"""Return the AMQP maximum channel count for the connection"""
return self._max_channels
 
@property
def maximum_frame_size(self):
"""Return the AMQP maximum frame size for the connection"""
return self._max_frame_size
 
def on_frame(self, value):
Loading
Loading
@@ -109,7 +119,7 @@ class Channel0(base.AMQPChannel):
self._events.clear(events.CONNECTION_BLOCKED)
elif value.name == 'Heartbeat':
if not self._heartbeat_interval:
LOGGER.warn('Unexpected Heartbeat frame received')
LOGGER.warning('Unexpected Heartbeat frame received')
self._heartbeat_checker.on_heartbeat()
self.write_frame(heartbeat.Heartbeat())
self._trigger_write()
Loading
Loading
@@ -118,6 +128,7 @@ class Channel0(base.AMQPChannel):
raise specification.AMQPUnexpectedFrame(value)
 
def start(self):
"""Start the AMQP protocol negotiation"""
self._set_state(self.OPENING)
self._write_protocol_header()
 
Loading
Loading
@@ -135,18 +146,15 @@ class Channel0(base.AMQPChannel):
:rtype: pamqp.specification.Connection.StartOk
 
"""
version = sys.version_info
properties = {'product': 'rabbitpy',
'platform': 'Python %s.%s.%s' % (version[0],
version[1],
version[2]),
'platform': 'Python {}.{}.{}'.format(*sys.version_info),
'capabilities': {'authentication_failure_close': True,
'basic.nack': True,
'connection.blocked': True,
'consumer_cancel_notify': True,
'publisher_confirms': True},
'information': 'See http://rabbitpy.readthedocs.org',
'version': __version__}
'information': 'See https://rabbitpy.readthedocs.io',
'version': version.__version__}
return specification.Connection.StartOk(client_properties=properties,
response=self._credentials,
locale=self._get_locale())
Loading
Loading
@@ -178,7 +186,7 @@ class Channel0(base.AMQPChannel):
 
"""
if not self._args['locale']:
return default_locale[0] or self.DEFAULT_LOCALE
return DEFAULT_LOCALE[0] or self.DEFAULT_LOCALE
return self._args['locale']
 
@staticmethod
Loading
Loading
@@ -252,9 +260,8 @@ class Channel0(base.AMQPChannel):
:rtype: bool
 
"""
if (frame_value.version_major,
frame_value.version_minor) != (specification.VERSION[0],
specification.VERSION[1]):
if (frame_value.version_major, frame_value.version_minor) != \
(specification.VERSION[0], specification.VERSION[1]):
LOGGER.warning('AMQP version error (received %i.%i, expected %r)',
frame_value.version_major,
frame_value.version_minor,
Loading
Loading
Loading
Loading
@@ -3,11 +3,6 @@ The Connection class negotiates and manages the connection state.
 
"""
import logging
try:
import queue
except ImportError:
import Queue as queue
import socket
try:
import ssl
except ImportError:
Loading
Loading
@@ -25,6 +20,7 @@ from rabbitpy import channel0
from rabbitpy import events
from rabbitpy import exceptions
from rabbitpy import message
from rabbitpy.utils import queue
from rabbitpy import utils
 
LOGGER = logging.getLogger(__name__)
Loading
Loading
@@ -38,13 +34,13 @@ if ssl:
'required': ssl.CERT_REQUIRED}
SSL_VERSION_MAP = dict()
if hasattr(ssl, 'PROTOCOL_SSLv2'):
SSL_VERSION_MAP['SSLv2'] = ssl.PROTOCOL_SSLv2
SSL_VERSION_MAP['SSLv2'] = getattr(ssl, 'PROTOCOL_SSLv2')
if hasattr(ssl, 'PROTOCOL_SSLv3'):
SSL_VERSION_MAP['SSLv3'] = ssl.PROTOCOL_SSLv3
SSL_VERSION_MAP['SSLv3'] = getattr(ssl, 'PROTOCOL_SSLv3')
if hasattr(ssl, 'PROTOCOL_SSLv23'):
SSL_VERSION_MAP['SSLv23'] = ssl.PROTOCOL_SSLv23
SSL_VERSION_MAP['SSLv23'] = getattr(ssl, 'PROTOCOL_SSLv23')
if hasattr(ssl, 'PROTOCOL_TLSv1'):
SSL_VERSION_MAP['TLSv1'] = ssl.PROTOCOL_TLSv1
SSL_VERSION_MAP['TLSv1'] = getattr(ssl, 'PROTOCOL_TLSv1')
else:
SSL_CERT_MAP, SSL_VERSION_MAP = dict(), dict()
 
Loading
Loading
@@ -149,6 +145,11 @@ class Connection(base.StatefulObject):
 
@property
def args(self):
"""Return the connection arguments.
:rtype: dict
"""
return dict(self._args)
 
@property
Loading
Loading
@@ -159,9 +160,6 @@ class Connection(base.StatefulObject):
blocked using the Connection.Blocked RPC notification from RabbitMQ
that was added in RabbitMQ 3.2.
 
@TODO If RabbitMQ version < 3.2, use the HTTP management API to query
the value
:rtype: bool
 
"""
Loading
Loading
@@ -330,8 +328,10 @@ class Connection(base.StatefulObject):
the dictionary of message parts.
 
:param int channel_id: The channel id the message was sent on
:param pamqp.specification.Frame method_frame: The method frame value
:param pamqp.header.ContentHeader header_frame: The header frame value
:param method_frame: The method frame value
:type method_frame: pamqp.specification.Frame
:param header_frame: The header frame value
:type header_frame: pamqp.header.ContentHeader
:param str body: The message body
:rtype: rabbitpy.message.Message
 
Loading
Loading
@@ -398,6 +398,7 @@ class Connection(base.StatefulObject):
def _normalize_expectations(channel_id, expectations):
"""Turn a class or list of classes into a list of class names.
 
:param int channel_id: The channel to normalize for
:param expectations: List of classes or class name or class obj
:type expectations: list or str or pamqp.specification.Frame
:rtype: list
Loading
Loading
@@ -544,6 +545,7 @@ class Connection(base.StatefulObject):
for chan_id in [chan_id for chan_id in self._channels
if not self._channels[chan_id].closed]:
if force:
# pylint: disable=protected-access
self._channels[chan_id]._force_close()
else:
self._channels[chan_id].close()
Loading
Loading
@@ -575,7 +577,4 @@ class Connection(base.StatefulObject):
to a local socket.
 
"""
try:
self._io.write_trigger.send(b'0')
except socket.error:
pass
utils.trigger_write(self._io.write_trigger)
Loading
Loading
@@ -42,11 +42,14 @@ class Events(object):
"""Create a new instance of Events"""
self._events = self._create_event_objects()
 
def _create_event_objects(self):
@staticmethod
def _create_event_objects():
"""Events are used like signals across threads for communicating state
changes, used by the various threaded objects to communicate with each
other when an action needs to be taken.
 
:rtype: dict
"""
events = dict()
for event in [CHANNEL0_CLOSE,
Loading
Loading
Loading
Loading
@@ -13,7 +13,17 @@ LOGGER = logging.getLogger(__name__)
 
 
class Checker(object):
"""The Checker object implements the logic to ensure that heartbeats have
been received and are replied to. If no heartbeat or data has been received
for the specified interval, it will add an exception to the exception
queue, causing the connection to shutdown.
 
:param io: The rabbitpy IO object
:type io: rabbitpy.io.IO
:param exception_queue: The exception queue
:type exception_queue: queue.Queue
"""
MAX_MISSED_HEARTBEATS = 2
 
def __init__(self, io, exception_queue):
Loading
Loading
@@ -26,16 +36,23 @@ class Checker(object):
self._timer = None
 
def on_heartbeat(self):
"""Callback invoked when a heartbeat is received"""
LOGGER.debug('Heartbeat received, updating the last_heartbeat time')
self._lock.acquire(True)
self._last_heartbeat = time.time()
self._lock.release()
 
def start(self, interval):
"""Start the heartbeat checker
:param int interval: How often to expect heartbeats.
"""
self._interval = interval
self._start_timer()
 
def stop(self):
"""Stop the heartbeat checker"""
self._interval = 0
if self._timer:
self._timer.cancel()
Loading
Loading
Loading
Loading
@@ -635,13 +635,3 @@ class IO(threading.Thread, base.StatefulObject):
server.setblocking(0)
client.setblocking(0)
return server, client
def _trigger_write(self):
"""Notifies the IO loop we need to write a frame by writing a byte
to a local socket.
"""
try:
self._write_trigger.send(b'0')
except socket.error:
pass
Loading
Loading
@@ -24,10 +24,12 @@ from rabbitpy import utils
LOGGER = logging.getLogger(__name__)
 
 
# Python 2.6/3.2 does not have a memoryview object, create dummy for isinstance
# Python 2.6 does not have a memoryview object, create dummy for isinstance
try:
py26mv = memoryview(b'foo')
_PY_VERSION_CHECK = memoryview(b'foo')
except NameError:
# pylint: disable=too-few-public-methods, redefined-builtin
# pylint: disable=invalid-name, missing-docstring
class memoryview(object):
pass
 
Loading
Loading
@@ -86,7 +88,8 @@ class Message(base.AMQPClass):
 
:param channel: The channel object for the message object to act upon
:type channel: :py:class:`rabbitpy.channel.Channel`
:param str or dict or list body_value: The message body
:param body_value: The message body
:type body_value: str|bytes|unicode|memoryview|dict|json
:param dict properties: A dictionary of message properties
:param bool auto_id: Add a message id if no properties were passed in.
:param bool opinionated: Automatically populate properties if True
Loading
Loading
@@ -96,8 +99,8 @@ class Message(base.AMQPClass):
method = None
name = 'Message'
 
def __init__(self, channel, body_value, properties=None,
auto_id=False, opinionated=False):
def __init__(self, channel, body_value, properties=None, auto_id=False,
opinionated=False):
"""Create a new instance of the Message object."""
super(Message, self).__init__(channel, 'Message')
 
Loading
Loading
@@ -108,6 +111,7 @@ class Message(base.AMQPClass):
if isinstance(body_value, memoryview):
self.body = bytes(body_value)
else:
# pylint: disable=redefined-variable-type
self.body = self._auto_serialize(body_value)
 
# Add a message id if auto_id is not turned off and it is not set
Loading
Loading
@@ -312,7 +316,8 @@ class Message(base.AMQPClass):
"""Add the timestamp to the properties"""
self.properties['timestamp'] = datetime.datetime.utcnow()
 
def _as_datetime(self, value):
@staticmethod
def _as_datetime(value):
"""Return the passed in value as a ``datetime.datetime`` value.
 
:param value: The value to convert or pass through
Loading
Loading
@@ -349,7 +354,7 @@ class Message(base.AMQPClass):
"""Automatically serialize the body as JSON if it is a dict or list.
 
:param mixed body_value: The message body passed into the constructor
:return: str or bytes or unicode or None
:return: bytes|str
 
"""
if isinstance(body_value, dict) or isinstance(body_value, list):
Loading
Loading
Loading
Loading
@@ -54,8 +54,8 @@ def get(uri=None, queue_name=None):
 
with connection.Connection(uri) as conn:
with conn.channel() as channel:
q = amqp_queue.Queue(channel, queue_name)
return q.get(False)
queue = amqp_queue.Queue(channel, queue_name)
return queue.get(False)
 
 
def publish(uri=None, exchange_name=None, routing_key=None,
Loading
Loading
@@ -67,7 +67,8 @@ def publish(uri=None, exchange_name=None, routing_key=None,
:param str uri: AMQP URI to connect to
:param str exchange_name: The exchange to publish to
:param str routing_key: The routing_key to publish with
:param str or unicode or bytes or dict or list: The message body
:param body: The message body
:type body: str or unicode or bytes or dict or list
:param dict properties: Dict representation of Basic.Properties
:param bool confirm: Confirm this delivery with Publisher Confirms
:rtype: bool or None
Loading
Loading
Loading
Loading
@@ -4,19 +4,38 @@ a single API point for rabbitpy to use.
 
"""
import collections
try:
from urllib import parse as _urlparse
except ImportError:
import urlparse as _urlparse
# Cross Python Version queue module export
import logging
# pylint: disable=unused-import,import-error
try:
import Queue as queue
except ImportError:
import queue
import platform
import socket
# pylint: disable=import-error
try:
from urllib import parse as _urlparse
except ImportError:
import urlparse as _urlparse
 
from pamqp import PYTHON3
 
if hasattr(logging, 'NullHandler'):
NullHandler = logging.NullHandler
else:
class NullHandler(logging.Handler):
"""Python 2.6 does not have a NullHandler"""
def emit(self, record):
"""Emit a record
:param record record: The record to emit
"""
pass
PYPY = platform.python_implementation() == 'PyPy'
Parsed = collections.namedtuple('Parsed',
'scheme,netloc,path,params,query,fragment,'
'username,password,hostname,port')
Loading
Loading
@@ -34,7 +53,7 @@ def maybe_utf8_encode(value):
if is_string(value) and not isinstance(value, bytes):
return bytes(value, 'utf-8')
return value
if isinstance(value, unicode):
if isinstance(value, unicode): # pylint: disable=undefined-variable
return value.encode('utf-8')
return value
 
Loading
Loading
@@ -83,5 +102,17 @@ def is_string(value):
"""
checks = [isinstance(value, bytes), isinstance(value, str)]
if not PYTHON3:
# pylint: disable=undefined-variable
checks.append(isinstance(value, unicode))
return any(checks)
def trigger_write(sock):
"""Notifies the IO loop we need to write a frame by writing a byte
to a local socket.
"""
try:
sock.send(b'0')
except socket.error:
pass
"""
Package is complex enough version is causing cyclical references
"""
__version__ = '0.27.0'
Loading
Loading
@@ -9,8 +9,8 @@ except ImportError:
import uuid
 
import rabbitpy
from rabbitpy import PYPY
from rabbitpy import exceptions
from rabbitpy import utils
 
LOGGER = logging.getLogger(__name__)
 
Loading
Loading
@@ -127,7 +127,7 @@ class PublishAndConsumeTest(unittest.TestCase):
self.assertEqual(msg.properties['message_type'],
self.msg.properties['message_type'])
break
if PYPY:
if utils.PYPY:
self.queue.stop_consuming()
 
 
Loading
Loading
@@ -174,7 +174,7 @@ class PublishAndConsumeIteratorTest(unittest.TestCase):
msg.ack()
LOGGER.info('breaking out of iterator')
break
if PYPY:
if utils.PYPY:
self.queue.stop_consuming()
self.assertFalse(self.queue.consuming)
 
Loading
Loading
@@ -228,7 +228,7 @@ class PublishAndConsumeIteratorStopTest(unittest.TestCase):
break
qty += 1
msg.ack()
if PYPY:
if utils.PYPY:
self.queue.stop_consuming()
LOGGER.info('Exited iterator, %r, %r', self.queue.consuming, qty)
self.assertFalse(self.queue.consuming)
Loading
Loading
Loading
Loading
@@ -10,7 +10,6 @@ except ImportError:
import mock
from pamqp import specification
 
from rabbitpy import PYPY
from rabbitpy import amqp_queue
from rabbitpy import channel
from rabbitpy import exceptions
Loading
Loading
@@ -177,7 +176,6 @@ class QueueInitializationTests(unittest.TestCase):
dead_letter_routing_key='routing-key')
self.assertIsInstance(queue.dead_letter_routing_key, str)
 
@unittest.skipIf(utils.PYTHON3, 'No unicode in Python 3')
def test_dlr_unicode(self):
routing_key = unicode('routing-key')
Loading
Loading
@@ -189,7 +187,8 @@ class QueueInitializationTests(unittest.TestCase):
self.assertRaises(ValueError, amqp_queue.Queue, self.chan, '', True,
False, True, None, None, None, None, True)
 
@unittest.skipIf(PYPY, 'PyPy bails this due to improper __exit__ behavior')
@unittest.skipIf(utils.PYPY,
'PyPy bails this due to improper __exit__ behavior')
def test_stop_consuming_raises_exception(self):
queue = amqp_queue.Queue(self.chan)
self.assertRaises(exceptions.NotConsumingError, queue.stop_consuming)
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment