본문 바로가기

공부방/Kafka

Consumer group 과 Rebalancing 실습

 

 // 파티션 3개 가지는 topic 생성
 kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3
 
 // Consumer Group id group_01을 가지는 consumer를 3개 생성(아래 명령어 3번 수행) , ㅏkey,value 값을 보낼것이고 partition 값도 보여줘
 // consumer를 생성할 때마다 broker에서 rebalancing 되는 것을 확인할 수 있을 것이다. (로그 확인)
kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--property print.partition=true

kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--property print.partition=true


kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--property print.partition=true

 

Consumer Group list 정보

Consumer Group과 Consumer 관계, partition 등에 대한 상세 정보

Consumer Group 삭제

Producer가 전송한 Log message의 지연 Lag 정

 

// 컨슈머 그룹 정보 확인
kafka-consumer-groups --bootstrap-server localhost:9092 --list

// 컨슈머 그룹의 세부 정보 확인
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01

 

같은 consumer group에 있지만 consumer id가 달라지는 것을 확인할 수 있다.

 

partition을 3개 만들어 놓고 실행한 consumer가 없을 경우

 

partition을 3개 만들어 놓고 1개만 실행중인 경우

 

partition을 3개 만들어 놓고 2개 실행중인 경우

 

partition을 3개 만들어 놓고 모 실행중인 경우

 

 

다시 돌아와서 컨슈머를 모두 실행하지 않은 경우

 

5. Key를 가지는 메시지를 전송
kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic \
--property key.separator=: --property parse.key=true
1:aaa
2:bbb

2개의 메세지를 보냈다.

1:aaa

2:bbb

현재는 동작하는 컨슈머가 없기 때문에 LAG 결과값을 확인할 수 있다.

 

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01

 

LAG : delay 된 것을 볼 수 있다.

 

200개의 메세지를 보내보자

 

1. load.log 파일 만들기
touch load.log

2. Non key 메시지 2000개를 load.log에 기록하기. 
for i in {1..2000}
do
echo "test nonkey message sent test00000000000000 $i" >> load.log
done

3. load.log 파일 기반으로 메시지 2000개 전송. 
kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic < load.log

 

상태 확인

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01

 

 

2000개를 보낸 현재 상태이다.

Log-END-OFFSET과 LAG가 쌓인 상태이다.

 

이제 kafka consumer를 실행해보자.

 

 

5초마다 모니터링해보자

 

 

consumer group 삭제

consumer group에 속해있는 consumer를 삭제해도 group은 삭제가 안된다.

일정 기간이 지나면 삭제가 되는 시스템이다.

즉시 삭제할 수 있는 방법을 알아보자.

 

delete할 때는 consumer가 모두 종료되어야 합니다. (종료되어있지 않으면 Exception이 발생합니다.

 

1. group list 확인

 

kafka group을 삭제하기 전에 group list를 확인합니다.

 

kafka-consumer-groups --bootstrap-server localhost:9092 --list

 

"group_01" 이라는 컨슈머 그룹이 있는 것을 확인할 수 있습니다.

 

 

컨슈머 그룹 동작 확

해당 컨슈머 그룹이 동작하고 있는지 확인합니다.

 

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01

 

 

3. 컨슈머 그룹 삭제 

 

kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group group_01

'공부방 > Kafka' 카테고리의 다른 글

kafka-dump-log 명령어로 로그 파일의 메세지 내용 확인  (0) 2023.05.13
Kafka config 구분 및 이해  (0) 2023.05.13
Kafka  (0) 2023.05.05
kafka  (0) 2023.05.01