SoFunction
Updated on 2025-05-23

Spring Kafka Message Consumption: @KafkaListener and Consumer Group Configuration

Introduction

As a high-throughput distributed messaging system, Apache Kafka plays a key role in big data processing and microservice architecture.

Spring Kafka provides Java developers with a simple and easy-to-use consumer API; in particular, the @KafkaListener annotation greatly simplifies the implementation of message consumption.

This article explores Spring Kafka's message consumption mechanism in depth, focusing on how to use the @KafkaListener annotation and how to configure consumer groups, helping developers build an efficient and stable message consumption system.

1. Spring Kafka consumer basic configuration

The first step in using Spring Kafka for message consumption is to configure the consumer factory and the listener container factory.

These configurations define the basic behavior of consumers, including server addresses, message deserialization methods, etc.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Make JsonDeserializer trust all packages
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
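In a Spring Boot application, the same consumer settings can alternatively be supplied through externalized configuration instead of a Java config class. A minimal sketch using Spring Boot's `spring.kafka.*` properties (the server address, group ID, and deserializers here mirror the Java configuration above; adjust them to your environment):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
```

With these properties in place, Spring Boot auto-configures the consumer factory and listener container factory, and the explicit `@Configuration` class above becomes unnecessary.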

2. Using the @KafkaListener annotation

@KafkaListener is a core annotation provided by Spring Kafka for marking methods as Kafka message listeners.

With a simple annotation, messages can be consumed and processed automatically.

@Service
public class KafkaConsumerService {

    // Basic usage: listen to a single topic
    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void listen(String message) {
        System.out.println("Message received: " + message);
    }
    
    // Listen to multiple topics
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "multi-topic-group")
    public void listenMultipleTopics(String message) {
        System.out.println("Received message from multiple topics: " + message);
    }
    
    // Listen to specific partitions
    @KafkaListener(topicPartitions = {
        @TopicPartition(topic = "partitioned-topic", partitions = {"0", "1"})
    }, groupId = "partitioned-group")
    public void listenPartitions(String message) {
        System.out.println("Received from a specific partition: " + message);
    }
    
    // Use ConsumerRecord to access message metadata
    @KafkaListener(topics = "metadata-topic", groupId = "metadata-group")
    public void listenWithMetadata(ConsumerRecord<String, String> record) {
        System.out.println("Topic: " + record.topic() +
                           ", partition: " + record.partition() +
                           ", offset: " + record.offset() +
                           ", key: " + record.key() +
                           ", value: " + record.value());
    }
    
    // Batch consumption
    @KafkaListener(topics = "batch-topic", groupId = "batch-group", 
                  containerFactory = "batchListenerFactory")
    public void listenBatch(List<String> messages) {
        System.out.println("Batch received, count: " + messages.size());
        messages.forEach(message -> System.out.println("Batch message: " + message));
    }
}
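As an alternative to taking the whole ConsumerRecord, individual pieces of metadata can be injected as method parameters via @Header. A sketch using Spring's KafkaHeaders constants (note that constant names vary slightly across versions; for example, RECEIVED_PARTITION_ID was renamed RECEIVED_PARTITION in Spring Kafka 3.0):

```java
@KafkaListener(topics = "metadata-topic", groupId = "metadata-group")
public void listenWithHeaders(@Payload String message,
                              @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                              @Header(KafkaHeaders.OFFSET) long offset) {
    // Only the requested metadata is injected, keeping the signature focused
    System.out.println("Topic: " + topic + ", offset: " + offset + ", value: " + message);
}
```

This style keeps the listener signature explicit about which metadata it actually uses.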

Batch consumption requires an additional batch listener container factory:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // Enable batch listening
    factory.getContainerProperties().setPollTimeout(3000);  // Poll timeout in milliseconds
    return factory;
}

3. Consumer group configuration and load balancing

Kafka's consumer group mechanism is the key to load-balanced message consumption. Multiple consumer instances within the same group automatically divide the topic's partitions among themselves, ensuring that each partition is processed by only one consumer in the group and enabling parallel consumption.

// Configure consumer group properties
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // Basic configuration
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    
    // Consumer group configuration
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application-group");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // Disable auto-commit
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);  // Max records per poll
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);  // Session timeout
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);  // Heartbeat interval
    
    return new DefaultKafkaConsumerFactory<>(props);
}

Multiple consumers can achieve load balancing by configuring the same group ID:

// Consumer 1
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer1(String message) {
    System.out.println("Consumer 1 received: " + message);
}

// Consumer 2
@KafkaListener(topics = "shared-topic", groupId = "shared-group")
public void consumer2(String message) {
    System.out.println("Consumer 2 received: " + message);
}

When both consumers run at the same time, Kafka automatically divides the topic's partitions between them, so each consumer processes only the messages in its assigned partitions.
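Besides running multiple application instances, parallelism within a single application can be raised through the container factory's concurrency setting. A minimal sketch (the bean name `concurrentFactory` is illustrative; `setConcurrency(3)` starts three listener threads, each with its own consumer in the same group, which only helps if the topic has at least three partitions):

```java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> concurrentFactory(
        ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Three consumer threads share the group; partitions are split among them
    factory.setConcurrency(3);
    return factory;
}
```

Concurrency beyond the partition count leaves the extra consumers idle, so size the topic's partitions accordingly.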

4. Manually committing offsets

In some scenarios, automatic offset commits are not sufficient, and manual commits can be configured instead. Manual committing gives precise control over when message consumption is acknowledged, ensuring that offsets are committed only after a message has been fully processed.

@Configuration
public class ManualCommitConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualCommitFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

@Service
public class ManualCommitService {
    
    @KafkaListener(topics = "manual-commit-topic", 
                  groupId = "manual-group",
                  containerFactory = "manualCommitFactory")
    public void listenWithManualCommit(String message, Acknowledgment ack) {
        try {
            System.out.println("Processing message: " + message);
            // Business logic for processing the message
            // ...
            // Acknowledge the message after successful processing
            ack.acknowledge();
        } catch (Exception e) {
            // Exception handling; you can choose not to acknowledge
            System.err.println("Message processing failed: " + e.getMessage());
        }
    }
}
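Note that Spring Kafka offers two manual acknowledgment modes, and the choice affects commit timing. A short sketch of the difference (MANUAL_IMMEDIATE is the mode assumed in the factory above):

```java
// Commit the offset immediately when acknowledge() is called
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

// Alternatively, queue acknowledgments and commit them together
// after all records from the current poll have been processed:
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
```

MANUAL reduces the number of commit requests at the cost of slightly coarser acknowledgment granularity.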

5. Error handling and retry mechanism

Various exceptions may occur during message consumption. Spring Kafka provides a comprehensive error handling mechanism, including retries and dead letter queues.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = 
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    
    // Retry configuration
    factory.setRetryTemplate(retryTemplate());
    
    // Recovery callback invoked after retries are exhausted
    factory.setRecoveryCallback(context -> {
        ConsumerRecord<String, String> record = 
            (ConsumerRecord<String, String>) context.getAttribute("record");
        System.err.println("Retries exhausted, sending to dead letter queue: " + record.value());
        // The message could be forwarded to a dead letter topic here
        // kafkaTemplate.send("dead-letter-topic", record.value());
        return null;
    });
    
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    
    // Fixed-interval back-off policy
    FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(1000);  // 1-second retry interval
    template.setBackOffPolicy(backOffPolicy);
    
    // Simple retry policy
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3);  // Maximum number of attempts
    template.setRetryPolicy(retryPolicy);
    
    return template;
}

@KafkaListener(topics = "retry-topic", groupId = "retry-group", 
               containerFactory = "retryListenerFactory")
public void listenWithRetry(String message) {
    System.out.println("Received a message that may need retrying: " + message);
    // Simulate a processing failure
    if (message.contains("error")) {
        throw new RuntimeException("Processing failed, will retry");
    }
    System.out.println("Message processed successfully");
}
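Be aware that in recent Spring Kafka versions (2.8 and later), the RetryTemplate-based approach on the container factory has been replaced by the CommonErrorHandler abstraction. A sketch of the roughly equivalent setup using DefaultErrorHandler with a dead letter publisher (this assumes a KafkaTemplate bean is available for publishing to the default `<topic>.DLT` dead letter topic):

```java
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
    // After retries are exhausted, forward the failed record to <topic>.DLT
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    // Retry up to 3 times with a fixed 1-second back-off, then recover
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}
```

The handler is then attached with `factory.setCommonErrorHandler(errorHandler)` instead of `setRetryTemplate` and `setRecoveryCallback`.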

Summary

Spring Kafka gives developers powerful message consumption capabilities through the @KafkaListener annotation and flexible consumer group configuration.

This article covered basic configuration, how to use @KafkaListener, the consumer group mechanism, manual offset commits, and error handling strategies.

In practical applications, developers should choose appropriate consumption patterns and configuration strategies based on business needs to achieve efficient and reliable message processing.

Used well, consumer groups provide load balancing and horizontal scaling, while manual offset commits and error handling mechanisms improve the robustness of the system.

The above reflects personal experience; I hope it serves as a useful reference.