DocumentDB는 AWS에서 제공하는 MongoDB 호환 완전관리형 Nosql 데이터베이스입니다. MongoDB와 완전히 호환되며 JSON 데이터를 쉽게 저장하고 쿼리 및 인덱싱 할 수 있는 document 기반 database 입니다. dynamoDB와 마찬가지로 stream을 제공하며 documentDB에 상태 변경(Insert, Delete, Update)이 발생했을때 변경 데이터 스트림을 전달받을 수 있습니다. 다만 아쉽게도 dynamoDB와 달리 아직은 Lambda trigger로 제공되지 않아 Polling 방식으로 stream을 읽어서 사용해야 합니다.(마지막 받은 stream의 id를 어딘가 저장해둬야 하는 불편함이 있습니다.)
Data Flow
documentDB의 변경 스트림을 Lambda에서 polling하여 카프카에 발행합니다. 카프카의 Topic을 구독 중인 Consumer는 스트리밍 데이터를 이형의 데이터 플랫폼에 전달하여 처리할 수 있게 합니다.
Amazon DocumentDB Stream 제약사항
- 변경 스트림은 DocumentDB 클러스터의 Primary 인스턴스에 대한 연결에서만 열 수 있습니다. 현재 복제본 인스턴스의 변경 스트림에서 읽기는 지원되지 않습니다.
- 변경 스트림에 작성된 이벤트는 최대 24시간 동안 사용할 수 있습니다.(기본값은 3시간). 변경 스트림 데이터는 새 변경 사항이 발생하지 않은 경우에도 로그 보존 기간 후에는 삭제 됩니다.
- collection에서 updateMany 또는 deleteMany 같은 장기 실행 쓰기 작업을 수행하는 경우, 장기 실행 쓰기 작업이 완료될 때까지 변경 스트림 이벤트의 쓰기를 일시적으로 중단할 수 있습니다.
- DocumentDB의 stream을 사용하려면 지정한 컬렉션의 변경 스트림을 명시적으로 활성화해야 합니다.
- 변경 스트림 이벤트의 총 크기(변경 데이터 및 요청된 경우 전체 문서 포함)가 16MB보다 크면 클라이언트가 변경 스트림에서 읽기에 실패합니다.
- Amazon DocumentDB 변경 스트림 기능은 기본적으로 비활성화되어 있으며 이 기능을 활성화하고 사용할 때까지 추가 요금이 발생하지 않지만 클러스터에서 변경 스트림을 사용하면 추가 IOPS 및 스토리지 비용이 발생합니다. (자세한 내용은 Amazon DocumentDB 요금을 참조)
Change Stream 활성화
지정된 데이터베이스 내의 모든 컬렉션 또는 선택한 컬렉션에 대해서만 Amazon DocumentDB 변경 스트림을 활성화 할 수 있습니다. 다음은 mongo shell을 사용하여 다양한 UseCase에 대한 변경 스트림을 활성화하는 방법의 예를 보여줍니다. database 및 collection 이름을 지정할 때 빈 문자열은 와일드 카드로 처리됩니다.
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true}); //Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false}); //Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true}); //Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
Stream 활성화 확인
cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}})); { "database" : "docdb-test", "collection" : "" }
Stream 로그 보관시간 변경(Modifying the Change Stream Log Retention Duration)
로그 보관시간은 Parameter Groups의 Change Stream Log Retention Duration 항목을 수정하여 변경할 수 있습니다. 변경후에는 클러스터를 리스타트 해야 적용됩니다..
DocumentDB Console -> Parameter Groups -> 연관된 그룹선택 -> Cluster parameters -> change_stream_log_retention_duration 검색 – Radiobox 선택 후 Edit 클릭하여 Value값 수정
Stream 데이터 확인을 위한 간단한 Lambda 프로그램 작성
change stream을 listening하는 node.js 프로그램을 다음과 같이 작성합니다. 아래 프로그램을 실행하면 변경 stream 정보가 console에 출력됩니다. 특정 스트림 이후의 변경 데이터만 받고 싶으면 resumeAfter 항목에 stream id를 세팅하고 실행하면 됩니다.
'use strict'; const MongoClient = require("mongodb").MongoClient; module.exports.main = event => { MongoClient.connect(MONGODB_URL, function(error, client){ if(error) { console.log(error); } else { const db = client.db('docdb-test'); const collection = db.collection('sample'); const changeStreamOptions = { fullDocument: 'updateLookup' /* ,resumeAfter: { _data: '015fbe030200000002010000000200005010' }*/ }; const changeStream = collection.watch(changeStreamOptions); changeStream.on('change', next => console.log(next)); } }); };
DocumentDB Stream format
위에서 작성한 프로그램으로 변경 stream을 캡쳐하면 아래와 같은 형태의 결과를 확인 할 수 있습니다.
- 데이터 추가시 : operationType = insert , 결과에 fullDocument 데이터 출력
- 데이터 수정시 : operationType = update , 결과에 fullDocument , updateDescription데이터 출력
- 데이터 삭제시 : operationType = delete , 결과에 documentKey 데이터 출력
// insert { _id: { _data: '015fbe037000000001010000000100005010' }, operationType: 'insert', clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606288240 }, ns: { db: 'docdb-test', coll: 'sample' }, documentKey: { _id: 5fbe0370d9e5066692de4de5 }, fullDocument: { _id: 5fbe0370d9e5066692de4de5, name: 'Michael', age: 15, gender: 'M' } } // update { _id: { _data: '015fbe049f00000009010000000900005010' }, operationType: 'update', clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 9, high_: 1606288543 }, ns: { db: 'docdb-test', coll: 'sample' }, documentKey: { _id: 5fbdf4e661b6f9416b335117 }, fullDocument: { _id: 5fbdf4e661b6f9416b335117, name: 'Michael', age: 15, gender: 'M', hobby: 'swimming' }, updateDescription: { updatedFields: { hobby: 'swimming' }, removedFields: [] } } // delete { _id: { _data: '015fbe058a00000007010000000700005010' }, operationType: 'delete', clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 7, high_: 1606288778 }, ns: { db: 'docdb-test', coll: 'sample' }, documentKey: { _id: 5fbdf57e2baf0c4303453554 } }
documentDB stream을 kafka에 발행하는 내용은 이전장의 dynamoDB Stream에서 실습했으므로 생략합니다.
[참고]
https://medium.com/@marchpig/mongodb-change-streams-baa78eaa82ed
https://docs.aws.amazon.com/documentdb/latest/developerguide/change_streams.html
https://docs.aws.amazon.com/ko_kr/documentdb/latest/developerguide/change_streams.html