Skip to content
Snippets Groups Projects
Commit 3edac61e authored by Gavin M. Roy's avatar Gavin M. Roy
Browse files

Misc updates

- PEP8 cleanup
- channel.py/connection.py: Check to see there is an actual exception being raised, not just a type of exception passed in (Python 2.6 issue)
- channel.py: specify the default priority of None
parent 71afc865
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -192,10 +192,10 @@ class AMQPChannel(StatefulObject):
"""
if self.closed:
raise exceptions.ChannelClosedException()
#LOGGER.debug('Sending %r', frame_value)
# LOGGER.debug('Sending %r', frame_value)
self._write_frame(frame_value)
if frame_value.synchronous:
#LOGGER.debug('Waiting on %r', frame_value)
# LOGGER.debug('Waiting on %r', frame_value)
return self._wait_on_frame(frame_value.valid_responses)
 
def _build_close_frame(self):
Loading
Loading
Loading
Loading
@@ -88,7 +88,7 @@ class Channel(base.AMQPChannel):
it's an exception or what.
 
"""
if exc_type:
if exc_type and exc_val:
self._set_state(self.CLOSED)
raise
if self.open:
Loading
Loading
@@ -247,7 +247,7 @@ class Channel(base.AMQPChannel):
elif isinstance(value, specification.Basic.Return):
self._on_basic_return(self._wait_for_content_frames(value))
 
def _consume(self, obj, no_ack, priority):
def _consume(self, obj, no_ack, priority=None):
"""Register a Queue object as a consumer, issuing Basic.Consume.
 
:param rabbitpy.amqp_queue.Queue obj: The queue to consume
Loading
Loading
Loading
Loading
@@ -126,7 +126,7 @@ class Connection(base.StatefulObject):
it's an exception or what.
 
"""
if exc_type:
if exc_type and exc_val:
self._set_state(self.CLOSED)
raise
self._set_state(self.CLOSED)
Loading
Loading
@@ -163,15 +163,16 @@ class Connection(base.StatefulObject):
with self._channel_lock:
channel_id = self._get_next_channel_id()
channel_frames = queue.Queue()
self._channels[channel_id] = channel.Channel(channel_id,
self.capabilities,
self._events,
self._exceptions,
channel_frames,
self._write_queue,
self._max_frame_size,
self._io.write_trigger,
blocking_read)
self._channels[channel_id] = \
channel.Channel(channel_id,
self.capabilities,
self._events,
self._exceptions,
channel_frames,
self._write_queue,
self._max_frame_size,
self._io.write_trigger,
blocking_read)
self._add_channel_to_io(self._channels[channel_id], channel_frames)
self._channels[channel_id].open()
return self._channels[channel_id]
Loading
Loading
Loading
Loading
@@ -4,6 +4,7 @@ Exceptions that may be raised by rabbitpy during use
 
"""
 
class ActionException(Exception):
"""Raised when an action is taken on a rabbitpy object that is not
supported due to the state of the object. An example would be trying to
Loading
Loading
@@ -95,7 +96,8 @@ class NotSupportedError(Exception):
server.
 
"""
pass
def __repr__(self):
return 'The selected feature "%s" is not supported' % self.args[0]
 
 
class TooManyChannelsError(Exception):
Loading
Loading
@@ -267,6 +269,7 @@ class AMQPNotAllowed(Exception):
"""
pass
 
class AMQPNotImplemented(Exception):
"""
The client tried to use functionality that is not implemented in the
Loading
Loading
Loading
Loading
@@ -93,8 +93,8 @@ class Exchange(base.AMQPClass):
if hasattr(source, 'name'):
source = source.name
self._rpc(specification.Exchange.Unbind(destination=self.name,
source=source,
routing_key=routing_key))
source=source,
routing_key=routing_key))
 
 
class DirectExchange(Exchange):
Loading
Loading
Loading
Loading
@@ -378,7 +378,7 @@ class IO(threading.Thread, base.StatefulObject):
if self._buffer and value[0] is None:
break
 
#LOGGER.debug('Received (%i) %r', value[0], value[1])
# LOGGER.debug('Received (%i) %r', value[0], value[1])
 
# If it's channel 0, call the Channel0 directly
if value[0] == 0:
Loading
Loading
@@ -407,7 +407,7 @@ class IO(threading.Thread, base.StatefulObject):
:param pamqp.specification.Frame frame_value: The frame to add
 
"""
#LOGGER.debug('Adding %s to channel %s', frame_value.name, channel_id)
# LOGGER.debug('Adding %s to channel %s', frame_value.name, channel_id)
self._channels[channel_id][1].put(frame_value)
 
def _close(self):
Loading
Loading
Loading
Loading
@@ -340,7 +340,7 @@ class Message(base.AMQPClass):
return datetime.datetime.fromtimestamp(value)
 
raise TypeError('Could not cast a %s value to a datetime.datetime' %
type(value))
type(value))
 
def _auto_serialize(self, body_value):
"""Automatically serialize the body as JSON if it is a dict or list.
Loading
Loading
Loading
Loading
@@ -112,6 +112,7 @@ def create_queue(uri=None, queue_name='', durable=True, auto_delete=False,
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
dlx_routing_key = dead_letter_routing_key
with connection.Connection(uri) as conn:
with conn.channel() as channel:
obj = amqp_queue.Queue(channel, queue_name,
Loading
Loading
@@ -121,8 +122,7 @@ def create_queue(uri=None, queue_name='', durable=True, auto_delete=False,
message_ttl=message_ttl,
expires=expires,
dead_letter_exchange=dead_letter_exchange,
dead_letter_routing_key=
dead_letter_routing_key,
dead_letter_routing_key=dlx_routing_key,
arguments=arguments)
obj.declare()
 
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment