Skip to content
Snippets Groups Projects
Commit 6d804ab4 authored by Gavin M. Roy's avatar Gavin M. Roy
Browse files

Add the SimpleChannel class and simplify simple methods

parent 4c84aedc
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -16,6 +16,7 @@ from rabbitpy.message import Message
from rabbitpy.amqp_queue import Queue
from rabbitpy.tx import Tx
 
from rabbitpy.simple import SimpleChannel
from rabbitpy.simple import consume
from rabbitpy.simple import get
from rabbitpy.simple import publish
Loading
Loading
@@ -47,6 +48,7 @@ __all__ = [
'AMQP',
'Connection',
'Channel',
'SimpleChannel',
'Exchange',
'DirectExchange',
'FanoutExchange',
Loading
Loading
Loading
Loading
@@ -9,6 +9,41 @@ from rabbitpy import exchange
from rabbitpy import message
 
 
class SimpleChannel(object):
"""The rabbitpy.simple.Channel class creates a context manager
implementation for use on a single channel where the connection is
automatically created and managed for you.
Example:
.. code:: python
import rabbitpy
with rabbitpy.SimpleChannel(url) as channel:
queue = rabbitpy.Queue(channel, 'my-queue')
:param str uri: The AMQP URI to connect with. For URI options, see the
:class:`~rabbitpy.connection.Connection` class documentation.
"""
def __init__(self, uri):
self.connection = None
self.channel = None
self.uri = uri
def __enter__(self):
self.connection = connection.Connection(self.uri)
self.channel = self.connection.channel()
return self.channel
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type and exc_val:
raise
self.channel.close()
self.connection.close()
def consume(uri=None, queue_name=None, no_ack=False, prefetch=None,
priority=None):
"""Consume messages from the queue as a generator:
Loading
Loading
@@ -27,14 +62,11 @@ def consume(uri=None, queue_name=None, no_ack=False, prefetch=None,
:raises: py:class:`ValueError`
 
"""
if not queue_name:
raise ValueError('You must specify a queue name to consume from')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
queue = amqp_queue.Queue(channel, queue_name)
for msg in queue.consume(no_ack, prefetch, priority):
yield msg
_validate_name(queue_name, 'queue')
with SimpleChannel(uri) as channel:
queue = amqp_queue.Queue(channel, queue_name)
for msg in queue.consume(no_ack, prefetch, priority):
yield msg
 
 
def get(uri=None, queue_name=None):
Loading
Loading
@@ -49,13 +81,10 @@ def get(uri=None, queue_name=None):
:raises: py:class:`ValueError`
 
"""
if not queue_name:
raise ValueError('You must specify a queue name to get a message from')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
queue = amqp_queue.Queue(channel, queue_name)
return queue.get(False)
_validate_name(queue_name, 'queue')
with SimpleChannel(uri) as channel:
queue = amqp_queue.Queue(channel, queue_name)
return queue.get(False)
 
 
def publish(uri=None, exchange_name=None, routing_key=None,
Loading
Loading
@@ -77,14 +106,14 @@ def publish(uri=None, exchange_name=None, routing_key=None,
if exchange_name is None:
exchange_name = ''
 
with connection.Connection(uri) as conn:
with conn.channel() as channel:
msg = message.Message(channel, body or '', properties or dict())
if confirm:
channel.enable_publisher_confirms()
return msg.publish(exchange_name, routing_key or '')
else:
msg.publish(exchange_name, routing_key or '')
with SimpleChannel(uri) as channel:
msg = message.Message(channel, body or '', properties or dict())
if confirm:
channel.enable_publisher_confirms()
return msg.publish(exchange_name, routing_key or '',
mandatory=True)
else:
msg.publish(exchange_name, routing_key or '')
 
 
def create_queue(uri=None, queue_name='', durable=True, auto_delete=False,
Loading
Loading
@@ -113,19 +142,17 @@ def create_queue(uri=None, queue_name='', durable=True, auto_delete=False,
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
dlx_routing_key = dead_letter_routing_key
with connection.Connection(uri) as conn:
with conn.channel() as channel:
obj = amqp_queue.Queue(channel, queue_name,
durable=durable,
auto_delete=auto_delete,
max_length=max_length,
message_ttl=message_ttl,
expires=expires,
dead_letter_exchange=dead_letter_exchange,
dead_letter_routing_key=dlx_routing_key,
arguments=arguments)
obj.declare()
_validate_name(queue_name, 'queue')
with SimpleChannel(uri) as channel:
amqp_queue.Queue(channel, queue_name,
durable=durable,
auto_delete=auto_delete,
max_length=max_length,
message_ttl=message_ttl,
expires=expires,
dead_letter_exchange=dead_letter_exchange,
dead_letter_routing_key=dead_letter_routing_key,
arguments=arguments).declare()
 
 
def delete_queue(uri=None, queue_name=None):
Loading
Loading
@@ -139,12 +166,9 @@ def delete_queue(uri=None, queue_name=None):
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
if not queue_name:
raise ValueError('You must specify a queue name to consume from')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
queue = amqp_queue.Queue(channel, queue_name)
queue.delete()
_validate_name(queue_name, 'queue')
with SimpleChannel(uri) as channel:
amqp_queue.Queue(channel, queue_name).delete()
 
 
def create_direct_exchange(uri=None, exchange_name=None, durable=True):
Loading
Loading
@@ -158,13 +182,7 @@ def create_direct_exchange(uri=None, exchange_name=None, durable=True):
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
if not exchange_name:
raise ValueError('You must specify a exchange name to create')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
obj = exchange.DirectExchange(channel, exchange_name,
durable=durable)
obj.declare()
_create_exchange(uri, exchange_name, exchange.DirectExchange, durable)
 
 
def create_fanout_exchange(uri=None, exchange_name=None, durable=True):
Loading
Loading
@@ -178,13 +196,7 @@ def create_fanout_exchange(uri=None, exchange_name=None, durable=True):
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
if not exchange_name:
raise ValueError('You must specify a exchange name to create')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
obj = exchange.FanoutExchange(channel, exchange_name,
durable=durable)
obj.declare()
_create_exchange(uri, exchange_name, exchange.FanoutExchange, durable)
 
 
def create_headers_exchange(uri=None, exchange_name=None, durable=True):
Loading
Loading
@@ -198,13 +210,7 @@ def create_headers_exchange(uri=None, exchange_name=None, durable=True):
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
if not exchange_name:
raise ValueError('You must specify a exchange name to create')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
obj = exchange.HeadersExchange(channel, exchange_name,
durable=durable)
obj.declare()
_create_exchange(uri, exchange_name, exchange.HeadersExchange, durable)
 
 
def create_topic_exchange(uri=None, exchange_name=None, durable=True):
Loading
Loading
@@ -218,13 +224,7 @@ def create_topic_exchange(uri=None, exchange_name=None, durable=True):
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
if not exchange_name:
raise ValueError('You must specify a exchange name to create')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
obj = exchange.TopicExchange(channel, exchange_name,
durable=durable)
obj.declare()
_create_exchange(uri, exchange_name, exchange.TopicExchange, durable)
 
 
def delete_exchange(uri=None, exchange_name=None):
Loading
Loading
@@ -237,9 +237,34 @@ def delete_exchange(uri=None, exchange_name=None):
:raises: :py:class:`rabbitpy.RemoteClosedException`
 
"""
if not exchange_name:
raise ValueError('You must specify a exchange name to delete')
with connection.Connection(uri) as conn:
with conn.channel() as channel:
obj = exchange.Exchange(channel, exchange_name)
obj.delete()
_validate_name(exchange_name, 'exchange')
with SimpleChannel(uri) as channel:
exchange.Exchange(channel, exchange_name).delete()
def _create_exchange(uri, exchange_name, exchange_class, durable):
"""Create an exchange from RabbitMQ. This should only be used for one-off
operations.
:param str uri: AMQP URI to connect to
:param str exchange_name: The exchange name to create
:param bool durable: Exchange should survive server restarts
:raises: :py:class:`ValueError`
:raises: :py:class:`rabbitpy.RemoteClosedException`
"""
_validate_name(exchange_name, 'exchange')
with SimpleChannel(uri) as channel:
exchange_class(channel, exchange_name, durable=durable).declare()
def _validate_name(value, obj_type):
"""Validate the specified name is set.
:param str value: The value to validate
:param str obj_type: The object type for the error message if needed
:raises: ValueError
"""
if not value:
raise ValueError('You must specify the {} name'.format(obj_type))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment