해당 포스트는 아래의 강의 수강 후 작성됨을 알려드립니다.
[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! |
데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고
www.inflearn.com
프로듀서
카프카 프로듀서는 카프카 클러스터, 토픽을 사용할 때 가장 많이 사용하는 도구 중 하나이다.

카프카에서 토픽을 만든 후에 프로듀서 애플리케이션을 개발한다.
이 때는 kafka console producer를 사용해도되고, 일반적으로는 테스트가 아니라 프로듀서 애플리케이션을 개발해서 토픽에 데이터를 넣는 것이 일반적이다.
데이터는 다양한 데이터를 넣는데, 예를 들어 사용자의 로그 데이터를 넣는 경우에는 사용자가 어떤 클릭을 했을 때 그 클릭 데이터를 서버로 접근하고 접근된 이 데이터에 대해 토픽으로 저장한 후에 데이터베이스에 넣는 순으로 진행된다.
이 때 개발한 프로듀서 애플리케이션은 json과 같은 형태로 데이터 반환 후 토픽으로 넣는다.
이런 프로듀서 애플리케이션은 카프카의 데이터의 시작점이라고 볼 수도 있다.
프로듀서와 컨슈머가 토픽에 접근할 때는 반드시 리더 파티션이 있는 브로커와 통신하게 된다.
프로듀서 애플리케이션의 경우에는 기본적으로 자바 애플리케이션을 개발한다.
다른 언어로도 구현은 가능하지만 오픈소스 카프카에서는 자바 라이브러리만 제공을 하기 때문에 다른 언어의 라이브러리는 자바 라이브러리와 기능적인 면 등에서 차이가 날 수밖에 없다.
프로듀서 내부 구조

가장 처음에는 프로듀서 레코드가 실행된다.
프로듀서 레코드는 우리가 보낼 데이터를 정의하는 레코드의 의미다.
레코드에는 오프셋은 없는데, 오프셋은 카프카의 특정 파션에 데이터가 저장되고나서 정해진다. 따라 그 전까지는 오프셋은 지정되지 않는다.
레코드에서는 토픽, 파티션, 타임스탬프, 메세지 키, 메세지 값을 정할 수 있는데 실제로는 데이터를 보낼 때 토픽과 메세지 값만 있어도 데이터를 보내는 것이 가능하다.
이후 send 호출 시 메세지 보내는 것이 가능해지는데, send는 호출하자마자 레코드를 전송하는 것은 아니다.
메세지는 Partitioner, Accumulator, Sender의 과정을 거치게된다. 하나씩 소개해보면 다음과 같다.
Partitioner는 어느 파티션으로 전송할지 지정하는 파티셔너이다. 기본값으로는 DefaultPartitioner로 설정된다.
파티셔너에 대한 정보를 참고할 때는 메세지키를 확인하면 된다. 메세지키에 특정 데이터를 넣어서 보내면 토픽에 여러 파티션을 가지고 있어도 항상 동일한 파티셔너에 들어가기 때문이다.
Accumulator는 배치로 묶어 전송할 데이터를 모으는 버퍼이다. 데이터를 레코드를 send 할 때마다 보내지는 것이 아니라 배치로 묶어서 TCP 통신으로 데이터를 보내기 때문에 Accumulator를 사용하게 되면 높은 데이터 처리량을 가질 수 있다.
파티셔너
프로듀서의 기본 파티셔너
프로듀서 API를 사용하면 UniformStickyPartitioner와 RoundRobinPartitioner 2개의 파티셔너를 제공한다.
카프카 클라이언트 라이브러리 2.5.0 버전에서는 파티셔너를 지정하지 않으면 기본으로 UniformStickyPartitioner로 설정이된다.
동작의 경우 메세지 키가 있을 때와 없을 때로 나눠진다.
1. 메세지 키가 있을 때
이 때는 UniformStickyPartitioner와 RoundRobinPartitioner 둘 다 메세지 키의 해시값과 파티션을 매칭하여 레코드를 전송한다.
따라 동일한 메세지키가 존재하는 레코드는 동일한 파티션에 전달된다.
하지만 파티션 개수가 변경되는 경우가 있다면 메세지 키와 파티션 번호 매칭이 깨지게된다.
따라 애초에 시작시에 충분히 적절한 파티션 개수로 시작하는 것이 좋다.
2. 메세지 키가 없을 때(=null)
메세지 키가 없을 대는 최대한 동일하게 분배하는 로직이 들어있고 UniformStickyPartitioner와 RoundRobinPartitioner 각각 다르게 동작한다.
UniformStickyPartitioner은 RoundRobinPartitioner의 발전된 버전이다.
RoundRobinPartitioner의 경우는 프로듀서 레코드가 들어오는대로 파티션을 순회하면서 전송한다.
따라 어큐뮤레이터에서 배치로 묶이는 정도가 적기 때문에 전송 성능이 낮아지게된다.
UniformStickyPartitioner는 어큐뮤레이터에서 레코드들이 배치로 묶일 때까지 기다렸다가 전송한다.
배치로 묶일뿐이지 결국 파티션을 순회하면서 보내기 때문에 결과적으로는 모든 파티션에 분배된다.
프로듀서의 커스텀 파티셔너
카프카 클라이언트 라이브러리에서는 사용자 지정 파티셔너를 생성하기 위해 Partition 인터페이스를 제공하고 있다.
파티셔너 인터페이스를 상속받은 사용자 정의 클래스에서 메세지 키 또는 메세지 값에 따른 파티션 지정 로직을 적용할 수 있다.
커스텀 파티셔너를 개발하게 된다면 데이터의 파티션 지정 방식을 새롭게 정의할 수 있다는 특징이 있다.
프로듀서 주요 옵션(필수 옵션)
필수 옵션이란 디폴트 옵션이 없다는 의미로 사용자가 무조건 지정해서 넣어야한다.
지정하지 않는다면 프로듀서 애플리케이션이 실행되지 않는다.
하나씩 살펴보면 다음과 같다.
- bootstrap.server: 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성한다.
2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라고 접속하는데 이슈가 없도록 설정 가능하다.
- key.serializer: 레코드의 메세지 키를 직렬화하는 클래스를 지정한다.
- value.serializer: 레코드의 메세지 값을 직렬화하는 클래스를 지정한다.
프로듀서에서 레코드를 보낼 때는 기본적으로 직렬화를 한다.
직렬화를 통해 카프카의데이터를 브로커로 보내서 데이터로 저장할 때 세상에 존재하는 거의 모든 데이터를 적재할 수 있다.
직렬화 과정을 통하면 세크먼트 로그가 남게되고, 로그를 토대로 컨슈머가 데이터를 브로커로부터 가져가서 데이터를 처리한다는 의미이다.
컨슈머에서는 이 직렬화된 데이터를 다시 풀기 위해서 역직렬화를 하게된다.
직렬화를 할 때 string으로 직렬화를 하지 않으면 몇가지 문제가 존재한다.
우선 string으로 직렬화하지 않은 경우에는 kafka-console-consumer 데이터를 보지 못하는 경우가 발생할 수 있다.
그리고 또한 프로듀서로 보낸 데이터의 경우는 토픽에 들어간 데이터가 어떻게 들어갔는지 모르게되기 때문에 다양하게 컨슈머를 우녕ㅇ할 경우에는 컨슈머가 어떻게 역직렬하는지 모르게되는 경우가 생길 수 있다.
프로듀서 주요옵션(선택 옵션)
선택 옵션의 경우 필수 옵션과는 다르게 디폴트 값이 있는 옵션이다.
- acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는데 사용하는 옵션이다.
0,1,-1(all) 중 하나로 설정할 수 있으며 기본값은 1이다.
프로듀서가 데이터를 리더 파티션으로 보냈을 때 정상적으로 데이터가 적재가 되었으면 성공의 의미로 1을 보낸다.
- linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간이다. 기본값은 0.
- retries: 브로커로부터 에러를 받은 뒤 재전송을 시도하는 횟수를 지정한다. 기본값은 2147483647.
- max.in.flight.requests.per.connection: 한 번에 요청하는 최대 커넥션 개수. 설정된 값만큼 동시에 전달 요청을 수행한다. 기본값은 5이다.
- partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다. 기본값은 org.apache.kafka.clients.producer.internals.DafaultPartitioner이다.
만약 기본 파티셔너가 아니라 직접 구현한 파티셔너를 사용하고 싶다면 partioner.class를 선언하여 직접 만든 커스텀 파티셔너 클래스 이름을 집어 넣으면 된다.
- enable.idempotence: 멱등성 프로듀서로 동작할지 여부를 설정한다. 기본값은 false.
프로듀서와 브로커가 통신할 때 어떤 네트워크에서 이슈가 심각하게 발생하면 프로듀서가 데이터를 중복 전송할 수 있는데 이를 막기위한 용도로 사용된다.
- transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정한다. 기본값은 null.
transactional.id를 지정하게 되면 enable.idempotence는 true가 된다.
ISR(In-Sync-Replicas)와 acks옵션
acks 옵션은 프로듀서가 브로커로 데이터를 보낼 때 어느 정도의 신뢰도로 데이터를 보낼 것이냐에 따라 달라진다.
즉, acks 옵션의 값을 어떻게 설정하느지에 다라 얼마나 안전하게 데이터를 보낼 수 있냐를 결정할 수 있다.
우선 acks에 대해서 잘 살펴보기 전에 ISR을 살펴봐야한다.
ISR은 리더 파티션과 팔로워 파티션의 레코드 오프셋 개수가 동일할 때 모두 싱크가 되었다고 하여 ISR이라고 한다.
싱크가 된 상태에서 리더 파티션과 팔로워 파티션은 안전하게 페일 오버가 된다고 볼 수 있다.
싱크가 된 상태에서만 장애가 발생했을 때 동일한 데이터를 다시 팔로워 파티션에서 그대로 사용할 수 있기 때문이다.

ISR이라는 용어가 나온 이유는 팔로워 파티션이 리더 파티션으로부터 데이터를 복제하는데 시간이 걸리기 때문이다.
프로듀서가 특정 파티션에 데이터를 저장하는 작업은 리더 파티션을 통해 처리하고, 새로운 레코드가 올 때마다 팔로우 파티션은 새로 추가가 되고 리더 파티션에도 데이터가 추가된다.
따라 결과적으로 복제하는 시간 차이 때문에(Replication Lack) 리더 파티션과 팔로우 파티션간에 offset 차이가 발생할 수 있다.
그렇기 때문에 상황에 따라 프로듀서가 데이터를 보내고 리더 파티션의 데이터가 적재되었을 때 리더 파티션에는 오프셋이 10개가 있지만 팔로우 파티션에는 오프셋이 8개 밖에 없을 때도 있을 수 있다.
이 때 acks 옵션을 통해서 프로듀서가 전송한 데이터가 카프카 클러스터에 얼마나 신뢰성 높게 저장될 수 있는지 지정할 수 있다.
따라 acks 옵션을 통해서 신뢰도을 높이고 성능을 낮출지 또는 성능을 높이고 신뢰도를 낮출지 결정할 수 있다.
신뢰도를 높인다는 것은 리더 파티션과 팔로워 파티션의 데이터가 정상적으로 싱크되어 오프셋이 동일한 ISR 형태인 것과 가깝게 둔다는 것이다.
레플리케이션 값이 1인 경우에는 ack의 값에 따른 차이가 큰 편은 아니다.
따라 복제 개수가 2이상인 경우로만 예시를 보자.
1. acks=0

acks를 0으로 설정하는 것은 프로듀서가 리더 파티션으로부터 따로 데이터를 전송했을 때 리더 파티션으로 데이터가 저장되었는지 확인하지 않는다. 데이터의 전송속도는1 혹은 all로 했을 때보다 빠르지만 데이터 유실이 발생한다.
0 옵션의 경우는 GPS나 네비게이션과 같이 지표데이터를 사용할 때는 데이터 유실보다 데이터 속도가 더 중요한 경우에 사용한다.
2. acks=1

acks를 1로 설정하면 프로듀서가 보낸 데이터가 리더 파티션에만 정상적으로 적재되었는지 확인한다.
만약 리더 파티션에 정상적으로 적재되지 않았다면 리더 파티션에 적재될 때까지 재시도할 수 있다.
그러나 팔로워 파티션에 적재되었는지는 확인하지 않기 때문에 복제가 제대로 되지 않을 경우 같은 데이터 유실의 가능성이 존재한다.
실제로는 1로 설정하여 사용하여도 데이터 유실이 크게 발생하지는 않는다고 한다.
큰 장애가 발생하는 것이 아니라면 일반적으로는 1로 설정하여 사용하는 경우가 대다수이다.
3. acks=-1(all)

acks를 all -1로 설정할 경우 프로듀서는 보낸데이터가 리더 파티션과 팔로워 파티션에 모두 정상적으로 적재되었는지 확인한다.
따라 0 이나 -1보다는 처리량 측면에서 확연히 낮다.
신뢰성이 높다고해서 좋은 것만은 아니라 실제 all로 설정할 경우 처리량에서 차이가 너무 나기때문에 속도가 중요한 경우 사용하지 않는 것이 좋다.
토픽 단위로 설정 가능한 min.insync.replicas 옵션값에 따라 데이터의 안정성이 달라진다.
따라 복제가 3개 이상인 경우에도 min.insync.replicas 옵션값을 2로 주면 리더 파티션과 1개의 팔로워 파티션만 확인한다. 보통 동시에 서로 다른 브로커가 각자 장애를 일으킬 확률은 현저히 낮기 때문에 2개만 확인하더라도 안정성은 높아질 수 있다는 것이다.
min.insync.replicas
min.insync.replicas 옵션은 프로듀서가 리더 파티션과 팔로워 파티션에 데티터가 적재되었는지 확인하기 위한 최소 ISR그룹의 파티션 개수이다. 해당 옵션값의 수의 따른 파티션의 응답값을 확인하게 된다.
따라 acks=all로 설정하고 min.insync.replicas=1로 설정하게 되면 리더 파티션만 확인하게 되기 때문에 all로 설정하는 것에 의미가 없다.
따라 all 설정은 min.insync.replicas옵션을 2부터 설정해야 사용에 의미가 생긴다.
'스터디 > kafka' 카테고리의 다른 글
| [Apache Kafka] Apache Kafka Study 06: 카프카 컨슈머 애플리케이션 개발(+프로듀서 개발) (0) | 2024.05.30 |
|---|---|
| [Apache Kafka] Apache Kafka Study 03: 카프카 클러스터 운영 (1) | 2024.05.16 |
| [Apache Kafka] Apache Kafka Study 01: 카프카 기초 (0) | 2024.04.18 |