- AuroraDB Migration – Using Kafka as a target for Database Migration Service
- Amazon DynamoDB Stream
- Amazon DocumentDB(MongoDB) Stream
- Database Migration by Transactional Outbox Pattern
- AWS Database Migration Service – Migration auroradb to kafka
이전 실습까지는 데이터 마이그레이션을 위해 Database에서 제공하는 binlog나 DynamoDB/MongoDB에서 제공하는 Change Stream을 통해 변경 데이터를 처리할 수 있었습니다. 하지만 이렇게 시스템적으로 지원을 받지 못하는 경우는 어떻게 해야할까요. 여러가지 방법이 있겟지만 이번 장에서는 Transactional Outbox Pattern을 사용하는 방법에 대해 살펴보겠습니다.
AWS의 Serverless AuroraDB는 인스턴스 및 용량 관리의 복잡성을 크게 줄여주는 장점이 있어 최근 많이 사용되는 추세입니다. 그런데 Serverless AuroraDB의 경우 Classic AuroraDB에서 제공하는 binlog를 사용할 수가 없습니다. 즉 현재 시점에서는 AWS DMS(Database Migration Service)를 사용할 수 없어 이형의 플랫폼으로 Database를 마이그레이션 해야할 경우 어려움에 봉착하게 됩니다. 결국 시스템 측면에서 도움을 받을 수 없으면 소프트웨어 측면에서 문제를 해결할 수 밖에 없습니다.
Transactional Outbox Pattern
Transactional Outbox는 특정 도메인 객체(테이블)의 변경사항이 발생할때 단일 트랜잭션내에서 동일 데이터베이스 내의 Outbox 테이블에 관련 정보를 저장하여 후속 이벤트를 처리하는 방법입니다. 해당 시나리오는 다음과 같습니다.
- 동일 Database내에 변경사항을 기록하는 Outbox Table을 생성
- 단일 트랜잭션 내에서 데이터 변경 + OutBox로 변경사항을 기록하는 액션을 수행. 동일 Database내에서 단일 트랙잭션으로 데이터를 처리하므로 변경 데이터를 일관성있게 처리 할 수 있음
- Outbox 테이블에 기록된 변경 사항은 Message Relay를 통해 Message Broker로 발행
- Message Broker에 발행된 내역은 Outbox 테이블에서 삭제
- 타 플랫폼에서는 Message Broker를 구독하여 변경 데이터를 전달받아 마이그레이션을 진행
Event Flow
Data Flow
Create Outbox Table
마이그레이션 대상의 변경 이력을 저장하기 위한 Outbox 테이블을 생성합니다.
Field | Type | Comment | Sample |
---|---|---|---|
id | bigint | PrimaryKey /AutoIncrement | 1000 |
aggregate_id | varchar | 순서 처리가 필요한 데이터를 Grouping하는 id( ex. user_id) | happydaddy |
aggregate_type | varchar | 변경이 발생한 도메인(테이블) 이름 | LICENSE |
payload | text | 도메인 entity 변경사항(JSON) | {“id”:42,”user_id”:”happydaddy”,”episode_id”:14562,”expire_at”:”2021-01-01T15:23:38Z”} |
event_type | varchar | 발생한 이벤트 (INSERT, UPDATE, DELETE …) | INSERT |
created_at_datetime | datetime | 이벤트 발생시간 | 2020-12-01 23:38:39.616869 |
Spring ApplicationEvent를 통한 Outbox 이벤트 처리
Java Spring 환경으로 작성되었으며 주요 부분에 대해서만 설명합니다. 전체 code는 다음 링크를 확인해 주십시오.
https://github.com/codej99/TransactionalOutbox.git
Application Event를 발행하는 event publisher 생성
@Component public class EventPublisher implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } public void publish(OutBoxEvent outboxEvent) { this.publisher.publishEvent(outboxEvent); } }
비즈니스 로직 처리시 테이블 변경사항이 발생하면 단일 트랜잭션 내에서 Outbox Event를 발행
@RequiredArgsConstructor @Service public class LicenseService implements RegisterLicenseUseCase { private final RegisterLicensePort registerLicensePort; private final EventPublisher eventPublisher; @Override @Transactional public void registerLicense(License license) { LicenseEntity licenseEntity = LicenseEntity.builder().userid(license.getUserid()).episodeId(license.getEpisodeId()).expireAtDatetime(license.getExpireAtDatetime()).build(); registerLicensePort.register(licenseEntity); eventPublisher.publish(EventUtils.createLicenseEvent(licenseEntity, EventUtils.EventType.INSERT)); } }
이벤트 리스너가 데이터베이스 변경 이벤트를 Outbox 테이블에 기록
@RequiredArgsConstructor @Service public class EventService { private final SaveOutBoxPort saveOutBoxPort; @EventListener public void handleOutBoxEvent(OutBoxEvent event) { OutBoxEntity outBox = OutBoxEntity.builder() .aggregateId(event.getAggregateId()) .aggregateType(event.getAggregateType().name()) .eventType(event.getEventType().name()) .payload(event.getPayload().toString()) .createdAtDatetime(LocalDateTime.now()) .build(); saveOutBoxPort.save(outBox); } }
Message Broker로 변경사항(Outbox)을 발행하는 Message Relay 작성
node로 작성되었으며 주요 부분에 대해서만 설명합니다. 전체 code는 다음 링크를 확인해 주십시오.
https://github.com/codej99/TransactionalOutboxMessageRelay.git
Outbox 테이블에 기록된 DB 변경 이벤트 조회
const publishOutbox = () => { const connection = createConnection(); const sql = `SELECT id, aggregate_id, aggregate_type, event_type, payload, created_at_datetime FROM outbox ORDER BY id asc`; connection.query(sql, function (err, result) { if (result) { pushToKafka(result); } else { console.error(`publishOutbox failed - ${err}`); } }); connection.end(function (err) { }); };
kafka topic에 변경사항 발행
const pushToKafka = (rows) => { const Producer = kafka.Producer, client = new kafka.KafkaClient({kafkaHost: process.env.KAFKA_BROKER}), producer = new Producer(client, {partitionerType: 3}); let payloads = []; for (let i = 0; i < rows.length; i++) { rows[i].payload = JSON.parse(rows[i].payload); console.log(rows[i].id + " : " + JSON.stringify(rows[i])); payloads.push({ topic: process.env.KAFKA_TOPIC, messages: JSON.stringify(rows[i]), key: rows[i].aggregate_id }) } producer.on("ready", function () { producer.send(payloads, function (err, result) { if (result) { console.info(`[SUCCESS] publishKafka - ${JSON.stringify(result)}`); deleteOutbox(rows); } else { console.error(`publishKafka failed - ${err}`); } producer.close(); }) }); producer.on("error", function (err) { console.error(`Unknown Error Occured : ${err}`); }); };
발행한 이벤트를 Outbox에서 삭제
const deleteOutbox = (rows) => { const connection = createConnection(); for (let i = 0; i < rows.length; i++) { const sql = `DELETE FROM outbox WHERE id=${rows[i].id}`; connection.query(sql, function (err, result) { if (result) console.info(`[SUCCESS] deleteOutbox - ${JSON.stringify(result)}`); else console.error(`deleteOutbox failed - ${err}`); }); } connection.end(function (err) { }); };
참고
https://dzone.com/articles/implementing-the-outbox-pattern
https://www.popit.kr/msa에서-메시징-트랜잭션-처리하기