This is the third post in the KAFKA series.

When implementing a Producer and a Consumer with Kafka in a Spring Boot project, the implementation style differs considerably depending on the library, so this post organizes the two approaches. The two commonly used libraries are spring-kafka and spring-cloud-starter-stream-kafka. For simple use cases, spring-kafka is a good fit. spring-cloud-starter-stream-kafka is slightly more involved to implement, because it is encapsulated and kept loosely coupled to any specific broker so that it can be used as a general-purpose messaging abstraction.

For example, code written with spring-kafka works only with the Kafka platform, whereas code written with spring-cloud-starter-stream can be switched from Kafka to another streaming platform such as RabbitMQ with almost no code changes, simply by swapping the dependency to spring-cloud-starter-stream-rabbit.

Event message handling with spring-kafka

build.gradle

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    // omitted...
}

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
    consumer:
      group-id: consumer-group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Producer example – publishing a string message

@Component
public class KafkaMessageProducer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public void sendMessage() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        // try-with-resources flushes and closes the producer so the buffered record is actually sent
        // (creating a producer per call is for demonstration only; reuse one instance in real code)
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> message = new ProducerRecord<>("domain-event", "message");
            producer.send(message);
        }
    }
}
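
The producer above uses the plain Apache Kafka client (KafkaProducer) directly and reads the serializer settings back out of application.yml via @Value. When spring-kafka is on the classpath, Spring Boot also auto-configures a KafkaTemplate from the same spring.kafka.* properties, so a producer can usually be written more compactly, as in the sketch below (the class name is hypothetical):

@Component
public class KafkaTemplateMessageProducer {

    // Auto-configured by Spring Boot from the spring.kafka.* properties
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaTemplateMessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage() {
        // The template reuses a single Spring-managed producer instance
        kafkaTemplate.send("domain-event", "message");
    }
}

The same idea applies to the object example further down by declaring KafkaTemplate<String, User> once the JsonSerializer is configured.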

Consumer example – consuming a string message

@Component
public class KafkaMessageConsumer {
    @KafkaListener(topics = "domain-event")
    public void consumeMessage(@Headers MessageHeaders headers, @Payload String message) {
        System.out.println("Received Headers : "+headers);
        System.out.println("Received Payloads : "+message);
    }
}
/**
Received Headers : {kafka_offset=385, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@682362d8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608911503694, kafka_groupId=consumer-group-1}
Received Payloads : message
**/
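
When only a few of these headers are needed, they can be injected individually with @Header instead of taking the whole MessageHeaders map. A minimal sketch (hypothetical listener; with the same group-id it would share the topic's partitions with the listener above):

@Component
public class KafkaTopicAwareConsumer {

    @KafkaListener(topics = "domain-event")
    public void consumeMessage(@Payload String message,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.OFFSET) long offset) {
        // Individual Kafka metadata values instead of the full header map
        System.out.println("Received " + topic + "-" + partition + "@" + offset + " : " + message);
    }
}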

Object message Producer/Consumer

application.yml

Change value-serializer to JsonSerializer, change value-deserializer to JsonDeserializer, and add spring.json.trusted.packages.

spring:
  kafka:
    bootstrap-servers: localhost:9093,localhost:9094,localhost:9095
    consumer:
      group-id: consumer-group-1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.kakaowebtoon.outbox.domain.model
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Creating the object

@Getter
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private String id;
    private String name;
    private int age;
}

Producer

@Component
public class KafkaMessageProducer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public void sendMessage() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        User user = User.builder().id("happydaddy@naver.com").name("happydaddy").age(28).build();
        // try-with-resources flushes and closes the producer so the buffered record is actually sent
        try (KafkaProducer<String, User> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, User> message = new ProducerRecord<>("domain-event", user);
            producer.send(message);
        }
    }
}

Consumer

@Component
public class KafkaMessageConsumer {
    @KafkaListener(topics = "domain-event")
    public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
        System.out.println("Received Headers : "+headers);
        System.out.println("Received Payloads : "+user.toString());
    }
}
/**
Received Headers : {kafka_offset=384, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@4ab7d56c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608910676656, kafka_groupId=consumer-group-1}
Received Payloads : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/

Event message handling with spring-cloud-starter-stream-kafka

build.gradle

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
}

ext {
    set('springCloudVersion', "Hoxton.SR8")
}

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    // omitted
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

// remaining configuration omitted

application.yml

spring:
  cloud.stream:
    bindings:
      domain-event-in:
        destination: domain-event
        group: consumer-group-1
      domain-event-out:
        destination: domain-event
        # 'group' applies only to consumer bindings, so it is omitted here
    kafka:
      binder:
        brokers: localhost:9093,localhost:9094,localhost:9095
        configuration:
          auto.offset.reset: earliest
          key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

DomainEventBinder

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface DomainEventBinder {
    String INPUT = "domain-event-in";
    String OUTPUT = "domain-event-out";

    @Input(INPUT)
    SubscribableChannel input();

    @Output(OUTPUT)
    MessageChannel output();
}
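
Note that the binder interface only takes effect once it is registered with @EnableBinding on a Spring configuration class. In this post the consumer class below carries the annotation, but it can equally be placed on the application class; a minimal sketch with a hypothetical Application class:

@SpringBootApplication
@EnableBinding(DomainEventBinder.class) // creates the domain-event-in / domain-event-out channels
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}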

Producer example – publishing a string message

@Component
public class KafkaMessageProducer {

    @Autowired
    private DomainEventBinder domainEventBinder;

    public void sendMessage() {
        domainEventBinder.output().send(MessageBuilder.withPayload("message").build());
    }
}
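
A Kafka record key or custom headers can also be attached through MessageBuilder. This is a hedged sketch (a hypothetical method that could be added to the producer above), assuming spring-kafka's KafkaHeaders constants, which the Kafka binder maps onto the outgoing record:

    public void sendKeyedMessage() {
        // The binder maps the kafka_messageKey header to the Kafka record key
        domainEventBinder.output().send(
                MessageBuilder.withPayload("message")
                        .setHeader(KafkaHeaders.MESSAGE_KEY, "user-1".getBytes())
                        .build());
    }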

Consumer example – consuming a string message

@EnableBinding(DomainEventBinder.class)
public class KafkaMessageConsumer {
    @StreamListener(DomainEventBinder.INPUT)
    public void listenDomainEvent(@Headers MessageHeaders headers, @Payload String message) {
        System.out.println("Received Headers : " + headers.toString());
        System.out.println("Received Message : " + message);
    }
}

/**
[Result]
Received Headers : {kafka_offset=372, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@16faf6f4, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608904886958, kafka_groupId=consumer-group-1}
Received Message : message
**/
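
For reference, later Spring Cloud Stream releases deprecate this annotation model (@EnableBinding, @StreamListener) in favor of functional bindings. A minimal sketch of an equivalent functional consumer, assuming a hypothetical binding named consumeDomainEvent-in-0 mapped to the domain-event topic in application.yml:

@Configuration
public class DomainEventConsumerConfig {

    // Spring Cloud Stream binds this bean to the consumeDomainEvent-in-0 binding
    @Bean
    public Consumer<String> consumeDomainEvent() {
        return message -> System.out.println("Received Message : " + message);
    }
}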

Producer example – publishing an object message

Creating the object

@Getter
@Builder
@ToString
@AllArgsConstructor
@NoArgsConstructor // Jackson needs a no-args constructor to deserialize the JSON payload on the consumer side
public class User {
    private String id;
    private String name;
    private int age;
}

Producer

@Component
public class KafkaMessageProducer {
    @Autowired
    private DomainEventBinder domainEventBinder;

    public void sendMessage() {
        User user = User.builder().id("happydaddy@naver.com").name("happydaddy").age(28).build();
        domainEventBinder.output().send(MessageBuilder.withPayload(user).build());
    }
}

Consumer example – consuming an object message

@StreamListener(target = DomainEventBinder.INPUT)
public void listenDomainEvent(@Headers MessageHeaders headers, @Payload User user) {
    System.out.println("Received Headers : " + headers.toString());
    System.out.println("Received Message : " + user.toString());
}
/**
Received Headers : {kafka_offset=376, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1d19d3ad, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=domain-event, kafka_receivedTimestamp=1608905766345, kafka_groupId=consumer-group-1}
Received Message : User(id=happydaddy@naver.com, name=happydaddy, age=28)
**/