Kafka脚本使用及相关问题整理

消息队列Kafka常用的命令比较多,这里对一些常用的命令和相关的问题做一些总结,便于后面自己查阅和参考。其中包括Kafka的Topic创建、查看Kafka的Topic的信息、删除Kafka的Topic的数据、查看Kafka的Topic的描述信息、查看Kafka的数据等相关操作。

第一部分:常用的命令使用

1、创建kafka的Topic

./kafka-topics.sh --create --zookeeper localhost:2181 --topic mytopic --config retention.ms=259200000 --partitions 10 --replication-factor 1

注解1:主题名称为mytopic

注解2:Topic数据保留时长259200000ms

注解3:partition个数10

注解4:备份数量1

2、查看Kafka所有的Topic信息

./kafka-topics.sh  --list  --zookeeper localhost:2181

3、删除Topic中的数据

./kafka-topics.sh  --delete  --zookeeper localhost:2181  --topic  mytopic

4、查看Kafka中的某个Topic的信息描述

./kafka-topics.sh  --describe  --zookeeper localhost:2181  --topic  mytopic

5、从开始查看Kafka中的数据

./kafka-console-consumer.sh --zookeeper localhost:2181 --from beginning  --topic  mytopic

6、从当前状态下查看Kafka的Topic中的数据信息

./kafka-console-consumer.sh --zookeeper localhost:2181  --topic  mytopic

7、查看Kafka中的某个Topic消费情况(方法一)_有时会报错

./kafka-consumer-groups.sh --new-consumer --describe --group control-static --bootstrap-server alph140:9092

8、查看Kafka中的某个Topic的消费情况(方法二)

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group control-static --zookeeper localhost:2181  --topic OTHER_TOPIC

9、查看Kafka中的某个Topic的消费情况(方法三)_没有消费也能看到实际消费情况

./kafka-run-class.sh kafka.tools.GetOffsetShell --topic OTHER_TOPIC  --time -1 --broker-list alph140:9092

10、查看Kafka中的某个Topic的消费情况(方法四)_和方法二返回的结果一样

./kafka-consumer-offset-checker.sh --group control-static  --zookeeper localhost:2181 --topic OTHER_TOPIC

11、查看Kafka的group集合_有疑问

./kafka-consumer-groups.sh --bootstrap-server alph199:9092 --list --new-consumer

 

第二部分:怎样删除一个Topic的数据,之后重新提供服务?(场景应用)

操作步骤如下:

  • 找到相应Topic的文件数据目录并删除该目录
  • 删除Topic
  • 修改当前Topic的相关信息
  • 重启Kafka

参考脚本:

#!/usr/bin/env bash

if [ ! -d tmp/logs ];then

mkdir -p tmp/logs

fi

export LOG_FILE=/tmp/logs/clean-kafka-topic.log

exec 1>> $LOG_FILE    #日志输出

exec 2>> $LOG_FILE    #日志输出

 

#1. find the kafka data dir

HOSTS=`cat /etc/hosts|grep -v -E 'localhost|mirrors'|awk '{print $2}'`     #获取集群服务器IP

for host in ${HOSTS[@]}

do

path_data=`ssh ${host} find / -name  "*OTHER_TOPIC*" | awk '{print $0}'`

for path in ${path_data[@]}

do

echo "${path}"

ssh ${host} "rm -rf ${path}"

done

done

 

#path_data=`find / -name  "* OTHER_TOPIC *" | awk '{print $0}'`   #查询OTHER_TOPIC主题

#for path in ${path_data[@]}

#  do

#    echo "${path}"

#    echo "rm -rf ${path}"

#  done

 

#2. delete kafka topic

export KAFAK_BIN_HOME=/usr/home/SERVICE-KAFKA-8cf7ba67b35449fba40f71dd9ee4f396/bin   #定义Kaka的bin路径

${KAFAK_BIN_HOME}/kafka-topics.sh --zookeeper localhost:2181 --topic OTHER_TOPIC --delete

 

#3. alter kafak topic

${KAFAK_BIN_HOME}/kafka-topics.sh --alter  --zookeeper localhost:2181  --topic OTHER_TOPIC  --config retention.ms=259200000

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: