HTTP Idempotency Keys

The client generates a UUID to use as the idempotency key:

  1. The client sends it as a request header.

  2. The controller reads it:

    // required = false makes the header optional:
    // if the client doesn't send it, task creation works normally with no deduplication.
    @RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey
    

Idempotency Service

/**
 * Caches {@link TaskResponse} payloads in Redis, keyed by a client-supplied
 * idempotency key, so a repeated request can be answered from the cache.
 *
 * <p>Entries are stored as JSON under {@code "idempotency:task:" + key} and
 * expire after 24 hours.
 *
 * <p>NOTE(review): {@code log} and {@code kv} are not declared in this snippet —
 * presumably a Lombok {@code @Slf4j} logger and a statically imported
 * structured-argument helper; confirm against the real class.
 */
public class IdempotencyService {

    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    /** How long a cached response stays available for replay. */
    private static final Duration TTL = Duration.ofHours(24);
    /** Prefix prepended to every idempotency key to form the Redis key. */
    private static final String PREFIX = "idempotency:task:";

    /**
     * Looks up the response previously stored for the given idempotency key.
     *
     * <p>The Redis read itself is not guarded: a failure while reading from
     * Redis propagates to the caller. Only a failure to deserialize the cached
     * JSON is handled here, by logging it and reporting a cache miss.
     *
     * @param idempotencyKey the key sent by the client
     * @return the cached response, or empty if nothing is stored under the key
     *         or the stored value could not be deserialized
     */
    public Optional<TaskResponse> get(String idempotencyKey) {
        String cached = redisTemplate.opsForValue().get(PREFIX + idempotencyKey);
        if (cached == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(objectMapper.readValue(cached, TaskResponse.class));
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the cause and stack
            // trace end up in the log instead of being silently dropped.
            log.warn("Failed to deserialize cached idempotency response",
                    kv("idempotencyKey", idempotencyKey), e);
            return Optional.empty();
        }
    }

    /**
     * Stores the response for the given idempotency key with the configured TTL.
     *
     * <p>Best effort: any failure (serialization or the Redis write) is logged
     * and swallowed so it does not fail the request that produced the response.
     *
     * @param idempotencyKey the key sent by the client
     * @param response       the response to replay for later requests with the same key
     */
    public void store(String idempotencyKey, TaskResponse response) {
        try {
            String json = objectMapper.writeValueAsString(response);
            redisTemplate.opsForValue().set(PREFIX + idempotencyKey, json, TTL);
        } catch (Exception e) {
            // Keep the cause in the log; the failure itself is deliberately not rethrown.
            log.warn("Failed to store idempotency response",
                    kv("idempotencyKey", idempotencyKey), e);
        }
    }
}

Usage in Controller

// Performs the actual task creation (called from createTask below).
private final TaskService taskService;
// Looks up and stores cached responses by idempotency key (used by createTask below).
private final IdempotencyService idempotencyService;

/**
 * Creates a task, or replays the previously stored response when the same
 * user repeats a request with the same {@code Idempotency-Key}.
 *
 * <p>The key handed to the idempotency service is scoped by the caller's user
 * id. Without that scoping, a request from one user that reuses another user's
 * key would be served the other user's cached response.
 *
 * @param request        the validated task creation payload
 * @param userId         caller identity from the {@code X-User-Id} header; must be a UUID string
 * @param idempotencyKey optional client-generated key; when absent or blank the
 *                       request is processed normally with no deduplication
 * @return {@code 200 OK} with the cached response on a replay, otherwise
 *         {@code 201 Created} with the newly created task
 * @throws IllegalArgumentException if {@code userId} is not a valid UUID string
 */
@PostMapping
@Timed(value = "task.create", description = "Time taken to create a task")
public ResponseEntity<TaskResponse> createTask(
        @Valid @RequestBody CreateTaskRequest request,
        @RequestHeader("X-User-Id") String userId,
        @RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey) {

    // Parse the caller identity first, so a malformed X-User-Id is rejected
    // before any cached response can be returned.
    UUID ownerId = UUID.fromString(userId);

    // A blank header value is treated like a missing one; otherwise every
    // blank-key request would collide on the same cache entry.
    boolean dedupRequested = idempotencyKey != null && !idempotencyKey.trim().isEmpty();

    // Scope the key by user so keys chosen by different users never collide.
    String scopedKey = dedupRequested ? ownerId + ":" + idempotencyKey : null;

    // Check whether this idempotency key has already been handled.
    if (scopedKey != null) {
        Optional<TaskResponse> cached = idempotencyService.get(scopedKey);
        if (cached.isPresent()) {
            return ResponseEntity.ok(cached.get());
        }
    }

    TaskResponse response = taskService.createTask(request, ownerId);

    // The task has been created; remember the response under the idempotency key.
    if (scopedKey != null) {
        idempotencyService.store(scopedKey, response);
    }

    return ResponseEntity.status(HttpStatus.CREATED).body(response);
}

Kafka Producer Idempotency

spring:
  application:
    name: orchestrator-service

  kafka:
    bootstrap-servers: localhost:29092
    consumer:
      group-id: orchestrator-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific.avro.reader: true
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      acks: all
      retries: 3
      properties:
        # The key setting for this section: turns on the idempotent producer so
        # that retried sends do not write duplicate records. Kafka requires
        # acks=all and retries > 0 alongside it, which the settings above satisfy.
        enable.idempotence: true

Kafka Consumer Idempotency

/**
 * Handles a task-created event, skipping events whose id has already been
 * recorded as processed in Redis.
 *
 * <p>A processing failure is logged and rethrown. A failure to record the
 * "processed" marker after processing has succeeded is only logged: rethrowing
 * at that point would report a completed event as failed, and with no marker
 * stored a redelivery would process it a second time.
 *
 * <p>NOTE(review): the {@code hasKey} check and the later {@code set} are
 * separate Redis calls, so two concurrent deliveries of the same event can
 * both pass the check. Confirm whether that window is acceptable.
 *
 * @param record the consumed Kafka record carrying the {@code TaskEvent}
 */
@KafkaListener(topics = "${kafka.topics.task-created}", groupId = "${spring.kafka.consumer.group-id}")
public void consumeTaskCreated(ConsumerRecord<String, TaskEvent> record) {
    TaskEvent event = record.value();
    // Build the Redis key that marks this event id as processed.
    String dedupKey = PROCESSED_KEY_PREFIX + event.getEventId();

    // On redelivery (e.g. after a reconnect), the marker already exists: skip.
    if (Boolean.TRUE.equals(redisTemplate.hasKey(dedupKey))) {
        log.warn("Duplicate event detected, skipping",
                kv("eventId", event.getEventId()),
                kv("taskId", event.getTaskId()));
        return;
    }

    try {
        orchestrationService.processTaskCreated(event);
    } catch (Exception e) {
        log.error("Failed to process task.created event",
                kv("taskId", event.getTaskId()),
                kv("error", e.getMessage()), e);
        throw e;
    }

    // Processing succeeded; recording the marker is best effort and is kept out
    // of the try block above so a Redis failure is not mistaken for a
    // processing failure.
    try {
        redisTemplate.opsForValue().set(dedupKey, "1", DEDUP_TTL);
    } catch (RuntimeException e) {
        log.warn("Failed to record processed marker for task.created event",
                kv("eventId", event.getEventId()),
                kv("taskId", event.getTaskId()), e);
    }
}