SoFunction
Updated on 2025-05-16

Example: implementing idempotent MQ consumption with Redis

Updated: May 16, 2025 10:40:51 Author: sjsjsbbsbsn
This article walks through an example of using Redis to make MQ message consumption idempotent. It provides template code for the producer, the idempotent handler, and the consumer, followed by a description of the consumption flow.

This solution is adapted from the message idempotence handling used in the Mage short-link project.

1. Producer template code

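import java.util.Map;
import java.util.UUID;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

// MessageQueueProducer is a project-specific interface; its import is omitted here.
// The property names in @ConditionalOnProperty and @Value are blank in the source and must be filled in.
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "", havingValue = "rocketmq")
public class CustomMessageProducer implements MessageQueueProducer {

    private final RocketMQTemplate rocketMQTemplate;

    @Value("${}")
    private String customTopic;

    /**
     * General sending method, allowing custom message content and processing logic
     * @param messagePayload message body data (must be a mutable map)
     */
    @Override
    public void send(Map<String, String> messagePayload) {
        // Generate a unique message key used for the idempotence check on the consumer side
        String messageId = UUID.randomUUID().toString();
        messagePayload.put("messageId", messageId);

        // Build the message
        Message<Map<String, String>> message = MessageBuilder
                .withPayload(messagePayload)
                .setHeader(MessageConst.PROPERTY_KEYS, messageId)
                .build();

        // Send the message synchronously (2-second timeout) and log the result
        try {
            SendResult sendResult = rocketMQTemplate.syncSend(customTopic, message, 2000L);
            log.info("Message sent successfully: status={}, messageId={}, messageKey={}",
                    sendResult.getSendStatus(), sendResult.getMsgId(), messageId);
        } catch (Exception e) {
            log.error("Failed to send message: payload={}", messagePayload, e);
            // Add custom failure handling logic here
        }
    }
}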

Key Notes

  • Idempotency marker: a messageId is added to messagePayload so that each message is uniquely identifiable and the consumer can later check and record its idempotence state in Redis.
  • Custom logic: adjust customTopic and the contents of messagePayload to suit different business needs.
  • Exception handling: the catch block is left as an extension point for handling or retrying failed sends.

This design gives a general-purpose send method: changing the messagePayload data structure and the custom handling logic is enough to adapt it to different businesses.
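
The ID-stamping step can be looked at in isolation, without RocketMQ. The following is a minimal sketch under stated assumptions: the class name MessageIdStamper and the method stamp are hypothetical and do not appear in the original project. It copies the payload before adding the messageId, which is one way to avoid mutating the caller's map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper (not part of the original project): isolates the
// "attach a unique messageId to the payload" step of the producer template.
final class MessageIdStamper {

    static final String MESSAGE_ID_FIELD = "messageId";

    private MessageIdStamper() {
    }

    /** Returns a mutable copy of the payload with a freshly generated messageId added. */
    static Map<String, String> stamp(Map<String, String> payload) {
        Map<String, String> stamped = new HashMap<>(payload);
        stamped.put(MESSAGE_ID_FIELD, UUID.randomUUID().toString());
        return stamped;
    }
}
```

Because the copy is a HashMap, the caller may pass an immutable map such as Map.of(...), which would otherwise fail on put.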

2. Message idempotent handler template code

package ;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Message idempotent handler
 */
@Component
@RequiredArgsConstructor
public class MessageIdempotentHandler {

    private final StringRedisTemplate stringRedisTemplate;
    private static final String IDEMPOTENT_KEY_PREFIX = "message:idempotent:";
    // The time unit of the expiry (2) is missing in the source; MINUTES is an assumption.
    private static final TimeUnit EXPIRY_UNIT = TimeUnit.MINUTES;

    /**
     * Determine whether the message has been processed
     *
     * @param messageId message unique identifier
     * @return true if the message has not been processed and consumption can continue;
     *         false if a flag already exists, so repeated consumption must be avoided
     */
    public boolean isMessageNotProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        // Try to set a new key: setIfAbsent returns true if the key did not exist
        // (not yet processed) and false if it already exists
        return Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, "0", 2, EXPIRY_UNIT));
    }

    /**
     * Mark message processing as completed
     *
     * @param messageId message unique identifier
     */
    public void markAsProcessed(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.opsForValue().set(key, "1", 2, EXPIRY_UNIT);
    }

    /**
     * Query whether message processing has completed
     *
     * @param messageId message unique identifier
     * @return true if message processing has completed, false otherwise
     */
    public boolean isProcessingComplete(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        return Objects.equals(stringRedisTemplate.opsForValue().get(key), "1");
    }

    /**
     * Delete the idempotence flag when processing fails with an exception
     *
     * @param messageId message unique identifier
     */
    public void clearProcessedFlag(String messageId) {
        String key = IDEMPOTENT_KEY_PREFIX + messageId;
        stringRedisTemplate.delete(key);
    }
}

Responsibility description

  • Overall responsibilities
    The main responsibility of MessageIdempotentHandler is to ensure that each message is processed only once during consumption, preventing repeated consumption. It stores and checks a unique identifier for each message in Redis to enforce idempotence.

  • Method responsibilities

    • isMessageNotProcessed: determines whether the message has been processed. The method tries to set a flag with a short expiry. If the key did not exist in Redis, the message has not been consumed and processing can continue; otherwise the message has already been processed or is currently being processed.
    • markAsProcessed: marks the message as consumed. Call this method after successful consumption; it changes the value of the key in Redis to mark the message as completed and avoid repeated processing.
    • isProcessingComplete: checks whether consumption of the message has completed. After consumption completes, the value of the key is set to "1", and this method verifies that status.
    • clearProcessedFlag: clears the idempotence flag. When consumption fails or throws an exception, delete the flag so that the message can be consumed again.
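
The state transitions of the four methods can be tried out without a Redis server. The sketch below is an in-memory stand-in, assuming a ConcurrentHashMap in place of Redis: the class name InMemoryIdempotentHandler is hypothetical, putIfAbsent plays the role of setIfAbsent, and key expiry is deliberately left out.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for MessageIdempotentHandler (illustration only, no expiry).
final class InMemoryIdempotentHandler {

    private static final String IDEMPOTENT_KEY_PREFIX = "message:idempotent:";

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** True only for the first caller: putIfAbsent returns null when the key was absent. */
    boolean isMessageNotProcessed(String messageId) {
        return store.putIfAbsent(IDEMPOTENT_KEY_PREFIX + messageId, "0") == null;
    }

    /** Moves the flag from "0" (in progress) to "1" (completed). */
    void markAsProcessed(String messageId) {
        store.put(IDEMPOTENT_KEY_PREFIX + messageId, "1");
    }

    /** True once markAsProcessed has been called for this message. */
    boolean isProcessingComplete(String messageId) {
        return Objects.equals(store.get(IDEMPOTENT_KEY_PREFIX + messageId), "1");
    }

    /** Removes the flag so that the message can be claimed again. */
    void clearProcessedFlag(String messageId) {
        store.remove(IDEMPOTENT_KEY_PREFIX + messageId);
    }
}
```

A flag therefore goes from absent to "0" on the first claim, to "1" on completion, and back to absent when it is cleared after a failure.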

3. Consumer code template

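// Assumes the enclosing class has @Slf4j and an injected MessageIdempotentHandler field;
// the field name messageIdempotentHandler is an assumption (the receiver is missing in the source).
@Override
public void onMessage(Map<String, String> producerMap) {
    // Get the unique identifier of the message. The producer template above stores it
    // under "messageId" (the source reads "keys" here, which that producer never sets).
    String keys = producerMap.get("messageId");

    // Idempotence check: try to claim the message
    if (!messageIdempotentHandler.isMessageNotProcessed(keys)) {
        // The flag already exists: skip the message if its processing has completed
        if (messageIdempotentHandler.isProcessingComplete(keys)) {
            return;
        }
        throw new ServiceException("Message processing has not completed; the message queue needs to retry");
    }

    // Business logic processing
    try {
        // Call the business method here
    } catch (Throwable ex) {
        // Delete the flag so that the retried message can be consumed again (see section 4)
        messageIdempotentHandler.clearProcessedFlag(keys);
        log.error("Exception during consumption", ex);
        throw ex;
    }

    // Mark message processing as completed
    messageIdempotentHandler.markAsProcessed(keys);
}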


Responsibility description

The onMessage template is responsible for receiving and processing messages from the message queue, ensuring idempotence, and throwing an exception when needed so that the message queue retries. The key steps are:

  • Message idempotence check
    • keys is used as the unique identifier to check whether the message has already been processed, avoiding repeated consumption.
    • isMessageNotProcessed(keys) tries to claim the message. If it returns false, the flag already exists: when isProcessingComplete(keys) reports that processing has completed, the method returns immediately; otherwise a ServiceException is thrown so that the message queue retries.
  • Business logic processing
    • The actual business processing happens inside the try block, where the relevant business methods are called. Catching exceptions there keeps a failure from leaving the idempotence flag in an inconsistent state.
    • If the business logic throws, the exception is logged with log.error and rethrown.
  • Message processing completion flag
    • After the business logic completes successfully, markAsProcessed(keys) marks the message as completed.
    • This keeps the message status up to date, so that a redelivered message is skipped instead of being consumed again.

4. Message consumption process

The message consumption flow is as follows:

  • Receive the message: the consumer receives a new message from the message queue and reads its unique identifier, messageId.
  • Check whether it has been processed
    • Call isMessageNotProcessed to check whether Redis already holds a flag for the message.
    • If it returns true, the message has not been processed and enters the consumption flow.
    • If it returns false, a flag already exists: the message is skipped if isProcessingComplete confirms that it was processed, and otherwise an exception is thrown so that the queue retries later.
  • Consume the message: if the message has not been processed, the business logic is executed.
  • Mark completion status
    • After consumption completes, call markAsProcessed to mark the message as completed in Redis.
  • Exception handling
    • If an exception is thrown during consumption, call clearProcessedFlag to delete the message's Redis flag so that the message can be retried later.
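
The flow above can be simulated end to end in plain Java. This is a sketch under stated assumptions, not the project's consumer: the class IdempotentConsumerSketch is hypothetical, a ConcurrentHashMap stands in for Redis, IllegalStateException stands in for ServiceException, and onMessage returns a boolean (true when the business logic ran) purely to make the outcome observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical simulation of the consumption flow with an in-memory flag store.
final class IdempotentConsumerSketch {

    private final Map<String, String> flags = new ConcurrentHashMap<>();
    private final Consumer<Map<String, String>> businessLogic;

    IdempotentConsumerSketch(Consumer<Map<String, String>> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the business logic ran, false if the message was skipped as a duplicate. */
    boolean onMessage(Map<String, String> message) {
        String messageId = message.get("messageId");

        // Check: claim the message, or find out that a flag already exists
        if (flags.putIfAbsent(messageId, "0") != null) {
            if ("1".equals(flags.get(messageId))) {
                return false; // already completed, skip
            }
            throw new IllegalStateException("Processing not completed, retry later");
        }

        // Consume: run the business logic, clearing the flag on failure
        try {
            businessLogic.accept(message);
        } catch (RuntimeException ex) {
            flags.remove(messageId);
            throw ex;
        }

        // Mark completion
        flags.put(messageId, "1");
        return true;
    }
}
```

With business logic that fails on its first call, the first delivery throws and clears the flag, the retried delivery runs the logic and returns true, and any later delivery of the same message returns false without running the logic again.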

5. Summary

This solution uses Redis to make MQ message consumption idempotent, so that the business logic of each message is executed only once, avoiding the business errors and wasted resources caused by repeated consumption. Its main features are:

  • Idempotence guarantee: Redis setIfAbsent is used to determine whether a message has already been processed, so that each message is executed only once and repeated consumption is avoided.
  • General design
    • The producer code follows a common template: it generates a unique messageId with UUID and embeds it in the message body so that each message is uniquely identifiable.
    • The consumer code uses the idempotent handler template MessageIdempotentHandler, which supports multiple state checks and exception handling.
  • Exception handling: if an exception occurs during consumption, the flag in Redis is deleted promptly so that the message is not mistaken for already processed when it is redelivered, which makes consumption more robust.
  • Flexible business integration: the message content and business logic can be adjusted to different business needs, covering idempotent consumption in a variety of scenarios.
  • Process management: markAsProcessed and clearProcessedFlag implement the completion marking and the retry-on-failure mechanism, which keeps consumption reliable and consistent.

Overall, this solution provides a general and extensible way to control message idempotence that suits highly concurrent distributed systems and improves the stability of message consumption.
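
The guarantee in the first point rests on the claim step being atomic: when several consumers receive the same message at once, only one of them may win. The sketch below illustrates that property with ConcurrentHashMap.putIfAbsent as a local stand-in for Redis setIfAbsent; the class ConcurrentClaimDemo and the method countWinners are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: many threads race to claim the same message ID.
final class ConcurrentClaimDemo {

    private ConcurrentClaimDemo() {
    }

    /** Starts the given number of competing consumers and returns how many claimed the message. */
    static int countWinners(int consumers) {
        ConcurrentMap<String, String> flags = new ConcurrentHashMap<>();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all threads at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // Atomic check-and-set: returns null only for the thread that created the key
                if (flags.putIfAbsent("message:idempotent:m1", "0") == null) {
                    winners.incrementAndGet();
                }
            });
        }

        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for consumers", e);
        }
        return winners.get();
    }
}
```

A separate "get, then set" sequence would not give this property, because two consumers could both read "absent" before either writes the flag.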
