Skip to content
Snippets Groups Projects
Commit 6ed8c5b1 authored by Gavin M. Roy's avatar Gavin M. Roy
Browse files

pylint cleanup

parent d0adb0a4
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -38,7 +38,7 @@ class Channel(base.AMQPChannel):
# Do something
 
To create a new channel, invoke
py:meth:`rabbitpy.connection.Connection.channel`
py:meth:`~rabbitpy.connection.Connection.channel()`
 
 
To improve performance, pass blocking_read to True. Note that doing
Loading
Loading
@@ -47,10 +47,14 @@ class Channel(base.AMQPChannel):
 
:param int channel_id: The channel # to use for this instance
:param dict server_capabilities: Features the server supports
:param events rabbitpy.Events: Event management object
:param queue.Queue exception_queue: Exception queue
:param queue.Queue read_queue: Queue to read pending frames from
:param queue.Queue write_queue: Queue to write pending AMQP objs to
:param events: Event management object
:type events: rabbitpy.Events
:param exception_queue: Exception queue
:type exception_queue: queue.Queue
:param read_queue: Queue to read pending frames from
:type read_queue: queue.Queue
:param write_queue: Queue to write pending AMQP objs to
:type write_queue: queue.Queue
:param int maximum_frame_size: The max frame size for msg bodies
:param socket write_trigger: Write to this socket to break IO waiting
:param bool blocking_read: Use blocking Queue.get to improve performance
Loading
Loading
@@ -151,7 +155,7 @@ class Channel(base.AMQPChannel):
self._publisher_confirms = True
 
@property
def id(self):
def id(self): # pylint: disable=invalid-name
"""Return the channel id
 
:rtype: int
Loading
Loading
@@ -161,6 +165,11 @@ class Channel(base.AMQPChannel):
 
@property
def maximum_frame_size(self):
"""Return the AMQP maximum frame size
:rtype: int
"""
return self._maximum_frame_size
 
def open(self):
Loading
Loading
@@ -263,10 +272,11 @@ class Channel(base.AMQPChannel):
del self._consumers[value.consumer_tag]
raise exceptions.RemoteCancellationException(value.consumer_tag)
 
def _consume(self, obj, no_ack, priority=None):
def _consume(self, queue, no_ack, priority=None):
"""Register a Queue object as a consumer, issuing Basic.Consume.
 
:param rabbitpy.amqp_queue.Queue obj: The queue to consume
:param queue: The queue to consume
:type queue: rabbitpy.amqp_queue.Queue
:param bool no_ack: no_ack mode
:param int priority: Consumer priority
:raises: ValueError
Loading
Loading
@@ -279,11 +289,11 @@ class Channel(base.AMQPChannel):
if not isinstance(priority, int):
raise ValueError('Consumer priority must be an int')
args['x-priority'] = priority
self.rpc(spec.Basic.Consume(queue=obj.name,
consumer_tag=obj.consumer_tag,
self.rpc(spec.Basic.Consume(queue=queue.name,
consumer_tag=queue.consumer_tag,
no_ack=no_ack,
arguments=args))
self._consumers[obj.consumer_tag] = (obj, no_ack)
self._consumers[queue.consumer_tag] = (queue, no_ack)
 
def _consume_message(self):
"""Get a message from the stack, blocking while doing so. If a consumer
Loading
Loading
@@ -306,7 +316,8 @@ class Channel(base.AMQPChannel):
the dictionary of message parts. Will return None if no message can be
created.
 
:param pamqp.specification.Frame method_frame: The method frame value
:param method_frame: The method frame value
:type method_frame: pamqp.specification.Frame
:param header_frame: Header frame value
:type header_frame: pamqp.header.ContentHeader or None
:param body: The message body
Loading
Loading
@@ -390,7 +401,7 @@ class Channel(base.AMQPChannel):
return self._server_capabilities.get(b'basic.nack', False)
 
@property
def _supports_consumer_cancel_notify(self):
def _supports_consumer_cancel_notify(self): # pylint: disable=invalid-name
"""Indicates if the server supports sending consumer cancellation
notifications
 
Loading
Loading
@@ -456,20 +467,27 @@ class Channel(base.AMQPChannel):
if self._interrupt_is_set:
return self._on_interrupt_set()
 
error = False
body_value = bytes() if PYTHON3 else str()
while len(body_value) < header_value.body_size:
body_part = self._wait_on_frame(CONTENT_BODY)
self._check_for_rpc_request(body_part)
if self._interrupt_is_set:
return self._on_interrupt_set()
if not body_part:
self._on_interrupt_set()
error = True
elif not body_part:
break
elif self.closing or self.closed:
error = True
elif consuming and not self._consumers:
self._reject_inbound_message(method_frame)
error = True
if error:
return
body_value += body_part.value
if len(body_value) == header_value.body_size:
break
if self.closing or self.closed:
return None
if consuming and not self._consumers:
self._reject_inbound_message(method_frame)
return None
return self._create_message(method_frame, header_value, body_value)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment