SoFunction
Updated on 2024-11-18

About the advanced features of RabbitMQ in Java

RabbitMQ Advanced Features

1. Reliable delivery of messages

As a message sender using RabbitMQ, you want to rule out message loss and delivery failures. RabbitMQ provides two mechanisms for controlling delivery reliability:

  • confirm: confirmation mode
  • return: return mode

RabbitMQ's complete message delivery path is: producer > RabbitMQ broker > exchange > queue > consumer

  • When a message travels from the producer to the exchange, a confirmCallback is invoked, reporting whether the exchange received it
  • When a message cannot be delivered from the exchange to a queue, a returnCallback is invoked

Use these two callbacks to control the reliable delivery of messages.
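
To make the two checkpoints concrete, here is a toy in-memory model of the path. It is only a sketch and not the RabbitMQ or Spring AMQP API: the names ToyBroker, ToyPublishResult, bind and publish are hypothetical, as is the binding key direct_key_1. It reports what a confirm callback would see for the producer-to-exchange hop, and whether a return callback would fire for the exchange-to-queue hop.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these types belong to RabbitMQ or Spring AMQP.
class ToyPublishResult {
    final boolean ack;      // what a confirm callback would report
    final boolean returned; // whether a return callback would fire

    ToyPublishResult(boolean ack, boolean returned) {
        this.ack = ack;
        this.returned = returned;
    }
}

class ToyBroker {
    // exchange name -> (routing key -> queue name)
    private final Map<String, Map<String, String>> exchanges = new HashMap<>();

    void bind(String exchange, String routingKey, String queue) {
        exchanges.computeIfAbsent(exchange, name -> new HashMap<>()).put(routingKey, queue);
    }

    ToyPublishResult publish(String exchange, String routingKey) {
        Map<String, String> bindings = exchanges.get(exchange);
        if (bindings == null) {
            // Hop 1 failed: the message never reached an exchange.
            return new ToyPublishResult(false, false);
        }
        // Hop 1 succeeded; hop 2 fails when no queue is bound for the key.
        boolean routed = bindings.containsKey(routingKey);
        return new ToyPublishResult(true, !routed);
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("spring_direct_exchange", "direct_key_1", "spring_queue");

        ToyPublishResult unroutable = broker.publish("spring_direct_exchange", "direct_key_3");
        System.out.println(unroutable.ack + " " + unroutable.returned); // true true

        ToyPublishResult noExchange = broker.publish("missing_exchange", "direct_key_1");
        System.out.println(noExchange.ack + " " + noExchange.returned); // false false
    }
}
```

Note that in this model an unroutable message is still confirmed with ack = true, because the exchange did receive it; only the return tells the producer that no queue got the message.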

1.1 confirm Confirmation mode

(1) Enable Confirmation Mode

Enable confirmation mode when creating the connection factory by setting the publisher-confirms attribute, which defaults to false.

<rabbit:connection-factory  
                           host="${}"
                           port="${}"
                           username="${}"
                           password="${}"
                           virtual-host="${-host}"
                           publisher-confirms="true"
/>

(2) RabbitTemplate setup callbacks

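import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:")
public class ProducerTest {
    /**
     * Inject RabbitTemplate
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Send a message through the default exchange and wait for the confirm callback
     */
    @Test
    public void testConfirmCallback() throws InterruptedException {
        // Register the confirm callback
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * Callback method
             * @param correlationData The correlation data for the callback.
             * @param ack true means the send succeeded, false means it failed
             * @param cause cause of the failure; null when ack is true
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("Sent successfully.");
                } else {
                    System.out.println("Failed to send, reason:" + cause);
                    // Handle the failure here
                }
            }
        });
        rabbitTemplate.convertAndSend("spring_queue", "hello world");
        // Keep the test alive so the channel is not closed before the callback has run
        Thread.sleep(5000);
    }
}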
  • public void confirm(CorrelationData correlationData, boolean ack, String cause)

    • correlationData: correlation data that can be attached to the message when it is sent
    • ack: whether the send succeeded (true on success, false on failure)
    • cause: the reason for the failure; null on success
  • Thread.sleep(5000); keeps the test alive so that the channel is not closed before the callback has run.

    Without it, the channel may be closed first and the log shows:

    clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)

1.2 return Return mode

(1) Enable return mode

<rabbit:connection-factory  host="${}"
                           port="${}"
                           username="${}"
                           password="${}"
                           virtual-host="${-host}"
                           publisher-returns="true"
/>

(2) RabbitTemplate setup callbacks

@Test
public void testReturnCallback() throws InterruptedException {
    // Have the exchange return unroutable messages to the sender instead of dropping them
    rabbitTemplate.setMandatory(true);
    // Register the return callback
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        /**
         * Called when a message is returned
         * @param message Message object
         * @param replyCode Error code
         * @param replyText Reply text describing why the message was returned
         * @param exchange exchange
         * @param routingKey routing key
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("Message object:" + new String(message.getBody()));
            System.out.println("Error code:" + replyCode);
            System.out.println("Reply text:" + replyText);
            System.out.println("Exchange:" + exchange);
            System.out.println("Routing key:" + routingKey);
        }
    });
    rabbitTemplate.convertAndSend("spring_direct_exchange", "direct_key_3",
            "spring_direct_exchange_direct_key_1");
    // Keep the test alive so the channel is not closed before the callback has run
    Thread.sleep(5000);
}

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

  • message: the returned message object
  • replyCode: the error code
  • replyText: the reply text describing why the message was returned
  • exchange: the exchange the message was sent to
  • routingKey: the routing key used

The mandatory attribute takes precedence over publisher-returns.
When mandatory is explicitly set to true or false, the value of publisher-returns is ignored.
When mandatory is null (i.e., not configured), the result is determined by publisher-returns.
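
The precedence rule above can be written as a one-line decision. This is a minimal sketch of the rule as stated, not code taken from any library; the class and method names MandatoryFlag and effective are hypothetical, and a null Boolean stands for "mandatory not configured".

```java
// Sketch of the precedence rule described above; names are hypothetical.
class MandatoryFlag {
    // mandatory == null models "not configured".
    static boolean effective(Boolean mandatory, boolean publisherReturns) {
        return mandatory != null ? mandatory : publisherReturns;
    }

    public static void main(String[] args) {
        System.out.println(effective(false, true)); // false: mandatory wins
        System.out.println(effective(true, false)); // true: mandatory wins
        System.out.println(effective(null, true));  // true: falls back to publisher-returns
    }
}
```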

2. Consumer Ack

Ack stands for acknowledge. It is how the consumer confirms to the broker that it has received a message.

There are three acknowledgement modes:

  • Automatic acknowledgement: acknowledge="none"
  • Manual acknowledgement: acknowledge="manual"
  • Exception-based acknowledgement: acknowledge="auto"

Auto-acknowledgement means that once a message is received by a Consumer, it automatically acknowledges receipt and removes the corresponding message from the RabbitMQ message cache.

In real business processing, however, a message may well be received and then fail during processing, in which case it is lost. With manual acknowledgement, you call channel.basicAck() after the business logic succeeds to acknowledge the message yourself, and if an exception occurs you call channel.basicNack() so that the broker redelivers the message.

2.1 Setting up manual sign-off

(1) Create a listener to receive messages

For manual acknowledgement, have the listener implement the ChannelAwareMessageListener interface.

If the message is processed successfully, call channel.basicAck().

If processing fails, call channel.basicNack(); the broker then redelivers the message to the consumer.

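import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * @author zhong
 * <p>
 * Consumer Ack mechanism
 * 1. Enable manual acknowledgement with acknowledge="manual"
 * 2. Have the listener implement the ChannelAwareMessageListener interface
 * 3. If the message is processed successfully, call channel.basicAck()
 * 4. If processing fails, call channel.basicNack() so the broker redelivers the message to the consumer
 */
@Component
public class AckSpringQueueListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Receive the message
        System.out.println("Message:" + new String(message.getBody()));
        // Manual acknowledgement
        // deliveryTag: identifies this delivery on the channel
        // multiple: true acknowledges every unacknowledged message up to and including deliveryTag
        channel.basicAck(deliveryTag, true);
        // Manual rejection
        // requeue: true puts the rejected message back on the queue; false discards or dead-letters it
        //channel.basicNack(deliveryTag, true, true);
    }
}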
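
The listener above always acknowledges and leaves the rejection call commented out. The "ack on success, nack on failure" branching described earlier could be sketched as follows. To keep the example runnable without a broker, AckChannel is a hypothetical stand-in that exposes only basicAck and basicNack with the same parameters as the RabbitMQ client's Channel (the real methods also declare IOException); RecordingChannel, AckOrNack and the process step are likewise assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two Channel calls used here.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void basicNack(long deliveryTag, boolean multiple, boolean requeue);
}

// Records the calls so the outcome can be inspected without a broker.
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack " + deliveryTag);
    }

    @Override
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        calls.add("nack " + deliveryTag + " requeue=" + requeue);
    }
}

class AckOrNack {
    // Hypothetical business step: fails for an empty payload.
    static void process(String body) {
        if (body.isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
    }

    static void handle(String body, long deliveryTag, AckChannel channel) {
        try {
            process(body);
            // Success: acknowledge only this delivery.
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            // Failure: reject and ask the broker to requeue the message.
            channel.basicNack(deliveryTag, false, true);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        handle("hello", 1L, channel);
        handle("", 2L, channel);
        System.out.println(channel.calls); // [ack 1, nack 2 requeue=true]
    }
}
```

The sketch passes multiple = false so that each call affects a single delivery. Requeueing on every failure means a message that can never be processed will be redelivered repeatedly, so a real listener would usually need some limit or a dead letter route.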

(2) Enable manual acknowledgement and register the listener

Set acknowledge="manual" on the listener container:

<context:component-scan base-package=""/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" auto-declare="true">
    <rabbit:listener ref="ackSpringQueueListener" queue-names="spring_queue"/>
</rabbit:listener-container>

3. Consumer-side flow limiting

One of the roles of MQ is peak shaving, which is achieved through flow limiting on the consumer side.

Consumer-side flow limiting requires the following:

  • Set the prefetch attribute on <rabbit:listener-container>; it controls how many messages the consumer pulls at a time
  • The consumer-side acknowledgement mode must be manual: acknowledge="manual"

(1) Key configuration:

<context:component-scan base-package=""/>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"
                           auto-declare="true">
    <rabbit:listener ref="qosListener" queue-names="spring_queue"/>
</rabbit:listener-container>

  • Manual acknowledgement: acknowledge="manual"
  • Prefetch threshold: prefetch="1"

(2) Key listener code

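import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 * Consumer flow-limiting mechanism
 * 1. Make sure the ack mechanism is manual acknowledgement
 * 2. Set the prefetch attribute on listener-container
 * prefetch = 1 means that the consumer pulls one message at a time from the MQ and does not pull the next one until it has manually acknowledged the current one.
 */
@Component
public class QosListener implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("QosListener:" + new String(message.getBody()));
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // Simulate one second of business processing, then acknowledge the message
        Thread.sleep(1000);
        channel.basicAck(deliveryTag, true);
    }
}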
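
The effect of prefetch together with manual acknowledgement can be illustrated with a toy model of the delivery window. This is a sketch of the behaviour described above, not the RabbitMQ client API: PrefetchQueue, enqueue, deliver and ack are hypothetical names, and returning -1 for "nothing delivered" is a choice made for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Toy model of a prefetch window; not the RabbitMQ client API.
class PrefetchQueue {
    private final Deque<Long> ready = new ArrayDeque<>();
    private final Set<Long> unacked = new HashSet<>();
    private final int prefetch;
    private long nextTag = 1;

    PrefetchQueue(int prefetch) {
        this.prefetch = prefetch;
    }

    // Adds one message to the queue.
    void enqueue() {
        ready.addLast(nextTag++);
    }

    // Returns the delivery tag handed to the consumer, or -1 when the
    // window is full or the queue is empty.
    long deliver() {
        if (unacked.size() >= prefetch || ready.isEmpty()) {
            return -1;
        }
        long tag = ready.removeFirst();
        unacked.add(tag);
        return tag;
    }

    // Acknowledging a delivery frees one slot in the window.
    void ack(long tag) {
        unacked.remove(tag);
    }

    public static void main(String[] args) {
        PrefetchQueue queue = new PrefetchQueue(1);
        queue.enqueue();
        queue.enqueue();
        System.out.println(queue.deliver()); // 1
        System.out.println(queue.deliver()); // -1, message 1 is still unacknowledged
        queue.ack(1);
        System.out.println(queue.deliver()); // 2
    }
}
```

This also shows why manual acknowledgement is required: the window only opens again when the consumer acknowledges, so the acknowledgement has to be tied to the end of processing.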

4. TTL (survival time/expiration time)

TTL stands for Time To Live, i.e. the survival time or expiration time of a message.

  • When a message reaches the survival time and has not been consumed, it is automatically cleared.
  • RabbitMQ can set an expiration time for a message or for an entire queue.

4.1 Console setup

The RabbitMQ console allows you to set the expiration time of a queue.

4.2 Individual message expiration

@Test
public void testTTL() {
    // Message post-processor: sets message properties before the message is sent
    MessagePostProcessor messagePostProcessor = message -> {
        // 1. Set the expiration time on the message (a string, in milliseconds)
        message.getMessageProperties().setExpiration("50000");
        // 2. Return the message
        return message;
    };
    // Pass the post-processor in when sending
    rabbitTemplate.convertAndSend("spring_fanout_exchange", "key", "RabbitMQ", messagePostProcessor);
}

4.3 Summary

If both a message expiration time and a queue expiration time are set, the shorter of the two applies. When the queue's expiration time is reached, all messages in the queue are removed. A message with its own expiration time is only checked for expiry (and removed) once it reaches the head of the queue.
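
The two rules in this summary (the shorter time wins, and expiry is only checked at the head of the queue) can be illustrated with a toy model. It is a sketch under stated assumptions, not the RabbitMQ API: TtlQueue and its methods are hypothetical, time is passed in explicitly as a millisecond value, and a negative per-message TTL stands for "no per-message expiration set".

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the TTL rules; not the RabbitMQ client API.
class TtlQueue {
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final long queueTtlMillis;

    TtlQueue(long queueTtlMillis) {
        this.queueTtlMillis = queueTtlMillis;
    }

    // A negative messageTtlMillis models "no per-message expiration set".
    static long effectiveTtl(long messageTtlMillis, long queueTtlMillis) {
        return messageTtlMillis < 0 ? queueTtlMillis : Math.min(messageTtlMillis, queueTtlMillis);
    }

    void publish(String body, long messageTtlMillis, long now) {
        entries.addLast(new Entry(body, now + effectiveTtl(messageTtlMillis, queueTtlMillis)));
    }

    // Messages still held, including expired ones stuck behind the head.
    int size() {
        return entries.size();
    }

    // Expiry is only checked at the head, so expired messages are dropped here.
    String poll(long now) {
        while (!entries.isEmpty() && entries.peekFirst().expiresAt <= now) {
            entries.removeFirst();
        }
        Entry head = entries.pollFirst();
        return head == null ? null : head.body;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtl(50_000, 10_000)); // 10000, the shorter time wins

        TtlQueue queue = new TtlQueue(10_000);
        queue.publish("a", -1, 0);    // expires at t = 10000
        queue.publish("b", 1_000, 0); // expires at t = 1000 but sits behind "a"
        System.out.println(queue.size());     // 2, "b" has not been removed yet
        System.out.println(queue.poll(5_000)); // a
        System.out.println(queue.poll(5_000)); // null, "b" reached the head and was dropped
    }
}
```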

5. Dead letter queue

DLX is the abbreviation of Dead Letter Exchange, which is commonly referred to as the dead letter queue.

When a message becomes a dead letter, it can be republished to another exchange; that exchange is the DLX.
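
In RabbitMQ, a queue is pointed at its DLX through optional queue arguments supplied when the queue is declared. The sketch below only builds that argument map: x-dead-letter-exchange and x-dead-letter-routing-key are RabbitMQ's argument names, while the class DeadLetterArgs and the exchange and routing key values are hypothetical examples.

```java
import java.util.HashMap;
import java.util.Map;

// Builds the queue arguments that attach a dead letter exchange to a queue.
class DeadLetterArgs {
    static Map<String, Object> forQueue(String dlxName, String dlxRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        // Exchange that dead letters from this queue are republished to.
        arguments.put("x-dead-letter-exchange", dlxName);
        // Routing key used when republishing to the DLX.
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return arguments;
    }

    public static void main(String[] args) {
        // Hypothetical names, chosen only for this example.
        Map<String, Object> arguments = forQueue("example_dlx_exchange", "example.dlx.key");
        System.out.println(arguments.get("x-dead-letter-exchange"));    // example_dlx_exchange
        System.out.println(arguments.get("x-dead-letter-routing-key")); // example.dlx.key
    }
}
```

A map like this would be passed as the arguments of the queue declaration for the normal queue; the DLX itself is declared like any other exchange and needs its own queue bound to it.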
