- Apache kafka Installation
- Apache kafka Installation by docker
- Kafka Producer/Consumer (spring-kafka, spring-cloud-starter-stream-kafka)
- Kafka Consumer retry 및 deadletter 처리 방법
Springboot 프로젝트에서 kafka를 사용하여 Producer, Consumer 구현 시 라이브러리에 따라 구현하는 방식이 상이하여 관련 내용을 정리하고자 포스팅을 하게 되었습니다. 자주 사용하는 라이브러리로는 spring-kafka, spring-cloud-starter-stream-kafka 두 가지가 있습니다. 심플하게 사용하기에는 spring-kafka가 적당 합니다. spring-cloud-starter-stream-kafka의 경우는 구현이 살짝 더 복잡한데 범용으로 사용하기 위해 의존성을 낮추고 캡슐화가 되어있기 때문입니다.
예를 들자면 spring-kafka의 경우는 kafka 플랫폼에서만 사용할 수 있지만 spring-cloud-starter-stream를 이용하여 작성한 코드는 kafka에서 rabbitmq 등 다른 stream 플랫폼으로 전환할 때 라이브러리만 spring-cloud-starter-stream-rabbit으로 교체하면 거의 코드 수정 없이 손쉽게 전환할 수 있습니다.
spring-kafka를 이용한 이벤트 메시지 처리
build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
// 생략...
}
application.yml
spring:
kafka:
bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
consumer:
group-id: consumer-group-1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Producer 예제 – 문자열 메시지 발행
@Component
public class KafkaMessageProducer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
public void sendMessage(String payload) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> message = new ProducerRecord<>("domain-event", payload);
producer.send(message);
}
}
Consumer 예제 – 문자열 메시지 소비
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "domain-event")
public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) {
System.out.println("Received Headers : "+headers);
System.out.println("Received Payloads : "+message);
}
}
/**
Received Headers : {kafka_offset=385, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@682362d8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608911503694, kafka_groupId=consumer-group-1}
Received Payloads : message
**/
객체 메시지 Producer/Consumer
application.yml
value-serializer를 JsonSerializer로 수정, value-deserializer를 JsonDeserializer로 수정, spring.json.trusted.packages 추가
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9094,localhost:9095
consumer:
group-id: consumer-group-1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: com.spring.kafka.domain.model
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
객체 생성
package com.spring.kafka.domain.model;
import lombok.*;
@Getter
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class User {
private String id;
private String name;
private int age;
}
Producer
@Component
public class KafkaMessageProducer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
public void sendMessage(User user) {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
ProducerRecord<String, User> message = new ProducerRecord<>("domain-event-user", user);
producer.send(message);
}
}
Consumer
@Slf4j
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "domain-event")
public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
log.debug("Received Headers : "+headers);
log.debug("Received Payloads : "+user.toString());
}
}
/**
Received Headers : {kafka_offset=384, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ab7d56c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608910676656, kafka_groupId=consumer-group-1}
Received Payloads : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/
spring-cloud-kafka를 이용한 이벤트 메시지 처리
build.gradle
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/milestone' }
}
ext {
set('springCloudVersion', "Hoxton.SR9")
}
dependencies {
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
// 생략
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
// 나머지 내용 생략
application.yml
spring:
cloud.stream:
bindings:
domainEventString-in-0:
content-type: text/plain
destination: domain-event-string
group: consumer-group-string
domainEventString-out-0:
destination: domain-event-string
group: consumer-group-string
domainEventModel-in-0:
destination: domain-event-model
group: consumer-group-model
domainEventModel-out-0:
destination: domain-event-model
group: consumer-group-model
kafka:
binder:
brokers: localhost:9092,localhost:9094,localhost:9095
configuration:
auto.offset.reset: earliest
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
cloud:
stream:
function:
definition: domainEventString;domainEventModel
Producer 예제 – 문자열 메시지 발행
@Component
@RequiredArgsConstructor
public class KafkaMessageProducer {
private final StreamBridge streamBridge;
public void sendMessageBySpringCloud(String payload) {
streamBridge.send("domainEventString-out-0", payload);
}
}
Consumer 예제 – 문자열 메시지 소비
@Slf4j
@Component
public class KafkaSpringCloudMessageConsumer {
@Bean
Consumer<String> domainEventString() {
return input -> {
System.out.println("Received Message : " + input);
};
}
}
/**
[결과]
Received Message : message
**/
Producer 예제 – 객체 메시지 발행
객체 생성
@Getter
@Builder
@ToString
public class User {
private String id;
private String name;
private int age;
}
Producer
@Component
@RequiredArgsConstructor
public class KafkaMessageProducer {
private final StreamBridge streamBridge;
public void sendMessageBySpringCloud(User payload) {
streamBridge.send("domainEventModel-out-0", payload);
}
}
Consumer 예제 – 객체 메시지 소비
@Slf4j
@Component
public class KafkaSpringCloudMessageConsumer {
@Bean
Consumer<User> domainEventModel() {
return input -> {
System.out.println("Received Message : " + input);
};
}
}
/**
Received Message : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/
실습한 소스는 아래 GitHub에서 확인할 수 있습니다.
https://github.com/codej99/SpringKafkaConsumer.git














