Setup

pom.xml

<dependencies>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Optional: For JSON serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

application.yml

spring:
	kafka:
		# Kafka broker address
	  bootstrap-servers: localhost:9092
	  
	  # consumer setup
    consumer:
			group-id: my-group
			# Start from the beginning of the topic
			auto-offset-reset: earliest
			key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
			value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: com.example.mhpractice.features.wallet.event

	  # producer setup
		producer:
			key-serializer: org.apache.kafka.common.serialization.StringSerializer
			value-serializer: org.apache.kafka.common.serialization.StringSerializer
		
# inside app custom config (so not hardcoded)	
app:
  kafka:
    retry:
      count: 3          
      backoff-ms: 2000  
    transaction:
      enabled: false

compose.yml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: mhpractice-zookeeper-1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: mhpractice-kafka-1
    hostname: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      # a unique id for each broker
      KAFKA_BROKER_ID: 1
      # connect to zookeeper
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # security protocol map
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      # advertised listeners (the address Kafka gives to the client AFTER they connect, telling them where to send future requests.)
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      # listeners (the reception port, if receive from internal, give it internal, if receive from external, give it external)
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      # inter-broker listener name
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # offsets topic replication factor (only 1 broker, so set to 1)
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # transaction state log min ISR (only 1 broker, so set to 1)
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      # transaction state log replication factor (only 1 broker, so set to 1)
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    healthcheck:
      test: [ "CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list" ]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

Kafka Config class

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${app.kafka.retry.count}")
    private int retryCount;

    @Value("${app.kafka.retry.backoff-ms}")
    private long retryBackoffMs;

    @Value("${app.kafka.transaction.enabled}")
    private boolean transactionEnabled;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // Creates Kafka template for sending messages
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Transaction manager for @Transactional support
    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
        return new KafkaTransactionManager<>(producerFactory());
    }

    // Creates Kafka producers for sending messages
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Transaction support
        // Ensures messages sent in a transaction are atomic (all or nothing)
        // Prevents duplicate messages even if producer retries
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "wallet-tx-");

        // Reliability settings
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all replicas
        props.put(ProducerConfig.RETRIES_CONFIG, retryCount); // Retry 3 times
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); // Wait before retry

        return new DefaultKafkaProducerFactory<>(props);
    }

    // Creates Kafka consumers for receiving messages
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        // Error-handling deserializers (wraps actual deserializer)
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);

        // Actual deserializers
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);

        // Trust only your event packages
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.mhpractice.features.wallet.event");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, true);
        // props.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
        // "com.example.mhpractice.features.wallet.event.TransferRequestEvent");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Required for @KafkaListener to work!
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        // Creates Kafka listener container factory
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        // Optional: Add error handling with retry
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                new FixedBackOff(retryBackoffMs, retryCount)));

        return factory;
    }
}

Kafka send message

// kafkaTemplate.send(topic, message)
// is async, returns a ListenableFuture.
kafkaTemplate.send("transfer.events.success", transactionId);

// Transactional send
kafkaTemplate.executeInTransaction(operations -> {
    operations.send("transfer.events.request", transferRequestEvent);
    return null;
});

Kafka Listener

@RequiredArgsConstructor
@Component
public class TransferEventListener {

    private final TransferOrchestrator transferOrchestrator;
    private final TransactionRepository transactionRepository;

    @KafkaListener(topics = "transfer.events.request", groupId = "wallet-service-group")
    public void handleTransferRequest(TransferRequestEvent event) {
        transferOrchestrator.executeTransfer(event.getFromWalletId(), event.getToWalletId(), event.getAmount(),
                event.getTransactionId());
    }

    @KafkaListener(topics = "transfer.events.success", groupId = "notification-group")
    public void handleTransferSuccess(String transactionId) {
        Transaction txn = transactionRepository.findByTransactionId(transactionId).orElse(null);
    }

    @KafkaListener(topics = "transfer.events.failed", groupId = "notification-group")
    public void handleTransferFailed(String transactionId) {
        Transaction txn = transactionRepository.findByTransactionId(transactionId).orElse(null);
    }

    @KafkaListener(topics = "transfer.events.rollback", groupId = "notification-group")
    public void handleTransferRollback(String transactionId) {
        Transaction txn = transactionRepository.findByTransactionId(transactionId).orElse(null);
    }
}