Let me start this off by saying RabbitMQ is a great piece of software. It excels in areas where other messaging stacks are either too cumbersome or overly complicated. Plus, it’s written in wacky Erlang.
With all of its merits, however, a common hurdle is trying to understand RabbitMQ’s exchange, queue, and routing key terminology. Adding to the confusion, the documentation states that routing key behavior depends on the exchange type being used. Developers adopting RabbitMQ in search of a durable and simple message stack are often left feeling a bit betrayed.
For many users, the exchange, queue, and routing key concepts go beyond what will ever be needed. It’s important to understand how they work, and how powerful they are, but it’s just as important that the extra moving parts aren’t a deterrent.
Our examples below deviate from the common examples in the RabbitMQ documentation and elsewhere on the web in that we declare and bind queues on the producer side instead of the consumer side. Since a bound queue must exist for produced messages to not be dropped, it’s much easier to understand the flow of data this way. With this approach, a producer can be written first and start publishing to an exchange, and the consumer won’t lose the messages that were sent before it came online.
The following code examples use the Python client Pika, though other clients should behave similarly.
Major Exchange Types
Fanout
The fanout exchange type pushes every message it receives into all of the queues bound to it. Routing keys are not used (though to make things complicated, many clients require routing keys to be specified anyway). Here’s an example of a logging system that creates an exchange, binds two queues to that exchange, and then publishes some messages to the exchange. Because the fanout exchange type is used, the exchange will send all messages it receives to every queue bound to it. One consumer could be tailing the messages from the logwatch queue while another consumer provides Splunk or some other log analyzer with the exact same messages from the logprocessor queue. Again, no routing keys are actually used here.
    #
    ## Producer.py ##
    #
    # Set up the connection to RabbitMQ
    import pika

    connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
    channel = connection.channel()

    # Declare the exchange (Pika 1.x calls this argument exchange_type; older releases used type)
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Declare a logwatch queue and another logprocessor queue
    channel.queue_declare(queue='logwatch')
    channel.queue_declare(queue='logprocessor')

    # Bind the queues to the exchange, note no routing key is specified
    channel.queue_bind(exchange='logs', queue='logwatch')
    channel.queue_bind(exchange='logs', queue='logprocessor')

    # Publish some messages
    channel.basic_publish(exchange='logs', routing_key='ignored', body='This is log content')
    channel.basic_publish(exchange='logs', routing_key='ignored', body='Somebody visited a page')
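    #
    ## Consumer.py ##
    #
    # Set up the connection to RabbitMQ
    import pika

    connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
    channel = connection.channel()

    # Create a callback function that prints each message body
    def cb(ch, method, properties, body):
        print(body)
        # Acknowledge the message so RabbitMQ can remove it from the queue
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Consume the queue (Pika 1.x argument names)
    channel.basic_consume(queue='logwatch', on_message_callback=cb)
    channel.start_consuming()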
A consumer now only needs to consume from the logwatch queue, the logprocessor queue, or both; each queue will receive all messages sent to the logs exchange.
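For the “or both” case, one consumer can register the same callback on each queue before it starts consuming. Here’s a minimal sketch, assuming the Pika 1.x argument names (queue, on_message_callback). The helper names consume_from and print_and_ack are made up for this post and are not part of Pika.

```python
def consume_from(channel, queue_names, callback):
    """Register one callback on several queues of an already-open channel.

    consume_from is a hypothetical helper, not a Pika function. It assumes
    the Pika 1.x signature basic_consume(queue=..., on_message_callback=...).
    """
    consumer_tags = []
    for name in queue_names:
        # basic_consume returns the consumer tag assigned to this subscription
        tag = channel.basic_consume(queue=name, on_message_callback=callback)
        consumer_tags.append(tag)
    return consumer_tags


def print_and_ack(ch, method, properties, body):
    """Print the message with its source exchange, then acknowledge it."""
    print(method.exchange, body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

With the channel from Consumer.py, calling consume_from(channel, ['logwatch', 'logprocessor'], print_and_ack) followed by channel.start_consuming() should print every log message twice, once per queue, since the fanout exchange copies each message into both.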
Topic
The topic exchange type is perhaps the most useful outside of the broadcasting-style fanout type and allows you to use a more structured approach to publishing and consuming messages. Consider the following scenario: an email platform receives emails that it classifies as Clean, SPAM, Possible SPAM, and Exploit (virus attached, phishing attempts, etc). Clean emails should be delivered to the appropriate account, SPAM emails should be flagged and placed into a junk folder, Possible SPAM should be delivered but flagged, and finally Exploit emails should not be delivered and should be placed in a containment area for further analysis.
    #
    ## Producer.py ##
    #
    # Set up the connection to RabbitMQ
    import pika

    connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
    channel = connection.channel()

    # Declare the exchange (Pika 1.x calls this argument exchange_type; older releases used type)
    channel.exchange_declare(exchange='email', exchange_type='topic')

    # Declare queues for specific messages
    channel.queue_declare(queue='all_messages')
    channel.queue_declare(queue='clean_messages')
    channel.queue_declare(queue='dirty_messages')
    channel.queue_declare(queue='spam_messages')
    channel.queue_declare(queue='pspam_messages')
    channel.queue_declare(queue='exploit_messages')

    # Bind the queues to the exchange, using the routing key topic dot-notation
    # Note: '*' matches exactly one word, so 'incoming.*.*' only matches three-word routing keys
    channel.queue_bind(exchange='email', queue='all_messages', routing_key='incoming.*.*')
    channel.queue_bind(exchange='email', queue='clean_messages', routing_key='incoming.clean')
    channel.queue_bind(exchange='email', queue='dirty_messages', routing_key='incoming.dirty.*')
    channel.queue_bind(exchange='email', queue='spam_messages', routing_key='incoming.dirty.spam')
    channel.queue_bind(exchange='email', queue='pspam_messages', routing_key='incoming.dirty.pspam')
    channel.queue_bind(exchange='email', queue='exploit_messages', routing_key='incoming.dirty.exploit')

    # Publish some messages
    channel.basic_publish(exchange='email', routing_key='incoming.clean', body='This is a clean message')
    channel.basic_publish(exchange='email', routing_key='incoming.dirty.spam', body='This is SPAM')
    channel.basic_publish(exchange='email', routing_key='incoming.dirty.exploit', body='This is an exploit')
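    #
    ## Consumer.py ##
    #
    # Set up the connection to RabbitMQ
    import pika

    connection = pika.BlockingConnection(pika.connection.URLParameters('amqp://test:test@server'))
    channel = connection.channel()

    # Create a callback function that prints each message body
    def cb(ch, method, properties, body):
        print(body)
        # Acknowledge the message so RabbitMQ can remove it from the queue
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Consume the clean_messages queue (Pika 1.x argument names)
    channel.basic_consume(queue='clean_messages', on_message_callback=cb)
    channel.start_consuming()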
- Clients listening on the all_messages queue will receive the SPAM and exploit messages, but not the clean message: incoming.*.* only matches routing keys with exactly three words, and incoming.clean has two
- Clients listening on the clean_messages queue will receive the clean message
- Clients listening on the dirty_messages queue will receive both the SPAM and exploit messages
- Clients listening on the spam_messages queue will receive the SPAM message
- Clients listening on the pspam_messages queue will not receive any messages
- Clients listening on the exploit_messages queue will receive the exploit message
The topic exchange dot-notation supports two wildcards: * matches exactly one word, and # matches zero or more words. Binding all_messages with # (or incoming.#) instead of incoming.*.* would make it receive every message, including the clean one; incoming.*.* is used here to demonstrate multiple wildcards. Do note, however, that incoming.* would not match incoming.dirty.[something].
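To check a binding pattern against a routing key without a broker, the wildcard rules can be modeled in a few lines of Python. This is only a sketch of the matching semantics described above, not RabbitMQ’s actual implementation, and the function name topic_matches is made up for this post.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots; '*' stands for exactly one word and
    '#' stands for zero or more words.
    """
    return _match(pattern.split('.'), routing_key.split('.'))


def _match(pattern_words, key_words):
    if not pattern_words:
        # The pattern is exhausted: it matches only if the key is too
        return not key_words
    head, rest = pattern_words[0], pattern_words[1:]
    if head == '#':
        # '#' may absorb zero, one, or many words: try every possible split
        return any(_match(rest, key_words[i:]) for i in range(len(key_words) + 1))
    if not key_words:
        return False
    if head == '*' or head == key_words[0]:
        return _match(rest, key_words[1:])
    return False
```

For instance, topic_matches('incoming.*.*', 'incoming.clean') is False because the key has only two words, while topic_matches('incoming.#', 'incoming.clean') is True.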
Direct
The direct exchange type delivers a message to every queue whose binding key exactly matches the message’s routing key. A topic exchange behaves the same way for any binding key that doesn’t include * or #.
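A direct exchange is declared and bound the same way as the exchanges above; only the exchange type and the exact-match binding keys differ. Here’s a minimal sketch, assuming the Pika 1.x argument names. The helper declare_direct_routes, the queue names, and the routing keys are all hypothetical, picked only for illustration.

```python
def declare_direct_routes(channel, exchange, routes):
    """Declare a direct exchange and bind each queue to its exact routing keys.

    routes maps a queue name to the list of routing keys it should receive.
    declare_direct_routes is a hypothetical helper, not part of Pika.
    """
    channel.exchange_declare(exchange=exchange, exchange_type='direct')
    for queue, routing_keys in routes.items():
        channel.queue_declare(queue=queue)
        for key in routing_keys:
            # One binding per key; a queue may be bound under several keys
            channel.queue_bind(exchange=exchange, queue=queue, routing_key=key)


# Hypothetical topology: resize jobs go to one queue, every job to an audit queue
ROUTES = {
    'resize_jobs': ['image.resize'],
    'audit_log': ['image.resize', 'image.delete'],
}
```

Calling declare_direct_routes(channel, 'tasks', ROUTES) on the producer’s channel and then publishing with routing_key='image.resize' would deliver the message to both queues, while 'image.delete' would reach only audit_log. The dots carry no special meaning on a direct exchange.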
Final thoughts
RabbitMQ abstracts a simple queue concept into three major components: exchange, routing key, and queue. In doing so, it creates a rich and customizable messaging stack at the expense of some slight complexity.
Remember:
- Producers only push messages into exchanges
- Use routing keys to connect exchanges with queues, unless you’re using a fanout exchange
- Consumers only read messages from queues
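If it helps to see those three rules in one place, here’s a toy in-memory model of the moving parts. It’s a teaching sketch only: the ToyBroker class is made up for this post, covers just the fanout and direct types, and has nothing to do with how RabbitMQ is implemented.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory model of exchanges, bindings, and queues (fanout and direct only)."""

    def __init__(self):
        self.exchange_types = {}            # exchange name -> 'fanout' or 'direct'
        self.bindings = defaultdict(list)   # exchange name -> [(binding key, queue name)]
        self.queues = defaultdict(deque)    # queue name -> pending messages

    def exchange_declare(self, exchange, exchange_type):
        self.exchange_types[exchange] = exchange_type

    def queue_bind(self, exchange, queue, routing_key=''):
        self.queues[queue]  # touching the defaultdict creates the queue
        self.bindings[exchange].append((routing_key, queue))

    def publish(self, exchange, routing_key, body):
        """Producers talk only to exchanges; returns how many queues got a copy."""
        fanout = self.exchange_types[exchange] == 'fanout'
        targets = {q for key, q in self.bindings[exchange] if fanout or key == routing_key}
        for queue in targets:
            self.queues[queue].append(body)
        return len(targets)

    def get(self, queue):
        """Consumers read only from queues; returns None when the queue is empty."""
        return self.queues[queue].popleft() if self.queues[queue] else None
```

Publishing to an exchange with no matching binding returns 0 in this model, which mirrors why the examples in this post bind queues on the producer side before the first message goes out.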