SoFunction
Updated on 2025-04-14

A detailed guide to using RocketMQ in Java

In Spring Boot, RocketMQ and Kafka are both commonly used message middleware. They are used in similar ways, but each has its own characteristics.

1. The use of RocketMQ in Spring Boot

  • Introduce dependencies

    • Add the RocketMQ dependency to the project's pom.xml file.
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.3</version>
    </dependency>
    
  • Configure RocketMQ

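    • Configure the relevant RocketMQ parameters, such as the NameServer address, in application.properties or application.yml. The starter creates the RocketMQTemplate bean only when a producer group is also configured; the group name below is a placeholder.
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=your_producer_group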
    
  • Producer

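    • Create a producer class and inject RocketMQTemplate. The example uses @Autowired; @Resource works as well.
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RocketMQProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendMessage(String topic, String message) {
            // Convert the payload to a message and send it to the topic
            rocketMQTemplate.convertAndSend(topic, message);
        }
    }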
    
  • Consumer

    • Create a consumer class and use the @RocketMQMessageListener annotation to specify the topic and consumer group to listen to.
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // Process the received message
            System.out.println("Received message: " + message);
        }
    }
    

2. The use of Kafka in Spring Boot

  • Introduce dependencies

    • Add the Kafka dependency to the project's pom.xml file.
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.12</version>
    </dependency>
    
  • Configure Kafka

    • Configure the Kafka parameters, such as bootstrapServers (the Kafka server address), in application.properties or application.yml.
    spring.kafka.bootstrap-servers=127.0.0.1:9092
    
  • Producer

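    • Create a producer class and inject KafkaTemplate. The example uses @Autowired; @Resource works as well.
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaProducer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String topic, String message) {
            // Send the message to the topic asynchronously
            kafkaTemplate.send(topic, message);
        }
    }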
    
  • Consumer

    • Create a consumer class and use the @KafkaListener annotation to specify the topic and consumer group to listen to.
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
        @KafkaListener(topics = "your_topic", groupId = "your_consumer_group")
        public void onMessage(String message) {
            // Process the received message
            System.out.println("Received message: " + message);
        }
    }
    

In general, both RocketMQ and Kafka are convenient to use in Spring Boot, and the choice between them depends on the actual needs of the project. RocketMQ may offer advantages such as high throughput and low latency in some scenarios, while Kafka is widely used in large-scale distributed systems for its reliability and scalability.

3. How to ensure the order of message queues

1. Ensuring order on the sending side

  • Reasonable business design

    • Ensure that messages with ordering requirements are sent to the same queue of the same topic. For example, route messages that belong to the same business entity (such as one order) by a fixed rule so that they all enter the same queue.
    • Where possible, send such messages from a single sender, since multiple senders can interleave their messages and break the order.
  • Use synchronous sending

    • When sending messages, use the synchronous send method send(Message msg, long timeout) and confirm that each message was sent successfully before sending the next one. This avoids the reordering that asynchronous sending can cause.
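
The routing rule described above can be sketched in plain Java. This is a minimal illustration, not RocketMQ's own selector: OrderedQueueSelector and selectQueue are hypothetical names, and the queue count is passed in as an assumption. In rocketmq-spring-boot-starter, RocketMQTemplate.syncSendOrderly(destination, payload, hashKey) accepts a hash key for the same purpose.

```java
/** Hypothetical helper: maps a business key to a queue index so that equal keys share one queue. */
final class OrderedQueueSelector {
    private OrderedQueueSelector() {
    }

    /** Returns an index in [0, queueCount) that is stable for a given key. */
    static int selectQueue(String businessKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(businessKey.hashCode(), queueCount);
    }
}
```

Because the index depends only on the key and the queue count, all messages for one order ID go to one queue as long as the number of queues does not change.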

2. Ensuring order on the consuming side

  • Single-thread consumption

    • Consume each queue with a single thread so that messages in the same queue are processed in the order they were sent. With rocketmq-spring-boot-starter the listener consumes concurrently by default, so set consumeMode = ConsumeMode.ORDERLY, which lets only one thread at a time consume a given queue.
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group",
            consumeMode = ConsumeMode.ORDERLY)
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            // Process the received message
            System.out.println("Received message: " + message);
        }
    }
    

    In practice, the consumption logic can be placed in a separate method and executed sequentially there to preserve the order of messages.

  • Avoid concurrent processing

    • Make sure that no concurrent processing happens while a message is being consumed. For example, do not start asynchronous tasks or extra threads inside the listener, because they can break the message order.
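
As a rough illustration of why a single worker preserves order, the sketch below uses only the JDK. SequentialHandler and handleInOrder are hypothetical names and are not part of RocketMQ; the five-second wait is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: one worker thread handles tasks strictly in submission order. */
final class SequentialHandler {
    private SequentialHandler() {
    }

    static List<String> handleInOrder(List<String> messages) {
        List<String> processed = new ArrayList<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (String message : messages) {
            // Tasks queue up behind each other; none runs concurrently with another.
            worker.execute(() -> processed.add(message));
        }
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return processed;
    }
}
```

Replacing the single-thread executor with a pool of several threads would remove the ordering guarantee, which is the situation the bullet above warns against.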

3. Set the number of queues

  • Control the number of queues
    • If the business requires very strict message order, consider reducing the number of queues under the topic. A topic usually contains multiple queues, and unless a queue is chosen explicitly the producer spreads messages across them. With fewer queues, related messages are more likely to land in the same queue; with a single queue, all messages of the topic keep one total order, at the cost of parallelism.

Through the above methods, the order of RocketMQ messages can be guaranteed to a certain extent. Note that ensuring message order sacrifices some performance and throughput, so the trade-off must be weighed against actual business needs.

4. How to ensure the reliability of message queues

1. Sending side

  • Synchronous sending and confirmation

    • Use the synchronous send method send(Message msg, long timeout). It waits for the Broker to confirm that the message was received, so a normal return means the message reached the Broker. If sending fails or times out, retry or perform other error handling.
    try {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        System.out.println("Message sent successfully: " + sendResult);
    } catch (Exception e) {
        System.err.println("Failed to send message: " + e.getMessage());
        // Retry or other error handling
    }
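
One possible shape for the "retry" branch is a bounded retry wrapper around the synchronous send. The sketch below is plain Java under stated assumptions: SendRetry and sendWithRetry are hypothetical names, the send is modeled as a Supplier, and no delay between attempts is applied. The RocketMQ producer also has its own built-in retry for synchronous sends, so an outer loop like this is an extra layer, not a replacement.

```java
import java.util.function.Supplier;

/** Hypothetical helper: retries a synchronous send a bounded number of times. */
final class SendRetry {
    private SendRetry() {
    }

    static <T> T sendWithRetry(Supplier<T> sendAction, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // A normal return means the send was confirmed.
                return sendAction.get();
            } catch (RuntimeException e) {
                // Remember the failure and try again until attempts run out.
                lastFailure = e;
            }
        }
        throw new IllegalStateException("send failed after " + maxAttempts + " attempts", lastFailure);
    }
}
```

A retried send can deliver the same message more than once (for example, when the first attempt succeeded but its confirmation timed out), so consumers should be idempotent.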
    
  • Transaction messages

    • For scenarios that require transactional consistency, use RocketMQ's transaction message mechanism. Sending a transaction message has two stages: first a half message is sent, then the local transaction is executed, and the message is committed or rolled back depending on the result of the local transaction.
    @Service
    public class TransactionProducer {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void sendTransactionMessage() {
            // Message is an interface, so build the instance with MessageBuilder.
            // The local transaction itself runs in a listener class annotated
            // with @RocketMQTransactionListener.
            Message<String> message = MessageBuilder.withPayload("transactionMessage").build();
            TransactionSendResult result =
                    rocketMQTemplate.sendMessageInTransaction("transactionTopic", message, null);
            System.out.println("Transaction message sent: " + result);
        }
    }
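
The two-stage flow can be pictured with a small in-memory model. This is only a sketch under simplifying assumptions: HalfMessageFlow is a hypothetical class, the local transaction is reduced to a boolean result, and the "unknown" outcome with the Broker's later check-back is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical in-memory model of the two-stage (half message) flow. */
final class HalfMessageFlow {
    private final List<String> visibleToConsumers = new ArrayList<>();

    /** Returns true when the message was committed and became consumable. */
    boolean send(String message, BooleanSupplier localTransaction) {
        // Stage 1: the half message is stored but hidden from consumers.
        String halfMessage = message;
        boolean committed;
        try {
            // Stage 2: run the local transaction and decide the outcome.
            committed = localTransaction.getAsBoolean();
        } catch (RuntimeException e) {
            // A failed local transaction rolls the half message back.
            committed = false;
        }
        if (committed) {
            visibleToConsumers.add(halfMessage);
        }
        return committed;
    }

    List<String> visibleMessages() {
        return new ArrayList<>(visibleToConsumers);
    }
}
```

Only committed messages ever reach the consumer-visible list, which is the property the transaction message mechanism is meant to provide.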
    

2. Broker side

  • Persistent storage

    • RocketMQ persists messages to disk to prevent them from being lost. The flushDiskType parameter in the Broker configuration file selects synchronous or asynchronous flushing. Synchronous flushing returns a success response only after the message has been written to disk, which costs performance; asynchronous flushing performs better, but messages not yet flushed may be lost if the system fails.
  • High-availability deployment

    • Deploy a RocketMQ cluster with multiple masters and multiple slaves so that the message service stays available when a master node fails. Depending on the deployment mode, a slave can take over as master automatically. The master-slave replication mode can also be configured so that messages are replicated reliably between master and slave nodes.

3. Consumer side

  • Consumption confirmation

    • After processing a message successfully, the consumer must confirm the consumption to the Broker. With a RocketMQListener there is no explicit acknowledge call: returning normally from onMessage confirms the message, and throwing an exception reports a failure so that the message is redelivered. If consumption keeps failing until the retry limit is reached, the message is moved to the dead letter queue for later handling.
    @Component
    @RocketMQMessageListener(topic = "your_topic", consumerGroup = "your_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            try {
                // Process the message
                System.out.println("Received message: " + message);
                // Returning normally confirms that consumption succeeded
            } catch (Exception e) {
                System.err.println("Failed to process message: " + e.getMessage());
                // Rethrow so that the message is retried and, once the retry
                // limit is reached, sent to the dead letter queue
                throw new RuntimeException("Message processing failed", e);
            }
        }
    }
  • Retry mechanism

    • Configure the number of retries and the retry interval for the consumer. When consumption fails, RocketMQ retries automatically; the retry policy is controlled through the corresponding retry parameters in the consumer configuration.

Through the above measures, the reliability of RocketMQ messages can be ensured at each stage, so that messages are not lost or corrupted during production, storage, and consumption.

5. Ensure the idempotence of message processing

In RocketMQ, the following ways can be used to ensure the idempotence of message processing:

1. Business level design

  • Use a unique identifier

    • Give each message a unique business identifier, such as a transaction serial number or an order number. When consuming a message, first check by this identifier whether the message has already been processed. If it has, ignore the message.
    • For example, in an e-commerce system, an order-created message can be uniquely identified by its order number. Before processing the message, the consumer checks whether the database already holds a processing record for that order number. If the record exists, the message has been processed and is skipped.
    @Service
    public class OrderProcessingService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void processOrderMessage(String orderId) {
            boolean isProcessed = isOrderProcessed(orderId);
            if (isProcessed) {
                return;
            }
            // Order processing logic
            System.out.println("Processing order: " + orderId);
            markOrderAsProcessed(orderId);
        }
    
        private boolean isOrderProcessed(String orderId) {
            Integer count = jdbcTemplate.queryForObject(
                    "SELECT COUNT(*) FROM processed_orders WHERE order_id = ?",
                    Integer.class, orderId);
            return count != null && count > 0;
        }
    
        private void markOrderAsProcessed(String orderId) {
            jdbcTemplate.update(
                    "INSERT INTO processed_orders (order_id) VALUES (?)",
                    orderId);
        }
    }
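
One caveat with checking first and marking afterwards is that two consumers can both pass the check before either writes the record. A minimal sketch of an atomic check-and-mark follows, in plain Java with an in-memory set standing in for the processed-records table. IdempotentProcessor and processOnce are hypothetical names, and an in-memory set is an assumption for illustration only: it is lost on restart and is not shared between instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical in-memory deduplicator keyed by a business identifier. */
final class IdempotentProcessor {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    /** Runs the handler only the first time an ID is seen; returns whether it ran. */
    boolean processOnce(String messageId, Consumer<String> handler) {
        // add() is atomic: exactly one caller gets true for a given ID.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            handler.accept(messageId);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a redelivered message can be retried.
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

In a database-backed version, the same effect is usually obtained by inserting the processing record under a unique constraint before running the business logic.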
    
  • Take advantage of database constraints

    • Unique indexes, primary key constraints, and similar database constraints can guarantee the uniqueness of business data. If processing a message violates such a constraint, the message has already been processed and is not processed again.
    • For example, in a user registration scenario, the user name or email address can carry a unique index in the user table. When consuming a registration message, try to insert the user data. If the insert fails because the unique index is violated, the user is already registered and the message is not processed again.
    @Service
    public class UserRegistrationService {
        @Autowired
        private JdbcTemplate jdbcTemplate;
    
        public void registerUser(String username, String password) {
            try {
                jdbcTemplate.update(
                        "INSERT INTO users (username, password) VALUES (?, ?)",
                        username, password);
            } catch (DataIntegrityViolationException e) {
                // The insert failed, most likely because the user already exists
                System.out.println("User already exists: " + username);
            }
        }
    }
    

2. Technical implementation

  • Distributed lock
    • A distributed lock can ensure that only one consumer instance processes a given message at a time. Before processing the message, acquire the lock. If the lock is acquired, process the message and release the lock afterwards. If it cannot be acquired, another instance is processing the message, and the current instance can either wait or skip the message.
    • Distributed locks can be implemented with Redis or ZooKeeper. With Redis, for example, the SETNX command can be used to implement the lock.
    @Service
    public class MessageProcessingService {
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        public void processMessage(String messageId) {
            String lockKey = "message_lock_" + messageId;
            boolean locked = tryLock(lockKey);
            if (!locked) {
                return;
            }
            try {
                boolean isProcessed = isMessageProcessed(messageId);
                if (isProcessed) {
                    return;
                }
                // Message processing logic
                System.out.println("Processing message: " + messageId);
                markMessageAsProcessed(messageId);
            } finally {
                releaseLock(lockKey);
            }
        }
    
        private boolean tryLock(String key) {
            // Set the key only if it is absent, with an expiry so that a crashed
            // holder cannot block others forever (the unit, seconds, is assumed)
            Boolean acquired = redisTemplate.opsForValue()
                    .setIfAbsent(key, "locked", Duration.ofSeconds(30));
            return Boolean.TRUE.equals(acquired);
        }
    
        private void releaseLock(String key) {
            redisTemplate.delete(key);
        }
    
        private boolean isMessageProcessed(String messageId) {
            // Logic to determine whether the message has been processed
            return false;
        }
    
        private void markMessageAsProcessed(String messageId) {
            // Logic for marking the message as processed
        }
    }
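
A lock that is released by deleting the key unconditionally can remove a lock that has meanwhile expired and been taken by another instance. A common refinement is to store an owner token and release only when the token still matches. The sketch below shows that idea in plain Java, with a ConcurrentHashMap as a stand-in for the lock store; OwnedLock is a hypothetical class, and expiry is deliberately not modeled. With Redis, the compare-and-delete step is typically performed atomically, for example with a server-side script.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for a lock store with set-if-absent semantics. */
final class OwnedLock {
    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    /** Returns an owner token when the lock was free, or null when it is already held. */
    String tryLock(String key) {
        String token = UUID.randomUUID().toString();
        // putIfAbsent mirrors "set only if the key does not exist".
        return locks.putIfAbsent(key, token) == null ? token : null;
    }

    /** Releases the lock only if the caller still owns it. */
    boolean release(String key, String token) {
        // remove(key, value) deletes the entry only when the stored token matches.
        return token != null && locks.remove(key, token);
    }
}
```

A caller keeps the token returned by tryLock and passes it back to release in a finally block, so that a late release from a previous holder cannot free someone else's lock.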
    

Through the above methods, RocketMQ message processing can be made idempotent, which avoids the business data inconsistencies caused by consuming the same message more than once.
