- Apache kafka Installation
- Apache kafka Installation by docker
- Kafka Producer/Consumer (spring-kafka, spring-cloud-starter-stream-kafka)
- Kafka Consumer retry 및 deadletter 처리 방법
kafka 스트림 처리 시 어떤 이유로 메시지 처리가 실패하거나 정해진 횟수만큼 재시도했음에도 메시지 처리에 실패할 경우 대응 방법에 대해 알아보겠습니다. 일반적인 Queue(SQS)를 사용할 때는 Deadletter queue를 설정함으로써 간편하게 후속 처리를 할 수 있지만 kafka의 경우는 추가적인 코드나 설정을 통해 해당 기능을 구현해야 합니다.
spring-kafka 사용하여 재시도 구현
retryTemplate 작성
재시도 시 얼마간의 delay를 주고 몇 번 재시도를 할지 설정을 담은 Template을 작성합니다.
- setBackOffPeriod
- 재시도 시 적절한 delay 타임 이후 시도하도록 backoff 시간을 설정합니다.
- setMaxAttempts
- 최대 몇 번까지 재시도할지 설정합니다.
@Bean private RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // 재시도시 1초 후에 재 시도하도록 backoff delay 시간을 설정한다. FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(1000L); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); // 최대 재시도 횟수 설정 SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(2); retryTemplate.setRetryPolicy(retryPolicy); return retryTemplate; }
recoveryCallback 작성
retryTemplate에 의해 Max Attempt 만큼 재시도를 수행한 후에도 처리에 실패할 경우 후속 처리를 수행할 callback 함수를 작성합니다. 상황에 따라 적절히 로그를 남기거나 DB에 기록하는 액션을 수행할 수 있습니다. 아래 코드에서는 후속 처리로 Deadletter topic에 실패한 메시지를 발송하도록 코드를 작성 합니다.
처리 실패한 메시지를 DeadLetter Topic(dlt_원본Topic이름)에 발송하고 원본 메시지는 완료(acknowledgment.acknowledge())처리 합니다.
private RecoveryCallback<Object> recoveryCallback(KafkaTemplate<String, String> kafkaTemplate) { return context -> { final var retryCount = context.getRetryCount(); final var acknowledgment = (Acknowledgment) context.getAttribute("acknowledgment"); final var record = (ConsumerRecord) context.getAttribute("record"); final var topic = "dlt_" + record.topic(); final var value = record.value().toString(); try { log.warn("[Send to dead letter topic] {} - retryCount: {} - value: {}.", topic, retryCount, value); kafkaTemplate.send(topic, value); } catch (Exception e) { log.error("[Fail to dead letter topic]: {}" , topic, e); } if (Objects.nonNull(acknowledgment)) { acknowledgment.acknowledge(); } return Optional.empty(); }; }
KafkaListenerContainerFactory 작성
위에서 생성한 retryTemplate, recoveryCallback을 주입하여 ListenerContainerFactory를 작성합니다.
private ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) { final var props = Map.of( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers(), ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getConsumer().getAutoOffsetReset(), ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false, ConsumerConfig.ISOLATION_LEVEL_CONFIG,KafkaProperties.IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class ); return new DefaultKafkaConsumerFactory<>(props); } @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(KafkaProperties kafkaProperties, KafkaTemplate<String, String> kafkaTemplate) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(kafkaProperties)); factory.setRetryTemplate(retryTemplate()); factory.setRecoveryCallback(recoveryCallback(kafkaTemplate)); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; }
KafkaListener Annotation 설정에 containerFactory 설정
위에서 설정한 KafkaListenerContainerFactory를 containerFactory로 설정하여 재시도 및 Callaback이 동작하도록 합니다.
@KafkaListener(topics = "domain-event-user", containerFactory = "kafkaListenerContainerFactory") public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) { log.debug("Received Headers : " + headers); log.debug("Received Payloads : " + user.toString()); // retry 테스트를 위해 강제로 오류 발생 throw new UnknownError("unexpected error"); } [결과] [Send to dead letter topic] dlt_domain-event-user - retryCount: 2 - value: {"id":"happydaddy@naver.com","name":"happydaddy","age":28}
console consumer를 통해 확인하면 다음과 같이 deadletter topic에 메시지가 전달된 것을 확인할 수 있습니다.
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic dlt_domain-event-user --from-beginning "{\"id\":\"happydaddy@naver.com\",\"name\":\"happydaddy\",\"age\":28}"
spring cloud stream을 사용하여 재시도 구현
Spring Cloud Stream을 사용하면 설정을 추가하는 것 만으로 간편하게 재 시도 및 deadletter를 구현할 수 있습니다. 다만 직접 구현 시와 다르게 DeadLetter Topic으로 전송하는 처리만 가능하며 Callback 함수를 통해 다양한 후속 처리를 할 수는 없습니다.
application.yml에 설정 추가
- cloud.stream.bindings에 consumer.max-attempts 설정을 추가합니다.
- cloud.stream.kafka.bindings에 consumer.enableDlq, consumer.dlqName 설정을 추가합니다.
spring: cloud.stream: bindings: domainEventString-in-0: content-type: text/plain destination: domain-event-string group: consumer-group-string domainEventString-out-0: destination: domain-event-string group: consumer-group-string domainEventModel-in-0: destination: domain-event-model group: consumer-group-model consumer: max-attempts: 2 domainEventModel-out-0: destination: domain-event-model group: consumer-group-model kafka: bindings: domainEventModel-in-0: consumer: enableDlq: true dlqName: dlt_domain-event-model binder: brokers: localhost:9092,localhost:9094,localhost:9095 configuration: auto.offset.reset: earliest key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: org.apache.kafka.common.serialization.StringDeserializer cloud: stream: function: definition: domainEventString;domainEventModel
실습에서 사용한 코드는 아래 Github에서 확인 할 수 있습니다.
https://github.com/codej99/SpringKafkaConsumer
카프카에 대한 더 많은 정보는…