Client generate UUID as Idempotency key
Sends it as a request header.
Controller read it:
// required = false : optional
// if client doesn't send it, task creation works normally with no dedup.
@RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey
Idempotency Service
public class IdempotencyService {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
private static final Duration TTL = Duration.ofHours(24);
private static final String PREFIX = "idempotency:task:";
public Optional<TaskResponse> get(String idempotencyKey) {
String cached = redisTemplate.opsForValue().get(PREFIX + idempotencyKey);
if (cached == null) {
return Optional.empty();
}
try {
return Optional.of(objectMapper.readValue(cached, TaskResponse.class));
} catch (Exception e) {
log.warn("Failed to deserialize cached idempotency response",
kv("idempotencyKey", idempotencyKey));
return Optional.empty();
}
}
public void store(String idempotencyKey, TaskResponse response) {
try {
String json = objectMapper.writeValueAsString(response);
redisTemplate.opsForValue().set(PREFIX + idempotencyKey, json, TTL);
} catch (Exception e) {
log.warn("Failed to store idempotency response",
kv("idempotencyKey", idempotencyKey));
}
}
}
Usage in Controller
private final TaskService taskService;
private final IdempotencyService idempotencyService;
@PostMapping
@Timed(value = "task.create", description = "Time taken to create a task")
public ResponseEntity<TaskResponse> createTask(
@Valid @RequestBody CreateTaskRequest request,
@RequestHeader("X-User-Id") String userId,
@RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey) {
// check the idempotency key exist dy or not
if (idempotencyKey != null) {
Optional<TaskResponse> cached = idempotencyService.get(idempotencyKey);
if (cached.isPresent()) {
return ResponseEntity.ok(cached.get());
}
}
TaskResponse response = taskService.createTask(request, UUID.fromString(userId));
// dy executed task, store the idempotency key into redis
if (idempotencyKey != null) {
idempotencyService.store(idempotencyKey, response);
}
return ResponseEntity.status(HttpStatus.CREATED).body(response);
}
spring:
application:
name: orchestrator-service
kafka:
bootstrap-servers: localhost:29092
consumer:
group-id: orchestrator-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
specific.avro.reader: true
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
acks: all
retries: 3
properties:
**enable.idempotence: true**
@KafkaListener(topics = "${kafka.topics.task-created}", groupId = "${spring.kafka.consumer.group-id}")
public void consumeTaskCreated(ConsumerRecord<String, TaskEvent> record) {
TaskEvent event = record.value();
// set a unique id for this task
String dedupKey = PROCESSED_KEY_PREFIX + event.getEventId();
// if reconnect, check this id to see is it exist dy
if (Boolean.TRUE.equals(redisTemplate.hasKey(dedupKey))) {
log.warn("Duplicate event detected, skipping",
kv("eventId", event.getEventId()),
kv("taskId", event.getTaskId()));
return;
}
try {
orchestrationService.processTaskCreated(event);
redisTemplate.opsForValue().set(dedupKey, "1", DEDUP_TTL);
} catch (Exception e) {
log.error("Failed to process task.created event",
kv("taskId", event.getTaskId()),
kv("error", e.getMessage()), e);
throw e;
}
}