- AuroraDB Migration – Using Kafka as a target for Database Migration Service
- Amazon DynamoDB Stream
- Amazon DocumentDB(MongoDB) Stream
- Database Migration by Transactional Outbox Pattern
- AWS Database Migration Service – Migration auroradb to kafka
AuroraDB(mysql)의 binary log(binlog)를 이용하면 database의 변경사항을 다른 시스템으로 쉽게 이관할 수 있습니다. binlog란 mysql에서 제공하는 데이터의 변경 event를 저장하는 로그 파일을 의미합니다. 데이터에 대한 변경 내역뿐 아니라 그 쿼리가 실행될 때 생성된 정보들도 가지고 있어 mysql의 서버 장애 시 복구를 위해 사용하거나 복제(replica) 데이터 구성을 위해 사용합니다.
본문에서는 AWS에서 제공하는 DMS(Database Migration Service) 기능을 이용하여 AuroraDB의 binlog를 kafka로 이관해 볼 것입니다. Database의 복제본을 kafka로 이관하는 이유는 다양한 Cosumer가 스트림을 전달받아 용도에 맞게 데이터를 가공하기 위해서입니다. 예를 들면 통계 데이터 생성을 위한 원천 데이터가 필요할 수 있습니다. Database의 변경 스트림을 kafka로 전파하면 이형의 플랫폼(Hadoop과 같은 Bigdata Platform이나 Mongodb 같은 Nosql Platform)으로 데이터를 전파하여 용도에 맞게 데이터를 가공하여 사용할 수 있습니다.
Data Flow
Aurora DB(Source Endpoint)의 변경사항을 DMS를 이용하여 Kafka(Destination Endpoint)에 발행합니다. 카프카의 Topic을 구독 중인 Consumer는 스트리밍 데이터를 전달받아 이형의 데이터 플랫폼에 전달하게 됩니다.
AuroraDB 생성시 체크사항
auroraDB를 migration 해야 하는 경우 binary logging이 enable 되어있어야 합니다. auroraDB의 경우 serverless 타입으로도 생성할 수 있는데 이 경우에는 binlog를 제공하지 않아 DMS로의 마이그레이션이 불가능하므로 serverless 방식으로 생성하면 안 됩니다. binary logging을 enable하는 방법은 db 클러스터에 적용된 parameter_group의 binlog_format 값을 ROW로 설정하면 됩니다. 기존의 db에 적용하는 경우는 재시작해야 설정이 적용됩니다.
MSK(Managed Service Kafka) 생성시 체크사항
MSK의 cluster configuration의 설정값 중 auto.create.topics.enable=true로 설정해야 합니다.
이관하려는 Database와 MSK의 VPC가 서로 다르다면 상태 변경 데이터를 이관할 수 없으므로 VPC Peering이나 Transit gateway를 적용해야 합니다.
DMS 생성
Replication Instance 생성
auroraDB에서 binlog를 가져오거나 kafka의 topic에 binlog를 publish 할 인스턴스를 생성합니다. 설정 시 별다른 어려움은 없으며 적절한 Intance Type을 지정하고 VPC의 경우는 auroraDB와 MSK가 속해있는 vpc를 선택하면 됩니다.
Endpoint 생성
위에서 생성한 replication instance를 이용하여 데이터 마이그레이션을 진행하게 되는데, 원천 데이터에 해당하는 Source Endpoint와 이관 대상이 되는 Target Endpoint 2개를 생성합니다.
Source Endpoint 생성
- 실습에서 Source Endpoint는 Aurora Mysql입니다.
- Endpoint Type을 Source Endpoint로 체크하고 Select RDS DB Instance를 체크하면 이관하고자 하는 RDS를 선택할 수 있습니다.
- Endpoint identifier는 적당한 레이블명으로 설정합니다.
- Source engine은 AWS Aurora Mysql을 선택합니다.
- Server name에는 DB의 Host명을 설정하고 접속 Port와 계정 정보(Name/Password)도 올바른 값으로 설정합니다.
- 그 외 값들은 기본값으로 세팅합니다.
- Create Endpoint를 하기 전에 Test endpoint connection을 선택하여 replication instance에서 auroraDB로 접근이 가능한지 확인해야 합니다.
- 만약 접근이 불가능하다면 설정한 Host와 접속할 계정 정보가 맞는지 확인하고 DB의 Security Group의 Inbound rule에 3306 port가 열려있는지 확인합니다.
Target Endpoint 생성
- 실습에서 Target Endpoint는 MSK입니다.
- Endpoint Type을 Target Endpoint로 체크하고 Target Engine은 Kafka를 선택합니다.
- Kafka Broker정보와 Topic 이름을 세팅합니다. Topic이름을 따로 지정하지 않으면 자동으로 생성하여 이관합니다.
- 그 외 값들은 기본값으로 세팅합니다.
- Create Endpoint를 하기 전에 Test endpoint connection을 선택하여 replication instance에서 MSK로 접근이 가능한지 확인해야 합니다.
- 만약 접근이 불가능하다면 broker정보가 맞는지 확인하고 MSK의 Security Group의 Inbound rule에서 9092 port가 열려있는지 확인합니다.
Database migration tasks 생성
replication instance와 source, target endpoint를 모두 생성하였으면 드디어 Migration Task를 만들 수 있습니다.
- replication instance에는 위에서 생성한 instance정보를 선택합니다.
- source database endpoint에는 위에서 생성한 source endpoint를 선택합니다.
- target database endpoint에는 위에서 생성한 target endpoint를 선택합니다.
- migration type은 총 3가지인데 Migrate existing data and replicate ongoing changes를 선택합니다. 최초 한번 source 대상 전체를 migration하고 그 이후의 데이터 변경사항을 계속 복제합니다.
- 그 외의 설정은 기본값을 세팅합니다. 실제 서비스에서는 아래 링크의 내용을 참고하여 좀 더 세밀하게 설정하면 됩니다.
Specifying task settings for AWS Database Migration Service tasks – AWS Database Migration Service (amazon.com)
migration task를 생성하면 즉시 Source -> Target endpoint로 data migration이 수행됩니다. 만약 Staus가 fail일 경우엔 로그(Last failure message)를 확인하여 문제점을 수정하고 재시작하면 됩니다.
kafka message format
Migration task가 수행되면 원본 database에서 작업이 일어났을 때 DMS를 통해 kafka의 topic으로 변경사항이 발행(publish)됩니다. 발행된 데이터는 구독한 Consumer가 전달받을 수 있으며 메시지 형태는 다음과 같습니다.
- DDL : operation – drop-table, create-table
- DML : operation – load, update, delete
{ "metadata": { "timestamp": "2020-11-26T13:27:02.482358Z", "record-type": "control", "operation": "drop-table", "partition-key-type": "task-id", "schema-name": "kwtoon", "table-name": "gift" } } { "metadata": { "timestamp": "2020-11-26T13:27:02.489854Z", "record-type": "control", "operation": "create-table", "partition-key-type": "task-id", "schema-name": "kwtoon", "table-name": "gift" } } { "data": { "id": 1, "name": "coupon", "created": "2020-11-26T12:38:28Z" }, "metadata": { "timestamp": "2020-11-26T13:27:02.498033Z", "record-type": "data", "operation": "load", "partition-key-type": "primary-key", "schema-name": "kwtoon", "table-name": "gift" } } { "data": { "id": 1, "name": "cash", "created": "2020-11-26T12:38:28Z" }, "metadata": { "timestamp": "2020-11-26T13:29:24.751256Z", "record-type": "data", "operation": "update", "partition-key-type": "schema-table", "schema-name": "kwtoon", "table-name": "gift", "transaction-id": 8589934811 } } { "data": { "id": 1, "name": "cash", "created": "2020-11-26T12:38:28Z" }, "metadata": { "timestamp": "2020-11-26T13:30:15.674865Z", "record-type": "data", "operation": "delete", "partition-key-type": "schema-table", "schema-name": "kwtoon", "table-name": "gift", "transaction-id": 8589935097 } }
[참고]
https://docs.aws.amazon.com/ko_kr/dms/latest/userguide/CHAP_Target.Kafka.html
https://github.com/aws-samples/aws-dms-msk-demo – Connect to preview
https://docs.aws.amazon.com/ko_kr/msk/latest/developerguide/client-access.html
https://cloud.hosting.kr/aws-dms-database-migration-service/