// 파티션 3개 가지는 topic 생성
kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3
// Consumer Group id group_01을 가지는 consumer를 3개 생성(아래 명령어 3번 수행) , ㅏkey,value 값을 보낼것이고 partition 값도 보여줘
// consumer를 생성할 때마다 broker에서 rebalancing 되는 것을 확인할 수 있을 것이다. (로그 확인)
kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--property print.partition=true
kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--property print.partition=true
kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--property print.partition=true
Consumer Group list 정보
Consumer Group과 Consumer 관계, partition 등에 대한 상세 정보
Consumer Group 삭제
Producer가 전송한 Log message의 지연 Lag 정
// 컨슈머 그룹 정보 확인
kafka-consumer-groups --bootstrap-server localhost:9092 --list
// 컨슈머 그룹의 세부 정보 확인
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01
같은 consumer group에 있지만 consumer id가 달라지는 것을 확인할 수 있다.
partition을 3개 만들어 놓고 실행한 consumer가 없을 경우
partition을 3개 만들어 놓고 1개만 실행중인 경우
partition을 3개 만들어 놓고 2개 실행중인 경우
partition을 3개 만들어 놓고 모 실행중인 경우
다시 돌아와서 컨슈머를 모두 실행하지 않은 경우
5. Key를 가지는 메시지를 전송
kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic \
--property key.separator=: --property parse.key=true
1:aaa
2:bbb
2개의 메세지를 보냈다.
1:aaa
2:bbb
현재는 동작하는 컨슈머가 없기 때문에 LAG 결과값을 확인할 수 있다.
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01
LAG : delay 된 것을 볼 수 있다.
200개의 메세지를 보내보자
1. load.log 파일 만들기
touch load.log
2. Non key 메시지 2000개를 load.log에 기록하기.
for i in {1..2000}
do
echo "test nonkey message sent test00000000000000 $i" >> load.log
done
3. load.log 파일 기반으로 메시지 2000개 전송.
kafka-console-producer --bootstrap-server localhost:9092 --topic multipart-topic < load.log
상태 확인
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01
2000개를 보낸 현재 상태이다.
Log-END-OFFSET과 LAG가 쌓인 상태이다.
이제 kafka consumer를 실행해보자.
5초마다 모니터링해보자
consumer group 삭제
consumer group에 속해있는 consumer를 삭제해도 group은 삭제가 안된다.
일정 기간이 지나면 삭제가 되는 시스템이다.
즉시 삭제할 수 있는 방법을 알아보자.
delete할 때는 consumer가 모두 종료되어야 합니다. (종료되어있지 않으면 Exception이 발생합니다.
1. group list 확인
kafka group을 삭제하기 전에 group list를 확인합니다.
kafka-consumer-groups --bootstrap-server localhost:9092 --list
"group_01" 이라는 컨슈머 그룹이 있는 것을 확인할 수 있습니다.
컨슈머 그룹 동작 확
해당 컨슈머 그룹이 동작하고 있는지 확인합니다.
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01
3. 컨슈머 그룹 삭제
kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group group_01
'공부방 > Kafka' 카테고리의 다른 글
kafka-dump-log 명령어로 로그 파일의 메세지 내용 확인 (0) | 2023.05.13 |
---|---|
Kafka config 구분 및 이해 (0) | 2023.05.13 |
Kafka (0) | 2023.05.05 |
kafka (0) | 2023.05.01 |