SoFunction
Updated on 2025-05-06

Spring Boot Kafka integration with multiple dead letter queues: a detailed example

This article walks through a complete example of integrating Kafka into a Spring Boot application and routing failed messages to multiple dead letter queues, including the code and configuration.

1. Add dependencies (pom.xml)

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

2. Configuration file (application.yml)

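spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest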

3. Custom exception class

public class BusinessException extends RuntimeException {
    public BusinessException(String message) {
        super(message);
    }
}

4. Kafka configuration class

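import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Kafka producer configuration
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Kafka consumer configuration
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // A factory built from a plain map needs the deserializers set explicitly
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    // Custom error handler (supports multiple dead letter queues)
    @Bean
    public CommonErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        // Retry strategy: up to 3 retries, 1 second apart
        FixedBackOff backOff = new FixedBackOff(1000L, 3);

        DefaultErrorHandler errorHandler = new DefaultErrorHandler((record, exception) -> {
            String dlqTopic = determineDlqTopic(exception);
            // Keys and values are Strings here because of the String deserializers
            kafkaTemplate.send(dlqTopic, (String) record.key(), (String) record.value());
            System.out.println("Message sent to the dead letter queue: " + dlqTopic);
        }, backOff);

        // Exception types that are retried, and types that go straight to the dead letter queue
        errorHandler.addRetryableExceptions(BusinessException.class);
        errorHandler.addNotRetryableExceptions(SerializationException.class);

        return errorHandler;
    }

    // Select the dead letter queue according to the exception type.
    // The listener container wraps the thrown exception, so walk the whole cause chain.
    private String determineDlqTopic(Throwable exception) {
        for (Throwable cause = exception; cause != null; cause = cause.getCause()) {
            if (cause instanceof SerializationException) {
                return "serialization-error-dlq";
            } else if (cause instanceof BusinessException) {
                return "business-error-dlq";
            }
        }
        return "general-error-dlq";
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setCommonErrorHandler(errorHandler(kafkaTemplate()));
        return factory;
    }
}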

5. Kafka consumer service

import org.apache.kafka.common.errors.SerializationException;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "main-topic")
    public void consume(String message) {
        // Throw the exceptions directly so the error handler sees their real types
        if (message.contains("invalid-format")) {
            throw new SerializationException("Message format error");
        } else if (message.contains("business-error")) {
            throw new BusinessException("Business processing failed");
        }
        System.out.println("Message processed successfully: " + message);
    }
}

6. Startup class

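import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}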

7. Test steps

1. Create the Kafka topics (the main topic and one topic per dead letter queue):

kafka-topics --create --bootstrap-server localhost:9092 --topic main-topic
kafka-topics --create --bootstrap-server localhost:9092 --topic serialization-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic business-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic general-error-dlq

2. Send test messages:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendTestMessages() {
    kafkaTemplate.send("main-topic", "valid-message");
    kafkaTemplate.send("main-topic", "invalid-format");
    kafkaTemplate.send("main-topic", "business-error");
}

3. Observe the dead letter queues:

  • Messages with a format error go to serialization-error-dlq
  • Messages that fail with a business exception go to business-error-dlq
  • Messages that fail with any other exception go to general-error-dlq

Key points

Error routing logic: The determineDlqTopic method selects the dead letter queue according to the type of the exception that caused the failure.
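The routing rule can be tried out without a broker. The following is a minimal sketch under stated assumptions: the class name DlqRoutingSketch, the nested stand-in exception classes, and the ROUTES map are hypothetical and not part of the article's code. It walks the cause chain because the exception handed to an error handler is often a wrapper around the one the listener actually threw.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch: no Kafka or Spring classes are used here.
final class DlqRoutingSketch {

    // Stand-ins for the exception types used in the article.
    static class SerializationException extends RuntimeException {
        SerializationException(String message) { super(message); }
    }

    static class BusinessException extends RuntimeException {
        BusinessException(String message) { super(message); }
    }

    // Ordered mapping from exception type to dead letter topic.
    private static final Map<Class<? extends Throwable>, String> ROUTES = new LinkedHashMap<>();
    static {
        ROUTES.put(SerializationException.class, "serialization-error-dlq");
        ROUTES.put(BusinessException.class, "business-error-dlq");
    }

    // Walks the cause chain and returns the topic of the first matching type.
    static String determineDlqTopic(Throwable exception) {
        for (Throwable t = exception; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, String> route : ROUTES.entrySet()) {
                if (route.getKey().isInstance(t)) {
                    return route.getValue();
                }
            }
        }
        return "general-error-dlq"; // fallback for unclassified failures
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new BusinessException("Business processing failed"));
        System.out.println(determineDlqTopic(wrapped));                                // prints business-error-dlq
        System.out.println(determineDlqTopic(new IllegalStateException("unexpected"))); // prints general-error-dlq
    }
}
```

Keeping the routes in a map means a new dead letter queue only needs one more entry, at the cost of a little indirection compared with an if/else chain.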

Retry mechanism: FixedBackOff configures the retry policy: up to 3 retries at 1-second intervals, so a failing record is delivered at most 4 times before it goes to a dead letter queue.

Exception classification:

  • SerializationException (a serialization problem) is sent straight to the dead letter queue without any retry.
  • BusinessException triggers retries and is sent to the dead letter queue only after the final retry fails.
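To make the interaction between the retry count and the exception classification concrete, here is a simplified model in plain Java. It is a sketch, not the Spring Kafka implementation: RetryPolicySketch, attemptsUntilDeadLetter, and the stand-in exception classes are hypothetical names, and the listener is assumed to fail on every delivery.

```java
import java.util.Set;

// Simplified, hypothetical model of the retry policy; no Spring or Kafka classes are used.
final class RetryPolicySketch {

    // Stand-ins for the exception types used in the article.
    static class SerializationException extends RuntimeException {
        SerializationException(String message) { super(message); }
    }

    static class BusinessException extends RuntimeException {
        BusinessException(String message) { super(message); }
    }

    // Types that skip retries; everything else is retried by default.
    private static final Set<Class<? extends Throwable>> NOT_RETRYABLE =
            Set.of(SerializationException.class);

    // A failure is not retryable if any exception in its cause chain is listed above.
    static boolean isRetryable(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> type : NOT_RETRYABLE) {
                if (type.isInstance(t)) {
                    return false;
                }
            }
        }
        return true;
    }

    // Counts delivery attempts for a listener that always fails with the given exception.
    static int attemptsUntilDeadLetter(RuntimeException failure, int maxRetries, long intervalMs) {
        int attempts = 0;
        while (true) {
            attempts++; // one failed delivery attempt
            if (!isRetryable(failure) || attempts > maxRetries) {
                return attempts; // a real handler would now publish to the dead letter topic
            }
            pause(intervalMs);
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The interval is 0 ms here so the demo finishes immediately.
        System.out.println(attemptsUntilDeadLetter(new BusinessException("failed"), 3, 0L));   // prints 4
        System.out.println(attemptsUntilDeadLetter(new SerializationException("bad"), 3, 0L)); // prints 1
    }
}
```

With 3 retries, a retryable failure is delivered 4 times in total (the first attempt plus 3 retries), while a non-retryable failure is dead-lettered after the first attempt.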
