Search

11. 스트림 처리

목차

개요

복잡하지만 잘 작동하는 시스템은 예외 없이 간단하지만 잘 작동하는 시스템으로부터 발전한다. 이 명제는 역도 참이다. 처음부터 복잡하게 설계된 시스템은 절대 작동할 리도 없고 작동하게 만들지도 못한다. - 존 갈, 체계론(1975)
일괄 처리를 이용해 고성능의 대용량 데이터 처리를 제공할 수 있게 되었지만, 실시간성이 떨어진다는 문제가 있는데, 이러한 실시간성을 높히기 위해 스트림 처리라는 기술이 나오게 되었다
스트림은 시간 흐름에 따라 점진적으로 생산된 데이터를 말하는데, 유닉스의 stdin / stdout, 프로그래밍 언어(느긋한 리스트(lazy lists)), 자바의 FileInputStream 같은 파일 시스템 API, TCP 연결, 인터넷 상의 오디오와 비디오 전송 등 많은 곳에서 등장한다.
이번 포스팅에선 이벤트 스트림을 통해 데이터를 관리하는 방법에 대해 살펴본다.

이벤트 스트림 전송

스트림 처리에선 어떻게 무엇을 입출력할까?
일괄 처리 환경에서는 파일이였다면, 스트림 처리 문맥에서는 특정 시점에 일어난 사건에 대한 세부 사항을 포함하는 이벤트라는 작은 불변 객체를 이용한다.
이벤트는
텍스트 문자열
JSON
이진 형태
와 같은 형태로 부호화된다. 이벤트는 이런 부호화 과정을 통해 저장할 수 있다.
이렇게 저장한 데이터는 다른 노드에 네트워크를 통해 전송해서 처리하게 할 수도 있다.
스트리밍에서는 생산자(producer)가 이벤트를 만들면 (발행자(publisher)발송자(sender)라고도 한다.) 해당 이벤트를 복수의 소비자(consumer, 구독자(subscriber) 또는 수신자(recipient))가 처리할 수 있다. 그리고 보통 토픽(topic)이나 스트림으로 관련 이벤트를 묶는다.
이론상으로는 파일 혹은 DB를 이용해
생산자는 모든 이벤트를 DB에 기록하고
소비자는 주기적으로 DB를 폴링해 새로운 이벤트를 확인한다.
위와 같은 방법으로 생산자와 소비자를 연결할 수 있다. 하지만, 이런 방식은 소비자가 새로운 이벤트를 폴링하는 방식이 고비용일 확률이 높다. 폴링이 잦을수록 해당 폴링이 유효한 폴링이 될 가능성이 낮아지고 이는 폴링의 오버헤드가 커진다는 것과 같다.
오히려 새로운 이벤트가 나타날 때마다 소비자에게 알리는 편이 더 나을 수 있다.

메시징 시스템

새로운 이벤트를 소비자에게 알려주는 일반적인 방법으로 메시징 시스템(messaging system)이 있다.
메시징 시스템을 구축하는 가장 간단한 방법은 생산자와 소비자 사이에 유닉스 파이프나 TCP 연결과 같은 직접 통신 채널을 사용하는 것으로, 대부분의 메시징 시스템이 이러한 기본 모델을 확장해 사용하고 있다.
발행/구독(publish/subscribe) 모델에선 여러 시스템들이 다양한 접근법을 사용한다.
각각 트레이드 오프가 있으며, 적절한 접근법을 찾아서 사용해야 하는데 다음과 같은 질문을 통해 이 시스템을 구별하는데 도움이 될 수 있다.
1.
소비자가 메시지를 처리하는 속도보다 생산자가 메시지를 빠르게 전송한다면?
a.
메시지를 버린다.
b.
큐에 메세지를 버퍼링한다.
c.
배압(backpressure, 흐름 제어(flow control)라고도 하며 생산자가 메시지를 더 보내지 못하게 막는다.) 한다.
i.
유닉스 파이프와 TCP는 배압을 사용한다.
2.
노드가 죽거나 일시적으로 오프라인이 된다면 어떻게 될까? 손실되는 메시지가 있을까?
a.
DB를 사용할 때처럼 지속성을 갖추려면 디스크에 기록하거나 복제본 생성을하거나 둘 모두를 해야 한다. 그렇기에 비용이 든다.
메시지의 유실을 허용할지 말지는 애플리케이션에 따라 다르다.
만약 주기적으로 전송되는 센서값이나 지표값은 가끔 데이터가 누락되더라도 큰 문제가 되지 않는다. (물론 너무 많은 누락은 값의 신뢰성을 떨어트린다.)
일괄 처리에서는 강력한 신뢰성 보장이라는 속성을 지원하는데, 스트림에선느 이와 비슷한 처리 보장을 어떻게 해야 하는지도 알아보자.

생산자에서 소비자로 메시지 직접 전달하기

많은 메시지 시스템은 중간 노드를 거치지 않고 생산자와 소비자간에 직접 통신한다.
UDP 멀티 캐스트는 낮은 지연이 필수인 주식 시장과 같은 금융 산업에서 많이 사용된다. UDP자체로는 신뢰성이 낮아도 애플리케이션 단의 프로토콜은 읽어버린 패킷을 복구할 수 있다. (생산자는 필요할 떄 패킷 재전송을 할 수 있도록 전송한 패킷을 기억해야 한다.)
ZeroMQ같은 브로커가 필요없는 메시징 라이브러리와 나노메시지(nanomsg)가 이와 유사한 접근법을 사용하는데 TCP 또는 IP 멀티캐스트 상에서 발행/구독 메시징을 구현한다.
StatsD과 BruBeck은 지표 수집및 모니터링에 UDP 메시징을 사용한다.
소비자가 네트워크에 서비스를 노출하면 생산자는 직접 HTTP나 RPC 요청을 직접 보낼 수 있다.
웹훅(webhook)의 기본 아이디어로 서비스 콜백 URL을 다른 서비스에 등록하는 형식으로 이벤트가 발생할 때마다 콜백 URL로 요청을 보낸다.
직접 메시징 시스템은 메시지가 유실될 가능성을 고려한 코드 작성이 되야 한다. 소비자가 오프라인이기에 메시지를 전달하지 못하는 상황에서 전송된 메시지는 유실될 수 있고, 이런 실패한 메시지 전송을 재시도하다보면 생산자 장비가 죽거나 재시도 하려 했던 메시지 버퍼를 잃어버릴 수 있기에 문제가 있다.

메시지 브로커

메시지 큐라고도 부르는 메시지 브로커를 통해 메시지를 보내는 방법이 있다.
메시지 브로커는 근본적으로 메시지 스트림을 처리하는 데 최적화된 DB의 일종으로 메시지 브로커는 서버로 구동되고 생산자와 소비자는 서버의 클라이언트로 접속한다. 그 다음 생산자는 브로커로 메시지를 전송하고 소비자는 브로커에서 메시지를 읽어 전송받는다.

메시지 브로커와 데이터베이스의 비교

DB는 명시적으로 데이터가 삭제될 떄까지 데이터를 보관한다.
메시지 브로커 대부분은 메시지 전송이 끝나면 자동으로 메시지를 삭제한다.
메시지 브로커는 메시지를 빨리 지우기에 작업 집합이 작다고 가정한다. (큐 크기가 작다)
만약, 소비자가 느려 메시지 버퍼링이 심하다면 개별 메시지 처리 시간과 전체 처리량이 낮아진다.
DB가 보조 색인을 지원한다면, 메시지 브로커는 특정 패턴과 부합하는 토픽의 부분 집합을 구독하는 방식
DB에 질의할 때 그 결과는 일반적으로 질의 시점의 데이터 스냅숏을 기준으로 한다.

복수 소비자

복수 소비자가 같은 토픽에서 메시지를 읽을 떄 사용하는 패턴은 다음과 같다.
(a) 로드 밸런싱: 여러 소비자가 하나의 토픽을 소비하는 작업을 공유. (b) 팬 아웃: 각 메시지를 복수 개의 소비자로 전달
로드 밸런싱
각 메시지는 소비자 중 하나로 전달된다.
브로커는 메시지를 전달할 소비자를 임의로 지정한다.
메시지 처리 비용이 높아 처리를 병렬화 하기 위해 소비자를 추가 하고 싶을 때 유용하다.
JMS에서는 공유 구독(shared subscription)이라 한다.
AMQP는 같은 큐를 소비하는 클라이언트를 여러개 둬 로드밸런싱 구현이 가능하다.
팬 아웃
각 메시지는 모든 소비자에게 전달된다.
여러 독립적인 소비자가 브로드캐스팅된 동일 메시지를 모두 볼 수 있다.
일괄 처리에서 사용하는 방법과 동일
이 두 가지 패턴은 함께 사용 할 수도 있다.
두 개의 소비자 그룹에서 하나의 토픽을 구독하고 각 그룹은 모든 메시지를 받지만 그룹 내에선 각 메시지를 하나의 노드만 받게 할 수 있다.

확인 응답과 재전송

소비자에겐 언제나 문제가 생길 수 있다.
그렇기에 문제가 발생한 시점에 브로커가 메시지를 소비자에게 전달하면 소비자는 메시지를 처리하지 못하거나 일부분만 처리하게 되고 문제가 생길 수 있다. 그래서 메시지 브로커는 확인 응답을 사용해서 대응한다. 클라이언트가 메시지 처리가 끝났을 때 브로커가 메시지를 큐에서 제거할 수 있게 브로커에게 명시적으로 알려야 한다.
만약, 브로커가 확인 응답을 아직 받지 못했는데 클라이언트에 문제가 생겨 연결이 닫히거나 타임아웃되면 브로커는 메시지가 처리되지 않았다고 가정하고 다른 소비자에게 다시 전송한다.
소비자2가 m3를 처리하던 도중 장애가 발생해 이후에 소비자 1로 재전송한다.
문제가 발생했을 때 재전송하는 방식은 메시지 순서에도 영향을 줄 수 있다.
위 그림을 보면 소비자1이 메시지m4를 처리하고 있을 떄 소비자2가 메시지 m3를 처리하가에러발 생한하다. m3는 확인 응답을 받지 못해 소비자 1로 재전송된다.
그 결과 소비자 1에선 m4 → m3 → m5 순으로 메시지를 처리한다.

파티셔닝된 로그

네트워크 상에서 패킷을 전송하거나 네트워크 서비스에 요청하는 작업은 보통 영구적 추적을 남기지 않는 일시적 연산이다.
메시지 브로커 역시 메시지가 소비자에게 전달된 후 즉시 삭제된다. 메시지 브로커자체가 메시지를 일시적으로 보관하는 개념으로 만들어졌기 때문이다.
그렇기 때문에 이미 받은 메시지는 복구할 수 없고, 소비자를 다시 실행해도 동일한 결과를 받을 수 없다. 메시징 시스템에 새로운 소비자를 추가한다면 추가된 등록 시점 이후의 메시지부터 받기 시작하며 이전 메시지는 다시 복구할 수 없다.
DB의 지속성 있는 저장 방식과 메시징 시스템의 지연 시간이 짧은 알림 기능을 조합하기 위해 로그 기반 메시지 브로커(log-based message broker)라는 아이디어가 등장했다.

로그를 사용한 메시지 저장소

로그는 단순히 디스크에 저장된 추가 전용 레코드의 연속이다.
브로커를 구현할 때도 같은 구조를 사용한다. 생산자가 보낸 메시진 로그 끝에 추가하고 소비자는 로그를 순차적으로 읽어 메시지를 받는다. 소비자가 로그 끝에 도달하면 새 메시지가 추가됐다는 알림을 기다린다.
유닉스 도구 tail -f는 파일에 추가되는 데이터를 감시하는데 본질적으로 이 구조와 동일하다.
로그를 파티셔닝해서 처리량을 높힐 수도 있다.
각각의 파티션은 다른 장비에서 서비스할 수 있고, 그 덕에 각 파티션은 다른 파티션과 독립적으로 읽고 쓰기가 가능한 분리된 로그가 된다.
토픽과 같은 형식의 메시지를 전달하는 파티션들의 그룹으로 정의한다.
생산자가 메시지를 전송하면 메시지는 토픽 파티션 파일에 추가되고 소비잔 순서대로 파티션 파일을 읽는다.
각 파티션 내에서 브로커는 모든 메시지에 오프셋이라 부르는, 단조 증가하는 순번을 보여한다.
파티션이 추가 전용이고 따라서 파티션 내 전체 메시지는 전체 순서가 있기에 순번을 부여하는 것은 타당하다. (다른 파티션 간 메시지의 순서는 보장하지 않는다. )
아파티 카프카(Apache Kafka)
아마존 키네시스 스트림(Amazon Kinesis Stream)
트위터의 분산 로그(DistributedLog)
위 세 가지 메시지 브로커가 위에 설명한 방식으로 동작하는 로그 기반 메시지 브로커다
구글 클라우드 Pub/Sub은 아키텍처는 비슷하지만 노출된 API는 로그 추상화가 아닌 JMS 형식이다.
이러한 메시지 브로커는 모든 메시지를 디스크에 저장하지만 여러 장비에 메시지를 파티셔닝해 초당 수백만 개의 메시지를 처리할 수 있고 메시지를 복제함으로써 장애에 대비할 수 있다.
참고: JMS (Java Message Service) : Java를 사용해 메시지 지향 미들웨어(Message-Oriented Middleware, MOM)를 비동기 메시지 전송을 단순화하는 API로 스트림 처리와 결합해 대규모 분산 시스템에서 실시간 데이터 처리를 할 수 있으며 다음과 같은 메시지 모델들을 지원한다. 1. Point-to-Point: 메시지를 큐(Queue)에 저장하고, 하나 이상의 컨슈머에 전달한다. 각 메시지는 오직 하나의 컨슈머에게만 전달된다. 2. Publish-Subscribe: 퍼블리셔가 특정 토픽(Topic)에 메시지를 게시하면 해당 토픽을 구독하는 모든 구독자에게 브로드캐스팅된다.

로그 방식과 전통적인 메시징 방식의 비교

로그 방식 (예: Apache Kafka):
1.
내구성: 메시지를 영구적으로 저장하고 순서를 유지하여 높은 내구성을 제공한다.
2.
재생성 가능: 컨슈머가 이미 처리한 메시지를 다시 처리할 수 있게 하며 이러한 점은 디버깅에 유용하다.
3.
고성능: 데이터를 연속적으로 저장하고 메시지를 빠르게 전달하므로 대량의 데이터 처리에 적합하다.
4.
확장성: 클러스터를 통해 수평적으로 확장할 수 있습니다.
전통적인 메시징 방식 (예: JMS, RabbitMQ, ActiveMQ)
1.
유연한 메시지 처리: 메시지를 큐나 토픽으로 분리하여 다양한 메시지 처리 패턴을 지원한다.
2.
상태 관리: 메시지 상태를 메시징 시스템에서 추적하고 관리할 수 있다.
(Ex: 메시지가 전송되었는지, 처리되었는지 등).
3.
전송 보장: 메시지 전송 여부 확인 및 실패시 재전송할 수 있는 기능을 제공한다.
4.
메시지 필터링: 구독자가 특정 조건을 기반으로 메시지를 선택적으로 수신할 수 있다.
로그 기반 접근법은 팬 아웃 메시징 방식을 제공하기에 대용량 데이터 처리 및 확장성과 내구성을 가질 수 있다. 반면 전통적인 메시징 방식은 메시지 처리를 유연하게 하고 상태 관리를 하는데 중점을 둔다. 그렇기에
메시지 처리 비용이 비싸고,
메시지 단위로 병렬화 처리하고 싶고
메시지 순서는 중요하지 않다면
JMS / AMQP 방식의 메시지 브로커가 적합하다.
메시지 처리량이 많고,
메시지를 처리하는 속도가 빠르지만
메시지 순서가 중요하다면
로그 기반 접근법이 적합하다.

소비자 오프셋

파티션 하나를 순서대로 처리하면 메시지를 어디까지 처리했는지 알기 쉽다.
소비자의 현재 오프셋을 기준으로 이전은 이미 처리한 메시지, 이후는 아직 처리하지 않은 메시지다.
따라서 브로커는 개별 메시지마다 보내는 확인 응답을 추적할 필요가 없고, 주기적으로 소비자 오프셋을 기록하면 된다. 이 방식은 단일 리더 DB 복제에서 사용되는 로그 순차 번호(log sequence number)와 비슷하다.
메시지 브로커가 DB의 리더처럼 동작하고 소비자가 팔로워처럼 동작한다.
그래서 소비자 노드에 장애가 발생할 경우 소비자 그룹 내 다른 노드에 장애가 발생한 소비자의 파티션을 할당하고 기록된 오프셋 메시지를 처리하기 시작한다.
장애가 발생한 소비자가 처리했지만 아직 오프셋을 기록하지 못한 메시지가 있다면 이 메시지는 재시작할 때 두 번 처리된다.

디스크 공간 사용

로깅을 하다보면 디스크 공간이 가득차게 될 것이다.
그렇다고 로깅을 멈추고 확장하거나 정리될 때까지 기다릴 수는 없기에 여러 방식을 사용해 이를 해결한다.
로그를 여러 조각으로 나눠 오래된 것부터 삭제하거나 보관 저장소로 이동(Ex: DB)
소비자의 소비가 너무 늦어서 메시지 생산 속도를 따라잡지 못하면, 소비자의 오프셋이 이미 삭제된 조각을 가리킬 수도 있고 이는 메시지 유실을 의미한다.
로그는크기가 제한된 버퍼로 구현하고버퍼가 가득 차면 오래된 메시지 순서대로 버린다.
이런 버퍼를 원형 버퍼(circular buffer) or 링 버퍼(ring buffer)라 한다.

소비자가 생산자를 따라갈 수 없을 때

위에서 얘기했던 소비자가 생산자의 메시지 생산 속도를 따라갈 수 없는 경우 메시지 버리기, 버퍼링, 배압 적용하기를 사용할 수 있다.
소비자가 뒤처져 필요한 메시지가 디스크에 보유한 메시지보다 오래되면 필요한 메시지는 읽을 수 없다. 그래서 브로커는 버퍼 크기를 넘는 오래된 메시지를 자연스럽게 버린다.
소비자가 로그의 헤드로부터 얼마나 떨어졌는지를 모너티링해서 너무 많이 뒤처진다면 경고할 수도 있다.
어떤 소비자가 너무 뒤처져서 메시지를 잃기 시작해도 해당 소비자만 영향을 받고 다른 소비자들의 메시지에 영향을 주지는 않는다. 이는 운영상 상당한 장점으로 다가온다.

오래된 메시지 재생

AMQP와 JMS 유형의 메시지 브로커에서 메시지를 처리하고 확인 응답하는 작업은 브로커에서 메시지를 제거하기 때문에 파괴적 연산이라 할 수 있다.
반면 로그 기반 메시지 브로커는 메시지를 소비하는게 오히려 파일을 읽는 작업과 더 유사하다. (읽기 전용 연산이기 때문에)
소비자의 출력을 제외하고 메시지 처리의 부수 효과는 소비자 오프셋 이동이 전부다.
소비자 오프셋은 소비자 관리 아래에 있기 때문에 필요할 경우 조작하기가 쉽다.
소비자의 어제 오프셋을 복사했다가 전날 분량의 메시지 재처리를 위해 다른 위치에 출력을 기록할 수 있다.
이러한 부분들은 일괄 처리와 유사한점이 있다. 로그 기반 메시징과 일괄 처리는 변환 처리를 반복해도 입력 데이터에 영향을 주지 않고 파생데이터를 만든다. 로그 기반 메시징 시스템은 많은 실험을 할 수 있고 오류와 버그를 복구하기 쉽기 때문에 조직 내에서 데이터플로를 통합하는데 좋은 도구다.

데이터베이스와 스트림

이번 절에선 이종 데이터 시스템에서 발생하는 문제 한가지를 먼저 살펴본 뒤 이벤트 스트림의 아이디어를 데이터베이스에 적용해 해결하는 방법을 찾아본다.

시스템 동기화 유지하기

해당 포스팅까지 진행하며 단일 시스템 중 데이터 저장과 질의, 처리 요구사항을 모두 만족하는 시스템은 없었다. 모든 요구사항을 만족하기 위해서는 단일 시스템이 아닌 몇 가지 다른 기술의 조합이 필요하다.
사용자 요청에 대응하기 위한 OLTP DB
공통 요청의 응답 속도를 높이기 위한 캐시
검색 질의를 다루기 위한 전문 색인
분석용 데이터 웨어하우스
이 시스템 각각은 데이터의 복제본을 가지고 있고 데이터는 목적에 맞게 최적화된 형태로 저장된다.
관련이 있거나 동일한 데이터가 여러 다른 장소에서 나타날 수 있기에 서로 동기화도 필수다.
데이터 베이스에 아이템 하나가 갱신되면 캐시, 색인, 데이터 웨어하우소 같이 갱신되야 한다.
(데이터 웨어하우스에선 이 동기화 과정을 ETL 과저에서 수행한다. )
주기적으로 DB 전체를 덤프하는 작업이 너무 느리면 대안으로 이중 기록(dual write)이 있다.
이중 기록을 사용하면 데이터가 변할 때마다 애플리케이션 코드에서 명시적으로 각 시스템에 기록한다.
하지만 이중 기록에는 몇 가지 심각한 문제가 있는데, 그 중하나가 다음 그림에 나온 것과 같은 경쟁 조건이다.
두 클라이언트에서 동시에 아이템 X를 업데이트 하려 하고 있다.
각 클라이언트는 새 값을 쓴 후 검색 색인에 기록한다.
클라이언트 1이 값을 쓴 뒤 클라이언트 2가 값을 다르게 바꾸고 색인에 기록하며 최종 값이 바뀐다.
클라이언트 2가 검색 색인에 기록한 뒤 클라이언트 1이 검색 색인에 기록을 시도하면서 색인 최종 값이 A가 된다(실제 값은 B지만)
사실 이러한 문제는 동시성 문제라기보단 내결함성 문제로 두 시스템 간 불일치가 발생하는 현상이 발생한다. 동시 성공 또는 동시 실패를 보장하는 방식은 원자적 커밋 문제다.
단일 리더 복제 DB하나를 사용해 리더가 쓰기 순서를 결정하는 식으로 해결할 수 있지만, 위 그림에선 단일 리더가 존재하지 않는 것으로 보인다. DB에서도 리더가 있고 검색 색인에도 리더가 있을지 모르지만 다른 쪽을 팔로우하지 않기 때문에 충돌이 발생할 수 있다.
색인용 인덱스를 DB의 팔로워로 만들어 실제로 리더 하나만 존재하게 한다면 상황은 나아지겠지만 가능한 이야기일까?

변경 데이터 캡처

대부분의 DB의 복제 로그와 관련된 문제는 공개 API가 아닌 오랫동안 DB내부 상세 구현으로 간주됐다. 클라이언트는 일반적으로 복제 로그를 파싱해서 데이터를 추출하는 방식보단 데이터 모델과 질의 언어를 통해 DB에 질의한다.
많은 DB는 지난 수십년도안 데이터 변경 로그를 얻는 방법에 대해서 기술한 문서를 따로 제공하지 않았기 대문에 DB에서 발생하는 데이터 변화를 감지하고 변경된 내용을 검색 색인, 캐시, 데이터 웨어하우스 같은 다른 저장 기술에 복제하기 어려웠다.
그래서 최근 들어 변경 데이터 캡처(change data capture, CDC)에 관심이 높아지고 있다.
이는 DB에 기록하는 모든 데이터의 변화를 관찰헤 다른 시스템으로 데이터를 복제할 수 있는 형태로 추출하는 과정이다. CDC는 데이터가 기록되자마자 변경 내용을 스트림으로 제공할 수 있으면 특히 유용하다. CDC는 데이터가 기록되자마자 변경 내용을 스트림으로 제공할 수 있으면 특히 유용하다.

데이터베이스의 변경 사항 캡처

DB의 변경 사항을 캡처해 변경 사항을 검색 색인에 꾸준히 반영할 수 있다. 같은 순서로 로그 변경이 반영된다면 DB의 데이터와 색인이 일치할 것이다.

변경 데이터 캡처의 구현

검색 색인과 데이터 웨어하우스에 저장된 데이터는 레코드 시스템에 저장된 데이터의 또 다른 뷰일 뿐이기에 로그 소비자를 파생 데이터 시스템이라 할 수 있다.
변경 데이터 캡처(Change Data Capture, CDC)는 파생 데이터 시스템이 레코드 시스템의 정확한 데이터 복제본을 가지게 하기 위해 레코드 시스템에 발생하는 모든 변경 사항을 파생 데이터 시스템에 반영하는 것을 보장하는 메커니즘이다.
CDC 원본 데이터베이스(DB) 하나를 리더로, 나머지를 팔로워로 두어 변경 사항을 캡처한다.
로그 기반 메시지 브로커는 원본 DB에서 변경 이벤트를 전송하기에 적합하며 메시지 순서를 유지한다.
CDC를 구현할 때 DB 트리거를 사용하기도 하는데, 테이블의 모든 변화를 관찰하는 트리거를 등록하고 변경 로그 테이블에 해당 항목을 추가하는 방식이다. 하지만 이 방식은 고장 위험도 높고 성능 오버헤드도 크다. 복제 로그를 파싱하는 방식은 스키마 변경 대응 등 해결해야 할 여러 문제가 있지만 트리거 방식보다 견고한 방법이다.
이 아이디어는 많은 IT회사에서 사용되고 있는데,
링크드인의 데이터버스(Databus)
페이스북의 웜홀(Wormhole)
야후의 셰르파(Sherpa)
등에서 대규모 데이터를 다룰 때 이 아이디어를 사용한다.
CDC는 메시지 브로커와 동일하게 비동기 방식으로 동작한다. 이는 레코드 DB 시스템이 변경 사항을 커밋하기 전 소비자에게 적용될 때까지 기다리지 않는다.
이러한 설계는 느린 소비자가 추가되더라도 레코드 시스템에 미치는 영향이 적어 운영상 이점이 있지만, 복제 지연과 관련된 모든 문제가 발생할 수 있다는 단점이 있다.

초기 스냅숏

DB에서 발생한 모든 변경 로그가 있다면 로그를 재현해 DB의 전체 상태를 재구축할 수 있다.
그러나 대부분 모든 변경 사항을 영구적으로 보관하는 일은 디스크 공간이 많이 필요하고 로그 재생에 필요한 시간도 너무 크다. 그래서 로그를 적당히 자를 필요가 있다.
전문 색인을 새로 구축할 때를 예로 들면 전체 DB복사본이 필요하다.
최근에 갱신하지 않은 항목은 로그에 없기 때문에 최근 변경 사항만 반영하는 것으로는 충분하지 않다. 따라서 전체 로그 히스토리가 없다면 새로운 팔로워 설정 부분에서 설명한 것처럼 일관성 있는 스냅숏을 사용해야 한다.
DB 스냅숏은 변경 로그의 위치나 오프셋에 대응돼야 한다. 그래야 스냅숏 이후에 변경 사항을 적용할 시점을 알 수 있다.
일부 CDC 도구는 이런 스냅숏 기능을 내장하고 있으나 수작업으로 진행해야 하는 CDC 도구도 있다.

로그 컴팩션

로그 히스토리의 양을 제한한다면 새로운 파생 데이터 시스템을 추가할 때마다 스냅숏을 만들어야 하는데 그에 대한 대안책이 로그 컴팩션(log compaction)이다.
원리는 간단하다.
저장 엔진은 주기적으로 같은 키의 로그 레코드를 찾아 중복을 제거하고
각 키에 대해 가장 최근에 갱신된 내용만 유지한다.
컴팩션과 병합 과정은 백그라운드로 실행된다.
로그 구조화 저장엔진에서 특별한 널 값(툼스톤(tomstone))으로 갱신하는 것은 키의 삭제를 의미하고 로그 컴팩션을 수행할 때 실제로 값을 제거한다. 하지만 툼스톤은 키를 덮어쓰거나 삭제하지 않는 한 영구적으로 유지한다. 컴팩션한 로그를 저장하는 데 필요한 디스크 공간은 지금까지 데이터 베이스에 발생한 쓰기 수가 아니라 현재 DB에 있는 내용에 달려있다.
같은 키를 여러 번 덮어썼다면 이전 값은 결국 가비지 컬렉션이 되고 최신 값이 유지된다.
로그 기반 메시지 브로커와 변경 데이터 캡처의 맥락에서도 마찬가지다. CDC 시스템에서 모든 변경에 기본키가 포함되게 하고 키의 모든 갱신이 해당 키의 이전 값을 교체한다면 특정 키에 대해 최신 쓰기만 유지하면 충분하다.
이제는 검색 색인과 같은 파생 데이터 시스템을 재구축할 때마다 새 소비자는 컴팩션된 로그 토픽의 오프셋 0부터 시작해 순차적으로 DB의 모든 키를 스캔하면 된다. 로그에 DB에 있는 모든 키의 최신 값이 존재하는 것이 보장된다.(컴팩션 중인 경우 이전 값도 약간은 존재 가능)
즉 CDC 원본 DB의 스냅숏을 만들지 않고도 DB 콘텐츠 전체의 복사본을 얻을 수 있다.

변경 스트림용 API 지원

최근 DB들은 기능 개선이나 리버스 엔지니어링을 통해 CDC 지원을 하기보단 점진적으로 변경 스트림을 기본 인터페이스로 지원하기 시작했다.
예를 들어 RethinkDB는 질의 결과에 변경이 있을 때 알림을 받을 수 있게 구독이 가능한 질의를 지원한다. FireBaseCouchDB는 애플리케이션에도 사용 가능한 변경 피드 기반의 데이터 동기화를 지원한다. 그리고 MeteorMongoDB의 oplog를 사용해 데이터 변경 사항을 구독하거나 사용자 인터페이스를 갱신한다.
VoltDB는 스트림 형태로 DB에서 데이터를 지속적으로 내보내는 트랜잭션을 제공한다. VoltDB는 튜플을 삽입할 수 있지만 질의는 할 수 없는 테이블로서 관계형 데이터 모델 내의 출력 스트림을 표현한다. 스트림은 커밋된 트랜잭션들이 이 특수한 테이블에 커밋된 순서대로 기록한 튜플의 로그로 구성된다. 외부 소비자는 이 로그를 비동기로 소비해 파생 데이터 시스템을 갱신하는데 사용한다.
카프카 커넥트(Kafka Connect)는 카프카를 광범위한 데이터 시스템용 변경 데이터 캡처 도구로 활용하기 위한 노력의 일환이다.
변경 이벤트를 스트림하는 데 카프카를 사용하면 검색 색인과 같은 파생 데이터 시스템을 갱신하는 데 사용 가능하다.

참고: RethinkDB에서 구독 질의 동작 예제

RethinkDB에서는 질의 결과 변경이 있을 때 알림을 받을 수 있는데, 이처럼 구독 가능한 질의를 changefeed라고도 한다. DB가 특정 조건에 맞는 데이터 변경이 발생할 때마다 알림을 받아서 실시간으로 변경사항을 추적하고 웹 애플리케이션에 반영할 수 있다.
다음과 같이 질의를 작성해서 구독 가능한 질의를 사용할 수 있다.
1.
RethinkDB에서 DB와 테이블 선택
2.
원하는 테이블에서 changes() 함수 호출로 changefeed 생성
3.
(Optional) 변경 사항에 필터를 적용해 특정 조건에 맞는 변경 사항만 수신할 수 있다.
4.
changefeed를 구독해 애플리케이션에 변경 사항을 전달
public class RethinkDBExample { public static final RethinkDB r = RethinkDB.r; public static void main(String[] args) { // RethinkDB에 연결 Connection conn = r.connection().hostname("localhost").port(28015).connect(); // 'users' 테이블의 changefeed를 구독 Table usersTable = r.table("users"); Changes<Map<String, Object>> changes = usersTable.changes().run(conn); // 변경 사항을 출력 for (Map<String, Object> change : changes) { System.out.println(change); } } }
Java
복사
명시적인 콜백 메서드를 사용하고 있지는 않지만 for 반복문 내에서 changes를 순회하며 변경사항을 가져온다. 이는 블로킹 큐(Blocking Queue)와 유사한 작동 방식으로 changes 객체에 새로운 변경 사항이 들어올 때까지 반복문은 대기하고 새로운 변경 사항이 발생하면 해당 사항을 가져와 출력한다.

이벤트 소싱

여기서 설명한 아이디어와 이벤트 소싱(event sourcing)은 유사한 면이 있다.
이벤트 소싱은 도메인 주도 설계(domain-driven design, DDD) 커뮤니티에서 개발한 기법으로 스트리밍 시스템에 관련한 유용한 아이디어를 포함한다. 이에 대해 가볍게 짚고가면, 이벤트 소싱은 변경 데이터 캡처와 유사하게 애플리케이션 상태 변화를 모두 변경 이벤트 로그로 저장한다.
변경 데이터 캡처와 가장 큰 차이점은 이 아이디어를 적용하는 추상화 레벨이 다르다는 점이다.
변경 데이터 캡처에서 애플리케이션은 DB를 변경 가능한 방식으로 사용해 레코드를 자유롭게 갱신하고 삭제한다. 변경 로그는 DB에서 저수준으로 추출한다(Ex: 복제 로그 파싱). 변경 로그는 DB에서 추출한 쓰기 순서가 실제로 데이터를 기록한 순서와 일치하고 경쟁 조건이 나타나지 않게 보장한다. DB에 기록하는 애플리케이션은 CDC가 실행 중인지 알 필요가 없다.
이벤트 소싱에서 애플리케이션 로직은 이벤트 로그에 기록된 불변 이벤트를 기반으로 명시적으로 구축한다. 이때 이벤트 저장은 단지 추가만 가능하고 갱신이나 삭제는 권장하지 않거나 금지한다. 이벤트는 저수준에서 상태 변경을 반영하는 것이 아닌 애플리케이션 수준에서 발생한 일을 반영하게끔 설계됐다.
이벤트 소싱은 데이터 모델링에 쓸 수 있는 강력한 기법으로 애플리케이션 관점에서 사용자의 행동을 불변 이벤트로 기록하는 방식은 변경 가능한 DB상에서 사용자의 행동에 따른 효과를 기록하는 방식보다 훨씬 유의미하다. 이벤트 소싱을 사용하면 애플리케이션을 지속해서 개선하기가 매우 유리하다.
이벤트 소싱은 연대기 데이터 모델(chronicle data model)과 유사하다. 또한 이벤트 로그와 별 모양 스키마에서 발견한 사실 테이블 사이에도 유사점이 있다.
이벤트 스토어(Event Store)같은 특화된 DB는 이벤트 소싱을 사용하는 애플리케이션을 지원하게끔 개발하고 있지만 일반적으로 이벤트 소싱 접근법은 특정 도구와 독립적이다.
일반 DB나 로그 기반 메시지 브로커에서도 이런 방식으로 애플리케이션을 구축하는 데 사용할 수 있다.

참고: 이벤트소싱, 연대기 데이터 모델, 이벤트 로그, 그리고 사실 테이블의 유사점

모두 시간에 따른 데이터의 변화를 표현하는데 중점을 두고 다음과 같은 유사성을 가진다.
1.
시간 순서: 모두 시간 순서에 따라 데이터를 저장하고 처리한다. 이러한 구조를 통해 시간에 따른 데이터의 변화를 추적할 수 있습니다.
2.
추적 가능성: 이들 모델에서 발생한 이벤트나 데이터 변경은 시간에 따라 추적 가능하며, 이를 통해 과거 상태로 롤백하거나 데이터의 변화를 분석할 수 있다.
3.
이벤트 기반: 이벤트 소싱과 연대기 데이터 모델은 이벤트를 기반으로 데이터를 저장 및 관리한다. 이벤트 로그와 사실 테이블도 이벤트 또는 거래 데이터(Transaction Data)를 중심으로 구성되어 있다.
4.
추가 전용: 이벤트 소싱, 연대기 데이터 모델, 이벤트 로그에서는 데이터를 추가 전용(append-only)으로 저장한다. 이러한 방식은 데이터의 무결성을 유지하고 동시성 문제를 줄여준다. 별 모양 스키마의 사실 테이블에서도 대부분 추가 전용으로 데이터를 저장하며, 변경이 필요한 경우 새로운 행을 추가하는 방식을 사용한다.
5.
읽기 성능 최적화: 모두 읽기 성능을 최적화하는 데 중점을 두고 있으며, 이들 구조를 사용하면 데이터의 변경 내역을 쉽게 조회하고 분석할 수 있다.
6.
분산 시스템과의 호환성: 이들 모델은 시간 순서에 따라 데이터를 저장하고 처리하는 구조로 인해 분산 시스템에서도 잘 동작한다. 분산 환경에서도 데이터 일관성을 유지하고 동시성 문제를 해결하는 데 도움이 된다.

이벤트 로그에서 현재 상태 파생하기

이벤트 로그 자체로는 별로 유용하다고 할 수 없다. 사용자 입장에서는 현재 시스템이 어떤 상태인지가 궁금하지 어떤 수정 히스토리를 가졌는지가 궁금하지 않다.
그렇기에 이벤트 소싱을 사용하는 애플리케이션은 시스템에 기록한 데이터를 표현한 이벤트 로그를 가져와 사용자에게 보여주기 적당한 애플리케이션 상태로 변환해야 한다.
이 변환 과정은 로직을 자유롭게 사용할 수 있지만 결정적 과정이어야 한다.
다시 수행하더라도 이벤트 로그로부터 동일한 애플리케이션 상태를 만들 수 있어야 하기 때문이다.
변경 데이터 캡처와 마찬가지로 이벤트 로그를 재현하면 현재 시스템 상태를 재구성할 수 있다. 하지만 로그 컴팩션은 다르게 처리해야 한다.
레코드 갱신용 CDC이벤트는 일반적으로 레코드의 가장 새로운 버전을 보유한다. 그래서 기본키의 현재 값은 전적으로 기본키의 가장 최신 이벤트로 결정되고 같은 키의 이전 이벤트는 로그 컴팩션을 통해 버린다.
반면 이벤트 소싱은 이벤트를 보다 상위 수준에서 모델링한다. 이벤트는 대개 사용자 행동의 결과로 발생한 상태 갱신 메커니즘이 아닌 사용자 행동 의도를 표현한다. 이 경우 뒤에 발생한 이벤트가 앞선 이벤트를 덮어쓰지 않는다. 그래서 마지막 상태를 재구축하기 위해 이벤트의 전체 히스토리가 필요하다. 이런 방식에선 로그 컴팩션이 불가능하다.
이벤트 소싱을 사용하는 애플리케이션은 일반적으로 이벤트 로그에서 파생된 현재 상태의 스냅숏을 저장하는 메커니즈이 있기에 전체 로그를 반복해서 재처리할 필요는 없다. 하지만 이 메커니즘은 장애 발생 시 읽고 복구하는 성능을 높여주는 최적화에 불과하다. 이벤트 소싱 시스템에는 모든 원시 이벤트를 영원히 저장하고 필요할 때마다 모든 이벤트를 재처리할 수 있어야 한다는 의도가 있다.

명령과 이벤트

이벤트 소싱 철학은 이벤트명령(command)을 구분하는 데 주의한다.
사용자 요청이 처음 도착했을 때 이 요청은 명령이다. 이 시점에선 명령이 실패할 수도 있다. 예로 특정 무결성 조건을 위반하면 실패한다. 그렇기에 애플리케이션은 명령이 실행 가능한지 확인해야 한다. 무결성이 검증되고 명령이 승인되면 명령은 지속성 있는 불변 이벤트가 된다.
영화표 예매 애플리케이션을 예로 들어보자.
해당 애플리케이션에서는
좌석 예매 시도시 해당 사용자명이 이미 사용 중이거나 좌석이 이미 예약이 끝났는지 확인해야 한다.
확인이 성공하면 애플리케이션은 특정 사용자명을 특정 사용자 ID로 등록한다는 이벤트를 생성하거나 특정 고객이 특정 좌석을 예약한다는 이벤트를 생성한다.
이벤트는 생성 시점에 사실(fact)가 되며, 차후 예약 변경 및 취소 이벤트가 발생하더라도 이전에 특정 좌석을 예약했다는 사실은 진실이며 변경 및 취소 이벤트는 추가된 독립적 이벤트이다.
이벤트 스트림 소비자는 이벤트를 거질하지 못하고 소비자가 이벤트를 받은 시점에서 이벤트는 이미 불변 로그의 일부분이다. 따라서 명령의 유효성은 이벤트가 되기 전 동기식으로 검증해야 한다.
이를테면 직렬성 트랜잭션을 사용해 원자적으로 명령을 검증하고 이벤트를 발행할 수 있다.
혹은 좌석 예약에 대한 요청을 두 개의 이벤트로 분할해서 하나는 가예약 하나는 유효한 예약에 대한 확정 이벤트로 만들 수 있다. 이렇게 분할하면 비동기 처리로 유효성 검사를 할 수 있다.

상태와 스트림 그리고 불변성

불변성 원리는 이벤트 소싱과 변경 데이터 캡처를 매우 강력하게 만들어 준다.
하지만 DB는 애플리케이션의 현재 상태를 저장하는데, 상태는 변하기 마련이고 DB는 데이터 삽입뿐 아니라 데이터 갱신이나 삭제도 지원한다. 그런데 어떻게 불변성과 어울릴 수 있는 것일까?
상태가 변할 때마다 해당 상태는 시간이 흐름에 따라 변한 이벤트의 마지막 결과다.
예를 들어 현재 예약 가능한 좌석의 목록은 예약을 처리한 결과이고 현재 계좌 잔고는 계좌의 입금과 출금의 결과다. 그리고 웹 서버의 응답 시간 그래프는 발생한 모든 개별 요청의 응답 시간을 집계한 것이다.
상태가 어떻게 바뀌었든 이런 변화를 일으킨 이벤트들이 있다. 사건이 발생했다 취소되더라도 이벤트가 발생했다는 부분은 변하지 않는다. 모든 변경 로그(changelog)는 시간이 지남에 따라 바뀌는 상태를 나타낸다.

불변 이벤트의 장점

DB에서 불변성을 이용하는 아이디어는 생각보다 오래됐다.
금융 쪽에서 거래(트랜잭션)가 발생하면 거래 정보를 원장(ledger)에 추가만 하는 방식으로 기록한다. 원장은 본질적으로 돈, 상품, 서비스를 교환한 정보를 설명한 이벤트 로그다. 손익 또는 대차대조표 같은 회계는 원장의 거래 내역을 합산해 만든다
만약 실수가 발생해도 회계사는 원장의 잘못된 거래 내역을 지우거나 고치지 않고, 대신 실수를 보완하는 거래 내역을 추가한다. 잘못된 과세에 대한 환급이 그런 예다.
잘못된 거래 내역은 원장에 영원히 남는다. 그것이 회계 감사에 중요한 사유가 될 수 있기 때문이다.
불변 이벤트는 또한 이처럼 규제가 엄격하지 않아도 되는 시스템에서도 유용하다.
우연히 버그가 있는 코드를 배포해 DB에 잘못된 데이터를 기록했을 때 코드가 데이터를 덮어썼다면 복구하기가 매우 어렵다. 추가만 하는 불변 이벤트 로그를 썼다면 훨씬 디버깅과 복구가 쉽다.

동일한 이벤트 로그로 여러 가지 뷰 만들기

불변 이벤트 로그에서 가변 상태를 분리하면 동일한 이벤트 로그로 다른 여러 읽기 전용 뷰를 만들 수 있다. 이는 한 스트림이 여러 소비자를 가질 때와 동일한 방식으로 작동한다.
이벤트 로그에서 DB로 변환하는 명시적인 단계가 있으면 시간의 흐름에 따라 애플리케이션을 발전시키기 쉽다. 기존 데이터를 새로운 방식으로 표현하는 새 기능을 추가하려면 이벤트 로그를 사용해 신규 기능용으로 분리한 읽기 최적화된 뷰를 구축할 수 있다. 또한 기존 시스템을 수정할 필요가 없고 기존 시스템과 함께 운용이 가능하다.
신구 시스템을 나란히 구동하는 것은 기존 시스템에서 복잡한 스키마 이전을 수행하는 것보다 쉽다. 구 시스템이 더 이상 필요하지 않으면 기존 시스템을 내리고 기존 시스템이 사용하던 자원을 회수할 수 있다.
데이터에 어떻게 질의하고 접근하는지에 신경 쓰지 않는다면 데이터 저장은 상당히 직관적인 작업이다. 스키마 설계, 색인, 저장소 엔진이 가진 복잡성은 특정 질의와 특정 접근 형식을 지원하기 위한결과로 발생한다. 이런 이유로 데이터를 쓰는 형식과 읽는 형식을 분리해 다양한 읽기 뷰를 허용한다면 상당한 유연성을 얻을 수 있다.
이 개념을 명령과 질의 책임의 분리(command query responsibility segregation, CQRS) 라 부른다.

동시성 제어

이벤트 소싱과 변경 데이터 캡처의 가장 큰 단점은 이벤트 로그의 소비가 대개 비동기로 이뤄진다는 점이다. 그래서 사용자가 로그에 이벤트를 기록하고 이어서 로그에서 파생된 뷰를 읽어도 기록한 이벤트가 아직 읽기 뷰에 반영되지 않았을 가능성이 있다.
해결책 하나는 읽기 뷰의 갱신과 로그에 이벤트를 추가하는 작업을 동기식으로 수행하는 방법이다.
이 방법을 쓰려면 트랜잭션에서 여러 쓰기를 원자적 단위로 결햅향 하므로 이벤트 로그와 읽기 뷰를 같은 저장 시스템에 담아야 한다. 다른 시스템에 있다면 분산 트랜잭션이 필요하다.
반면 이벤트 로그로 현재 상태를 만들면 동시성 제어 측면이 단순해진다. 다중 객체 트랜잭션은 단일 사용자 동작이 여러 다른 장소의 데이터를 변경해야 할 때 필요하다. 이벤트 소싱을 사용하면 사용자 동작에 대한 설명을 자체적으로 포함하는 이벤트를 설계할 수 있다.
그러면 사용자 동작은 한 장소에서 한 번 쓰기만 필요하다. 즉 이벤트를 로그에 추가만 하면 되며 원자적으로 만들기 쉽다.
이벤트 로그와 애플리케이션 사앹를 같은 방식으로 파티셔닝하면(Ex: 3번 파티션에 있는 사용자의 이벤트를 처리할 때 애플리케이션 상태의 3번 파티션만 갱신하면 된다면) 간단한 단일 스레드 로그 소비자는 쓰기용 동시성 제어는 필요하지 않고, 단일 이벤트를 한 번에 하나씩 처리한다.
파티션 내에서 이벤트의 직렬 순서를 정의하면 로그에서 동시성의 비결정성을 제거할 수 있다. 한 이벤트가 여러 개의 상태 파티션에 영향을 준다면 더 많은 작업이 필요하다.

불변성의 한계

이벤트 소스 모델을 사용하지 않는 많은 시스템에서도 불변성에 의존한다.
다양한 DB는 내부적으로 시점 스냅숏을 지원하기 위해 불변 자료 구조나 다중 버전 데이터를 사용한다.
영구적으로 모든 변화의 불변 히스토리를 유지하는게 어디까지 가능할까?
이는 데이터셋이 뒤틀리는 양에 따라 다르다. 대부분 데이터를 추가하는 작업이고 갱신이나 삭제는 드물게 방생하는 작업부하는 불변으로 만들기 쉽다.
상대적으로 작은 데이터셋에서 매우 빈번히 갱신과 삭제를 하는 작업부하는 불변 히스토리가 감당하기 힘들 정도로 커지거나 파편화 문제가 발생할 수도 있다. 또한 컴팩션과 가비지 컬렉션의 성능 문제가 견고한 운영을 하는 데 큰 골칫거리가 되기도 한다.
성능적인 이슈 외에도 데이터가 모두 불변성임에도 관리상의 이유로 데이터를 삭제할 필요가 있는 상황일 수 있다. 사생활 침해 규제로 인해 사용자가 계정을 폐쇄한 이후 사용자의 개인 정보를 지울 필요가 있다든지 데이터 보호법에 따라 잘못된 정보를 삭제해야 한다든지 민감함 정보가 우발적으로 노출되는 것을 방지해야 하는 경우가 그 예다.
이런 상황에선 이전 데이터를 삭제해야 한다는 또 다른 이벤트를 로그에 추가한다고 해결되지 않는다. 실제로 원하는 바는 히스토리를 새로 쓰고 문제가 되는 데이터를 처음부터 기록하지 않았던 것처럼 하는 것이다.
데이터를 실제로 삭제하는 작업은 정말 어렵다. 많은 곳에 복제본이 남아 있기 때문이다.
예를 들어 저장소 엔진, 파일 시스템 SSD같은 장소에 데이터를 덮어 쓰기보단 주로 새로운 장소에 기록한다. 그리고 사고로 인한 삭제나 손상을 방지하기 위해 백업은 의도적으로 불변으로 만든다. 즉 삭제는 해당 데이터를 찾기 불가능하게 하기보단 찾기 어렵게 하는 문제다.