pom.xml
<dependencies>
<!-- Spring Kafka: KafkaTemplate, @KafkaListener, serializer support.
     No <version> is declared — presumably managed by the Spring Boot parent/BOM; verify in the full pom. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Optional: Jackson, used by JsonSerializer/JsonDeserializer for event payloads -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
application.yml
spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: localhost:9092
    # consumer setup
    # NOTE(review): KafkaConfig builds its own Consumer/ProducerFactory beans, which
    # bypass most of these auto-configuration keys — keep the two in sync.
    consumer:
      group-id: my-group
      # Start from the beginning of the topic when no committed offset exists
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # Only classes from this package may be deserialized by JsonDeserializer
        spring.json.trusted.packages: com.example.mhpractice.features.wallet.event
    # producer setup
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

# app-specific custom config (so values are not hardcoded in code)
app:
  kafka:
    retry:
      count: 3
      backoff-ms: 2000
    transaction:
      enabled: false
compose.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: mhpractice-zookeeper-1
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: mhpractice-kafka-1
    hostname: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      # a unique id for each broker
      KAFKA_BROKER_ID: "1"
      # connect to zookeeper
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # security protocol map
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      # advertised listeners (the address Kafka gives to clients AFTER they connect,
      # telling them where to send future requests)
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      # listeners (the reception ports: internal traffic on 29092, host traffic on 9092)
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      # inter-broker listener name
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # offsets topic replication factor (only 1 broker, so set to 1)
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      # transaction state log min ISR (only 1 broker, so set to 1)
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      # transaction state log replication factor (only 1 broker, so set to 1)
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
      interval: 10s
      timeout: 5s
      retries: 5

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      # UI runs inside the compose network, so it uses the INTERNAL listener
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${app.kafka.retry.count}")
private int retryCount;
@Value("${app.kafka.retry.backoff-ms}")
private long retryBackoffMs;
@Value("${app.kafka.transaction.enabled}")
private boolean transactionEnabled;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
// Creates Kafka template for sending messages
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// Transaction manager for @Transactional support
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
// Creates Kafka producers for sending messages
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Transaction support
// Ensures messages sent in a transaction are atomic (all or nothing)
// Prevents duplicate messages even if producer retries
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "wallet-tx-");
// Reliability settings
props.put(ProducerConfig.ACKS_CONFIG, "all"); // Wait for all replicas
props.put(ProducerConfig.RETRIES_CONFIG, retryCount); // Retry 3 times
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); // Wait before retry
return new DefaultKafkaProducerFactory<>(props);
}
// Creates Kafka consumers for receiving messages
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// Error-handling deserializers (wraps actual deserializer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// Actual deserializers
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
// Trust only your event packages
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.mhpractice.features.wallet.event");
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, true);
// props.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
// "com.example.mhpractice.features.wallet.event.TransferRequestEvent");
return new DefaultKafkaConsumerFactory<>(props);
}
// Required for @KafkaListener to work!
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
// Creates Kafka listener container factory
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Optional: Add error handling with retry
factory.setCommonErrorHandler(new DefaultErrorHandler(
new FixedBackOff(retryBackoffMs, retryCount)));
return factory;
}
}
Sending Kafka messages (usage examples)
// kafkaTemplate.send(topic, message) is asynchronous. It returns a
// CompletableFuture in Spring Kafka 3.x (ListenableFuture before 3.0)
// — NOTE(review): confirm which Spring Kafka version this project uses.
kafkaTemplate.send("transfer.events.success", transactionId);
// Transactional send: everything sent inside the callback commits or aborts
// atomically. Requires a transaction-capable producer factory
// (i.e. a transactional.id configured on it).
kafkaTemplate.executeInTransaction(operations -> {
operations.send("transfer.events.request", transferRequestEvent);
return null;
});
Kafka listener (consumer side)
@RequiredArgsConstructor
@Component
public class TransferEventListener {
// Executes the actual wallet-to-wallet transfer.
private final TransferOrchestrator transferOrchestrator;
// Looks up transactions by their id for the notification handlers below.
private final TransactionRepository transactionRepository;
// Consumes transfer requests and delegates to the orchestrator.
// Note: groupId here overrides the default group configured in KafkaConfig.
@KafkaListener(topics = "transfer.events.request", groupId = "wallet-service-group")
public void handleTransferRequest(TransferRequestEvent event) {
transferOrchestrator.executeTransfer(event.getFromWalletId(), event.getToWalletId(), event.getAmount(),
event.getTransactionId());
}
// Stub handler: loads the transaction but does nothing with it yet
// (presumably a notification will be sent here — TODO implement).
@KafkaListener(topics = "transfer.events.success", groupId = "notification-group")
public void handleTransferSuccess(String transactionId) {
Transaction txn = transactionRepository.findByTransactionId(transactionId).orElse(null);
}
// Stub handler for failed transfers; txn is fetched but unused (TODO implement).
@KafkaListener(topics = "transfer.events.failed", groupId = "notification-group")
public void handleTransferFailed(String transactionId) {
Transaction txn = transactionRepository.findByTransactionId(transactionId).orElse(null);
}
// Stub handler for rolled-back transfers; txn is fetched but unused (TODO implement).
@KafkaListener(topics = "transfer.events.rollback", groupId = "notification-group")
public void handleTransferRollback(String transactionId) {
Transaction txn = transactionRepository.findByTransactionId(transactionId).orElse(null);
}
}