- Apache kafka Installation
- Apache kafka Installation by docker
- Kafka Producer/Consumer (spring-kafka, spring-cloud-starter-stream-kafka)
- Kafka Consumer retry 및 deadletter 처리 방법
Springboot 프로젝트에서 kafka를 사용하여 Producer, Consumer 구현 시 라이브러리에 따라 구현하는 방식이 상이하여 관련 내용을 정리하고자 포스팅을 하게 되었습니다. 자주 사용하는 라이브러리로는 spring-kafka, spring-cloud-starter-stream-kafka 두 가지가 있습니다. 심플하게 사용하기에는 spring-kafka가 적당 합니다. spring-cloud-starter-stream-kafka의 경우는 구현이 살짝 더 복잡한데 범용으로 사용하기 위해 의존성을 낮추고 캡슐화가 되어있기 때문입니다.
예를 들자면 spring-kafka의 경우는 kafka 플랫폼에서만 사용할 수 있지만 spring-cloud-starter-stream를 이용하여 작성한 코드는 kafka에서 rabbitmq 등 다른 stream 플랫폼으로 전환할 때 라이브러리만 spring-cloud-starter-stream-rabbit으로 교체하면 거의 코드 수정 없이 손쉽게 전환할 수 있습니다.
spring-kafka를 이용한 이벤트 메시지 처리
build.gradle
dependencies { implementation 'org.springframework.kafka:spring-kafka' // 생략... }
application.yml
spring: kafka: bootstrap-servers: localhost:9093,localhost:9094,localhost:9095 consumer: group-id: consumer-group-1 auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
Producer 예제 – 문자열 메시지 발행
@Component public class KafkaMessageProducer { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.key-serializer}") private String keySerializer; @Value("${spring.kafka.producer.value-serializer}") private String valueSerializer; public void sendMessage(String payload) { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> message = new ProducerRecord<>("domain-event", payload); producer.send(message); } }
Consumer 예제 – 문자열 메시지 소비
@Component public class KafkaMessageConsumer { @KafkaListener(topics = "domain-event") public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) { System.out.println("Received Headers : "+headers); System.out.println("Received Payloads : "+message); } } /** Received Headers : {kafka_offset=385, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@682362d8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608911503694, kafka_groupId=consumer-group-1} Received Payloads : message **/
객체 메시지 Producer/Consumer
application.yml
value-serializer를 JsonSerializer로 수정, value-deserializer를 JsonDeserializer로 수정, spring.json.trusted.packages 추가
spring: kafka: bootstrap-servers: localhost:9092,localhost:9094,localhost:9095 consumer: group-id: consumer-group-1 auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: com.spring.kafka.domain.model producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
객체 생성
package com.spring.kafka.domain.model; import lombok.*; @Getter @Builder @ToString @AllArgsConstructor @NoArgsConstructor public class User { private String id; private String name; private int age; }
Producer
@Component public class KafkaMessageProducer { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.key-serializer}") private String keySerializer; @Value("${spring.kafka.producer.value-serializer}") private String valueSerializer; public void sendMessage(User user) { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); KafkaProducer<String, User> producer = new KafkaProducer<>(properties); ProducerRecord<String, User> message = new ProducerRecord<>("domain-event-user", user); producer.send(message); } }
Consumer
@Slf4j @Component public class KafkaMessageConsumer { @KafkaListener(topics = "domain-event") public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) { log.debug("Received Headers : "+headers); log.debug("Received Payloads : "+user.toString()); } } /** Received Headers : {kafka_offset=384, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ab7d56c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608910676656, kafka_groupId=consumer-group-1} Received Payloads : User(id=happydaddy@naver.com, name=happydaddy, age=28) **/
spring-cloud-kafka를 이용한 이벤트 메시지 처리
build.gradle
repositories { mavenCentral() maven { url 'https://repo.spring.io/milestone' } } ext { set('springCloudVersion', "Hoxton.SR9") } dependencies { implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka' // 생략 } dependencyManagement { imports { mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}" } } // 나머지 내용 생략
application.yml
spring: cloud.stream: bindings: domainEventString-in-0: content-type: text/plain destination: domain-event-string group: consumer-group-string domainEventString-out-0: destination: domain-event-string group: consumer-group-string domainEventModel-in-0: destination: domain-event-model group: consumer-group-model domainEventModel-out-0: destination: domain-event-model group: consumer-group-model kafka: binder: brokers: localhost:9092,localhost:9094,localhost:9095 configuration: auto.offset.reset: earliest key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: org.apache.kafka.common.serialization.StringDeserializer cloud: stream: function: definition: domainEventString;domainEventModel
Producer 예제 – 문자열 메시지 발행
@Component @RequiredArgsConstructor public class KafkaMessageProducer { private final StreamBridge streamBridge; public void sendMessageBySpringCloud(String payload) { streamBridge.send("domainEventString-out-0", payload); } }
Consumer 예제 – 문자열 메시지 소비
@Slf4j @Component public class KafkaSpringCloudMessageConsumer { @Bean Consumer<String> domainEventString() { return input -> { System.out.println("Received Message : " + input); }; } } /** [결과] Received Message : message **/
Producer 예제 – 객체 메시지 발행
객체 생성
@Getter @Builder @ToString public class User { private String id; private String name; private int age; }
Producer
@Component @RequiredArgsConstructor public class KafkaMessageProducer { private final StreamBridge streamBridge; public void sendMessageBySpringCloud(User payload) { streamBridge.send("domainEventModel-out-0", payload); } }
Consumer 예제 – 객체 메시지 소비
@Slf4j @Component public class KafkaSpringCloudMessageConsumer { @Bean Consumer<User> domainEventModel() { return input -> { System.out.println("Received Message : " + input); }; } } /** Received Message : User(id=happydaddy@naver.com, name=happydaddy, age=28) **/
실습한 소스는 아래 GitHub에서 확인할 수 있습니다.
https://github.com/codej99/SpringKafkaConsumer.git