Kafka shell命令
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。本篇在此记录一些常用的kafka shell
。
Kafka版本:kafka_2.11-2.1.0
创建Topic
1 | bin/kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic test_topic --partitions 2 --replication-factor 2 |
说明:
--zookeeper
也可以只填其中任意一台zookeeper服务--partitions
表示创建的分区数,通常用于提高kafka的吞吐量,但也不是多多益善,具体看需求场景--replication-factor
表示每个分区中消息的副本数,通常都设置为broker的数量,具体看需求场景。
列出所有的Topic
1 | bin/kafka-topics.sh --zookeeper master:2181 --list |
这里我们至指定了一个zookeeper节点。
查看指定的Topic的描述信息
1 | bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test_topic |
输入如下信息:
1 | Topic:test_topic PartitionCount:2 ReplicationFactor:2 Configs: |
给指定Topic生产数据
1 | bin/kafka-console-producer.sh --broker-list slave1:9092,slave2:9092 --topic test_topic |
这里的--broker-list
也可以只填写其中一个节点。
消费指定Topic中的数据
1 | bin/kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic test_topic --from-beginning |
--bootstrap-server
表示kafka的broker
,可以都写全也可以只写一个--from-beginning
表示从开始进行消费,不加则只消费consumer
启动后的数据
查看指定Topic中offset的最大(小)值
查看offset的最大值
1 | bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave1:9092,slave2:9092 --topic gpsnj --time -1 |
此时会输入如下内容:
1 | test_topic:0:2 |
这表示test_topic
中分区0
的offset的最大值为2
,分区1
的offset的最大值为2
,在数据了比较大的情况下,这两个值一般都不一样。
查看offset的最小值
1 | bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave1:9092,slave2:9092 --topic gpsnj --time -2 |
此时会输入如下内容:
1 | test_topic:0:0 |
这表示test_topic
中分区0
的offset的最小值为2
,分区1
的offset的最小值为2
,在数据了比较大的情况下,这两个值一般都不一样。
查看topic指定分区的offset最大最小值
主需要在上面两个命令中加入--partitions
即可
1 | bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list slave1:9092,slave2:9092 --topic test_topic --time -1 --partitions 0 |
time为
-1
时表示最大值,time为-2
时表示最小值
增加分区数
1 | bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test_topic --partitions 5 |
这表示将test_topic
的分区数修改为5个分区,此时这个topic总共有5个分区,而不是7个,可以按照之前介绍的命令查看一下
1 | Topic:test_topic PartitionCount:5 ReplicationFactor:2 Configs: |
删除Topic
1 | bin/kafka-topics.sh --delete --zookeeper master:2181 --topic test_topic |
此时会输入如下内容:
1 | Topic test_topic is marked for deletion. |
其实执行完命令后topic
并没有被删除,而只是被标记删除。此时查看topic list依旧存在。解决办法有如下两种:
修改kafka配置
在kafka的配置文件中加入如下配置:
1 | delete.topic.enable=true |
此时,再使用命令即可成功删除。
删除kafka相关的文件
如果没有执行上述的命令,需要如下几个步骤才可以彻底删除topic
登录Zookeeper
1 | zookeeper-client |
查看topic
1 | ls /brokers/topics |
此时会出现如下日志:
1 | [syslog_web_1,test_topic] |
删除这个节点
1 | rmr /brokers/topics/test_topic |
删除kafka存储目录
这个路径在server.properties
中log.dirs
配置,如果配置了多个路径也都要删除,另外,如果kafka有多个broker,则需要针对每个broker中的数据进行删除,此时test_topic
被彻底删除。
如果执行了上述的命令,需要如下几个步骤才可以彻底删除topic
登录到zookeeper
1 | zookeeper-client |
找到被标记的topic
1 | ls /admin/delete_topics |
删除这个节点
1 | rmr /admin/delete_topics/test_topic |