
Let me start this off by saying RabbitMQ is a great piece of software. It excels in areas where other messaging stacks are either too cumbersome or overly complicated. Plus, it’s written in wacky Erlang.

With all of its merits, however, a common hurdle is trying to understand RabbitMQ’s exchange, queue, and routing key terminology. Adding to the confusion, the documentation states that routing key functionality depends on the exchange type being used. Developers who adopt RabbitMQ looking for a durable and simple message stack are often left feeling a bit betrayed.

For many users, the exchange, queue, and routing key concepts go beyond what will ever be needed. It’s important to understand how they work, and how powerful they are, but it’s just as important that the extra moving parts aren’t a deterrent.

Our examples below deviate from the common examples in the RabbitMQ documentation and elsewhere on the web in that we declare and bind queues on the producer side instead of the consumer side. Since a bound queue must exist for published messages not to be dropped, the flow of data is much easier to understand this way. With this method, a producer can be written first and start publishing to an exchange before any consumer exists, and no messages are lost for having been sent before the queue was declared.
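To make that reasoning concrete, here is a toy in-memory model. It is not RabbitMQ or Pika, and the ToyExchange class and its methods are hypothetical names invented for this sketch. It assumes the behavior described above: a message published while no queue is bound is dropped, while a message published after binding waits in the queue until a consumer reads it.

```python
from collections import deque


class ToyExchange:
    """Hypothetical stand-in for an exchange that copies each message
    to every bound queue. Illustration only, not a RabbitMQ API."""

    def __init__(self):
        self.queues = {}   # queue name -> deque of pending messages
        self.dropped = []  # messages that had nowhere to go

    def bind(self, queue_name):
        # Declaring and binding are collapsed into one step for brevity.
        self.queues.setdefault(queue_name, deque())

    def publish(self, body):
        if not self.queues:
            # No bound queue exists, so the message is discarded.
            self.dropped.append(body)
            return
        for pending in self.queues.values():
            pending.append(body)

    def consume(self, queue_name):
        # Drain and return everything waiting in the queue.
        pending = self.queues[queue_name]
        messages = list(pending)
        pending.clear()
        return messages


exchange = ToyExchange()
exchange.publish('sent too early')      # no queue bound yet, so it is lost
exchange.bind('logwatch')               # the producer declares and binds up front
exchange.publish('sent after binding')  # retained until a consumer shows up
received = exchange.consume('logwatch')
print(exchange.dropped, received)
```

Binding on the producer side simply guarantees that the bind step always happens before the first publish, regardless of when a consumer is started.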

The following code examples use the Python client Pika, though other clients should behave similarly.

Major Exchange Types

Fanout

The fanout exchange type pushes every message it receives into all of the queues bound to it. Routing keys are not used (though to make things complicated, many clients require routing keys to be specified anyway). Here’s an example of a logging system that creates an exchange, binds two queues to that exchange, and then publishes some messages to the exchange. Because the fanout exchange type is used, the exchange will send all messages it receives to every queue bound to it. One consumer could be tailing the messages from the logwatch queue while another consumer provides Splunk or some other log analyzer with the exact same messages from the logprocessor queue. Again, no routing keys are actually used here.

# ## Producer.py ##
#
# Setup connection to RabbitMQ
import pika
connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
channel = connection.channel()

# Declare the exchange (Pika 1.x names this keyword exchange_type)
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Declare a logwatch queue and another logprocessor queue
channel.queue_declare(queue='logwatch')
channel.queue_declare(queue='logprocessor')

# Bind the queues to the exchange, note no routing key is specified
channel.queue_bind(exchange='logs', queue='logwatch')
channel.queue_bind(exchange='logs', queue='logprocessor')

# Publish some messages
channel.basic_publish(exchange='logs', routing_key='ignored', body='This is log content')
channel.basic_publish(exchange='logs', routing_key='ignored', body='Somebody visited a page')

# ## Consumer.py ##
#
# Setup connection to RabbitMQ
import pika
connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
channel = connection.channel()

# Create a callback function
def cb(ch, method, properties, body):
    # body arrives as bytes under Python 3
    print(body.decode())

# Consume the queue
# auto_ack=True acknowledges each message on delivery, which keeps the example short
channel.basic_consume(queue='logwatch', on_message_callback=cb, auto_ack=True)
channel.start_consuming()

A consumer now only needs to consume from the logwatch queue, the logprocessor queue, or both; each queue receives every message sent to the logs exchange.

Topic

The topic exchange type is perhaps the most useful outside of the broadcasting-style fanout type, and allows you to use a more structured approach to publishing and consuming messages. Consider the following scenario: an email platform receives emails that it classifies as Clean, SPAM, Possible SPAM, and Exploit (virus attached, phishing attempts, etc). Clean emails should be delivered to the appropriate account, SPAM emails should be flagged and placed into a junk folder, Possible SPAM should be delivered but flagged, and finally Exploit emails should not be delivered and should be placed in a containment area for further analysis.

# ## Producer.py ##
#
# Setup connection to RabbitMQ
import pika
connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
channel = connection.channel()

# Declare the exchange (Pika 1.x names this keyword exchange_type)
channel.exchange_declare(exchange='email', exchange_type='topic')

# Declare queues for specific messages
channel.queue_declare(queue='all_messages')
channel.queue_declare(queue='clean_messages')
channel.queue_declare(queue='dirty_messages')
channel.queue_declare(queue='spam_messages')
channel.queue_declare(queue='pspam_messages')
channel.queue_declare(queue='exploit_messages')

# Bind the queues to the exchange, using the routing key topic dot-notation
# 'incoming.#' matches every key that starts with 'incoming', whatever its length
channel.queue_bind(exchange='email', queue='all_messages', routing_key='incoming.#')
channel.queue_bind(exchange='email', queue='clean_messages', routing_key='incoming.clean')
channel.queue_bind(exchange='email', queue='dirty_messages', routing_key='incoming.dirty.*')
channel.queue_bind(exchange='email', queue='spam_messages', routing_key='incoming.dirty.spam')
channel.queue_bind(exchange='email', queue='pspam_messages', routing_key='incoming.dirty.pspam')
channel.queue_bind(exchange='email', queue='exploit_messages', routing_key='incoming.dirty.exploit')

# Publish some messages
channel.basic_publish(exchange='email', routing_key='incoming.clean', body='This is a clean message')
channel.basic_publish(exchange='email', routing_key='incoming.dirty.spam', body='This is SPAM')
channel.basic_publish(exchange='email', routing_key='incoming.dirty.exploit', body='This is an exploit')

# ## Consumer.py ##
#
# Setup connection to RabbitMQ
import pika
connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
channel = connection.channel()

# Create a callback function
def cb(ch, method, properties, body):
    # body arrives as bytes under Python 3
    print(body.decode())

# Consume the clean_messages queue
# auto_ack=True acknowledges each message on delivery, which keeps the example short
channel.basic_consume(queue='clean_messages', on_message_callback=cb, auto_ack=True)
channel.start_consuming()

With the three messages published above:

  • Clients listening on the all_messages queue will receive all three messages
  • Clients listening on the clean_messages queue will receive the clean message
  • Clients listening on the dirty_messages queue will receive both the SPAM and exploit messages
  • Clients listening on the spam_messages queue will receive the SPAM message
  • Clients listening on the pspam_messages queue will not receive any messages
  • Clients listening on the exploit_messages queue will receive the exploit message

The topic exchange dot-notation supports two wildcards in binding keys: * matches exactly one word, and # matches zero or more words. incoming.# could be replaced with a bare #, but the prefix was kept to show a wildcard combined with a literal word. Do note, however, that incoming.* would not match incoming.dirty.[something], and that incoming.*.* would not match incoming.clean, because each * must consume exactly one word.
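The wildcard rules are easy to get wrong, so here is a minimal pure-Python sketch of them: * stands for exactly one dot-separated word, and # stands for zero or more words. This is not RabbitMQ's actual implementation, and topic_matches is a hypothetical helper name used only for illustration.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key satisfies a topic binding pattern."""

    def match(p, k):
        if not p:
            # Pattern exhausted: succeed only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may absorb zero or more words, so try every split point.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == '*' or head == k[0]:
            # '*' or a literal word consumes exactly one word of the key.
            return match(rest, k[1:])
        return False

    return match(pattern.split('.'), routing_key.split('.'))


patterns = ['incoming.#', 'incoming.*.*', 'incoming.*', 'incoming.dirty.*']
keys = ['incoming.clean', 'incoming.dirty.spam']
for pattern in patterns:
    matched = [key for key in keys if topic_matches(pattern, key)]
    print(pattern, '->', matched)
```

Running it shows that only incoming.# catches both keys, while incoming.*.* and incoming.* each catch only the key with the matching number of words.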

Direct

The direct exchange type delivers a message to every queue whose binding key exactly matches the message’s routing key. A topic exchange behaves the same way for any binding keys that don’t include * or #.
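As a rough sketch of what that looks like with Pika, the function below takes an already open channel, declares a direct exchange, binds two queues with exact keys, and publishes one message. The exchange name email_direct and the function name are hypothetical, the queue names merely echo the email example, and the keyword names (exchange_type in particular) assume the Pika 1.x API.

```python
def declare_and_publish_direct(channel):
    """Set up a direct exchange on an open Pika channel and publish one message."""
    # 'email_direct' is a hypothetical exchange name used only for this sketch.
    channel.exchange_declare(exchange='email_direct', exchange_type='direct')

    bindings = {
        'clean_messages': 'incoming.clean',
        'spam_messages': 'incoming.dirty.spam',
    }
    for queue, key in bindings.items():
        channel.queue_declare(queue=queue)
        # A direct exchange compares the binding key for exact equality.
        channel.queue_bind(exchange='email_direct', queue=queue, routing_key=key)

    # Only spam_messages is bound with exactly this key, so only it gets the message.
    channel.basic_publish(exchange='email_direct',
                          routing_key='incoming.dirty.spam',
                          body='This is SPAM')


# Usage against a real broker (needs Pika installed and a reachable server):
#   import pika
#   connection = pika.BlockingConnection(pika.URLParameters('amqp://test:test@server'))
#   declare_and_publish_direct(connection.channel())
#   connection.close()
```

Passing the channel in, rather than creating the connection inside the function, keeps the sketch independent of any particular server address.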

Final thoughts

RabbitMQ abstracts a simple queue concept into three major components: exchange, routing key, and queue. In doing so, it creates a rich and customizable messaging stack at the expense of some slight complexity.

Remember:

  • Producers only push messages into exchanges
  • Bindings connect exchanges with queues, and routing keys decide which bindings a message follows, unless you’re using a fanout exchange, which ignores them
  • Consumers only read messages from queues

 
