1. Redis Stream message queue configuration class template
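import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

/**
 * Redis Stream message queue configuration
 */
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {

    private static final Logger log = LoggerFactory.getLogger(RedisStreamConfiguration.class);

    private final RedisConnectionFactory redisConnectionFactory;
    private final Consumer1 consumer1;
    private final Consumer2 consumer2;

    // Configuration constants to customize
    private static final int BATCH_SIZE = 10;                            // Maximum number of messages pulled per read
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(3);  // Blocking timeout for pulling messages (seconds assumed)
    private static final String THREAD_NAME_PREFIX = "your-business";    // Thread name prefix
    private static final String GROUP_NAME_1 = "group1";                 // First consumer group name
    private static final String GROUP_NAME_2 = "group2";                 // Second consumer group name
    private static final String CONSUMER_NAME_1 = "consumer1";           // First consumer name
    private static final String CONSUMER_NAME_2 = "consumer2";           // Second consumer name
    // Stream key; SHORT_LINK_STATS_STREAM_TOPIC_KEY is a constant defined elsewhere in the project
    private static final String STREAM_TOPIC_KEY = SHORT_LINK_STATS_STREAM_TOPIC_KEY;

    @Bean
    public ExecutorService asyncStreamConsumer() {
        log.info("Redis Stream Message Queue Configuration Thread Pool");
        AtomicInteger index = new AtomicInteger();
        int processors = Runtime.getRuntime().availableProcessors();
        // Create a custom thread pool
        return new ThreadPoolExecutor(
                processors,
                processors + (processors >> 1),
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setName(THREAD_NAME_PREFIX + "_" + index.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }
        );
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(
            ExecutorService asyncStreamConsumer) {
        // Configure the StreamMessageListenerContainer options
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .batchSize(BATCH_SIZE)          // Number of messages pulled per read
                        .executor(asyncStreamConsumer)  // Use the configured thread pool
                        .pollTimeout(POLL_TIMEOUT)      // Timeout for pulling messages
                        .build();

        // Create the StreamMessageListenerContainer instance
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // Configure the first message listener
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_1, CONSUMER_NAME_1),                     // First consumer group and consumer name
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // Stream key and offset (lastConsumed assumed)
                consumer1                                                         // First message processing logic
        );

        // Configure the second message listener
        container.receiveAutoAck(
                Consumer.from(GROUP_NAME_2, CONSUMER_NAME_2),                     // Second consumer group and consumer name
                StreamOffset.create(STREAM_TOPIC_KEY, ReadOffset.lastConsumed()), // Stream key and offset (lastConsumed assumed)
                consumer2                                                         // Second message processing logic
        );

        return container;
    }
}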
1. Introduction
RedisStreamConfiguration is a Spring configuration class that sets up a Redis Stream message queue. It consumes messages asynchronously and supports multiple consumers, which suits business scenarios that need high throughput and low latency.
2. Key components and custom parameters
This class mainly configures the Redis Stream message listener container, StreamMessageListenerContainer, including the thread pool, the batch size, and the poll timeout, so that users can tune it to their business needs.
Core parameters
- BATCH_SIZE: the maximum number of messages pulled per read. A suitable batch size reduces the number of read requests sent to Redis and improves processing efficiency.
- POLL_TIMEOUT: how long a read from the Redis Stream blocks while waiting for new messages. If no message arrives within the timeout, the read returns empty and the container polls again.
- THREAD_NAME_PREFIX: the thread name prefix, which helps identify the threads of different business modules.
- GROUP_NAME_1 and GROUP_NAME_2: two separate consumer groups on the same Stream. Each group receives every message independently, so the two groups can apply different processing to the same data in parallel.
- CONSUMER_NAME_1 and CONSUMER_NAME_2: the consumer name used within each group. Redis tracks delivered but unacknowledged messages per consumer name, which helps with assigning and managing consumption work.
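To make the group semantics above concrete, here is a toy in-memory model written in plain Java. It is not Redis and it ignores acknowledgements and pending lists; the class ConsumerGroupModel and its methods are hypothetical names introduced only for illustration. It shows that every group sees every message, while reads within one group share that group's messages in batches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of stream delivery to consumer groups (illustration only, not Redis). */
class ConsumerGroupModel {

    private final List<String> stream = new ArrayList<>();
    // group name -> index of the next message not yet delivered to that group
    private final Map<String, Integer> groupOffsets = new LinkedHashMap<>();

    /** Registers a group that starts reading from the beginning of the stream. */
    void createGroup(String group) {
        groupOffsets.putIfAbsent(group, 0);
    }

    /** Appends a message to the stream. */
    void add(String message) {
        stream.add(message);
    }

    /**
     * Returns up to batchSize messages not yet delivered to this group.
     * Whichever consumer of the group calls read gets the next batch.
     */
    List<String> read(String group, int batchSize) {
        Integer from = groupOffsets.get(group);
        if (from == null) {
            // Mirrors the idea that a group must exist before it can be read from
            throw new IllegalStateException("No such group: " + group);
        }
        int to = Math.min(stream.size(), from + batchSize);
        groupOffsets.put(group, to);
        return new ArrayList<>(stream.subList(from, to));
    }
}
```

With three messages in the stream and a batch size of 2, two successive reads on group1 return two messages and then one, while a single read on group2 with a larger batch size still returns all three.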
Code implementation
The class configures a StreamMessageListenerContainer to process Stream messages and registers a separate listener for each of the two consumer groups and their consumers.
3. Description of main methods
ExecutorService (thread pool configuration)
@Bean public ExecutorService asyncStreamConsumer() { ... }
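This method creates a custom thread pool that provides the asynchronous execution environment for consuming Redis Stream messages. The core thread count is processors, the number of CPU cores, and the maximum thread count is processors + (processors >> 1), roughly 1.5 times the number of cores. Note that a ThreadPoolExecutor only adds threads beyond the core size when its queue is full, and the LinkedBlockingQueue used here is unbounded, so in practice the pool stays at the core size. Threads are named with the THREAD_NAME_PREFIX prefix to make logging and troubleshooting easier.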
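The sizing and naming rules can be checked in isolation with a small sketch. The class StreamConsumerPoolSketch and its helper methods are hypothetical names used only for this illustration; the executor settings mirror the ones described above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative helpers that reproduce the pool sizing and thread naming rules. */
class StreamConsumerPoolSketch {

    /** Maximum pool size: cores plus half the cores, using integer arithmetic. */
    static int maxPoolSize(int processors) {
        return processors + (processors >> 1);
    }

    /** Thread factory that names threads "<prefix>_<n>" and marks them as daemon threads. */
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger index = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(prefix + "_" + index.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Builds a pool with the same shape as asyncStreamConsumer() for a given core count. */
    static ThreadPoolExecutor newPool(int processors, String prefix) {
        return new ThreadPoolExecutor(
                processors,
                maxPoolSize(processors),
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), // unbounded: extra tasks wait in the queue
                namedDaemonFactory(prefix));
    }
}
```

For example, maxPoolSize(8) evaluates to 12, and the first thread created by namedDaemonFactory("your-business") is named your-business_1. If the extra threads are meant to be used, one option is to give the queue a bounded capacity.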
StreamMessageListenerContainer (Message Listening Container)
@Bean(initMethod = "start", destroyMethod = "stop") public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer(...) { ... }
This method creates and configures the listener container for Redis Stream. The key steps are as follows:
- Build container options: batch size, thread pool, poll timeout, and other parameters.
- Container instantiation: StreamMessageListenerContainer.create(redisConnectionFactory, options) creates the container, and initMethod = "start" on the bean starts it when the bean is initialized.
- Message listener configuration:
  - For the first consumer group GROUP_NAME_1 and consumer CONSUMER_NAME_1, the listener Consumer1 is registered with receiveAutoAck, so each message is acknowledged automatically when it is received.
  - For the second consumer group GROUP_NAME_2 and consumer CONSUMER_NAME_2, the listener Consumer2 is registered in the same way, so a second, independent group also processes the Stream.
4. Application scenarios
This configuration suits Redis Stream message queue management in high-concurrency scenarios. Each consumer group receives every message on the Stream, so separate groups can run independent processing logic; adding more consumers to a single group spreads that group's messages across them, which provides load balancing.
2. Consumer template
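import java.util.Map;
import java.util.Objects;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
 * Message queue consumer
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ShortLinkStatsSaveConsumer implements StreamListener<String, MapRecord<String, String, String>> {

    private final RedissonClient redissonClient;
    private final StringRedisTemplate stringRedisTemplate;
    private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        // The handler method names used in this class are assumed.
        // isMessageProcessed is taken to return true only when this message ID is claimed for the first time.
        if (!messageQueueIdempotentHandler.isMessageProcessed(id.toString())) {
            // Determine whether the earlier processing of this message has completed
            if (messageQueueIdempotentHandler.isAccomplish(id.toString())) {
                return;
            }
            throw new ServiceException("Message processing has not completed, the message queue needs to retry");
        }
        try {
            Map<String, String> producerMap = message.getValue();
            // Your own business logic

            // Delete the message from the stream
            stringRedisTemplate.opsForStream().delete(Objects.requireNonNull(stream), id.getValue());
        } catch (Throwable ex) {
            // Remove the idempotency marker so the message can be retried
            messageQueueIdempotentHandler.delMessageProcessed(id.toString());
            log.error("Exception in consumption", ex);
            throw ex;
        }
        // Mark the message as completed after consumption
        messageQueueIdempotentHandler.setAccomplish(id.toString());
    }
}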
This template implements the basic skeleton of a Redis Stream message queue consumer. It focuses on three core functions: idempotency checking, message parsing and processing, and consumption state management, which keep message handling safe and consistent under high concurrency.
The idempotency check itself is covered in a separate article.
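Since the handler is not shown here, the following is only a minimal in-memory sketch of what such an idempotency check could look like. The class InMemoryIdempotentHandler, its method names, and the "0"/"1" state values are all assumptions for illustration; a Redis-backed version would typically store the state under a key with an expiry instead of in a local map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for an idempotency handler (hypothetical, for illustration only). */
class InMemoryIdempotentHandler {

    private static final String IN_PROGRESS = "0";
    private static final String DONE = "1";

    // message ID -> processing state
    private final Map<String, String> states = new ConcurrentHashMap<>();

    /** Claims the message ID. Returns true only for the first caller; false means it was already claimed. */
    boolean isMessageProcessed(String messageId) {
        return states.putIfAbsent(messageId, IN_PROGRESS) == null;
    }

    /** Returns true if the message was fully processed earlier. */
    boolean isAccomplish(String messageId) {
        return DONE.equals(states.get(messageId));
    }

    /** Marks the message as fully processed. */
    void setAccomplish(String messageId) {
        states.put(messageId, DONE);
    }

    /** Removes the marker so that a failed message can be claimed and retried. */
    void delMessageProcessed(String messageId) {
        states.remove(messageId);
    }
}
```

A duplicate delivery of the same ID fails the claim; the consumer then returns if the earlier attempt finished, or signals a retry if it is still in progress.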
3. Producer template
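import java.util.Map;

import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * Short link monitoring status save message queue producer
 */
@Component
@RequiredArgsConstructor
public class Producer implements MessageQueueProducer {

    private final StringRedisTemplate stringRedisTemplate;

    /**
     * Send a message
     */
    public void send(Map<String, String> producerMap) {
        stringRedisTemplate.opsForStream().add(YOUR_KEY, producerMap);
    }
}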
Note: replace YOUR_KEY with your own Stream key.
4. Summary
1. Advantages of Redis Stream Message Queue
Redis Stream is the message queue data structure built into Redis, suited to high-throughput, low-latency business scenarios. Compared with dedicated message queue systems such as RabbitMQ or Kafka, Redis Stream is simpler to integrate and configure, especially for applications that already use Redis. Redis Stream offers the following benefits:
- High throughput: it supports highly concurrent writes and fast consumption, and can absorb bursts of messages.
- Ordered consumption: entries are stored and delivered in ID order, which suits business scenarios that require sequential processing. Strict in-order processing holds when a group has a single consumer.
- Consumer group mechanism: multiple consumers in a group can consume in parallel, which increases processing capacity.
- Persistence and backup: messages are stored in Redis and covered by its persistence mechanisms, which reduces the risk of data loss.
2. Redis Stream Configuration and Application
This article introduces how to integrate the configuration and consumption logic of Redis Stream message queues in Spring Boot, mainly including:
- Message consumption configuration: StreamMessageListenerContainer implements asynchronous consumption of messages. Custom parameters such as batch size, blocking timeout, and thread pool are configured to improve the system's concurrent processing capacity.
- Multi-consumer parallel processing: through the consumer group mechanism, multiple consumers can consume the same Stream in parallel, improving the throughput and efficiency of message processing.
- Idempotency and consumption confirmation: MessageQueueIdempotentHandler guards against repeated consumption. The processing logic is intended to handle each message only once; when consumption fails, the idempotency marker is removed so that the message can be retried, which improves the reliability of the system.
3. Consumer and Producer Template
- Consumer template: the consumer implements the StreamListener interface to process messages pulled from Redis Stream. To ensure idempotency, the consumer first checks whether the message has already been processed. A message whose processing has not finished is marked and retried, which keeps message processing safe.
- Producer template: the producer sends messages to Redis Stream through StringRedisTemplate. When the business produces new messages that need processing, the producer adds them to the Redis Stream for later consumption.
4. Application scenarios
Redis Stream fits many scenarios, especially those that need high concurrency, high throughput, and ordered consumption. For example, short link generation and access statistics, order processing, and log collection can all use Redis Stream as an efficient and reliable message queue.