SoFunction
Updated on 2025-05-22

Solution to the problem of redis abnormal exit in spring boot

question:

Exception in thread "rtsp-consumer-3" : Unable to connect to Redis; nested exception is : Unable to connect to localhost:6379
    at $(:1689)
    at $(:1597)
    at (:1006)
    at (:1069)
    at (:990)
    at (:395)
    at (:346)
    at (:592)
    at $(:310)
    at $(:387)
    at $(:382)
    at (:222)
    at (:189)
    at (:96)
    at (:305)
    at (:70)
    at (:200)
    at $null$0(:108)
    at (:1142)
    at $(:617)
    at (:745)
Caused by: : Unable to connect to localhost:6379
    at (:78)
    at (:56)
    at (:330)
    at (:216)
    at $getConnection$1(:115)
    at (:267)
    at (:115)
    at $(:1595)
    ... 19 more
Caused by: $AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: : Address already in use: no further information
    at (Native Method)
    at (:717)
    at (:337)
    at $(:334)
    at (:776)
    at (:724)
    at (:650)
    at (:562)
    at $(:997)
    at $(:74)
    at (:30)
    at (:745) 

solve

According to code and phenomenon analysis, the root cause may beThe processing time is too long, causing the Redis message backlog to trigger timeout exception. Here are detailed analysis and solutions:

🔍 The root of the problem

Synchronous blocking model

  • ()There is a network delay when calling the external interface, resulting in message processing time exceeding RedisBLOCKTimeout (2000ms).
  • Failure to ACK in time will cause the message to re-enter the Pending state, and it will be triggered after accumulation.READONLYError or consumer group invalid.

(1000) Intensify blockage

  • Even if there is no message, each loop is forced to sleep for 1 second, reducing consumption throughput and unable to process backlog messages in time.

Exception not properly handled

  • Commented out try-catch causes exceptions during processing (such as network timeout) to be not caught, and Redis connection is not properly restored after interrupting the loop.

🛠️ Solution

1. Asynchronous processing + advance ACK (key steps)

Separate message processing from ACK,ACK immediately before processingAvoid message backlog and process business logic asynchronously:

ExecutorService executor = (); // Global thread poolwhile (!().isInterrupted()) {
    List<MapRecord<String, Object, Object>> records = (...);
    if (records != null) {
        for (MapRecord<String, Object, Object> record : records) {
            // Early ACK release message            (streamKey, groupName, ());
            // Submit asynchronous task processing business            (() -> {
                try {
                    String strdata = (String) ().get("frameData");
                    byte[] frameData = ().decode(strdata);
                    BufferedImage image = (new ByteArrayInputStream(frameData));
                    (image, streamId, commonQueues, regions);
                } catch (Exception e) {
                    ("Failed to process frames: {}", ());
                }
            });
        }
    }
    // Remove (1000) to increase throughput}

2. Adjust Redis consumer group parameters

IncreaseXGROUP CREATEAt the same timeBLOCKandIDLEtime out:

XGROUP CREATE stream:1234 mygroup $ MKSTREAM BLOCK 5000 IDLE 30000

3. Optimize Redis connection configuration

Ensure that the read and write timeout of RedisTemplate is greater than the maximum service processing time:

@Bean
public RedisTemplate<String, Object> redisTemplate() {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    (redisConnectionFactory());
    // Set the timeout time (example: 10 seconds)    RedisConnectionFactory factory = ();
    if (factory instanceof LettuceConnectionFactory) {
        ((LettuceConnectionFactory) factory).setTimeout((10));
    }
    return template;
}

4. Monitor the Pending queue

Check the number of Pending messages regularly and reset the consumer when abnormal:

// Add checks in loopLong pending = ().pending(streamKey, groupName);
if (pending > 1000) { // Threshold is set according to service    ().destroyGroup(streamKey, groupName);
    ().createGroup(streamKey, groupName);
}

5. Add a circuit breaker mechanism

When external interfaces fail continuously, pause consumption to avoid avalanches:

// Global fuseCircuitBreaker breaker = ("frameProcessor");
// Change the processing logic to(() -> {
    (...);
    return null;
});

📈 Verification plan

Log monitoring

// Add time-consuming logs to asynchronous taskslong start = ();
(...);
("Time-consuming processing: {}ms", () - start);

Ensure that a single processing time is below the Redis timeout threshold.

Redis CLI Check

XINFO GROUPS stream:1234  # Check the status of the consumer groupXPENDING stream:1234 mygroup # CheckPendinginformation

🌟 Summary of key optimization points

measure effect
Asynchronous processing + advance ACK Completely decoupled processing time consuming and Redis consumption
Increase Redis timeout parameter Avoid actively disconnecting the server
Thread pool isolation Prevent external interface failure from dragging down the entire consumer service
Fuse mechanism Fast failure to protect system stability

Through the above modification, even()Calling external platform interfaces can also ensure the stability of Redis consumption links through asynchronization and resource isolation. In the end, you only need to ensure that the thread pool and the fuse parameters are adapted to the business load.

Specific code addition location:

The following is forOptimize Redis connection configurationMonitor Pending queuesandAdding a circuit breaker mechanismDetailed description of the specific location of the code:

1. Optimize Redis connection configuration

Location: Redis configuration class
Adjust the timeout parameter in Spring Boot's Redis configuration class (usually):

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // Set connection timeout and read and write timeout (key parameters)        ((10));  // Command timeout        (false);    // Disable shared connections to avoid blocking        RedisTemplate<String, Object> template = new RedisTemplate<>();
        (lettuceConnectionFactory);
        (new StringRedisSerializer());
        (new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

Key parameter description

  • setTimeout(10 seconds): Ensure that the timeout is greater than()Maximum processing time
  • setShareNativeConnection(false): Avoid blocking by sharing the same connection by multiple threads.

2. Monitor the Pending queue

Location:consumeFramesIn the loop within the method
Check the Pending queue periodically in the main loop of consuming messages:

private void consumeFrames(String streamId, String groupName, String consumerName,
                           CommonQueues commonQueues, String regions) throws InterruptedException, IOException {
    // ... Other initialization codes ...    int checkPendingInterval = 10; // Check the Pending queue every 10 cycles    int loopCount = 0;
    while (!().isInterrupted()) {
        // ... The original code reads the message...        // Monitor the logic of the Pending queue (add position)        loopCount++;
        if (loopCount % checkPendingInterval == 0) {
            String streamKey = "stream:" + streamId;
            PendingMessages pending = (streamKey, groupName);
            if (pending != null && () > 1000) { // Threshold adjustment according to business                ("DetectedPendingMessage backlog {} strip,Reset the consumer group", ());
                (streamKey, groupName);
                ((streamKey), groupName);
            }
        }
        // ... Subsequent processing code ...    }
}

illustrate

  • pass()Gets the current number of Pending messages.
  • When the Pending message exceeds the threshold, force the consumer group to be destroyed and rebuilt to avoid message jamming.

3. Add a circuit breaker mechanism

Location: Outer layer of business logic processing messages
Package with Resilience4j fuse()Called:

1. Fuse configuration class

@Configuration
public class CircuitBreakerConfig {
    @Bean
    public CircuitBreaker frameProcessorCircuitBreaker() {
        CircuitBreakerConfig config = ()
            .failureRateThreshold(50)          // Failure rate threshold is 50%            .slidingWindowType(SlidingWindowType.COUNT_BASED)
            .slidingWindowSize(10)             // Based on the last 10 calls            .minimumNumberOfCalls(5)           // Calculation starts after at least 5 calls            .waitDurationInOpenState((30)) // Enter half-open 30 seconds after the fuse            .build();
        return (config).circuitBreaker("frameProcessor");
    }
}

2. Use fuses in consumption code

public class YourConsumerClass {
    @Autowired
    private CircuitBreaker frameProcessorCircuitBreaker; // Injection fuse    private void consumeFrames(...) {
        // ... Original code ...        for (MapRecord<String, Object, Object> record : records) {
            (...); // ACK in advance            // Use fuse protection processing logic (add position)            (() -> (() -> {
                String strdata = (String) ().get("frameData");
                byte[] frameData = ().decode(strdata);
                BufferedImage image = (new ByteArrayInputStream(frameData));
                (image, streamId, commonQueues, regions);
            })).onFailure(e -> ("Processing failed and fuses: {}", ()));
        }
        // ... Subsequent code ...    }
}

Fuse logic description

  • when()When the threshold is triggered continuously, the fuse temporarily blocks subsequent calls to avoid the avalanche effect.
  • The processing will be skipped directly during the circuit breaker, but the ACK message will still be available (select whether to try again according to the business needs).

4. Summary of code integration location

Optimization measures Code location Key Notes
Redis connection configuration Redis configuration class (such as Adjust timeout and connection pool parameters
Pending queue monitoring consumeFramesIn the main loop of the method Regular check + automatic reset of consumer groups
Fuse mechanism Business processing code outer layer (package Relying on fuse libraries (such as Resilience4j)

5. Parameter adjustment suggestions

Redis timeout

  • Should be greater than()Maximum processing time + network jitter margin (if set to 2 times the actual maximum processing time).

Pending queue threshold

  • If 100 messages are processed per second, the threshold can be set to1000(equivalent to the backlog of 10 seconds).

Fuse parameters

  • failureRateThreshold: Adjust according to the stability of the external interface (such as frequent timeouts can be set to 70%).
  • waitDurationInOpenState: Adjusted according to the recovery time of external services (such as 30 seconds to 5 minutes).

Through the above modification, even()Calling external platform interfaces can also ensure the stability of Redis consumption links through resource isolation, rapid failure and automatic recovery mechanisms.

This is the end of this article about redis abnormal exit in spring boot. For more related redis spring boot exception exit content, please search for my previous article or continue browsing the related articles below. I hope everyone will support me in the future!