This is the fourth post in the Kafka series.

This post covers how to handle messages that fail during Kafka stream processing, including messages that still fail after the configured number of retries. With a typical queue service such as SQS, you can simply configure a dead-letter queue for follow-up handling, but with Kafka you have to implement the equivalent yourself through additional code or configuration.

Implementing retries with spring-kafka

Writing the RetryTemplate

First, write a template that defines how long to wait between retries and how many times to retry.

  • setBackOffPeriod
    • Sets the backoff delay so each retry waits an appropriate amount of time before running.
  • setMaxAttempts
    • Sets the maximum number of attempts.
@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    // Back off for 1 second before each retry.
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(1000L);
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    // Maximum number of attempts; the initial call counts as the first attempt.
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(2);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}
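
FixedBackOffPolicy waits the same 1 second before every retry. If you would rather back off with increasing delays, Spring Retry also ships an ExponentialBackOffPolicy; a minimal sketch, with illustrative interval values:

// Illustrative alternative: grow the delay exponentially instead of a fixed 1s.
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(1000L); // first retry after 1s
exponentialBackOffPolicy.setMultiplier(2.0);        // double the delay on each retry
exponentialBackOffPolicy.setMaxInterval(10_000L);   // cap the delay at 10s
retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);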

Writing the recoveryCallback

Next, write a callback that runs when processing still fails after the RetryTemplate has exhausted its maximum attempts. Depending on the situation, this is where you can log the failure or record it in a database. In the code below, the follow-up action publishes the failed message to a dead-letter topic.

The failed message is published to a dead-letter topic (dlt_ + original topic name), and the original message is then marked as done via acknowledgment.acknowledge().

private RecoveryCallback<Object> recoveryCallback(KafkaTemplate<String, String> kafkaTemplate) {
    return context -> {
        // spring-kafka stores the acknowledgment and the original record as retry context attributes.
        final var retryCount = context.getRetryCount();
        final var acknowledgment = (Acknowledgment) context.getAttribute("acknowledgment");
        final var record = (ConsumerRecord<?, ?>) context.getAttribute("record");
        final var topic = "dlt_" + record.topic();
        final var value = record.value().toString();
        try {
            log.warn("[Send to dead letter topic] {} - retryCount: {} - value: {}.", topic, retryCount, value);
            kafkaTemplate.send(topic, value);
        } catch (Exception e) {
            log.error("[Fail to dead letter topic]: {}", topic, e);
        }
        // Acknowledge the original message so the consumer moves past it.
        if (Objects.nonNull(acknowledgment)) {
            acknowledgment.acknowledge();
        }
        return Optional.empty();
    };
}
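
The callback above forwards only the value. If you also want to preserve the original key and headers on the dead-letter record, you can build a ProducerRecord yourself. A minimal sketch of the send call under that assumption (org.apache.kafka.clients.producer.ProducerRecord, with the record cast to <String, String> to match the template):

// Hypothetical variant: carry the original key and headers over to the DLT record.
@SuppressWarnings("unchecked")
final var original = (ConsumerRecord<String, String>) context.getAttribute("record");
final var dltRecord = new ProducerRecord<>(
        "dlt_" + original.topic(), // dead-letter topic
        null,                      // let the partitioner pick the partition
        original.key(),            // keep the original key (preserves per-key ordering)
        original.value(),
        original.headers());       // keep the original headers
kafkaTemplate.send(dltRecord);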

Writing the KafkaListenerContainerFactory

Create the listener container factory, wiring in the retryTemplate and recoveryCallback created above.

private ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
    final var props = Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId(),
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset(),
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
            ConsumerConfig.ISOLATION_LEVEL_CONFIG, KafkaProperties.IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT),
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
    );
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(kafkaProperties));
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(recoveryCallback(kafkaTemplate));
    // Manual ack mode, so the recoveryCallback decides when the offset is committed.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
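
The factory above injects a KafkaTemplate<String, String> that is not shown in the snippets. A minimal sketch of such a bean, assuming plain string serialization to mirror the consumer (the exact property values are illustrative):

@Bean
public KafkaTemplate<String, String> kafkaTemplate(KafkaProperties kafkaProperties) {
    // Producer config mirroring the consumer: string key/value serializers.
    final Map<String, Object> props = Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(),
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
    );
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}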

Setting containerFactory on the @KafkaListener annotation

Set the KafkaListenerContainerFactory configured above as the containerFactory so the retries and the callback take effect.

@KafkaListener(topics = "domain-event-user", containerFactory = "kafkaListenerContainerFactory")
public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
    log.debug("Received Headers : " + headers);
    log.debug("Received Payloads : " + user.toString());
    // Force an error so the retry behavior can be tested.
    throw new UnknownError("unexpected error");
}
[Result]
[Send to dead letter topic] dlt_domain-event-user - retryCount: 2 - value: {"id":"happydaddy@naver.com","name":"happydaddy","age":28}

Checking with the console consumer, you can see that the message was delivered to the dead-letter topic:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic dlt_domain-event-user --from-beginning
"{\"id\":\"happydaddy@naver.com\",\"name\":\"happydaddy\",\"age\":28}"

Implementing retries with Spring Cloud Stream

With Spring Cloud Stream you can implement retries and a dead letter simply by adding configuration. Unlike the hand-rolled approach, however, the only follow-up action available is forwarding to a dead-letter topic; you cannot run arbitrary follow-up logic through a callback.

Adding settings to application.yml

  • Add the consumer.max-attempts setting under cloud.stream.bindings.
  • Add the consumer.enableDlq and consumer.dlqName settings under cloud.stream.kafka.bindings.
spring:
  cloud:
    stream:
      function:
        definition: domainEventString;domainEventModel
      bindings:
        domainEventString-in-0:
          content-type: text/plain
          destination: domain-event-string
          group: consumer-group-string
        domainEventString-out-0:
          destination: domain-event-string
          group: consumer-group-string
        domainEventModel-in-0:
          destination: domain-event-model
          group: consumer-group-model
          consumer:
            max-attempts: 2
        domainEventModel-out-0:
          destination: domain-event-model
          group: consumer-group-model
      kafka:
        bindings:
          domainEventModel-in-0:
            consumer:
              enableDlq: true
              dlqName: dlt_domain-event-model
        binder:
          brokers: localhost:9092,localhost:9094,localhost:9095
          configuration:
            auto.offset.reset: earliest
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
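
The function.definition above assumes functional beans with matching names (java.util.function.Consumer). A minimal sketch of the domainEventModel consumer whose handler always throws, so the max-attempts retries and the DLQ path can be exercised; the body is illustrative:

@Bean
public Consumer<String> domainEventModel() {
    return message -> {
        log.debug("Received : {}", message);
        // Illustrative failure: the binder retries up to max-attempts times,
        // then publishes the message to dlt_domain-event-model.
        throw new RuntimeException("unexpected error");
    };
}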

The code used in this exercise is available on GitHub:

https://github.com/codej99/SpringKafkaConsumer

For more information about Kafka:

https://kafka.apache.org/
