이 연재글은 KAFKA 정보 모음의 4번째 글입니다.

kafka 스트림 처리 시 어떤 이유로 메시지 처리가 실패하거나 정해진 횟수만큼 재시도했음에도 메시지 처리에 실패할 경우 대응 방법에 대해 알아보겠습니다. 일반적인 Queue(SQS)를 사용할 때는 Deadletter queue를 설정함으로써 간편하게 후속 처리를 할 수 있지만 kafka의 경우는 추가적인 코드나 설정을 통해 해당 기능을 구현해야 합니다.

spring-kafka 사용하여 재시도 구현

retryTemplate 작성

재시도 시 얼마간의 delay를 주고 몇 번 재시도를 할지 설정을 담은 Template을 작성합니다.

  • setBackOffPeriod
    • 재시도 시 적절한 delay 타임 이후 시도하도록 backoff 시간을 설정합니다.
  • setMaxAttempts
    • 최대 몇 번까지 재시도할지 설정합니다.
@Bean
private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    // 재시도시 1초 후에 재 시도하도록 backoff delay 시간을 설정한다.
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(1000L);
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    // 최대 재시도 횟수 설정
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(2);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

recoveryCallback 작성

retryTemplate에 의해 Max Attempt 만큼 재시도를 수행한 후에도 처리에 실패할 경우 후속 처리를 수행할 callback 함수를 작성합니다. 상황에 따라 적절히 로그를 남기거나 DB에 기록하는 액션을 수행할 수 있습니다. 아래 코드에서는 후속 처리로 Deadletter topic에 실패한 메시지를 발송하도록 코드를 작성 합니다.

처리 실패한 메시지를 DeadLetter Topic(dlt_원본Topic이름)에 발송하고 원본 메시지는 완료(acknowledgment.acknowledge())처리 합니다.

private RecoveryCallback<Object> recoveryCallback(KafkaTemplate<String, String> kafkaTemplate) {
    return context -> {
        final var retryCount = context.getRetryCount();
        final var acknowledgment = (Acknowledgment) context.getAttribute("acknowledgment");
        final var record = (ConsumerRecord) context.getAttribute("record");
        final var topic = "dlt_" + record.topic();
        final var value = record.value().toString();
        try {
            log.warn("[Send to dead letter topic] {} - retryCount: {} - value: {}.", topic, retryCount, value);
            kafkaTemplate.send(topic, value);
        } catch (Exception e) {
            log.error("[Fail to dead letter topic]: {}" , topic, e);
        }
        if (Objects.nonNull(acknowledgment)) {
            acknowledgment.acknowledge();
        }
        return Optional.empty();
    };
}

KafkaListenerContainerFactory 작성

위에서 생성한 retryTemplate, recoveryCallback을 주입하여 ListenerContainerFactory를 작성합니다.

private ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
    final var props = Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset(),
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
            ConsumerConfig.ISOLATION_LEVEL_CONFIG,KafkaProperties.IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT),
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        );
        return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(recoveryCallback(kafkaTemplate));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
}

KafkaListener Annotation 설정에 containerFactory 설정

위에서 설정한 KafkaListenerContainerFactory를 containerFactory로 설정하여 재시도 및 Callaback이 동작하도록 합니다.

@KafkaListener(topics = "domain-event-user", containerFactory = "kafkaListenerContainerFactory")
public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
    log.debug("Received Headers : " + headers);
    log.debug("Received Payloads : " + user.toString());
    // retry 테스트를 위해 강제로 오류 발생
    throw new UnknownError("unexpected error");
}
[결과]
[Send to dead letter topic] dlt_domain-event-user - retryCount: 2 - value: {"id":"happydaddy@naver.com","name":"happydaddy","age":28}

console consumer를 통해 확인하면 다음과 같이 deadletter topic에 메시지가 전달된 것을 확인할 수 있습니다.

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic dlt_domain-event-user --from-beginning
"{\"id\":\"happydaddy@naver.com\",\"name\":\"happydaddy\",\"age\":28}"

spring cloud stream을 사용하여 재시도 구현

Spring Cloud Stream을 사용하면 설정을 추가하는 것 만으로 간편하게 재 시도 및 deadletter를 구현할 수 있습니다. 다만 직접 구현 시와 다르게 DeadLetter Topic으로 전송하는 처리만 가능하며 Callback 함수를 통해 다양한 후속 처리를 할 수는 없습니다.

application.yml에 설정 추가

  • cloud.stream.bindings에 consumer.max-attempts 설정을 추가합니다.
  • cloud.stream.kafka.bindings에 consumer.enableDlq, consumer.dlqName 설정을 추가합니다.
spring:
  cloud.stream:
    bindings:
      domainEventString-in-0:
        content-type: text/plain
        destination: domain-event-string
        group: consumer-group-string
      domainEventString-out-0:
        destination: domain-event-string
        group: consumer-group-string
      domainEventModel-in-0:
        destination: domain-event-model
        group: consumer-group-model
        consumer:
          max-attempts: 2
      domainEventModel-out-0:
        destination: domain-event-model
        group: consumer-group-model
    kafka:
      bindings:
        domainEventModel-in-0:
          consumer:
            enableDlq: true
            dlqName: dlt_domain-event-model
      binder:
        brokers: localhost:9092,localhost:9094,localhost:9095
        configuration:
          auto.offset.reset: earliest
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  cloud:
    stream:
      function:
        definition: domainEventString;domainEventModel

실습에서 사용한 코드는 아래 Github에서 확인 할 수 있습니다.

https://github.com/codej99/SpringKafkaConsumer

카프카에 대한 더 많은 정보는…

https://kafka.apache.org/

연재글 이동[이전글] Kafka Producer/Consumer (spring-kafka, spring-cloud-starter-stream-kafka)