Here is a complete example of Spring Boot integrating Kafka and using multiple dead letter queues, with code and configuration instructions.
1. Add dependencies ()
<dependencies> <dependency> <groupId></groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId></groupId> <artifactId>spring-kafka</artifactId> </dependency> </dependencies>
2. Configuration file ()
spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: value-serializer: consumer: group-id: my-group key-deserializer: value-deserializer: auto-offset-reset: earliest
3. Custom exception class
public class BusinessException extends RuntimeException { public BusinessException(String message) { super(message); } }
4. Kafka configuration class
import ; import ; import ; import ; import ; import ; import .*; import ; import ; import ; @Configuration @EnableKafka public class KafkaConfig { @Value("${-servers}") private String bootstrapServers; // Kafka Producer Configuration @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> config = new HashMap<>(); (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ); (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ); return new DefaultKafkaProducerFactory<>(config); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } // Kafka consumer configuration @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> config = new HashMap<>(); (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); (ConsumerConfig.GROUP_ID_CONFIG, "my-group"); (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return new DefaultKafkaConsumerFactory<>(config); } // Custom error handler (supports multiple dead letter queues) @Bean public CommonErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) { // Retry strategy: 3 retry, 1 second interval FixedBackOff backOff = new FixedBackOff(1000L, 3); DefaultErrorHandler errorHandler = new DefaultErrorHandler((record, exception) -> { String dlqTopic = determineDlqTopic(exception); (dlqTopic, (), ()); ("Message sent to the dead letter queue: " + dlqTopic); }, backOff); // Configure the exception type that needs to be retryed (); (); return errorHandler; } // Select the dead letter queue according to the exception type private String determineDlqTopic(Throwable exception) { if (() instanceof SerializationException) { return "serialization-error-dlq"; } else if (() instanceof BusinessException) { return "business-error-dlq"; } else { return "general-error-dlq"; } } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); (consumerFactory()); (errorHandler(kafkaTemplate())); return factory; } }
5. Kafka Consumer Services
import ; import ; @Service public class KafkaConsumerService { @KafkaListener(topics = "main-topic") public void consume(String message) { try { if (("invalid-format")) { throw new SerializationException("Message format error"); } else if (("business-error")) { throw new BusinessException("Business processing failed"); } ("Message processing successfully: " + message); } catch (Exception e) { throw new RuntimeException(e); } } }
6. Start class
import ; import ; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { (, args); } }
7. Test steps
1. Create a Kafka topic:
kafka-topics --create --bootstrap-server localhost:9092 --topic main-topic
kafka-topics --create --bootstrap-server localhost:9092 --topic serialization-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic business-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic general-error-dlq
2. Send a test message:
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendTestMessages() { ("main-topic", "valid-message"); ("main-topic", "invalid-format"); ("main-topic", "business-error"); }
3. Observe the dead letter queue:
- Messages with error format will enter serialization-error-dlq
- Business exception messages will enter business-error-dlq
- Other exceptions enter general-error-dlq
Key points description
Error routing logic: Select different dead letter queues according to exception type through the determineDlqTopic method.
Retry mechanism: Configure the retry policy through FixedBackOff (retry up to 3 times, 1 second interval).
Exception classification:
- SerializationException (serialization problem) directly enters the dead letter queue and does not try again.
- BusinessException will trigger a retry and enter the dead letter queue after the eventual failure.
This is the end of this article about SpringBoot integrating Kafka and using multiple dead letter queues. For more related content on SpringBoot Kafka using dead letter queues, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!