question:
Exception in thread "rtsp-consumer-3" : Unable to connect to Redis; nested exception is : Unable to connect to localhost:6379
at $(:1689)
at $(:1597)
at (:1006)
at (:1069)
at (:990)
at (:395)
at (:346)
at (:592)
at $(:310)
at $(:387)
at $(:382)
at (:222)
at (:189)
at (:96)
at (:305)
at (:70)
at (:200)
at $null$0(:108)
at (:1142)
at $(:617)
at (:745)
Caused by: : Unable to connect to localhost:6379
at (:78)
at (:56)
at (:330)
at (:216)
at $getConnection$1(:115)
at (:267)
at (:115)
at $(:1595)
... 19 more
Caused by: $AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: : Address already in use: no further information
at (Native Method)
at (:717)
at (:337)
at $(:334)
at (:776)
at (:724)
at (:650)
at (:562)
at $(:997)
at $(:74)
at (:30)
at (:745)
answer:
Based on an analysis of the code and the symptoms, the likely root cause is that message processing takes too long, so messages back up in Redis and trigger timeout exceptions. A detailed analysis and solutions follow:
🔍 The root of the problem
Synchronous blocking model:
- The frame-processing method calls an external interface over the network, so processing a single message can take longer than the Redis BLOCK timeout (2000 ms).
- A delivered message stays in the Pending state until it is ACKed. When ACKs arrive late, pending entries accumulate, which this analysis links to READONLY errors or an unusable consumer group.
Thread.sleep(1000) makes the blocking worse:
- Every loop iteration sleeps for 1 second even when no message arrived, which lowers consumption throughput and keeps the backlog from draining in time.
Exceptions are not handled properly:
- Because the try-catch is commented out, exceptions thrown during processing (such as network timeouts) are not caught. They break out of the loop, and the Redis connection is not restored afterwards.
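The cost of the forced sleep can be estimated with simple arithmetic. The sketch below assumes a loop that reads a batch, processes each message synchronously, and then sleeps. The class name, method name, and sample numbers (a batch of 10 messages at 200 ms each) are illustrative assumptions, not values taken from the original code.

```java
/** Rough throughput model for a read-process-sleep loop (illustrative only). */
class LoopThroughput {

    /**
     * Messages per second for a loop that processes batchSize messages
     * synchronously (processMs each) and then sleeps for sleepMs.
     */
    static double messagesPerSecond(int batchSize, long processMs, long sleepMs) {
        long cycleMs = batchSize * processMs + sleepMs; // duration of one loop iteration
        return batchSize * 1000.0 / cycleMs;
    }

    public static void main(String[] args) {
        // Assumed numbers: 10 messages per batch, 200 ms per message
        System.out.println("with 1 s sleep: " + messagesPerSecond(10, 200, 1000));
        System.out.println("without sleep:  " + messagesPerSecond(10, 200, 0));
    }
}
```

Under these assumed numbers the loop handles roughly 3.3 messages per second with the sleep and 5 without it, so the sleep alone costs about a third of the throughput before any network delay is counted.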
🛠️ Solution
1. Asynchronous processing + advance ACK (key steps)
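Separate message processing from the ACK: ACK each message immediately, before processing it, to avoid a message backlog, and run the business logic asynchronously. Note that once a message is ACKed early, Redis will not redeliver it if the asynchronous task fails: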
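// redisTemplate, log, processFrame and the Executors factory are assumed names; the original identifiers are not shown
ExecutorService executor = Executors.newCachedThreadPool(); // Global thread pool

while (!Thread.currentThread().isInterrupted()) {
    List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(...);
    if (records != null) {
        for (MapRecord<String, Object, Object> record : records) {
            // ACK early to release the message
            redisTemplate.opsForStream().acknowledge(streamKey, groupName, record.getId());
            // Submit the business processing as an asynchronous task
            executor.submit(() -> {
                try {
                    String strdata = (String) record.getValue().get("frameData");
                    byte[] frameData = Base64.getDecoder().decode(strdata);
                    BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData));
                    processFrame(image, streamId, commonQueues, regions);
                } catch (Exception e) {
                    log.error("Failed to process frames: {}", e.getMessage());
                }
            });
        }
    }
    // Thread.sleep(1000) removed to increase throughput
}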
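The ACK-then-dispatch pattern can be tried without Redis. The following is a minimal in-memory sketch, assuming record ids are plain strings and that the ACK and the business logic are passed in as callbacks. `EarlyAckSketch`, `boundedPool`, `dispatch`, and `drain` are hypothetical names introduced here. The bounded pool is a design choice of this sketch, not something the original code prescribes.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** In-memory sketch of "ACK first, process asynchronously" (no Redis involved). */
class EarlyAckSketch {

    /** Bounded pool: when the queue is full, the submitting thread runs the task itself. */
    static ExecutorService boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** ACKs every record id immediately, then hands the slow work to the pool. */
    static void dispatch(List<String> recordIds, Consumer<String> ack,
                         Consumer<String> process, ExecutorService pool) {
        for (String id : recordIds) {
            ack.accept(id); // early ACK: the entry leaves the pending list right away
            pool.execute(() -> {
                try {
                    process.accept(id);
                } catch (RuntimeException e) {
                    // the record is already ACKed, so a failure here is only logged
                    System.err.println("Failed to process " + id + ": " + e.getMessage());
                }
            });
        }
    }

    /** Stops accepting work and waits for queued tasks; returns false on timeout or interrupt. */
    static boolean drain(ExecutorService pool, long timeoutMs) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With a bounded queue and `CallerRunsPolicy`, a slow external interface eventually slows the reader down instead of letting tasks pile up in memory. A failed task is not retried, because its record was acknowledged before processing started.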
2. Adjust Redis consumer group parameters
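Raise the BLOCK timeout used when reading and the idle time used when reclaiming pending messages. BLOCK is an option of XREADGROUP, and the minimum idle time is an argument of XAUTOCLAIM (Redis 6.2+); XGROUP CREATE accepts neither. The consumer name consumer-1 below is an example:
XGROUP CREATE stream:1234 mygroup $ MKSTREAM
XREADGROUP GROUP mygroup consumer-1 COUNT 10 BLOCK 5000 STREAMS stream:1234 >
XAUTOCLAIM stream:1234 mygroup consumer-1 30000 0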
3. Optimize Redis connection configuration
Make sure the command timeout used by RedisTemplate is longer than the maximum business processing time:
@Bean
public RedisTemplate<String, Object> redisTemplate() {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory());
    // Set the timeout (example: 10 seconds)
    RedisConnectionFactory factory = template.getConnectionFactory();
    if (factory instanceof LettuceConnectionFactory) {
        // setTimeout takes milliseconds; it is deprecated in favor of LettuceClientConfiguration
        ((LettuceConnectionFactory) factory).setTimeout(10_000);
    }
    return template;
}
4. Monitor the Pending queue
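Check the number of Pending messages regularly and reset the consumer group when it looks abnormal. Destroying a group also discards its pending entries, so use the reset only when losing those messages is acceptable:
// Add this check inside the loop (redisTemplate is an assumed name)
PendingMessagesSummary summary = redisTemplate.opsForStream().pending(streamKey, groupName);
if (summary != null && summary.getTotalPendingMessages() > 1000) { // Set the threshold according to the business load
    redisTemplate.opsForStream().destroyGroup(streamKey, groupName);
    redisTemplate.opsForStream().createGroup(streamKey, groupName);
}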
5. Add a circuit breaker mechanism
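When the external interface keeps failing, pause processing to avoid a cascading failure:
// Global circuit breaker (Resilience4j calls assumed; processFrame is a placeholder name)
CircuitBreaker breaker = CircuitBreaker.ofDefaults("frameProcessor");
// Wrap the processing logic
breaker.executeSupplier(() -> {
    processFrame(...);
    return null;
});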
📈 Verification plan
Log monitoring:
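// Log the elapsed time inside the asynchronous task (log and processFrame are assumed names)
long start = System.currentTimeMillis();
processFrame(...);
log.info("Processing took {} ms", System.currentTimeMillis() - start);
Confirm that a single processing run stays below the Redis timeout threshold.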
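To compare processing time against the timeout over many calls rather than one log line at a time, the slowest run can be tracked. This is a small sketch with hypothetical names (`ProcessingTimer`, `time`, `withinTimeout`); the timeout value passed in is whatever the Redis client is configured with.

```java
/** Tracks how long each processing call takes and remembers the slowest one. */
class ProcessingTimer {
    private long slowestMillis;

    /** Runs the task, returns its elapsed time in milliseconds and updates the maximum. */
    long time(Runnable task) {
        long start = System.nanoTime();
        task.run();
        long elapsed = (System.nanoTime() - start) / 1_000_000;
        synchronized (this) {
            slowestMillis = Math.max(slowestMillis, elapsed);
        }
        return elapsed;
    }

    /** Slowest call observed so far, in milliseconds. */
    synchronized long maxMillis() {
        return slowestMillis;
    }

    /** True while the slowest observed call is still below the given timeout. */
    boolean withinTimeout(long timeoutMillis) {
        return maxMillis() < timeoutMillis;
    }
}
```

One instance can be shared by all worker threads. Logging `maxMillis()` periodically shows how much headroom remains before the timeout.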
Redis CLI Check:
XINFO GROUPS stream:1234      # Check the status of the consumer group
XPENDING stream:1234 mygroup  # Check Pending information
🌟 Summary of key optimization points
Measure | Effect |
---|---|
Asynchronous processing + early ACK | Fully decouples processing time from Redis consumption |
Increased Redis timeout parameters | Avoids connections being dropped because of timeouts |
Thread pool isolation | Keeps external interface failures from dragging down the whole consumer service |
Circuit breaker mechanism | Fails fast to protect system stability |
With these changes, even when the frame-processing method calls an external platform interface, asynchronous processing and resource isolation keep the Redis consumption path stable. After that, you only need to tune the thread pool and circuit breaker parameters to the business load.
Where to add the code:
The following describes in detail where the code for optimizing the Redis connection configuration, monitoring the Pending queue, and adding a circuit breaker mechanism belongs:
1. Optimize Redis connection configuration
Location: Redis configuration class
Adjust the timeout parameters in Spring Boot's Redis configuration class (usually RedisConfig):
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // Set the command timeout (key parameter); setTimeout takes milliseconds
        lettuceConnectionFactory.setTimeout(10_000);
        // Disable the shared native connection to avoid blocking
        lettuceConnectionFactory.setShareNativeConnection(false);

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(lettuceConnectionFactory);
        // Serializer setters assumed: string keys, JSON values
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
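Key parameter description:
- setTimeout (10 seconds): make sure the timeout is longer than the maximum processing time of the frame-processing method.
- setShareNativeConnection(false): keeps multiple threads from blocking each other on one shared connection. With sharing disabled, Lettuce opens and closes a socket for every operation unless connection pooling is configured, so pair this setting with a connection pool.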
2. Monitor the Pending queue
Location: inside the loop of the consumeFrames method
Check the Pending queue periodically in the main loop that consumes messages:
private void consumeFrames(String streamId, String groupName, String consumerName,
                           CommonQueues commonQueues, String regions) throws InterruptedException, IOException {
    // ... Other initialization code ...
    int checkPendingInterval = 10; // Check the Pending queue every 10 loops
    int loopCount = 0;
    while (!Thread.currentThread().isInterrupted()) {
        // ... Original code that reads messages ...

        // Pending queue monitoring logic (insertion point); redisTemplate and log are assumed names
        loopCount++;
        if (loopCount % checkPendingInterval == 0) {
            String streamKey = "stream:" + streamId;
            PendingMessagesSummary pending = redisTemplate.opsForStream().pending(streamKey, groupName);
            if (pending != null && pending.getTotalPendingMessages() > 1000) { // Adjust the threshold to the business load
                log.warn("Detected a backlog of {} Pending messages, resetting the consumer group",
                        pending.getTotalPendingMessages());
                redisTemplate.opsForStream().destroyGroup(streamKey, groupName);
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            }
        }
        // ... Subsequent processing code ...
    }
}
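Explanation:
- opsForStream().pending(streamKey, groupName) returns the current number of Pending messages.
- When the number of Pending messages exceeds the threshold, the consumer group is destroyed and recreated so that stuck messages stop blocking consumption. Destroying the group also discards its pending entries, so those messages are not redelivered.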
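The "check every N loops" logic can be isolated from Redis so that it is easy to test. In this sketch the pending count is supplied by a `LongSupplier`, which in the real consumer would wrap the Spring Data Redis call. `PendingMonitor` and `backlogDetected` are hypothetical names, and what to do when a backlog is detected is left to the caller.

```java
import java.util.function.LongSupplier;

/** Decides, once per loop iteration, whether the pending backlog needs attention. */
class PendingMonitor {
    private final int checkInterval;
    private final long threshold;
    private long loopCount;

    PendingMonitor(int checkInterval, long threshold) {
        if (checkInterval <= 0) {
            throw new IllegalArgumentException("checkInterval must be positive");
        }
        this.checkInterval = checkInterval;
        this.threshold = threshold;
    }

    /**
     * Call once per loop iteration. The supplier is only queried on every
     * checkInterval-th call, so most iterations cost no extra round trip.
     */
    boolean backlogDetected(LongSupplier pendingCount) {
        loopCount++;
        if (loopCount % checkInterval != 0) {
            return false; // not a check iteration
        }
        return pendingCount.getAsLong() > threshold;
    }
}
```

In the consumer loop this could be used as `if (monitor.backlogDetected(() -> currentPendingCount())) { ... }`, where `currentPendingCount()` stands for whatever lookup the application uses.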
3. Add a circuit breaker mechanism
Location: the outer layer of the business logic that processes messages
Wrap the call to the frame-processing method with a Resilience4j circuit breaker:
1. Circuit breaker configuration class
@Configuration
public class FrameCircuitBreakerConfig { // named so it does not shadow Resilience4j's CircuitBreakerConfig
    @Bean
    public CircuitBreaker frameProcessorCircuitBreaker() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
                .failureRateThreshold(50)                         // Failure rate threshold: 50%
                .slidingWindowType(SlidingWindowType.COUNT_BASED)
                .slidingWindowSize(10)                            // Based on the last 10 calls
                .minimumNumberOfCalls(5)                          // Start calculating after at least 5 calls
                .waitDurationInOpenState(Duration.ofSeconds(30))  // Move to half-open 30 seconds after opening
                .build();
        return CircuitBreakerRegistry.of(config).circuitBreaker("frameProcessor");
    }
}
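2. Use the circuit breaker in the consumer code
public class YourConsumerClass {
    @Autowired
    private CircuitBreaker frameProcessorCircuitBreaker; // Inject the circuit breaker

    private void consumeFrames(...) {
        // ... Original code ...
        for (MapRecord<String, Object, Object> record : records) {
            redisTemplate.opsForStream().acknowledge(...); // ACK in advance

            // Protect the processing logic with the circuit breaker (insertion point).
            // Try is Vavr's; executeCallable is used because ImageIO.read throws a checked exception.
            // redisTemplate, log and processFrame are assumed names.
            Try.run(() -> frameProcessorCircuitBreaker.executeCallable(() -> {
                String strdata = (String) record.getValue().get("frameData");
                byte[] frameData = Base64.getDecoder().decode(strdata);
                BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData));
                processFrame(image, streamId, commonQueues, regions);
                return null;
            })).onFailure(e -> log.error("Processing failed or the circuit breaker is open: {}", e.getMessage()));
        }
        // ... Subsequent code ...
    }
}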
Circuit breaker logic:
- When the failure rate of the frame-processing method reaches the threshold, the circuit breaker opens and temporarily rejects subsequent calls to avoid a cascading failure.
- While the circuit breaker is open, processing is skipped, but the message has already been ACKed (decide whether to retry according to business needs).
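The state transitions described above can be illustrated with a hand-written, count-based breaker. This is a simplified stand-in written for illustration, not Resilience4j's implementation. `SimpleBreaker` and its methods are hypothetical names, time is passed in explicitly to keep the sketch deterministic, and the half-open state is resolved by a single trial result.

```java
import java.util.Arrays;

/** Minimal count-based circuit breaker (illustrative stand-in for a real library). */
class SimpleBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final boolean[] window;            // ring buffer of recent outcomes, true = failure
    private final int minimumCalls;            // calls required before the rate is evaluated
    private final double failureRateThreshold; // percent
    private final long waitMillis;             // time to stay open before a trial call
    private int recorded, next, failures;
    private State state = State.CLOSED;
    private long openedAt;

    SimpleBreaker(int windowSize, int minimumCalls, double failureRateThreshold, long waitMillis) {
        this.window = new boolean[windowSize];
        this.minimumCalls = minimumCalls;
        this.failureRateThreshold = failureRateThreshold;
        this.waitMillis = waitMillis;
    }

    synchronized State state() {
        return state;
    }

    /** Returns true if a call may proceed at time nowMillis. */
    synchronized boolean tryAcquire(long nowMillis) {
        if (state == State.OPEN) {
            if (nowMillis - openedAt < waitMillis) {
                return false; // still rejecting calls
            }
            state = State.HALF_OPEN; // wait time elapsed: allow a trial call
        }
        return true;
    }

    /** Records the outcome of a call that was allowed to run. */
    synchronized void record(boolean failed, long nowMillis) {
        if (state == State.OPEN) {
            return; // late result from a call that started before the breaker opened
        }
        if (state == State.HALF_OPEN) {
            if (failed) open(nowMillis); else close();
            return;
        }
        if (recorded == window.length) {
            if (window[next]) failures--; // evict the oldest outcome
        } else {
            recorded++;
        }
        window[next] = failed;
        if (failed) failures++;
        next = (next + 1) % window.length;
        if (recorded >= minimumCalls && failures * 100.0 / recorded >= failureRateThreshold) {
            open(nowMillis);
        }
    }

    private void open(long nowMillis) {
        state = State.OPEN;
        openedAt = nowMillis;
    }

    private void close() {
        state = State.CLOSED;
        Arrays.fill(window, false);
        recorded = 0;
        next = 0;
        failures = 0;
    }
}
```

A caller would check `tryAcquire(System.currentTimeMillis())` before invoking the external interface and report the result with `record(...)` afterwards. A production setup would normally rely on the library rather than on a sketch like this.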
4. Summary of code integration location
Optimization measure | Code location | Key notes |
---|---|---|
Redis connection configuration | Redis configuration class (such as RedisConfig) | Adjust timeout and connection pool parameters |
Pending queue monitoring | Main loop of the consumeFrames method | Periodic check + automatic reset of the consumer group |
Circuit breaker mechanism | Outer layer of the business processing code (wrapping the frame-processing method) | Depends on a circuit breaker library (such as Resilience4j) |
5. Parameter adjustment suggestions
Redis timeout:
- Should be greater than the maximum processing time of the frame-processing method plus a margin for network jitter (for example, 2 times the actual maximum processing time).
Pending queue threshold:
- If 100 messages are processed per second, the threshold can be set to 1000 (equivalent to a 10-second backlog).
Circuit breaker parameters:
- failureRateThreshold: adjust according to the stability of the external interface (for example, 70% if timeouts are frequent).
- waitDurationInOpenState: adjust according to the recovery time of the external service (for example, 30 seconds to 5 minutes).
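The two numeric rules of thumb above reduce to one multiplication each. The helper below only restates that arithmetic; `TuningHints` and its method names are hypothetical, and the inputs should come from your own measurements.

```java
/** Arithmetic behind the timeout and pending-threshold suggestions (illustrative only). */
class TuningHints {

    /** Command timeout = observed maximum processing time times a safety factor (for example 2.0). */
    static long commandTimeoutMillis(long maxProcessingMillis, double marginFactor) {
        return Math.round(maxProcessingMillis * marginFactor);
    }

    /** Pending threshold = processing rate times the backlog duration that is still tolerable. */
    static long pendingThreshold(long messagesPerSecond, long tolerableBacklogSeconds) {
        return messagesPerSecond * tolerableBacklogSeconds;
    }
}
```

For example, `pendingThreshold(100, 10)` gives the threshold of 1000 mentioned above, and `commandTimeoutMillis(4000, 2.0)` gives 8000 ms for an assumed 4-second worst case.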
With these changes, even when the frame-processing method calls an external platform interface, resource isolation, fast failure, and automatic recovery keep the Redis consumption path stable.