- AuroraDB Migration – Using Kafka as a target for Database Migration Service
- Amazon DynamoDB Stream
- Amazon DocumentDB(MongoDB) Stream
- Database Migration by Transactional Outbox Pattern
- AWS Database Migration Service – Migration auroradb to kafka
이전 실습까지는 데이터 마이그레이션을 위해 Database에서 제공하는 binlog나 DynamoDB/MongoDB에서 제공하는 Change Stream을 통해 변경 데이터를 처리할 수 있었습니다. 하지만 이렇게 시스템적으로 지원을 받지 못하는 경우는 어떻게 해야할까요. 여러가지 방법이 있겟지만 이번 장에서는 Transactional Outbox Pattern을 사용하는 방법에 대해 살펴보겠습니다.
AWS의 Serverless AuroraDB는 인스턴스 및 용량 관리의 복잡성을 크게 줄여주는 장점이 있어 최근 많이 사용되는 추세입니다. 그런데 Serverless AuroraDB의 경우 Classic AuroraDB에서 제공하는 binlog를 사용할 수가 없습니다. 즉 현재 시점에서는 AWS DMS(Database Migration Service)를 사용할 수 없어 이형의 플랫폼으로 Database를 마이그레이션 해야할 경우 어려움에 봉착하게 됩니다. 결국 시스템 측면에서 도움을 받을 수 없으면 소프트웨어 측면에서 문제를 해결할 수 밖에 없습니다.
Transactional Outbox Pattern
Transactional Outbox는 특정 도메인 객체(테이블)의 변경사항이 발생할때 단일 트랜잭션내에서 동일 데이터베이스 내의 Outbox 테이블에 관련 정보를 저장하여 후속 이벤트를 처리하는 방법입니다. 해당 시나리오는 다음과 같습니다.
- 동일 Database내에 변경사항을 기록하는 Outbox Table을 생성
- 단일 트랜잭션 내에서 데이터 변경 + OutBox로 변경사항을 기록하는 액션을 수행. 동일 Database내에서 단일 트랙잭션으로 데이터를 처리하므로 변경 데이터를 일관성있게 처리 할 수 있음
- Outbox 테이블에 기록된 변경 사항은 Message Relay를 통해 Message Broker로 발행
- Message Broker에 발행된 내역은 Outbox 테이블에서 삭제
- 타 플랫폼에서는 Message Broker를 구독하여 변경 데이터를 전달받아 마이그레이션을 진행
Event Flow

Data Flow

Create Outbox Table
마이그레이션 대상의 변경 이력을 저장하기 위한 Outbox 테이블을 생성합니다.
| Field | Type | Comment | Sample |
|---|---|---|---|
| id | bigint | PrimaryKey /AutoIncrement | 1000 |
| aggregate_id | varchar | 순서 처리가 필요한 데이터를 Grouping하는 id( ex. user_id) | happydaddy |
| aggregate_type | varchar | 변경이 발생한 도메인(테이블) 이름 | LICENSE |
| payload | text | 도메인 entity 변경사항(JSON) | {“id”:42,”user_id”:”happydaddy”,”episode_id”:14562,”expire_at”:”2021-01-01T15:23:38Z”} |
| event_type | varchar | 발생한 이벤트 (INSERT, UPDATE, DELETE …) | INSERT |
| created_at_datetime | datetime | 이벤트 발생시간 | 2020-12-01 23:38:39.616869 |
Spring ApplicationEvent를 통한 Outbox 이벤트 처리
Java Spring 환경으로 작성되었으며 주요 부분에 대해서만 설명합니다. 전체 code는 다음 링크를 확인해 주십시오.
https://github.com/codej99/TransactionalOutbox.git
Application Event를 발행하는 event publisher 생성
@Component
public class EventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void publish(OutBoxEvent outboxEvent) {
this.publisher.publishEvent(outboxEvent);
}
}
비즈니스 로직 처리시 테이블 변경사항이 발생하면 단일 트랜잭션 내에서 Outbox Event를 발행
@RequiredArgsConstructor
@Service
public class LicenseService implements RegisterLicenseUseCase {
private final RegisterLicensePort registerLicensePort;
private final EventPublisher eventPublisher;
@Override
@Transactional
public void registerLicense(License license) {
LicenseEntity licenseEntity = LicenseEntity.builder().userid(license.getUserid()).episodeId(license.getEpisodeId()).expireAtDatetime(license.getExpireAtDatetime()).build();
registerLicensePort.register(licenseEntity);
eventPublisher.publish(EventUtils.createLicenseEvent(licenseEntity, EventUtils.EventType.INSERT));
}
}
이벤트 리스너가 데이터베이스 변경 이벤트를 Outbox 테이블에 기록
@RequiredArgsConstructor
@Service
public class EventService {
private final SaveOutBoxPort saveOutBoxPort;
@EventListener
public void handleOutBoxEvent(OutBoxEvent event) {
OutBoxEntity outBox = OutBoxEntity.builder()
.aggregateId(event.getAggregateId())
.aggregateType(event.getAggregateType().name())
.eventType(event.getEventType().name())
.payload(event.getPayload().toString())
.createdAtDatetime(LocalDateTime.now())
.build();
saveOutBoxPort.save(outBox);
}
}
Message Broker로 변경사항(Outbox)을 발행하는 Message Relay 작성
node로 작성되었으며 주요 부분에 대해서만 설명합니다. 전체 code는 다음 링크를 확인해 주십시오.
https://github.com/codej99/TransactionalOutboxMessageRelay.git
Outbox 테이블에 기록된 DB 변경 이벤트 조회
const publishOutbox = () => {
const connection = createConnection();
const sql = `SELECT
id,
aggregate_id,
aggregate_type,
event_type,
payload,
created_at_datetime
FROM outbox
ORDER BY id asc`;
connection.query(sql, function (err, result) {
if (result) {
pushToKafka(result);
} else {
console.error(`publishOutbox failed - ${err}`);
}
});
connection.end(function (err) {
});
};
kafka topic에 변경사항 발행
const pushToKafka = (rows) => {
const Producer = kafka.Producer,
client = new kafka.KafkaClient({kafkaHost: process.env.KAFKA_BROKER}),
producer = new Producer(client, {partitionerType: 3});
let payloads = [];
for (let i = 0; i < rows.length; i++) {
rows[i].payload = JSON.parse(rows[i].payload);
console.log(rows[i].id + " : " + JSON.stringify(rows[i]));
payloads.push({
topic: process.env.KAFKA_TOPIC,
messages: JSON.stringify(rows[i]),
key: rows[i].aggregate_id
})
}
producer.on("ready", function () {
producer.send(payloads, function (err, result) {
if (result) {
console.info(`[SUCCESS] publishKafka - ${JSON.stringify(result)}`);
deleteOutbox(rows);
} else {
console.error(`publishKafka failed - ${err}`);
}
producer.close();
})
});
producer.on("error", function (err) {
console.error(`Unknown Error Occured : ${err}`);
});
};
발행한 이벤트를 Outbox에서 삭제
const deleteOutbox = (rows) => {
const connection = createConnection();
for (let i = 0; i < rows.length; i++) {
const sql = `DELETE FROM outbox WHERE id=${rows[i].id}`;
connection.query(sql, function (err, result) {
if (result)
console.info(`[SUCCESS] deleteOutbox - ${JSON.stringify(result)}`);
else
console.error(`deleteOutbox failed - ${err}`);
});
}
connection.end(function (err) {
});
};
참고
https://dzone.com/articles/implementing-the-outbox-pattern
https://www.popit.kr/msa에서-메시징-트랜잭션-처리하기














