해당 포스트는 아래의 강의 수강 후 작성됨을 알려드립니다.
[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! |
데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고
www.inflearn.com
레코드 전송 결과를 확인하는 프로듀서 애플리케이션
기본적은 send는 Future객체를 반환한다. 이 때 RecordMetadata의 비동기 결과 또는 동기 결과를 받을 수 있다.
예를 들어 get() 메서드 사용시에 프로듀서로 보낸 데이터의 결과를 동기적으로 가져와서 어떤 토픽에 어떤 파티션에 몇 번 오프셋으로 데이터가 저장되는지 확인할 수 있다.

위 코드처럼 메타데이터를 동기로 받고 이 메타데이터에 대한 정보를 toString으로 프린트하게 되면 해당 레코드의 전송 결과에 대해서 sync callback으로 받게되고 정보를 함께 볼 수 있게된다.
실습 파일을 통해 확인해보면 다음과같다.

필수 옵션 지정 후 프로듀서 인스턴스를 만들고 레코드에서 (메세지 키와 값으로 (판교, 판교)를 지정하였다.
이후에 send를 실행하는데 get을 통해 레코드 메타데이터를 동기로 받는 것을 볼 수 있다.
코드를 실행하고 로그를 살펴보자.
카프카 프로듀서 애플리케이션, 즉 인스턴스가 생성될 때는 로그를 남기는데 기본저긍로 이 로그에는 ProducerConfig가 무엇으로 설정되는지 출력한다.

이러고 로그의 아래쪽에 콜백에 대한 로그가 나와야하는데...

오류가 나와서 해결하려고 찾아보았지만 해결방법을 찾지 못하여 우선 넘기고 강의 코드를 보며 따라가기로 하였다.
콜백 성공시 'test-0@4'라는 문구가 로그에 나온다.
ack를 1로 설정을 하였기 때문에 리더파티션에는 데이터가 저장되었음을 확인했다는 의미이다.
즉, 'test-0@4'는 0번 파티션(리더 파티션)의 4번 offset에 데이터가 저장되었다는 것을 알려준다.
이 때, ack를 0으로 설정하여 다시 실행해보면, test-0@-1로 로그로 나오게된다.
이는 리더 파티션에 데이터를 전송하였지만 어떤 오프셋에 저장되었는지는 모른다는 의미로, 데이터가 저장되었는지 확인하지 않는 0옵션을 사용하기 때문에 나타나는 결과이다.
프로듀서의 안전한 종료
프로듀서를 안전하게 종료하기 위해서는 close() 메서드를 사용하여 어큐뮤레이터에 저장되어있는 모든 데이터를 카프카로 전송하여야한 다.
카프카 컨슈머
카프카 컨슈머는 카프카 클러스터 토픽에 데이터가 들어오면 이 토픽을 사용하는 용도로 활용하는 것이 컨슈머라고 볼 수 있다.
따라 컨슈머를 운영하려면 먼저 토픽을 만들어야하고, 그 토픽에 데이터를 넣어야지만 컨슈머 애플리케이션에서 사용할 수 있게 된다.
컨슈머 애플리케이션에서는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와 필요한 처리를 한다.

예를 들어 마케팅 문자를 고객에서 보내는 기능이 있다면 컨슈머는 토픽으로부터 데이터를 가져와 SMS 등의 다른 서비스와 연동을 하여 문자 발송 처리를 하게된다.
컨슈머 내부 구조

카프카 클러스터에서 컨슈머를 보내게되면 우선적으로 Fetcher에서 데이터를 받는다.
이후 completedFetchers라고 하는 데이터를 충분히 받게되면 내부에서 poll() 메서드를 통해서 데이터들을 처리할 ConsumerRecords라고 하는 객체를 받게된다.
칸슈머 내부에서는 poll() 메서드를 호출하기도 전에 데이터를 가져오기 때문에 poll 호출을 조금 늦게 하더라도 이미 데이터를 가져왔기 때문에 처리 속도가 느려지지 않는다.
ConsumerRecords는 레코드는 처리하고자 하는 레코드의 모음으로, 브로커에 저장된 오프셋을 레코드에서 확인할 수 있고 처리가 완료되었다면 커밋을 통해 현재 몇번 오프셋까지 처리했는지 체크한다.
컨슈머 그룹

컨슈머 그룹은 특정 토픽에 대해서 목적에 따라 데이터를 처리하는 컨슈머들을 묶을 그룹을 말하며, 각 컨슈머 그룹으로부터 격리된 상황에서 안전하게 운영을 할 수 있다.
동일한 컨슈머 그룹을 가진 컨슈머들은 기본적으로 다 동일한 로직을 가지고 있다.
그렇지만 아닌 경우도 있지만 특수한 경우이기 때문에 우선은 넘기고 보자.
토픽에서 데이터를 가져가더라도 토픽에 있는 데이터는 사라지지 않는다.
따라서 하나의 컨슈머 그룹에서 데이터를 가져가도 동일한 데이터를 다른 컨슈머 그룹에서 가져갈 수 있는 것이다.
컨슈머가 subscribe라고 하는 메서드를 통해 토픽을 구독하게되면 전체 파티션에 대해서 데이터를 가져갈 수 있게된다.
이 때, 구독하는 토픽의 파티션 개수만큼 컨슈머 애플리케이션의 개수를 늘려 운영하는 것이 좋다.
그럼 만약 컨슈머 그룹의 컨슈머가 파티션 개수보다 많을 경우는 어떻게될까?

이 경우에는 파티션은 최대 한 개의 컨슈머만 할당될 수 있기 때문에 1대 1로 매칭된 이후에 더 할당될 수가 없어서 가장 마지막에 실행된 컨슈머는 기본적으로 idle 상태가돼버린다.
따라 기본적으로 컨슈머는 파티션 개수와 맞게 생성하는 편이 좋다.
컨슈머 그룹을 활용하는 이유

운영 서버에는 주요 리소스인 CPU, 메모리 정보를 수집하는 데이터 파이프라인이 필수적이다.
이런 데이터를 대용량으로 수집하기 위해서는 이런 에이전트가 필요하고, 데이터를 오래 보관하기 위해서는 하둡에 저장해야하며, 엘라스틱 서츠를 통해 데이터를 분석할 수 있다.
데이터가 점점 많아질 수록 리소스 수집하고, 엘라스틱서치에 저장하였다가 하둡에 저장하는 이 연결고리들의 처리 시간이 너무 오래걸리게 된다.

이를 해결하기 위해서 카프카 토픽과 컨슈머 그룹으로 데이터를 나눌 수 있다.
프로듀서에서 카프카로 데이터를 보내면 여기서 컨슈머로 데이터가 전송될 때 데이터를 각각의 용도에 따라 서로 다른 컨슈머 그룹으로 보내는 것이다.
따라 모든 데이터가 함께 같은 경로로 움직이는 것이 아니게 되기 때문에 엘라스틱 서치의 장애로 적재가 불가능하게 되는 경우에 하둡으로 데이터를 적재하는데 문제가 없을 것이기 때문이다.
리밸런싱

카프카 컨슈머에는 리밸런싱이라는 독특한 방식이 존재한다.
컨슈머 그룹 내부의 컨슈머 중 일부의 컨슈머에 장애가 발생하게 되면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에게 소유권이 넘어가고, 이런 과정을 리밸런싱이라고 한다.
리밸런싱이 발생하는 경우는 크게 두가지인데, 첫번째는 컨슈머가 추가되는 상황이고 두번째는 컨슈머가 제외되는 상황이다.
리밸런싱은 컨슈머가 데이터를 처리하는 도중에 언제든지 발생 가능하며, 이런 리밸런싱은 장애로도 볼 수 있기 때문에 데이터 처리 중 발생한 리밸런싱에 대응하는 코드를 작성해야한다.
커밋

컨슈머가 카프카 브로커로부터 데이터를 어디까지 가져갔는지 기록하는 것을 커밋이라고 한다.
만약 컨슈머 동작 이슈가 발생하여 토픽의 어느 레코드까지 읽었는지 커밋하지 못하였다면 데이터 처리의 중복이 발생할 수 있다.
따라 중복이 발생하지 않도록 컨슈머 애플리케이션에서 컨슈머 로직 개발 시 오프셋 커밋을 정상 처리하였는지 검증해야한다.
어사이너
어사이너는 컨슈머와 파티션의 할당 정책은 어사이너로 인해 결정된다.
카프카에서는 RangeAssignor(카프카 2.5.0 기본값), RoundRobinAssigner, SrickyAssignor를 제공한다.

기본적으로는 파티션 개수 1개당 컨슈머 1개를 할당하는 것이 좋다.
다만 어사이너 옵션이 있는 경우 이를 보고 결정하면 되는 것이다.
컨슈머 주요 옵션
필수 옵션
필수 옵션은 디폴트 값이 없고 사용자가 직접 설정해야하는 값이다.
- bootstrap.servers: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름: 포트를 1개 이상 작성한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접 속하는 데에 이슈가 없도록 설정 가능하다.
- key.deserializer: 레코드의 메시지 키를 역직렬화하는 클래스를 지정한다.
- value.deserializer: 레코드의 메시지 값을 역직렬화하는 클래스를 지정한다.
선택옵션
- group.id: 컨슈머 그룹 아이디를 지정한다. subscribe() 메서드로 토픽을 구독하여 사용할 때는 이 옵션을 필수로 넣어야 한다. 기본값은 null이다.
- auto.offset.reset: 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우(한번도 커밋을 안한 경우) 어느 오프 셋부터 읽을지 선택하는 옵션이다. 이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다. 기본값은 Iatest이다.
- enable.auto.commit: 자동 커밋으로 할지 수동 커밋으로 할지 선택한다. 기본값은 true(자동커밋) 이다.
- auto.commit.interval.ms: 자동 커밋일 경우 오프셋 커밋 간 격을 지정한다. 기본값은 5000(5초)이다.
- max poll records: poll() 메서드를 통해 반환되는 레코드 개수를 지정한다. 기본값은 500 이 다.
- session.timeout.ms: 컨슈머가 브로커와 연결이 끊기는 최대 시간이다. 기본값은 10000(10초)이다.
- hearbeat.interval.ms: 하트비트를 전송하는 시간 간격이다. 기본값은 3000(3초)이다.
- max.poll.interval.ms: poll( ) 메서드를 호출하는 간격의 최대 시간. 기본값은 300000(5분)이다.
- isolation. level: 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다.
auto.offset.reset
컨슈머의 선택 옵션 중 하나로 컨슈머 오프셋이 있다면 이 옵션값은 무시된다.
latest,earliest, none 중 1개를 설정할 수 있다.
- latest: 설정하면 가장 높은(가장 최근에 넣은) 오프셋부터 읽기 시작한다.
- earliest: 설정하면 가장 낮은(가장 오래전에 넣은) 오프셋부터 읽기 시작한다.
- none: 설정하면 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 만약 커밋 기록이 없으면 오류를 반환하 고, 커밋 기록이 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작한다. 기본값은 latest이다.
컨슈머 애플리케이션 개발

프로듀서와도 동일하게 카프카 클라이언트 라이브러리를 추가해야한다.

코드를 보면 group_id와 필수 옵션 설정 후 subscribe를 통해서 1개 이상의 토픽을 구독하는 컨슈머 인스턴스를 만들었다.
또한 역직렬화에 대해서는 KEY_DESERIALIZER_CLASS_CONFIG와 VALUE_DESERIALIZER_CLASS_CONFIG로 지정하였다.

컨슈머 애플리케이션의 기본 로직은 무한 루프가 내부적으로 도는 것이 원칙이다.
이 무한 루프 안에서는 poll 메서드를 통해 레코드를 받아서 그 레코드를 병렬처리하거나 리스트로 순차처리를 한다.
수동 커밋 컨슈머 애플리케이션
동기 오프셋 커밋 컨슈머

poll() 메서드가 호출된 이후에 commitSync() 메서드를 호출하여 오프셋 커밋을 명시적으로 수행할 수 있다.
commitSync()는 poll() 네서드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋한다.
동기 오프셋 커밋을 사용할 경우에는 poll()메서드로 받은 모든 레코드의 처리가 끝난 이후 commitSync()메서드를 호출해야 한다.
동기 오프셋 커밋(레코드 단위) 컨슈머
레코드 단위로도 커밋을 수행할 수 있다.

레코드 단위로 커밋을 수행하는 경우에는 TopicPartition과 Offset Metadata가 가진 맵을 따로 설정해야한다.
TopicPartition에는 현재 구독하고 있는 레코드의 토픽과 파티션의 값을, Offset Metadata에는 해당 offset+1의 값을 집어넣는다.
이후 commitSync 파라미터로 맵을 넣게되면 레코드 별로 오프셋을 커밋한다는 것을 알 수 있다.
커밋은 많이 하게되면 처리량이 많이지게 되기 때문에 그에 따라 데이터 처리 속도가 느려지게 된다.
일반적으로는 레코드 단위로 오프셋 커밋을 하는 방식은 잘 사용되지않는다.
비동기 오프셋 커밋 컨슈머

동기 오프셋 커밋을 사용할 경우 커밋 응답을 기다리는 동안 데이터 처리가 일시적으로 중단되기 때문에 더 많은 데이터를 처리하기 위해서는 비동기 오프셋 커밋을 사용할 수 있다.
비동기 오프셋 커밋 콜백

offset commit에 대한 콜백을 받을 수 있다.
commit이 실패하였는지, 실패하였다면 어떤 오프셋에서 어떻게 실패하였는지 기록할 수 있다.
퀴즈
1) 1개 컨슈머는 여러 개의 파티션에 할당될 수 있다 (O/X)
2) 하나의 파티션은 여러 컨슈머에 할당될 수 있다 (O/X)
3) 파티션 개수보다 컨슈머 그룹의 컨슈머 개수가 많으면 오류가 발생한다 (O/X)
4) 안정적인 데이터 처리를 위해 session.timeout.ms는 heartbeat.interval.ms보다 작은 값이여야 한다 (O/X)
5) 컨슈머를 안정적으로 종료하기 위해 session.timeout.ms을 사용한다 (O/X)
정답(드래그)
1) O
토픽 내부에 파티션은 여러개고 컨슈머는 하나인 경우에서도 컨슈머가 여러 파티션에 할당되어 작동할 수 있다.
2) X
컨슈머 그룹 내에서 하나의 컨슈머에 할당된 토픽은 다른 컨슈머로 할당될 수 없다.
3) X
오류가 발생되는 것은 아니고 남은 컨슈머가 idle상태로 빠지게 된다.
4) X
반대이다.
session.timeout.ms는 기본값이 10초, heartbear.interval.ms는 3초이다.
5) X
전혀 관계가 없다.
즉 안정적인 종료가 아니라 장애 상황에서 리밸런싱을 적절하게 대응하기 위함이다.
'스터디 > kafka' 카테고리의 다른 글
| [Apache Kafka] Apache Kafka Study 05: 카프카 프로듀서 애플리케이션 개발 (0) | 2024.05.24 |
|---|---|
| [Apache Kafka] Apache Kafka Study 03: 카프카 클러스터 운영 (1) | 2024.05.16 |
| [Apache Kafka] Apache Kafka Study 01: 카프카 기초 (0) | 2024.04.18 |