Example of implementation of Redis processing MQ consumption idempotence
This solution refers to the message idempotence processing solution in Mage short connection project
1. Generator template code
@Slf4j @Component @RequiredArgsConstructor @ConditionalOnProperty(name = "", havingValue = "rocketmq") public class CustomMessageProducer implements MessageQueueProducer { private final RocketMQTemplate rocketMQTemplate; @Value("${}") private String customTopic; /** * General sending method, allowing custom message content and processing logic * @param messagePayload message body data */ @Override public void send(Map<String, String> messagePayload) { // Generate a unique message key to identify the idempotence of the message String messageId = ().toString(); ("messageId", messageId); // Build the message Message<Map<String, String>> message = MessageBuilder .withPayload(messagePayload) .setHeader(MessageConst.PROPERTY_KEYS, messageId) .build(); // Send a message and process the result try { SendResult sendResult = (customTopic, message, 2000L); ("The message was sent successfully: state={}, informationID={}, information键={}", (), (), messageId); } catch (Exception e) { ("information发送失败: information内容={}", (messagePayload), e); // Add custom failure handling logic } } }
Key Notes
-
Idepotency mark:exist
messagePayload
JoinmessageId
, ensure message uniqueness, and facilitate subsequent verification and processing of message idempotence in Redis. -
Custom logic: Adjust for different business needs
customTopic
andmessagePayload
content. - Exception handling: Keep a custom exception handling interface to process or retry the failed messages.
This design helps achieve general message sending, just changemessagePayload
Data structures and custom processing logic can adapt to different businesses.
2. Message Iptimate Processor Template Code
package ; import ; import ; import ; import ; import ; /** * Message Iptimate Processor */ @Component @RequiredArgsConstructor public class MessageIdempotentHandler { private final StringRedisTemplate stringRedisTemplate; private static final String IDEMPOTENT_KEY_PREFIX = "message:idempotent:"; /** * Determine whether the message has been processed * * @param messageId message unique identifier * @return true means that the message has not been processed and can be continued; false means that the message has been processed to avoid repeated consumption */ public boolean isMessageNotProcessed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; // Try to set a new key, if it does not exist, it returns true to indicate that it is not processed, and if it exists, it returns false return (().setIfAbsent(key, "0", 2, )); } /** * The mark message processing process is completed * * @param messageId message unique identifier */ public void markAsProcessed(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; ().set(key, "1", 2, ); } /** * Query whether the message has been processed * * @param messageId message unique identifier * @return true means message processing has been completed, false means not completed */ public boolean isProcessingComplete(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; return (().get(key), "1"); } /** * Delete idempotent identifiers when handling exceptions * * @param messageId message unique identifier */ public void clearProcessedFlag(String messageId) { String key = IDEMPOTENT_KEY_PREFIX + messageId; (key); } }
Responsibility description
Overall Responsibilities:
MessageIdempotentHandler
The main responsibility is to ensure that the news is in the consumption processOnly processed once, prevent repeated consumption. It stores and checks unique identifiers for messages with the help of Redis to enable idempotence control of messages.-
Method Responsibilities:
-
isMessageNotProcessed
: Determine whether the message has been processed. This method attempts to set a short-term expired flag, and if the message has not been consumed (i.e., the key does not exist in Redis), it can continue processing, otherwise it means that the message has been processed. -
markAsProcessed
: Mark the message as completed consumption. Call this method after successful consumption, change the key value identification in Redis, mark it as completed, and avoid repeated processing. -
isProcessingComplete
: Check whether the message consumption process is completed. After the consumption is completed, the key value will be set to"1"
, this method is used to verify the status. -
clearProcessedFlag
: Clear the idempotent flag. In case of message consumption failure or exception, delete the identifier so that the message can be re-consumed.
-
3. Producer code template
@Override public void onMessage(Map<String, String> producerMap) { // Get the unique identifier of the message String keys = ("keys"); // Check whether the message has been processed, idempotence control if (!(keys)) { // If the message process has not been completed yet if ((keys)) { return; // Skip the message that completed the process } throw new ServiceException("Message process has not been completed, message queue needs to be retryed"); } // Business logic processing try { //Calling business method code } } catch (Throwable ex) { ("Exception in consumption", ex); throw ex; } // Tag message processing is completed (keys); }
Responsibility description
onMessage
The responsibility of the template isReceive and process messages in the message queue, ensure idempotence, and throw an exception if needed to allow the message queue to retry. Here are the responsibilities and instructions for the key steps in this method:
-
Message Impotence Verification:
- pass
keys
As a unique identifier, check whether the message has been processed and avoid repeated consumption. - use
(keys)
Method, skip if the message has been processed; if the processing process has not been completed, but the status is "completed", it will be returned directly; otherwise, it will be thrownServiceException
, let the message queue try again.
- pass
-
Business logic processing:
- exist
try
The actual business processing of messages is carried out in the block, and the relevant business logic methods are called to avoid uncaught exceptions causing inconsistencies in idempotence marks. - If an exception is thrown during business processing, the exception log is recorded (
("Consumption abnormality", ex)
) and throw an exception again.
- exist
-
Message processing completion flag:
- After successfully completing the business logic, call
(keys)
Mark the message as completed. - This ensures that the status of the message is updated correctly, and can avoid repeated consumption even after retry, ensuring the idempotence of the message.
- After successfully completing the business logic, call
4. Message consumption process
Assume that the application scenario of message consumption is as follows:
-
Receive message: The message queue receives a new message and obtains the unique identifier of the message.
messageId
。 -
Check if it has been processed:
- Call
isMessageNotProcessed
Check whether the message is identified in Redis. - If you return
true
, it means that this message has not been processed and enters the consumption process. - If you return
false
, it means that this message has been processed, and you can skip it directly to avoid repeated consumption.
- Call
- Consumption news: If the message is not processed, the specific business logic will be executed.
-
Mark completion status:
- After the consumption is completed, call
markAsProcessed
, mark the message as completed in Redis.
- After the consumption is completed, call
-
Exception handling:
- If an exception is thrown during consumption, call
clearProcessedFlag
Delete the Redis identity of the message, allowing the system to retry the message later.
- If an exception is thrown during consumption, call
5. Summary
In this solution, by using Redis to realize idempotent processing of MQ messages, it ensures that the message will only be processed once during the consumption process, avoiding business exceptions and resource waste caused by repeated consumption. Its main features and advantages are as follows:
-
Idepotency assurance: Using Redis
setIfAbsent
To determine whether the message has been processed, ensure that the message is executed only once during the consumption process, and avoid the risk of repeated consumption. -
General design:
- Producer code adopts a common template, through
UUID
Unique ID of generated messagemessageId
, and embed it into the message body to ensure the uniqueness of each message. - Consumer code adopts idempotent processor template
MessageIdempotentHandler
, supports multiple state checks and exception handling.
- Producer code adopts a common template, through
- Refine exception handling: During the consumption process, if an exception occurs, the identifier in Redis can be deleted in time to ensure that the system will not be mistaken for processing when re-consuming the message next time, which enhances the robustness of message consumption.
- Flexible business integration: This solution can adjust message content and customized business logic processing according to different business needs, adapting to message idempotent consumption needs in various scenarios.
-
Process management:pass
markAsProcessed
andclearProcessedFlag
The consumption completion status marking and abnormal retry mechanism are realized to ensure the reliability and consistency of consumption.
In general, this solution provides a scalable, general and efficient implementation method for message idempotence control, which is very suitable for application in highly concurrent distributed systems, and can effectively improve the stability and security of message consumption.
This is the end of this article about the implementation example of Redis processing MQ consumption idempotence. For more related Redis MQ consumption idempotence content, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!
Related Articles
Solution to uneven hash distribution in Redis
This article mainly introduces relevant information on solutions to uneven hash distribution in Redis. Friends who need it can refer to it.2021-02-02Detailed explanation of Redis master-slave replication practice
This article will demonstrate how to configure, implement and implement the master-slave replication, and the three major strategies of Redis master-slave replication, full replication, partial replication and immediate replication.2021-05-05Analysis on the principle of efficient search of geographic location in Redis
This article mainly introduces how Redis efficiently retrieves geographical location. Through geo-related commands, it is easy to store and use latitude and longitude coordinate information in Redis. Follow the editor to see the specific implementation method.2021-06-06How to start Redis in Windows environment
This article mainly introduces how Redis can be started in the windows environment. It has good reference value. I hope it will be helpful to everyone. If there are any errors or no complete considerations, I hope you will be very encouraged.2025-04-04Sample code for jwt+redis to implement login authentication
In the login business code, when the user logs in successfully, a login credential is generated and stored in Redis. This article mainly introduces the example code for jwt+redis to implement login authentication, which has certain reference value. If you are interested, you can learn about it.2024-03-03Detailed explanation of the implementation of Redis Sentinel Mode
This article mainly introduces a detailed explanation of the implementation of the Redis Sentinel model. The example code is introduced in this article in detail, which has certain reference learning value for everyone's study or work. Friends who need it, please learn with the editor below.2023-06-06Implementation method of Redis internal data structure Dict
This article mainly introduces the implementation method of Redis internal data structure Dict. The main function of dict in Redis is to maintain all the key and value map data structures in the Redis database. Friends who need it can refer to it2022-05-05Redis, the difference between Hash and String using object information
This article mainly introduces the difference between Hash and String in Redis storage object information. The article focuses on the topic and has a certain reference value. Friends who need it can refer to it.2022-09-09Detailed explanation of how to prevent repeated sending of RabbitMQ messages using Redis
I encountered a problem today. When sending MQ messages, you need to ensure that it will not be sent repeatedly. Note that it is not reliable to arrive. Here, it is guaranteed that multiple same messages will not be produced. Therefore, this article mainly introduces the method of using Redis to prevent repeated sending of RabbitMQ messages. Friends who need it can refer to it.2025-01-01Reasons for Redis Memory Fragmentation and Analysis of the Principle of Pipeline Pipeline
This article mainly introduces the reasons for the occurrence of Redis memory fragments and the analysis of the principle of Pipeline pipeline. Friends in need can refer to it for reference. I hope it can be helpful. I wish you more progress and get promoted as soon as possible to get a salary increase.2023-03-03