Consumer Priorities
Overview
Consumer priorities allow you to ensure that high-priority consumers receive messages while they are active, with messages going to lower-priority consumers only when the high-priority consumers are blocked.
Normally, active consumers connected to a queue receive messages from it in a round-robin fashion. When consumer priorities are in use, messages are delivered round-robin if multiple active consumers exist with the same high priority.
Definition of Active Consumers
The above paragraphs refer to consumers as being active or blocked. At any moment, any given consumer is either one or the other. An active consumer is one which could receive a message without waiting. A consumer becomes blocked if it cannot receive messages, either because its channel has reached the maximum number of unacknowledged messages after issuing basic.qos, or simply because of network congestion.
Therefore for each queue, at least one of three things must be true:
- There are no active consumers
- The queue is empty
- The queue is busy delivering messages to consumers
Note that consumers can switch between active and blocked many times per second. We therefore don't expose whether a consumer is active or blocked through the management plugin or rabbitmqctl.
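The prefetch-based part of the definition above can be sketched as a small model. This is only an illustration, not how RabbitMQ is implemented: the class PrefetchModel and its members are hypothetical names, the limit is assumed to be a positive number standing in for the value set with basic.qos, and blocking caused by network congestion is not modelled.

```java
// Illustrative model only; the class and method names are hypothetical.
final class PrefetchModel {
    // Stand-in for the prefetch count set with basic.qos (assumed positive).
    private final int prefetchLimit;
    // Messages delivered to this consumer but not yet acknowledged.
    private int unacked = 0;

    PrefetchModel(int prefetchLimit) {
        this.prefetchLimit = prefetchLimit;
    }

    /** Active means the consumer could take another message right now. */
    boolean isActive() {
        return unacked < prefetchLimit;
    }

    /** Records a delivery; returns false if the consumer is blocked. */
    boolean deliver() {
        if (!isActive()) {
            return false;
        }
        unacked++;
        return true;
    }

    /** An acknowledgement frees one slot and may unblock the consumer. */
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }
}
```

In this model a consumer with a limit of 2 turns blocked after two deliveries and becomes active again after a single acknowledgement, which is why the state can flip many times per second under load.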
When consumer priorities are in use, you can expect your highest-priority consumers to receive all the messages until they become blocked, at which point lower-priority consumers will start to receive some. It's important to understand that RabbitMQ will still prioritise delivering messages: it will not wait for a blocked high-priority consumer to become unblocked if there is an active lower-priority consumer ready.
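The delivery behaviour described above can be approximated with a short simulation. It is a sketch under stated assumptions rather than the broker's algorithm: PriorityDispatchSketch, SimConsumer and the capacity field are hypothetical, and a consumer is treated as blocked only when its unacknowledged count reaches its capacity.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.TreeMap;

// Simulation only; all names here are hypothetical, not RabbitMQ APIs.
final class PriorityDispatchSketch {
    static final class SimConsumer {
        final String name;
        final int priority;
        final int capacity; // assumed stand-in for a prefetch limit
        int unacked = 0;

        SimConsumer(String name, int priority, int capacity) {
            this.name = name;
            this.priority = priority;
            this.capacity = capacity;
        }

        boolean isActive() {
            return unacked < capacity;
        }

        void ack() {
            if (unacked > 0) {
                unacked--;
            }
        }
    }

    // Consumers grouped by priority, highest priority first.
    private final TreeMap<Integer, Deque<SimConsumer>> byPriority =
            new TreeMap<>(Comparator.<Integer>reverseOrder());

    void add(SimConsumer c) {
        byPriority.computeIfAbsent(c.priority, p -> new ArrayDeque<>()).addLast(c);
    }

    /** Returns the name of the consumer that gets the next message, or null if none is active. */
    String deliverNext() {
        for (Deque<SimConsumer> group : byPriority.values()) {
            // Rotate through the group once so equal priorities are served round-robin.
            for (int i = 0; i < group.size(); i++) {
                SimConsumer c = group.pollFirst();
                group.addLast(c);
                if (c.isActive()) {
                    c.unacked++;
                    return c.name;
                }
            }
            // Every consumer at this priority is blocked: fall through to the next level.
        }
        return null;
    }
}
```

With two priority-10 consumers of capacity 1 and one priority-0 consumer, the first two messages alternate between the high-priority pair, and later messages go to the priority-0 consumer until one of the pair acknowledges.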
Using Consumer Priorities
Set the x-priority argument in the basic.consume method to an integer value. Consumers which do not specify a value have priority 0. Larger numbers indicate higher priority, and both positive and negative numbers can be used.
For example (in Java):
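Channel channel = ...;
Consumer consumer = ...;
// Consumer arguments are passed as a map; x-priority takes an integer.
Map<String, Object> args = new HashMap<>();
args.put("x-priority", 10);
// The second parameter is autoAck; false means deliveries are acknowledged manually.
channel.basicConsume("my-queue", false, args, consumer);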
This creates a new consumer with priority 10.
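As a further illustration of the rules above (default of 0, and negative values being allowed), the following sketch builds argument maps and reads back the priority they imply. ConsumerArgsSketch, withPriority and effectivePriority are hypothetical helpers written for this example, not part of the RabbitMQ Java client; only the "x-priority" key comes from the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers for illustration; not part of any RabbitMQ library.
final class ConsumerArgsSketch {
    static final String PRIORITY_KEY = "x-priority";

    /** Builds an argument map suitable for passing to basicConsume. */
    static Map<String, Object> withPriority(int priority) {
        Map<String, Object> args = new HashMap<>();
        args.put(PRIORITY_KEY, priority);
        return args;
    }

    /** Mirrors the documented rule: a consumer without x-priority has priority 0. */
    static int effectivePriority(Map<String, Object> args) {
        Object value = args.get(PRIORITY_KEY);
        return (value instanceof Number) ? ((Number) value).intValue() : 0;
    }
}
```

For instance, a fallback consumer could be registered with withPriority(-5) so that it only receives messages once consumers at the default priority of 0 are blocked.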