This is the third post in the KAFKA notes series.

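When you implement a Producer and Consumer with Kafka in a Spring Boot project, the implementation differs depending on the library, so I wrote this post to organize the details. Two libraries are commonly used: spring-kafka and spring-cloud-starter-stream-kafka. For simple use cases, spring-kafka is a good fit. spring-cloud-starter-stream-kafka takes slightly more work to set up, because it is designed for general-purpose use: it reduces the dependency on a specific broker and encapsulates the messaging layer.

For example, code written with spring-kafka works only on Kafka, whereas code written with spring-cloud-starter-stream-kafka can be moved from Kafka to another messaging platform such as RabbitMQ by swapping the library for spring-cloud-starter-stream-rabbit, with almost no code changes.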
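
The decoupling idea can be sketched in plain Java. This is only an illustration of the principle, not how Spring Cloud Stream is implemented; MessagePublisher, FakeKafkaPublisher, FakeRabbitPublisher, and SignupService are all hypothetical names, and the two publishers merely record what they would send.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical port: application code depends only on this interface.
interface MessagePublisher {
    void publish(String destination, String payload);
}

// Hypothetical stand-in for a Kafka-backed implementation; it only records what it would send.
final class FakeKafkaPublisher implements MessagePublisher {
    final List<String> sent = new ArrayList<>();

    @Override
    public void publish(String destination, String payload) {
        sent.add("kafka:" + destination + ":" + payload);
    }
}

// Hypothetical stand-in for a RabbitMQ-backed implementation.
final class FakeRabbitPublisher implements MessagePublisher {
    final List<String> sent = new ArrayList<>();

    @Override
    public void publish(String destination, String payload) {
        sent.add("rabbit:" + destination + ":" + payload);
    }
}

// Application code: it stays the same no matter which implementation is wired in.
final class SignupService {
    private final MessagePublisher publisher;

    SignupService(MessagePublisher publisher) {
        this.publisher = publisher;
    }

    void signUp(String userId) {
        publisher.publish("domain-event", "signed-up:" + userId);
    }
}
```

Swapping FakeKafkaPublisher for FakeRabbitPublisher changes only the wiring, which mirrors what replacing the binder library does for a Spring Cloud Stream application.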

Event message handling with spring-kafka

build.gradle

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    // omitted...
}

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
    consumer:
      group-id: consumer-group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Producer example – publishing a string message

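import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageProducer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public void sendMessage(String payload) {
        // Build the producer configuration from the values in application.yml.
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        ProducerRecord<String, String> message = new ProducerRecord<>("domain-event", payload);
        // try-with-resources closes the producer, which also flushes the pending record.
        // Creating a producer per call is simple but costly; reuse one instance under real load.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            producer.send(message);
        }
    }
}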

Consumer example – consuming a string message

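import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageConsumer {
    // Subscribes to the same topic the producer publishes to.
    @KafkaListener(topics = "domain-event")
    public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) {
        System.out.println("Received Headers : " + headers);
        System.out.println("Received Payloads : " + message);
    }
}
/**
Received Headers : {kafka_offset=385, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@682362d8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608911503694, kafka_groupId=consumer-group-1}
Received Payloads : message
**/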

Object message Producer/Consumer

application.yml

Change value-serializer to JsonSerializer and value-deserializer to JsonDeserializer, and add spring.json.trusted.packages.

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9094,localhost:9095
    consumer:
      group-id: consumer-group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.spring.kafka.domain.model
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
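
The spring.json.trusted.packages entry acts as an allow-list: JsonDeserializer refuses to instantiate types from packages that are not listed, and "*" trusts every package. The idea can be sketched in plain Java as follows. This is a simplified illustration, not spring-kafka's actual matching logic; the TrustedPackages class and its isTrusted method are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of a package allow-list (not the library's implementation).
final class TrustedPackages {
    private final Set<String> trusted;

    TrustedPackages(String... packages) {
        this.trusted = new HashSet<>(Arrays.asList(packages));
    }

    // Returns true when the class's package is on the allow-list, or when "*" trusts everything.
    boolean isTrusted(String className) {
        if (trusted.contains("*")) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(packageName);
    }
}
```

With the configuration above, com.spring.kafka.domain.model.User would pass this check, while a class from any other package would be rejected before deserialization.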

Defining the model class

package com.spring.kafka.domain.model;
import lombok.*;

@Getter
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private String id;
    private String name;
    private int age;
}

Producer

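import java.util.Properties;

import com.spring.kafka.domain.model.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageProducer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public void sendMessage(User user) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        // valueSerializer is now JsonSerializer, so the User object is written as JSON.
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        // The topic must match the one the listener subscribes to ("domain-event").
        ProducerRecord<String, User> message = new ProducerRecord<>("domain-event", user);
        // try-with-resources closes the producer, which also flushes the pending record.
        try (KafkaProducer<String, User> producer = new KafkaProducer<>(properties)) {
            producer.send(message);
        }
    }
}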

Consumer

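import com.spring.kafka.domain.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KafkaMessageConsumer {
    @KafkaListener(topics = "domain-event")
    public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
        // Parameterized logging avoids building the string when debug logging is off.
        log.debug("Received Headers : {}", headers);
        log.debug("Received Payloads : {}", user);
    }
}
/**
Received Headers : {kafka_offset=384, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ab7d56c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608910676656, kafka_groupId=consumer-group-1}
Received Payloads : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/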

Event message handling with spring-cloud-starter-stream-kafka

build.gradle

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
}

ext {
    set('springCloudVersion', "Hoxton.SR9")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    // omitted
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

// rest omitted

application.yml

spring:
  cloud:
    stream:
      function:
        definition: domainEventString;domainEventModel
      bindings:
        domainEventString-in-0:
          content-type: text/plain
          destination: domain-event-string
          group: consumer-group-string
        domainEventString-out-0:
          destination: domain-event-string
          group: consumer-group-string
        domainEventModel-in-0:
          destination: domain-event-model
          group: consumer-group-model
        domainEventModel-out-0:
          destination: domain-event-model
          group: consumer-group-model
      kafka:
        binder:
          brokers: localhost:9092,localhost:9094,localhost:9095
          configuration:
            auto.offset.reset: earliest
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
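
The binding names in this configuration are not arbitrary. In the functional model, Spring Cloud Stream derives them from the function bean name as `<functionName>-in-<index>` for input and `<functionName>-out-<index>` for output. The sketch below reproduces that naming rule in plain Java to show how the YAML keys relate to the function definition. The BindingNames helper is hypothetical and is not part of Spring Cloud Stream; it also ignores composed definitions that use `|`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the binding naming convention; not a Spring Cloud Stream API.
final class BindingNames {

    // Input binding name for a function bean: <functionName>-in-<index>
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding name for a function bean: <functionName>-out-<index>
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Splits a definition value such as "a;b" into individual function names.
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        for (String fn : functionNames("domainEventString;domainEventModel")) {
            System.out.println(fn + " -> " + input(fn, 0) + ", " + output(fn, 0));
        }
    }
}
```

The `-in-0` names are the ones bound to the Consumer beans shown later, and the `-out-0` names are the ones passed to StreamBridge.send in the producer examples.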

Producer example – publishing a string message

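import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class KafkaMessageProducer {

    private final StreamBridge streamBridge;

    public void sendMessageBySpringCloud(String payload) {
        // The first argument is the binding name from application.yml, not the topic name.
        streamBridge.send("domainEventString-out-0", payload);
    }
}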

Consumer example – consuming a string message

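import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaSpringCloudMessageConsumer {
    // The bean name must match an entry in spring.cloud.stream.function.definition.
    @Bean
    Consumer<String> domainEventString() {
        return input -> {
            System.out.println("Received Message : " + input);
        };
    }
}

/**
[Result]
Received Message : message
**/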
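
Because the handler is an ordinary java.util.function.Consumer, its logic can be exercised without a broker or a Spring context. A minimal sketch, assuming a hypothetical DomainEventHandlers class that collects messages in a list instead of printing them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical class for illustration; it has the same shape as the bean above, minus Spring.
final class DomainEventHandlers {
    // Messages handled so far; a stand-in for real processing.
    final List<String> received = new ArrayList<>();

    // Returns the handler as a plain function, exactly as the @Bean method does.
    Consumer<String> domainEventString() {
        return input -> received.add("Received Message : " + input);
    }
}
```

Calling `new DomainEventHandlers().domainEventString().accept("message")` runs the handler directly, which makes the consumer logic easy to unit test before it is wired to a topic.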

Producer example – publishing an object message

Defining the model class

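import lombok.*;

@Getter
@Builder
@ToString
// Constructors added to match the earlier User definition;
// Jackson needs a no-args constructor to rebuild the object from JSON.
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private String id;
    private String name;
    private int age;
}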

Producer

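import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class KafkaMessageProducer {

    private final StreamBridge streamBridge;

    public void sendMessageBySpringCloud(User payload) {
        // The first argument is the binding name from application.yml, not the topic name.
        streamBridge.send("domainEventModel-out-0", payload);
    }
}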

Consumer example – consuming an object message

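import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaSpringCloudMessageConsumer {
    // The bean name must match an entry in spring.cloud.stream.function.definition.
    @Bean
    Consumer<User> domainEventModel() {
        return input -> {
            System.out.println("Received Message : " + input);
        };
    }
}
/**
Received Message : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/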

The source code used in this walkthrough is available on GitHub.

https://github.com/codej99/SpringKafkaConsumer.git

References

https://docs.spring.io/spring-cloud-stream/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream.html#_overview
