SoFunction
Updated on 2025-05-04

One article will help you understand the 6 message processing modes of Redis Stream

The Stream data types introduced by Redis 5.0 bring powerful and flexible message queueing functions to the Redis ecosystem, making up for the shortcomings of previous publish/subscribe modes, such as message persistence, consumer groups, message confirmation and other features.

Redis Stream combines the characteristics of traditional message queues and timing databases, and is suitable for log collection, event-driven applications, real-time analysis and other scenarios.

This article will introduce 6 message processing modes of Redis Stream.

1. Simple Consumption

Basic concepts

The simple consumption model is the most basic way to use Redis Stream. It does not use the consumer group and directly reads messages in the stream. The producer appends the message to the stream, and the consumer reads the message by specifying the start ID.

Core Commands

# Post a messageXADD stream_name [ID] field value [field value ...]

# Read the messageXREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id

Implementation example

Redis CLI

# Add message to stream> XADD mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"

# Read all messages from scratch> XREAD STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# Start reading from the specified ID> XREAD STREAMS mystream 1647257548956-0
(empty list or set)

# Start reading from the latest message ID (blocking and waiting for new messages)> XREAD BLOCK 5000 STREAMS mystream $
(nil)

Java Spring Boot Example

@Service
public class SimpleStreamService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
      * Post a message to Stream
      */
    public String publishEvent(String streamKey, Map<String, Object> eventData) {
        StringRecord record = (eventData).withStreamKey(streamKey);
        return ().add(record).getValue();
    }
    
    /**
      * Start reading the message from the specified location
      */
    public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
        StreamReadOptions readOptions = ().count(count);
        return ().read(readOptions, (streamKey, (startId)));
    }
    
    /**
      * Blocking read message
      */
    public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
        StreamReadOptions readOptions = ().count(10).block((timeoutMillis));
        return ().read(readOptions, (streamKey));
    }
}

Use scenarios

  • Simple event logging
  • Single consumer scenario
  • Time series data collection
  • Development and debugging phases

Pros and cons

advantage

  • Simple implementation without creating and managing consumer groups
  • Directly control where to start consuming messages
  • Suitable for single consumer scenarios

shortcoming

  • Load balancing cannot be achieved
  • Unable to track message confirmation status
  • Read message ID needs to be managed manually
  • Restart the service and record the last read location by yourself

2. Consumer Groups

Basic concepts

The consumer group allows multiple consumers to jointly process a stream of messages, realize load balancing, and provide a message confirmation mechanism to ensure that messages are processed at least once. Each consumer group maintains its own consumption position, and different consumer groups do not interfere with each other.

Core Commands

# Create a consumer groupXGROUP CREATE stream_name group_name [ID|$] [MKSTREAM]

# Read messages from the consumer groupXREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|ID]

# Confirm message processing is completedXACK stream_name group_name message_id [message_id ...]

Implementation example

Redis CLI

# Create a consumer group> XGROUP CREATE mystream processing-group $ MKSTREAM
OK

# Consumer 1 reads the message> XREADGROUP GROUP processing-group consumer-1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# Confirm that the message has been processed> XACK mystream processing-group 1647257548956-0
(integer) 1

# Consumer 2 reads the message (no unprocessed messages are available)> XREADGROUP GROUP processing-group consumer-2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) (empty list or set)

Java Spring Boot Example

@Service
public class ConsumerGroupService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
      * Create a consumer group
      */
    public void createGroup(String streamKey, String groupName) {
        try {
            ().createGroup(streamKey, groupName);
        } catch (RedisSystemException e) {
            // Handle the situation where the stream does not exist            if (() instanceof RedisCommandExecutionException 
                    && ().getMessage().contains("NOGROUP")) {
                ().createGroup(("0"), streamKey, groupName);
            } else {
                throw e;
            }
        }
    }
    
    /**
      * Read messages from the consumer group
      */
    public List<MapRecord<String, Object, Object>> readFromGroup(
            String streamKey, String groupName, String consumerName, int count) {
        
        StreamReadOptions options = ().count(count);
        return ().read(
                (groupName, consumerName),
                options,
                (streamKey, ())
        );
    }
    
    /**
      * Blocking read messages from consumer group
      */
    public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
            String streamKey, String groupName, String consumerName, int count, Duration timeout) {
        
        StreamReadOptions options = ().count(count).block(timeout);
        return ().read(
                (groupName, consumerName),
                options,
                (streamKey, ())
        );
    }
    
    /**
      * Confirm that the message has been processed
      */
    public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
        return ().acknowledge(streamKey, groupName, messageIds);
    }
}

Use scenarios

  • Systems that require horizontal scaling of message processing capabilities
  • Business scenarios that require reliable processing of messages
  • Implement message work queue
  • Event delivery between microservices

Pros and cons

advantage

  • Multiple consumers can process messages in parallel
  • Provide message confirmation mechanism to ensure that messages are not lost
  • Supports consumer crash recovery processing
  • Each consumer group maintains an independent consumption location

shortcoming

  • Relatively complex implementation
  • Consumer groups and consumers need to be properly managed
  • Need to explicitly process message confirmation
  • Unconfirmed messages need to be processed regularly

3. Blocking Consumption

Basic concepts

Blocked consumption allows consumers to stay connected when there is no new message, waiting for new messages to arrive. This mode reduces polling overhead and improves real-time performance, making it suitable for scenarios with high demands on message processing timeliness.

Core Commands

# Blocking simple consumptionXREAD BLOCK milliseconds STREAMS stream_name ID

# Blocked consumer group consumptionXREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >

Implementation example

Redis CLI

# Blocking and waiting for new messages (waiting up to 10 seconds)> XREAD BLOCK 10000 STREAMS mystream $
(nil)  # If there is no new news within 10 seconds
# Blocked consumption using consumer groups> XREADGROUP GROUP processing-group consumer-1 BLOCK 10000 STREAMS mystream >
(nil)  # if10No new message is assigned within seconds

Java Spring Boot Example

@Service
public class BlockingStreamConsumerService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
      * Blocked message consumer tasks
      */
    @Async
    public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
        StreamReadOptions options = ()
                .count(1)
                .block(timeout);
        
        while (!().isInterrupted()) {
            try {
                // Blocking reading of messages                List<MapRecord<String, Object, Object>> records = ()
                        .read(options, (streamKey, (lastId)));
                
                if (records != null && !()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        // Process messages                        processMessage(record);
                        
                        // Update the last read ID                        lastId = ().getValue();
                    }
                } else {
                    // The message is not read in timeout, and some other logic can be executed                }
            } catch (Exception e) {
                // Exception handling                ("Error reading from stream: {}", (), e);
                try {
                    (1000); // Wait for a while after an error and try again                } catch (InterruptedException ie) {
                    ().interrupt();
                    break;
                }
            }
        }
    }
    
    /**
      * Blocked consumer group consumption
      */
    @Async
    public void startGroupBlockingConsumer(
            String streamKey, String groupName, String consumerName, Duration timeout) {
        
        StreamReadOptions options = ()
                .count(1)
                .block(timeout);
        
        while (!().isInterrupted()) {
            try {
                // Blocking reading of messages                List<MapRecord<String, Object, Object>> records = ()
                        .read((groupName, consumerName),
                               options,
                               (streamKey, ()));
                
                if (records != null && !()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        try {
                            // Process messages                            processMessage(record);
                            
                            // Confirm message                            ()
                                    .acknowledge(streamKey, groupName, ().getValue());
                        } catch (Exception e) {
                            // Processing failed, logging                            ("Error processing message: {}", (), e);
                        }
                    }
                }
            } catch (Exception e) {
                ("Error reading from stream group: {}", (), e);
                try {
                    (1000);
                } catch (InterruptedException ie) {
                    ().interrupt();
                    break;
                }
            }
        }
    }
    
    private void processMessage(MapRecord<String, Object, Object> record) {
        // Actual message processing logic        ("Processing message: {}", record);
        // ...Specific business logic for processing messages    }
}

Use scenarios

  • Real-time data processing system
  • Event-driven task processing
  • Applications with low latency requirements
  • Instant communication system
  • Notification Service

Pros and cons

advantage

  • Reduce resource waste caused by polling
  • Good real-time performance, process it immediately after the message arrives
  • Reduce the load on Redis and clients
  • Save CPU and network resources

shortcoming

  • Long connections may occupy Redis connection resources
  • The timeout time needs to be set reasonably
  • Reconnection after network interruption may need to be handled
  • Consumers need concurrent processing capabilities

4. Fan-out Pattern

Basic concepts

Fanout mode allows multiple independent consumer groups to consume all messages in the same stream simultaneously, similar to the publish/subscribe mode, but with message persistence and backtracking capabilities. Each consumer group independently maintains its own consumption position.

Core Commands

Create multiple consumer groups, all of which consume the same stream independently:

XGROUP CREATE stream_name group_name_1 $ MKSTREAM
XGROUP CREATE stream_name group_name_2 $ MKSTREAM
XGROUP CREATE stream_name group_name_3 $ MKSTREAM

Implementation example

Redis CLI

# Create multiple consumer groups> XGROUP CREATE notifications analytics-group $ MKSTREAM
OK
> XGROUP CREATE notifications email-group $ MKSTREAM
OK
> XGROUP CREATE notifications mobile-group $ MKSTREAM
OK

# Add a message> XADD notifications * type user_signup user_id 1001 email "user@"
"1647345678912-0"

# Read from each consumer group (every group receives all messages)> XREADGROUP GROUP analytics-group analytics-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@"

> XREADGROUP GROUP email-group email-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@"

> XREADGROUP GROUP mobile-group mobile-1 COUNT 1 STREAMS notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@"

Java Spring Boot Example

@Service
public class FanOutService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
      * Initialize fan-out consumer group
      */
    public void initializeFanOutGroups(String streamKey, List<String> groupNames) {
        // Make sure the flow exists        try {
             info = ().info(streamKey);
        } catch (Exception e) {
            // The stream does not exist, send an initial message            Map<String, Object> initialMessage = new HashMap<>();
            ("init", "true");
            ().add(streamKey, initialMessage);
        }
        
        // Create all consumer groups        for (String groupName : groupNames) {
            try {
                ().createGroup(streamKey, groupName);
            } catch (Exception e) {
                // Ignore the errors that already exist in the group                ("Group {} may already exist: {}", groupName, ());
            }
        }
    }
    
    /**
      * Post fan-out message
      */
    public String publishFanOutMessage(String streamKey, Map<String, Object> messageData) {
        StringRecord record = (messageData).withStreamKey(streamKey);
        return ().add(record).getValue();
    }
    
    /**
      * Start consumers for a specific group
      */
    @Async
    public void startGroupConsumer(
            String streamKey, String groupName, String consumerName, 
            Consumer<MapRecord<String, Object, Object>> messageHandler) {
        
        StreamReadOptions options = ().count(10).block((2));
        
        while (!().isInterrupted()) {
            try {
                List<MapRecord<String, Object, Object>> messages = ().read(
                        (groupName, consumerName),
                        options,
                        (streamKey, ())
                );
                
                if (messages != null && !()) {
                    for (MapRecord<String, Object, Object> message : messages) {
                        try {
                            // Process messages                            (message);
                            
                            // Confirm message                            ().acknowledge(
                                    streamKey, groupName, ().getValue());
                        } catch (Exception e) {
                            ("Error processing message in group {}: {}", 
                                    groupName, (), e);
                        }
                    }
                }
            } catch (Exception e) {
                ("Error reading from stream for group {}: {}", 
                        groupName, (), e);
                try {
                    (1000);
                } catch (InterruptedException ie) {
                    ().interrupt();
                    break;
                }
            }
        }
    }
}

Example of usage

@Service
public class NotificationService {
    
    @Autowired
    private FanOutService fanOutService;
    
    @PostConstruct
    public void init() {
        // Initialize fan-out group        List<String> groups = ("email-group", "sms-group", "analytics-group");
        ("user-events", groups);
        
        // Start the processor of each consumer group        (
                "user-events", "email-group", "email-consumer", this::processEmailNotification);
        
        (
                "user-events", "sms-group", "sms-consumer", this::processSmsNotification);
        
        (
                "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
    }
    
    private void processEmailNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = ();
        ("Processing email notification: {}", messageData);
        // Email sending logic    }
    
    private void processSmsNotification(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = ();
        ("Processing SMS notification: {}", messageData);
        // SMS sending logic    }
    
    private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
        Map<Object, Object> messageData = ();
        ("Processing analytics event: {}", messageData);
        // Analyze event processing logic    }
    
    public void publishUserEvent(String eventType, Map<String, Object> eventData) {
        Map<String, Object> message = new HashMap<>(eventData);
        ("event_type", eventType);
        ("timestamp", ());
        
        ("user-events", message);
    }
}

Use scenarios

  • Multiple systems need to handle the same event flow independently
  • Implement event broadcasting mechanism
  • System integration: One event triggers multiple business processes
  • Logs are processed and distributed to different services
  • Notification system: An event requires users to be notified in multiple ways

Pros and cons

advantage

  • Achieve multiple consumptions in one release
  • Each consumer group works independently and does not affect each other
  • Added consumer group to consume all historical news from scratch
  • High reliability, persistent message storage

shortcoming

  • As streaming data grows, it may take up more storage space
  • The maximum length or expiration strategy of the flow needs to be set reasonably
  • Too many consumer groups may increase Redis load
  • Need to manage the status of each consumer group separately

5. Retry and Recovery

Basic concepts

This pattern focuses on the recovery and retry mechanisms for handling failed messages. The Redis Stream consumer group tracks the processing status of each message, allowing viewing and managing unconfirmed messages (PEL - Pending Entry List) to enable reliable message processing.

Core Commands

# View unconfirmed messages in the consumer groupXPENDING stream_name group_name [start_id end_id count] [consumer_name]

# View details of messages that have not been confirmed for a long time in the consumer groupXPENDING stream_name group_name start_id end_id count [consumer_name]

# Claim processing timeout messagesXCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]

Implementation example

Redis CLI

# Check the number of unconfirmed messages> XPENDING mystream processing-group
1) (integer) 2         # Number of messages not confirmed2) "1647257548956-0"   # Minimum ID3) "1647257549123-0"   # Maximum ID4) 1) 1) "consumer-1"  # Number of unconfirmed messages for each consumer      2) (integer) 1
   2) 1) "consumer-2"
      2) (integer) 1

# View unconfirmed messages from specific consumers> XPENDING mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0"   # Message ID   2) "consumer-1"         # Current consumers   3) (integer) 120000     # idle time (milliseconds)   4) (integer) 2          # Number of passes
# Claim unprocessed messages for more than 2 minutes> XCLAIM mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
   2) 1) "sensor_id"
      2) "1234"
      3) "temperature"
      4) "19.8"
      5) "humidity"
      6) "56"

Java Spring Boot Example

@Service
public class MessageRecoveryService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
      * Get unconfirmed messages in the consumer group
      */
    public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
        return ().pending(streamKey, groupName);
    }
    
    /**
      * Get detailed unconfirmed messages from the specified consumer
      */
    public PendingMessages getPendingMessages(
            String streamKey, String groupName, String consumerName, 
            Range<String> idRange, long count) {
        
        return ().pending(
                streamKey, 
                (groupName, consumerName), 
                idRange, 
                count);
    }
    
    /**
      * Claim long-term unprocessed messages
      */
    public List<MapRecord<String, Object, Object>> claimMessages(
            String streamKey, String groupName, String newConsumerName, 
            Duration minIdleTime, String... messageIds) {
        
        return ().claim(
                streamKey, 
                (groupName, newConsumerName), 
                minIdleTime, 
                messageIds);
    }
    
    /**
      * Check and restore unprocessed messages regularly
      */
    @Scheduled(fixedRate = 60000) // Execute once per minute    public void recoverStaleMessages() {
        //Configuration parameters        String streamKey = "mystream";
        String groupName = "processing-group";
        String recoveryConsumer = "recovery-consumer";
        Duration minIdleTime = (5); // Unprocessed messages over 5 minutes        
        try {
            // 1. Get a summary of all unconfirmed messages            PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);
            
            if (summary != null && () > 0) {
                // 2. Iterate through each consumer's unconfirmed message                for (Consumer consumer : ().keySet()) {
                    // Get the detailed list of unconfirmed messages for this consumer                    PendingMessages pendingMessages = getPendingMessages(
                            streamKey, groupName, (), 
                            (), 50); // Process up to 50 items each time                    
                    if (pendingMessages != null) {
                        // 3. Filter out messages whose idle time exceeds the threshold                        List<String> staleMessageIds = new ArrayList<>();
                        
                        for (PendingMessage message : pendingMessages) {
                            if (().compareTo(minIdleTime) > 0) {
                                (());
                            }
                        }
                        
                        // 4. Claim these news                        if (!()) {
                            ("Claiming {} stale messages from consumer {}", 
                                    (), ());
                            
                            List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
                                    streamKey, groupName, recoveryConsumer, minIdleTime, 
                                    (new String[0]));
                            
                            // 5. Handle these claimed messages                            processClaimedMessages(streamKey, groupName, claimedMessages);
                        }
                    }
                }
            }
        } catch (Exception e) {
            ("Error recovering stale messages: {}", (), e);
        }
    }
    
    /**
      * Handle the claimed news
      */
    private void processClaimedMessages(
            String streamKey, String groupName, 
            List<MapRecord<String, Object, Object>> messages) {
        
        if (messages == null || ()) {
            return;
        }
        
        for (MapRecord<String, Object, Object> message : messages) {
            try {
                // Execute message processing logic                processMessage(message);
                
                // Confirm message                ().acknowledge(
                        streamKey, groupName, ().getValue());
                
                ("Successfully processed recovered message: {}", ());
            } catch (Exception e) {
                ("Failed to process recovered message {}: {}", 
                        (), (), e);
                // Decide whether to add messages to the dead letter queue based on business needs                moveToDeadLetterQueue(streamKey, message);
            }
        }
    }
    
    /**
      * Move the message to the dead letter queue
      */
    private void moveToDeadLetterQueue(String sourceStream, MapRecord<String, Object, Object> message) {
        String deadLetterStream = sourceStream + ":dead-letter";
        Map<Object, Object> messageData = ();
        
        Map<String, Object> dlqMessage = new HashMap<>();
        ((k, v) -> ((), v));
        
        // Add metadata        ("original_id", ().getValue());
        ("error_time", ());
        
        ().add(deadLetterStream, dlqMessage);
        
        // Optional: Confirm the message from the original consumer group        // ().acknowledge(sourceStream, groupName, ().getValue());
    }
    
    private void processMessage(MapRecord<String, Object, Object> message) {
        // Actual message processing logic        ("Processing recovered message: {}", message);
        // ...
    }
}

Use scenarios

  • Critical business systems that require reliable message processing
  • Tasks with longer processing time
  • Workflow that requires error retry mechanism
  • Monitoring and diagnostic message processing
  • Implement dead letter queue handling specific failure scenarios

Pros and cons

advantage

  • Improve system fault tolerance and reliability
  • Automatically recover unprocessed messages caused by consumer crashes
  • Can identify and process long-term unconfirmed messages
  • Supports the implementation of complex retry strategies and dead letter processing

shortcoming

  • Additional development and maintenance of recovery mechanisms are required
  • May cause duplicate processing of messages, and it is necessary to ensure business logic idempotence
  • Increased system complexity
  • Need to monitor and manage the size of the PEL (unconfirmed message list)

6. Streaming Window Processing

Basic concepts

The stream processing window mode divides the data stream based on time or message count, performing aggregation or analysis operations within each window. This model is suitable for real-time analysis, trend monitoring, and time series processing. Although Redis Stream itself does not directly provide window operations, it can be implemented in combination with other features of Redis.

Implementation method

It is mainly achieved through the following methods:

1. Time range based on message ID (Redis message ID contains millisecond timestamp)

2. Storing window data in combination with Redis's sorted set (SortedSet)

3. Use Redis's expiration key to implement sliding window

Implementation example

Redis CLI

Window data collection and query:

# Add time stamped data> XADD temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> XADD temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> XADD temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"

# Query data in a specific time range> XRANGE temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.5"
      5) "timestamp"
      6) "1647257548000"
2) 1) "1647257560234-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.8"
      5) "timestamp"
      6) "1647257558000"

Java Spring Boot Example

@Service
public class TimeWindowProcessingService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    /**
      * Add data points to the stream and store them to the corresponding time window
      */
    public String addDataPoint(String streamKey, String sensorId, double value) {
        long timestamp = ();
        
        // 1. Add to the original data stream        Map<String, Object> dataPoint = new HashMap<>();
        ("sensor_id", sensorId);
        ("value", (value));
        ("timestamp", (timestamp));
        
        StringRecord record = (dataPoint).withStreamKey(streamKey);
        RecordId recordId = ().add(record);
        
        // 2. Calculate the window to which it belongs (here is 5 minutes as a window)        long windowStart = timestamp - (timestamp % (5 * 60 * 1000));
        String windowKey = streamKey + ":window:" + windowStart;
        
        // 3. Add data points to the ordered set of windows, with the fraction as timestamps        String dataPointJson = new ObjectMapper().writeValueAsString(dataPoint);
        ().add(windowKey, dataPointJson, timestamp);
        
        // 4. Set the expiration time of the window key (retained for 24 hours)        (windowKey, (24));
        
        return ();
    }
    
    /**
      * Get the data points in the specified time window
      */
    public List<Map<String, Object>> getWindowData(
            String streamKey, long windowStartTime, long windowEndTime) {
        
        // Calculate possible window keys (one window every 5 minutes)        List<String> windowKeys = new ArrayList<>();
        long current = windowStartTime - (windowStartTime % (5 * 60 * 1000));
        
        while (current <= windowEndTime) {
            (streamKey + ":window:" + current);
            current += (5 * 60 * 1000);
        }
        
        // Get data points from each window        List<Map<String, Object>> results = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        
        for (String windowKey : windowKeys) {
            Set<String> dataPoints = ().rangeByScore(
                    windowKey, windowStartTime, windowEndTime);
            
            if (dataPoints != null) {
                for (String dataPointJson : dataPoints) {
                    try {
                        Map<String, Object> dataPoint = (
                                dataPointJson, new TypeReference<Map<String, Object>>() {});
                        (dataPoint);
                    } catch (Exception e) {
                        ("Error parsing data point: {}", (), e);
                    }
                }
            }
        }
        
        // Sort by timestamp        ((dp -> (("timestamp").toString())));
        
        return results;
    }
    
    /**
      * Calculate the aggregate statistics of data in the calculation window
      */
    public Map<String, Object> getWindowStats(
            String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
        
        List<Map<String, Object>> windowData = getWindowData(streamKey, windowStartTime, windowEndTime);
        
        // Filter data from specific sensors        List<Double> values = ()
                .filter(dp -> (("sensor_id").toString()))
                .map(dp -> (("value").toString()))
                .collect(());
        
        Map<String, Object> stats = new HashMap<>();
        ("count", ());
        
        if (!()) {
            DoubleSummaryStatistics summaryStats = ().collect((v -> v));
            ("min", ());
            ("max", ());
            ("avg", ());
            ("sum", ());
        }
        
        ("start_time", windowStartTime);
        ("end_time", windowEndTime);
        ("sensor_id", sensorId);
        
        return stats;
    }
    
    /**
      * Implement sliding window processing
      */
    @Scheduled(fixedRate = 60000) // Execute once per minute    public void processSlidingWindows() {
        String streamKey = "temperature";
        long now = ();
        
        // Process data from the last 10 minutes window        long windowEndTime = now;
        long windowStartTime = now - (10 * 60 * 1000);
        
        List<String> sensorIds = ("1", "2", "3"); // Sample sensor ID        
        for (String sensorId : sensorIds) {
            try {
                // Get window statistics                Map<String, Object> stats = getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);
                
                //Execute business logic based on statistical results                if (("avg")) {
                    double avgTemp = (double) ("avg");
                    if (avgTemp > 25.0) {
                        // Trigger high temperature alarm                        ("High temperature alert for sensor {}: {} °C", sensorId, avgTemp);
                        triggerAlert(sensorId, "HIGH_TEMP", avgTemp);
                    }
                }
                
                // Store aggregate results for historical trend analysis                saveAggregatedResults(streamKey, sensorId, stats);
                
            } catch (Exception e) {
                ("Error processing sliding window for sensor {}: {}", 
                        sensorId, (), e);
            }
        }
    }
    
    /**
      * Trigger an alarm
      */
    private void triggerAlert(String sensorId, String alertType, double value) {
        Map<String, Object> alertData = new HashMap<>();
        ("sensor_id", sensorId);
        ("alert_type", alertType);
        ("value", value);
        ("timestamp", ());
        
        ().add("alerts", alertData);
    }
    
    /**
      * Save the aggregate results
      */
    private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) {
        long windowTime = (long) ("end_time");
        String aggregateKey = streamKey + ":aggregate:" + sensorId;
        
        // Use time as fraction to store the aggregate results        ().add(
                aggregateKey, 
                new ObjectMapper().writeValueAsString(stats),
                windowTime);
        
        // Keep 30 days of aggregated data        (aggregateKey, (30));
    }
}

Use scenarios

  • Real-time data analysis and statistics
  • Trend detection and forecasting
  • Outlier and Threshold Monitoring
  • Time series data processing
  • IoT data stream processing and aggregation
  • User behavior analysis

Pros and cons

advantage

  • Support time-based data analysis
  • Real-time aggregation and calculation can be implemented
  • Flexible window definition (sliding window, scroll window)
  • Scalable to support complex analytical scenarios

shortcoming

  • Highly complex implementation
  • Additional data structures and storage space may be required
  • For large data volume window calculations may affect performance
  • Careful management of memory usage and data expiration strategies

in conclusion

Redis Stream provides powerful and flexible message processing functions. By combining these modes, it is possible to build a high-performance, reliable and flexible message processing system to meet various application needs, from simple task queues to complex real-time data processing.

When selecting and implementing these modes, you should fully consider business characteristics, performance requirements, reliability requirements and system scale, and combine the characteristics of Redis Stream to create a message processing solution that is most suitable for your application scenarios.

The above is a detailed article that will help you understand the 6 message processing modes of Redis Stream. For more information about Redis Stream message processing mode, please follow my other related articles!