The Stream data type introduced in Redis 5.0 brings powerful and flexible message queueing to the Redis ecosystem, making up for the shortcomings of the earlier publish/subscribe model by adding message persistence, consumer groups, message acknowledgment, and other features.
Redis Stream combines the characteristics of a traditional message queue and a time-series database, making it suitable for log collection, event-driven applications, real-time analysis, and similar scenarios.
This article will introduce 6 message processing modes of Redis Stream.
1. Simple Consumption
Basic concepts
The simple consumption model is the most basic way to use Redis Stream. It does not use consumer groups: the producer appends messages to the stream, and a consumer reads them directly by specifying a start ID.
Core Commands
# Publish a message
XADD stream_name [ID] field value [field value ...]

# Read messages
XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id
Implementation example
Redis CLI
# Add a message to the stream
> XADD mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"

# Read all messages from the beginning
> XREAD STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# Read starting after the specified ID
> XREAD STREAMS mystream 1647257548956-0
(empty list or set)

# Read from the latest message ID (block and wait for new messages)
> XREAD BLOCK 5000 STREAMS mystream $
(nil)
Java Spring Boot Example
@Service
public class SimpleStreamService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Publish a message to the Stream; returns the generated message ID.
     */
    public String publishEvent(String streamKey, Map<String, Object> eventData) {
        MapRecord<String, String, Object> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofMap(eventData);
        RecordId recordId = redisTemplate.opsForStream().add(record);
        return recordId.getValue();
    }

    /**
     * Read up to `count` messages starting from the given ID (non-blocking).
     */
    public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
        StreamReadOptions readOptions = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream()
                .read(readOptions, StreamOffset.create(streamKey, ReadOffset.from(startId)));
    }

    /**
     * Block for up to timeoutMillis waiting for new messages.
     */
    public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
        StreamReadOptions readOptions = StreamReadOptions.empty()
                .count(10)
                .block(Duration.ofMillis(timeoutMillis));
        return redisTemplate.opsForStream()
                .read(readOptions, StreamOffset.latest(streamKey));
    }
}
Use scenarios
- Simple event logging
- Single consumer scenario
- Time series data collection
- Development and debugging phases
Pros and cons
Advantages
- Simple implementation without creating and managing consumer groups
- Directly control where to start consuming messages
- Suitable for single consumer scenarios
Disadvantages
- Load balancing across multiple consumers cannot be achieved
- Message acknowledgment status cannot be tracked
- The last-read message ID must be managed manually
- After a service restart, the application must restore the last read position itself (a minimal persistence sketch follows this list)
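To soften the last two drawbacks, the last-read ID can itself be persisted in Redis. The following is a minimal sketch, not part of the original examples, assuming a StringRedisTemplate bean and a hypothetical simple:offset: key prefix:

import java.util.List;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.StringRedisTemplate;

public class OffsetTracker {

    private final StringRedisTemplate redisTemplate;

    public OffsetTracker(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /** Read the next batch, resuming from the offset saved in Redis. */
    public List<MapRecord<String, Object, Object>> readNextBatch(String streamKey, int count) {
        // "simple:offset:<stream>" is a hypothetical key name used only in this sketch
        String offsetKey = "simple:offset:" + streamKey;
        String lastId = redisTemplate.opsForValue().get(offsetKey);
        ReadOffset offset = (lastId != null) ? ReadOffset.from(lastId) : ReadOffset.from("0");

        List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                .read(StreamReadOptions.empty().count(count),
                      StreamOffset.create(streamKey, offset));

        if (records != null && !records.isEmpty()) {
            // Persist the last delivered ID so a restart resumes from this point
            String newLastId = records.get(records.size() - 1).getId().getValue();
            redisTemplate.opsForValue().set(offsetKey, newLastId);
        }
        return records;
    }
}

Saving the offset after processing rather than after reading would give at-least-once rather than at-most-once behavior; consumer groups (next section) handle this bookkeeping for you.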
2. Consumer Groups
Basic concepts
A consumer group allows multiple consumers to jointly process the messages of one stream, provides load balancing, and offers a message acknowledgment mechanism that guarantees each message is processed at least once. Every consumer group maintains its own consumption position, and different consumer groups do not interfere with each other.
Core Commands
# Create a consumer group
XGROUP CREATE stream_name group_name [ID|$] [MKSTREAM]

# Read messages through the consumer group
XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID]

# Acknowledge that a message has been processed
XACK stream_name group_name message_id [message_id ...]
Implementation example
Redis CLI
# Create a consumer group
> XGROUP CREATE mystream processing-group $ MKSTREAM
OK

# Consumer 1 reads a message
> XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# Acknowledge that the message has been processed
> XACK mystream processing-group 1647257548956-0
(integer) 1

# Consumer 2 reads a message (no undelivered messages are available)
> XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) (empty list or set)
Java Spring Boot Example
@Service
public class ConsumerGroupService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Create a consumer group, handling the case where the stream does not exist yet.
     */
    public void createGroup(String streamKey, String groupName) {
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        } catch (RedisSystemException e) {
            // Handle the situation where the stream does not exist
            if (e.getCause() instanceof RedisCommandExecutionException
                    && e.getCause().getMessage().contains("NOGROUP")) {
                redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), groupName);
            } else {
                throw e;
            }
        }
    }

    /**
     * Read messages from the consumer group (non-blocking).
     */
    public List<MapRecord<String, Object, Object>> readFromGroup(
            String streamKey, String groupName, String consumerName, int count) {
        StreamReadOptions options = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }

    /**
     * Read messages from the consumer group, blocking up to the given timeout.
     */
    public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
            String streamKey, String groupName, String consumerName, int count, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed())
        );
    }

    /**
     * Acknowledge that messages have been processed.
     */
    public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
        return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds);
    }
}
Use scenarios
- Systems that require horizontal scaling of message processing capabilities
- Business scenarios that require reliable processing of messages
- Implement message work queue
- Event delivery between microservices
Pros and cons
Advantages
- Multiple consumers can process messages in parallel
- Provide message confirmation mechanism to ensure that messages are not lost
- Supports consumer crash recovery processing
- Each consumer group maintains an independent consumption location
Disadvantages
- Relatively complex to implement
- Consumer groups and consumers need to be managed properly (a monitoring sketch follows this list)
- Message acknowledgment must be handled explicitly
- Unacknowledged messages need to be reprocessed periodically
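Basic visibility into groups and consumers comes from the XINFO command family. A minimal monitoring sketch, assuming Spring Data Redis 2.3+ where XINFO is exposed through groups() and consumers():

import org.springframework.data.redis.core.StringRedisTemplate;

public class StreamGroupMonitor {

    private final StringRedisTemplate redisTemplate;

    public StreamGroupMonitor(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /** Print every group and consumer of a stream with their pending counts
        (XINFO GROUPS / XINFO CONSUMERS). */
    public void report(String streamKey) {
        redisTemplate.opsForStream().groups(streamKey).stream().forEach(group -> {
            System.out.printf("group=%s consumers=%d pending=%d last-delivered=%s%n",
                    group.groupName(), group.consumerCount(),
                    group.pendingCount(), group.lastDeliveredId());

            redisTemplate.opsForStream().consumers(streamKey, group.groupName())
                    .stream().forEach(consumer ->
                            System.out.printf("  consumer=%s pending=%d idle=%dms%n",
                                    consumer.consumerName(), consumer.pendingCount(),
                                    consumer.idleTimeMs()));
        });
    }
}

The same numbers are available from redis-cli via XINFO GROUPS mystream and XINFO CONSUMERS mystream processing-group.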
3. Blocking Consumption
Basic concepts
Blocking consumption allows a consumer to hold its connection open when there are no new messages, waiting for new messages to arrive. This mode reduces polling overhead and improves real-time responsiveness, making it suitable for scenarios with strict demands on message processing timeliness.
Core Commands
# Blocking simple consumption
XREAD BLOCK milliseconds STREAMS stream_name ID

# Blocking consumer-group consumption
XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >
Implementation example
Redis CLI
# Block waiting for new messages (wait up to 10 seconds)
> XREAD BLOCK 10000 STREAMS mystream $
(nil)   # returned if no new message arrives within 10 seconds

# Blocking consumption through a consumer group
> XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream >
(nil)   # returned if no new message is delivered within 10 seconds
Java Spring Boot Example
@Slf4j
@Service
public class BlockingStreamConsumerService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Blocking consumer loop (simple consumption, no group).
     */
    @Async
    public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);

        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Blocking read
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(options, StreamOffset.create(streamKey, ReadOffset.from(lastId)));

                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        // Process the message
                        processMessage(record);
                        // Update the last read ID
                        lastId = record.getId().getValue();
                    }
                }
                // If nothing was read within the timeout, loop and block again
            } catch (Exception e) {
                log.error("Error reading from stream: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000); // back off briefly after an error, then retry
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * Blocking consumer-group loop with acknowledgment.
     */
    @Async
    public void startGroupBlockingConsumer(
            String streamKey, String groupName, String consumerName, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);

        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Blocking read
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(Consumer.from(groupName, consumerName),
                              options,
                              StreamOffset.create(streamKey, ReadOffset.lastConsumed()));

                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        try {
                            // Process the message
                            processMessage(record);
                            // Acknowledge it
                            redisTemplate.opsForStream()
                                    .acknowledge(streamKey, groupName, record.getId().getValue());
                        } catch (Exception e) {
                            // Processing failed; log it
                            log.error("Error processing message: {}", e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream group: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    private void processMessage(MapRecord<String, Object, Object> record) {
        // Actual message processing logic
        log.info("Processing message: {}", record);
        // ... business-specific handling
    }
}
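Instead of hand-writing the polling loop above, Spring Data Redis also provides StreamMessageListenerContainer, which runs the blocking read loop on its own executor. A minimal sketch, assuming a Lettuce RedisConnectionFactory bean and the stream and group from the earlier examples:

import java.time.Duration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

public class ContainerExample {

    public Subscription subscribe(RedisConnectionFactory connectionFactory) {
        // The container runs the blocking XREADGROUP loop for you
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .pollTimeout(Duration.ofSeconds(2))
                        .build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(connectionFactory, options);

        // Assumes the group was created beforehand (see section 2)
        Subscription subscription = container.receiveAutoAck(
                Consumer.from("processing-group", "consumer-1"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()),
                message -> System.out.println("Received: " + message.getValue()));

        container.start();
        return subscription;
    }
}

receiveAutoAck acknowledges each record after the listener returns; use receive(...) plus an explicit acknowledge(...) call if you need manual XACK control.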
Use scenarios
- Real-time data processing system
- Event-driven task processing
- Applications with low latency requirements
- Instant communication system
- Notification Service
Pros and cons
Advantages
- Reduces the resource waste caused by polling
- Good real-time behavior: messages are processed as soon as they arrive
- Reduces the load on both Redis and clients
- Saves CPU and network resources
Disadvantages
- Long-lived connections occupy Redis connection resources
- The block timeout must be set sensibly
- Reconnection after network interruptions must be handled
- Consumers need concurrent processing capabilities
4. Fan-out Pattern
Basic concepts
Fan-out mode allows multiple independent consumer groups to consume all messages in the same stream simultaneously, similar to publish/subscribe but with message persistence and replay capability. Each consumer group independently maintains its own consumption position.
Core Commands
Create multiple consumer groups, all of which consume the same stream independently:
XGROUP CREATE stream_name group_name_1 $ MKSTREAM
XGROUP CREATE stream_name group_name_2 $ MKSTREAM
XGROUP CREATE stream_name group_name_3 $ MKSTREAM
Implementation example
Redis CLI
# Create multiple consumer groups
> XGROUP CREATE notifications analytics-group $ MKSTREAM
OK
> XGROUP CREATE notifications email-group $ MKSTREAM
OK
> XGROUP CREATE notifications mobile-group $ MKSTREAM
OK

# Add a message
> XADD notifications * type user_signup user_id 1001 email "user@"
"1647345678912-0"

# Read from each consumer group (every group receives all messages)
> XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@"

> XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@"

> XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@"
Java Spring Boot Example
@Slf4j
@Service
public class FanOutService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Initialize the fan-out consumer groups, creating the stream if necessary.
     */
    public void initializeFanOutGroups(String streamKey, List<String> groupNames) {
        // Make sure the stream exists
        try {
            redisTemplate.opsForStream().info(streamKey);
        } catch (Exception e) {
            // The stream does not exist yet; seed it with an initial message
            Map<String, Object> initialMessage = new HashMap<>();
            initialMessage.put("init", "true");
            redisTemplate.opsForStream().add(streamKey, initialMessage);
        }

        // Create all consumer groups
        for (String groupName : groupNames) {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (Exception e) {
                // Ignore "group already exists" (BUSYGROUP) errors
                log.warn("Group {} may already exist: {}", groupName, e.getMessage());
            }
        }
    }

    /**
     * Publish a fan-out message that every consumer group will receive.
     */
    public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) {
        MapRecord<String, String, Object> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofMap(messageData);
        return redisTemplate.opsForStream().add(record).getValue();
    }

    /**
     * Start a consumer loop for a specific group. The handler parameter is a
     * java.util.function.Consumer, fully qualified to avoid the name clash with
     * Spring's stream Consumer.
     */
    @Async
    public void startGroupConsumer(
            String streamKey, String groupName, String consumerName,
            java.util.function.Consumer<MapRecord<String, Object, Object>> messageHandler) {

        StreamReadOptions options = StreamReadOptions.empty()
                .count(10)
                .block(Duration.ofSeconds(2));

        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                        Consumer.from(groupName, consumerName),
                        options,
                        StreamOffset.create(streamKey, ReadOffset.lastConsumed()));

                if (messages != null && !messages.isEmpty()) {
                    for (MapRecord<String, Object, Object> message : messages) {
                        try {
                            // Process the message
                            messageHandler.accept(message);
                            // Acknowledge it
                            redisTemplate.opsForStream().acknowledge(
                                    streamKey, groupName, message.getId().getValue());
                        } catch (Exception e) {
                            log.error("Error processing message in group {}: {}",
                                    groupName, e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream for group {}: {}",
                        groupName, e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
Example of usage
@Slf4j
@Service
public class NotificationService {

    @Autowired
    private FanOutService fanOutService;

    @PostConstruct
    public void init() {
        // Initialize the fan-out groups
        List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group");
        fanOutService.initializeFanOutGroups("user-events", groups);

        // Start a consumer for each group
        fanOutService.startGroupConsumer(
                "user-events", "email-group", "email-consumer", this::processEmailNotification);
        fanOutService.startGroupConsumer(
                "user-events", "sms-group", "sms-consumer", this::processSmsNotification);
        fanOutService.startGroupConsumer(
                "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
    }

    private void processEmailNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing email notification: {}", messageData);
        // Email sending logic
    }

    private void processSmsNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing SMS notification: {}", messageData);
        // SMS sending logic
    }

    private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = message.getValue();
        log.info("Processing analytics event: {}", messageData);
        // Analytics event handling logic
    }

    public void publishUserEvent(String eventType, Map<String, Object> eventData) {
        Map<String, Object> message = new HashMap<>(eventData);
        message.put("event_type", eventType);
        message.put("timestamp", String.valueOf(System.currentTimeMillis()));
        fanOutService.publishFanOutMessage("user-events", message);
    }
}
Use scenarios
- Multiple systems need to handle the same event flow independently
- Implement event broadcasting mechanism
- System integration: One event triggers multiple business processes
- Logs are processed and distributed to different services
- Notification system: An event requires users to be notified in multiple ways
Pros and cons
Advantages
- One publish, multiple independent consumptions: every group receives every message
- Each consumer group works independently without affecting the others
- A newly created consumer group (started at ID 0) can consume all historical messages from the beginning
- High reliability thanks to persistent message storage
Disadvantages
- As stream data grows, it may take up considerable storage space
- The stream's maximum length or trimming strategy must be set sensibly (a trimming sketch follows this list)
- Too many consumer groups may increase the Redis load
- The state of each consumer group must be managed separately
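Capping the stream length is usually done with XTRIM (or the MAXLEN option of XADD). A minimal sketch, assuming approximate trimming to roughly 10,000 entries is acceptable for the use case:

import org.springframework.data.redis.core.StringRedisTemplate;

public class StreamTrimmer {

    private final StringRedisTemplate redisTemplate;

    public StreamTrimmer(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /** Trim the stream to approximately maxLen entries; returns the number of removed entries. */
    public Long trimToApproximately(String streamKey, long maxLen) {
        // The boolean enables approximate trimming (XTRIM key MAXLEN ~ maxLen),
        // which is much cheaper for Redis than exact trimming
        return redisTemplate.opsForStream().trim(streamKey, maxLen, true);
    }
}

Calling trimToApproximately("notifications", 10000) on a schedule keeps the fan-out stream bounded while leaving recent history available for replay.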
5. Retry and Recovery
Basic concepts
This pattern focuses on recovery and retry mechanisms for failed messages. A Redis Stream consumer group tracks the processing status of each message, and its list of unacknowledged messages (the PEL, Pending Entries List) can be inspected and managed to achieve reliable message processing.
Core Commands
# View a summary of pending messages in a consumer group
XPENDING stream_name group_name

# View details of pending messages (optionally filtered by consumer)
XPENDING stream_name group_name start_id end_id count [consumer_name]

# Claim messages that have been idle longer than min_idle_time
XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]
Implementation example
Redis CLI
# Check the pending messages
> XPENDING mystream processing-group
1) (integer) 2              # number of pending messages
2) "1647257548956-0"        # smallest pending ID
3) "1647257549123-0"        # largest pending ID
4) 1) 1) "consumer-1"       # pending count per consumer
      2) "1"
   2) 1) "consumer-2"
      2) "1"

# View pending messages of a specific consumer
> XPENDING mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0"     # message ID
   2) "consumer-1"          # current owner
   3) (integer) 120000      # idle time (milliseconds)
   4) (integer) 2           # delivery count

# Claim messages that have been idle for more than 2 minutes
> XCLAIM mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
   2) 1) "sensor_id"
      2) "1234"
      3) "temperature"
      4) "19.8"
      5) "humidity"
      6) "56"
Java Spring Boot Example
@Slf4j
@Service
public class MessageRecoveryService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Get a summary of pending (unacknowledged) messages for a group.
     */
    public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
        return redisTemplate.opsForStream().pending(streamKey, groupName);
    }

    /**
     * Get detailed pending messages for a specific consumer.
     */
    public PendingMessages getPendingMessages(
            String streamKey, String groupName, String consumerName,
            Range<String> idRange, long count) {
        return redisTemplate.opsForStream().pending(
                streamKey, Consumer.from(groupName, consumerName), idRange, count);
    }

    /**
     * Claim messages that have been idle longer than minIdleTime.
     */
    public List<MapRecord<String, Object, Object>> claimMessages(
            String streamKey, String groupName, String newConsumerName,
            Duration minIdleTime, String... messageIds) {
        RecordId[] recordIds = Arrays.stream(messageIds)
                .map(RecordId::of)
                .toArray(RecordId[]::new);
        return redisTemplate.opsForStream().claim(
                streamKey, groupName, newConsumerName, minIdleTime, recordIds);
    }

    /**
     * Periodically check for and recover stale messages.
     */
    @Scheduled(fixedRate = 60000) // run once per minute
    public void recoverStaleMessages() {
        // Configuration parameters
        String streamKey = "mystream";
        String groupName = "processing-group";
        String recoveryConsumer = "recovery-consumer";
        Duration minIdleTime = Duration.ofMinutes(5); // unprocessed for over 5 minutes

        try {
            // 1. Get a summary of all pending messages
            PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);

            if (summary != null && summary.getTotalPendingMessages() > 0) {
                // 2. Iterate over each consumer's pending messages
                for (String consumerName : summary.getPendingMessagesPerConsumer().keySet()) {
                    PendingMessages pendingMessages = getPendingMessages(
                            streamKey, groupName, consumerName,
                            Range.unbounded(), 50); // process at most 50 at a time

                    if (pendingMessages != null) {
                        // 3. Filter out messages idle longer than the threshold
                        List<String> staleMessageIds = new ArrayList<>();
                        for (PendingMessage message : pendingMessages) {
                            if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) {
                                staleMessageIds.add(message.getIdAsString());
                            }
                        }

                        // 4. Claim the stale messages
                        if (!staleMessageIds.isEmpty()) {
                            log.info("Claiming {} stale messages from consumer {}",
                                    staleMessageIds.size(), consumerName);
                            List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
                                    streamKey, groupName, recoveryConsumer, minIdleTime,
                                    staleMessageIds.toArray(new String[0]));

                            // 5. Handle the claimed messages
                            processClaimedMessages(streamKey, groupName, claimedMessages);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Error recovering stale messages: {}", e.getMessage(), e);
        }
    }

    /**
     * Handle the claimed messages.
     */
    private void processClaimedMessages(
            String streamKey, String groupName,
            List<MapRecord<String, Object, Object>> messages) {
        if (messages == null || messages.isEmpty()) {
            return;
        }
        for (MapRecord<String, Object, Object> message : messages) {
            try {
                // Execute the message processing logic
                processMessage(message);
                // Acknowledge the message
                redisTemplate.opsForStream().acknowledge(
                        streamKey, groupName, message.getId().getValue());
                log.info("Successfully processed recovered message: {}", message.getId());
            } catch (Exception e) {
                log.error("Failed to process recovered message {}: {}",
                        message.getId(), e.getMessage(), e);
                // Decide based on business needs whether to move the message to a dead-letter queue
                moveToDeadLetterQueue(streamKey, message);
            }
        }
    }

    /**
     * Move a message to a dead-letter stream.
     */
    private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) {
        String deadLetterStream = sourceStream + ":dead-letter";

        Map<Object, Object> messageData = message.getValue();
        Map<String, Object> dlqMessage = new HashMap<>();
        messageData.forEach((k, v) -> dlqMessage.put(k.toString(), v));

        // Add metadata
        dlqMessage.put("original_id", message.getId().getValue());
        dlqMessage.put("error_time", String.valueOf(System.currentTimeMillis()));

        redisTemplate.opsForStream().add(deadLetterStream, dlqMessage);

        // Optional: acknowledge the message in the original consumer group
        // redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue());
    }

    private void processMessage(MapRecord<String, Object, Object> message) {
        // Actual message processing logic
        log.info("Processing recovered message: {}", message);
        // ...
    }
}
Use scenarios
- Critical business systems that require reliable message processing
- Tasks with longer processing time
- Workflow that requires error retry mechanism
- Monitoring and diagnostic message processing
- Implementing a dead-letter queue to handle specific failure scenarios
Pros and cons
Advantages
- Improve system fault tolerance and reliability
- Automatically recover unprocessed messages caused by consumer crashes
- Can identify and process long-term unconfirmed messages
- Supports the implementation of complex retry strategies and dead letter processing
Disadvantages
- The recovery mechanism requires additional development and maintenance
- Messages may be processed more than once, so business logic must be idempotent (an idempotency sketch follows this list)
- Increases overall system complexity
- The size of the PEL (pending entries list) must be monitored and managed
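Because recovered messages may be delivered more than once, handlers should be idempotent. A minimal sketch of ID-based deduplication, assuming a hypothetical dedup: key prefix and a 24-hour deduplication window:

import java.time.Duration;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;

public class IdempotentProcessor {

    private final StringRedisTemplate redisTemplate;

    public IdempotentProcessor(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /** Process the message only if its ID has not been seen before. */
    public void processOnce(MapRecord<String, Object, Object> message) {
        // "dedup:<id>" is a hypothetical marker key used only in this sketch
        String dedupKey = "dedup:" + message.getId().getValue();

        // setIfAbsent maps to SET key value NX EX, so only one caller wins
        Boolean firstTime = redisTemplate.opsForValue()
                .setIfAbsent(dedupKey, "1", Duration.ofHours(24));

        if (Boolean.TRUE.equals(firstTime)) {
            handle(message); // runs at most once per message ID within the window
        }
    }

    private void handle(MapRecord<String, Object, Object> message) {
        System.out.println("Processing: " + message.getValue());
    }
}

Because the marker is set atomically, even two consumers claiming the same message concurrently will execute the handler only once.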
6. Streaming Window Processing
Basic concepts
The streaming window mode divides the data stream by time or message count and performs aggregation or analysis within each window. This model suits real-time analysis, trend monitoring, and time-series processing. Although Redis Stream itself does not provide window operations directly, they can be built by combining other Redis features.
Implementation method
It is mainly achieved through the following methods:
1. Time ranges based on message IDs (Redis message IDs embed a millisecond timestamp); see the sketch after this list
2. Storing window data in combination with Redis's sorted set (SortedSet)
3. Use Redis's expiration key to implement sliding window
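Method 1 requires no extra data structures: every stream ID has the form <millis>-<seq>, so a time window maps directly onto an XRANGE ID range. A minimal sketch:

import java.util.List;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;

public class IdBasedWindowQuery {

    private final StringRedisTemplate redisTemplate;

    public IdBasedWindowQuery(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /** Fetch all entries whose IDs fall inside [fromMillis, toMillis]. */
    public List<MapRecord<String, Object, Object>> window(
            String streamKey, long fromMillis, long toMillis) {
        // Stream IDs are "<millis>-<seq>", so "<millis>-0" marks the window start
        Range<String> idRange = Range.closed(fromMillis + "-0", toMillis + "-0");
        return redisTemplate.opsForStream().range(streamKey, idRange);
    }
}

The Spring Boot example further below combines methods 2 and 3 instead, trading extra storage for cheap per-window aggregation.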
Implementation example
Redis CLI
Window data collection and query:
# Add timestamped data
> XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"

# Query data within a specific time range (IDs encode millisecond timestamps)
> XRANGE temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.5"
      5) "timestamp"
      6) "1647257548000"
2) 1) "1647257560234-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.8"
      5) "timestamp"
      6) "1647257558000"
Java Spring Boot Example
@Slf4j
@Service
public class TimeWindowProcessingService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Add a data point to the stream and store it in its time window.
     */
    public String addDataPoint(String streamKey, String sensorId, double value)
            throws JsonProcessingException {
        long timestamp = System.currentTimeMillis();

        // 1. Add to the raw data stream
        Map<String, Object> dataPoint = new HashMap<>();
        dataPoint.put("sensor_id", sensorId);
        dataPoint.put("value", String.valueOf(value));
        dataPoint.put("timestamp", String.valueOf(timestamp));

        RecordId recordId = redisTemplate.opsForStream().add(streamKey, dataPoint);

        // 2. Compute the window this point belongs to (5-minute windows here)
        long windowStart = timestamp - (timestamp % (5 * 60 * 1000));
        String windowKey = streamKey + ":window:" + windowStart;

        // 3. Add the data point to the window's sorted set, scored by timestamp
        String dataPointJson = objectMapper.writeValueAsString(dataPoint);
        redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp);

        // 4. Expire the window key after 24 hours
        redisTemplate.expire(windowKey, Duration.ofHours(24));

        return recordId.getValue();
    }

    /**
     * Get the data points in the specified time window.
     */
    public List<Map<String, Object>> getWindowData(
            String streamKey, long windowStartTime, long windowEndTime) {
        // Compute the candidate window keys (one window every 5 minutes)
        List<String> windowKeys = new ArrayList<>();
        long current = windowStartTime - (windowStartTime % (5 * 60 * 1000));
        while (current <= windowEndTime) {
            windowKeys.add(streamKey + ":window:" + current);
            current += 5 * 60 * 1000;
        }

        // Collect data points from each window
        List<Map<String, Object>> results = new ArrayList<>();
        for (String windowKey : windowKeys) {
            Set<String> dataPoints = redisTemplate.opsForZSet().rangeByScore(
                    windowKey, windowStartTime, windowEndTime);
            if (dataPoints != null) {
                for (String dataPointJson : dataPoints) {
                    try {
                        Map<String, Object> dataPoint = objectMapper.readValue(
                                dataPointJson, new TypeReference<Map<String, Object>>() {});
                        results.add(dataPoint);
                    } catch (Exception e) {
                        log.error("Error parsing data point: {}", e.getMessage(), e);
                    }
                }
            }
        }

        // Sort by timestamp
        results.sort(Comparator.comparingLong(
                dp -> Long.parseLong(dp.get("timestamp").toString())));
        return results;
    }

    /**
     * Compute aggregate statistics over the data in a window.
     */
    public Map<String, Object> getWindowStats(
            String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
        List<Map<String, Object>> windowData =
                getWindowData(streamKey, windowStartTime, windowEndTime);

        // Filter to the given sensor and extract the values
        List<Double> values = windowData.stream()
                .filter(dp -> sensorId.equals(dp.get("sensor_id").toString()))
                .map(dp -> Double.parseDouble(dp.get("value").toString()))
                .collect(Collectors.toList());

        Map<String, Object> stats = new HashMap<>();
        stats.put("count", values.size());

        if (!values.isEmpty()) {
            DoubleSummaryStatistics summaryStats = values.stream()
                    .collect(Collectors.summarizingDouble(v -> v));
            stats.put("min", summaryStats.getMin());
            stats.put("max", summaryStats.getMax());
            stats.put("avg", summaryStats.getAverage());
            stats.put("sum", summaryStats.getSum());
        }

        stats.put("start_time", windowStartTime);
        stats.put("end_time", windowEndTime);
        stats.put("sensor_id", sensorId);
        return stats;
    }

    /**
     * Sliding-window processing.
     */
    @Scheduled(fixedRate = 60000) // run once per minute
    public void processSlidingWindows() {
        String streamKey = "temperature";
        long now = System.currentTimeMillis();

        // Process the last 10 minutes of data
        long windowEndTime = now;
        long windowStartTime = now - (10 * 60 * 1000);

        List<String> sensorIds = Arrays.asList("1", "2", "3"); // sample sensor IDs
        for (String sensorId : sensorIds) {
            try {
                // Get the window statistics
                Map<String, Object> stats =
                        getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);

                // Apply business logic to the aggregated results
                if (stats.containsKey("avg")) {
                    double avgTemp = (double) stats.get("avg");
                    if (avgTemp > 25.0) {
                        // Trigger a high-temperature alert
                        log.warn("High temperature alert for sensor {}: {} °C", sensorId, avgTemp);
                        triggerAlert(sensorId, "HIGH_TEMP", avgTemp);
                    }
                }

                // Store the aggregate for historical trend analysis
                saveAggregatedResults(streamKey, sensorId, stats);
            } catch (Exception e) {
                log.error("Error processing sliding window for sensor {}: {}",
                        sensorId, e.getMessage(), e);
            }
        }
    }

    /**
     * Trigger an alert.
     */
    private void triggerAlert(String sensorId, String alertType, double value) {
        Map<String, Object> alertData = new HashMap<>();
        alertData.put("sensor_id", sensorId);
        alertData.put("alert_type", alertType);
        alertData.put("value", String.valueOf(value));
        alertData.put("timestamp", String.valueOf(System.currentTimeMillis()));
        redisTemplate.opsForStream().add("alerts", alertData);
    }

    /**
     * Save the aggregated results.
     */
    private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats)
            throws JsonProcessingException {
        long windowTime = (long) stats.get("end_time");
        String aggregateKey = streamKey + ":aggregate:" + sensorId;

        // Store the aggregate in a sorted set, scored by window end time
        redisTemplate.opsForZSet().add(
                aggregateKey, objectMapper.writeValueAsString(stats), windowTime);

        // Keep 30 days of aggregated data
        redisTemplate.expire(aggregateKey, Duration.ofDays(30));
    }
}
Use scenarios
- Real-time data analysis and statistics
- Trend detection and forecasting
- Outlier and Threshold Monitoring
- Time series data processing
- IoT data stream processing and aggregation
- User behavior analysis
Pros and cons
Advantages
- Supports time-based data analysis
- Enables real-time aggregation and computation
- Flexible window definitions (sliding windows, tumbling windows)
- Scales to support complex analytical scenarios
Disadvantages
- Higher implementation complexity
- May require additional data structures and storage space
- Window calculations over large data volumes may affect performance
- Memory usage and data-expiration strategies require careful management
Conclusion
Redis Stream provides powerful and flexible message processing capabilities. By combining these modes, you can build high-performance, reliable, and flexible message processing systems for needs ranging from simple task queues to complex real-time data processing.
When selecting and implementing these modes, you should fully consider business characteristics, performance requirements, reliability requirements and system scale, and combine the characteristics of Redis Stream to create a message processing solution that is most suitable for your application scenarios.
This concludes the detailed walkthrough of the 6 message processing modes of Redis Stream. For more on Redis Stream message processing, please follow my other related articles!