Kafka常规操作的姿势

一、Low-API操作:

kafka生产消息(--broker-list后跟kafka地址,注:不是zookeeper地址):

./kafka-console-producer.sh --broker-list safecloud-master:9092 --topic safeclound_co2

上述生产的消息key是空的,若想生产指定key的消息:

./kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_co2 --property "parse.key=true" --property "key.separator=:"

如,需使用shell脚本生产一批带key的消息:

for (( i=1; i<=10; i++ )); doecho"key$$i:value$$i" | kafka-console-producer.sh --broker-list localhost:9092 --topic safeclound_co2 --property "parse.key=true" --property "key.separator=:"; done;


kafka消费消息(--zookeeper后跟zk地址):

 ./kafka-console-consumer.sh --zookeeper safecloud-master:3181 --topic safeclound_co2

kafka创建主题:

./kafka-topics.sh --zookeeper safecloud-master:3181 --create --topic safeclound_co2 --partitions 10 --replication-factor 1

kafka查看主题列表:

./kafka-topics.sh --zookeeper safecloud-master:3181 --list

kafka查看主题详情:

./kafka-topics.sh --describe --zookeeper safecloud-master:3181 --topic safeclound_co2

kafka删除主题:

./kafka-topics.sh --zookeeper safecloud-master:3181 --delete --topic safeclound_co2

kafka添加分区:

./kafka-topics.sh --zookeeper safecloud-master:3181 --alter --topic safeclound_co2 --partitions 15


二、High-API操作:

查看主题的所有分区数据信息:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list safecloud-master:9092 --topic safeclound_co2 --time -2             (-2表示最早offset)

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list safecloud-master:9092 --topic safeclound_co2 --time -1             (-1表示最新offset)

查看group.id所有消费情况(offset):

./kafka-consumer-groups.sh --bootstrap-server safecloud-master:9092 --group safeclound_spark_test --describe

或者

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper safecloud-master:3181 --group safeclound_spark_test --topic safeclound_co2


三、使用auto.offset.reset的正确姿势:

若业务只需消费最新数据,将auto.offset.reset=latest,OK~~   经亲生产环境血的经历,对不起,这样设置不生效!

那咋办呢?      ----->   还得加上 enable.auto.commit=false

为什么还要加enable.auto.commit呢? 经各种baidu、Google了解到,因为如果enable.auto.commit=true会自动提交offset,虽然设置了auto.offset.reset=latest,但是它会优先使用已经存储的offset,因此auto.offset.reset=latest不生效!!!  其实仔细想想它这样实现也有道理!


注意:

1、以上使用的版本为kafka_0.10.x(在0.8.x~0.10.x之间存在offset是否存储于zk的区别)

2、Kafka目前不支持减少分区数和改变备份数。


评论
© Saxon | Powered by LOFTER
上一篇 下一篇