- Apache kafka Installation
- Apache kafka Installation by docker
- Kafka Producer/Consumer (spring-kafka, spring-cloud-starter-stream-kafka)
- Kafka Consumer retry 및 deadletter 처리 방법
이번 시간에는 docker를 이용하여 개발환경에 apache kafka cluster를 생성하는 방법에 대하여 알아보겠습니다. kafka는 크게 cluster를 관리하는 zookeeper와 kafka 기능을 제공하는 broker로 구성되어 있습니다. 안정적인 서비스를 지속적으로 제공하기 위해 HA(high availability – 고가용성) 구성이 필요하며 이를위해 kafka는 여러대의 kafka broker를 클러스터링하는 방법을 제공합니다.
kafka는 내부적으로 zookeeper를 이용하여 cluster에 속한 broker 상태를 감시하며 새로운 broker가 추가되거나 특정 broker에 문제가 생길경우 자동으로 cluster에 편입시키거나 제외하여 서비스를 안정적으로 유지합니다.
실습에서는 docker를 이용하여 kafka cluster를 구성해보겠습니다. docker를 이용하면 kafka cluster를 구성하기 위한 여러가지 작업을 일일히 수작업으로 진행할 필요가 없어 매우 편리합니다.
사전 준비사항
실습 진행을 위해 아래 내용을 참고하여 docker를 개발 pc에 설치합니다. docker-compose를 이용하여 컨테이너를 생성할 것이므로 kitematic은 따로 설치할 필요가 없습니다.
docker-compose.yml 작성 및 kafka 컨터이너 실행
docker-compose는 여러개의 도커 컨테이너에 대한 정의를 yaml 파일로 정의하여 한번에 많은 컨테이너를 작동시키고 관리할 수 있는 툴입니다. 실습에서는 3대의 zookeeper와 3대의 kafka broker를 생성하여 cluster로 구성해야 하는데, 한번에 많은 컨테이너를 작동시켜야 하므로 docker-compose의 사용목적과 일치합니다.
docker-compose 파일 작성에 대한 자세한 내용은 아래 공식링크를 통해 확인 할 수 있습니다.
https://docs.docker.com/compose/compose-file/
실습에서는 아래 github에서 이미 만들어놓은 docker-compose 파일을 토대로 클러스터를 띄워보겠습니다.
https://github.com/simplesteph/kafka-stack-docker-compose
version: '2.1' services: zoo1: image: zookeeper:3.4.9 hostname: zoo1 ports: - "2181:2181" environment: ZOO_MY_ID: 1 ZOO_PORT: 2181 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo1/data:/data - ./zk-multiple-kafka-multiple/zoo1/datalog:/datalog zoo2: image: zookeeper:3.4.9 hostname: zoo2 ports: - "2182:2182" environment: ZOO_MY_ID: 2 ZOO_PORT: 2182 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo2/data:/data - ./zk-multiple-kafka-multiple/zoo2/datalog:/datalog zoo3: image: zookeeper:3.4.9 hostname: zoo3 ports: - "2183:2183" environment: ZOO_MY_ID: 3 ZOO_PORT: 2183 ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 volumes: - ./zk-multiple-kafka-multiple/zoo3/data:/data - ./zk-multiple-kafka-multiple/zoo3/datalog:/datalog kafka1: image: confluentinc/cp-kafka:5.5.1 hostname: kafka1 ports: - "9092:9092" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" volumes: - ./zk-multiple-kafka-multiple/kafka1/data:/var/lib/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka2: image: confluentinc/cp-kafka:5.5.1 hostname: kafka2 ports: - "9093:9093" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 2 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" volumes: - ./zk-multiple-kafka-multiple/kafka2/data:/var/lib/kafka/data depends_on: - zoo1 - zoo2 - zoo3 kafka3: image: confluentinc/cp-kafka:5.5.1 hostname: kafka3 ports: - "9094:9094" environment: KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183" KAFKA_BROKER_ID: 3 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" volumes: - ./zk-multiple-kafka-multiple/kafka3/data:/var/lib/kafka/data depends_on: - zoo1 - zoo2 - zoo3
위에서 작성한 내용을 적당한 파일이름(kafka-cluster.yml)으로 저장하고 다음 명령어를 실행합니다. 총 3개의 zookeeper(port 2181, 2182, 2183)와 3개의 kafka broker(port 9092, 9093, 9094)로 이루어진 cluster가 생성됩니다.
$ docker-compose -f kafka-cluster.yml up
Kafka Cluster 테스트
kafka에 접속하여 테스트 하기 위해 다음 파일을 다운로드 받아 압축을 해제합니다. 압축 해제한 파일내 bin 디렉터리에 들어가면 카프카에 명령을 요청할 수 있는 shell script가 마련되어 있습니다.
$ wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz $ tar xvf kafka_2.12-2.2.1.tgz $ cd kafka_2.12-2.2.1/bin $ ls connect-distributed.sh kafka-dump-log.sh kafka-topics.sh connect-standalone.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh kafka-acls.sh kafka-mirror-maker.sh kafka-verifiable-producer.sh kafka-broker-api-versions.sh kafka-preferred-replica-election.sh offset.json kafka-configs.sh kafka-producer-perf-test.sh trogdor.sh kafka-console-consumer.sh kafka-reassign-partitions.sh windows kafka-console-producer.sh kafka-replica-verification.sh zookeeper-security-migration.sh kafka-consumer-groups.sh kafka-run-class.sh zookeeper-server-start.sh kafka-consumer-perf-test.sh kafka-server-start.sh zookeeper-server-stop.sh kafka-delegation-tokens.sh kafka-server-stop.sh zookeeper-shell.sh kafka-delete-records.sh kafka-streams-application-reset.sh
Cluster내 모든 Topic 리스트 조회
$ ./kafka-topics.sh --list --bootstrap-server localhost:9092,localhost:9093,localhost:9094 __confluent.support.metrics __consumer_offsets
신규 Topic 생성
$ ./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 1 --topic news
Topic 정보 조회
$ ./kafka-topics.sh --describe --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic news Topic:news PartitionCount:1 ReplicationFactor:1 Configs: Topic: news Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic에 메시지 발행
$ ./kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic news >news message-1 >news message-2 >news message-3
Topic 메시지 소비
–from-beginning 옵션을 주면 Topic의 첫 메시지부터 모든 메시지를 받아 소비합니다.
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic news --from-beginning news message-1 news message-2 news message-3
Topic이 여러개의 partition으로 나뉘어져 있고 특정 파티션에서 메시지를 받고 싶으면 –partition 옵션으로 파티션을 지정하여 메시지를 받을 수 있습니다.
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic news --partition 1
Consumer Group으로 메시지 소비
하나의 토픽 메시지를 group단위로 소비하는 경우 다음과 같이 consumer group을 지정하여 메시지를 소비할 수 있습니다.
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic news --group news-group-1 --from-beginning news message-1 news message-2 news message-3
Consumer Group 정보 조회
Consumer 리스트 조회
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list news-group-1
단일 Consumer 정보 조회
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --group news-group-1 --describe TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID news 0 4 4 0 consumer-1-d75d0466-96e2-4532-994e-c9ef2f0cc87a /172.21.0.1 consumer-1
Consumer Group 오프셋 초기화
특정 Consumer Group이 메시지를 다시 전송받아야 할 경우 offset를 초기화하여 Topic메시지를 다시 수신 할 수 있습니다.
–topic 대신 –all-topics를 지정하면 모든 토픽에 적용 가능합니다.
–execute 옵션을 제거하고 실행하면 실제 반영되지 않고 어떻게 변할지 결과만 출력합니다.(dry run)
–shift-by <Long: number-of-offsets> | 현재 offset에서 +/- 만큼 offset을 증가/감소 시킵니다. |
–to-offset <Long: offset> | 지정한 offset으로 이동시킵니다. |
–by-duration <String: duration> | 현재 시간에서 특정 시간만큼 이전으로 offset을 이동시킵니다. 형식 ‘PnDTnHnMnS’ ex) P7D : 1주일전 |
–to-datetime <String: datetime> | 특정 날짜로 offset을 이동시킵니다. 형식 ‘YYYY-MM-DDTHH:mm:SS.sss’ |
–to-latest | 가장 마지막 offset값(offset 최대값)으로 이동시킵니다. |
–to-earliest | 가장 처음 offset값(offset 최소값)으로 이동시킵니다. |
./kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic news --group news-group-1 --reset-offsets --to-earliest --execute TOPIC PARTITION NEW-OFFSET news 0 0
Consumer Group 삭제
consumer group을 삭제하기 위해서는 현재 해당 그룹으로 소비중인 consumer가 실행중이지 않아야 합니다. 실행중인 consumer가 있으면 종료하고 명령어를 실행합니다.
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --delete --group news-group-1 Error: Deletion of some consumer groups failed: * Group 'news-group-1' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty. $ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --delete --group news-group-1 Deletion of requested consumer groups ('news-group-1') was successful.
Topic 메시지 삭제
아래처럼 json 파일을 생성하여 명령을 실행하면 특정 Topic의 메시지를 삭제할 수 있습니다. 예제에서처럼 offset을 -1로 설정하면 해당 topic의 partition:0의 메시지가 모두 삭제됩니다. 일부분의 메시지만 삭제하려면 offset값을 조정합니다.
{ "partitions": [ { "topic": "news", "partition": 0, "offset": -1 } ], "version": 1 }
$ ./kafka-delete-records.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --offset-json-file ./delete-topic-message.json Executing records delete operation Records delete operation completed: partition: news-0 low_watermark: 3
Topic 삭제
$ ./kafka-topics.sh --delete --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic news Topic news is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.